""" Video analysis service """ from pathlib import Path from typing import Optional, Tuple, List from app.models.video import Video from app.models.analysis import AnalysisResult, Summary from app.models.comparison import Comparison from app.services.dashscope_service import DashScopeService from app.config import Config class AnalysisService: """Video analysis service""" def __init__(self, config: Config): """ Initialize analysis service Args: config: Application configuration """ self.config = config self.dashscope_service = DashScopeService(config) def analyze_video(self, video_id: str, prompt: Optional[str] = None) -> Tuple[Optional[AnalysisResult], Optional[str]]: """ Analyze video content Args: video_id: Video ID prompt: Optional custom prompt Returns: Tuple of (AnalysisResult, error_message) """ video = Video.find_by_id(video_id) if not video: return None, "Video not found" video_path = Path(video.file_path) if not video_path.exists(): return None, "Video file not found" # Update status video.update_status('analyzing') # Default prompt if not prompt: prompt = "这段视频描绘的是什么景象? 请详细描述视频中的内容。" # Call DashScope API try: result = self.dashscope_service.analyze_video(video_path, prompt) except ValueError as e: video.update_status('failed') return None, str(e) if not result.get('success'): video.update_status('failed') return None, result.get('error', 'Analysis failed') # Save analysis result analysis = AnalysisResult( video_id=video._id, content=result['content'], fps=result['fps'], analysis_type='content' ) analysis.save() # Update video status video.update_status('analyzed') return analysis, None def summarize_video(self, video_id: str) -> Tuple[Optional[Summary], Optional[str]]: """ Generate video summary Args: video_id: Video ID Returns: Tuple of (Summary, error_message) """ video = Video.find_by_id(video_id) if not video: return None, "Video not found" video_path = Path(video.file_path) if not video_path.exists(): return None, "Video file not found" # Call DashScope API for summary try: result = self.dashscope_service.summarize_video(video_path) except ValueError as e: return None, str(e) if not result.get('success'): return None, result.get('error', 'Summary generation failed') # Save summary summary = Summary( video_id=video._id, summary_text=result['content'] ) summary.save() return summary, None def compare_videos(self, video_ids: List[str]) -> Tuple[Optional[Comparison], Optional[str]]: """ Compare multiple videos Args: video_ids: List of video IDs Returns: Tuple of (Comparison, error_message) """ if len(video_ids) < 2: return None, "At least two videos are required for comparison" # Get all videos videos = [] video_paths = [] for video_id in video_ids: video = Video.find_by_id(video_id) if not video: return None, f"Video not found: {video_id}" video_path = Path(video.file_path) if not video_path.exists(): return None, f"Video file not found: {video_id}" videos.append(video) video_paths.append(video_path) # Call DashScope API for comparison try: result = self.dashscope_service.compare_videos(video_paths) except ValueError as e: return None, str(e) if not result.get('success'): return None, result.get('error', 'Comparison failed') # Save comparison comparison = Comparison( video_ids=[video._id for video in videos], comparison_result=result['content'] ) comparison.save() return comparison, None def get_analysis(self, video_id: str) -> Optional[AnalysisResult]: """Get analysis result for video""" return AnalysisResult.find_by_video_id(video_id, 'content') def get_summary(self, video_id: str) -> Optional[Summary]: """Get summary for video""" return Summary.find_by_video_id(video_id)