vision-career/vacancies/main/management/commands/collect_vacancies_from_telegram_messages.py
2025-12-04 00:51:28 +03:00

88 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import timedelta
from itertools import batched
from typing import Literal
import clickhouse_connect
from django.conf import settings
from django.core.management import BaseCommand
from django.utils import timezone
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
from vacancies.main import prompts
from vacancies.main.models import JobTitle, Vacancy
query = """
SELECT DISTINCT ON (message) id, chat_username, telegram_id, message, timestamp
FROM telegram_parser_chatmessage
WHERE timestamp >= %(timestamp)s
AND length(message) > 150
AND arrayCount(x -> position(message, x) > 0, [
'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
'зарплат', 'оклад', 'з/п', 'руб', 'опыт',
'требовани', 'обязанности', 'условия', 'офис',
'удаленн', 'гибкий график', 'полный день', 'занятост',
'резюме', 'собеседовани', 'junior', 'middle', 'senior', 'ждем', 'компани',
'заниматься', 'формат', 'занятость', 'вилка', 'должност', 'контакт'
]) >= 5
AND arrayCount(x -> position(lower(message), x) > 0, [
'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж',
'не будет опубликовано'
]) = 0
ORDER BY timestamp ASC
"""
class Command(BaseCommand):
help = "Collect vacancies from telegram messages"
def handle(self, *args, **options):
job_titles = JobTitle.objects.values_list('title', flat=True)
job_title_map = dict(JobTitle.objects.values_list('title', 'id'))
class Structure(BaseModel):
job_title: Literal[tuple(job_titles)]
original_title: str
min_salary_rub: int | None
max_salary_rub: int | None
company_name: str
requirements: str
openai_client = ChatOpenAI(
model_name="openai/gpt-5-mini",
openai_api_base="https://openrouter.ai/api/v1",
temperature=0,
seed=42,
top_p=1,
)
structured_llm = openai_client.with_structured_output(Structure)
last_timestamp = timezone.now() - timedelta(days=30)
if last_vacancy := Vacancy.objects.order_by("-timestamp").first():
last_timestamp = last_vacancy.timestamp
clickhouse_client = clickhouse_connect.create_client(host=settings.CLICKHOUSE_HOST, port=settings.CLICKHOUSE_PORT)
result_rows = clickhouse_client.query(query, parameters={"timestamp": last_timestamp}).result_rows
batches = list(batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE))
for index, rows in enumerate(batches):
prompts = [f"{prompts.STRUCTURED_OUTPUT_PROMPT} {row[3]}" for row in rows]
responses = structured_llm.batch(prompts)
vacancies = []
for row, response in zip(rows, responses):
(id, chat_username, telegram_id, message, timestamp) = row
vacancies.append(Vacancy(
external_id=id,
job_title_id=job_title_map[response.job_title],
original_title=response.original_title,
min_salary_rub=response.min_salary_rub,
max_salary_rub=response.max_salary_rub,
company_name=response.company_name,
requirements=response.requirements,
content=message,
timestamp=timezone.make_aware(timestamp),
link=f"https://t.me/{chat_username}/{telegram_id}",
))
Vacancy.objects.bulk_create(vacancies, ignore_conflicts=True)
print(f"Processed {index+1}/{len(batches)}")