114 lines
3.8 KiB
Python
114 lines
3.8 KiB
Python
import os
|
|
from typing import List, Optional, BinaryIO
|
|
from minio import Minio
|
|
from minio.error import S3Error
|
|
|
|
|
|
class MinIOProcessor:
    """Small convenience wrapper around a ``minio.Minio`` client.

    Every ``S3Error`` raised by the underlying client is re-raised as a
    ``RuntimeError`` carrying a descriptive message; the original error is
    kept as ``__cause__``.
    """

    # Part size handed to the client when the upload length is unknown
    # (``length=-1``); the client needs an explicit part size in that case.
    _UNKNOWN_LENGTH_PART_SIZE = 10 * 1024 * 1024

    def __init__(
        self,
        endpoint: str,
        access_key: str,
        secret_key: str,
        secure: bool = False
    ):
        """Create the underlying client.

        Args:
            endpoint: Server address passed straight to ``Minio``.
            access_key: Access key passed to ``Minio``.
            secret_key: Secret key passed to ``Minio``.
            secure: Forwarded to ``Minio`` as its ``secure`` flag.
        """
        self._client = Minio(
            endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure
        )

    def list_buckets(self) -> List[str]:
        """Return the names of all buckets reported by the client.

        Raises:
            RuntimeError: If the client raises ``S3Error``.
        """
        try:
            buckets = self._client.list_buckets()
            return [bucket.name for bucket in buckets]
        except S3Error as e:
            raise RuntimeError(f"Failed to list buckets: {e}") from e

    def bucket_exists(self, bucket_name: str) -> bool:
        """Return whether ``bucket_name`` exists according to the client.

        Raises:
            RuntimeError: If the client raises ``S3Error``.
        """
        try:
            return self._client.bucket_exists(bucket_name)
        except S3Error as e:
            raise RuntimeError(f"Failed to check bucket existence: {e}") from e

    def create_bucket(self, bucket_name: str) -> bool:
        """Create ``bucket_name`` unless it already exists.

        Returns:
            ``True`` if the bucket was created, ``False`` if it already
            existed.

        Raises:
            RuntimeError: If the existence check or the creation raises
                ``S3Error``.
        """
        if self.bucket_exists(bucket_name):
            return False

        try:
            self._client.make_bucket(bucket_name)
            return True
        except S3Error as e:
            raise RuntimeError(f"Failed to create bucket '{bucket_name}': {e}") from e

    def put_object(
        self,
        bucket_name: str,
        object_name: str,
        data: BinaryIO,
        length: int = -1,
        content_type: str = "application/octet-stream"
    ) -> str:
        """Upload ``data`` as ``object_name``, creating the bucket if needed.

        Args:
            bucket_name: Target bucket; created when it does not exist.
            object_name: Name of the object to write.
            data: Binary stream handed to the client.
            length: Number of bytes to upload, or ``-1`` when unknown.
            content_type: Content type stored with the object.

        Returns:
            ``object_name``.

        Raises:
            RuntimeError: If bucket handling or the upload raises ``S3Error``.
        """
        # create_bucket() performs the existence check itself, so a separate
        # bucket_exists() call here would only add a round trip.
        self.create_bucket(bucket_name)

        extra = {}
        if length < 0:
            # Unknown length: the client cannot derive a part size on its
            # own, so supply one explicitly.
            extra["part_size"] = self._UNKNOWN_LENGTH_PART_SIZE

        try:
            self._client.put_object(
                bucket_name,
                object_name,
                data,
                length=length,
                content_type=content_type,
                **extra
            )
            return object_name
        except S3Error as e:
            raise RuntimeError(f"Failed to put object '{object_name}' in bucket '{bucket_name}': {e}") from e

    def get_object(self, bucket_name: str, object_name: str) -> bytes:
        """Read and return the full content of ``object_name``.

        Raises:
            ValueError: If ``bucket_name`` does not exist.
            RuntimeError: If the client raises ``S3Error``.
        """
        if not self.bucket_exists(bucket_name):
            raise ValueError(f"Bucket '{bucket_name}' does not exist")

        # Bound before the try so the cleanup below is safe even when the
        # request itself fails and no response object was ever returned.
        response = None
        try:
            response = self._client.get_object(bucket_name, object_name)
            return response.read()
        except S3Error as e:
            raise RuntimeError(f"Failed to get object '{object_name}' from bucket '{bucket_name}': {e}") from e
        finally:
            if response is not None:
                response.close()
                response.release_conn()

    def delete_object(self, bucket_name: str, object_name: str) -> bool:
        """Remove ``object_name`` from ``bucket_name``.

        Returns:
            ``False`` if the bucket does not exist, otherwise ``True`` once
            the client's ``remove_object`` call has returned.

        Raises:
            RuntimeError: If the client raises ``S3Error``.
        """
        if not self.bucket_exists(bucket_name):
            return False

        try:
            self._client.remove_object(bucket_name, object_name)
            return True
        except S3Error as e:
            raise RuntimeError(f"Failed to delete object '{object_name}' from bucket '{bucket_name}': {e}") from e

    def list_objects(self, bucket_name: str, prefix: Optional[str] = None) -> List[str]:
        """Return the object names the client lists for ``bucket_name``.

        Args:
            bucket_name: Bucket to list.
            prefix: Optional prefix forwarded to the client's listing call.

        Raises:
            ValueError: If ``bucket_name`` does not exist.
            RuntimeError: If the client raises ``S3Error``.
        """
        if not self.bucket_exists(bucket_name):
            raise ValueError(f"Bucket '{bucket_name}' does not exist")

        try:
            objects = self._client.list_objects(bucket_name, prefix=prefix)
            # Consume the listing inside the try so errors raised while
            # iterating are translated as well.
            return [obj.object_name for obj in objects]
        except S3Error as e:
            raise RuntimeError(f"Failed to list objects in bucket '{bucket_name}': {e}") from e
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run: connect using the MINIO_* environment variables
    # (falling back to local defaults), list the buckets, then make sure
    # "new-bucket" exists.
    connection = {
        "endpoint": os.getenv("MINIO_ENDPOINT", "localhost:9000"),
        "access_key": os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
        "secret_key": os.getenv("MINIO_SECRET_KEY", "minioadmin"),
        "secure": False,
    }
    processor = MinIOProcessor(**connection)

    # The listing is captured before the bucket is created, so the printed
    # names reflect the state prior to this run.
    buckets = processor.list_buckets()
    processor.create_bucket("new-bucket")

    # Upload example (needs a binary stream in `file_data`):
    #   processor.put_object("new-bucket", "file.pdf", file_data, content_type="application/pdf")

    print(buckets)

    # Download example:
    #   print(processor.get_object("new-bucket", "file.pdf"))