"""
Video model
"""
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from bson import ObjectId
from bson.errors import InvalidId

from app.models.database import get_database


class Video:
    """Video document model"""

    # Name of the database collection that holds video documents.
    COLLECTION_NAME = 'videos'

    def __init__(
        self,
        filename: str,
        file_path: str,
        file_size: int,
        mime_type: str,
        upload_time: Optional[datetime] = None,
        status: str = 'uploaded',
        _id: Optional[ObjectId] = None,
    ):
        """
        Initialize Video model

        Args:
            filename: Video filename
            file_path: Video file path
            file_size: File size in bytes
            mime_type: MIME type
            upload_time: Upload timestamp; defaults to the current UTC time
                as a naive datetime
            status: Video status (uploaded, analyzing, analyzed, failed)
            _id: Document ID; a new ObjectId is generated when omitted
        """
        self._id = _id or ObjectId()
        self.filename = filename
        self.file_path = file_path
        self.file_size = file_size
        self.mime_type = mime_type
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # from an aware "now" yields the same naive-UTC value, so existing
        # data and comparisons are unaffected.
        self.upload_time = upload_time or datetime.now(
            timezone.utc
        ).replace(tzinfo=None)
        self.status = status

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary

        Returns:
            A dict with the keys '_id', 'filename', 'file_path',
            'file_size', 'mime_type', 'upload_time' and 'status'.
        """
        return {
            '_id': self._id,
            'filename': self.filename,
            'file_path': self.file_path,
            'file_size': self.file_size,
            'mime_type': self.mime_type,
            'upload_time': self.upload_time,
            'status': self.status,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Video':
        """
        Create Video from dictionary

        Args:
            data: Mapping that must contain 'filename', 'file_path',
                'file_size' and 'mime_type'. '_id', 'upload_time' and
                'status' are optional and fall back to the constructor
                defaults.

        Returns:
            A new Video instance.

        Raises:
            KeyError: If a required key is missing from ``data``.
        """
        return cls(
            _id=data.get('_id'),
            filename=data['filename'],
            file_path=data['file_path'],
            file_size=data['file_size'],
            mime_type=data['mime_type'],
            upload_time=data.get('upload_time'),
            status=data.get('status', 'uploaded'),
        )

    def save(self) -> ObjectId:
        """
        Save video to database

        Inserts ``to_dict()`` as a new document via ``insert_one`` and
        adopts the id reported by the insert.

        Returns:
            The inserted document's ID.
        """
        db = get_database()
        result = db[self.COLLECTION_NAME].insert_one(self.to_dict())
        self._id = result.inserted_id
        return self._id

    @classmethod
    def find_by_id(cls, video_id: str) -> Optional['Video']:
        """
        Find video by ID

        Args:
            video_id: Document ID to look up (anything ``ObjectId``
                accepts).

        Returns:
            The matching Video, or None when no document matches or when
            ``video_id`` is not a valid ObjectId.
        """
        try:
            object_id = ObjectId(video_id)
        except (InvalidId, TypeError):
            # A malformed id cannot match any document; report it the same
            # way as "not found" instead of leaking a bson error to callers.
            return None

        db = get_database()
        doc = db[cls.COLLECTION_NAME].find_one({'_id': object_id})
        if doc:
            return cls.from_dict(doc)
        return None

    @classmethod
    def find_all(cls, limit: int = 100, skip: int = 0) -> List['Video']:
        """
        Find all videos

        Args:
            limit: Value passed to the cursor's ``limit``.
            skip: Value passed to the cursor's ``skip``.

        Returns:
            Videos sorted by 'upload_time' in descending order.
        """
        db = get_database()
        cursor = (
            db[cls.COLLECTION_NAME]
            .find()
            .sort('upload_time', -1)
            .skip(skip)
            .limit(limit)
        )
        return [cls.from_dict(doc) for doc in cursor]

    def update_status(self, status: str) -> None:
        """
        Update video status

        Sets 'status' on the stored document with this instance's ID, then
        updates the in-memory attribute. The attribute is updated whether
        or not a stored document matched.

        Args:
            status: New status value.
        """
        db = get_database()
        db[self.COLLECTION_NAME].update_one(
            {'_id': self._id},
            {'$set': {'status': status}},
        )
        self.status = status

    def delete(self) -> None:
        """Delete video from database"""
        db = get_database()
        db[self.COLLECTION_NAME].delete_one({'_id': self._id})