"""SQLAlchemy ORM models and async PostgreSQL engine/session configuration.

Importing this module has side effects: it loads the ``.env`` file located one
directory above this file's directory, prints the (password-masked) database
URL, and creates the async engine and session factory.
"""

import enum
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import AsyncGenerator
from urllib.parse import quote

from dotenv import load_dotenv
from sqlalchemy import (
    JSON,
    BigInteger,
    Boolean,
    Column,
    DateTime,
    Enum,
    ForeignKey,
    Index,
    Integer,
    String,
    Text,
)
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base, relationship

env_path = Path(__file__).parent.parent / '.env'
load_dotenv(dotenv_path=env_path)

Base = declarative_base()


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Drop-in replacement for ``datetime.utcnow`` (deprecated since Python 3.12).
    The tzinfo is stripped so the value stays naive, exactly like the value
    ``datetime.utcnow()`` produced, which is what the plain ``DateTime``
    columns below were already receiving.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


# --- Models ---

class User(Base):
    """User model for storing authentication tokens and Telegram user data"""
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Nullable until authenticated, BIGINT for large Telegram IDs
    telegram_id = Column(BigInteger, unique=True, nullable=True, index=True)
    token = Column(String(255), unique=True, nullable=False, index=True)
    username = Column(String(100), nullable=True)
    status = Column(String(50), nullable=False, default='pending')  # 'pending' or 'success'
    created_at = Column(DateTime, default=_utcnow, nullable=False)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow, nullable=False)

    __table_args__ = (
        Index('idx_token_status', 'token', 'status'),
    )

    def __repr__(self):
        # The token is deliberately left out: it is a credential and reprs
        # tend to end up in logs.
        return (
            f"<User(id={self.id}, telegram_id={self.telegram_id}, "
            f"username={self.username!r}, status={self.status!r})>"
        )


class Profile(Base):
    """Profile model for storing user professional information"""
    __tablename__ = 'profiles'

    id = Column(Integer, primary_key=True, autoincrement=True)
    email = Column(String(255), unique=True, nullable=False, index=True)
    name = Column(String(255), nullable=False)
    position = Column(String(255), nullable=False)
    competencies = Column(Text, nullable=True)
    experience = Column(Text, nullable=True)
    skills = Column(Text, nullable=True)
    country = Column(String(100), nullable=True)
    languages = Column(String(255), nullable=True)
    employment_format = Column(String(100), nullable=True)
    rate = Column(String(100), nullable=True)
    relocation = Column(String(100), nullable=True)
    cv_url = Column(String(500), nullable=True)
    created_at = Column(DateTime, default=_utcnow, nullable=False)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow, nullable=False)

    def __repr__(self):
        return (
            f"<Profile(id={self.id}, email={self.email!r}, "
            f"name={self.name!r}, position={self.position!r})>"
        )


# --- Enums ---
# NOTE(review): none of these enums is referenced by a column in this module;
# the Vacancy model stores the corresponding values as plain strings / JSON.
# Presumably they are used for validation elsewhere - confirm against callers.

class ContactType(str, enum.Enum):
    """Kinds of contact entry (presumably the "type" field of Vacancy.contacts)."""
    LINK = "LINK"
    TG_LINK = "TG_LINK"
    EMAIL = "EMAIL"
    PHONE = "PHONE"


class WorkArrangement(str, enum.Enum):
    """Work formats (presumably the values of Vacancy.work_arrangement)."""
    REMOTE = "REMOTE"
    OFFICE = "OFFICE"
    HYBRID = "HYBRID"


class SeniorityLevel(str, enum.Enum):
    """Seniority grades (presumably the values of Vacancy.seniority_level)."""
    JUNIOR = "JUNIOR"
    MIDDLE = "MIDDLE"
    SENIOR = "SENIOR"
    LEAD = "LEAD"


class LanguageCode(str, enum.Enum):
    """Language codes (presumably the items of Vacancy.required_language_codes)."""
    RU = "RU"
    EN = "EN"
    DE = "DE"
    FR = "FR"
    ES = "ES"
    ZH = "ZH"


# --- Vacancy Models ---

class Vacancy(Base):
    """Vacancy/Job posting model"""
    __tablename__ = 'vacancies'

    id = Column(Integer, primary_key=True, autoincrement=True)

    # Basic info
    title = Column(String(255), nullable=False)  # Vacancy title
    position = Column(String(255), nullable=False)  # Position
    company_name = Column(String(255), nullable=False)

    # Salary
    salary_min = Column(Integer, nullable=True)
    salary_max = Column(Integer, nullable=True)
    salary_currency = Column(String(10), nullable=True, default='USD')

    # Location and work format
    country_name = Column(String(100), nullable=True)
    locations = Column(Text, nullable=True)  # JSON string or comma-separated
    work_arrangement = Column(String(50), nullable=False, default='REMOTE')
    relocation_supported = Column(Boolean, default=False)

    # Requirements
    skills = Column(Text, nullable=False)  # Comma-separated or JSON
    key_competencies = Column(Text, nullable=True)
    seniority_level = Column(String(50), nullable=True)
    required_language_codes = Column(JSON, nullable=True)  # ["RU", "EN"]

    # Description
    description = Column(Text, nullable=False)

    # Contacts - stored as JSON array
    # [{"id": "uuid", "type": "LINK", "value": "https://..."}]
    contacts = Column(JSON, nullable=True)

    # External link
    link = Column(String(500), nullable=True)

    # Timestamps
    created_at = Column(DateTime, default=_utcnow, nullable=False)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow, nullable=False)

    def __repr__(self):
        return (
            f"<Vacancy(id={self.id}, title={self.title!r}, "
            f"company_name={self.company_name!r})>"
        )


