Compare commits

...

2 Commits

Author SHA1 Message Date
195c779088 Optimize vacancies vectorization
All checks were successful
release / docker (push) Successful in 1m0s
2025-11-08 12:23:32 +03:00
df33ce79bb Change recomendation strategy 2025-11-07 00:06:31 +03:00
4 changed files with 742 additions and 773 deletions

1249
uv.lock

File diff suppressed because it is too large Load Diff

View File

@ -1,15 +1,34 @@
import os
import io
import os
import traceback
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
from telegram.ext import filters, ApplicationBuilder, MessageHandler, CommandHandler, ContextTypes
from pypdf import PdfReader
from vacancies.main.models import Customer, CustomerCV
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from vacancies.main.vector_store import add_vectors, extract_features, get_next_vacancy
from pypdf import PdfReader
from telegram import (
InlineKeyboardButton,
InlineKeyboardMarkup,
KeyboardButton,
ReplyKeyboardMarkup,
Update,
)
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from vacancies.conf.settings import DB_URI
from vacancies.main.models import Customer, CustomerCV
from vacancies.main.vector_store import (
add_vectors,
batch_extract_features,
get_next_vacancy,
embed_features,
)
SYSTEM_PROMPT = """
Ты карьерный копилот для ИТ. Ты можешь отвечать на любые вопросы по тематике карьеры.
@ -107,11 +126,11 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
customer_cv, _ = await CustomerCV.objects.aupdate_or_create(customer=customer, defaults=dict(
content=resume,
))
features = extract_features(customer_cv.content)
features = batch_extract_features(customer_cv.content)[0]
add_vectors(
"cvs",
customer_cv.id,
features.model_dump(),
embed_features(features.model_dump())[0],
{'content': customer_cv.content, 'features_json': features.model_dump()},
)

View File

@ -1,19 +1,23 @@
import traceback
from itertools import batched
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from itertools import batched
import clickhouse_connect
from django.core.management import BaseCommand
from django.conf import settings
import clickhouse_connect
from vacancies.main.vector_store import add_vectors, extract_features, qdrant_client
from vacancies.conf.settings import CLICKHOUSE_HOST, CLICKHOUSE_PORT
from qdrant_client.models import OrderBy
clickhouse_client = clickhouse_connect.create_client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT)
from vacancies.main.vector_store import (
add_vectors,
batch_extract_features,
embed_features,
qdrant_client,
)
query = """
SELECT id, chat_username, telegram_id, message, timestamp
SELECT DISTINCT ON (message) id, chat_username, telegram_id, message, timestamp
FROM telegram_parser_chatmessage
WHERE timestamp >= now() - INTERVAL 30 DAY
WHERE timestamp >= %(timestamp)s
AND length(message) > 150
AND arrayCount(x -> position(message, x) > 0, [
'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
@ -26,7 +30,7 @@ WHERE timestamp >= now() - INTERVAL 30 DAY
AND arrayCount(x -> position(lower(message), x) > 0, [
'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж'
]) = 0
AND id NOT IN %(exist_points)s
ORDER BY timestamp ASC
"""
@ -34,39 +38,23 @@ class Command(BaseCommand):
help = "Collect vacancies from telegram messages"
def handle(self, *args, **options):
next_page_offset = 0
exist_points_ids = [-1]
while next_page_offset is not None:
response = qdrant_client.scroll(
collection_name="vacancies",
limit=100_000,
offset=next_page_offset,
with_payload=False,
with_vectors=False,
timeout=30,
)
exist_points_ids.extend([point.id for point in response[0]])
next_page_offset = response[1]
exist_points_set = tuple(set(exist_points_ids))
response = qdrant_client.scroll(collection_name="vacancies", limit=1, order_by=OrderBy(key="timestamp", direction="desc"))
last_point_timestamp = datetime.now() - timedelta(days=30)
if response[0]:
last_point_timestamp = response[0][0].payload["timestamp"]
result_rows = clickhouse_client.query(query, parameters={"exist_points": exist_points_set}).result_rows
with ThreadPoolExecutor(max_workers=settings.COLLECT_VACANCIES_BATCH_SIZE) as pool:
pool.map(self._process_batch, batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE))
clickhouse_client = clickhouse_connect.create_client(host=settings.CLICKHOUSE_HOST, port=settings.CLICKHOUSE_PORT)
result_rows = clickhouse_client.query(query, parameters={"timestamp": last_point_timestamp}).result_rows
def _process_batch(self, result_rows):
try:
for index, row in enumerate(result_rows):
for index, rows in enumerate(batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE)):
vacancies_features = batch_extract_features([row[3] for row in rows])
print(f"Processing {index+1}/{len(result_rows)//settings.COLLECT_VACANCIES_BATCH_SIZE}")
with ThreadPoolExecutor() as pool:
vacancies_vectors = pool.map(embed_features, [vacancy_features.model_dump() for vacancy_features in vacancies_features])
for row, vacancy_features, vacancy_vectors in zip(rows, vacancies_features, vacancies_vectors):
(id, chat_username, telegram_id, message, timestamp) = row
link = f"https://t.me/{chat_username}/{telegram_id}"
print(f"Processing {index+1}/{len(result_rows)} link: {link}")
features = extract_features(message)
add_vectors(
"vacancies",
id,
features.model_dump(),
{'content': message, 'features_json': features.model_dump(), "link": link, "timestamp": timestamp},
)
except Exception as exc:
traceback.print_exception(exc)
payload = {'content': message, 'features_json': vacancy_features.model_dump(), "link": link, "timestamp": timestamp}
add_vectors("vacancies", id, vacancy_features.model_dump(), payload, vacancy_vectors)

View File

@ -1,12 +1,9 @@
from qdrant_client import models
from langchain_openai import OpenAIEmbeddings
from langchain_openai import ChatOpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import Filter
from vacancies.main.models import VacancyFeatures
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from qdrant_client import QdrantClient, models
from qdrant_client.models import Filter, HasIdCondition
from vacancies.conf.settings import QDRANT_URL
from vacancies.main.models import RecommendedVacancy
from qdrant_client.models import HasIdCondition
from vacancies.main.models import RecommendedVacancy, VacancyFeatures
qdrant_client = QdrantClient(url=QDRANT_URL)
@ -16,19 +13,9 @@ FEATURE_NAMES = [
]
weights = {
"job_title": 25,
"employment_type": 5,
"work_format": 5,
"experience": 8,
"position_level": 12,
"industry": 10,
"tech_stack": 14,
"location": 5,
"salary_range": 5,
"languages": 5,
"education": 2,
"schedule": 2,
"additional_requirements": 2,
"job_title": 70,
"tech_stack": 10,
"salary_range": 10,
}
vectors_config = {
@ -38,18 +25,22 @@ vectors_config = {
if not qdrant_client.collection_exists("vacancies"):
qdrant_client.create_collection(
collection_name="vacancies",
vectors_config=vectors_config
vectors_config=vectors_config,
)
qdrant_client.create_payload_index(
collection_name="vacancies",
field_name="timestamp",
field_schema="datetime",
)
if not qdrant_client.collection_exists("cvs"):
qdrant_client.create_collection(
collection_name="cvs",
vectors_config=vectors_config
vectors_config=vectors_config,
)
embedding = OpenAIEmbeddings(model="text-embedding-3-large")
def _prepare_texts(features):
"""Prepare texts for each feature from features dict."""
texts = {}
for name in FEATURE_NAMES:
value = features.get(name)
@ -61,31 +52,21 @@ def _prepare_texts(features):
return texts
def add_vectors(collection_name: str, _id: int, features: dict, payload: dict):
"""Add vectors for a vacancy based on its features."""
texts = _prepare_texts(features)
vectors = {}
for name, text in texts.items():
vectors[name] = [0.0] * 3072
if text:
vec = embedding.embed_query(text)
vectors[name] = vec
def embed_features(features):
features = {key: value for key, value in features.items() if value}
features_texts = _prepare_texts(features)
names, texts = features_texts.keys(), features_texts.values()
vectors = dict(zip(names, embedding.embed_documents(texts)))
return vectors
def add_vectors(collection_name: str, _id: int, features: dict, payload: dict, vectors):
max_similarities = {}
for name, vec in vectors.items():
if any(v != 0 for v in vec):
results = qdrant_client.query_points(
collection_name="vacancies",
query=vec,
using=name,
limit=100,
)
for res in results.points:
vid = res.id
sim = res.score
if vid not in max_similarities:
max_similarities[vid] = {}
max_similarities[vid][name] = sim
results = qdrant_client.query_points(collection_name="vacancies", query=vec, using=name, limit=100)
for res in results.points:
max_similarities.setdefault(res.id, {})
max_similarities[res.id][name] = res.score
scored = []
for vid, feature_sims in max_similarities.items():
@ -93,77 +74,57 @@ def add_vectors(collection_name: str, _id: int, features: dict, payload: dict):
scored.append({"id": vid, "score": total})
scored.sort(key=lambda x: x["score"], reverse=True)
if scored and scored[0]["score"] > 90: # threshold
if scored and scored[0]["score"] > 80: # threshold
return
qdrant_client.upsert(
collection_name=collection_name,
points=[
models.PointStruct(
id=_id,
vector=vectors,
payload=payload,
)
]
points=[models.PointStruct(id=_id, vector=vectors, payload=payload)]
)
def search_similarities(query_filter: Filter, cv_id: int):
cv = qdrant_client.retrieve(
collection_name="cvs",
ids=[cv_id],
with_vectors=True,
)[0]
cv = qdrant_client.retrieve(collection_name="cvs", ids=[cv_id], with_vectors=True)[0]
max_similarities = {}
vacancies_content = {}
max_similarities, vacancies_content = {}, {}
for name, vec in cv.vector.items():
if any(v != 0 for v in vec):
results = qdrant_client.query_points(
collection_name="vacancies",
query=vec,
using=name,
limit=100,
with_payload=True,
query_filter=query_filter,
)
for res in results.points:
vid = res.id
sim = res.score
if vid not in max_similarities:
max_similarities[vid] = {}
max_similarities[vid][name] = sim
if vid not in vacancies_content:
vacancies_content[vid] = {}
vacancies_content[vid]["content"] = res.payload["content"]
vacancies_content[vid]["link"] = res.payload["link"]
results = qdrant_client.query_points(
collection_name="vacancies",
query=vec,
using=name,
limit=100000,
with_payload=True,
query_filter=query_filter,
)
for res in results.points:
max_similarities.setdefault(res.id, {})
vacancies_content.setdefault(res.id, {})
max_similarities[res.id][name] = res.score
vacancies_content[res.id]["content"] = res.payload["content"]
vacancies_content[res.id]["features_json"] = res.payload["features_json"]
vacancies_content[res.id]["link"] = res.payload["link"]
scored = []
for vid, feature_sims in max_similarities.items():
total = sum(feature_sims[feature] * weights.get(feature, 1) for feature in feature_sims)
scored.append({"id": vid, "score": total, "content": vacancies_content[vid]["content"], "link": vacancies_content[vid]["link"]})
scored.append({
"id": vid,
"score": total,
"content": vacancies_content[vid]["content"],
"features_json": vacancies_content[vid]["features_json"],
"link": vacancies_content[vid]["link"],
"sims": feature_sims,
})
scored.sort(key=lambda x: x["score"], reverse=True)
prompt = f"""
Резюме: {cv.payload['content']}
Среди вакансий ниже выбери одну наиболее релевантную и выведи ее индекс(от 0 до 9).
Иногда могут попадаться чужие резюме вместо вакансий, их отдавать нельзя.
В ответе выведи только число. Если среди вакансий нет подходящих, то верни -1.
{scored[:10]}
"""
openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1)
response = openai_client.invoke(prompt)
index = int(response.content)
if index == -1:
return None
return scored[index]["id"], scored[index]["content"], scored[index]["link"]
return scored[0]["id"], scored[0]["content"], scored[0]["link"]
def extract_features(content: str) -> VacancyFeatures:
prompt = f"""
def batch_extract_features(contents: list[str]) -> list[VacancyFeatures]:
prompts = [
f"""
Extract the following features from the job vacancy description. If a feature is not mentioned, set it to null.
Features:
- job_title: Должность (e.g., DevOps, Python программист)
@ -182,9 +143,11 @@ def extract_features(content: str) -> VacancyFeatures:
Vacancy content:
{content}
"""
for content in contents
]
openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1)
structured_llm = openai_client.with_structured_output(VacancyFeatures)
response = structured_llm.invoke(prompt)
response = structured_llm.batch(prompts)
return response