diff --git a/.github/workflows/cc-server-check.yaml b/.github/workflows/cc-server-check.yaml
new file mode 100644
index 0000000..72f7a06
--- /dev/null
+++ b/.github/workflows/cc-server-check.yaml
@@ -0,0 +1,50 @@
+# This tests if the fail2ban filtering on the CC cdx index server is too strict.
+
+name: CC server check (weekly)
+
+on:
+  schedule:
+    # Weekly schedule: every Monday at 9:00 AM UTC
+    - cron: '0 9 * * 1'
+  workflow_dispatch: # Allows manual triggering
+  # pull_request: # Run automatically for PRs
+  push:
+    branches:
+      - 'feat/**' # Trigger on feature branches
+    paths:
+      - 'tests/cc_server_check.py'
+      - '.github/workflows/cc-server-check.yaml'
+
+jobs:
+  check:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: checkout
+        uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: '3.12'
+
+      - name: Get Runner IP
+        run: |
+          echo "Runner IP: $(curl -s https://ipinfo.io/ip)"
+
+      - name: Install dependencies
+        run: |
+          pip install requests
+
+      - name: Run external API tests
+        id: api_test
+        run: |
+          python tests/cc_server_check.py
+
+      - name: Upload test results
+        if: always()
+        uses: actions/upload-artifact@v4
+        with:
+          name: fail2ban-test-results-${{ github.run_number }}
+          path: fail2ban_test_results.json
+          retention-days: 14
diff --git a/tests/cc_server_check.py b/tests/cc_server_check.py
new file mode 100644
index 0000000..d93f023
--- /dev/null
+++ b/tests/cc_server_check.py
@@ -0,0 +1,396 @@
+#!/usr/bin/env python3
+"""Check CDX API endpoints from external source (e.g., GitHub action) to detect if fail2ban is working as expected.
+
+NOTE: This is a dedicated script and NOT a unit test.
+
+Usage:
+
+```bash
+python tests/cc_server_check.py
+```
+
+The script exits with status 0 when every scenario behaves as expected and 1
+otherwise, and writes the detailed per-request results to
+``fail2ban_test_results.json`` in the current working directory.
+"""
+
+import json
+import sys
+import time
+from datetime import datetime
+from typing import Dict, List
+
+import requests
+
+API_BASE = 'https://index.commoncrawl.org'  # Update with your actual domain
+USER_AGENT = 'pypi_cdx_toolkit/fail2ban-monitor'
+CRAWL_ID = 'CC-MAIN-2025-43'
+DEFAULT_LIMIT = 1
+REQUEST_TIMEOUT = 15  # seconds allowed for one HTTP request
+SCENARIO_PAUSE = 60  # seconds to wait between two scenarios
+RESULTS_FILE = 'fail2ban_test_results.json'  # path uploaded by the workflow
+BLOCKED_STATUS_CODES = (403, 429, 503)  # HTTP statuses treated as a block
+
+DOMAINS = [
+    'blogspot.com',
+    'wikipedia.org',
+    'wordpress.org',
+    'ebay.com',
+    'europa.eu',
+    'app.link',
+    'google.com',
+    'wiktionary.org',
+    'ning.com',
+]  # taken from https://commoncrawl.github.io/cc-crawl-statistics/plots/domains
+
+# Test scenarios. Entries with should_succeed=True simulate legitimate usage
+# that must not be banned; the entry with should_succeed=False is a burst that
+# is expected to be banned. With N requests there are N - 1 delays, so a
+# scenario lasts roughly (N - 1) * delay_between plus the response times.
+test_scenarios = [
+    {
+        'name': 'Single user normal browsing',
+        'description': 'Simulates a user making occasional requests',
+        'requests': [
+            {
+                'url': f'{API_BASE}/{CRAWL_ID}-index',
+                'params': {'url': 'example.com/*', 'output': 'json', 'limit': DEFAULT_LIMIT},
+            },
+            {
+                'url': f'{API_BASE}/{CRAWL_ID}-index',
+                'params': {'url': 'example.org/*', 'output': 'json', 'limit': DEFAULT_LIMIT},
+            },
+            {
+                'url': f'{API_BASE}/{CRAWL_ID}-index',
+                'params': {'url': 'example.net/*', 'output': 'json', 'limit': DEFAULT_LIMIT},
+            },
+        ],
+        'delay_between': 2.0,  # seconds
+        'should_succeed': True,
+    },
+    {
+        'name': 'Moderate API usage',
+        'description': 'Simulates a script making regular requests',
+        'requests': [
+            {
+                'url': f'{API_BASE}/{CRAWL_ID}-index',
+                'params': {'url': f'{domain}/*', 'output': 'json', 'limit': DEFAULT_LIMIT},
+            }
+            for domain in DOMAINS[:8]
+        ],
+        'delay_between': 8.0,  # 8 requests, 7 delays: ~56s (within cdx limit of 10/60s)
+        'should_succeed': True,
+    },
+    {
+        'name': 'Collection info check',
+        'description': 'Checking collection info (stricter limits)',
+        'requests': [
+            {'url': f'{API_BASE}/collinfo.json', 'params': {}},
+            {'url': f'{API_BASE}/collinfo.json', 'params': {}},
+        ],
+        'delay_between': 6.0,  # 2 requests over 6+ seconds (within limit of 3/10s)
+        'should_succeed': True,
+    },
+    {
+        'name': 'Edge case - near limit',
+        'description': 'Tests behavior near the rate limit threshold',
+        'requests': [
+            {
+                # NOTE(review): every other CDX scenario queries
+                # f'{API_BASE}/{CRAWL_ID}-index'; confirm '/cc-index' is intended.
+                'url': f'{API_BASE}/cc-index',
+                'params': {'url': f'{domain}/*', 'output': 'json', 'limit': DEFAULT_LIMIT},
+            }
+            for domain in DOMAINS[:9]
+        ],
+        'delay_between': 7.0,  # 9 requests, 8 delays: ~56s (just under 10/60s limit)
+        'should_succeed': True,
+    },
+    {
+        'name': 'Burst detection - collinfo',
+        'description': 'Tests if legitimate burst triggers ban on collinfo endpoint',
+        'requests': [{'url': f'{API_BASE}/collinfo.json', 'params': {}} for _ in range(4)],
+        # 4 requests, 3 delays: ~12s in total; the 3rd request is sent ~8s after
+        # the first, i.e. 3 requests within 10s (expected to hit the 3/10s limit).
+        'delay_between': 4.0,
+        'should_succeed': False,  # This SHOULD get banned
+    },
+]
+
+
+def is_connection_blocked(error_msg: str) -> bool:
+    """Return True if a connection error message looks like IP blocking.
+
+    Args:
+        error_msg: Text of the connection error; it is passed through
+            ``str()`` before matching, so exception objects work as well.
+
+    Returns:
+        True if the text contains any of the known indicator substrings.
+    """
+    # NOTE(review): 'Max retries exceeded' and 'NewConnectionError' look generic
+    # and may also match failures that are not bans (e.g. DNS) - confirm.
+    blocking_indicators = (
+        'Connection refused',
+        '[Errno 61]',  # macOS/BSD connection refused
+        '[Errno 111]',  # Linux connection refused
+        'Max retries exceeded',
+        'NewConnectionError',
+    )
+    text = str(error_msg)
+    return any(indicator in text for indicator in blocking_indicators)
+
+
+def make_request(url: str, params: Dict, request_num: int) -> Dict:
+    """Make a single GET request and return a result record.
+
+    Args:
+        url: Endpoint to query.
+        params: Query parameters sent with the request.
+        request_num: Sequence number stored in the record (1-based in run_scenario).
+
+    Returns:
+        A dict with the keys ``request_num``, ``timestamp``, ``url``,
+        ``params``, ``success`` (HTTP 200), ``status_code``, ``blocked``,
+        ``error`` and ``response_time`` (seconds, set on every code path).
+    """
+    result = {
+        'request_num': request_num,
+        'timestamp': datetime.now().isoformat(),
+        'url': url,
+        'params': params,
+        'success': False,
+        'status_code': None,
+        'blocked': False,
+        'error': None,
+        'response_time': None,
+    }
+
+    start_time = time.monotonic()
+    try:
+        response = requests.get(
+            url,
+            params=params,
+            timeout=REQUEST_TIMEOUT,
+            headers={'User-Agent': USER_AGENT},
+        )
+    except requests.exceptions.Timeout:
+        # A timeout is counted as a block (the request never got an answer).
+        result['error'] = 'Request timeout'
+        result['blocked'] = True
+    except requests.exceptions.ConnectionError as e:
+        error_str = str(e)
+        result['error'] = f'Connection error: {error_str}'
+        # Connection refused is a strong indicator of IP blocking
+        if is_connection_blocked(error_str):
+            result['blocked'] = True
+    except requests.exceptions.RequestException as e:
+        result['error'] = f'Request error: {e}'
+    else:
+        result['status_code'] = response.status_code
+        if response.status_code == 200:
+            result['success'] = True
+        elif response.status_code in BLOCKED_STATUS_CODES:
+            result['blocked'] = True
+            result['error'] = f'HTTP blocked: {response.status_code}'
+        else:
+            result['error'] = f'Unexpected status: {response.status_code}'
+    finally:
+        # Record the elapsed time on every path, including generic errors.
+        result['response_time'] = time.monotonic() - start_time
+
+    return result
+
+
+def run_scenario(scenario: Dict) -> Dict:
+    """Run a complete test scenario and return its summary.
+
+    Requests are sent in order with ``delay_between`` seconds between them.
+    The scenario stops at the first blocked request.
+
+    Args:
+        scenario: One entry of ``test_scenarios``.
+
+    Returns:
+        A dict with the scenario metadata, the request counters
+        (``successful``, ``blocked``, ``failed``), ``blocked_at_request``
+        (1-based, or None) and the per-request ``results``.
+    """
+    planned = scenario['requests']
+    total = len(planned)
+
+    print(f'\n{"=" * 70}')
+    print(f'Scenario: {scenario["name"]}')
+    print(f'Description: {scenario["description"]}')
+    print(f'Total requests: {total}')
+    print(f'{"=" * 70}')
+
+    results = []
+    blocked_at = None
+
+    for i, request in enumerate(planned, 1):
+        print(f'\n Request {i}/{total}')
+        result = make_request(request['url'], request['params'], i)
+        results.append(result)
+
+        if result['success']:
+            print(f' ✓ Success (200) - {result["response_time"]:.2f}s')
+        elif result['blocked']:
+            print(f' ✗ BLOCKED - {result["error"]}')
+            if result['status_code']:
+                print(f' Status code: {result["status_code"]}')
+            blocked_at = i
+            # Stop here: once blocked, further requests are not expected to pass.
+            break
+        else:
+            print(f' ⚠ Failed - {result["error"]}')
+
+        # Wait before the next request (not needed after the last one).
+        if i < total:
+            print(f' ⏱ Waiting {scenario["delay_between"]}s...')
+            time.sleep(scenario['delay_between'])
+
+    # Analyze results
+    successful = sum(1 for r in results if r['success'])
+    blocked = sum(1 for r in results if r['blocked'])
+    failed = len(results) - successful - blocked
+
+    return {
+        'name': scenario['name'],
+        'description': scenario['description'],
+        'should_succeed': scenario['should_succeed'],
+        'total_requests': total,
+        'completed_requests': len(results),
+        'successful': successful,
+        'blocked': blocked,
+        'failed': failed,
+        'blocked_at_request': blocked_at,
+        'unexpected_block': blocked > 0 and scenario['should_succeed'],
+        'results': results,
+    }
+
+
+def print_summary(all_summaries: List[Dict]) -> bool:
+    """Print the overall test summary.
+
+    A scenario is problematic when it should succeed but was blocked (too
+    strict) or should be blocked but was not (too lenient).
+
+    Args:
+        all_summaries: Summaries as returned by ``run_scenario``.
+
+    Returns:
+        True if no scenario is problematic, False otherwise.
+    """
+    print(f'\n\n{"=" * 70}')
+    print('OVERALL SUMMARY')
+    print(f'{"=" * 70}\n')
+
+    too_strict = []
+    too_lenient = []
+
+    for summary in all_summaries:
+        was_blocked = summary['blocked'] > 0
+
+        if summary['should_succeed'] and was_blocked:
+            # Should have worked but got blocked = TOO STRICT
+            status = '❌ TOO STRICT'
+            too_strict.append(summary)
+        elif not summary['should_succeed'] and not was_blocked:
+            # Should have been blocked but wasn't = TOO LENIENT
+            status = '❌ TOO LENIENT'
+            too_lenient.append(summary)
+        elif was_blocked:
+            # Correctly blocked as expected
+            status = '✅ BLOCKED (expected)'
+        else:
+            # Correctly succeeded
+            status = '✅ OK'
+
+        print(f'{status} {summary["name"]}')
+        print(f' {summary["successful"]}/{summary["completed_requests"]} successful', end='')
+
+        if was_blocked:
+            print(f', {summary["blocked"]} blocked at request #{summary["blocked_at_request"]}')
+        else:
+            print()
+
+        if summary['failed'] > 0:
+            print(f' ⚠ {summary["failed"]} requests failed (non-block errors)')
+
+    print(f'\n{"=" * 70}')
+    print(f'Total scenarios: {len(all_summaries)}')
+    print(f'Problematic: {len(too_strict) + len(too_lenient)}')
+    print(f'{"=" * 70}\n')
+
+    if not too_strict and not too_lenient:
+        print('✅ All test scenarios behaved as expected')
+        print(' fail2ban rules appear correctly configured')
+        return True
+
+    print('⚠️ FAIL2BAN CONFIGURATION ISSUES DETECTED ⚠️\n')
+
+    if too_strict:
+        print('🔒 TOO STRICT - Legitimate usage patterns are being blocked:\n')
+        for scenario in too_strict:
+            print(f' • {scenario["name"]}')
+            print(f' Blocked at request {scenario["blocked_at_request"]}/{scenario["total_requests"]}')
+        print()
+
+    if too_lenient:
+        print('🔓 TOO LENIENT - Abuse patterns are NOT being blocked:\n')
+        for scenario in too_lenient:
+            print(f' • {scenario["name"]}')
+            print(f' Completed {scenario["completed_requests"]}/{scenario["total_requests"]} without ban')
+        print()
+
+    return False
+
+
+def main():
+    """Run all scenarios, print the summary and write the results file.
+
+    Exits with status 0 if every scenario behaved as expected, 1 otherwise.
+    """
+    print(f'Starting fail2ban external monitoring at {datetime.now()}')
+    print(f'Target: {API_BASE}\n')
+
+    all_summaries = []
+    last_index = len(test_scenarios) - 1
+
+    for index, scenario in enumerate(test_scenarios):
+        summary = run_scenario(scenario)
+        all_summaries.append(summary)
+
+        # No pause after the final scenario: no further requests follow.
+        if index == last_index:
+            break
+
+        # If we got blocked, warn; either way wait before the next scenario
+        if summary['blocked'] > 0:
+            print(f'\n⚠️ IP may be blocked - waiting {SCENARIO_PAUSE}s for potential unban...')
+        else:
+            print(f'\n⏱ Waiting {SCENARIO_PAUSE}s before next scenario...')
+        time.sleep(SCENARIO_PAUSE)
+
+    # Print summary and determine exit code
+    success = print_summary(all_summaries)
+
+    # Save detailed results to file for GitHub Actions artifact
+    with open(RESULTS_FILE, 'w', encoding='utf-8') as f:
+        json.dump(
+            {
+                'timestamp': datetime.now().isoformat(),
+                'api_base': API_BASE,
+                'overall_success': success,
+                'summaries': all_summaries,
+            },
+            f,
+            indent=2,
+        )
+
+    sys.exit(0 if success else 1)
+
+
+if __name__ == '__main__':
+    main()