# --- Chat Models ---

class ChatMessage(Base):
    """Chat message model"""
    __tablename__ = 'chat_messages'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Optional: link to user (references users.telegram_id, not users.id)
    user_id = Column(BigInteger, ForeignKey('users.telegram_id'), nullable=True)

    # Message content
    content = Column(Text, nullable=False)
    role = Column(String(50), nullable=False, default='user')  # user, assistant, system

    # Favorite flag
    is_favorite = Column(Boolean, default=False, index=True)

    # Timestamps
    created_at = Column(DateTime, default=_utcnow, nullable=False)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow, nullable=False)

    # Relationships
    files = relationship("ChatFile", back_populates="message", cascade="all, delete-orphan")

    def __repr__(self):
        # Message content is omitted on purpose: it is unbounded Text.
        return (
            f"<ChatMessage(id={self.id}, user_id={self.user_id}, "
            f"role={self.role!r}, is_favorite={self.is_favorite})>"
        )


class ChatFile(Base):
    """Files attached to chat messages"""
    __tablename__ = 'chat_files'

    id = Column(Integer, primary_key=True, autoincrement=True)
    message_id = Column(Integer, ForeignKey('chat_messages.id', ondelete='CASCADE'), nullable=False)

    # File info
    file_name = Column(String(255), nullable=False)
    file_url = Column(String(500), nullable=False)  # MinIO URL
    file_type = Column(String(100), nullable=True)  # MIME type
    file_size = Column(Integer, nullable=True)  # Size in bytes

    # Timestamps
    created_at = Column(DateTime, default=_utcnow, nullable=False)

    # Relationships
    message = relationship("ChatMessage", back_populates="files")

    def __repr__(self):
        return (
            f"<ChatFile(id={self.id}, message_id={self.message_id}, "
            f"file_name={self.file_name!r})>"
        )


# --- Database Configuration ---

def get_database_url() -> str:
    """Get PostgreSQL connection URL from environment variables.

    Reads POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB, POSTGRES_HOST and
    POSTGRES_PORT, falling back to local-development defaults, and prints the
    URL with the password masked.

    The user name and password are percent-encoded so that characters such as
    ``@``, ``/``, ``:`` or ``%`` cannot corrupt the URL. NOTE(review): a
    deployment that already stores a pre-encoded password in the environment
    would now be encoded twice - confirm none does.

    Returns:
        The ``postgresql+asyncpg://`` connection URL as a string.
    """
    postgres_user = os.getenv("POSTGRES_USER", "postgres")
    postgres_password = os.getenv("POSTGRES_PASSWORD", "postgres")
    postgres_db = os.getenv("POSTGRES_DB", "rag_ai_assistant")
    postgres_host = os.getenv("POSTGRES_HOST", "localhost")
    postgres_port = os.getenv("POSTGRES_PORT", "5432")

    # quote(..., safe="") rather than quote_plus: a space becomes %20, which
    # decodes correctly whether or not the URL parser treats "+" as a space.
    encoded_user = quote(postgres_user, safe="")
    encoded_password = quote(postgres_password, safe="")

    # Use asyncpg driver for async support
    database_url = (
        f"postgresql+asyncpg://{encoded_user}:{encoded_password}"
        f"@{postgres_host}:{postgres_port}/{postgres_db}"
    )
    print(f"Database URL: postgresql+asyncpg://{postgres_user}:***@{postgres_host}:{postgres_port}/{postgres_db}")
    return database_url


# Create async engine
engine = create_async_engine(
    get_database_url(),
    echo=False,  # Set to True for SQL query logging
    pool_pre_ping=True,  # Verify connections before using them
    pool_size=5,
    max_overflow=10,
)

# Create async session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


# --- Session Management ---

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Dependency for getting async database session.

    The session is committed once the consumer finishes without error; on any
    ``Exception`` it is rolled back and the exception is re-raised.

    Usage in FastAPI:
        @app.get("/endpoint")
        async def endpoint(db: AsyncSession = Depends(get_db)):
            ...
    """
    # ``async with`` closes the session on exit, so no explicit close() is needed.
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


async def init_db():
    """Initialize database tables"""
    async with engine.begin() as conn:
        # create_all is a synchronous API, hence run_sync.
        await conn.run_sync(Base.metadata.create_all)
    print("Database tables created successfully")


async def drop_db():
    """Drop all database tables (use with caution!)"""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    print("Database tables dropped")


# --- For testing and manual operations ---

if __name__ == "__main__":
    import asyncio

    async def main():
        """Create the tables and round-trip one sample user."""
        from sqlalchemy import select

        print("Initializing database...")
        await init_db()

        # Test: Create a sample user
        async with AsyncSessionLocal() as session:
            # Insert only when absent: token and telegram_id are unique, so an
            # unconditional insert fails on every run after the first.
            existing = await session.execute(
                select(User).where(User.telegram_id == 123456789)
            )
            if existing.scalar_one_or_none() is None:
                test_user = User(
                    telegram_id=123456789,
                    token="test_token_123",
                    username="TestUser",
                    status="pending"
                )
                session.add(test_user)
                await session.commit()
                print(f"Created test user: {test_user}")

            # Query the user back
            result = await session.execute(
                select(User).where(User.telegram_id == 123456789)
            )
            user = result.scalar_one_or_none()
            print(f"Retrieved user: {user}")

    asyncio.run(main())