-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcertpatrol.py
More file actions
1536 lines (1314 loc) · 55.7 KB
/
certpatrol.py
File metadata and controls
1536 lines (1314 loc) · 55.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Torito CertPatrol - Tiny local CT tailer that filters domains by a regex pattern.
Author: Martin Aberastegue
Website: https://torito.io
Repository: https://github.com/ToritoIO/CertPatrol
Options:
-p, --pattern PATTERN Regex pattern to match domains against (required)
-l, --logs LOGS CT logs to tail (default: fetch all usable logs)
-b, --batch SIZE Batch size for fetching entries (default: 256)
-s, --poll-sleep SECONDS Seconds to sleep between polls (default: 3.0)
-v, --verbose Verbose output (extra info for matches)
-q, --quiet-warnings Suppress parse warnings (only show actual matches)
-e, --etld1 Match against registrable base domain instead of full domain
-d, --debug-all With -v, print per-batch and per-entry domain listings
-x, --quiet-parse-errors Suppress ASN.1 parsing warnings (common in CT logs)
-c, --checkpoint-prefix Custom prefix for checkpoint file (useful for multiple instances)
-k, --cleanup-checkpoints Clean up orphaned checkpoint files and exit
-m, --max-memory-mb Maximum memory usage in MB for batch processing (default: 100)
-mn, --min-poll-sleep Minimum poll sleep time for adaptive polling (default: 1.0)
-mx, --max-poll-sleep Maximum poll sleep time for adaptive polling (default: 60.0)
-h, --help Show this help message and exit
Requirements:
pip install requests cryptography idna
# Optional but recommended for --etld1
pip install tldextract
"""
# Suppress OpenSSL and cryptography warnings BEFORE any imports
# (filterwarnings prepends to the filter list, so these take precedence
# over filters installed later by imported libraries).
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="urllib3")
# urllib3 warns when linked against non-OpenSSL TLS stacks (e.g. LibreSSL on macOS).
warnings.filterwarnings("ignore", message=".*OpenSSL.*")
warnings.filterwarnings("ignore", message=".*LibreSSL.*")
warnings.filterwarnings("ignore", category=DeprecationWarning, module="cryptography")
# cryptography warns about certificates with non-conformant serial numbers.
warnings.filterwarnings("ignore", message=".*serial number.*")
try:
    # NotOpenSSLWarning only exists in newer urllib3 releases.
    from urllib3.exceptions import NotOpenSSLWarning
    warnings.filterwarnings("ignore", category=NotOpenSSLWarning)
except ImportError:
    pass
import argparse
import json
import logging
import os
import re
import signal
import sys
import time
import multiprocessing
import gc
from typing import List, Tuple, Optional, Dict, Any, Iterator, Union
from contextlib import contextmanager
import idna
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from cryptography import x509
from cryptography.hazmat.primitives import serialization
# Make checkpoint file unique per process to avoid conflicts when running multiple instances
CHECKPOINT_DIR = "checkpoints"
CHECKPOINT_FILE = os.path.join(CHECKPOINT_DIR, f"certpatrol_checkpoints_{os.getpid()}.json")
# Identifies this client to CT log servers and the log-list endpoint.
USER_AGENT = "torito-certpatrol/1.2.0 (+local)"
# Google's canonical v3 list of known CT logs and their states.
LOG_LIST_URL = "https://www.gstatic.com/ct/log_list/v3/log_list.json"
# Phase 2: Connection pooling and rate limiting constants
DEFAULT_POOL_CONNECTIONS = 10
DEFAULT_POOL_MAXSIZE = 20
DEFAULT_MAX_RETRIES = 3
DEFAULT_BACKOFF_FACTOR = 0.3
DEFAULT_TIMEOUT = (10, 30) # (connect, read)
# Phase 2: Memory management constants
DEFAULT_MAX_MEMORY_MB = 100
MEMORY_CHECK_INTERVAL = 100 # Check memory every N entries
# Phase 2: Adaptive polling constants
DEFAULT_MIN_POLL_SLEEP = 1.0
DEFAULT_MAX_POLL_SLEEP = 60.0
# Multiplier compounded per consecutive error when backing off polls.
BACKOFF_MULTIPLIER = 1.5
# Factor applied to the poll interval after a run of successes.
SUCCESS_REDUCTION_FACTOR = 0.8
# Dynamic CT log discovery - fetched from Google's official list
CT_LOGS = {}
# Phase 3: Logging setup
logger = logging.getLogger('certpatrol')
def setup_logging(verbose: bool = False, quiet_warnings: bool = False) -> None:
    """Configure the module logger's level and console handler.

    Output format is the bare message, matching the original print-based
    style for backward compatibility.

    Args:
        verbose: Log at DEBUG level when True (takes precedence).
        quiet_warnings: Log only ERROR and above when True.
    """
    # Drop previously-installed handlers so repeated calls don't duplicate output.
    for existing in list(logger.handlers):
        logger.removeHandler(existing)

    if verbose:
        level = logging.DEBUG
    elif quiet_warnings:
        level = logging.ERROR
    else:
        level = logging.INFO

    console = logging.StreamHandler()
    console.setLevel(level)
    console.setFormatter(logging.Formatter('%(message)s'))

    logger.setLevel(level)
    logger.addHandler(console)
    # Keep messages out of the root logger to avoid double printing.
    logger.propagate = False
# Signal handling for graceful shutdown
class GracefulShutdownHandler:
    """Coordinates graceful shutdown on termination signals.

    First signal: saves checkpoints (if registered) and runs cleanup
    callbacks.  Second signal: forces an immediate exit.
    """
    def __init__(self):
        # True once the first termination signal has been received.
        self.shutdown_requested = False
        # Latest checkpoint data to persist on shutdown (set by the main loop).
        self.checkpoints: Optional[Dict[str, int]] = None
        # Callbacks invoked best-effort during shutdown.
        self.cleanup_functions: List[callable] = []
    def request_shutdown(self, signum: int, frame) -> None:
        """
        Signal handler: first call triggers graceful shutdown, second forces exit.
        Args:
            signum: Signal number delivered by the OS
            frame: Current stack frame (unused)
        """
        # BUG FIX: build the name map defensively — SIGHUP does not exist on
        # Windows, and referencing it unconditionally raised AttributeError
        # there even though setup_signal_handlers() guards with hasattr().
        signal_names = {
            signal.SIGTERM: "SIGTERM",
            signal.SIGINT: "SIGINT",
        }
        if hasattr(signal, "SIGHUP"):
            signal_names[signal.SIGHUP] = "SIGHUP"
        signal_name = signal_names.get(signum, f"Signal {signum}")
        if not self.shutdown_requested:
            # Always show shutdown message regardless of quiet mode
            print(f"Received {signal_name}, initiating graceful shutdown...", flush=True)
            self.shutdown_requested = True
            # Persist progress so a restart resumes where we left off.
            if self.checkpoints is not None:
                try:
                    save_checkpoints(self.checkpoints)
                    print("Checkpoints saved successfully", flush=True)
                except CheckpointError as e:
                    print(f"Failed to save checkpoints during shutdown: {e}", flush=True)
            # Cleanup is best-effort: one failing callback must not block the rest.
            for cleanup_func in self.cleanup_functions:
                try:
                    cleanup_func()
                except Exception as e:
                    print(f"Error during cleanup: {e}", flush=True)
        else:
            # Second signal, force exit
            print(f"Received second {signal_name}, forcing immediate exit", flush=True)
            sys.exit(1)
    def register_cleanup(self, cleanup_func: callable) -> None:
        """Register a callback to run during shutdown."""
        self.cleanup_functions.append(cleanup_func)
    def set_checkpoints(self, checkpoints: Dict[str, int]) -> None:
        """Record the checkpoint data to save if shutdown occurs."""
        self.checkpoints = checkpoints
    def should_shutdown(self) -> bool:
        """Return True once a shutdown has been requested."""
        return self.shutdown_requested
    def setup_signal_handlers(self) -> None:
        """Install this handler for SIGTERM, SIGINT and (where available) SIGHUP."""
        signals_to_handle = [signal.SIGTERM, signal.SIGINT]
        # SIGHUP is Unix-only (not available on Windows).
        if hasattr(signal, 'SIGHUP'):
            signals_to_handle.append(signal.SIGHUP)
        for sig in signals_to_handle:
            signal.signal(sig, self.request_shutdown)
# Global shutdown handler instance
# (module-wide singleton so signal handlers and the main loop share state)
shutdown_handler = GracefulShutdownHandler()
# Phase 3: Configuration validation
def validate_config_args(args: argparse.Namespace) -> List[str]:
    """Validate parsed CLI arguments.

    Checks the regex pattern, each numeric range, the cross-parameter
    polling constraint and the checkpoint prefix, collecting every problem
    so the user can fix them all at once.

    Args:
        args: Parsed command line arguments.

    Returns:
        List of human-readable error messages; empty when everything is valid.
    """
    errors = []

    def _given(name: str) -> bool:
        # Attributes may be absent (e.g. cleanup-only invocations), so probe first.
        return getattr(args, name, None) is not None

    # Pattern is optional here (not required for the cleanup operation).
    if getattr(args, 'pattern', None):
        try:
            re.compile(args.pattern, re.IGNORECASE)
        except re.error as e:
            errors.append(f"Invalid regex pattern: {e}")

    if _given('batch'):
        if args.batch <= 0:
            errors.append("Batch size must be positive")
        elif args.batch > 10000:
            errors.append("Batch size too large (max 10000)")

    if _given('poll_sleep'):
        if args.poll_sleep < 0:
            errors.append("Poll sleep cannot be negative")
        elif args.poll_sleep > 3600:
            errors.append("Poll sleep too large (max 3600 seconds)")

    if _given('max_memory_mb'):
        if args.max_memory_mb <= 0:
            errors.append("Max memory must be positive")
        elif args.max_memory_mb < 10:
            errors.append("Max memory too small (min 10MB)")
        elif args.max_memory_mb > 10000:
            errors.append("Max memory too large (max 10GB)")

    if _given('min_poll_sleep'):
        if args.min_poll_sleep < 0:
            errors.append("Min poll sleep cannot be negative")
        elif args.min_poll_sleep > 300:
            errors.append("Min poll sleep too large (max 300 seconds)")

    if _given('max_poll_sleep'):
        if args.max_poll_sleep < 0:
            errors.append("Max poll sleep cannot be negative")
        elif args.max_poll_sleep > 3600:
            errors.append("Max poll sleep too large (max 3600 seconds)")

    # The adaptive-polling window must be well-formed.
    if _given('min_poll_sleep') and _given('max_poll_sleep'):
        if args.max_poll_sleep < args.min_poll_sleep:
            errors.append("Max poll sleep must be >= min poll sleep")

    # The prefix becomes part of a filename, so restrict its alphabet.
    if getattr(args, 'checkpoint_prefix', None):
        import string
        safe_chars = string.ascii_letters + string.digits + "_-"
        if not all(c in safe_chars for c in args.checkpoint_prefix):
            errors.append("Checkpoint prefix can only contain letters, digits, underscores, and hyphens")
        if len(args.checkpoint_prefix) > 50:
            errors.append("Checkpoint prefix too long (max 50 characters)")

    return errors
# Phase 1 Improvement: Custom exceptions for better error handling
class CertPatrolError(Exception):
    """Root of the CertPatrol exception hierarchy.

    Every error raised by this tool derives from this class, so callers can
    catch a single type to handle them all.
    """
    pass
class CheckpointError(CertPatrolError):
    """Checkpoint persistence failure.

    Raised for checkpoint-directory creation errors, corrupted or
    structurally invalid checkpoint files, and failed atomic writes.
    """
    pass
class CTLogError(CertPatrolError):
    """CT log communication failure.

    Raised for network errors, invalid or malformed responses, and
    rate-limiting or authentication problems when talking to a CT log.
    """
    pass
class MemoryError(CertPatrolError):
    """
    Raised when memory limits are exceeded.

    This custom memory error is separate from Python's built-in
    MemoryError to distinguish between system-level and
    application-level memory management issues.

    NOTE(review): this name shadows the builtin ``MemoryError`` inside this
    module, and since it derives from CertPatrolError (not the builtin),
    an ``except MemoryError`` here will NOT catch genuine allocation
    failures.  Renaming would break existing callers, so flagging instead.
    """
    pass
# Phase 2: HTTP Session Management with Connection Pooling
class HTTPSessionManager:
    """Pooled, retrying HTTP client used for all CT log traffic.

    Wraps a single requests.Session configured with an urllib3 Retry policy
    and a pooling HTTPAdapter, and applies a default (connect, read) timeout
    to every request.  Usable as a context manager.
    """
    def __init__(
        self,
        pool_connections: int = DEFAULT_POOL_CONNECTIONS,
        pool_maxsize: int = DEFAULT_POOL_MAXSIZE,
        max_retries: int = DEFAULT_MAX_RETRIES,
        backoff_factor: float = DEFAULT_BACKOFF_FACTOR,
        timeout: Tuple[int, int] = DEFAULT_TIMEOUT
    ) -> None:
        """
        Args:
            pool_connections: Number of connection pools to cache
            pool_maxsize: Maximum connections kept per pool
            max_retries: Retry attempts for retryable failures
            backoff_factor: Multiplier for retry delays
            timeout: Default (connect, read) timeout in seconds
        """
        self.timeout = timeout
        self.session = self._create_session(pool_connections, pool_maxsize, max_retries, backoff_factor)
    def _create_session(
        self,
        pool_connections: int,
        pool_maxsize: int,
        max_retries: int,
        backoff_factor: float
    ) -> requests.Session:
        """Build the underlying Session with retries and pooling configured."""
        # Retry only idempotent methods, on transient server/rate-limit statuses.
        retry_policy = Retry(
            total=max_retries,
            status_forcelist=[429, 500, 502, 503, 504],
            backoff_factor=backoff_factor,
            allowed_methods=["HEAD", "GET", "OPTIONS"]
        )
        pooled_adapter = HTTPAdapter(
            pool_connections=pool_connections,
            pool_maxsize=pool_maxsize,
            max_retries=retry_policy,
            pool_block=False
        )
        sess = requests.Session()
        for scheme in ("http://", "https://"):
            sess.mount(scheme, pooled_adapter)
        sess.headers.update({"User-Agent": USER_AGENT})
        return sess
    def get(self, url: str, **kwargs) -> requests.Response:
        """
        GET *url* through the pooled session.

        Args:
            url: The URL to request
            **kwargs: Extra arguments forwarded to Session.get()
        Returns:
            Response object from the request
        """
        # A caller-supplied timeout wins; otherwise apply our default.
        kwargs.setdefault('timeout', self.timeout)
        return self.session.get(url, **kwargs)
    def close(self) -> None:
        """Release pooled connections."""
        if hasattr(self, 'session'):
            self.session.close()
    def __enter__(self) -> 'HTTPSessionManager':
        """Context manager entry."""
        return self
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the session on context exit."""
        self.close()
