145 lines
4.3 KiB
Python
145 lines
4.3 KiB
Python
"""
Video service
"""
|
|
from pathlib import Path
|
|
from typing import Optional, Tuple, List
|
|
from werkzeug.datastructures import FileStorage
|
|
from app.models.video import Video
|
|
from app.models.database import get_database
|
|
from app.config import Config
|
|
from app.utils.validators import is_allowed_file, validate_file_size, secure_file_path
|
|
from app.utils.file_utils import ensure_directory_exists, get_file_size
|
|
|
|
class VideoService:
    """Video management service.

    Saves uploaded video files into the configured upload folder and keeps a
    ``Video`` record for each stored file.
    """

    def __init__(self, config: Config):
        """
        Initialize video service

        Args:
            config: Application configuration
        """
        self.config = config
        self.upload_folder = config.upload_folder
        ensure_directory_exists(self.upload_folder)

    def _new_temp_path(self) -> Path:
        """Return a fresh staging path inside the upload folder.

        The name is random rather than derived from the client-supplied
        filename, so concurrent uploads of the same file cannot overwrite each
        other's staging file and a filename containing path separators cannot
        point the staging file outside the upload folder.
        """
        # Function-scope import keeps this class self-contained.
        import uuid

        return self.upload_folder / f"temp_{uuid.uuid4().hex}"

    def _first_free_path(self, desired: Path) -> Path:
        """Return ``desired``, or the first ``<stem>_<n><suffix>`` in the
        upload folder that does not exist yet (n = 1, 2, ...)."""
        candidate = desired
        counter = 1
        while candidate.exists():
            candidate = self.upload_folder / f"{desired.stem}_{counter}{desired.suffix}"
            counter += 1
        return candidate

    @staticmethod
    def _discard(path: Path) -> None:
        """Best-effort removal of a leftover file; never raises on OS errors."""
        try:
            path.unlink()
        except OSError:
            # Cleanup must not mask the error that is already being reported.
            pass

    def upload_video(self, file: FileStorage) -> Tuple[Optional[Video], Optional[str]]:
        """
        Upload video file

        The file is first written to a staging path so its size can be
        checked, then moved to its final, conflict-free name and recorded as
        a ``Video`` with status ``'uploaded'``.

        Args:
            file: Uploaded file

        Returns:
            Tuple of (Video object, error_message). Exactly one of the two is
            None: the Video on success, the message on failure.
        """
        # Validate filename
        if not file.filename:
            return None, "No filename provided"

        # Validate file extension
        if not is_allowed_file(file.filename, self.config.allowed_extensions):
            allowed = ', '.join(self.config.allowed_extensions)
            return None, f"File format not supported. Allowed formats: {allowed}"

        # Stage the upload on disk so its size can be measured.
        temp_path = self._new_temp_path()
        final_path = None
        try:
            file.save(str(temp_path))
            file_size = get_file_size(temp_path)

            # Validate file size
            is_valid, error_msg = validate_file_size(file_size, self.config.max_upload_size)
            if not is_valid:
                self._discard(temp_path)
                return None, error_msg

            # Generate secure file path and resolve filename conflicts
            destination = self._first_free_path(
                secure_file_path(file.filename, self.upload_folder)
            )

            # Move file to final location
            temp_path.rename(destination)
            final_path = destination

            # Create video document
            video = Video(
                filename=file.filename,
                file_path=str(final_path),
                file_size=file_size,
                mime_type=file.content_type or 'video/mp4',
                status='uploaded',
            )
            video.save()

            return video, None

        except Exception as e:
            # Remove whatever reached the disk: the staging file, and the final
            # file if the failure happened after the move (e.g. in save()).
            self._discard(temp_path)
            if final_path is not None:
                self._discard(final_path)
            return None, f"Upload failed: {str(e)}"

    def get_video(self, video_id: str) -> Optional[Video]:
        """
        Get video by ID

        Args:
            video_id: Video ID

        Returns:
            Video object or None. Any error raised by the lookup is also
            reported as None.
        """
        try:
            return Video.find_by_id(video_id)
        except Exception:
            return None

    def list_videos(self, limit: int = 100, skip: int = 0) -> List[Video]:
        """
        List all videos

        Args:
            limit: Maximum number of videos to return
            skip: Number of videos to skip

        Returns:
            List of Video objects
        """
        return Video.find_all(limit=limit, skip=skip)

    def delete_video(self, video_id: str) -> Tuple[bool, Optional[str]]:
        """
        Delete video

        Removes the stored file (if it is still present) and then the
        ``Video`` record.

        Args:
            video_id: Video ID

        Returns:
            Tuple of (success, error_message)
        """
        video = self.get_video(video_id)
        if not video:
            return False, "Video not found"

        try:
            # Delete file; a file that is already gone is not an error.
            try:
                Path(video.file_path).unlink()
            except FileNotFoundError:
                pass

            # Delete from database
            video.delete()

            return True, None
        except Exception as e:
            return False, f"Delete failed: {str(e)}"
|
|
|