#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Optimized file handler.

Supports automatic temp-file cleanup, in-memory processing of small files and
streaming of large files through temporary files before upload.
"""
import os
import io
import shutil
import tempfile
import logging
import uuid
from contextlib import contextmanager
from typing import Dict, List, Optional, Any, Union, BinaryIO
from pathlib import Path

from PIL import Image
import numpy as np

# Optional external managers (BOS/Mongo) are disabled in minimal setup
# Previously:
# from baidu_bos_manager import get_bos_manager
# from mongodb_manager import get_mongodb_manager

logger = logging.getLogger(__name__)


class OptimizedFileHandler:
    """Store uploaded files locally, choosing in-memory or temp-file handling.

    Every temporary file the handler creates is tracked in ``self.temp_files``
    so it can be removed later. BOS uploads and MongoDB metadata storage only
    happen when ``bos_manager`` / ``mongodb_manager`` are set; both are
    ``None`` in the minimal setup.
    """

    # Small-file threshold (5 MB): files at or below this size are handled in memory.
    SMALL_FILE_THRESHOLD = 5 * 1024 * 1024

    # Image file extensions considered supported (lower-case, leading dot).
    SUPPORTED_IMAGE_FORMATS = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'}

    # Chunk size used when streaming file content to disk.
    COPY_CHUNK_SIZE = 1024 * 1024

    def __init__(self, local_storage_dir=None):
        """Create a handler that stores files under *local_storage_dir*.

        Args:
            local_storage_dir: Directory for stored and temporary files;
                defaults to the system temp directory. Created if missing.
        """
        # In the minimal setup BOS and MongoDB are not used.
        self.bos_manager = None
        self.mongodb_manager = None
        # Paths of temp files created by this handler that may still exist.
        self.temp_files = set()
        self.local_storage_dir = local_storage_dir or tempfile.gettempdir()
        os.makedirs(self.local_storage_dir, exist_ok=True)

    @staticmethod
    def _safe_filename(filename: str) -> str:
        """Return only the final path component of an untrusted *filename*.

        Upload names may contain directory parts (``../x.png``, ``a\\b.png``);
        keeping just the basename guarantees the stored file lands directly in
        ``local_storage_dir`` and that the path is valid.
        """
        return os.path.basename(filename.replace('\\', '/'))

    @staticmethod
    def _verify_image(source, filename: str) -> bool:
        """Return True if PIL can open and verify *source* (a path or file object).

        On failure the error is logged against *filename* and False is returned.
        """
        try:
            with Image.open(source) as image:
                image.verify()  # integrity check without decoding all pixel data
        except Exception as e:
            logger.error(f"❌ 图像验证失败: {filename}, {e}")
            return False
        return True

    @contextmanager
    def temp_file_context(self, content: bytes = None, suffix: str = None, delete_on_exit: bool = True):
        """Create a tracked temp file in ``local_storage_dir`` and yield its path.

        Args:
            content: Optional bytes written to the file before it is yielded.
            suffix: Optional file-name suffix, e.g. ``'.png'``.
            delete_on_exit: When True the file is removed when the ``with``
                block exits, including when it exits with an exception.

        Yields:
            The path of the (closed) temp file.
        """
        temp_fd, temp_path = tempfile.mkstemp(suffix=suffix, dir=self.local_storage_dir)
        self.temp_files.add(temp_path)
        try:
            # os.fdopen takes ownership of temp_fd and closes it; writing inside
            # the try guarantees cleanup even if the write itself fails.
            with os.fdopen(temp_fd, 'wb') as f:
                if content is not None:
                    f.write(content)
            yield temp_path
        finally:
            if delete_on_exit:
                if os.path.exists(temp_path):
                    try:
                        os.unlink(temp_path)
                        logger.debug(f"🗑️ 临时文件已清理: {temp_path}")
                    except OSError as e:
                        logger.warning(f"⚠️ 临时文件清理失败: {temp_path}, {e}")
                self.temp_files.discard(temp_path)

    def cleanup_all_temp_files(self):
        """Delete every tracked temp file that still exists, then reset tracking."""
        for temp_path in list(self.temp_files):
            if os.path.exists(temp_path):
                try:
                    os.unlink(temp_path)
                    logger.debug(f"🗑️ 清理临时文件: {temp_path}")
                except OSError as e:
                    logger.warning(f"⚠️ 清理临时文件失败: {temp_path}, {e}")
        self.temp_files.clear()

    def get_file_size(self, file_obj) -> int:
        """Return the size of *file_obj* in bytes.

        A truthy ``content_length`` attribute is trusted when present (note:
        for uploads this value may come from a client-supplied header).
        Otherwise the stream is seeked to its end and the original position
        is restored.
        """
        if hasattr(file_obj, 'content_length') and file_obj.content_length:
            return file_obj.content_length

        current_pos = file_obj.tell()
        file_obj.seek(0, os.SEEK_END)
        size = file_obj.tell()
        file_obj.seek(current_pos)
        return size

    def is_small_file(self, file_obj) -> bool:
        """Return True if *file_obj* is at most ``SMALL_FILE_THRESHOLD`` bytes."""
        return self.get_file_size(file_obj) <= self.SMALL_FILE_THRESHOLD

    def process_image_in_memory(self, file_obj, filename: str) -> Optional[Dict[str, Any]]:
        """Validate a small image in memory and save it to local storage.

        The whole upload is read into memory, verified with PIL, written to
        ``<local_storage_dir>/<uuid>_<basename>``, optionally uploaded to BOS,
        and its metadata optionally stored in MongoDB.

        Returns:
            A dict with ``file_id``, ``filename``, ``local_path`` and
            ``processing_method`` (``"memory"``), or None on any failure.
        """
        try:
            file_obj.seek(0)
            file_content = file_obj.read()
            file_obj.seek(0)

            if not self._verify_image(io.BytesIO(file_content), filename):
                return None

            file_id = str(uuid.uuid4())
            local_path = os.path.join(self.local_storage_dir, f"{file_id}_{self._safe_filename(filename)}")
            with open(local_path, 'wb') as f:
                f.write(file_content)

            metadata = {
                "_id": file_id,
                "filename": filename,
                "file_type": "image",
                "file_size": len(file_content),
                "processing_method": "memory",
                "local_path": local_path,
            }

            if self.bos_manager:
                bos_key = f"images/memory_{file_id}_{filename}"
                bos_result = self._upload_to_bos_from_memory(file_content, bos_key, filename)
                if bos_result:
                    metadata["bos_key"] = bos_key
                    metadata["bos_url"] = bos_result["url"]

            if self.mongodb_manager:
                self.mongodb_manager.store_file_metadata(metadata=metadata)

            logger.info(f"✅ 内存处理图像成功: {filename} ({len(file_content)} bytes)")
            return {
                "file_id": file_id,
                "filename": filename,
                "local_path": local_path,
                "processing_method": "memory",
            }
        except Exception as e:
            logger.error(f"❌ 内存处理图像失败: {filename}, {e}")
            return None

    def process_image_with_temp_file(self, file_obj, filename: str) -> Optional[Dict[str, Any]]:
        """Validate a large image via a temp file and save it to local storage.

        The upload is streamed in chunks to an auto-deleted temp file, verified
        with PIL, copied to ``<local_storage_dir>/<uuid>_<basename>``,
        optionally uploaded to BOS from the temp file, and its metadata
        optionally stored in MongoDB.

        Returns:
            A dict with ``file_id``, ``filename``, ``local_path`` and
            ``processing_method`` (``"temp_file"``), or None on any failure.
        """
        try:
            ext = os.path.splitext(filename)[1].lower()
            file_id = str(uuid.uuid4())
            permanent_path = os.path.join(self.local_storage_dir, f"{file_id}_{self._safe_filename(filename)}")

            with self.temp_file_context(suffix=ext) as temp_path:
                # Stream to disk instead of loading the whole (large) file into memory.
                file_obj.seek(0)
                with open(temp_path, 'wb') as temp_file:
                    shutil.copyfileobj(file_obj, temp_file, self.COPY_CHUNK_SIZE)

                if not self._verify_image(temp_path, filename):
                    return None

                shutil.copyfile(temp_path, permanent_path)
                file_stat = os.stat(permanent_path)

                metadata = {
                    "_id": file_id,
                    "filename": filename,
                    "file_type": "image",
                    "file_size": file_stat.st_size,
                    "processing_method": "temp_file",
                    "local_path": permanent_path,
                }

                # The BOS upload reads the temp file, so it must run before it is deleted.
                if self.bos_manager:
                    bos_key = f"images/temp_{file_id}_{filename}"
                    bos_result = self.bos_manager.upload_file(temp_path, bos_key)
                    if bos_result:
                        metadata["bos_key"] = bos_key
                        metadata["bos_url"] = bos_result["url"]

                if self.mongodb_manager:
                    self.mongodb_manager.store_file_metadata(metadata=metadata)

            logger.info(f"✅ 临时文件处理图像成功: {filename} ({file_stat.st_size} bytes)")
            return {
                "file_id": file_id,
                "filename": filename,
                "local_path": permanent_path,
                "processing_method": "temp_file",
            }
        except Exception as e:
            logger.error(f"❌ 临时文件处理图像失败: {filename}, {e}")
            return None

    def process_image_smart(self, file_obj, filename: str) -> Optional[Dict[str, Any]]:
        """Process an image in memory if it is small, otherwise via a temp file."""
        if self.is_small_file(file_obj):
            logger.info(f"📦 小文件内存处理: {filename}")
            return self.process_image_in_memory(file_obj, filename)
        logger.info(f"📁 大文件临时处理: {filename}")
        return self.process_image_with_temp_file(file_obj, filename)

    def process_text_in_memory(self, texts: List[str]) -> List[Dict[str, Any]]:
        """Upload each text to BOS straight from memory and record its metadata.

        Texts whose upload does not succeed (including when no BOS manager is
        configured) are skipped, so the result may be shorter than *texts*.
        Metadata is stored only when a MongoDB manager is configured.

        Returns:
            One dict per uploaded text with ``file_id``, ``text_content``,
            ``bos_key`` and ``bos_result``.
        """
        processed_texts = []
        for i, text in enumerate(texts):
            try:
                file_id = str(uuid.uuid4())
                bos_key = f"texts/memory_{file_id}.txt"
                text_bytes = text.encode('utf-8')

                bos_result = self._upload_to_bos_from_memory(text_bytes, bos_key, f"text_{i}.txt")
                if not bos_result:
                    continue

                if self.mongodb_manager:
                    metadata = {
                        "_id": file_id,
                        "filename": f"text_{i}.txt",
                        "file_type": "text",
                        "file_size": len(text_bytes),
                        "processing_method": "memory",
                        "bos_key": bos_key,
                        "bos_url": bos_result["url"],
                        "text_content": text,
                    }
                    self.mongodb_manager.store_file_metadata(metadata=metadata)

                processed_texts.append({
                    "file_id": file_id,
                    "text_content": text,
                    "bos_key": bos_key,
                    "bos_result": bos_result,
                })
                logger.info(f"✅ 内存处理文本成功: text_{i} ({len(text_bytes)} bytes)")
            except Exception as e:
                logger.error(f"❌ 内存处理文本失败 {i}: {e}")
        return processed_texts

    def download_from_bos_for_processing(self, bos_key: str, local_filename: str = None) -> Optional[str]:
        """Download *bos_key* from BOS into a tracked temp file for model use.

        The temp file's suffix comes from *local_filename* if given, otherwise
        from *bos_key*. On success the caller owns the file and should release
        it with ``cleanup_temp_file``; on any failure the temp file is removed.

        Returns:
            The local temp file path, or None if BOS is unavailable or the
            download fails.
        """
        if not self.bos_manager:
            logger.warning("BOS manager is not available; skip download.")
            return None

        temp_path = None
        try:
            ext = os.path.splitext(local_filename or bos_key)[1]
            temp_fd, temp_path = tempfile.mkstemp(suffix=ext, dir=self.local_storage_dir)
            os.close(temp_fd)
            self.temp_files.add(temp_path)

            if self.bos_manager.download_file(bos_key, temp_path):
                logger.info(f"✅ 从BOS下载文件用于处理: {bos_key}")
                return temp_path
            logger.error(f"❌ 从BOS下载文件失败: {bos_key}")
        except Exception as e:
            logger.error(f"❌ 从BOS下载文件异常: {bos_key}, {e}")

        # Failure path: do not leave an empty/partial download behind.
        self.cleanup_temp_file(temp_path)
        return None

    def _upload_to_bos_from_memory(self, content: bytes, bos_key: str, filename: str) -> Optional[Dict[str, Any]]:
        """Upload *content* to BOS under *bos_key* via a short-lived temp file.

        Returns:
            The BOS manager's upload result, or None when no BOS manager is
            configured or the upload raises (logged against *filename*).
        """
        if not self.bos_manager:
            return None
        try:
            with self.temp_file_context(content=content) as temp_path:
                return self.bos_manager.upload_file(temp_path, bos_key)
        except Exception as e:
            logger.error(f"❌ 内存上传到BOS失败: {filename}, {e}")
            return None

    def get_temp_file_for_model(self, file_obj, filename: str) -> Optional[str]:
        """Copy *file_obj* into a tracked temp file that a model can read from disk.

        The temp file keeps the lower-cased extension of *filename* and is NOT
        deleted automatically; release it with ``cleanup_temp_file``.

        Returns:
            The temp file path, or None on failure (any partial file is removed).
        """
        temp_path = None
        try:
            ext = os.path.splitext(filename)[1].lower()
            temp_fd, temp_path = tempfile.mkstemp(suffix=ext, dir=self.local_storage_dir)
            self.temp_files.add(temp_path)
            # os.fdopen owns temp_fd from here on and closes it even on error,
            # so the descriptor must not be closed again in the except branch.
            with os.fdopen(temp_fd, 'wb') as temp_file:
                file_obj.seek(0)
                shutil.copyfileobj(file_obj, temp_file, self.COPY_CHUNK_SIZE)
            logger.debug(f"📁 为模型创建临时文件: {temp_path}")
            return temp_path
        except Exception as e:
            logger.error(f"❌ 为模型创建临时文件失败: {filename}, {e}")
            self.cleanup_temp_file(temp_path)
            return None

    def cleanup_temp_file(self, temp_path: str):
        """Delete *temp_path* if it exists and stop tracking it (None is ignored)."""
        if not temp_path:
            return
        if os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
                logger.debug(f"🗑️ 清理临时文件: {temp_path}")
            except OSError as e:
                logger.warning(f"⚠️ 清理临时文件失败: {temp_path}, {e}")
                return
        self.temp_files.discard(temp_path)


# Lazily created module-wide instance.
file_handler = None


def get_file_handler() -> OptimizedFileHandler:
    """Return the shared ``OptimizedFileHandler``, creating it on first use."""
    global file_handler
    if file_handler is None:
        file_handler = OptimizedFileHandler()
    return file_handler