Index vacancies using batches
All checks were successful
release / docker (push) Successful in 37s

This commit is contained in:
estromenko 2025-11-02 22:29:18 +03:00
parent 9da00d5d1d
commit d4b28b8e9f

View File

@ -1,3 +1,7 @@
import traceback
from itertools import batched
from concurrent.futures import ThreadPoolExecutor
from django.core.management import BaseCommand from django.core.management import BaseCommand
import clickhouse_connect import clickhouse_connect
from vacancies.main.vector_store import add_vectors, extract_features, qdrant_client from vacancies.main.vector_store import add_vectors, extract_features, qdrant_client
@ -45,12 +49,17 @@ class Command(BaseCommand):
exist_points_set = tuple(set(exist_points_ids)) exist_points_set = tuple(set(exist_points_ids))
result_rows = clickhouse_client.query(query, parameters={"exist_points": exist_points_set}).result_rows result_rows = clickhouse_client.query(query, parameters={"exist_points": exist_points_set}).result_rows
result_rows_len = len(result_rows) batch_size = 10
with ThreadPoolExecutor(max_workers=batch_size) as pool:
pool.map(self._process_batch, batched(result_rows, batch_size))
def _process_batch(self, result_rows):
try:
for index, row in enumerate(result_rows): for index, row in enumerate(result_rows):
(id, chat_username, telegram_id, message, timestamp) = row (id, chat_username, telegram_id, message, timestamp) = row
link = f"https://t.me/{chat_username}/{telegram_id}" link = f"https://t.me/{chat_username}/{telegram_id}"
print(f"Processing {index+1}/{result_rows_len} link: {link}") print(f"Processing {index+1}/{len(result_rows)} link: {link}")
features = extract_features(message) features = extract_features(message)
add_vectors( add_vectors(
@ -59,3 +68,5 @@ class Command(BaseCommand):
features.model_dump(), features.model_dump(),
{'content': message, 'features_json': features.model_dump(), "link": link, "timestamp": timestamp}, {'content': message, 'features_json': features.model_dump(), "link": link, "timestamp": timestamp},
) )
except Exception as exc:
traceback.print_exception(exc)