from datetime import timedelta
from itertools import batched
from typing import Literal

import clickhouse_connect
from django.conf import settings
from django.core.management import BaseCommand
from django.utils import timezone
from langchain_openai import ChatOpenAI
from pydantic import BaseModel

from vacancies.main import prompts
from vacancies.main.models import JobTitle, Vacancy

query = """
SELECT DISTINCT ON (message) id, chat_username, telegram_id, message, timestamp
FROM telegram_parser_chatmessage
WHERE timestamp >= %(timestamp)s
    AND length(message) > 150
    AND arrayCount(x -> position(message, x) > 0, [
        'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом', 'зарплат', 'оклад', 'з/п',
        'руб', 'опыт', 'требовани', 'обязанности', 'условия', 'офис', 'удаленн', 'гибкий график',
        'полный день', 'занятост', 'резюме', 'собеседовани', 'junior', 'middle', 'senior', 'ждем',
        'компани', 'заниматься', 'формат', 'занятость', 'вилка', 'должност', 'контакт'
    ]) >= 5
    AND arrayCount(x -> position(lower(message), x) > 0, [
        'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж', 'не будет опубликовано'
    ]) = 0
ORDER BY timestamp ASC
"""


class Command(BaseCommand):
    help = "Collect vacancies from telegram messages"

    def handle(self, *args, **options):
        job_titles = JobTitle.objects.values_list('title', flat=True)
        job_title_map = dict(JobTitle.objects.values_list('title', 'id'))

        class Structure(BaseModel):
            # Constrain the LLM to the job titles already present in the database.
            job_title: Literal[tuple(job_titles)]
            min_salary_rub: int | None
            max_salary_rub: int | None
            company_name: str
            requirements: str

        openai_client = ChatOpenAI(
            model_name="openai/gpt-5-mini",
            openai_api_base="https://openrouter.ai/api/v1",
            temperature=0,
            seed=42,
            top_p=1,
        )
        structured_llm = openai_client.with_structured_output(Structure)

        # Resume from the newest stored vacancy, falling back to the last 30 days.
        last_timestamp = timezone.now() - timedelta(days=30)
        if last_vacancy := Vacancy.objects.order_by("-timestamp").first():
            last_timestamp = last_vacancy.timestamp

        clickhouse_client = clickhouse_connect.create_client(
            host=settings.CLICKHOUSE_HOST,
            port=settings.CLICKHOUSE_PORT,
        )
        result_rows = clickhouse_client.query(query, parameters={"timestamp": last_timestamp}).result_rows

        batches = list(batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE))
        for index, rows in enumerate(batches):
            # Named batch_prompts so the imported `prompts` module is not shadowed
            # (shadowing it would break every batch after the first).
            batch_prompts = [f"{prompts.STRUCTURED_OUTPUT_PROMPT} {row[3]}" for row in rows]
            responses = structured_llm.batch(batch_prompts)

            vacancies = []
            for row, response in zip(rows, responses):
                message_id, chat_username, telegram_id, message, timestamp = row
                vacancies.append(Vacancy(
                    external_id=message_id,
                    job_title_id=job_title_map[response.job_title],
                    min_salary_rub=response.min_salary_rub,
                    max_salary_rub=response.max_salary_rub,
                    company_name=response.company_name,
                    requirements=response.requirements,
                    content=message,
                    timestamp=timezone.make_aware(timestamp),
                    link=f"https://t.me/{chat_username}/{telegram_id}",
                ))

            Vacancy.objects.bulk_create(vacancies, ignore_conflicts=True)
            print(f"Processed {index + 1}/{len(batches)}")