"""Django management command that embeds recent Telegram messages into Qdrant."""

from itertools import batched

import clickhouse_connect
from django.conf import settings
from django.core.management import BaseCommand, CommandError
from openai import OpenAI
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams

# Qdrant collection that receives the message embeddings.
COLLECTION_NAME = "messages"
# Embedding model requested through the OpenRouter endpoint.
EMBEDDING_MODEL = "qwen/qwen3-embedding-8b"
# Vector dimensionality the collection is created with; must match the
# output size of EMBEDDING_MODEL.
VECTOR_SIZE = 4096
# Number of messages per embeddings request / Qdrant upsert.
BATCH_SIZE = 100

# One (id, message) row per distinct message text from the last 30 days,
# keeping only messages longer than 200 that contain neither '?' nor 'spam'.
# NOTE(review): ClickHouse `length` counts bytes, not characters — confirm
# that is the intended threshold for non-ASCII messages.
query = """
SELECT DISTINCT ON (message) id, message
FROM telegram_parser_chatmessage
WHERE timestamp >= now() - INTERVAL 30 DAYS
  AND length(message) > 200
  AND position(message, '?') = 0
  AND position(message, 'spam') = 0
ORDER BY timestamp ASC
"""


class Command(BaseCommand):
    help = "Sync clickhouse and qdrant"

    def handle(self, *args, **options):
        """Embed the rows selected by ``query`` and upsert them into Qdrant.

        Each ClickHouse row becomes one Qdrant point whose id is the row's
        ``id`` and whose payload holds the original message text.

        Raises:
            CommandError: if an embeddings response does not contain exactly
                one vector per message sent.
        """
        clickhouse_client = clickhouse_connect.create_client(
            host=settings.CLICKHOUSE_HOST, port=settings.CLICKHOUSE_PORT
        )
        qdrant_client = QdrantClient(url=settings.QDRANT_URL)
        self._ensure_collection(qdrant_client)
        # No api_key is passed; presumably the client reads it from the
        # environment — confirm deployment provides it.
        openai_client = OpenAI(base_url="https://openrouter.ai/api/v1")

        result_rows = clickhouse_client.query(query).result_rows
        # Materialized so the total batch count is known for progress output.
        batches = list(batched(result_rows, BATCH_SIZE))
        batches_quantity = len(batches)

        for index, batch in enumerate(batches, start=1):
            ids, messages = zip(*batch)
            embeddings = self._embed(openai_client, list(messages))
            qdrant_client.upsert(
                collection_name=COLLECTION_NAME,
                points=[
                    PointStruct(id=idx, vector=vector, payload={"message": message})
                    for idx, message, vector in zip(ids, messages, embeddings)
                ],
            )
            self.stdout.write(f"{index}/{batches_quantity} processed")

    @staticmethod
    def _ensure_collection(qdrant_client):
        """Create the target collection (cosine distance) if it does not exist."""
        if not qdrant_client.collection_exists(COLLECTION_NAME):
            qdrant_client.create_collection(
                collection_name=COLLECTION_NAME,
                vectors_config=VectorParams(size=VECTOR_SIZE, distance=Distance.COSINE),
            )

    @staticmethod
    def _embed(openai_client, messages):
        """Return one embedding vector per entry of *messages*, in input order.

        Raises:
            CommandError: if the response holds a different number of vectors
                than messages were sent.
        """
        response = openai_client.embeddings.create(
            model=EMBEDDING_MODEL, input=messages, encoding_format="float"
        )
        # Each item carries the index of the input it embeds; sort on it
        # instead of trusting the provider to preserve request order.
        data = sorted(response.data, key=lambda item: item.index)
        if len(data) != len(messages):
            # zip() downstream would otherwise silently drop or misalign rows.
            raise CommandError(
                f"Expected {len(messages)} embeddings, got {len(data)}"
            )
        return [item.embedding for item in data]