# videoSummary/app/services/dashscope_service.py
# 2025-12-02 18:54:14 +08:00
#
# 500 lines
# 21 KiB
# Python

"""
DashScope API service
"""
import base64
import json
import logging
from pathlib import Path
from typing import Optional, Dict, Any
from dashscope import MultiModalConversation
import dashscope
from app.config import Config
logger = logging.getLogger('videoSummary')


class DashScopeService:
    """DashScope API service wrapper.

    Builds multimodal request messages for single-video analysis and
    multi-video comparison, logs each request/response (with the API key
    masked and inline Base64 payloads replaced by a placeholder), and
    converts SDK responses into plain result dictionaries.
    """

    # Largest raw file that is still sent through the Base64 fallback.
    # Base64 inflates data by ~33%, so ~7MB raw corresponds to the ~10MB
    # encoded-video limit this code assumes for the API.
    _MAX_BASE64_RAW_BYTES = 7 * 1024 * 1024

    def __init__(self, config: "Config"):
        """
        Initialize DashScope service

        Args:
            config: Application configuration; ``dashscope_api_key``,
                ``dashscope_model`` and ``dashscope_fps`` are read from it.
        """
        self.config = config
        self.api_key = config.dashscope_api_key
        self.model = config.dashscope_model
        self.fps = config.dashscope_fps
        # The key is pushed into the SDK lazily, on the first API call.
        self._api_key_set = False

    def _ensure_api_key(self):
        """
        Ensure API key is set before making API calls

        Assigns the key to the ``dashscope`` module-level ``api_key`` the
        first time it is called; later calls only re-validate it.

        Raises:
            ValueError: If API key is not configured
        """
        if not self.api_key:
            raise ValueError(
                "DashScope API key is required for video analysis. "
                "Please set 'dashscope.api_key' in config.yaml file. "
                "You can obtain your API key from: https://dashscope.console.aliyun.com/"
            )
        if not self._api_key_set:
            dashscope.api_key = self.api_key
            self._api_key_set = True

    @staticmethod
    def _mask_secret(secret: Any) -> str:
        """Return a log-safe rendering of a credential.

        Args:
            secret: The credential value (normally a string), or a falsy
                value when none is configured.

        Returns:
            ``'N/A'`` for a missing value, ``'***'`` for values of eight
            characters or fewer, otherwise the first three and last four
            characters separated by ``***``.
        """
        if not secret:
            return 'N/A'
        text = str(secret)
        if len(text) <= 8:
            return '***'
        return f"{text[:3]}***{text[-4:]}"

    @staticmethod
    def _redact_content(content: Any, size_info: str) -> Any:
        """Return message content with inline Base64 video replaced by a placeholder.

        Args:
            content: The ``content`` value of one message.
            size_info: Text appended inside the placeholder (may be empty).

        Returns:
            A new list when ``content`` is a list or tuple; any other value
            is returned unchanged. The input is never mutated.
        """
        if not isinstance(content, (list, tuple)):
            return content
        redacted = []
        for item in content:
            video_value = item.get('video') if isinstance(item, dict) else None
            if isinstance(video_value, str) and video_value.startswith('data:video'):
                # Only the data-URI form carries the (large) payload inline.
                redacted.append({
                    'video': f"data:video/mp4;base64,[BASE64_DATA{size_info}]",
                    'fps': item.get('fps', 'N/A')
                })
            else:
                redacted.append(item)
        return redacted

    @staticmethod
    def _describe_output(output: Any) -> Dict[str, Any]:
        """Reduce a response ``output`` object to a JSON-friendly dict for logging.

        Text items are kept in full; for other dict items a ``video`` string
        of 100 characters or more is dropped; non-dict items are stringified.
        """
        described = {'choices': []}
        if hasattr(output, 'choices') and output.choices:
            for choice in output.choices:
                choice_dict = {}
                if hasattr(choice, 'message') and hasattr(choice.message, 'content'):
                    content_list = []
                    for item in choice.message.content:
                        if not isinstance(item, dict):
                            content_list.append(str(item))
                        elif 'text' in item:
                            content_list.append({'text': item['text']})
                        else:
                            content_list.append({
                                k: v for k, v in item.items()
                                if k != 'video' or not isinstance(v, str) or len(v) < 100
                            })
                    choice_dict['message'] = {'content': content_list}
                described['choices'].append(choice_dict)
        return described

    def _log_request(self, method_name: str, request_data: Dict[str, Any]):
        """
        Log request data for third-party API calls

        The full request is written at DEBUG level and a short summary at
        INFO level. The API key is always masked and inline Base64 video
        data is replaced by a placeholder. Logging is best-effort: any
        failure is reported as a warning and never propagates.

        Args:
            method_name: Name of the calling method
            request_data: Request data dictionary containing model, messages, api_key, etc.
        """
        try:
            # Shallow copy so the caller's dictionary is never modified.
            log_data = dict(request_data)
            if 'api_key' in log_data:
                # Never write the real credential to any log level.
                log_data['api_key'] = self._mask_secret(log_data['api_key'])

            if logger.isEnabledFor(logging.DEBUG):
                if 'messages' in log_data:
                    size_info = ''
                    if 'encoded_size' in log_data:
                        size_info = f" (encoded size: {log_data['encoded_size']} bytes)"
                    redacted_messages = []
                    for msg in log_data['messages']:
                        if isinstance(msg, dict) and 'content' in msg:
                            msg = {**msg, 'content': self._redact_content(msg['content'], size_info)}
                        redacted_messages.append(msg)
                    log_data['messages'] = redacted_messages
                logger.debug("DashScope API Request [%s] - Complete Request Data:", method_name)
                logger.debug(json.dumps(log_data, indent=2, ensure_ascii=False))

            # Summary at INFO level
            video_count = log_data.get('video_count', 'N/A')
            if video_count == 'N/A' and 'video_paths' in log_data:
                video_paths = log_data['video_paths']
                video_count = len(video_paths) if isinstance(video_paths, list) else 'N/A'
            summary = {
                'method': method_name,
                'model': log_data.get('model', 'N/A'),
                'api_key': log_data.get('api_key', 'N/A'),
                'messages_count': len(log_data.get('messages', [])),
                'fps': log_data.get('fps', 'N/A'),
                'video_count': video_count
            }
            logger.info(
                "DashScope API Request [%s] - Summary: %s",
                method_name, json.dumps(summary, ensure_ascii=False)
            )
        except Exception as e:
            logger.warning("Failed to log request data: %s", e)

    def _log_response(self, method_name: str, response: Any):
        """
        Log response data from third-party API calls

        Writes the reduced response body at DEBUG level, a summary at INFO
        level, and an ERROR entry when the status code is not 200. Logging
        is best-effort: any failure is reported as a warning only.

        Args:
            method_name: Name of the calling method
            response: Response object from DashScope API
        """
        try:
            response_data = {
                'status_code': getattr(response, 'status_code', 'N/A'),
                'message': getattr(response, 'message', 'N/A'),
                'request_id': getattr(response, 'request_id', 'N/A'),
            }

            if logger.isEnabledFor(logging.DEBUG):
                if hasattr(response, 'output'):
                    try:
                        response_data['output'] = self._describe_output(response.output)
                    except Exception as e:
                        response_data['output'] = f"Error extracting output: {str(e)}"
                logger.debug("DashScope API Response [%s] - Complete Response Data:", method_name)
                logger.debug(json.dumps(response_data, indent=2, ensure_ascii=False))

            summary = {
                'method': method_name,
                'status_code': response_data['status_code'],
                'message': response_data['message'],
                'request_id': response_data['request_id']
            }
            logger.info(
                "DashScope API Response [%s] - Summary: %s",
                method_name, json.dumps(summary, ensure_ascii=False)
            )
            if response_data['status_code'] != 200:
                logger.error(
                    "DashScope API Error [%s] - Status: %s, Message: %s",
                    method_name, response_data['status_code'], response_data['message']
                )
        except Exception as e:
            logger.warning("Failed to log response data: %s", e)

    @staticmethod
    def _build_messages(video_refs: list, prompt: str, fps_value: Any) -> list:
        """Build the single-turn user message: one video item per reference, then the text prompt.

        Resulting shape:
        ``[{'role': 'user', 'content': [{'video': ref, 'fps': fps}, ..., {'text': prompt}]}]``
        """
        content = [{'video': ref, 'fps': fps_value} for ref in video_refs]
        content.append({'text': prompt})
        return [{'role': 'user', 'content': content}]

    @staticmethod
    def _success_result(response: Any, fps_value: Any) -> Dict[str, Any]:
        """Build the success dictionary from a status-200 response.

        Reads the text of the first content item of the first choice; an
        unexpected response shape raises and is handled by the caller.
        """
        text = response.output.choices[0].message.content[0]["text"]
        return {
            'success': True,
            'content': text,
            'fps': fps_value
        }

    def _analyze_video_base64(self, video_path: Path, prompt: str, fps_value: Any,
                              first_response: Any) -> Dict[str, Any]:
        """Retry a failed analysis by sending the video inline as a Base64 data URI.

        Args:
            video_path: Path to the video file.
            prompt: Analysis prompt.
            fps_value: Frames-per-second value sent with the video.
            first_response: The failed response of the ``file://`` attempt;
                its status and message are reported if this retry cannot run.

        Returns:
            A success dictionary, or a failure dictionary with ``error`` and
            ``status_code`` keys.
        """
        response = first_response
        try:
            file_size = video_path.stat().st_size
            if file_size > self._MAX_BASE64_RAW_BYTES:
                logger.error(
                    "Video file too large for Base64 encoding: %s bytes (limit: ~7MB raw)",
                    file_size
                )
                return {
                    'success': False,
                    'error': f"Video file too large for Base64 encoding. File size: {file_size / 1024 / 1024:.2f}MB, limit: ~7MB raw (~10MB encoded). Please use a smaller video file.",
                    'status_code': response.status_code
                }

            video_base64 = base64.b64encode(video_path.read_bytes()).decode('utf-8')
            logger.info(
                "Using Base64 encoding (file size: %s bytes, encoded size: %s bytes)",
                file_size, len(video_base64)
            )
            # NOTE(review): the MIME type is fixed to video/mp4 regardless of
            # the actual container, as in the original implementation.
            messages_base64 = self._build_messages(
                [f"data:video/mp4;base64,{video_base64}"], prompt, fps_value
            )
            self._log_request('analyze_video_base64', {
                'model': self.model,
                'api_key': self.api_key,
                'messages': messages_base64,
                'fps': fps_value,
                'encoding': 'base64',
                'file_size': file_size,
                'encoded_size': len(video_base64)
            })
            response = MultiModalConversation.call(
                model=self.model,
                messages=messages_base64
            )
            self._log_response('analyze_video_base64', response)

            if response.status_code == 200:
                return self._success_result(response, fps_value)

            logger.error(
                "DashScope API error (all formats including Base64 failed) - Status: %s, Message: %s",
                response.status_code, response.message
            )
            return {
                'success': False,
                'error': f"API error: {response.message}",
                'status_code': response.status_code
            }
        except Exception as e:
            # `response` is the most recent response obtained so far.
            logger.error("Base64 encoding failed: %s", e)
            return {
                'success': False,
                'error': f"All upload methods failed. Last error: {response.message}. Base64 encoding error: {str(e)}",
                'status_code': response.status_code
            }

    def analyze_video(self, video_path: Path, prompt: str, fps: Optional[int] = None) -> Dict[str, Any]:
        """
        Analyze video content using DashScope API

        The video is first referenced as a ``file://`` URI; if the API does
        not answer with status 200, the file is re-sent inline as Base64.

        Args:
            video_path: Path to video file
            prompt: Analysis prompt
            fps: Frames per second (overrides config if provided)

        Returns:
            ``{'success': True, 'content': ..., 'fps': ...}`` on success,
            otherwise ``{'success': False, 'error': ...}`` (with
            ``status_code`` when the API returned one).

        Raises:
            ValueError: If the API key is not configured
            FileNotFoundError: If ``video_path`` does not exist
        """
        self._ensure_api_key()
        if not video_path.exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")

        # Local files are passed as "file://<absolute path>".
        video_path_for_api = f"file://{str(video_path.absolute())}"
        logger.info("Using video path (file:// format as per official docs): %s", video_path_for_api)

        fps_value = fps if fps is not None else self.fps
        messages = self._build_messages([video_path_for_api], prompt, fps_value)

        try:
            self._log_request('analyze_video', {
                'model': self.model,
                'api_key': self.api_key,
                'messages': messages,
                'fps': fps_value,
                'video_path': video_path_for_api
            })
            response = MultiModalConversation.call(
                model=self.model,
                messages=messages
            )
            self._log_response('analyze_video', response)

            if response.status_code == 200:
                return self._success_result(response, fps_value)

            logger.warning("File:// format failed, trying Base64 encoding. Error: %s", response.message)
            return self._analyze_video_base64(video_path, prompt, fps_value, response)
        except Exception as e:
            logger.error("DashScope API exception: %s, Path used: %s", e, video_path_for_api)
            return {
                'success': False,
                'error': str(e)
            }

    def summarize_video(self, video_path: Path, fps: Optional[int] = None) -> Dict[str, Any]:
        """
        Generate video summary

        Args:
            video_path: Path to video file
            fps: Frames per second (overrides config if provided)

        Returns:
            Summary result dictionary (same shape as ``analyze_video``)
        """
        prompt = "请对这段视频进行总结,包括主要内容、关键场景和重要信息。"
        return self.analyze_video(video_path, prompt, fps)

    def compare_videos(self, video_paths: list[Path], fps: Optional[int] = None) -> Dict[str, Any]:
        """
        Compare multiple videos

        The videos are first referenced by plain absolute path; if the API
        does not answer with status 200, the call is retried once with
        ``file://`` URIs.

        Args:
            video_paths: List of video file paths (at least two)
            fps: Frames per second (overrides config if provided)

        Returns:
            ``{'success': True, 'content': ..., 'fps': ...}`` on success,
            otherwise ``{'success': False, 'error': ...}`` (with
            ``status_code`` when the API returned one).

        Raises:
            ValueError: If the API key is not configured
        """
        self._ensure_api_key()
        if len(video_paths) < 2:
            return {
                'success': False,
                'error': 'At least two videos are required for comparison'
            }
        for path in video_paths:
            if not path.exists():
                return {
                    'success': False,
                    'error': f"Video file not found: {path}"
                }

        fps_value = fps if fps is not None else self.fps
        prompt = "请对比这些视频的内容,找出它们的相似之处和不同之处,并详细说明。"

        # First attempt: plain absolute path strings.
        local_paths = [str(path.absolute()) for path in video_paths]
        messages = self._build_messages(local_paths, prompt, fps_value)

        try:
            self._log_request('compare_videos', {
                'model': self.model,
                'api_key': self.api_key,
                'messages': messages,
                'fps': fps_value,
                'video_paths': local_paths,
                'video_count': len(local_paths)
            })
            response = MultiModalConversation.call(
                model=self.model,
                messages=messages
            )
            self._log_response('compare_videos', response)

            if response.status_code == 200:
                return self._success_result(response, fps_value)

            # Second attempt: the same files as file:// URIs.
            logger.warning("Direct paths failed, trying file:// format. Error: %s", response.message)
            video_urls = [f"file://{local_path}" for local_path in local_paths]
            messages_retry = self._build_messages(video_urls, prompt, fps_value)

            self._log_request('compare_videos_retry', {
                'model': self.model,
                'api_key': self.api_key,
                'messages': messages_retry,
                'fps': fps_value,
                'video_paths': video_urls,
                'video_count': len(video_urls),
                'retry': True
            })
            response = MultiModalConversation.call(
                model=self.model,
                messages=messages_retry
            )
            self._log_response('compare_videos_retry', response)

            if response.status_code == 200:
                return self._success_result(response, fps_value)

            logger.error(
                "DashScope API error (both attempts failed) - Status: %s, Message: %s",
                response.status_code, response.message
            )
            return {
                'success': False,
                'error': f"API error: {response.message}",
                'status_code': response.status_code
            }
        except Exception as e:
            logger.error("DashScope API exception (compare): %s", e)
            return {
                'success': False,
                'error': str(e)
            }