diff --git a/vacancies/main/management/commands/collect_vacancies_from_telegram_messages.py b/vacancies/main/management/commands/collect_vacancies_from_telegram_messages.py
index d22d7ce..122018d 100644
--- a/vacancies/main/management/commands/collect_vacancies_from_telegram_messages.py
+++ b/vacancies/main/management/commands/collect_vacancies_from_telegram_messages.py
@@ -1,3 +1,7 @@
+import traceback
+from itertools import batched
+from concurrent.futures import ThreadPoolExecutor
+
 from django.core.management import BaseCommand
 import clickhouse_connect
 from vacancies.main.vector_store import add_vectors, extract_features, qdrant_client
@@ -45,17 +49,24 @@ class Command(BaseCommand):
         exist_points_set = tuple(set(exist_points_ids))
         result_rows = clickhouse_client.query(query, parameters={"exist_points": exist_points_set}).result_rows
 
-        result_rows_len = len(result_rows)
-        for index, row in enumerate(result_rows):
-            (id, chat_username, telegram_id, message, timestamp) = row
+        batch_size = 10
+        with ThreadPoolExecutor(max_workers=batch_size) as pool:
+            pool.map(self._process_batch, batched(result_rows, batch_size))
 
-            link = f"https://t.me/{chat_username}/{telegram_id}"
-            print(f"Processing {index+1}/{result_rows_len} link: {link}")
-            features = extract_features(message)
+    def _process_batch(self, result_rows):
+        for row in result_rows:
+            try:
+                (row_id, chat_username, telegram_id, message, timestamp) = row
 
-            add_vectors(
-                "vacancies",
-                id,
-                features.model_dump(),
-                {'content': message, 'features_json': features.model_dump(), "link": link, "timestamp": timestamp},
-            )
+                link = f"https://t.me/{chat_username}/{telegram_id}"
+                print(f"Processing link: {link}")
+                features = extract_features(message)
+
+                add_vectors(
+                    "vacancies",
+                    row_id,
+                    features.model_dump(),
+                    {'content': message, 'features_json': features.model_dump(), "link": link, "timestamp": timestamp},
+                )
+            except Exception as exc:
+                traceback.print_exception(exc)