# Phase 2: Adaptive Rate Limiting
class AdaptiveRateLimiter:
    """Error-aware poll-interval controller.

    The sleep interval backs off exponentially while errors persist and is
    trimmed by SUCCESS_REDUCTION_FACTOR after every third consecutive
    success, always clamped to [min_sleep, max_sleep].
    """
    def __init__(
        self,
        initial_sleep: float,
        min_sleep: float = DEFAULT_MIN_POLL_SLEEP,
        max_sleep: float = DEFAULT_MAX_POLL_SLEEP
    ) -> None:
        """
        Args:
            initial_sleep: Starting sleep interval in seconds
            min_sleep: Lower clamp for the interval, in seconds
            max_sleep: Upper clamp for the interval, in seconds
        """
        self.current_sleep = initial_sleep
        self.min_sleep = min_sleep
        self.max_sleep = max_sleep
        self.consecutive_errors = 0
        self.consecutive_successes = 0
    def on_success(self) -> None:
        """Record a successful poll; speed up after three in a row."""
        self.consecutive_errors = 0
        self.consecutive_successes += 1
        if self.consecutive_successes < 3:
            return
        # Three straight successes: trim the interval and restart the count.
        reduced = self.current_sleep * SUCCESS_REDUCTION_FACTOR
        self.current_sleep = max(self.min_sleep, reduced)
        self.consecutive_successes = 0
    def on_error(self) -> None:
        """Record a failed poll; back off exponentially."""
        self.consecutive_successes = 0
        self.consecutive_errors += 1
        # Each additional consecutive error compounds the multiplier.
        backed_off = self.current_sleep * (BACKOFF_MULTIPLIER ** self.consecutive_errors)
        self.current_sleep = min(self.max_sleep, backed_off)
    def sleep(self) -> None:
        """Block for the current adaptive interval."""
        time.sleep(self.current_sleep)
    def get_current_sleep(self) -> float:
        """Return the current sleep interval in seconds."""
        return self.current_sleep
