imgsearcher/app/api/robot_manager.py
2025-06-17 20:14:11 +08:00

218 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import datetime
from pymongo import MongoClient
from dotenv import load_dotenv
import base64
import uuid
# 加载环境变量
load_dotenv()
class RobotManager:
"""管理机器人角色的类"""
def __init__(self):
"""初始化MongoDB连接"""
# 从环境变量获取MongoDB连接信息
mongo_uri = os.getenv('MONGO_URI', 'mongodb://localhost:27017/')
db_name = os.getenv('MONGO_DB_NAME', 'imgsearcher')
# 连接MongoDB
self.client = MongoClient(mongo_uri)
self.db = self.client[db_name]
self.collection = self.db['robot_characters']
# 确保有索引以提高查询性能
self.collection.create_index('robot_id')
self.collection.create_index('name')
# 上传文件夹路径
self.upload_folder = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), 'uploads')
if not os.path.exists(self.upload_folder):
os.makedirs(self.upload_folder)
print(f"MongoDB RobotManager 初始化完成,连接到 {db_name}")
def add_robot(self, name, background, avatar_file=None):
"""
添加新的机器人角色
Args:
name: 机器人名称
background: 机器人背景故事
avatar_file: 机器人头像文件(可选)
Returns:
dict: 包含新添加的机器人信息
"""
if not name or not background:
return {"error": "机器人名称和背景故事不能为空"}
# 检查是否已存在同名机器人
if self.collection.find_one({"name": name}):
return {"error": f"已存在名为 '{name}' 的机器人"}
# 生成唯一ID
robot_id = str(uuid.uuid4())
# 处理头像文件
avatar_path = None
if avatar_file:
# 保存头像文件
filename = f"robot_avatar_{robot_id}.{avatar_file.filename.split('.')[-1]}"
file_path = os.path.join(self.upload_folder, filename)
avatar_file.save(file_path)
avatar_path = file_path
# 创建机器人记录
robot = {
"robot_id": robot_id,
"name": name,
"background": background,
"avatar_path": avatar_path,
"created_at": datetime.datetime.now()
}
# 保存到数据库
self.collection.insert_one(robot)
# 返回新添加的机器人信息不包含MongoDB的_id字段
robot.pop("_id", None)
return robot
def get_robot(self, robot_id):
"""
获取指定ID的机器人信息
Args:
robot_id: 机器人ID
Returns:
dict: 机器人信息如果不存在则返回None
"""
robot = self.collection.find_one({"robot_id": robot_id})
if robot:
# 转换MongoDB的_id为字符串
robot["_id"] = str(robot["_id"])
# 如果有头像添加base64编码的头像数据
if robot.get("avatar_path") and os.path.exists(robot["avatar_path"]):
with open(robot["avatar_path"], "rb") as f:
avatar_data = base64.b64encode(f.read()).decode('utf-8')
robot["avatar_data"] = avatar_data
return robot
return None
def update_robot(self, robot_id, name=None, background=None, avatar_file=None):
"""
更新机器人信息
Args:
robot_id: 机器人ID
name: 新的机器人名称(可选)
background: 新的背景故事(可选)
avatar_file: 新的头像文件(可选)
Returns:
dict: 更新后的机器人信息
"""
# 获取现有机器人
robot = self.collection.find_one({"robot_id": robot_id})
if not robot:
return {"error": f"找不到ID为 '{robot_id}' 的机器人"}
# 准备更新数据
update_data = {}
if name:
# 检查新名称是否与其他机器人冲突
existing = self.collection.find_one({"name": name, "robot_id": {"$ne": robot_id}})
if existing:
return {"error": f"已存在名为 '{name}' 的机器人"}
update_data["name"] = name
if background:
update_data["background"] = background
# 处理新头像
if avatar_file:
# 删除旧头像
old_avatar_path = robot.get("avatar_path")
if old_avatar_path and os.path.exists(old_avatar_path):
try:
os.remove(old_avatar_path)
except:
pass
# 保存新头像
filename = f"robot_avatar_{robot_id}.{avatar_file.filename.split('.')[-1]}"
file_path = os.path.join(self.upload_folder, filename)
avatar_file.save(file_path)
update_data["avatar_path"] = file_path
# 更新数据库
if update_data:
update_data["updated_at"] = datetime.datetime.now()
self.collection.update_one({"robot_id": robot_id}, {"$set": update_data})
# 返回更新后的机器人信息
return self.get_robot(robot_id)
def delete_robot(self, robot_id):
"""
删除机器人
Args:
robot_id: 机器人ID
Returns:
bool: 是否成功删除
"""
# 获取机器人信息
robot = self.collection.find_one({"robot_id": robot_id})
if not robot:
return False
# 删除头像文件
avatar_path = robot.get("avatar_path")
if avatar_path and os.path.exists(avatar_path):
try:
os.remove(avatar_path)
except:
pass
# 从数据库中删除
result = self.collection.delete_one({"robot_id": robot_id})
return result.deleted_count > 0
def get_all_robots(self):
"""
获取所有机器人列表
Returns:
list: 机器人信息列表
"""
robots = []
for robot in self.collection.find():
# 转换MongoDB的_id为字符串
robot["_id"] = str(robot["_id"])
# 如果有头像添加base64编码的头像数据
if robot.get("avatar_path") and os.path.exists(robot["avatar_path"]):
with open(robot["avatar_path"], "rb") as f:
avatar_data = base64.b64encode(f.read()).decode('utf-8')
robot["avatar_data"] = avatar_data
robots.append(robot)
return robots
def close(self):
"""关闭MongoDB连接"""
if self.client:
self.client.close()