import torch import torch.nn as nn from torch.nn.parallel import DataParallel, DistributedDataParallel import numpy as np from PIL import Image import faiss from transformers import AutoModel, AutoProcessor, AutoTokenizer from typing import List, Union, Tuple, Dict import os import json from pathlib import Path import logging import gc from concurrent.futures import ThreadPoolExecutor, as_completed import threading # 设置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class MultiGPUMultimodalRetrieval: """多GPU优化的多模态检索系统,支持文搜图、文搜文、图搜图、图搜文""" def __init__(self, model_name: str = "OpenSearch-AI/Ops-MM-embedding-v1-7B", use_all_gpus: bool = True, gpu_ids: List[int] = None, min_memory_gb=12): """ 初始化多GPU多模态检索系统 Args: model_name: 模型名称 use_all_gpus: 是否使用所有可用GPU gpu_ids: 指定使用的GPU ID列表 min_memory_gb: 最小可用内存(GB) """ self.model_name = model_name # 设置GPU设备 self._setup_devices(use_all_gpus, gpu_ids, min_memory_gb) # 清理GPU内存 self._clear_all_gpu_memory() logger.info(f"正在加载模型到多GPU: {self.device_ids}") # 加载模型和处理器 self.model = None self.tokenizer = None self.processor = None self._load_model_multigpu() # 初始化索引 self.text_index = None self.image_index = None self.text_data = [] self.image_data = [] logger.info("多GPU模型加载完成") def _setup_devices(self, use_all_gpus: bool, gpu_ids: List[int], min_memory_gb=12): """设置GPU设备""" if not torch.cuda.is_available(): raise RuntimeError("CUDA不可用,无法使用多GPU") total_gpus = torch.cuda.device_count() logger.info(f"检测到 {total_gpus} 个GPU") # 检查是否设置了CUDA_VISIBLE_DEVICES cuda_visible_devices = os.environ.get('CUDA_VISIBLE_DEVICES') if cuda_visible_devices is not None: # 如果设置了CUDA_VISIBLE_DEVICES,使用可见的GPU visible_gpu_count = len(cuda_visible_devices.split(',')) self.device_ids = list(range(visible_gpu_count)) logger.info(f"使用CUDA_VISIBLE_DEVICES指定的GPU: {cuda_visible_devices}") elif use_all_gpus: self.device_ids = self._select_best_gpus(min_memory_gb) elif gpu_ids: self.device_ids = gpu_ids else: self.device_ids = [0] self.num_gpus = len(self.device_ids) self.primary_device = f"cuda:{self.device_ids[0]}" logger.info(f"使用GPU: {self.device_ids}, 主设备: {self.primary_device}") def _clear_all_gpu_memory(self): """清理所有GPU内存""" for gpu_id in self.device_ids: torch.cuda.set_device(gpu_id) torch.cuda.empty_cache() torch.cuda.synchronize() gc.collect() logger.info("所有GPU内存已清理") def _load_model_multigpu(self): """加载模型到多GPU""" try: # 设置环境变量优化内存使用 os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True' # 清理GPU内存 self._clear_gpu_memory() # 首先尝试使用accelerate的自动设备映射 if self.num_gpus > 1: # 设置最大内存限制(每个GPU 18GB,留出缓冲) max_memory = {i: "18GiB" for i in self.device_ids} logger.info(f"正在加载模型到多GPU: {self.device_ids}") self.model = AutoModel.from_pretrained( self.model_name, trust_remote_code=True, torch_dtype=torch.float16, device_map="auto", max_memory=max_memory, low_cpu_mem_usage=True, offload_folder="./offload" ) else: # 单GPU加载 self.model = AutoModel.from_pretrained( self.model_name, trust_remote_code=True, torch_dtype=torch.float16, device_map=self.primary_device ) # 加载分词器和处理器到主设备 try: self.tokenizer = AutoTokenizer.from_pretrained( self.model_name, trust_remote_code=True ) logger.info("Tokenizer加载成功") except Exception as e: logger.error(f"Tokenizer加载失败: {e}") return False # 加载处理器用于图像处理 try: self.processor = AutoProcessor.from_pretrained( self.model_name, trust_remote_code=True ) logger.info("Processor加载成功") except Exception as e: logger.warning(f"Processor加载失败: {e}") # 如果AutoProcessor失败,尝试使用tokenizer作为fallback logger.info("尝试使用tokenizer作为processor的fallback") self.processor = self.tokenizer logger.info(f"模型已成功加载到设备: {self.model.hf_device_map if hasattr(self.model, 'hf_device_map') else self.primary_device}") logger.info("多GPU模型加载完成") return True except Exception as e: logger.error(f"多GPU模型加载失败: {str(e)}") return False def _clear_gpu_memory(self): """清理GPU内存""" for gpu_id in self.device_ids: torch.cuda.set_device(gpu_id) torch.cuda.empty_cache() torch.cuda.synchronize() gc.collect() logger.info("GPU内存已清理") def _get_gpu_memory_info(self): """获取GPU内存使用情况""" try: import subprocess result = subprocess.run(['nvidia-smi', '--query-gpu=memory.used,memory.total', '--format=csv,nounits,noheader'], capture_output=True, text=True, check=True) lines = result.stdout.strip().split('\n') gpu_info = [] for i, line in enumerate(lines): used, total = map(int, line.split(', ')) free = total - used gpu_info.append({ 'gpu_id': i, 'used': used, 'total': total, 'free': free, 'usage_percent': (used / total) * 100 }) return gpu_info except Exception as e: logger.warning(f"无法获取GPU内存信息: {e}") return [] def _select_best_gpus(self, min_memory_gb=12): """选择内存充足的GPU""" gpu_info = self._get_gpu_memory_info() if not gpu_info: return list(range(torch.cuda.device_count())) # 按可用内存排序 gpu_info.sort(key=lambda x: x['free'], reverse=True) # 选择内存充足的GPU min_memory_mb = min_memory_gb * 1024 suitable_gpus = [] for gpu in gpu_info: if gpu['free'] >= min_memory_mb: suitable_gpus.append(gpu['gpu_id']) logger.info(f"GPU {gpu['gpu_id']}: {gpu['free']}MB 可用 (合适)") else: logger.warning(f"GPU {gpu['gpu_id']}: {gpu['free']}MB 可用 (不足)") if not suitable_gpus: # 如果没有GPU满足要求,选择可用内存最多的 logger.warning(f"没有GPU有足够内存({min_memory_gb}GB),选择可用内存最多的GPU") suitable_gpus = [gpu_info[0]['gpu_id']] return suitable_gpus def encode_text_batch(self, texts: List[str]) -> np.ndarray: """ 批量编码文本为向量(多GPU优化) Args: texts: 文本列表 Returns: 文本向量 """ if not texts: return np.array([]) with torch.no_grad(): # 预处理输入 inputs = self.tokenizer( text=texts, return_tensors="pt", padding=True, truncation=True, max_length=512 ) # 将输入移动到主设备 inputs = {k: v.to(self.primary_device) for k, v in inputs.items()} # 前向传播 outputs = self.model(**inputs) embeddings = outputs.last_hidden_state.mean(dim=1) # 清理GPU内存 del inputs, outputs torch.cuda.empty_cache() return embeddings.cpu().numpy().astype(np.float32) def encode_image_batch(self, images: List[Union[str, Image.Image]]) -> np.ndarray: """ 批量编码图像为向量 Args: images: 图像路径或PIL图像列表 Returns: 图像向量 """ if not images: return np.array([]) # 预处理图像 processed_images = [] for img in images: if isinstance(img, str): img = Image.open(img).convert('RGB') elif isinstance(img, Image.Image): img = img.convert('RGB') processed_images.append(img) try: logger.info(f"处理 {len(processed_images)} 张图像") # 使用多模态模型生成图像embedding # 为每张图像创建简单的文本描述作为输入 conversations = [] for i in range(len(processed_images)): # 使用简化的对话格式 conversation = [ { "role": "user", "content": [ {"type": "image", "image": processed_images[i]}, {"type": "text", "text": "What is in this image?"} ] } ] conversations.append(conversation) # 使用processor处理 try: # 尝试使用apply_chat_template方法 texts = [] for conv in conversations: text = self.processor.apply_chat_template(conv, tokenize=False, add_generation_prompt=False) texts.append(text) # 处理文本和图像 inputs = self.processor( text=texts, images=processed_images, return_tensors="pt", padding=True ) # 移动到GPU inputs = {k: v.to(self.primary_device) for k, v in inputs.items()} # 获取模型输出 with torch.no_grad(): outputs = self.model(**inputs) embeddings = outputs.last_hidden_state.mean(dim=1) # 转换为numpy数组 embeddings = embeddings.cpu().numpy().astype(np.float32) except Exception as inner_e: logger.warning(f"多模态模型图像编码失败,使用文本模式: {inner_e}") return np.zeros((len(processed_images), 3584), dtype=np.float32) # 如果多模态失败,使用纯文本描述作为fallback image_descriptions = ["An image" for _ in processed_images] text_inputs = self.processor( text=image_descriptions, return_tensors="pt", padding=True, truncation=True, max_length=512 ) text_inputs = {k: v.to(self.primary_device) for k, v in text_inputs.items()} with torch.no_grad(): outputs = self.model(**text_inputs) embeddings = outputs.last_hidden_state.mean(dim=1) embeddings = embeddings.cpu().numpy().astype(np.float32) logger.info(f"生成图像embeddings: {embeddings.shape}") return embeddings except Exception as e: logger.error(f"图像编码失败: {e}") # 返回与文本embedding维度一致的零向量作为fallback embedding_dim = 3584 embeddings = np.zeros((len(processed_images), embedding_dim), dtype=np.float32) return embeddings def build_text_index_parallel(self, texts: List[str], save_path: str = None): """ 并行构建文本索引(多GPU优化) Args: texts: 文本列表 save_path: 索引保存路径 """ logger.info(f"正在并行构建文本索引,共 {len(texts)} 条文本") # 根据GPU数量调整批次大小 batch_size = max(4, 16 // self.num_gpus) all_embeddings = [] # 分批处理 for i in range(0, len(texts), batch_size): batch_texts = texts[i:i+batch_size] try: embeddings = self.encode_text_batch(batch_texts) all_embeddings.append(embeddings) # 显示进度 if (i // batch_size + 1) % 10 == 0: logger.info(f"已处理 {i + len(batch_texts)}/{len(texts)} 条文本") except torch.cuda.OutOfMemoryError: logger.warning(f"GPU内存不足,跳过批次 {i}-{i+len(batch_texts)}") self._clear_all_gpu_memory() continue except Exception as e: logger.error(f"处理文本批次时出错: {e}") continue if not all_embeddings: raise ValueError("没有成功处理任何文本") # 合并所有嵌入向量 embeddings = np.vstack(all_embeddings) # 构建FAISS索引 dimension = embeddings.shape[1] self.text_index = faiss.IndexFlatIP(dimension) # 归一化向量 faiss.normalize_L2(embeddings) self.text_index.add(embeddings) self.text_data = texts if save_path: self._save_index(self.text_index, texts, save_path + "_text") logger.info("文本索引构建完成") def build_image_index_parallel(self, image_paths: List[str], save_path: str = None): """ 并行构建图像索引(多GPU优化) Args: image_paths: 图像路径列表 save_path: 索引保存路径 """ logger.info(f"正在并行构建图像索引,共 {len(image_paths)} 张图像") # 图像处理使用更小的批次 batch_size = max(2, 8 // self.num_gpus) all_embeddings = [] for i in range(0, len(image_paths), batch_size): batch_images = image_paths[i:i+batch_size] try: embeddings = self.encode_image_batch(batch_images) all_embeddings.append(embeddings) # 显示进度 if (i // batch_size + 1) % 5 == 0: logger.info(f"已处理 {i + len(batch_images)}/{len(image_paths)} 张图像") except torch.cuda.OutOfMemoryError: logger.warning(f"GPU内存不足,跳过图像批次 {i}-{i+len(batch_images)}") self._clear_all_gpu_memory() continue except Exception as e: logger.error(f"处理图像批次时出错: {e}") continue if not all_embeddings: raise ValueError("没有成功处理任何图像") embeddings = np.vstack(all_embeddings) # 构建FAISS索引 dimension = embeddings.shape[1] self.image_index = faiss.IndexFlatIP(dimension) faiss.normalize_L2(embeddings) self.image_index.add(embeddings) self.image_data = image_paths if save_path: self._save_index(self.image_index, image_paths, save_path + "_image") logger.info("图像索引构建完成") def search_text_by_text(self, query: str, top_k: int = 5) -> List[Tuple[str, float]]: """文搜文:使用文本查询搜索相似文本""" if self.text_index is None: raise ValueError("文本索引未构建,请先调用 build_text_index_parallel") query_embedding = self.encode_text_batch([query]).astype(np.float32) faiss.normalize_L2(query_embedding) scores, indices = self.text_index.search(query_embedding, top_k) results = [] for score, idx in zip(scores[0], indices[0]): if idx != -1: results.append((self.text_data[idx], float(score))) return results def search_images_by_text(self, query: str, top_k: int = 5) -> List[Tuple[str, float]]: """文搜图:使用文本查询搜索相似图像""" if self.image_index is None: raise ValueError("图像索引未构建,请先调用 build_image_index_parallel") query_embedding = self.encode_text_batch([query]).astype(np.float32) faiss.normalize_L2(query_embedding) scores, indices = self.image_index.search(query_embedding, top_k) results = [] for score, idx in zip(scores[0], indices[0]): if idx != -1: results.append((self.image_data[idx], float(score))) return results def search_images_by_image(self, query_image: Union[str, Image.Image], top_k: int = 5) -> List[Tuple[str, float]]: """图搜图:使用图像查询搜索相似图像""" if self.image_index is None: raise ValueError("图像索引未构建,请先调用 build_image_index_parallel") query_embedding = self.encode_image_batch([query_image]).astype(np.float32) faiss.normalize_L2(query_embedding) scores, indices = self.image_index.search(query_embedding, top_k) results = [] for score, idx in zip(scores[0], indices[0]): if idx != -1: results.append((self.image_data[idx], float(score))) return results def search_text_by_image(self, query_image: Union[str, Image.Image], top_k: int = 5) -> List[Tuple[str, float]]: """图搜文:使用图像查询搜索相似文本""" if self.text_index is None: raise ValueError("文本索引未构建,请先调用 build_text_index_parallel") query_embedding = self.encode_image_batch([query_image]).astype(np.float32) faiss.normalize_L2(query_embedding) scores, indices = self.text_index.search(query_embedding, top_k) results = [] for score, idx in zip(scores[0], indices[0]): if idx != -1: results.append((self.text_data[idx], float(score))) return results # Web应用兼容的方法名称 def search_text_to_image(self, query: str, top_k: int = 5) -> List[Tuple[str, float]]: """文搜图:Web应用兼容方法""" return self.search_images_by_text(query, top_k) def search_image_to_image(self, query_image: Union[str, Image.Image], top_k: int = 5) -> List[Tuple[str, float]]: """图搜图:Web应用兼容方法""" return self.search_images_by_image(query_image, top_k) def search_text_to_text(self, query: str, top_k: int = 5) -> List[Tuple[str, float]]: """文搜文:Web应用兼容方法""" return self.search_text_by_text(query, top_k) def search_image_to_text(self, query_image: Union[str, Image.Image], top_k: int = 5) -> List[Tuple[str, float]]: """图搜文:Web应用兼容方法""" return self.search_text_by_image(query_image, top_k) def _save_index(self, index, data, path_prefix): """保存索引和数据""" faiss.write_index(index, f"{path_prefix}.index") with open(f"{path_prefix}.json", 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) def load_index(self, path_prefix, index_type="text"): """加载已保存的索引""" index = faiss.read_index(f"{path_prefix}.index") with open(f"{path_prefix}.json", 'r', encoding='utf-8') as f: data = json.load(f) if index_type == "text": self.text_index = index self.text_data = data else: self.image_index = index self.image_data = data logger.info(f"已加载 {index_type} 索引") def get_gpu_memory_info(self): """获取所有GPU内存使用信息""" memory_info = {} for gpu_id in self.device_ids: torch.cuda.set_device(gpu_id) allocated = torch.cuda.memory_allocated(gpu_id) / 1024**3 cached = torch.cuda.memory_reserved(gpu_id) / 1024**3 total = torch.cuda.get_device_properties(gpu_id).total_memory / 1024**3 free = total - cached memory_info[f"GPU_{gpu_id}"] = { "total": f"{total:.1f}GB", "allocated": f"{allocated:.1f}GB", "cached": f"{cached:.1f}GB", "free": f"{free:.1f}GB" } return memory_info def check_multigpu_info(): """检查多GPU环境信息""" print("=== 多GPU环境信息 ===") if not torch.cuda.is_available(): print("❌ CUDA不可用") return gpu_count = torch.cuda.device_count() print(f"✅ 检测到 {gpu_count} 个GPU") print(f"CUDA版本: {torch.version.cuda}") print(f"PyTorch版本: {torch.__version__}") for i in range(gpu_count): gpu_name = torch.cuda.get_device_name(i) gpu_memory = torch.cuda.get_device_properties(i).total_memory / 1024**3 print(f"GPU {i}: {gpu_name} ({gpu_memory:.1f}GB)") print("=====================") if __name__ == "__main__": # 检查多GPU环境 check_multigpu_info() # 示例使用 print("\n正在初始化多GPU多模态检索系统...") try: retrieval_system = MultiGPUMultimodalRetrieval() print("✅ 多GPU系统初始化成功!") # 显示GPU内存使用情况 memory_info = retrieval_system.get_gpu_memory_info() print("\n📊 GPU内存使用情况:") for gpu, info in memory_info.items(): print(f" {gpu}: {info['allocated']} / {info['total']} (已用/总计)") print("\n🚀 多GPU多模态检索系统就绪!") print("支持的检索模式:") print("1. 文搜文: search_text_by_text()") print("2. 文搜图: search_images_by_text()") print("3. 图搜图: search_images_by_image()") print("4. 图搜文: search_text_by_image()") except Exception as e: print(f"❌ 多GPU系统初始化失败: {e}") import traceback traceback.print_exc()