# Phase 2: Memory Monitor
class MemoryMonitor:
    """Periodic RSS watchdog for certificate processing.

    Every MEMORY_CHECK_INTERVAL calls to check_memory(), the process RSS is
    compared (via psutil, when installed) against a configured cap; garbage
    collection is attempted before giving up and raising.
    """
    def __init__(self, max_memory_mb: int = DEFAULT_MAX_MEMORY_MB) -> None:
        """
        Args:
            max_memory_mb: Maximum allowed memory usage in megabytes
        """
        # Cap stored in bytes for direct comparison with psutil's rss.
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        # Number of check_memory() calls so far; drives the periodic check.
        self.check_counter = 0
    def check_memory(self) -> None:
        """
        Account one processed entry and periodically enforce the memory cap.

        Raises:
            MemoryError: (CertPatrol's own) if usage still exceeds the cap
                after a forced garbage collection
        """
        self.check_counter += 1
        if self.check_counter % MEMORY_CHECK_INTERVAL != 0:
            return
        try:
            import psutil  # type: ignore[import-untyped]
        except ImportError:
            # Without psutil we cannot measure RSS; just GC occasionally.
            if self.check_counter % (MEMORY_CHECK_INTERVAL * 10) == 0:
                gc.collect()
            return
        if psutil.Process().memory_info().rss > self.max_memory_bytes:
            # Over the cap: reclaim what we can, then re-measure before failing.
            gc.collect()
            rss = psutil.Process().memory_info().rss
            if rss > self.max_memory_bytes:
                raise MemoryError(
                    f"Memory usage ({rss / 1024 / 1024:.1f}MB) "
                    f"exceeds limit ({self.max_memory_bytes / 1024 / 1024:.1f}MB)"
                )
