# qa-and-rag-ai-assistant/database/database.py
# 2026-01-13 15:38:27 +03:00
#
# 289 lines
# 9.3 KiB
# Python

from sqlalchemy import Column, Integer, BigInteger, String, DateTime, Text, Index, Boolean, JSON, ForeignKey, Enum
from sqlalchemy.orm import relationship
import enum
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import declarative_base
from datetime import datetime
import os
from dotenv import load_dotenv
from pathlib import Path
from typing import AsyncGenerator
# Environment variables come from the .env file located in the directory
# above the one containing this module.
env_path = Path(__file__).parent.parent.joinpath('.env')
load_dotenv(env_path)

# Declarative base class shared by every ORM model in this module.
Base = declarative_base()
# --- Models ---
class User(Base):
    """User model for storing authentication tokens and Telegram user data.

    Each row carries a unique ``token``; ``telegram_id`` stays NULL until the
    user authenticates, and ``status`` starts out as ``'pending'``.
    """

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # BIGINT because Telegram IDs can be large; NULL until authenticated.
    telegram_id = Column(BigInteger, unique=True, nullable=True, index=True)
    token = Column(String(255), unique=True, nullable=False, index=True)
    username = Column(String(100), nullable=True)
    # Expected values: 'pending' or 'success'.
    status = Column(String(50), nullable=False, default='pending')
    # Timestamps are naive UTC values produced by datetime.utcnow.
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

    # Composite index over (token, status), presumably for lookups that filter
    # on both columns together.
    __table_args__ = (
        Index('idx_token_status', 'token', 'status'),
    )

    def __repr__(self):
        shown = ("id", "telegram_id", "username", "status")
        body = ", ".join(f"{attr}={getattr(self, attr)}" for attr in shown)
        return f"<User({body})>"
class Profile(Base):
    """Profile model for storing a person's professional information.

    ``email`` is unique and required, as are ``name`` and ``position``; all
    other descriptive fields are optional.
    """

    __tablename__ = 'profiles'

    id = Column(Integer, primary_key=True, autoincrement=True)
    email = Column(String(255), unique=True, nullable=False, index=True)

    # Required identity fields
    name = Column(String(255), nullable=False)
    position = Column(String(255), nullable=False)

    # Optional long-form text fields
    competencies = Column(Text, nullable=True)
    experience = Column(Text, nullable=True)
    skills = Column(Text, nullable=True)

    # Optional short attributes
    country = Column(String(100), nullable=True)
    languages = Column(String(255), nullable=True)
    employment_format = Column(String(100), nullable=True)
    rate = Column(String(100), nullable=True)
    relocation = Column(String(100), nullable=True)

    # Optional link to the person's CV
    cv_url = Column(String(500), nullable=True)

    # Timestamps are naive UTC values produced by datetime.utcnow.
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

    def __repr__(self):
        pairs = [(key, getattr(self, key)) for key in ("id", "email", "name", "position")]
        return "<Profile(" + ", ".join(f"{key}={value}" for key, value in pairs) + ")>"
# --- Enums ---
class ContactType(str, enum.Enum):
    """Kinds of contact entries; each member's value equals its name."""

    LINK = "LINK"
    TG_LINK = "TG_LINK"  # presumably a Telegram link — confirm with producers
    EMAIL = "EMAIL"
    PHONE = "PHONE"
class WorkArrangement(str, enum.Enum):
    """Where the work is performed; each member's value equals its name."""

    REMOTE = "REMOTE"
    OFFICE = "OFFICE"
    HYBRID = "HYBRID"
class SeniorityLevel(str, enum.Enum):
    """Seniority grades; each member's value equals its name."""

    JUNIOR = "JUNIOR"
    MIDDLE = "MIDDLE"
    SENIOR = "SENIOR"
    LEAD = "LEAD"
class LanguageCode(str, enum.Enum):
    """Upper-case two-letter language codes; each member's value equals its name."""

    RU = "RU"
    EN = "EN"
    DE = "DE"
    FR = "FR"
    ES = "ES"
    ZH = "ZH"
