49 lines
1.9 KiB
Python
49 lines
1.9 KiB
Python
import clickhouse_connect
|
|
from django.conf import settings
|
|
from django.core.management import BaseCommand
|
|
from qdrant_client import QdrantClient
|
|
from qdrant_client.models import VectorParams, Distance, PointStruct
|
|
from openai import OpenAI
|
|
from itertools import batched
|
|
|
|
query = """
|
|
SELECT DISTINCT ON (message) id, message
|
|
FROM telegram_parser_chatmessage
|
|
WHERE timestamp >= now() - INTERVAL 30 DAYS AND length(message) > 200
|
|
AND position(message, '?') = 0 AND position(message, 'spam') = 0
|
|
ORDER BY timestamp ASC
|
|
"""
|
|
|
|
|
|
class Command(BaseCommand):
    """Embed recent ClickHouse chat messages and upsert them into Qdrant.

    Runs the module-level ``query`` against ClickHouse, embeds the returned
    messages in batches through OpenRouter's OpenAI-compatible embeddings
    endpoint, and upserts one Qdrant point per row: the row's ``id`` becomes
    the point id and the message text is stored in the point payload.
    """

    help = "Sync clickhouse and qdrant"

    # Target Qdrant collection; created on first run if it does not exist.
    collection_name = "messages"
    # Embedding model requested from OpenRouter.
    embedding_model = "qwen/qwen3-embedding-8b"
    # Must equal the output dimensionality of ``embedding_model``: once the
    # collection exists, Qdrant rejects vectors of any other size.
    vector_size = 4096
    # Rows sent per embeddings request and per Qdrant upsert.
    batch_size = 100

    def handle(self, *args, **options):
        """Run the full sync; every client is closed even if a step fails."""
        # Local import keeps this block independent of the file-level imports.
        from contextlib import ExitStack

        with ExitStack() as stack:
            clickhouse_client = clickhouse_connect.create_client(
                host=settings.CLICKHOUSE_HOST, port=settings.CLICKHOUSE_PORT
            )
            stack.callback(clickhouse_client.close)

            qdrant_client = QdrantClient(url=settings.QDRANT_URL)
            stack.callback(qdrant_client.close)

            self._ensure_collection(qdrant_client)

            # NOTE(review): no api_key is passed, so the OpenAI SDK reads the
            # OPENAI_API_KEY environment variable — confirm it holds an
            # OpenRouter key.
            openai_client = OpenAI(base_url="https://openrouter.ai/api/v1")
            stack.callback(openai_client.close)

            result_rows = clickhouse_client.query(query).result_rows
            # Materialised so the total batch count is known for progress output.
            batches = list(batched(result_rows, self.batch_size))
            batches_quantity = len(batches)

            for index, batch in enumerate(batches, start=1):
                ids, messages = zip(*batch)
                vectors = self._embed(openai_client, list(messages))

                # NOTE(review): Qdrant only accepts unsigned integers or UUIDs
                # as point ids — assumes the ClickHouse ``id`` column is one of
                # those; verify against the table schema.
                qdrant_client.upsert(
                    collection_name=self.collection_name,
                    points=[
                        PointStruct(id=idx, vector=vector, payload={"message": message})
                        for idx, message, vector in zip(ids, messages, vectors, strict=True)
                    ],
                )
                self.stdout.write(f"{index}/{batches_quantity} processed")

    def _ensure_collection(self, qdrant_client):
        """Create the target collection with cosine distance if it is missing."""
        if not qdrant_client.collection_exists(self.collection_name):
            qdrant_client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(size=self.vector_size, distance=Distance.COSINE),
            )

    def _embed(self, openai_client, messages):
        """Return one embedding vector per entry of ``messages``, in input order.

        Raises:
            CommandError: if the provider returns a different number of
                embeddings than messages sent. Previously zip() would silently
                drop the unmatched rows.
        """
        from django.core.management import CommandError

        response = openai_client.embeddings.create(
            model=self.embedding_model, input=messages, encoding_format="float"
        )
        # Each returned item carries the index of the input it belongs to; sort
        # on it rather than trusting the provider to preserve request order.
        items = sorted(response.data, key=lambda item: item.index)
        if len(items) != len(messages):
            raise CommandError(
                f"Expected {len(messages)} embeddings from {self.embedding_model}, "
                f"got {len(items)}"
            )
        return [item.embedding for item in items]
|