Compare commits

...

3 Commits

Author SHA1 Message Date
d4b28b8e9f Index vacancies using batches
All checks were successful
release / docker (push) Successful in 37s
2025-11-02 22:29:18 +03:00
9da00d5d1d Remove redundant services from docker compose 2025-11-02 22:16:53 +03:00
f8ca003942 Improve clickhouse query for vacancies 2025-11-02 22:16:53 +03:00
2 changed files with 33 additions and 43 deletions

View File

@@ -1,9 +1,4 @@
services:
ofelia:
image: mcuadros/ofelia:latest
command: daemon --docker -f label=com.docker.compose.project=${COMPOSE_PROJECT_NAME}
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
qdrant:
image: qdrant/qdrant:latest
restart: always
@@ -21,22 +16,3 @@ services:
- "127.0.0.1:5432:5432"
volumes:
- "/srv/vision-career/postgres:/var/lib/postgresql/data"
bot:
image: vision-career:latest
build: .
command: [".venv/bin/python", "manage.py", "runbot"]
restart: always
init: true
network_mode: host
env_file:
- .env
labels:
ofelia.enabled: "true"
ofelia.job-exec.collect-vacancies-from-telegram-messages.schedule: "@every 1m"
ofelia.job-exec.collect-vacancies-from-telegram-messages.command: ".venv/bin/python manage.py collect_vacancies_from_telegram_messages"
ofelia.job-exec.generate-recommended-vacancies.schedule: "@every 1m"
ofelia.job-exec.generate-recommended-vacancies.command: ".venv/bin/python manage.py generate_recommended_vacancies"
develop:
watch:
- action: rebuild
path: .

View File

@@ -1,3 +1,7 @@
import traceback
from itertools import batched
from concurrent.futures import ThreadPoolExecutor
from django.core.management import BaseCommand
import clickhouse_connect
from vacancies.main.vector_store import add_vectors, extract_features, qdrant_client
@@ -11,13 +15,16 @@ FROM telegram_parser_chatmessage
WHERE timestamp >= now() - INTERVAL 30 DAY
AND length(message) > 150
AND arrayCount(x -> position(message, x) > 0, [
'вакансия', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
'зарплата', 'оклад', 'з/п', 'руб', 'опыт работы',
'требования', 'обязанности', 'условия', 'компания', 'офис',
'удаленно', 'гибкий график', 'полный день', 'частичная занятость',
'резюме', 'собеседование', 'junior', 'middle', 'senior'
]) >= 5 AND position(message, 'О себе') = 0 AND position(message, 'Обо мне') = 0
AND position(message, '#ищу') = 0 AND position(message, 'умею') = 0
'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
'зарплат', 'оклад', 'з/п', 'руб', 'опыт',
'требовани', 'обязанности', 'условия', 'офис',
'удаленн', 'гибкий график', 'полный день', 'занятост',
'резюме', 'собеседовани', 'junior', 'middle', 'senior', 'ждем', 'компани',
'заниматься', 'формат', 'занятость', 'вилка', 'должност', 'контакт'
]) >= 5
AND arrayCount(x -> position(lower(message), x) > 0, [
'о себе', 'обо мне', 'умею', '#ищу'
]) = 0
AND id NOT IN %(exist_points)s
"""
@@ -42,17 +49,24 @@ class Command(BaseCommand):
exist_points_set = tuple(set(exist_points_ids))
result_rows = clickhouse_client.query(query, parameters={"exist_points": exist_points_set}).result_rows
result_rows_len = len(result_rows)
for index, row in enumerate(result_rows):
(id, chat_username, telegram_id, message, timestamp) = row
batch_size = 10
with ThreadPoolExecutor(max_workers=batch_size) as pool:
pool.map(self._process_batch, batched(result_rows, batch_size))
link = f"https://t.me/{chat_username}/{telegram_id}"
print(f"Processing {index+1}/{result_rows_len} link: {link}")
features = extract_features(message)
def _process_batch(self, result_rows):
try:
for index, row in enumerate(result_rows):
(id, chat_username, telegram_id, message, timestamp) = row
add_vectors(
"vacancies",
id,
features.model_dump(),
{'content': message, 'features_json': features.model_dump(), "link": link, "timestamp": timestamp},
)
link = f"https://t.me/{chat_username}/{telegram_id}"
print(f"Processing {index+1}/{len(result_rows)} link: {link}")
features = extract_features(message)
add_vectors(
"vacancies",
id,
features.model_dump(),
{'content': message, 'features_json': features.model_dump(), "link": link, "timestamp": timestamp},
)
except Exception as exc:
traceback.print_exception(exc)