# --- Phase 1: Certificate parsing with cryptography library ---
def _read_uint24(b: bytes, offset: int) -> Tuple[int, int]:
"""
Read a 3-byte big-endian unsigned int, return (value, new_offset).
Args:
b: Byte array to read from
offset: Starting position in the byte array
Returns:
Tuple of (parsed_value, new_offset)
Raises:
ValueError: If there are insufficient bytes remaining
"""
if offset + 3 > len(b):
raise ValueError("Truncated uint24")
return (b[offset] << 16) | (b[offset+1] << 8) | b[offset+2], offset + 3
def parse_tls_cert_chain(extra_data_b64: str) -> List[bytes]:
    """
    Parse certificates from CT 'extra_data' (base64).

    CT logs concatenate DER certificates directly, not in TLS structure.
    Scans for ASN.1 SEQUENCE headers (0x30), reads the encoded DER length
    (X.690 definite-length form) to find each candidate's exact boundary,
    and validates the slice with the cryptography library before accepting
    it.  This replaces the previous brute-force boundary search, which
    attempted an x509 parse at every possible end offset (O(n^2)).

    Returns:
        List of DER cert bytes [leaf, intermediates...]; [] on any
        fundamental decode failure.
    """
    import base64

    min_cert_size = 100               # certificates smaller than this are implausible
    max_cert_size = 10 * 1024 * 1024  # refuse absurd (>10MB) single certificates

    def _der_total_length(buf: bytes, pos: int) -> Optional[int]:
        """Return the full TLV length of the DER SEQUENCE at pos, or None.

        Implements X.690 definite-length decoding: short form (< 0x80) or
        long form (0x81..0x84 followed by that many length octets).
        """
        if pos + 2 > len(buf) or buf[pos] != 0x30:
            return None
        first = buf[pos + 1]
        if first < 0x80:
            return 2 + first
        n = first & 0x7F
        # 0x80 (indefinite length) is not valid DER; >4 length octets is absurd.
        if n == 0 or n > 4 or pos + 2 + n > len(buf):
            return None
        return 2 + n + int.from_bytes(buf[pos + 2:pos + 2 + n], "big")

    try:
        raw = base64.b64decode(extra_data_b64)
    except Exception:
        # Fallback: undecodable base64 yields no certificates.
        return []
    if len(raw) < 10:  # far below any plausible certificate size
        return []

    certs: List[bytes] = []
    pos = 0
    while pos < len(raw):
        total = _der_total_length(raw, pos)
        if (
            total is None
            or total < min_cert_size
            or total > max_cert_size
            or pos + total > len(raw)
        ):
            # No plausible certificate header here; resync one byte forward.
            pos += 1
            continue
        candidate = raw[pos:pos + total]
        try:
            # Validate the exact-length slice; load_der_x509_certificate
            # rejects malformed DER and trailing garbage.
            x509.load_der_x509_certificate(candidate)
        except Exception:
            pos += 1
            continue
        certs.append(candidate)
        pos += total
    return certs
