Описание
Записи сохраняются локально. При сбое диска, случайном удалении или катастрофе пользователь теряет все данные. Необходима интеграция с облачными хранилищами для автоматического бэкапа.
Текущее поведение
Запись завершена → Сохранение локально → ???
Result: Данные уязвимы к потере
Ожидаемое поведение
Запись завершена → Локальное сохранение → Автозагрузка в облако → История синхронизации
Result: Защищённые данные
Технические требования
1. Архитектура провайдеров
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import BinaryIO
class CloudProvider(Enum):
S3 = "s3"
GOOGLE_DRIVE = "google_drive"
ONEDRIVE = "onedrive"
WEBDAV = "webdav"
@dataclass
class CloudUploadResult:
"""Результат загрузки в облако."""
success: bool
remote_path: str | None
remote_url: str | None
size_bytes: int
error: str | None
@dataclass
class CloudSyncStatus:
"""Статус синхронизации."""
file_path: str
remote_path: str
status: str # "pending", "uploading", "completed", "failed"
progress: float # 0.0 - 1.0
error: str | None
class CloudStorageProvider(ABC):
"""Абстрактный провайдер облачного хранилища."""
@abstractmethod
def configure(self, credentials: dict) -> bool:
"""Настройка провайдера с credentials."""
...
@abstractmethod
def is_configured(self) -> bool:
"""Проверка настройки провайдера."""
...
@abstractmethod
async def upload_file(
self,
local_path: Path,
remote_path: str,
progress_callback: Callable[[float], None] = None
) -> CloudUploadResult:
"""Загрузка файла в облако."""
...
@abstractmethod
async def download_file(
self,
remote_path: str,
local_path: Path,
progress_callback: Callable[[float], None] = None
) -> bool:
"""Скачивание файла из облака."""
...
@abstractmethod
def list_files(self, prefix: str = "") -> list[dict]:
"""Список файлов в хранилище."""
...
@abstractmethod
def delete_file(self, remote_path: str) -> bool:
"""Удаление файла из хранилища."""
...
2. S3 провайдер
import boto3
from botocore.exceptions import ClientError
class S3Provider(CloudStorageProvider):
"""Amazon S3 / S3-совместимое хранилище."""
def __init__(self):
self._client: boto3.Session | None = None
self._bucket: str | None = None
def configure(self, credentials: dict) -> bool:
"""Настройка S3."""
try:
self._client = boto3.Session(
aws_access_key_id=credentials["access_key"],
aws_secret_access_key=credentials["secret_key"],
region_name=credentials.get("region", "us-east-1")
)
self._bucket = credentials["bucket"]
self._endpoint = credentials.get("endpoint") # Для MinIO
# Тестовая проверка
s3 = self._client.resource("s3")
s3.Bucket(self._bucket).load()
return True
except Exception as e:
logger.error(f"S3 configuration failed: {e}")
return False
def is_configured(self) -> bool:
return self._client is not None and self._bucket is not None
async def upload_file(
self,
local_path: Path,
remote_path: str,
progress_callback: Callable[[float], None] = None
) -> CloudUploadResult:
"""Загрузка в S3."""
try:
s3 = self._client.client("s3")
file_size = local_path.stat().st_size
extra_args = {"Metadata": {"original-name": local_path.name}}
# Multipart upload для больших файлов
config = boto3.s3.transfer.TransferConfig(
multipart_threshold=100 * 1024 * 1024, # 100 MB
multipart_chunksize=50 * 1024 * 1024, # 50 MB
)
s3.upload_file(
str(local_path),
self._bucket,
remote_path,
Config=config,
ExtraArgs=extra_args,
Callback=self._create_progress_callback(progress_callback, file_size)
)
url = f"https://{self._bucket}.s3.amazonaws.com/{remote_path}"
return CloudUploadResult(
success=True,
remote_path=remote_path,
remote_url=url,
size_bytes=file_size,
error=None
)
except ClientError as e:
return CloudUploadResult(
success=False,
remote_path=None,
remote_url=None,
size_bytes=0,
error=str(e)
)
def _create_progress_callback(self, user_callback, total_size):
class ProgressCallback:
def __init__(self):
self._uploaded = 0
def __call__(self, chunk_size):
self._uploaded += chunk_size
if user_callback:
user_callback(self._uploaded / total_size)
return ProgressCallback()
3. Google Drive провайдер
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from googleapiclient.errors import HttpError
class GoogleDriveProvider(CloudStorageProvider):
"""Google Drive провайдер."""
SCOPES = ["https://www.googleapis.com/auth/drive.file"]
def __init__(self):
self._service = None
self._folder_id = None
def configure(self, credentials: dict) -> bool:
"""Настройка через OAuth токен."""
try:
creds = Credentials.from_authorized_user_info(credentials)
self._service = build("drive", "v3", credentials=creds)
# Создать/найти папку для бэкапов
self._folder_id = self._get_or_create_backup_folder()
return True
except Exception as e:
logger.error(f"Google Drive configuration failed: {e}")
return False
def _get_or_create_backup_folder(self) -> str:
"""Получает или создаёт папку для бэкапов."""
# Поиск существующей
results = self._service.files().list(
q="name='MIA-ScreenCapture Backups' and mimeType='application/vnd.google-apps.folder'",
spaces="drive",
fields="files(id, name)"
).execute()
if results.get("files"):
return results["files"][0]["id"]
# Создание новой
file_metadata = {
"name": "MIA-ScreenCapture Backups",
"mimeType": "application/vnd.google-apps.folder"
}
folder = self._service.files().create(
body=file_metadata,
fields="id"
).execute()
return folder["id"]
async def upload_file(
self,
local_path: Path,
remote_path: str,
progress_callback: Callable[[float], None] = None
) -> CloudUploadResult:
"""Загрузка в Google Drive."""
try:
file_metadata = {
"name": local_path.name,
"parents": [self._folder_id]
}
media = MediaFileUpload(
str(local_path),
resumable=True
)
request = self._service.files().create(
body=file_metadata,
media_body=media,
fields="id, webViewLink"
)
# resumable upload с прогрессом
response = None
while response is None:
status, response = request.next_chunk()
if status and progress_callback:
progress_callback(status.progress())
return CloudUploadResult(
success=True,
remote_path=remote_path,
remote_url=response.get("webViewLink"),
size_bytes=local_path.stat().st_size,
error=None
)
except HttpError as e:
return CloudUploadResult(
success=False,
remote_path=None,
remote_url=None,
size_bytes=0,
error=str(e)
)
4. Менеджер синхронизации
import asyncio
from dataclasses import dataclass
from typing import TypedDict
class SyncState(TypedDict):
status: str
progress: float
last_sync: str | None
error: str | None
class CloudSyncManager:
"""Менеджер синхронизации с облаком."""
def __init__(
self,
provider: CloudStorageProvider,
config: CloudSyncConfig
):
self._provider = provider
self._config = config
self._queue: asyncio.Queue = asyncio.Queue()
self._running = False
self._sync_state: dict[str, SyncState] = {}
async def start(self) -> None:
"""Запуск фоновой синхронизации."""
self._running = True
self._worker_task = asyncio.create_task(self._worker())
async def stop(self) -> None:
"""Остановка синхронизации."""
self._running = False
await self._worker_task
async def queue_upload(self, file_path: Path) -> None:
"""Добавить файл в очередь загрузки."""
await self._queue.put(file_path)
async def _worker(self) -> None:
"""Worker для обработки очереди."""
while self._running:
try:
# Взять следующий файл (с таймаутом)
file_path = await asyncio.wait_for(
self._queue.get(),
timeout=5.0
)
await self._upload_file(file_path)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"Sync worker error: {e}")
async def _upload_file(self, file_path: Path) -> None:
"""Загрузка одного файла."""
file_key = str(file_path)
self._sync_state[file_key] = {
"status": "uploading",
"progress": 0.0,
"last_sync": None,
"error": None
}
# Формирование remote path
remote_path = self._generate_remote_path(file_path)
def progress(progress_value: float) -> None:
self._sync_state[file_key]["progress"] = progress_value
result = await self._provider.upload_file(
file_path,
remote_path,
progress_callback=progress
)
if result.success:
self._sync_state[file_key]["status"] = "completed"
self._sync_state[file_key]["last_sync"] = datetime.now().isoformat()
else:
self._sync_state[file_key]["status"] = "failed"
self._sync_state[file_key]["error"] = result.error
def _generate_remote_path(self, file_path: Path) -> str:
"""Генерация пути в облаке."""
date = datetime.now()
return (
f"Recordings/{date.year}/{date.month:02d}/"
f"{date.strftime('%Y%m%d_%H%M%S')}_{file_path.name}"
)
5. Интеграция с RecordingService
class RecordingService:
def __init__(self, ...):
# ... existing init ...
self._cloud_sync: CloudSyncManager | None = None
self._cloud_config: CloudSyncConfig | None = None
def configure_cloud_sync(
self,
provider: CloudProvider,
credentials: dict,
config: CloudSyncConfig
) -> bool:
"""Настройка облачной синхронизации."""
provider_impl = self._create_provider(provider)
if not provider_impl.configure(credentials):
return False
self._cloud_sync = CloudSyncManager(provider_impl, config)
self._cloud_config = config
return True
def _on_recording_completed(self, result: RecordingResult) -> None:
"""Обработчик завершения записи."""
# Существующая логика...
# Отправить в облако
if self._cloud_sync and self._cloud_config:
min_size = self._cloud_config.min_file_size_mb * 1024 * 1024
if result.file_size >= min_size:
await self._cloud_sync.queue_upload(result.output_path)
6. API endpoints
# Настройка облачного провайдера
POST /api/v1/config/cloud
Body: {
"provider": "s3",
"credentials": {
"access_key": "...",
"secret_key": "...",
"bucket": "my-recordings",
"region": "eu-west-1"
},
"config": {
"auto_sync": true,
"min_file_size_mb": 10,
"sync_after_minutes": 5
}
}
Response: {"success": true, "provider": "s3"}
# Проверка статуса
GET /api/v1/cloud/status
Response: {
"provider": "s3",
"is_configured": true,
"queue_size": 3,
"sync_status": {
"recording_001.mp4": {"status": "completed"},
"recording_002.mp4": {"status": "uploading", "progress": 0.45}
}
}
# Вручную синхронизировать
POST /api/v1/cloud/sync
Body: {"file_paths": ["path/to/file.mp4"]}
Response: {"queued": 1}
Конфигурация
# config.py
class CloudSyncConfig(BaseModel):
"""Конфигурация облачной синхронизации."""
provider: CloudProvider | None = None
auto_sync: bool = False
min_file_size_mb: float = Field(default=10.0, ge=0)
sync_delay_minutes: int = Field(default=5, ge=0)
retention_days: int | None = Field(default=None)
credentials: dict = Field(default_factory=dict)
class AppSettingsSchema(BaseModel):
# ... existing fields ...
cloud_sync: CloudSyncConfig = Field(default_factory=CloudSyncConfig)
Критерии приёмки
Файлы для изменения
core/cloud_providers/ — новый пакет с провайдерами
core/cloud_sync.py — менеджер синхронизации
core/recording_service.py — интеграция
api/routes_config.py — endpoints
gui/views/settings_view.py — UI настроек
config.py — настройки синхронизации
tests/unit/ — тесты
Приоритет: P3
Тэги: enhancement, functionality
Описание
Записи сохраняются локально. При сбое диска, случайном удалении или катастрофе пользователь теряет все данные. Необходима интеграция с облачными хранилищами для автоматического бэкапа.
Текущее поведение
Ожидаемое поведение
Технические требования
1. Архитектура провайдеров
2. S3 провайдер
3. Google Drive провайдер
4. Менеджер синхронизации
5. Интеграция с RecordingService
6. API endpoints
Конфигурация
Критерии приёмки
Файлы для изменения
core/cloud_providers/— новый пакет с провайдерамиcore/cloud_sync.py— менеджер синхронизацииcore/recording_service.py— интеграцияapi/routes_config.py— endpointsgui/views/settings_view.py— UI настроекconfig.py— настройки синхронизацииtests/unit/— тестыПриоритет: P3
Тэги:
enhancement,functionality