Compare commits

..

No commits in common. "195c77908828113a0959a8f53ad7b9e1019eb9d3" and "7cce1cdc04136dc546f12b1386811e7d6026aa01" have entirely different histories.

4 changed files with 773 additions and 742 deletions

1249
uv.lock

File diff suppressed because it is too large Load Diff

View File

@ -1,34 +1,15 @@
import io
import os import os
import io
import traceback import traceback
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
from telegram.ext import filters, ApplicationBuilder, MessageHandler, CommandHandler, ContextTypes
from pypdf import PdfReader
from vacancies.main.models import Customer, CustomerCV
from langchain.agents import create_agent from langchain.agents import create_agent
from langchain_openai import ChatOpenAI from langchain_openai import ChatOpenAI
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from pypdf import PdfReader from vacancies.main.vector_store import add_vectors, extract_features, get_next_vacancy
from telegram import (
InlineKeyboardButton,
InlineKeyboardMarkup,
KeyboardButton,
ReplyKeyboardMarkup,
Update,
)
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from vacancies.conf.settings import DB_URI from vacancies.conf.settings import DB_URI
from vacancies.main.models import Customer, CustomerCV
from vacancies.main.vector_store import (
add_vectors,
batch_extract_features,
get_next_vacancy,
embed_features,
)
SYSTEM_PROMPT = """ SYSTEM_PROMPT = """
Ты карьерный копилот для ИТ. Ты можешь отвечать на любые вопросы по тематике карьеры. Ты карьерный копилот для ИТ. Ты можешь отвечать на любые вопросы по тематике карьеры.
@ -126,11 +107,11 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
customer_cv, _ = await CustomerCV.objects.aupdate_or_create(customer=customer, defaults=dict( customer_cv, _ = await CustomerCV.objects.aupdate_or_create(customer=customer, defaults=dict(
content=resume, content=resume,
)) ))
features = batch_extract_features(customer_cv.content)[0] features = extract_features(customer_cv.content)
add_vectors( add_vectors(
"cvs", "cvs",
customer_cv.id, customer_cv.id,
embed_features(features.model_dump())[0], features.model_dump(),
{'content': customer_cv.content, 'features_json': features.model_dump()}, {'content': customer_cv.content, 'features_json': features.model_dump()},
) )

View File

