@@ -373,37 +373,95 @@ def __add_emails(self, typ: str, data: str, insert_display_names: bool = True) -
373373 # email object doesn't support display name for all email addrs
374374 pass
375375
def extract_matches(self, pattern: re.Pattern[str], text: str) -> list[tuple[str, ...]]:
    """
    Collect every non-overlapping match of ``pattern`` found in ``text``.

    This is a direct pass-through to ``re.findall``, so the element shape
    follows its rules: a tuple of groups when the pattern has several
    capturing groups, a plain string when it has one or none.
    """
    found = re.findall(pattern, text)
    return found
379+
def add_ip_attribute(self, ip_candidate: str, received: str, seen_attributes: set[tuple[str, str]]) -> None:
    """
    Add ``ip_candidate`` as a ``received-header-ip`` attribute when it is a
    valid, non-private IP address that has not been recorded yet.

    :param ip_candidate: text that may or may not be an IPv4/IPv6 address.
    :param received: the header the candidate came from; stored as the
        attribute comment.
    :param seen_attributes: ``(attribute type, value)`` pairs already added.
        It is updated in place when a new attribute is added.

    Strings that do not parse as an IP address are silently ignored.
    """
    try:
        ip = ipaddress.ip_address(ip_candidate)
    except ValueError:
        return  # Not an IP address: nothing to add.

    if ip.is_private:
        return

    # Use the canonical text form so that different spellings of the same
    # address (expanded vs. compressed IPv6, upper vs. lower case) are
    # recognised as duplicates instead of being added twice.
    normalized = str(ip)
    key = ("received-header-ip", normalized)
    if key in seen_attributes:
        return

    self.add_attribute("received-header-ip", normalized, comment=received)
    seen_attributes.add(key)
389+
def add_hostname_attribute(self, hostname: str, received: str, seen_attributes: set[tuple[str, str]]) -> None:
    """
    Add ``hostname`` as a ``received-header-hostname`` attribute when it
    looks like a dotted name and has not been recorded yet.

    A name is accepted only if it contains a dot and the label after the
    last dot is at least two characters long (which also rules out names
    ending in a dot). ``received`` is stored as the attribute comment and
    ``seen_attributes`` is updated in place.
    """
    # rpartition yields an empty separator when there is no dot at all and
    # an empty last label when the name ends with a dot.
    _, dot, last_label = hostname.rpartition(".")
    if not dot or len(last_label) <= 1:
        return

    key = ("received-header-hostname", hostname)
    if key in seen_attributes:
        return

    self.add_attribute("received-header-hostname", hostname, comment=received)
    seen_attributes.add(key)
396+
def process_received_header(self, received: str, seen_attributes: set[tuple[str, str]]) -> None:
    """
    Extract hostnames and IP addresses from a single ``Received`` header.

    Three passes are made over the header text:

    1. every ``from <host> (<details>)`` clause yields the declared host and
       the first token of the parenthetical, each routed to the IP or the
       hostname helper depending on whether it parses as an IP address;
    2. IPv4 addresses written as ``[a.b.c.d]``, ``(a.b.c.d)`` or directly
       after a ``. `` sequence;
    3. IPv6 candidates, including ``::``-compressed forms and addresses
       prefixed with an ``IPv6:`` tag.

    Candidates are validated and de-duplicated by ``add_ip_attribute`` and
    ``add_hostname_attribute``; this method only locates them.

    :param received: text of one ``Received`` header.
    :param seen_attributes: ``(attribute type, value)`` pairs already added,
        passed through to the helpers.
    """
    # One IPv4 octet (0-255) and a full dotted quad built from it.
    octet = r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])'
    dotted_quad = rf'{octet}(?:\.{octet}){{3}}'

    received_from_regex = re.compile(
        r'from\s+([\w.-]+)'      # Declared sending hostname
        r'(?:\s+\(([^)]+)\))?'   # Optional parenthetical (reverse DNS, address, ...)
    )
    ipv4_regex = re.compile(
        rf'\[(?P<ipv4_brackets>{dotted_quad})\]'                # IPv4 inside []
        rf'|\((?P<ipv4_parentheses>{dotted_quad})\)'            # IPv4 inside ()
        rf'|(?<=\.\s)(?P<ipv4_after_domain>{dotted_quad})\b'    # IPv4 after a domain
    )
    # Deliberately loose: colon-separated hex groups, empty groups allowed so
    # that "::" compression is captured, with an optional "IPv6:" tag left
    # out of the captured group. Timestamps such as 12:34:56 also match here;
    # they are rejected later by the ipaddress validation.
    ipv6_regex = re.compile(
        r'(?<![0-9A-Za-z:.])(?:IPv6:)?'
        r'((?:[0-9A-Fa-f]{1,4})?(?::[0-9A-Fa-f]{0,4}){2,7})'
        r'(?![0-9A-Za-z:.])',
        re.IGNORECASE,
    )

    # Pass 1: hosts named in "from ..." clauses.
    for declared, parenthetical in self.extract_matches(received_from_regex, received):
        candidates = [declared.strip()]
        # The parenthetical may be missing or whitespace only; split() then
        # yields no tokens and there is nothing to extract from it.
        tokens = parenthetical.split()
        if tokens:
            candidates.append(tokens[0].strip("[]()").rstrip("."))

        for candidate in candidates:
            if not candidate:
                continue
            try:
                ipaddress.ip_address(candidate)
            except ValueError:
                self.add_hostname_attribute(candidate, received, seen_attributes)
            else:
                self.add_ip_attribute(candidate, received, seen_attributes)

    # Pass 2: IPv4 addresses; exactly one of the three named groups is set.
    for in_brackets, in_parentheses, after_domain in self.extract_matches(ipv4_regex, received):
        ip_candidate = in_brackets or in_parentheses or after_domain
        if ip_candidate:
            self.add_ip_attribute(ip_candidate, received, seen_attributes)

    # Pass 3: IPv6 candidates (single capturing group, so plain strings).
    for ipv6_candidate in self.extract_matches(ipv6_regex, received):
        self.add_ip_attribute(ipv6_candidate, received, seen_attributes)
453+
def __generate_received(self) -> None:
    """
    Extracts public IP addresses and hostnames from "Received" email headers.

    Every "Received" header of ``self.email`` is handed to
    ``process_received_header``. One ``seen_attributes`` set is shared across
    all headers so that a value appearing in several hops is only added once.
    Nothing happens when the message has no "Received" headers.
    """
    received_items = self.email.get_all("Received")
    if not received_items:
        return

    # Track added attributes to prevent duplicates (store as (type, value) tuples)
    seen_attributes: set[tuple[str, str]] = set()

    for received in received_items:
        # Header values are not guaranteed to be plain ``str`` objects under
        # every email policy; the regex-based helpers require text.
        self.process_received_header(str(received), seen_attributes)
0 commit comments