# --- Vacancy Models ---
class Vacancy(Base):
    """Vacancy/job posting model.

    Title, position, company, skills and description are required; salary,
    location, seniority, language, contact and link fields are optional.
    """

    __tablename__ = 'vacancies'

    id = Column(Integer, primary_key=True, autoincrement=True)

    # Basic info
    title = Column(String(255), nullable=False)  # Vacancy title
    position = Column(String(255), nullable=False)  # Position
    company_name = Column(String(255), nullable=False)

    # Salary range and its currency (defaults to 'USD')
    salary_min = Column(Integer, nullable=True)
    salary_max = Column(Integer, nullable=True)
    salary_currency = Column(String(10), nullable=True, default='USD')

    # Location and work format
    country_name = Column(String(100), nullable=True)
    locations = Column(Text, nullable=True)  # JSON string or comma-separated
    # Stored as a plain string; defaults to 'REMOTE'.
    work_arrangement = Column(String(50), nullable=False, default='REMOTE')
    relocation_supported = Column(Boolean, default=False)

    # Requirements
    skills = Column(Text, nullable=False)  # Comma-separated or JSON
    key_competencies = Column(Text, nullable=True)
    seniority_level = Column(String(50), nullable=True)
    required_language_codes = Column(JSON, nullable=True)  # e.g. ["RU", "EN"]

    # Description
    description = Column(Text, nullable=False)

    # Contacts, stored as a JSON array such as:
    # [{"id": "uuid", "type": "LINK", "value": "https://..."}]
    contacts = Column(JSON, nullable=True)

    # External link
    link = Column(String(500), nullable=True)

    # Timestamps are naive UTC values produced by datetime.utcnow.
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

    def __repr__(self):
        labelled = (("id", self.id), ("title", self.title), ("company", self.company_name))
        return "<Vacancy({})>".format(", ".join(f"{label}={value}" for label, value in labelled))
# --- Chat Models ---
class ChatMessage(Base):
    """Chat message model.

    ``user_id`` holds the author's Telegram ID: it references
    ``users.telegram_id``, not ``users.id``, and may be NULL.
    """

    __tablename__ = 'chat_messages'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Optional link to a user, keyed by their Telegram ID.
    user_id = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=True)

    # Message content
    content = Column(Text, nullable=False)
    # One of: user, assistant, system
    role = Column(String(50), nullable=False, default='user')

    # Favorite flag (indexed)
    is_favorite = Column(Boolean, default=False, index=True)

    # Timestamps are naive UTC values produced by datetime.utcnow.
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)

    # Attached files. chat_files.message_id is declared with ON DELETE CASCADE,
    # so passive_deletes=True lets the database delete children that are not
    # loaded, instead of the ORM SELECTing them first just to delete them.
    # Loaded children are still deleted by the ORM cascade.
    files = relationship(
        "ChatFile",
        back_populates="message",
        cascade="all, delete-orphan",
        passive_deletes=True,
    )

    def __repr__(self):
        return f"<ChatMessage(id={self.id}, role={self.role}, favorite={self.is_favorite})>"
class ChatFile(Base):
    """File attached to a chat message.

    Rows are removed by the database when their parent message is deleted
    (``ondelete='CASCADE'`` on ``message_id``).
    """

    __tablename__ = 'chat_files'

    id = Column(Integer, primary_key=True, autoincrement=True)
    message_id = Column(Integer, ForeignKey('chat_messages.id', ondelete='CASCADE'), nullable=False)

    # File metadata
    file_name = Column(String(255), nullable=False)
    file_url = Column(String(500), nullable=False)  # MinIO URL
    file_type = Column(String(100), nullable=True)  # MIME type
    file_size = Column(Integer, nullable=True)  # size in bytes

    # Naive UTC creation timestamp (datetime.utcnow)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)

    # Parent message; mirrors ChatMessage.files
    message = relationship("ChatMessage", back_populates="files")

    def __repr__(self):
        return "<ChatFile(id={}, name={})>".format(self.id, self.file_name)
