Compare commits

..

No commits in common. "195c77908828113a0959a8f53ad7b9e1019eb9d3" and "7cce1cdc04136dc546f12b1386811e7d6026aa01" have entirely different histories.

4 changed files with 773 additions and 742 deletions

1249
uv.lock

File diff suppressed because it is too large Load Diff

View File

@ -1,34 +1,15 @@
import io
import os
import io
import traceback
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
from telegram.ext import filters, ApplicationBuilder, MessageHandler, CommandHandler, ContextTypes
from pypdf import PdfReader
from vacancies.main.models import Customer, CustomerCV
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from pypdf import PdfReader
from telegram import (
InlineKeyboardButton,
InlineKeyboardMarkup,
KeyboardButton,
ReplyKeyboardMarkup,
Update,
)
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from vacancies.main.vector_store import add_vectors, extract_features, get_next_vacancy
from vacancies.conf.settings import DB_URI
from vacancies.main.models import Customer, CustomerCV
from vacancies.main.vector_store import (
add_vectors,
batch_extract_features,
get_next_vacancy,
embed_features,
)
SYSTEM_PROMPT = """
Ты карьерный копилот для ИТ. Ты можешь отвечать на любые вопросы по тематике карьеры.
@ -126,11 +107,11 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
customer_cv, _ = await CustomerCV.objects.aupdate_or_create(customer=customer, defaults=dict(
content=resume,
))
features = batch_extract_features(customer_cv.content)[0]
features = extract_features(customer_cv.content)
add_vectors(
"cvs",
customer_cv.id,
embed_features(features.model_dump())[0],
features.model_dump(),
{'content': customer_cv.content, 'features_json': features.model_dump()},
)

View File

@ -1,23 +1,19 @@
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
import traceback
from itertools import batched
from concurrent.futures import ThreadPoolExecutor
import clickhouse_connect
from django.core.management import BaseCommand
from django.conf import settings
from qdrant_client.models import OrderBy
import clickhouse_connect
from vacancies.main.vector_store import add_vectors, extract_features, qdrant_client
from vacancies.conf.settings import CLICKHOUSE_HOST, CLICKHOUSE_PORT
from vacancies.main.vector_store import (
add_vectors,
batch_extract_features,
embed_features,
qdrant_client,
)
clickhouse_client = clickhouse_connect.create_client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT)
query = """
SELECT DISTINCT ON (message) id, chat_username, telegram_id, message, timestamp
SELECT id, chat_username, telegram_id, message, timestamp
FROM telegram_parser_chatmessage
WHERE timestamp >= %(timestamp)s
WHERE timestamp >= now() - INTERVAL 30 DAY
AND length(message) > 150
AND arrayCount(x -> position(message, x) > 0, [
'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
@ -30,7 +26,7 @@ WHERE timestamp >= %(timestamp)s
AND arrayCount(x -> position(lower(message), x) > 0, [
'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж'
]) = 0
ORDER BY timestamp ASC
AND id NOT IN %(exist_points)s
"""
@ -38,23 +34,39 @@ class Command(BaseCommand):
help = "Collect vacancies from telegram messages"
def handle(self, *args, **options):
response = qdrant_client.scroll(collection_name="vacancies", limit=1, order_by=OrderBy(key="timestamp", direction="desc"))
last_point_timestamp = datetime.now() - timedelta(days=30)
if response[0]:
last_point_timestamp = response[0][0].payload["timestamp"]
next_page_offset = 0
exist_points_ids = [-1]
while next_page_offset is not None:
response = qdrant_client.scroll(
collection_name="vacancies",
limit=100_000,
offset=next_page_offset,
with_payload=False,
with_vectors=False,
timeout=30,
)
exist_points_ids.extend([point.id for point in response[0]])
next_page_offset = response[1]
exist_points_set = tuple(set(exist_points_ids))
clickhouse_client = clickhouse_connect.create_client(host=settings.CLICKHOUSE_HOST, port=settings.CLICKHOUSE_PORT)
result_rows = clickhouse_client.query(query, parameters={"timestamp": last_point_timestamp}).result_rows
result_rows = clickhouse_client.query(query, parameters={"exist_points": exist_points_set}).result_rows
with ThreadPoolExecutor(max_workers=settings.COLLECT_VACANCIES_BATCH_SIZE) as pool:
pool.map(self._process_batch, batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE))
for index, rows in enumerate(batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE)):
vacancies_features = batch_extract_features([row[3] for row in rows])
print(f"Processing {index+1}/{len(result_rows)//settings.COLLECT_VACANCIES_BATCH_SIZE}")
with ThreadPoolExecutor() as pool:
vacancies_vectors = pool.map(embed_features, [vacancy_features.model_dump() for vacancy_features in vacancies_features])
for row, vacancy_features, vacancy_vectors in zip(rows, vacancies_features, vacancies_vectors):
def _process_batch(self, result_rows):
try:
for index, row in enumerate(result_rows):
(id, chat_username, telegram_id, message, timestamp) = row
link = f"https://t.me/{chat_username}/{telegram_id}"
payload = {'content': message, 'features_json': vacancy_features.model_dump(), "link": link, "timestamp": timestamp}
add_vectors("vacancies", id, vacancy_features.model_dump(), payload, vacancy_vectors)
print(f"Processing {index+1}/{len(result_rows)} link: {link}")
features = extract_features(message)
add_vectors(
"vacancies",
id,
features.model_dump(),
{'content': message, 'features_json': features.model_dump(), "link": link, "timestamp": timestamp},
)
except Exception as exc:
traceback.print_exception(exc)

View File

@ -1,9 +1,12 @@
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from qdrant_client import QdrantClient, models
from qdrant_client.models import Filter, HasIdCondition
from qdrant_client import models
from langchain_openai import OpenAIEmbeddings
from langchain_openai import ChatOpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import Filter
from vacancies.main.models import VacancyFeatures
from vacancies.conf.settings import QDRANT_URL
from vacancies.main.models import RecommendedVacancy, VacancyFeatures
from vacancies.main.models import RecommendedVacancy
from qdrant_client.models import HasIdCondition
qdrant_client = QdrantClient(url=QDRANT_URL)
@ -13,9 +16,19 @@ FEATURE_NAMES = [
]
weights = {
"job_title": 70,
"tech_stack": 10,
"salary_range": 10,
"job_title": 25,
"employment_type": 5,
"work_format": 5,
"experience": 8,
"position_level": 12,
"industry": 10,
"tech_stack": 14,
"location": 5,
"salary_range": 5,
"languages": 5,
"education": 2,
"schedule": 2,
"additional_requirements": 2,
}
vectors_config = {
@ -25,22 +38,18 @@ vectors_config = {
if not qdrant_client.collection_exists("vacancies"):
qdrant_client.create_collection(
collection_name="vacancies",
vectors_config=vectors_config,
)
qdrant_client.create_payload_index(
collection_name="vacancies",
field_name="timestamp",
field_schema="datetime",
vectors_config=vectors_config
)
if not qdrant_client.collection_exists("cvs"):
qdrant_client.create_collection(
collection_name="cvs",
vectors_config=vectors_config,
vectors_config=vectors_config
)
embedding = OpenAIEmbeddings(model="text-embedding-3-large")
def _prepare_texts(features):
"""Prepare texts for each feature from features dict."""
texts = {}
for name in FEATURE_NAMES:
value = features.get(name)
@ -52,21 +61,31 @@ def _prepare_texts(features):
return texts
def embed_features(features):
features = {key: value for key, value in features.items() if value}
features_texts = _prepare_texts(features)
names, texts = features_texts.keys(), features_texts.values()
vectors = dict(zip(names, embedding.embed_documents(texts)))
return vectors
def add_vectors(collection_name: str, _id: int, features: dict, payload: dict):
"""Add vectors for a vacancy based on its features."""
texts = _prepare_texts(features)
vectors = {}
for name, text in texts.items():
vectors[name] = [0.0] * 3072
if text:
vec = embedding.embed_query(text)
vectors[name] = vec
def add_vectors(collection_name: str, _id: int, features: dict, payload: dict, vectors):
max_similarities = {}
for name, vec in vectors.items():
results = qdrant_client.query_points(collection_name="vacancies", query=vec, using=name, limit=100)
for res in results.points:
max_similarities.setdefault(res.id, {})
max_similarities[res.id][name] = res.score
if any(v != 0 for v in vec):
results = qdrant_client.query_points(
collection_name="vacancies",
query=vec,
using=name,
limit=100,
)
for res in results.points:
vid = res.id
sim = res.score
if vid not in max_similarities:
max_similarities[vid] = {}
max_similarities[vid][name] = sim
scored = []
for vid, feature_sims in max_similarities.items():
@ -74,57 +93,77 @@ def add_vectors(collection_name: str, _id: int, features: dict, payload: dict, v
scored.append({"id": vid, "score": total})
scored.sort(key=lambda x: x["score"], reverse=True)
if scored and scored[0]["score"] > 80: # threshold
if scored and scored[0]["score"] > 90: # threshold
return
qdrant_client.upsert(
collection_name=collection_name,
points=[models.PointStruct(id=_id, vector=vectors, payload=payload)]
points=[
models.PointStruct(
id=_id,
vector=vectors,
payload=payload,
)
]
)
def search_similarities(query_filter: Filter, cv_id: int):
cv = qdrant_client.retrieve(collection_name="cvs", ids=[cv_id], with_vectors=True)[0]
cv = qdrant_client.retrieve(
collection_name="cvs",
ids=[cv_id],
with_vectors=True,
)[0]
max_similarities, vacancies_content = {}, {}
max_similarities = {}
vacancies_content = {}
for name, vec in cv.vector.items():
results = qdrant_client.query_points(
collection_name="vacancies",
query=vec,
using=name,
limit=100000,
with_payload=True,
query_filter=query_filter,
)
for res in results.points:
max_similarities.setdefault(res.id, {})
vacancies_content.setdefault(res.id, {})
max_similarities[res.id][name] = res.score
vacancies_content[res.id]["content"] = res.payload["content"]
vacancies_content[res.id]["features_json"] = res.payload["features_json"]
vacancies_content[res.id]["link"] = res.payload["link"]
if any(v != 0 for v in vec):
results = qdrant_client.query_points(
collection_name="vacancies",
query=vec,
using=name,
limit=100,
with_payload=True,
query_filter=query_filter,
)
for res in results.points:
vid = res.id
sim = res.score
if vid not in max_similarities:
max_similarities[vid] = {}
max_similarities[vid][name] = sim
if vid not in vacancies_content:
vacancies_content[vid] = {}
vacancies_content[vid]["content"] = res.payload["content"]
vacancies_content[vid]["link"] = res.payload["link"]
scored = []
for vid, feature_sims in max_similarities.items():
total = sum(feature_sims[feature] * weights.get(feature, 1) for feature in feature_sims)
scored.append({
"id": vid,
"score": total,
"content": vacancies_content[vid]["content"],
"features_json": vacancies_content[vid]["features_json"],
"link": vacancies_content[vid]["link"],
"sims": feature_sims,
})
scored.append({"id": vid, "score": total, "content": vacancies_content[vid]["content"], "link": vacancies_content[vid]["link"]})
scored.sort(key=lambda x: x["score"], reverse=True)
return scored[0]["id"], scored[0]["content"], scored[0]["link"]
prompt = f"""
Резюме: {cv.payload['content']}
Среди вакансий ниже выбери одну наиболее релевантную и выведи ее индекс(от 0 до 9).
Иногда могут попадаться чужие резюме вместо вакансий, их отдавать нельзя.
В ответе выведи только число. Если среди вакансий нет подходящих, то верни -1.
{scored[:10]}
"""
openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1)
response = openai_client.invoke(prompt)
index = int(response.content)
if index == -1:
return None
return scored[index]["id"], scored[index]["content"], scored[index]["link"]
def batch_extract_features(contents: list[str]) -> list[VacancyFeatures]:
prompts = [
f"""
def extract_features(content: str) -> VacancyFeatures:
prompt = f"""
Extract the following features from the job vacancy description. If a feature is not mentioned, set it to null.
Features:
- job_title: Должность (e.g., DevOps, Python программист)
@ -143,11 +182,9 @@ def batch_extract_features(contents: list[str]) -> list[VacancyFeatures]:
Vacancy content:
{content}
"""
for content in contents
]
openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1)
structured_llm = openai_client.with_structured_output(VacancyFeatures)
response = structured_llm.batch(prompts)
response = structured_llm.invoke(prompt)
return response