Compare commits

2 Commits

SHA1        Message                           Date
195c779088  Optimize vacancies vectorization  2025-11-08 12:23:32 +03:00
            (all checks were successful: release / docker (push), 1m0s)
df33ce79bb  Change recomendation strategy     2025-11-07 00:06:31 +03:00
4 changed files with 742 additions and 773 deletions
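
The headline change in the first commit is how feature texts get embedded. The old vector_store issued one embed_query call per feature (padding absent features with zero vectors), while the new code drops empty features and embeds all of a document's feature texts in a single embed_documents request, with the collector additionally fanning batches of vacancies out over a thread pool. A minimal sketch of the two patterns, assuming an OPENAI_API_KEY in the environment; the function names here are illustrative, not from the diff:

from langchain_openai import OpenAIEmbeddings

embedding = OpenAIEmbeddings(model="text-embedding-3-large")

def embed_one_by_one(texts: dict[str, str]) -> dict[str, list[float]]:
    # Old pattern: one embedding request per feature.
    return {name: embedding.embed_query(text) for name, text in texts.items() if text}

def embed_batched(texts: dict[str, str]) -> dict[str, list[float]]:
    # New pattern: skip empty features, one request for all texts.
    present = {name: text for name, text in texts.items() if text}
    vectors = embedding.embed_documents(list(present.values()))
    return dict(zip(present.keys(), vectors))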

uv.lock: 1249 changes (diff suppressed because it is too large)

@@ -1,15 +1,34 @@
-import os
 import io
+import os
 import traceback
-from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
-from telegram.ext import filters, ApplicationBuilder, MessageHandler, CommandHandler, ContextTypes
-from pypdf import PdfReader
-from vacancies.main.models import Customer, CustomerCV
+
 from langchain.agents import create_agent
 from langchain_openai import ChatOpenAI
 from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
-from vacancies.main.vector_store import add_vectors, extract_features, get_next_vacancy
+from pypdf import PdfReader
+from telegram import (
+    InlineKeyboardButton,
+    InlineKeyboardMarkup,
+    KeyboardButton,
+    ReplyKeyboardMarkup,
+    Update,
+)
+from telegram.ext import (
+    ApplicationBuilder,
+    CommandHandler,
+    ContextTypes,
+    MessageHandler,
+    filters,
+)
+
 from vacancies.conf.settings import DB_URI
+from vacancies.main.models import Customer, CustomerCV
+from vacancies.main.vector_store import (
+    add_vectors,
+    batch_extract_features,
+    get_next_vacancy,
+    embed_features,
+)
 
 SYSTEM_PROMPT = """
 Ты карьерный копилот для ИТ. Ты можешь отвечать на любые вопросы по тематике карьеры.
@@ -107,11 +126,12 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
     customer_cv, _ = await CustomerCV.objects.aupdate_or_create(customer=customer, defaults=dict(
         content=resume,
     ))
-    features = extract_features(customer_cv.content)
+    features = batch_extract_features([customer_cv.content])[0]
     add_vectors(
         "cvs",
         customer_cv.id,
         features.model_dump(),
         {'content': customer_cv.content, 'features_json': features.model_dump()},
+        embed_features(features.model_dump()),
     )
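
The handler keeps a single-document flow on top of the new batch API: wrap the CV in a one-element list, unwrap the single result, embed once, then upsert. A sketch of the call sequence, assuming the project imports resolve; the helper name is hypothetical:

from vacancies.main.vector_store import add_vectors, batch_extract_features, embed_features

def index_cv(cv_id: int, cv_text: str) -> None:
    features = batch_extract_features([cv_text])[0]   # list in, list out
    vectors = embed_features(features.model_dump())   # feature name -> embedding
    payload = {"content": cv_text, "features_json": features.model_dump()}
    add_vectors("cvs", cv_id, features.model_dump(), payload, vectors)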


@@ -1,19 +1,23 @@
-import traceback
-from itertools import batched
 from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime, timedelta
+from itertools import batched
+
+import clickhouse_connect
 from django.core.management import BaseCommand
 from django.conf import settings
-import clickhouse_connect
-from vacancies.main.vector_store import add_vectors, extract_features, qdrant_client
-from vacancies.conf.settings import CLICKHOUSE_HOST, CLICKHOUSE_PORT
-
-clickhouse_client = clickhouse_connect.create_client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT)
+from qdrant_client.models import OrderBy
+
+from vacancies.main.vector_store import (
+    add_vectors,
+    batch_extract_features,
+    embed_features,
+    qdrant_client,
+)
 
 query = """
-SELECT id, chat_username, telegram_id, message, timestamp
+SELECT DISTINCT ON (message) id, chat_username, telegram_id, message, timestamp
 FROM telegram_parser_chatmessage
-WHERE timestamp >= now() - INTERVAL 30 DAY
+WHERE timestamp >= %(timestamp)s
   AND length(message) > 150
   AND arrayCount(x -> position(message, x) > 0, [
     'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
@@ -26,7 +30,7 @@ WHERE timestamp >= now() - INTERVAL 30 DAY
   AND arrayCount(x -> position(lower(message), x) > 0, [
     'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж'
   ]) = 0
-  AND id NOT IN %(exist_points)s
+ORDER BY timestamp ASC
 """
@@ -34,39 +38,23 @@ class Command(BaseCommand):
     help = "Collect vacancies from telegram messages"
 
     def handle(self, *args, **options):
-        next_page_offset = 0
-        exist_points_ids = [-1]
-        while next_page_offset is not None:
-            response = qdrant_client.scroll(
-                collection_name="vacancies",
-                limit=100_000,
-                offset=next_page_offset,
-                with_payload=False,
-                with_vectors=False,
-                timeout=30,
-            )
-            exist_points_ids.extend([point.id for point in response[0]])
-            next_page_offset = response[1]
-        exist_points_set = tuple(set(exist_points_ids))
-        result_rows = clickhouse_client.query(query, parameters={"exist_points": exist_points_set}).result_rows
-        with ThreadPoolExecutor(max_workers=settings.COLLECT_VACANCIES_BATCH_SIZE) as pool:
-            pool.map(self._process_batch, batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE))
-
-    def _process_batch(self, result_rows):
-        try:
-            for index, row in enumerate(result_rows):
-                (id, chat_username, telegram_id, message, timestamp) = row
-                link = f"https://t.me/{chat_username}/{telegram_id}"
-                print(f"Processing {index+1}/{len(result_rows)} link: {link}")
-                features = extract_features(message)
-                add_vectors(
-                    "vacancies",
-                    id,
-                    features.model_dump(),
-                    {'content': message, 'features_json': features.model_dump(), "link": link, "timestamp": timestamp},
-                )
-        except Exception as exc:
-            traceback.print_exception(exc)
+        response = qdrant_client.scroll(collection_name="vacancies", limit=1, order_by=OrderBy(key="timestamp", direction="desc"))
+        last_point_timestamp = datetime.now() - timedelta(days=30)
+        if response[0]:
+            last_point_timestamp = response[0][0].payload["timestamp"]
+
+        clickhouse_client = clickhouse_connect.create_client(host=settings.CLICKHOUSE_HOST, port=settings.CLICKHOUSE_PORT)
+        result_rows = clickhouse_client.query(query, parameters={"timestamp": last_point_timestamp}).result_rows
+
+        for index, rows in enumerate(batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE)):
+            vacancies_features = batch_extract_features([row[3] for row in rows])
+            print(f"Processing {index+1}/{len(result_rows)//settings.COLLECT_VACANCIES_BATCH_SIZE}")
+            with ThreadPoolExecutor() as pool:
+                vacancies_vectors = pool.map(embed_features, [vacancy_features.model_dump() for vacancy_features in vacancies_features])
+            for row, vacancy_features, vacancy_vectors in zip(rows, vacancies_features, vacancies_vectors):
+                (id, chat_username, telegram_id, message, timestamp) = row
+                link = f"https://t.me/{chat_username}/{telegram_id}"
+                payload = {'content': message, 'features_json': vacancy_features.model_dump(), "link": link, "timestamp": timestamp}
+                add_vectors("vacancies", id, vacancy_features.model_dump(), payload, vacancy_vectors)
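
So instead of scrolling every existing point out of Qdrant to build an exclusion list, the command now keeps a watermark: the newest stored timestamp is read back with a single ordered scroll, and only newer rows are fetched from ClickHouse. Distilled into a standalone sketch under the same collection layout; the URL and helper name are placeholders:

from datetime import datetime, timedelta

from qdrant_client import QdrantClient
from qdrant_client.models import OrderBy

client = QdrantClient(url="http://localhost:6333")

def last_indexed_timestamp():
    # Newest point first; relies on the "timestamp" payload index created in vector_store.
    points, _ = client.scroll(
        collection_name="vacancies",
        limit=1,
        order_by=OrderBy(key="timestamp", direction="desc"),
    )
    if points:
        # Comes back as stored (an ISO string or datetime, depending on the upsert).
        return points[0].payload["timestamp"]
    return datetime.now() - timedelta(days=30)  # cold start: look back 30 days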


@@ -1,12 +1,9 @@
-from qdrant_client import models
-from langchain_openai import OpenAIEmbeddings
-from langchain_openai import ChatOpenAI
-from qdrant_client import QdrantClient
-from qdrant_client.models import Filter
-from vacancies.main.models import VacancyFeatures
+from langchain_openai import ChatOpenAI, OpenAIEmbeddings
+from qdrant_client import QdrantClient, models
+from qdrant_client.models import Filter, HasIdCondition
+
 from vacancies.conf.settings import QDRANT_URL
-from vacancies.main.models import RecommendedVacancy
-from qdrant_client.models import HasIdCondition
+from vacancies.main.models import RecommendedVacancy, VacancyFeatures
 
 qdrant_client = QdrantClient(url=QDRANT_URL)
@@ -16,19 +13,9 @@ FEATURE_NAMES = [
 ]
 
 weights = {
-    "job_title": 25,
-    "employment_type": 5,
-    "work_format": 5,
-    "experience": 8,
-    "position_level": 12,
-    "industry": 10,
-    "tech_stack": 14,
-    "location": 5,
-    "salary_range": 5,
-    "languages": 5,
-    "education": 2,
-    "schedule": 2,
-    "additional_requirements": 2,
+    "job_title": 70,
+    "tech_stack": 10,
+    "salary_range": 10,
 }
 
 vectors_config = {
@@ -38,18 +25,22 @@ vectors_config = {
 
 if not qdrant_client.collection_exists("vacancies"):
     qdrant_client.create_collection(
         collection_name="vacancies",
-        vectors_config=vectors_config
+        vectors_config=vectors_config,
+    )
+    qdrant_client.create_payload_index(
+        collection_name="vacancies",
+        field_name="timestamp",
+        field_schema="datetime",
     )
 
 if not qdrant_client.collection_exists("cvs"):
     qdrant_client.create_collection(
         collection_name="cvs",
-        vectors_config=vectors_config
+        vectors_config=vectors_config,
     )
 
 embedding = OpenAIEmbeddings(model="text-embedding-3-large")
 
 def _prepare_texts(features):
-    """Prepare texts for each feature from features dict."""
     texts = {}
     for name in FEATURE_NAMES:
         value = features.get(name)
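
The create_payload_index call added above is what the collector's ordered scroll depends on: Qdrant only accepts order_by on payload fields that carry an index with a sortable schema (datetime here). Equivalent standalone setup, with a placeholder URL:

from qdrant_client import QdrantClient

client = QdrantClient(url="http://localhost:6333")

# Without this index, scroll(..., order_by="timestamp") is rejected for the collection.
client.create_payload_index(
    collection_name="vacancies",
    field_name="timestamp",
    field_schema="datetime",
)
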
@@ -61,31 +52,21 @@ def _prepare_texts(features):
     return texts
 
-def add_vectors(collection_name: str, _id: int, features: dict, payload: dict):
-    """Add vectors for a vacancy based on its features."""
-    texts = _prepare_texts(features)
-    vectors = {}
-    for name, text in texts.items():
-        vectors[name] = [0.0] * 3072
-        if text:
-            vec = embedding.embed_query(text)
-            vectors[name] = vec
-
+def embed_features(features):
+    features = {key: value for key, value in features.items() if value}
+    features_texts = _prepare_texts(features)
+    names, texts = features_texts.keys(), features_texts.values()
+    vectors = dict(zip(names, embedding.embed_documents(texts)))
+    return vectors
+
+
+def add_vectors(collection_name: str, _id: int, features: dict, payload: dict, vectors):
     max_similarities = {}
     for name, vec in vectors.items():
-        if any(v != 0 for v in vec):
-            results = qdrant_client.query_points(
-                collection_name="vacancies",
-                query=vec,
-                using=name,
-                limit=100,
-            )
-            for res in results.points:
-                vid = res.id
-                sim = res.score
-                if vid not in max_similarities:
-                    max_similarities[vid] = {}
-                max_similarities[vid][name] = sim
+        results = qdrant_client.query_points(collection_name="vacancies", query=vec, using=name, limit=100)
+        for res in results.points:
+            max_similarities.setdefault(res.id, {})
+            max_similarities[res.id][name] = res.score
 
     scored = []
     for vid, feature_sims in max_similarities.items():
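
add_vectors no longer embeds anything itself; it takes precomputed named vectors and uses them for deduplication. Each feature lives under its own vector name in the collection, using= selects which named vector space to search, and per-feature similarities are combined with the weights above. The scoring, restated as a standalone sketch with hypothetical names:

from qdrant_client import QdrantClient

def best_duplicate_score(client: QdrantClient, vectors: dict, weights: dict) -> float:
    # Highest weighted similarity of any existing point against the candidate.
    per_point: dict = {}
    for name, vec in vectors.items():
        hits = client.query_points(collection_name="vacancies", query=vec, using=name, limit=100)
        for hit in hits.points:
            per_point.setdefault(hit.id, {})[name] = hit.score
    totals = (sum(sims[f] * weights.get(f, 1) for f in sims) for sims in per_point.values())
    return max(totals, default=0.0)

# The diff skips the upsert when this exceeds 80; the new weights sum to 90.
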
@@ -93,77 +74,57 @@ def add_vectors(collection_name: str, _id: int, features: dict, payload: dict, vectors):
         scored.append({"id": vid, "score": total})
 
     scored.sort(key=lambda x: x["score"], reverse=True)
-    if scored and scored[0]["score"] > 90:  # threshold
+    if scored and scored[0]["score"] > 80:  # threshold
        return
 
     qdrant_client.upsert(
         collection_name=collection_name,
-        points=[
-            models.PointStruct(
-                id=_id,
-                vector=vectors,
-                payload=payload,
-            )
-        ]
+        points=[models.PointStruct(id=_id, vector=vectors, payload=payload)]
     )
 
 def search_similarities(query_filter: Filter, cv_id: int):
-    cv = qdrant_client.retrieve(
-        collection_name="cvs",
-        ids=[cv_id],
-        with_vectors=True,
-    )[0]
+    cv = qdrant_client.retrieve(collection_name="cvs", ids=[cv_id], with_vectors=True)[0]
 
-    max_similarities = {}
-    vacancies_content = {}
+    max_similarities, vacancies_content = {}, {}
     for name, vec in cv.vector.items():
-        if any(v != 0 for v in vec):
-            results = qdrant_client.query_points(
-                collection_name="vacancies",
-                query=vec,
-                using=name,
-                limit=100,
-                with_payload=True,
-                query_filter=query_filter,
-            )
-            for res in results.points:
-                vid = res.id
-                sim = res.score
-                if vid not in max_similarities:
-                    max_similarities[vid] = {}
-                max_similarities[vid][name] = sim
-                if vid not in vacancies_content:
-                    vacancies_content[vid] = {}
-                vacancies_content[vid]["content"] = res.payload["content"]
-                vacancies_content[vid]["link"] = res.payload["link"]
+        results = qdrant_client.query_points(
+            collection_name="vacancies",
+            query=vec,
+            using=name,
+            limit=100000,
+            with_payload=True,
+            query_filter=query_filter,
+        )
+        for res in results.points:
+            max_similarities.setdefault(res.id, {})
+            vacancies_content.setdefault(res.id, {})
+
+            max_similarities[res.id][name] = res.score
+            vacancies_content[res.id]["content"] = res.payload["content"]
+            vacancies_content[res.id]["features_json"] = res.payload["features_json"]
+            vacancies_content[res.id]["link"] = res.payload["link"]
 
     scored = []
     for vid, feature_sims in max_similarities.items():
         total = sum(feature_sims[feature] * weights.get(feature, 1) for feature in feature_sims)
-        scored.append({"id": vid, "score": total, "content": vacancies_content[vid]["content"], "link": vacancies_content[vid]["link"]})
+        scored.append({
+            "id": vid,
+            "score": total,
+            "content": vacancies_content[vid]["content"],
+            "features_json": vacancies_content[vid]["features_json"],
+            "link": vacancies_content[vid]["link"],
+            "sims": feature_sims,
+        })
 
     scored.sort(key=lambda x: x["score"], reverse=True)
-    prompt = f"""
-    Резюме: {cv.payload['content']}
-    Среди вакансий ниже выбери одну наиболее релевантную и выведи ее индекс(от 0 до 9).
-    Иногда могут попадаться чужие резюме вместо вакансий, их отдавать нельзя.
-    В ответе выведи только число. Если среди вакансий нет подходящих, то верни -1.
-    {scored[:10]}
-    """
-    openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1)
-    response = openai_client.invoke(prompt)
-    index = int(response.content)
-    if index == -1:
-        return None
-    return scored[index]["id"], scored[index]["content"], scored[index]["link"]
+    return scored[0]["id"], scored[0]["content"], scored[0]["link"]
 
-def extract_features(content: str) -> VacancyFeatures:
-    prompt = f"""
+def batch_extract_features(contents: list[str]) -> list[VacancyFeatures]:
+    prompts = [
+        f"""
 Extract the following features from the job vacancy description. If a feature is not mentioned, set it to null.
 Features:
 - job_title: Должность (e.g., DevOps, Python программист)
@@ -182,9 +143,11 @@ def extract_features(content: str) -> VacancyFeatures:
 Vacancy content:
 {content}
 """
+        for content in contents
+    ]
     openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1)
     structured_llm = openai_client.with_structured_output(VacancyFeatures)
-    response = structured_llm.invoke(prompt)
+    response = structured_llm.batch(prompts)
     return response
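
Finally, extract_features(content) becomes batch_extract_features(contents): the same structured-output chain, but Runnable.batch fans the prompts out concurrently instead of paying one sequential invoke per message. A minimal sketch with a stand-in schema; the field set here is illustrative:

from langchain_openai import ChatOpenAI
from pydantic import BaseModel

class Features(BaseModel):  # stand-in for VacancyFeatures
    job_title: str | None = None
    tech_stack: str | None = None

llm = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0)
structured = llm.with_structured_output(Features)

# batch() runs the prompts in parallel (tune with config={"max_concurrency": N}).
messages = ["We are hiring a Python developer...", "DevOps engineer needed..."]
features = structured.batch([f"Extract job features from:\n{m}" for m in messages])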