from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from itertools import batched import clickhouse_connect from django.core.management import BaseCommand from django.conf import settings from qdrant_client.models import OrderBy from vacancies.main.vector_store import ( add_vectors, batch_extract_features, embed_features, qdrant_client, ) query = """ SELECT DISTINCT ON (message) id, chat_username, telegram_id, message, timestamp FROM telegram_parser_chatmessage WHERE timestamp >= %(timestamp)s AND length(message) > 150 AND arrayCount(x -> position(message, x) > 0, [ 'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом', 'зарплат', 'оклад', 'з/п', 'руб', 'опыт', 'требовани', 'обязанности', 'условия', 'офис', 'удаленн', 'гибкий график', 'полный день', 'занятост', 'резюме', 'собеседовани', 'junior', 'middle', 'senior', 'ждем', 'компани', 'заниматься', 'формат', 'занятость', 'вилка', 'должност', 'контакт' ]) >= 5 AND arrayCount(x -> position(lower(message), x) > 0, [ 'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж' ]) = 0 ORDER BY timestamp ASC """ class Command(BaseCommand): help = "Collect vacancies from telegram messages" def handle(self, *args, **options): response = qdrant_client.scroll(collection_name="vacancies", limit=1, order_by=OrderBy(key="timestamp", direction="desc")) last_point_timestamp = datetime.now() - timedelta(days=30) if response[0]: last_point_timestamp = response[0][0].payload["timestamp"] clickhouse_client = clickhouse_connect.create_client(host=settings.CLICKHOUSE_HOST, port=settings.CLICKHOUSE_PORT) result_rows = clickhouse_client.query(query, parameters={"timestamp": last_point_timestamp}).result_rows for index, rows in enumerate(batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE)): vacancies_features = batch_extract_features([row[3] for row in rows]) print(f"Processing {index+1}/{len(result_rows)//settings.COLLECT_VACANCIES_BATCH_SIZE}") with ThreadPoolExecutor() as pool: vacancies_vectors = pool.map(embed_features, [vacancy_features.model_dump() for vacancy_features in vacancies_features]) for row, vacancy_features, vacancy_vectors in zip(rows, vacancies_features, vacancies_vectors): (id, chat_username, telegram_id, message, timestamp) = row link = f"https://t.me/{chat_username}/{telegram_id}" payload = {'content': message, 'features_json': vacancy_features.model_dump(), "link": link, "timestamp": timestamp} add_vectors("vacancies", id, vacancy_features.model_dump(), payload, vacancy_vectors)