Compare commits
2 Commits
7cce1cdc04
...
195c779088
| Author | SHA1 | Date | |
|---|---|---|---|
| 195c779088 | |||
| df33ce79bb |
@ -1,15 +1,34 @@
|
|||||||
import os
|
|
||||||
import io
|
import io
|
||||||
|
import os
|
||||||
import traceback
|
import traceback
|
||||||
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update, ReplyKeyboardMarkup, KeyboardButton
|
|
||||||
from telegram.ext import filters, ApplicationBuilder, MessageHandler, CommandHandler, ContextTypes
|
|
||||||
from pypdf import PdfReader
|
|
||||||
from vacancies.main.models import Customer, CustomerCV
|
|
||||||
from langchain.agents import create_agent
|
from langchain.agents import create_agent
|
||||||
from langchain_openai import ChatOpenAI
|
from langchain_openai import ChatOpenAI
|
||||||
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
|
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
|
||||||
from vacancies.main.vector_store import add_vectors, extract_features, get_next_vacancy
|
from pypdf import PdfReader
|
||||||
|
from telegram import (
|
||||||
|
InlineKeyboardButton,
|
||||||
|
InlineKeyboardMarkup,
|
||||||
|
KeyboardButton,
|
||||||
|
ReplyKeyboardMarkup,
|
||||||
|
Update,
|
||||||
|
)
|
||||||
|
from telegram.ext import (
|
||||||
|
ApplicationBuilder,
|
||||||
|
CommandHandler,
|
||||||
|
ContextTypes,
|
||||||
|
MessageHandler,
|
||||||
|
filters,
|
||||||
|
)
|
||||||
|
|
||||||
from vacancies.conf.settings import DB_URI
|
from vacancies.conf.settings import DB_URI
|
||||||
|
from vacancies.main.models import Customer, CustomerCV
|
||||||
|
from vacancies.main.vector_store import (
|
||||||
|
add_vectors,
|
||||||
|
batch_extract_features,
|
||||||
|
get_next_vacancy,
|
||||||
|
embed_features,
|
||||||
|
)
|
||||||
|
|
||||||
SYSTEM_PROMPT = """
|
SYSTEM_PROMPT = """
|
||||||
Ты — карьерный копилот для ИТ. Ты можешь отвечать на любые вопросы по тематике карьеры.
|
Ты — карьерный копилот для ИТ. Ты можешь отвечать на любые вопросы по тематике карьеры.
|
||||||
@ -107,11 +126,11 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
|||||||
customer_cv, _ = await CustomerCV.objects.aupdate_or_create(customer=customer, defaults=dict(
|
customer_cv, _ = await CustomerCV.objects.aupdate_or_create(customer=customer, defaults=dict(
|
||||||
content=resume,
|
content=resume,
|
||||||
))
|
))
|
||||||
features = extract_features(customer_cv.content)
|
features = batch_extract_features(customer_cv.content)[0]
|
||||||
add_vectors(
|
add_vectors(
|
||||||
"cvs",
|
"cvs",
|
||||||
customer_cv.id,
|
customer_cv.id,
|
||||||
features.model_dump(),
|
embed_features(features.model_dump())[0],
|
||||||
{'content': customer_cv.content, 'features_json': features.model_dump()},
|
{'content': customer_cv.content, 'features_json': features.model_dump()},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -1,19 +1,23 @@
|
|||||||
import traceback
|
|
||||||
from itertools import batched
|
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from itertools import batched
|
||||||
|
|
||||||
|
import clickhouse_connect
|
||||||
from django.core.management import BaseCommand
|
from django.core.management import BaseCommand
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
import clickhouse_connect
|
from qdrant_client.models import OrderBy
|
||||||
from vacancies.main.vector_store import add_vectors, extract_features, qdrant_client
|
|
||||||
from vacancies.conf.settings import CLICKHOUSE_HOST, CLICKHOUSE_PORT
|
|
||||||
|
|
||||||
clickhouse_client = clickhouse_connect.create_client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT)
|
from vacancies.main.vector_store import (
|
||||||
|
add_vectors,
|
||||||
|
batch_extract_features,
|
||||||
|
embed_features,
|
||||||
|
qdrant_client,
|
||||||
|
)
|
||||||
|
|
||||||
query = """
|
query = """
|
||||||
SELECT id, chat_username, telegram_id, message, timestamp
|
SELECT DISTINCT ON (message) id, chat_username, telegram_id, message, timestamp
|
||||||
FROM telegram_parser_chatmessage
|
FROM telegram_parser_chatmessage
|
||||||
WHERE timestamp >= now() - INTERVAL 30 DAY
|
WHERE timestamp >= %(timestamp)s
|
||||||
AND length(message) > 150
|
AND length(message) > 150
|
||||||
AND arrayCount(x -> position(message, x) > 0, [
|
AND arrayCount(x -> position(message, x) > 0, [
|
||||||
'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
|
'ваканси', 'ищем', 'требуется', 'разработчик', 'будет плюсом',
|
||||||
@ -26,7 +30,7 @@ WHERE timestamp >= now() - INTERVAL 30 DAY
|
|||||||
AND arrayCount(x -> position(lower(message), x) > 0, [
|
AND arrayCount(x -> position(lower(message), x) > 0, [
|
||||||
'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж'
|
'о себе', 'обо мне', 'умею', '#ищу', '#резюме', 'университет', 'колледж'
|
||||||
]) = 0
|
]) = 0
|
||||||
AND id NOT IN %(exist_points)s
|
ORDER BY timestamp ASC
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
@ -34,39 +38,23 @@ class Command(BaseCommand):
|
|||||||
help = "Collect vacancies from telegram messages"
|
help = "Collect vacancies from telegram messages"
|
||||||
|
|
||||||
def handle(self, *args, **options):
|
def handle(self, *args, **options):
|
||||||
next_page_offset = 0
|
response = qdrant_client.scroll(collection_name="vacancies", limit=1, order_by=OrderBy(key="timestamp", direction="desc"))
|
||||||
exist_points_ids = [-1]
|
last_point_timestamp = datetime.now() - timedelta(days=30)
|
||||||
while next_page_offset is not None:
|
if response[0]:
|
||||||
response = qdrant_client.scroll(
|
last_point_timestamp = response[0][0].payload["timestamp"]
|
||||||
collection_name="vacancies",
|
|
||||||
limit=100_000,
|
|
||||||
offset=next_page_offset,
|
|
||||||
with_payload=False,
|
|
||||||
with_vectors=False,
|
|
||||||
timeout=30,
|
|
||||||
)
|
|
||||||
exist_points_ids.extend([point.id for point in response[0]])
|
|
||||||
next_page_offset = response[1]
|
|
||||||
exist_points_set = tuple(set(exist_points_ids))
|
|
||||||
|
|
||||||
result_rows = clickhouse_client.query(query, parameters={"exist_points": exist_points_set}).result_rows
|
clickhouse_client = clickhouse_connect.create_client(host=settings.CLICKHOUSE_HOST, port=settings.CLICKHOUSE_PORT)
|
||||||
with ThreadPoolExecutor(max_workers=settings.COLLECT_VACANCIES_BATCH_SIZE) as pool:
|
result_rows = clickhouse_client.query(query, parameters={"timestamp": last_point_timestamp}).result_rows
|
||||||
pool.map(self._process_batch, batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE))
|
|
||||||
|
|
||||||
def _process_batch(self, result_rows):
|
for index, rows in enumerate(batched(result_rows, settings.COLLECT_VACANCIES_BATCH_SIZE)):
|
||||||
try:
|
vacancies_features = batch_extract_features([row[3] for row in rows])
|
||||||
for index, row in enumerate(result_rows):
|
|
||||||
|
print(f"Processing {index+1}/{len(result_rows)//settings.COLLECT_VACANCIES_BATCH_SIZE}")
|
||||||
|
with ThreadPoolExecutor() as pool:
|
||||||
|
vacancies_vectors = pool.map(embed_features, [vacancy_features.model_dump() for vacancy_features in vacancies_features])
|
||||||
|
|
||||||
|
for row, vacancy_features, vacancy_vectors in zip(rows, vacancies_features, vacancies_vectors):
|
||||||
(id, chat_username, telegram_id, message, timestamp) = row
|
(id, chat_username, telegram_id, message, timestamp) = row
|
||||||
|
|
||||||
link = f"https://t.me/{chat_username}/{telegram_id}"
|
link = f"https://t.me/{chat_username}/{telegram_id}"
|
||||||
print(f"Processing {index+1}/{len(result_rows)} link: {link}")
|
payload = {'content': message, 'features_json': vacancy_features.model_dump(), "link": link, "timestamp": timestamp}
|
||||||
features = extract_features(message)
|
add_vectors("vacancies", id, vacancy_features.model_dump(), payload, vacancy_vectors)
|
||||||
|
|
||||||
add_vectors(
|
|
||||||
"vacancies",
|
|
||||||
id,
|
|
||||||
features.model_dump(),
|
|
||||||
{'content': message, 'features_json': features.model_dump(), "link": link, "timestamp": timestamp},
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
traceback.print_exception(exc)
|
|
||||||
|
|||||||
@ -1,12 +1,9 @@
|
|||||||
from qdrant_client import models
|
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
|
||||||
from langchain_openai import OpenAIEmbeddings
|
from qdrant_client import QdrantClient, models
|
||||||
from langchain_openai import ChatOpenAI
|
from qdrant_client.models import Filter, HasIdCondition
|
||||||
from qdrant_client import QdrantClient
|
|
||||||
from qdrant_client.models import Filter
|
|
||||||
from vacancies.main.models import VacancyFeatures
|
|
||||||
from vacancies.conf.settings import QDRANT_URL
|
from vacancies.conf.settings import QDRANT_URL
|
||||||
from vacancies.main.models import RecommendedVacancy
|
from vacancies.main.models import RecommendedVacancy, VacancyFeatures
|
||||||
from qdrant_client.models import HasIdCondition
|
|
||||||
|
|
||||||
qdrant_client = QdrantClient(url=QDRANT_URL)
|
qdrant_client = QdrantClient(url=QDRANT_URL)
|
||||||
|
|
||||||
@ -16,19 +13,9 @@ FEATURE_NAMES = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
weights = {
|
weights = {
|
||||||
"job_title": 25,
|
"job_title": 70,
|
||||||
"employment_type": 5,
|
"tech_stack": 10,
|
||||||
"work_format": 5,
|
"salary_range": 10,
|
||||||
"experience": 8,
|
|
||||||
"position_level": 12,
|
|
||||||
"industry": 10,
|
|
||||||
"tech_stack": 14,
|
|
||||||
"location": 5,
|
|
||||||
"salary_range": 5,
|
|
||||||
"languages": 5,
|
|
||||||
"education": 2,
|
|
||||||
"schedule": 2,
|
|
||||||
"additional_requirements": 2,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vectors_config = {
|
vectors_config = {
|
||||||
@ -38,18 +25,22 @@ vectors_config = {
|
|||||||
if not qdrant_client.collection_exists("vacancies"):
|
if not qdrant_client.collection_exists("vacancies"):
|
||||||
qdrant_client.create_collection(
|
qdrant_client.create_collection(
|
||||||
collection_name="vacancies",
|
collection_name="vacancies",
|
||||||
vectors_config=vectors_config
|
vectors_config=vectors_config,
|
||||||
|
)
|
||||||
|
qdrant_client.create_payload_index(
|
||||||
|
collection_name="vacancies",
|
||||||
|
field_name="timestamp",
|
||||||
|
field_schema="datetime",
|
||||||
)
|
)
|
||||||
if not qdrant_client.collection_exists("cvs"):
|
if not qdrant_client.collection_exists("cvs"):
|
||||||
qdrant_client.create_collection(
|
qdrant_client.create_collection(
|
||||||
collection_name="cvs",
|
collection_name="cvs",
|
||||||
vectors_config=vectors_config
|
vectors_config=vectors_config,
|
||||||
)
|
)
|
||||||
|
|
||||||
embedding = OpenAIEmbeddings(model="text-embedding-3-large")
|
embedding = OpenAIEmbeddings(model="text-embedding-3-large")
|
||||||
|
|
||||||
def _prepare_texts(features):
|
def _prepare_texts(features):
|
||||||
"""Prepare texts for each feature from features dict."""
|
|
||||||
texts = {}
|
texts = {}
|
||||||
for name in FEATURE_NAMES:
|
for name in FEATURE_NAMES:
|
||||||
value = features.get(name)
|
value = features.get(name)
|
||||||
@ -61,31 +52,21 @@ def _prepare_texts(features):
|
|||||||
return texts
|
return texts
|
||||||
|
|
||||||
|
|
||||||
def add_vectors(collection_name: str, _id: int, features: dict, payload: dict):
|
def embed_features(features):
|
||||||
"""Add vectors for a vacancy based on its features."""
|
features = {key: value for key, value in features.items() if value}
|
||||||
texts = _prepare_texts(features)
|
features_texts = _prepare_texts(features)
|
||||||
vectors = {}
|
names, texts = features_texts.keys(), features_texts.values()
|
||||||
for name, text in texts.items():
|
vectors = dict(zip(names, embedding.embed_documents(texts)))
|
||||||
vectors[name] = [0.0] * 3072
|
return vectors
|
||||||
if text:
|
|
||||||
vec = embedding.embed_query(text)
|
|
||||||
vectors[name] = vec
|
|
||||||
|
|
||||||
|
|
||||||
|
def add_vectors(collection_name: str, _id: int, features: dict, payload: dict, vectors):
|
||||||
max_similarities = {}
|
max_similarities = {}
|
||||||
for name, vec in vectors.items():
|
for name, vec in vectors.items():
|
||||||
if any(v != 0 for v in vec):
|
results = qdrant_client.query_points(collection_name="vacancies", query=vec, using=name, limit=100)
|
||||||
results = qdrant_client.query_points(
|
for res in results.points:
|
||||||
collection_name="vacancies",
|
max_similarities.setdefault(res.id, {})
|
||||||
query=vec,
|
max_similarities[res.id][name] = res.score
|
||||||
using=name,
|
|
||||||
limit=100,
|
|
||||||
)
|
|
||||||
for res in results.points:
|
|
||||||
vid = res.id
|
|
||||||
sim = res.score
|
|
||||||
if vid not in max_similarities:
|
|
||||||
max_similarities[vid] = {}
|
|
||||||
max_similarities[vid][name] = sim
|
|
||||||
|
|
||||||
scored = []
|
scored = []
|
||||||
for vid, feature_sims in max_similarities.items():
|
for vid, feature_sims in max_similarities.items():
|
||||||
@ -93,77 +74,57 @@ def add_vectors(collection_name: str, _id: int, features: dict, payload: dict):
|
|||||||
scored.append({"id": vid, "score": total})
|
scored.append({"id": vid, "score": total})
|
||||||
|
|
||||||
scored.sort(key=lambda x: x["score"], reverse=True)
|
scored.sort(key=lambda x: x["score"], reverse=True)
|
||||||
if scored and scored[0]["score"] > 90: # threshold
|
if scored and scored[0]["score"] > 80: # threshold
|
||||||
return
|
return
|
||||||
|
|
||||||
qdrant_client.upsert(
|
qdrant_client.upsert(
|
||||||
collection_name=collection_name,
|
collection_name=collection_name,
|
||||||
points=[
|
points=[models.PointStruct(id=_id, vector=vectors, payload=payload)]
|
||||||
models.PointStruct(
|
|
||||||
id=_id,
|
|
||||||
vector=vectors,
|
|
||||||
payload=payload,
|
|
||||||
)
|
|
||||||
]
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def search_similarities(query_filter: Filter, cv_id: int):
|
def search_similarities(query_filter: Filter, cv_id: int):
|
||||||
cv = qdrant_client.retrieve(
|
cv = qdrant_client.retrieve(collection_name="cvs", ids=[cv_id], with_vectors=True)[0]
|
||||||
collection_name="cvs",
|
|
||||||
ids=[cv_id],
|
|
||||||
with_vectors=True,
|
|
||||||
)[0]
|
|
||||||
|
|
||||||
max_similarities = {}
|
max_similarities, vacancies_content = {}, {}
|
||||||
vacancies_content = {}
|
|
||||||
for name, vec in cv.vector.items():
|
for name, vec in cv.vector.items():
|
||||||
if any(v != 0 for v in vec):
|
results = qdrant_client.query_points(
|
||||||
results = qdrant_client.query_points(
|
collection_name="vacancies",
|
||||||
collection_name="vacancies",
|
query=vec,
|
||||||
query=vec,
|
using=name,
|
||||||
using=name,
|
limit=100000,
|
||||||
limit=100,
|
with_payload=True,
|
||||||
with_payload=True,
|
query_filter=query_filter,
|
||||||
query_filter=query_filter,
|
)
|
||||||
)
|
for res in results.points:
|
||||||
for res in results.points:
|
max_similarities.setdefault(res.id, {})
|
||||||
vid = res.id
|
vacancies_content.setdefault(res.id, {})
|
||||||
sim = res.score
|
|
||||||
if vid not in max_similarities:
|
max_similarities[res.id][name] = res.score
|
||||||
max_similarities[vid] = {}
|
vacancies_content[res.id]["content"] = res.payload["content"]
|
||||||
max_similarities[vid][name] = sim
|
vacancies_content[res.id]["features_json"] = res.payload["features_json"]
|
||||||
if vid not in vacancies_content:
|
vacancies_content[res.id]["link"] = res.payload["link"]
|
||||||
vacancies_content[vid] = {}
|
|
||||||
vacancies_content[vid]["content"] = res.payload["content"]
|
|
||||||
vacancies_content[vid]["link"] = res.payload["link"]
|
|
||||||
|
|
||||||
scored = []
|
scored = []
|
||||||
for vid, feature_sims in max_similarities.items():
|
for vid, feature_sims in max_similarities.items():
|
||||||
total = sum(feature_sims[feature] * weights.get(feature, 1) for feature in feature_sims)
|
total = sum(feature_sims[feature] * weights.get(feature, 1) for feature in feature_sims)
|
||||||
scored.append({"id": vid, "score": total, "content": vacancies_content[vid]["content"], "link": vacancies_content[vid]["link"]})
|
scored.append({
|
||||||
|
"id": vid,
|
||||||
|
"score": total,
|
||||||
|
"content": vacancies_content[vid]["content"],
|
||||||
|
"features_json": vacancies_content[vid]["features_json"],
|
||||||
|
"link": vacancies_content[vid]["link"],
|
||||||
|
"sims": feature_sims,
|
||||||
|
})
|
||||||
|
|
||||||
scored.sort(key=lambda x: x["score"], reverse=True)
|
scored.sort(key=lambda x: x["score"], reverse=True)
|
||||||
|
|
||||||
prompt = f"""
|
return scored[0]["id"], scored[0]["content"], scored[0]["link"]
|
||||||
Резюме: {cv.payload['content']}
|
|
||||||
|
|
||||||
Среди вакансий ниже выбери одну наиболее релевантную и выведи ее индекс(от 0 до 9).
|
|
||||||
Иногда могут попадаться чужие резюме вместо вакансий, их отдавать нельзя.
|
|
||||||
В ответе выведи только число. Если среди вакансий нет подходящих, то верни -1.
|
|
||||||
{scored[:10]}
|
|
||||||
"""
|
|
||||||
openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1)
|
|
||||||
response = openai_client.invoke(prompt)
|
|
||||||
index = int(response.content)
|
|
||||||
if index == -1:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return scored[index]["id"], scored[index]["content"], scored[index]["link"]
|
|
||||||
|
|
||||||
|
|
||||||
def extract_features(content: str) -> VacancyFeatures:
|
def batch_extract_features(contents: list[str]) -> list[VacancyFeatures]:
|
||||||
prompt = f"""
|
prompts = [
|
||||||
|
f"""
|
||||||
Extract the following features from the job vacancy description. If a feature is not mentioned, set it to null.
|
Extract the following features from the job vacancy description. If a feature is not mentioned, set it to null.
|
||||||
Features:
|
Features:
|
||||||
- job_title: Должность (e.g., DevOps, Python программист)
|
- job_title: Должность (e.g., DevOps, Python программист)
|
||||||
@ -182,9 +143,11 @@ def extract_features(content: str) -> VacancyFeatures:
|
|||||||
Vacancy content:
|
Vacancy content:
|
||||||
{content}
|
{content}
|
||||||
"""
|
"""
|
||||||
|
for content in contents
|
||||||
|
]
|
||||||
openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1)
|
openai_client = ChatOpenAI(model_name="gpt-5-mini", reasoning_effort="minimal", temperature=0, seed=42, top_p=1)
|
||||||
structured_llm = openai_client.with_structured_output(VacancyFeatures)
|
structured_llm = openai_client.with_structured_output(VacancyFeatures)
|
||||||
response = structured_llm.invoke(prompt)
|
response = structured_llm.batch(prompts)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user