"""Convenience wrapper around the MinIO Python client for bucket/object work."""

import os
from typing import BinaryIO, List, Optional

from minio import Minio
from minio.error import S3Error


class MinIOProcessor:
    """Small facade over a ``Minio`` client.

    Every ``S3Error`` raised by the underlying client is re-raised as a
    ``RuntimeError`` (with the original error chained as ``__cause__``) so
    callers only need to handle built-in exception types.
    """

    # Part size handed to the client when the caller does not know the
    # stream length (``length < 0``). The MinIO client refuses an
    # unknown-length upload unless a valid part size is supplied.
    _UNKNOWN_LENGTH_PART_SIZE = 10 * 1024 * 1024

    def __init__(
        self,
        endpoint: str,
        access_key: str,
        secret_key: str,
        secure: bool = False,
    ):
        """Create the underlying client.

        Args:
            endpoint: Server address passed straight to ``Minio``.
            access_key: Access key passed to ``Minio``.
            secret_key: Secret key passed to ``Minio``.
            secure: Forwarded to ``Minio`` as its ``secure`` flag.
        """
        self._client = Minio(
            endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )

    def list_buckets(self) -> List[str]:
        """Return the names of all buckets reported by the client.

        Raises:
            RuntimeError: If the client raises ``S3Error``.
        """
        try:
            buckets = self._client.list_buckets()
            return [bucket.name for bucket in buckets]
        except S3Error as e:
            raise RuntimeError(f"Failed to list buckets: {e}") from e

    def bucket_exists(self, bucket_name: str) -> bool:
        """Return whether ``bucket_name`` exists according to the client.

        Raises:
            RuntimeError: If the client raises ``S3Error``.
        """
        try:
            return self._client.bucket_exists(bucket_name)
        except S3Error as e:
            raise RuntimeError(f"Failed to check bucket existence: {e}") from e

    def create_bucket(self, bucket_name: str) -> bool:
        """Create ``bucket_name`` unless it already exists.

        Returns:
            ``True`` if the bucket was created, ``False`` if it was already
            present.

        Raises:
            RuntimeError: If checking for or creating the bucket fails with
                ``S3Error``.
        """
        if self.bucket_exists(bucket_name):
            return False
        try:
            self._client.make_bucket(bucket_name)
            return True
        except S3Error as e:
            raise RuntimeError(
                f"Failed to create bucket '{bucket_name}': {e}"
            ) from e

    def put_object(
        self,
        bucket_name: str,
        object_name: str,
        data: BinaryIO,
        length: int = -1,
        content_type: str = "application/octet-stream",
    ) -> str:
        """Upload ``data`` as ``object_name``, creating the bucket if needed.

        Args:
            bucket_name: Destination bucket; created when missing.
            object_name: Name of the object to write.
            data: Binary stream to upload.
            length: Number of bytes to upload, or a negative value when the
                size is unknown (the stream is then uploaded in parts).
            content_type: Content type stored with the object.

        Returns:
            ``object_name``.

        Raises:
            RuntimeError: If any client call fails with ``S3Error``.
        """
        # create_bucket() already checks for existence and is a no-op
        # (returns False) when the bucket is present, so no separate
        # bucket_exists() call is needed here.
        self.create_bucket(bucket_name)

        upload_options = {"length": length, "content_type": content_type}
        if length < 0:
            # Unknown size: the client needs an explicit part size.
            upload_options["part_size"] = self._UNKNOWN_LENGTH_PART_SIZE

        try:
            self._client.put_object(
                bucket_name, object_name, data, **upload_options
            )
            return object_name
        except S3Error as e:
            raise RuntimeError(
                f"Failed to put object '{object_name}' in bucket '{bucket_name}': {e}"
            ) from e

    def get_object(self, bucket_name: str, object_name: str) -> bytes:
        """Read the full content of ``object_name`` from ``bucket_name``.

        Returns:
            The object's bytes.

        Raises:
            ValueError: If the bucket does not exist.
            RuntimeError: If a client call fails with ``S3Error``.
        """
        if not self.bucket_exists(bucket_name):
            raise ValueError(f"Bucket '{bucket_name}' does not exist")

        # Stays None when get_object() itself raises, so the cleanup below
        # does not fail on an unbound name and mask the real error.
        response = None
        try:
            response = self._client.get_object(bucket_name, object_name)
            return response.read()
        except S3Error as e:
            raise RuntimeError(
                f"Failed to get object '{object_name}' from bucket '{bucket_name}': {e}"
            ) from e
        finally:
            if response is not None:
                response.close()
                response.release_conn()

    def delete_object(self, bucket_name: str, object_name: str) -> bool:
        """Remove ``object_name`` from ``bucket_name``.

        Returns:
            ``False`` if the bucket does not exist, otherwise ``True`` once
            the client's remove call has returned.

        Raises:
            RuntimeError: If a client call fails with ``S3Error``.
        """
        if not self.bucket_exists(bucket_name):
            return False
        try:
            self._client.remove_object(bucket_name, object_name)
            return True
        except S3Error as e:
            raise RuntimeError(
                f"Failed to delete object '{object_name}' from bucket '{bucket_name}': {e}"
            ) from e

    def list_objects(
        self, bucket_name: str, prefix: Optional[str] = None
    ) -> List[str]:
        """Return the object names the client lists for ``bucket_name``.

        Args:
            bucket_name: Bucket to list.
            prefix: Optional name prefix forwarded to the client.

        Raises:
            ValueError: If the bucket does not exist.
            RuntimeError: If a client call fails with ``S3Error``.
        """
        if not self.bucket_exists(bucket_name):
            raise ValueError(f"Bucket '{bucket_name}' does not exist")
        try:
            objects = self._client.list_objects(bucket_name, prefix=prefix)
            return [obj.object_name for obj in objects]
        except S3Error as e:
            raise RuntimeError(
                f"Failed to list objects in bucket '{bucket_name}': {e}"
            ) from e


if __name__ == "__main__":
    # Manual smoke run; connection settings come from the environment and
    # fall back to the values below.
    processor = MinIOProcessor(
        endpoint=os.getenv("MINIO_ENDPOINT", "localhost:9000"),
        access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
        secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"),
        secure=False,
    )
    buckets = processor.list_buckets()
    processor.create_bucket("new-bucket")
    # Example upload:
    # processor.put_object("new-bucket", "file.pdf", file_data, content_type="application/pdf")
    print(buckets)
    # Example download:
    # print(processor.get_object("new-bucket", "file.pdf"))