@ -1,23 +1,19 @@
from concurrent.futures import ThreadPoolExecutor import traceback
from datetime import datetime, timedelta
from itertools import batched from itertools import batched
from concurrent.futures import ThreadPoolExecutor
import clickhouse_connect
from django.core.management import BaseCommand from django.core.management import BaseCommand
from django.conf import settings from django.conf import settings
from qdrant_client.models import OrderBy import clickhouse_connect
from vacancies.main.vector_store import add_vectors, extract_features, qdrant_client
from vacancies.conf.settings import CLICKHOUSE_HOST, CLICKHOUSE_PORT
from vacancies.main.vector_store import ( clickhouse_client = clickhouse_connect.create_client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT)
add_vectors,
batch_extract_features,
embed_features,
qdrant_client,
)
query = """ query = """
SELECT DISTINCT ON (message) id, chat_username, telegram_id, message, timestamp SELECT id, chat_username, telegram_id, message, timestamp
FROM telegram_parser_chatmessage FROM telegram_parser_chatmessage
WHERE timestamp >= %(timestamp)s WHERE timestamp >= now() - INTERVAL 30 DAY
AND length(message) > 150 AND length(message) > 150
AND arrayCount(x -> position(message, x) > 0, [ AND arrayCount(x -> position(message, x) > 0, [
'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом', 'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
@ -30,7 +26,7 @@ WHERE timestamp >= %(timestamp)s
AND arrayCount(x -> position(lower(message), x) > 0, [ AND arrayCount(x -> position(lower(message), x) > 0, [
'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж' 'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж'
]) = 0 ]) = 0
ORDER BY timestamp ASC AND id NOT IN %(exist_points)s
""" """
@ -38,23 +34,39 @@ class Command(BaseCommand):
help = "Collect vacancies from telegram messages" help = "Collect vacancies from telegram messages"
def handle(self, *args, **options): def handle(self, *args, **options):
response = qdrant_client.scroll(collection_name="vacancies", limit=1, order_by=OrderBy(key="timestamp", direction="desc")) next_page_offset = 0
last_point_timestamp = datetime.now() - timedelta(days=30) exist_points_ids = [-1]
if response[0]: while next_page_offset is not None:
last_point_timestamp = response[0][0].payload["timestamp"] response = qdrant_client.scroll(
collection_name="vacancies",
limit=100_000,
offset=next_page_offset,
with_payload=False,
with_vectors=False,
timeout=30,
)
exist_points_ids.extend([point.id for point in response[0]])
next_page_offset = response[1]
exist_points_set = tuple(set(exist_points_ids))
clickhouse_client = clickhouse_connect.create_client(host=settings.CLICKHOUSE_HOST, port=settings.CLICKHOUSE_PORT) result_rows = clickhouse_client.query(query, parameters={"exist_points": exist_points_set}).result_rows
result_rows = clickhouse_client.query(query, parameters={"timestamp": last_point_timestamp}).result_rows with ThreadPoolExecutor(max_workers=settings.COLLECT_VACANCIES_BATCH_SIZE) as pool:
pool.map(self._process_batch, batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE))
for index, rows in enumerate(batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE)): def _process_batch(self, result_rows):
vacancies_features = batch_extract_features([row[3] for row in rows]) try:
for index, row in enumerate(result_rows):
print(f"Processing {index+1}/{len(result_rows)//settings.COLLECT_VACANCIES_BATCH_SIZE}")
with ThreadPoolExecutor() as pool:
vacancies_vectors = pool.map(embed_features, [vacancy_features.model_dump() for vacancy_features in vacancies_features])
for row, vacancy_features, vacancy_vectors in zip(rows, vacancies_features, vacancies_vectors):
(id, chat_username, telegram_id, message, timestamp) = row (id, chat_username, telegram_id, message, timestamp) = row
link = f"https://t.me/{chat_username}/{telegram_id}" link = f"https://t.me/{chat_username}/{telegram_id}"
payload = {'content': message, 'features_json': vacancy_features.model_dump(), "link": link, "timestamp": timestamp} print(f"Processing {index+1}/{len(result_rows)} link: {link}")
add_vectors("vacancies", id, vacancy_features.model_dump(), payload, vacancy_vectors) features = extract_features(message)
add_vectors(
"vacancies",
id,
features.model_dump(),
{'content': message, 'features_json': features.model_dump(), "link": link, "timestamp": timestamp},
)
except Exception as exc:
traceback.print_exception(exc)

View File

@ -1,9 +1,12 @@
from langchain_openai import ChatOpenAI, OpenAIEmbeddings from qdrant_client import models
from qdrant_client import QdrantClient, models from langchain_openai import OpenAIEmbeddings
from qdrant_client.models import Filter, HasIdCondition from langchain_openai import ChatOpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import Filter
from vacancies.main.models import VacancyFeatures
from vacancies.conf.settings import QDRANT_URL from vacancies.conf.settings import QDRANT_URL
from vacancies.main.models import RecommendedVacancy, VacancyFeatures from vacancies.main.models import RecommendedVacancy
from qdrant_client.models import HasIdCondition
qdrant_client = QdrantClient(url=QDRANT_URL) qdrant_client = QdrantClient(url=QDRANT_URL)
@ -13,9 +16,19 @@ FEATURE_NAMES = [
] ]
weights = { weights = {
"job_title": 70, "job_title": 25,
"tech_stack": 10, "employment_type": 5,
"salary_range": 10, "work_format": 5,
"experience": 8,
"position_level": 12,
"industry": 10,
"tech_stack": 14,
"location": 5,
"salary_range": 5,
"languages": 5,
"education": 2,
"schedule": 2,
"additional_requirements": 2,
} }
vectors_config = { vectors_config = {
@ -25,22 +38,18 @@ vectors_config = {
if not qdrant_client.collection_exists("vacancies"): if not qdrant_client.collection_exists("vacancies"):
qdrant_client.create_collection( qdrant_client.create_collection(
collection_name="vacancies", collection_name="vacancies",
vectors_config=vectors_config, vectors_config=vectors_config
)
qdrant_client.create_payload_index(
collection_name="vacancies",
field_name="timestamp",
field_schema="datetime",
) )
if not qdrant_client.collection_exists("cvs"): if not qdrant_client.collection_exists("cvs"):
qdrant_client.create_collection( qdrant_client.create_collection(
collection_name="cvs", collection_name="cvs",
vectors_config=vectors_config, vectors_config=vectors_config
) )
embedding = OpenAIEmbeddings(model="text-embedding-3-large") embedding = OpenAIEmbeddings(model="text-embedding-3-large")
def _prepare_texts(features): def _prepare_texts(features):
"""Prepare texts for each feature from features dict."""
texts = {} texts = {}
for name in FEATURE_NAMES: for name in FEATURE_NAMES:
value = features.get(name) value = features.get(name)
@ -52,21 +61,31 @@ def _prepare_texts(features):
return texts return texts
def embed_features(features): def add_vectors(collection_name: str, _id: int, features: dict, payload: dict):
features = {key: value for key, value in features.items() if value} """Add vectors for a vacancy based on its features."""
features_texts = _prepare_texts(features) texts = _prepare_texts(features)
names, texts = features_texts.keys(), features_texts.values() vectors = {}
vectors = dict(zip(names, embedding.embed_documents(texts))) for name, text in texts.items():
return vectors vectors[name] = [0.0] * 3072
if text:
vec = embedding.embed_query(text)
vectors[name] = vec
def add_vectors(collection_name: str, _id: int, features: dict, payload: dict, vectors):
max_similarities = {} max_similarities = {}
for name, vec in vectors.items(): for name, vec in vectors.items():
results = qdrant_client.query_points(collection_name="vacancies", query=vec, using=name, limit=100) if any(v != 0 for v in vec):
results = qdrant_client.query_points(
collection_name="vacancies",
query=vec,
using=name,
limit=100,
)
for res in results.points: for res in results.points:
max_similarities.setdefault(res.id, {}) vid = res.id
max_similarities[res.id][name] = res.score sim = res.score
if vid not in max_similarities:
max_similarities[vid] = {}
max_similarities[vid][name] = sim
scored = [] scored = []
for vid, feature_sims in max_similarities.items(): for vid, feature_sims in max_similarities.items():
@ -74,57 +93,77 @@ def add_vectors(collection_name: str, _id: int, features: dict, payload: dict, v
scored.append({"id": vid, "score": total}) scored.append({"id": vid, "score": total})
scored.sort(key=lambda x: x["score"], reverse=True) scored.sort(key=lambda x: x["score"], reverse=True)
if scored and scored[0]["score"] > 80: # threshold if scored and scored[0]["score"] > 90: # threshold
return return
qdrant_client.upsert( qdrant_client.upsert(
collection_name=collection_name, collection_name=collection_name,
points=[models.PointStruct(id=_id, vector=vectors, payload=payload)] points=[
models.PointStruct(
id=_id,
vector=vectors,
payload=payload,
)
]
) )
def search_similarities(query_filter: Filter, cv_id: int): def search_similarities(query_filter: Filter, cv_id: int):
cv = qdrant_client.retrieve(collection_name="cvs", ids=[cv_id], with_vectors=True)[0] cv = qdrant_client.retrieve(
collection_name="cvs",
ids=[cv_id],
with_vectors=True,
)[0]
max_similarities, vacancies_content = {}, {} max_similarities = {}
vacancies_content = {}
for name, vec in cv.vector.items(): for name, vec in cv.vector.items():
if any(v != 0 for v in vec):
results = qdrant_client.query_points( results = qdrant_client.query_points(
collection_name="vacancies", collection_name="vacancies",
query=vec, query=vec,
using=name, using=name,
limit=100000, limit=100,
with_payload=True, with_payload=True,
query_filter=query_filter, query_filter=query_filter,
) )
for res in results.points: for res in results.points:
max_similarities.setdefault(res.id, {}) vid = res.id
vacancies_content.setdefault(res.id, {}) sim = res.score
if vid not in max_similarities:
max_similarities[res.id][name] = res.score max_similarities[vid] = {}
vacancies_content[res.id]["content"] = res.payload["content"] max_similarities[vid][name] = sim
vacancies_content[res.id]["features_json"] = res.payload["features_json"] if vid not in vacancies_content:
vacancies_content[res.id]["link"] = res.payload["link"] vacancies_content[vid] = {}
vacancies_content[vid]["content"] = res.payload["content"]
vacancies_content[vid]["link"] = res.payload["link"]
scored = [] scored = []
for vid, feature_sims in max_similarities.items(): for vid, feature_sims in max_similarities.items():
total = sum(feature_sims[feature] * weights.get(feature, 1) for feature in feature_sims) total = sum(feature_sims[feature] * weights.get(feature, 1) for feature in feature_sims)
scored.append({ scored.append({"id": vid, "score": total, "content": vacancies_content[vid]["content"], "link": vacancies_content[vid]["link"]})
"id": vid,
"score": total,
"content": vacancies_content[vid]["content"],
"features_json": vacancies_content[vid]["features_json"],
"link": vacancies_content[vid]["link"],
"sims": feature_sims,
})
scored.sort(key=lambda x: x["score"], reverse=True) scored.sort(key=lambda x: x["score"], reverse=True)
return scored[0]["id"], scored[0]["content"], scored[0]["link"] prompt = f"""
Резюме: {cv.payload['content']}
Среди вакансий ниже выбери одну наиболее релевантную и выведи ее индекс(от 0 до 9).
Иногда могут попадаться чужие резюме вместо вакансий, их отдавать нельзя.
В ответе выведи только число. Если среди вакансий нет подходящих, то верни -1.
{scored[:10]}
"""
openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1)
response = openai_client.invoke(prompt)
index = int(response.content)
if index == -1:
return None
return scored[index]["id"], scored[index]["content"], scored[index]["link"]
def batch_extract_features(contents: list[str]) -> list[VacancyFeatures]: def extract_features(content: str) -> VacancyFeatures:
prompts = [ prompt = f"""
f"""
Extract the following features from the job vacancy description. If a feature is not mentioned, set it to null. Extract the following features from the job vacancy description. If a feature is not mentioned, set it to null.
Features: Features:
- job_title: Должность (e.g., DevOps, Python программист) - job_title: Должность (e.g., DevOps, Python программист)
@ -143,11 +182,9 @@ def batch_extract_features(contents: list[str]) -> list[VacancyFeatures]:
Vacancy content: Vacancy content:
{content} {content}
""" """
for content in contents
]
openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1) openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1)
structured_llm = openai_client.with_structured_output(VacancyFeatures) structured_llm = openai_client.with_structured_output(VacancyFeatures)
response = structured_llm.batch(prompts) response = structured_llm.invoke(prompt)
return response return response