vision-career/vacancies/main/management/commands/collect_vacancies_from_telegram_messages.py
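"""Collect vacancy posts from parsed Telegram messages and index them in Qdrant.

Invoked as a Django management command; assuming the project's standard
manage.py entry point:

    python manage.py collect_vacancies_from_telegram_messages
"""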
import traceback
from concurrent.futures import ThreadPoolExecutor
from itertools import batched  # Python 3.12+

import clickhouse_connect
from django.conf import settings
from django.core.management import BaseCommand

from vacancies.conf.settings import CLICKHOUSE_HOST, CLICKHOUSE_PORT
from vacancies.main.vector_store import add_vectors, extract_features, qdrant_client

clickhouse_client = clickhouse_connect.create_client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT)
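
# Candidate messages: posted within the last 30 days, longer than 150 characters,
# matching at least five vacancy-keyword substrings (Russian) and none of the
# CV/resume markers, and not yet present in the Qdrant collection.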
query = """
SELECT id, chat_username, telegram_id, message, timestamp
FROM telegram_parser_chatmessage
WHERE timestamp >= now() - INTERVAL 30 DAY
AND length(message) > 150
AND arrayCount(x -> position(message, x) > 0, [
'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
'зарплат', 'оклад', 'з/п', 'руб', 'опыт',
'требовани', 'обязанности', 'условия', 'офис',
'удаленн', 'гибкий график', 'полный день', 'занятост',
'резюме', 'собеседовани', 'junior', 'middle', 'senior', 'ждем', 'компани',
'заниматься', 'формат', 'занятость', 'вилка', 'должност', 'контакт'
]) >= 5
AND arrayCount(x -> position(lower(message), x) > 0, [
'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж'
]) = 0
AND id NOT IN %(exist_points)s
"""
class Command(BaseCommand):
    help = "Collect vacancies from telegram messages"

    def handle(self, *args, **options):
        # Page through the Qdrant collection to gather the IDs of already-indexed
        # points; -1 seeds the list so the NOT IN clause is never empty.
        next_page_offset = 0
        exist_points_ids = [-1]
        while next_page_offset is not None:
            response = qdrant_client.scroll(
                collection_name="vacancies",
                limit=100_000,
                offset=next_page_offset,
                with_payload=False,
                with_vectors=False,
                timeout=30,
            )
            exist_points_ids.extend([point.id for point in response[0]])
            next_page_offset = response[1]

        # Deduplicate the IDs before binding them into the ClickHouse query.
        exist_points = tuple(set(exist_points_ids))
        result_rows = clickhouse_client.query(query, parameters={"exist_points": exist_points}).result_rows

        # Vectorize the new messages in parallel, one batch per worker task.
        with ThreadPoolExecutor(max_workers=settings.COLLECT_VACANCIES_BATCH_SIZE) as pool:
            pool.map(self._process_batch, batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE))

    def _process_batch(self, result_rows):
        try:
            for index, row in enumerate(result_rows):
                message_id, chat_username, telegram_id, message, timestamp = row
                link = f"https://t.me/{chat_username}/{telegram_id}"
                print(f"Processing {index + 1}/{len(result_rows)} link: {link}")
                features = extract_features(message)
                add_vectors(
                    "vacancies",
                    message_id,
                    features.model_dump(),
                    {"content": message, "features_json": features.model_dump(), "link": link, "timestamp": timestamp},
                )
        except Exception as exc:
            traceback.print_exception(exc)