def extract_domains_from_der(der_bytes: bytes) -> List[str]:
    """Extract DNS names from a DER certificate.

    Prefers the SubjectAlternativeName extension; falls back to the subject
    CN when it looks DNS-like.  Results are lower-cased, IDNA-decoded to
    Unicode where possible, de-duplicated, and returned in first-seen order.
    """
    found: List[str] = []
    # Some real-world certs carry non-compliant serial numbers; silence the
    # library's warnings while loading.
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        warnings.filterwarnings("ignore", message=".*serial number.*")
        cert = x509.load_der_x509_certificate(der_bytes)
    # SAN is authoritative when present.
    try:
        san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
        found.extend(san.value.get_values_for_type(x509.DNSName))
    except x509.ExtensionNotFound:
        pass
    if not found:
        # SAN absent: use the subject CN if it looks DNS-ish (dot or wildcard).
        try:
            cn = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value
            if "." in cn or cn.startswith("*."):
                found.append(cn)
        except IndexError:
            pass
    # Normalize: lower-case; best-effort IDNA decode for display, keeping
    # the ASCII form when decoding fails.
    normalized: List[str] = []
    for name in found:
        name = name.strip().lower()
        if name.startswith("*."):
            try:
                normalized.append("*." + idna.decode(name[2:]))
            except Exception:
                normalized.append(name)
        else:
            try:
                normalized.append(idna.decode(name))
            except Exception:
                normalized.append(name)
    # dict preserves insertion order, giving an order-stable dedupe.
    return list(dict.fromkeys(normalized))
