"""Django management command: collect vacancy posts from parsed Telegram
messages stored in ClickHouse and index them into a Qdrant collection.

Pipeline:
  1. Scroll the Qdrant "vacancies" collection for IDs that are already indexed.
  2. Query ClickHouse for recent messages that look like vacancy posts and are
     not indexed yet.
  3. In a thread pool, extract features from each message and upsert the
     resulting vectors and payloads into Qdrant.

Run with ``python manage.py <command_name>``, where the command name is this
file's name, per Django's management-command convention.
"""
import traceback
from concurrent.futures import ThreadPoolExecutor
from itertools import batched  # requires Python 3.12+

import clickhouse_connect
from django.conf import settings
from django.core.management import BaseCommand

from vacancies.conf.settings import CLICKHOUSE_HOST, CLICKHOUSE_PORT
from vacancies.main.vector_store import add_vectors, extract_features, qdrant_client

clickhouse_client = clickhouse_connect.create_client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT)

# Select messages from the last 30 days that look like vacancy posts: long
# enough, containing at least 5 vacancy-related keyword stems (Russian for
# "vacancy", "we are looking for", "salary", "responsibilities", ...) and
# none of the stems that mark a candidate's resume post ("about me",
# "#resume", "university", ...). Already-indexed messages are excluded via
# the %(exist_points)s parameter.
query = """
SELECT id, chat_username, telegram_id, message, timestamp
FROM telegram_parser_chatmessage
WHERE timestamp >= now() - INTERVAL 30 DAY
  AND length(message) > 150
  AND arrayCount(x -> position(message, x) > 0, [
      'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
      'зарплат', 'оклад', 'з/п', 'руб', 'опыт', 'требовани', 'обязанности',
      'условия', 'офис', 'удаленн', 'гибкий график', 'полный день',
      'занятост', 'резюме', 'собеседовани', 'junior', 'middle', 'senior',
      'ждем', 'компани', 'заниматься', 'формат', 'занятость', 'вилка',
      'должност', 'контакт'
  ]) >= 5
  AND arrayCount(x -> position(lower(message), x) > 0, [
      'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж'
  ]) = 0
  AND id NOT IN %(exist_points)s
"""


class Command(BaseCommand):
    help = "Collect vacancies from telegram messages"

    def handle(self, *args, **options):
        # Page through the whole collection to gather the IDs of points that
        # are already indexed. The -1 sentinel keeps the NOT IN tuple
        # non-empty when the collection is empty.
        next_page_offset = 0
        exist_points_ids = [-1]
        while next_page_offset is not None:
            points, next_page_offset = qdrant_client.scroll(
                collection_name="vacancies",
                limit=100_000,
                offset=next_page_offset,
                with_payload=False,
                with_vectors=False,
                timeout=30,
            )
            exist_points_ids.extend(point.id for point in points)

        exist_points = tuple(set(exist_points_ids))
        result_rows = clickhouse_client.query(
            query, parameters={"exist_points": exist_points}
        ).result_rows

        # Process batches concurrently. Exceptions are caught inside
        # _process_batch because the results of pool.map are never consumed,
        # so unhandled worker exceptions would be silently discarded.
        with ThreadPoolExecutor(max_workers=settings.COLLECT_VACANCIES_BATCH_SIZE) as pool:
            pool.map(
                self._process_batch,
                batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE),
            )

    def _process_batch(self, rows):
        try:
            for index, row in enumerate(rows):
                message_id, chat_username, telegram_id, message, timestamp = row
                link = f"https://t.me/{chat_username}/{telegram_id}"
                print(f"Processing {index + 1}/{len(rows)} link: {link}")
                features = extract_features(message)
                add_vectors(
                    "vacancies",
                    message_id,
                    features.model_dump(),
                    {
                        "content": message,
                        "features_json": features.model_dump(),
                        "link": link,
                        "timestamp": timestamp,
                    },
                )
        except Exception as exc:
            traceback.print_exception(exc)
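

# ---------------------------------------------------------------------------
# Illustrative sketch only. `extract_features` and `add_vectors` are imported
# from vacancies.main.vector_store, whose source is not shown here; judging
# purely from the call sites above, extract_features returns a Pydantic model
# (it supports .model_dump()) and add_vectors writes one point to Qdrant. The
# helper below is a hypothetical stand-in for add_vectors, NOT the project's
# actual implementation: the embedding model and the JSON-serialization step
# are assumptions made for illustration.
def _add_vectors_sketch(collection_name, point_id, features, payload):
    import json

    from qdrant_client.models import PointStruct
    from sentence_transformers import SentenceTransformer  # assumed embedder

    # Embed the extracted feature dict into a dense vector, then upsert a
    # single point carrying the original message and metadata as payload.
    # The model is loaded per call for brevity; a real implementation would
    # cache it. The collection must be configured with a matching vector
    # size (384 for all-MiniLM-L6-v2).
    model = SentenceTransformer("all-MiniLM-L6-v2")
    vector = model.encode(json.dumps(features, ensure_ascii=False)).tolist()
    qdrant_client.upsert(
        collection_name=collection_name,
        points=[PointStruct(id=point_id, vector=vector, payload=payload)],
    )
    # Example call, mirroring the arguments used in _process_batch above:
    # _add_vectors_sketch("vacancies", 1, {"title": "Python dev"}, {"content": "..."})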