# qa-and-rag-ai-assistant/database/minio_processor.py
# 2026-01-13 15:38:27 +03:00
#
# 114 lines
# 3.8 KiB
# Python

import os
from typing import List, Optional, BinaryIO
from minio import Minio
from minio.error import S3Error
class MinIOProcessor:
    """Convenience wrapper around a ``minio.Minio`` client.

    Each method delegates to the underlying client and converts ``S3Error``
    into ``RuntimeError`` with a message naming the bucket/object involved.
    The original ``S3Error`` is kept as ``__cause__`` of the raised error.
    """

    # Part size passed to the client when the upload length is unknown
    # (length == -1). minio-py needs a part size to stream such uploads.
    _STREAM_PART_SIZE = 10 * 1024 * 1024

    def __init__(
        self,
        endpoint: str,
        access_key: str,
        secret_key: str,
        secure: bool = False
    ):
        """Create the underlying MinIO client.

        Args:
            endpoint: Server address passed to ``Minio``.
            access_key: Access key passed to ``Minio``.
            secret_key: Secret key passed to ``Minio``.
            secure: Forwarded to ``Minio`` as ``secure``; defaults to False.
        """
        self._client = Minio(
            endpoint=endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )

    def list_buckets(self) -> List[str]:
        """Return the names of all buckets reported by the client.

        Raises:
            RuntimeError: If the client raises ``S3Error``.
        """
        try:
            return [bucket.name for bucket in self._client.list_buckets()]
        except S3Error as e:
            raise RuntimeError(f"Failed to list buckets: {e}") from e

    def bucket_exists(self, bucket_name: str) -> bool:
        """Return whether ``bucket_name`` exists according to the client.

        Raises:
            RuntimeError: If the client raises ``S3Error``.
        """
        try:
            return self._client.bucket_exists(bucket_name=bucket_name)
        except S3Error as e:
            raise RuntimeError(f"Failed to check bucket existence: {e}") from e

    def create_bucket(self, bucket_name: str) -> bool:
        """Create ``bucket_name`` unless it already exists.

        Returns:
            True if the bucket was created, False if it already existed.

        Raises:
            RuntimeError: If the existence check or the creation raises
                ``S3Error``.
        """
        if self.bucket_exists(bucket_name):
            return False
        try:
            self._client.make_bucket(bucket_name=bucket_name)
        except S3Error as e:
            raise RuntimeError(
                f"Failed to create bucket '{bucket_name}': {e}"
            ) from e
        return True

    def put_object(
        self,
        bucket_name: str,
        object_name: str,
        data: BinaryIO,
        length: int = -1,
        content_type: str = "application/octet-stream"
    ) -> str:
        """Upload ``data`` as ``object_name``, creating the bucket if needed.

        Args:
            bucket_name: Target bucket; created first when it is missing.
            object_name: Key to store the object under.
            data: Binary stream to upload.
            length: Number of bytes to upload, or -1 when unknown.
            content_type: Content type recorded for the object.

        Returns:
            ``object_name``, once the upload call has returned.

        Raises:
            RuntimeError: If any client call raises ``S3Error``.
        """
        # create_bucket already performs the existence check and returns
        # False when the bucket is present, so no separate check is needed.
        self.create_bucket(bucket_name)

        upload_kwargs = {"length": length, "content_type": content_type}
        if length == -1:
            # An unknown-length upload must carry a part size; without it
            # the default call (length=-1) is rejected by the client.
            upload_kwargs["part_size"] = self._STREAM_PART_SIZE

        try:
            self._client.put_object(
                bucket_name=bucket_name,
                object_name=object_name,
                data=data,
                **upload_kwargs,
            )
        except S3Error as e:
            raise RuntimeError(
                f"Failed to put object '{object_name}' in bucket '{bucket_name}': {e}"
            ) from e
        return object_name

    def get_object(self, bucket_name: str, object_name: str) -> bytes:
        """Download ``object_name`` from ``bucket_name`` and return its bytes.

        Raises:
            ValueError: If the bucket does not exist.
            RuntimeError: If fetching or reading the object raises
                ``S3Error``.
        """
        if not self.bucket_exists(bucket_name):
            raise ValueError(f"Bucket '{bucket_name}' does not exist")

        # Bind before the try so the cleanup below is safe even when the
        # fetch itself fails and no response object was ever returned.
        response = None
        try:
            response = self._client.get_object(
                bucket_name=bucket_name,
                object_name=object_name,
            )
            return response.read()
        except S3Error as e:
            raise RuntimeError(
                f"Failed to get object '{object_name}' from bucket '{bucket_name}': {e}"
            ) from e
        finally:
            if response is not None:
                response.close()
                response.release_conn()

    def delete_object(self, bucket_name: str, object_name: str) -> bool:
        """Remove ``object_name`` from ``bucket_name``.

        Returns:
            False if the bucket does not exist; True once the client's
            remove call has returned.

        Raises:
            RuntimeError: If a client call raises ``S3Error``.
        """
        if not self.bucket_exists(bucket_name):
            return False
        try:
            self._client.remove_object(
                bucket_name=bucket_name,
                object_name=object_name,
            )
        except S3Error as e:
            raise RuntimeError(
                f"Failed to delete object '{object_name}' from bucket '{bucket_name}': {e}"
            ) from e
        return True

    def list_objects(self, bucket_name: str, prefix: Optional[str] = None) -> List[str]:
        """Return the object names the client lists for ``bucket_name``.

        Args:
            bucket_name: Bucket to list.
            prefix: Optional prefix forwarded to the client's listing call.

        Raises:
            ValueError: If the bucket does not exist.
            RuntimeError: If listing raises ``S3Error``.
        """
        if not self.bucket_exists(bucket_name):
            raise ValueError(f"Bucket '{bucket_name}' does not exist")
        try:
            # Consume the listing inside the try so errors raised while
            # iterating are converted as well.
            listing = self._client.list_objects(
                bucket_name=bucket_name,
                prefix=prefix,
            )
            return [obj.object_name for obj in listing]
        except S3Error as e:
            raise RuntimeError(
                f"Failed to list objects in bucket '{bucket_name}': {e}"
            ) from e
if __name__ == "__main__":
    # Manual smoke run: connection settings come from the environment and
    # fall back to the values used for a local development server.
    minio_endpoint = os.getenv("MINIO_ENDPOINT", "localhost:9000")
    minio_access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
    minio_secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin")

    processor = MinIOProcessor(
        endpoint=minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False,
    )

    # The bucket list is captured before "new-bucket" is created, so the
    # output below reflects the state prior to this run.
    buckets = processor.list_buckets()
    processor.create_bucket("new-bucket")

    # To try an upload and download, open a file in binary mode and pass it
    # to processor.put_object("new-bucket", "file.pdf", ...,
    # content_type="application/pdf"), then read it back with
    # processor.get_object("new-bucket", "file.pdf").
    print(buckets)