def registrable_domain(domain: str) -> str:
    """
    Return the registrable base domain (eTLD+1) for a given domain string.

    Uses tldextract (public suffix list) when available; otherwise falls
    back to the last two labels.  Unicode/IDNA-decoded input is kept as-is.

    Args:
        domain: Domain name, optionally with a leading "*." wildcard label.

    Returns:
        The registrable domain, or the input (sans wildcard) when it cannot
        be reduced further.
    """
    # BUG FIX: the previous str.lstrip("*.") stripped *characters* ('*' and
    # '.'), which could also eat leading labels of unusual names; remove
    # exactly one wildcard prefix instead.
    d = domain[2:] if domain.startswith("*.") else domain
    try:
        # Import locally to avoid a hard dependency unless this path is used.
        import tldextract  # type: ignore
        ext = tldextract.extract(d)
        if ext.domain and ext.suffix:
            return f"{ext.domain}.{ext.suffix}"
        return d
    except Exception:
        # Heuristic fallback: assume a single-label public suffix.
        parts = d.split(".")
        if len(parts) >= 2:
            return ".".join(parts[-2:])
        return d
# --- Phase 2: Enhanced Dynamic CT log discovery with connection pooling ---
def fetch_usable_ct_logs(verbose: bool = False, session_manager: HTTPSessionManager = None) -> Dict[str, str]:
    """
    Fetch the current list of usable CT logs from Google's official list.

    Args:
        verbose: Emit progress/diagnostic log lines.
        session_manager: Optional shared HTTP session; a temporary one is
            created (and closed) when omitted.

    Returns:
        Dict mapping derived log names to base URLs.  On any failure a
        single known-log fallback dict is returned instead of raising.
    """
    # Single fallback used on every failure path below (previously this
    # literal was duplicated three times).
    fallback = {"xenon2023": "https://ct.googleapis.com/logs/xenon2023"}
    close_session = False
    if session_manager is None:
        session_manager = HTTPSessionManager()
        close_session = True
    try:
        if verbose:
            logger.info("Fetching current CT log list from Google...")
        resp = session_manager.get(LOG_LIST_URL)
        resp.raise_for_status()
        data = resp.json()
        usable_logs: Dict[str, str] = {}
        for operator in data.get("operators", []):
            for log in operator.get("logs", []):
                # Only logs in "usable" or "qualified" state are worth tailing.
                state = log.get("state", {})
                if "usable" not in state and "qualified" not in state:
                    continue
                url = log["url"].rstrip("/")
                description = log.get("description", "")
                if description:
                    # Derive a short snake_case name from the description.
                    name = description.lower()
                    name = name.replace("'", "").replace('"', "")
                    name = name.replace(" log", "").replace(" ", "_")
                    # Keep at most the first two underscore-separated parts.
                    name = "_".join(name.split("_")[0:2])
                else:
                    # Fallback to URL-based name
                    name = url.split("/")[-1] or url.split("/")[-2]
                # Disambiguate duplicate names with a numeric suffix.
                original_name = name
                counter = 1
                while name in usable_logs:
                    name = f"{original_name}_{counter}"
                    counter += 1
                usable_logs[name] = url
                if verbose:
                    logger.info(f"Found usable log: {name} -> {url}")
        if verbose:
            logger.info(f"Found {len(usable_logs)} usable CT logs")
        return usable_logs
    except requests.RequestException as e:
        if verbose:
            logger.warning(f"Network error fetching CT log list: {e}")
        return fallback
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        if verbose:
            logger.warning(f"Failed to parse CT log list: {e}")
        return fallback
    except Exception as e:
        if verbose:
            logger.warning(f"Unexpected error fetching CT log list: {e}")
        return fallback
    finally:
        if close_session:
            session_manager.close()
