86 lines
3.7 KiB
Python
86 lines
3.7 KiB
Python
from datetime import timedelta
|
||
from itertools import batched
|
||
from typing import Literal
|
||
|
||
import clickhouse_connect
|
||
from django.conf import settings
|
||
from django.core.management import BaseCommand
|
||
from django.utils import timezone
|
||
from langchain_openai import ChatOpenAI
|
||
from pydantic import BaseModel
|
||
|
||
from vacancies.main import prompts
|
||
from vacancies.main.models import JobTitle, Vacancy
|
||
|
||
query = """
|
||
SELECT DISTINCT ON (message) id, chat_username, telegram_id, message, timestamp
|
||
FROM telegram_parser_chatmessage
|
||
WHERE timestamp >= %(timestamp)s
|
||
AND length(message) > 150
|
||
AND arrayCount(x -> position(message, x) > 0, [
|
||
'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
|
||
'зарплат', 'оклад', 'з/п', 'руб', 'опыт',
|
||
'требовани', 'обязанности', 'условия', 'офис',
|
||
'удаленн', 'гибкий график', 'полный день', 'занятост',
|
||
'резюме', 'собеседовани', 'junior', 'middle', 'senior', 'ждем', 'компани',
|
||
'заниматься', 'формат', 'занятость', 'вилка', 'должност', 'контакт'
|
||
]) >= 5
|
||
AND arrayCount(x -> position(lower(message), x) > 0, [
|
||
'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж',
|
||
'не будет опубликовано'
|
||
]) = 0
|
||
ORDER BY timestamp ASC
|
||
"""
|
||
|
||
|
||
class Command(BaseCommand):
    """Extract structured vacancies from recent Telegram messages.

    Reads candidate messages from ClickHouse using the module-level ``query``.
    For each message, an LLM served through OpenRouter extracts a job title
    (restricted to the titles stored in ``JobTitle``), a salary range in
    roubles, a company name and the requirements. The results are
    bulk-inserted as ``Vacancy`` rows, in batches of
    ``settings.COLLECT_VACANCIES_BATCH_SIZE``.
    """

    help = "Collect vacancies from telegram messages"

    def handle(self, *args, **options):
        """Fetch new candidate messages, classify them and store vacancies."""
        # One query instead of two: the map's keys double as the allowed titles.
        job_title_map = dict(JobTitle.objects.values_list("title", "id"))
        if not job_title_map:
            # With no titles the schema would be Literal[()], so no LLM reply
            # could ever validate. Stop early with an explicit message instead.
            self.stderr.write("No job titles found; cannot classify vacancies.")
            return

        structured_llm = self._build_structured_llm(tuple(job_title_map))
        result_rows = self._fetch_candidate_rows()

        # Materialised so the progress line can report the total batch count.
        batches = list(batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE))
        for index, rows in enumerate(batches):
            # Must not be named `prompts`: assigning that name anywhere in this
            # method would shadow the imported `prompts` module for the whole
            # method and raise UnboundLocalError on the first batch.
            batch_prompts = [f"{prompts.STRUCTURED_OUTPUT_PROMPT} {row[3]}" for row in rows]
            # return_exceptions=True keeps one unparseable message from
            # aborting the batch (and, run after run, the whole pipeline).
            responses = structured_llm.batch(batch_prompts, return_exceptions=True)

            vacancies = []
            for row, response in zip(rows, responses):
                # A structured-output parser may also yield None when the model
                # does not return the expected structure.
                if response is None or isinstance(response, Exception):
                    # NOTE(review): skipped messages older than a stored vacancy
                    # are not retried later, because the next run starts from
                    # the newest stored timestamp.
                    self.stderr.write(f"Skipping message {row[0]}: {response!r}")
                    continue
                vacancies.append(self._build_vacancy(row, response, job_title_map))

            Vacancy.objects.bulk_create(vacancies, ignore_conflicts=True)
            self.stdout.write(f"Processed {index + 1}/{len(batches)}")

    @staticmethod
    def _build_structured_llm(job_titles):
        """Return an LLM runnable whose replies are parsed into a per-run schema.

        The schema is built at runtime because ``job_title`` must be one of the
        titles currently present in the database (``job_titles``, a tuple).
        """

        class Structure(BaseModel):
            job_title: Literal[job_titles]
            min_salary_rub: int | None
            max_salary_rub: int | None
            company_name: str
            requirements: str

        openai_client = ChatOpenAI(
            model_name="openai/gpt-5-mini",
            openai_api_base="https://openrouter.ai/api/v1",
            temperature=0,
            seed=42,
            top_p=1,
        )
        return openai_client.with_structured_output(Structure)

    @staticmethod
    def _fetch_candidate_rows():
        """Return ClickHouse rows at or after the newest stored vacancy.

        Each row is ``(id, chat_username, telegram_id, message, timestamp)``, in
        the order selected by ``query``. Without any stored vacancy, the window
        starts 30 days ago. The filter is inclusive, so the newest stored message
        is fetched again; presumably ``ignore_conflicts`` in ``bulk_create``
        absorbs the duplicate. TODO: confirm a unique constraint on
        ``external_id``.
        """
        last_timestamp = timezone.now() - timedelta(days=30)
        if last_vacancy := Vacancy.objects.order_by("-timestamp").first():
            last_timestamp = last_vacancy.timestamp

        clickhouse_client = clickhouse_connect.create_client(
            host=settings.CLICKHOUSE_HOST,
            port=settings.CLICKHOUSE_PORT,
        )
        try:
            return clickhouse_client.query(
                query, parameters={"timestamp": last_timestamp}
            ).result_rows
        finally:
            # Release the client's connections once the rows are in memory.
            clickhouse_client.close()

    @staticmethod
    def _build_vacancy(row, response, job_title_map):
        """Build an unsaved ``Vacancy`` from a ClickHouse row and its LLM extraction."""
        message_id, chat_username, telegram_id, message, timestamp = row
        # make_aware raises ValueError on an already-aware datetime, and the
        # driver may return either kind depending on column/client settings.
        if timezone.is_naive(timestamp):
            # NOTE(review): naive values are interpreted in Django's current
            # time zone. Confirm this matches the ClickHouse column's zone.
            timestamp = timezone.make_aware(timestamp)
        return Vacancy(
            external_id=message_id,
            # The Literal schema restricts job_title to keys of this map.
            job_title_id=job_title_map[response.job_title],
            min_salary_rub=response.min_salary_rub,
            max_salary_rub=response.max_salary_rub,
            company_name=response.company_name,
            requirements=response.requirements,
            content=message,
            timestamp=timestamp,
            link=f"https://t.me/{chat_username}/{telegram_id}",
        )
|