Описание
Webhook уведомления (о завершении записи, ошибках) отправляются без аутентификации. Любой злоумышленник может отправить ложные webhook-запросы на настроенный URL, что может привести к:
- Ложным срабатываниям систем мониторинга
- Перегрузке внешних сервисов
- Компрометации связанных систем
Текущее поведение
Запись завершена → POST webhook.example.com/callback → ???
Result: Любой может отправить запрос
Ожидаемое поведение
Запись завершена → HMAC-SHA256(body, secret) → POST с подписью → Проверка на стороне клиента
Result: Защищённая коммуникация
Технические требования
1. Генерация и хранение секрета
import secrets
class WebhookSecretManager:
"""Управление секретами webhook."""
def __init__(self, storage: SecureStorage):
self._storage = storage
def generate_secret(self) -> str:
"""Генерирует новый секрет."""
secret = secrets.token_urlsafe(32) # 256-bit
self._storage.save("webhook_secret", secret)
return secret
def get_secret(self) -> str | None:
"""Получает сохранённый секрет."""
return self._storage.load("webhook_secret")
def validate_secret(self, secret: str) -> bool:
"""Проверяет формат секрета."""
if not secret or len(secret) < 32:
return False
try:
# Должен быть base64url
base64.urlsafe_b64decode(secret)
return True
except Exception:
return False
2. Создание подписи
import hmac
import hashlib
import time
from dataclasses import dataclass
@dataclass
class WebhookPayload:
"""Payload для webhook."""
event: str
timestamp: int
data: dict
def to_bytes(self) -> bytes:
return json.dumps({
"event": self.event,
"timestamp": self.timestamp,
"data": self.data
}, sort_keys=True).encode("utf-8")
class WebhookSigner:
"""Создание и проверка HMAC подписей."""
TIMESTAMP_TOLERANCE_SECONDS = 300 # 5 минут
def sign(self, payload: WebhookPayload, secret: str) -> dict:
"""Создаёт подпись для payload.
Returns:
Headers для включения в запрос
"""
timestamp = payload.timestamp
body = payload.to_bytes()
# Строка для подписи: timestamp.body
signature_base = f"{timestamp}.".encode("utf-8") + body
signature = hmac.new(
secret.encode("utf-8"),
signature_base,
hashlib.sha256
).hexdigest()
return {
"X-MIA-Signature": signature,
"X-MIA-Timestamp": str(timestamp),
"X-MIA-Event": payload.event
}
def verify(self, payload_body: bytes, headers: dict, secret: str) -> bool:
"""Проверяет подпись webhook.
Args:
payload_body: Тело запроса
headers: Заголовки (X-MIA-Signature, X-MIA-Timestamp)
secret: Секрет для проверки
Returns:
True если подпись валидна
"""
# Получить заголовки
signature = headers.get("X-MIA-Signature", "")
timestamp_str = headers.get("X-MIA-Timestamp", "")
if not signature or not timestamp_str:
return False
# Проверить timestamp
try:
timestamp = int(timestamp_str)
except ValueError:
return False
current_time = int(time.time())
if abs(current_time - timestamp) > self.TIMESTAMP_TOLERANCE_SECONDS:
return False # Replay attack protection
# Вычислить ожидаемую подпись
signature_base = f"{timestamp}.".encode("utf-8") + payload_body
expected = hmac.new(
secret.encode("utf-8"),
signature_base,
hashlib.sha256
).hexdigest()
# Constant-time сравнение
return hmac.compare_digest(signature, expected)
3. Отправка webhook
from urllib.request import Request, urlopen
from urllib.error import URLError
class WebhookSender:
"""Отправка webhook уведомлений."""
def __init__(self, url: str, secret: str | None = None):
self._url = url
self._secret = secret
def send(self, event: str, data: dict) -> bool:
"""Отправляет webhook."""
timestamp = int(time.time())
payload = WebhookPayload(
event=event,
timestamp=timestamp,
data=data
)
body = json.dumps(payload.to_dict(), default=str).encode("utf-8")
headers = {
"Content-Type": "application/json",
"User-Agent": "MIA-ScreenCapture/1.0",
}
# Добавить подпись если есть секрет
if self._secret:
signer = WebhookSigner()
sig_headers = signer.sign(payload, self._secret)
headers.update(sig_headers)
try:
request = Request(self._url, data=body, headers=headers, method="POST")
with urlopen(request, timeout=10) as response:
return 200 <= response.status < 300
except URLError as e:
logger.error(f"Webhook failed: {e}")
return False
def send_recording_completed(self, file_path: str, duration: float) -> bool:
"""Отправляет уведомление о завершении записи."""
return self.send("recording.completed", {
"file_path": file_path,
"duration_seconds": duration,
"file_size_bytes": Path(file_path).stat().st_size
})
def send_recording_error(self, error: str, details: str) -> bool:
"""Отправляет уведомление об ошибке."""
return self.send("recording.error", {
"error": error,
"details": details
})
4. Интеграция с RecordingService
class RecordingService:
def __init__(self, ...):
# ... existing init ...
self._webhook_sender: WebhookSender | None = None
def configure_webhook(self, url: str, secret: str | None = None) -> None:
"""Настраивает webhook."""
self._webhook_sender = WebhookSender(url, secret)
def _on_recording_completed(self, result: RecordingResult) -> None:
"""Обработчик завершения записи."""
# Существующая логика...
# Отправить webhook
if self._webhook_sender:
self._webhook_sender.send_recording_completed(
file_path=str(result.output_path),
duration=result.duration
)
5. API endpoints
# Настройка webhook
POST /api/v1/config/webhook
Body: {
"url": "https://example.com/webhook",
"secret": "auto" | "manual:base64secret" | null
}
Response: {
"webhook_url": "https://example.com/webhook",
"webhook_secret": "generated_secret_or_null",
"enabled": true
}
# Проверка webhook
POST /api/v1/config/webhook/test
Response: {
"success": true,
"response_time_ms": 45
}
# Получить настройки
GET /api/v1/config/webhook
Response: {
"url": "https://example.com/webhook",
"has_secret": true,
"enabled": true
}
Пример использования (сторона клиента)
Python (Flask)
from flask import Flask, request, abort
import hmac
import hashlib
import time
app = Flask(__name__)
WEBHOOK_SECRET = "your_secret_here"
@app.route('/webhook', methods=['POST'])
def webhook():
# Проверка подписи
signature = request.headers.get('X-MIA-Signature', '')
timestamp = request.headers.get('X-MIA-Timestamp', '')
body = request.get_data()
# Валидация timestamp
if abs(time.time() - int(timestamp)) > 300:
abort(403, 'Timestamp expired')
# Проверка подписи
expected = hmac.new(
WEBHOOK_SECRET.encode(),
f"{timestamp}.".encode() + body,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(signature, expected):
abort(403, 'Invalid signature')
# Обработка
data = request.json
event = request.headers.get('X-MIA-Event')
if event == 'recording.completed':
print(f"Recording completed: {data['data']['file_path']}")
elif event == 'recording.error':
print(f"Recording error: {data['data']['error']}")
return {'success': True}
JavaScript (Node.js)
const crypto = require('crypto');
function verifyWebhook(req, secret) {
const signature = req.headers['x-mia-signature'];
const timestamp = req.headers['x-mia-timestamp'];
const body = JSON.stringify(req.body);
// Check timestamp
if (Math.abs(Date.now() / 1000 - parseInt(timestamp)) > 300) {
return false;
}
// Verify signature
const expected = crypto
.createHmac('sha256', secret)
.update(`${timestamp}.${body}`)
.digest('hex');
return crypto.timingSafeEqual(
Buffer.from(signature),
Buffer.from(expected)
);
}
Конфигурация
# config.py
class APISettingsSchema(BaseModel):
# ... existing fields ...
webhook_url: str | None = Field(default=None)
webhook_secret: str | None = Field(default=None)
webhook_enabled: bool = Field(default=True)
webhook_retry_count: int = Field(default=3, ge=0, le=10)
Критерии приёмки
Файлы для изменения
core/webhook.py — новый модуль
core/recording_service.py — отправка webhook
api/routes_config.py — endpoints настройки
config.py — настройки webhook
docs/ — документация с примерами
tests/unit/ — тесты
Приоритет: P1
Тэги: security, enhancement
Описание
Webhook уведомления (о завершении записи, ошибках) отправляются без аутентификации. Любой злоумышленник может отправить ложные webhook-запросы на настроенный URL, что может привести к:
Текущее поведение
Ожидаемое поведение
Технические требования
1. Генерация и хранение секрета
2. Создание подписи
3. Отправка webhook
4. Интеграция с RecordingService
5. API endpoints
Пример использования (сторона клиента)
Python (Flask)
JavaScript (Node.js)
Конфигурация
Критерии приёмки
Файлы для изменения
core/webhook.py— новый модульcore/recording_service.py— отправка webhookapi/routes_config.py— endpoints настройкиconfig.py— настройки webhookdocs/— документация с примерамиtests/unit/— тестыПриоритет: P1
Тэги:
security,enhancement