def save_debug_response(name: str, entry: dict, absolute_idx: int) -> None:
    """
    Save a CT log entry to a debug file for analysis.

    Failures are logged at debug level and never propagate.

    Args:
        name: Name of the CT log
        entry: The CT log entry data to save
        absolute_idx: Absolute index of the entry in the log
    """
    debug_dir = "debug_responses"
    filename = f"{debug_dir}/{name}_{absolute_idx}.json"
    try:
        # exist_ok avoids the exists()/makedirs() race between instances.
        os.makedirs(debug_dir, exist_ok=True)
        with open(filename, "w", encoding="utf-8") as f:
            # default=str so non-JSON-serializable values don't abort the dump
            json.dump(entry, f, indent=2, default=str)
        # BUG FIX: this message previously logged the literal "(unknown)"
        # instead of the destination path.
        logger.debug(f"Saved response to {filename}")
    except (OSError, IOError, TypeError, ValueError) as e:
        logger.debug(f"Failed to save response: {e}")
# --- Phase 2: Enhanced CT polling with connection pooling ---
def get_sth(base_url: str, session_manager: HTTPSessionManager) -> int:
    """Fetch the log's Signed Tree Head and return its tree_size.

    Args:
        base_url: Base URL of the CT log (no trailing slash).
        session_manager: Pooled HTTP session to use.

    Raises:
        CTLogError: On network failure or a malformed STH response.
    """
    try:
        response = session_manager.get(f"{base_url}/ct/v1/get-sth")
        response.raise_for_status()
        return int(response.json()["tree_size"])
    except requests.RequestException as e:
        raise CTLogError(f"Network error getting STH from {base_url}: {e}")
    except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
        raise CTLogError(f"Invalid STH response from {base_url}: {e}")
def get_entries(base_url: str, start: int, end: int, session_manager: HTTPSessionManager) -> List[dict]:
    """Fetch entries [start..end] inclusive (the log may return fewer).

    Args:
        base_url: Base URL of the CT log (no trailing slash).
        start: First entry index to fetch.
        end: Last entry index to fetch (inclusive).
        session_manager: Pooled HTTP session to use.

    Raises:
        CTLogError: On network failure or a malformed response.
    """
    try:
        response = session_manager.get(
            f"{base_url}/ct/v1/get-entries",
            params={"start": start, "end": end}
        )
        response.raise_for_status()
        return response.json().get("entries", [])
    except requests.RequestException as e:
        raise CTLogError(f"Network error getting entries from {base_url}: {e}")
    except (json.JSONDecodeError, KeyError, TypeError) as e:
        raise CTLogError(f"Invalid entries response from {base_url}: {e}")
