vision-career/vacancies/main/management/commands/sync_clickhouse_and_qdrant.py
estromenko 3d9e1f2239
All checks were successful
release / docker (push) Successful in 35s
Add sync_clickhouse_and_qdrant command
2025-12-02 22:55:58 +03:00

49 lines
1.9 KiB
Python

import clickhouse_connect
from django.conf import settings
from django.core.management import BaseCommand
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance, PointStruct
from openai import OpenAI
from itertools import batched
# ClickHouse query that picks the chat messages to be embedded.
# Returns two columns per row, in this order: id, message.
query = (
    "\n"
    # One row per distinct message text.
    "SELECT DISTINCT ON (message) id, message\n"
    "FROM telegram_parser_chatmessage\n"
    # Only messages from the last 30 days that are longer than 200 characters.
    "WHERE timestamp >= now() - INTERVAL 30 DAYS AND length(message) > 200\n"
    # position(...) = 0 means the substring was not found: skip messages
    # containing a question mark or the word "spam".
    "AND position(message, '?') = 0 AND position(message, 'spam') = 0\n"
    "ORDER BY timestamp ASC\n"
)
class Command(BaseCommand):
    """Management command that embeds ClickHouse chat messages into Qdrant.

    Runs the module-level ``query`` against ClickHouse, sends the returned
    messages to an OpenAI-compatible embeddings endpoint in fixed-size
    batches, and upserts one Qdrant point per message (point id = row id,
    payload = ``{"message": <text>}``).
    """

    help = "Sync clickhouse and qdrant"

    # Tunables are class attributes so a subclass can override them.
    COLLECTION_NAME = "messages"
    # Dimension the collection is created with; upserted vectors are
    # expected to have this length.
    VECTOR_SIZE = 4096
    EMBEDDING_BASE_URL = "https://openrouter.ai/api/v1"
    EMBEDDING_MODEL = "qwen/qwen3-embedding-8b"
    # Number of messages sent per embeddings request / Qdrant upsert.
    BATCH_SIZE = 100

    def handle(self, *args, **options):
        """Open the storage clients, run the sync, and always close them.

        Raises:
            ValueError: if an embeddings response does not contain exactly
                one vector per message sent in that batch.
        """
        clickhouse_client = clickhouse_connect.create_client(
            host=settings.CLICKHOUSE_HOST,
            port=settings.CLICKHOUSE_PORT,
        )
        try:
            qdrant_client = QdrantClient(url=settings.QDRANT_URL)
            try:
                self._sync(clickhouse_client, qdrant_client)
            finally:
                qdrant_client.close()
        finally:
            # Release the connection even when the sync fails midway.
            clickhouse_client.close()

    def _sync(self, clickhouse_client, qdrant_client):
        """Ensure the collection exists, then embed and upsert every row."""
        if not qdrant_client.collection_exists(self.COLLECTION_NAME):
            qdrant_client.create_collection(
                collection_name=self.COLLECTION_NAME,
                vectors_config=VectorParams(
                    size=self.VECTOR_SIZE,
                    distance=Distance.COSINE,
                ),
            )

        # NOTE(review): no api_key is passed, so the client is presumably
        # configured through its environment (OPENAI_API_KEY) - confirm.
        openai_client = OpenAI(base_url=self.EMBEDDING_BASE_URL)

        result_rows = clickhouse_client.query(query).result_rows
        # Materialized so the total is known for the progress output.
        batches = list(batched(result_rows, self.BATCH_SIZE))
        batches_quantity = len(batches)

        for number, batch in enumerate(batches, start=1):
            # Each row is (id, message); transpose into two parallel tuples.
            ids, messages = zip(*batch)
            response = openai_client.embeddings.create(
                model=self.EMBEDDING_MODEL,
                input=list(messages),
                encoding_format="float",
            )
            vectors = [row.embedding for row in response.data]
            qdrant_client.upsert(
                collection_name=self.COLLECTION_NAME,
                points=[
                    PointStruct(
                        id=idx,
                        vector=vector,
                        payload={"message": message},
                    )
                    # strict=True: fail loudly instead of silently dropping
                    # messages when the endpoint returns too few vectors.
                    for idx, message, vector in zip(
                        ids, messages, vectors, strict=True
                    )
                ],
            )
            # Use the command's own stream so call_command(stdout=...) and
            # tests can capture the progress output.
            self.stdout.write(f"{number}/{batches_quantity} processed")