# --- Database Configuration ---
def get_database_url() -> str:
    """Build the async PostgreSQL connection URL from environment variables.

    Reads POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB, POSTGRES_HOST and
    POSTGRES_PORT, falling back to local-development defaults. The user name
    and password are percent-encoded, so characters such as ``@``, ``:`` or
    ``/`` cannot break the URL. A version of the URL with the password masked
    is printed.

    Returns:
        A ``postgresql+asyncpg://user:password@host:port/db`` URL string.
    """
    from urllib.parse import quote

    # safe="" so that "/" is encoded too (quote() keeps it by default), and
    # quote() rather than quote_plus() so spaces become %20, which decodes
    # correctly whether the parser uses unquote() or unquote_plus().
    postgres_user = quote(os.getenv("POSTGRES_USER", "postgres"), safe="")
    postgres_password = quote(os.getenv("POSTGRES_PASSWORD", "postgres"), safe="")
    postgres_db = os.getenv("POSTGRES_DB", "rag_ai_assistant")
    postgres_host = os.getenv("POSTGRES_HOST", "localhost")
    postgres_port = os.getenv("POSTGRES_PORT", "5432")
    # Use asyncpg driver for async support
    database_url = f"postgresql+asyncpg://{postgres_user}:{postgres_password}@{postgres_host}:{postgres_port}/{postgres_db}"
    print(f"Database URL: postgresql+asyncpg://{postgres_user}:***@{postgres_host}:{postgres_port}/{postgres_db}")
    return database_url
# Create async engine
engine = create_async_engine(
    get_database_url(),
    # Pool sizing: 5 pooled connections plus up to 10 overflow connections.
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,  # verify connections before handing them out
    echo=False,  # flip to True to log every SQL statement
)
# Create async session factory
AsyncSessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    # Objects keep their loaded attribute values after commit instead of
    # being expired.
    expire_on_commit=False,
)
# --- Session Management ---
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Dependency for getting async database session.

    Yields one session from ``AsyncSessionLocal``. When the consumer resumes
    the generator normally, the session is committed. If an exception is
    thrown into the generator, the session is rolled back and the exception
    is re-raised. The ``async with`` block closes the session in every case,
    so no explicit ``close()`` is needed.

    Usage in FastAPI:
        @app.get("/endpoint")
        async def endpoint(db: AsyncSession = Depends(get_db)):
            ...
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
async def init_db():
    """Create all tables registered on ``Base.metadata`` inside one transaction."""
    metadata = Base.metadata
    async with engine.begin() as connection:
        # create_all is synchronous, so it is run through run_sync.
        await connection.run_sync(metadata.create_all)
    print("Database tables created successfully")
async def drop_db():
    """Drop every table registered on ``Base.metadata``. Destroys data — use with caution!"""
    metadata = Base.metadata
    async with engine.begin() as connection:
        # drop_all is synchronous, so it is run through run_sync.
        await connection.run_sync(metadata.drop_all)
    print("Database tables dropped")
# --- For testing and manual operations ---
if __name__ == "__main__":
    import asyncio

    from sqlalchemy import select

    async def main():
        """Create tables, make sure a sample user exists, and read it back."""
        print("Initializing database...")
        try:
            await init_db()
            user_query = select(User).where(User.telegram_id == 123456789)
            async with AsyncSessionLocal() as session:
                # Check first so re-running the script does not violate the
                # unique constraints on telegram_id / token.
                existing = (await session.execute(user_query)).scalar_one_or_none()
                if existing is None:
                    test_user = User(
                        telegram_id=123456789,
                        token="test_token_123",
                        username="TestUser",
                        status="pending"
                    )
                    session.add(test_user)
                    await session.commit()
                    print(f"Created test user: {test_user}")
                else:
                    print(f"Test user already exists: {existing}")
                # Query the user back
                result = await session.execute(user_query)
                user = result.scalar_one_or_none()
                print(f"Retrieved user: {user}")
        finally:
            # Release pooled connections while the event loop is still running.
            await engine.dispose()

    asyncio.run(main())