# --- Phase 1: Enhanced Checkpoint Management ---
def ensure_checkpoint_dir():
    """Ensure the checkpoints directory exists.

    Raises:
        CheckpointError: If the directory cannot be created.
    """
    try:
        # BUG FIX: exist_ok avoids the exists()/makedirs() TOCTOU race when
        # several CertPatrol instances start at the same time.
        os.makedirs(CHECKPOINT_DIR, exist_ok=True)
    except OSError as e:
        raise CheckpointError(f"Failed to create checkpoint directory: {e}") from e
def validate_checkpoint_data(data: Any) -> Dict[str, int]:
    """Validate and normalize checkpoint data.

    Args:
        data: Object loaded from a checkpoint file; must be a dict mapping
            string keys to non-negative numeric values.

    Returns:
        A new dict with every value coerced to int.

    Raises:
        CheckpointError: On any structural or value problem.
    """
    if not isinstance(data, dict):
        raise CheckpointError("Checkpoint data must be a dictionary")
    result: Dict[str, int] = {}
    for log_name, position in data.items():
        if not isinstance(log_name, str):
            raise CheckpointError(f"Checkpoint key must be string, got {type(log_name)}")
        if not isinstance(position, (int, float)):
            raise CheckpointError(f"Checkpoint value must be numeric, got {type(position)}")
        try:
            # int() rejects NaN/inf floats with ValueError/OverflowError.
            coerced = int(position)
        except (ValueError, OverflowError) as e:
            raise CheckpointError(f"Invalid checkpoint value for {log_name}: {e}")
        if coerced < 0:
            raise CheckpointError(f"Checkpoint value must be non-negative, got {coerced}")
        result[log_name] = coerced
    return result
def load_checkpoints() -> Dict[str, int]:
    """Load per-log checkpoint positions from disk.

    Returns:
        Mapping of log name to last-processed index; empty when the file is
        missing, corrupted, or structurally invalid (a warning is logged and
        tailing starts fresh).
    """
    ensure_checkpoint_dir()
    if not os.path.exists(CHECKPOINT_FILE):
        return {}
    try:
        with open(CHECKPOINT_FILE, "r", encoding="utf-8") as fh:
            return validate_checkpoint_data(json.load(fh))
    except (json.JSONDecodeError, IOError) as e:
        logger.warning(f"Corrupted checkpoint file, starting fresh: {e}")
    except CheckpointError as e:
        logger.warning(f"Invalid checkpoint data, starting fresh: {e}")
    return {}
def save_checkpoints(cp: Dict[str, int]) -> None:
    """
    Enhanced atomic write with validation and integrity checks.

    Writes to a temp file, reads it back to verify, then atomically renames
    over the real file; the whole sequence is retried with exponential
    backoff before giving up.

    Args:
        cp: Dictionary mapping log names to checkpoint positions
    Raises:
        CheckpointError: If checkpoint data is invalid or save operation fails
    """
    ensure_checkpoint_dir()
    # Validate checkpoint data before saving
    try:
        validated_cp = validate_checkpoint_data(cp)
    except CheckpointError as e:
        raise CheckpointError(f"Cannot save invalid checkpoint data: {e}")
    # CHECKPOINT_FILE is already per-PID, so the temp name cannot collide
    # with another running instance.
    tmp = CHECKPOINT_FILE + ".tmp"
    max_retries = 3
    retry_delay = 0.1
    for attempt in range(max_retries):
        try:
            # Write to temporary file
            with open(tmp, "w", encoding="utf-8") as fh:
                json.dump(validated_cp, fh, indent=2)
            # Verify the file was written correctly by reading it back
            # (catches partial writes, e.g. disk-full conditions).
            try:
                with open(tmp, "r", encoding="utf-8") as fh:
                    verify_data = json.load(fh)
                validate_checkpoint_data(verify_data)
            except Exception as e:
                raise CheckpointError(f"Checkpoint verification failed: {e}")
            # Atomically replace the original file
            os.replace(tmp, CHECKPOINT_FILE)
            return
        except (OSError, IOError, CheckpointError) as e:
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
                continue
            else:
                # Clean up temp file on final failure
                try:
                    if os.path.exists(tmp):
                        os.unlink(tmp)
                except OSError:
                    pass
                raise CheckpointError(f"Failed to save checkpoints after {max_retries} attempts: {e}")