diff --git a/.gitignore b/.gitignore index 23ccece1a..7290b93c3 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ vendor/ generated/ .DS_Store *.swp +*.pyc diff --git a/custom_documentation/src/documentation_overrides.yaml b/custom_documentation/src/documentation_overrides.yaml new file mode 100644 index 000000000..cd444099d --- /dev/null +++ b/custom_documentation/src/documentation_overrides.yaml @@ -0,0 +1,211 @@ +- name: Endpoint.policy.applied.artifacts.global.channel + default: + description: The channel of the artifact. + example: stable + type: keyword + os: + linux: + description: The channel of the linux artifact. + windows: + description: The channel of the windows artifact. + macos: + description: The channel of the macos artifact. + # event: + # linux_malicious_behavior_alert: + # description: The channel of the artifact for linux malicious behavior alert. + # example: stable + +- name: agent.type + default: + example: endpoint + +- name: Endpoint.policy.applied.artifacts.global.identifiers.name + default: + example: global-configuration-v1 + +- name: Endpoint.policy.applied.artifacts.global.snapshot + default: + example: "latest" + +- name: Endpoint.policy.applied.artifacts.user.identifiers.name + os: + linux: + example: endpoint-trustlist-linux-v1 + windows: + example: endpoint-trustlist-windows-v1 + macos: + example: endpoint-trustlist-macos-v1 + +- name: Endpoint.policy.applied.artifacts.user.version + default: + example: "1.0.0" + +- name: agent.build.original + default: + example: "version: 9.1.0, compiled: Mon Jun 2 15:00:00 2025, branch: main, commit: 3fd26249705c5a467960870702589686ef04da43" + +- name: data_stream.dataset + default: + example: endpoint.alerts + +- name: event.action + default: + description: |- + Possible values for Endpoint include: + - elastic_endpoint_telemetry + - endpoint_metadata + - endpoint_policy_response + - endpoint_metrics + - endpoint_heartbeat + - malicious_file + - endpoint_unquarantine + - lookup_result + - lookup_requested + - creation + - deletion + - demand + - clone + - link + - exchange + - execution + - modification + - open + - query + - save + - overwrite + - rename + - extended_attributes_delete + - mount + - unknown + - load + - connection_accepted + - connection_attempted + - disconnect_received + - http_request + - udp_datagram_outgoing + - udp_datagram_incoming + - icmp_outgoing + - icmp_incoming + - already_running + - fork + - end + - exec + - gid_change + - start + - session_id_change + - uid_change + - remote_thread + - process_open + - text_output + - memfd_create + - shmget + - ptrace + - load_module + - log_on + - log_off + - workstation_locked + - workstation_unlocked + - ssh_log_on + - rdp_log_on + - service-installed + - scheduled-task-created + - scheduled-task-updated + - added-user-account + - group-membership-enumerated + - user-member-enumerated + - token-right-adjusted + - network-share-object-added + - network-share-object-access-checked + - vault-credentials-were-read + - gatekeeper_override + - mbr-overwrite + - files-encrypted + - canary-activity + - rule_detection + - rule_prevention + - api + - launch_daemon + - mount + - unmount + +- name: event.category + default: + type: array of keyword + example: '["malware", "intrusion_detection"]' + description: |- + Possible values for Endpoint include: + - authentication + - configuration + - driver + - file + - host + - iam + - intrusion_detection + - library + - malware + - network + - process + - registry + - session + - rule + - credential_hardening 
+      - api
+      - volume_device
+      - security
+
+- name: event.dataset
+  default:
+    example: endpoint.alerts
+
+- name: event.module
+  default:
+    example: endpoint
+    description: |-
+      The module for Endpoint is always `endpoint`
+
+- name: event.risk_score
+  default:
+    example: "99"
+    description: Endpoint risk score uses a scale of 0 to 100, where 100 is the highest risk.
+
+- name: event.severity
+  default:
+    example: "73"
+    description: Endpoint severity uses a scale of 0 to 100, where 100 is the highest risk.
+
+- name: event.type
+  default:
+    example: '["info", "allowed"]'
+    type: array of keyword
+    description: |-
+      Possible values for Endpoint include:
+      - allowed
+      - change
+      - creation
+      - deletion
+      - denied
+      - end
+      - info
+      - protocol
+      - start
+      - access
+      - admin
+      - user
+      - group
+
+- name: event.kind
+  default:
+    description: |-
+      Possible values for Endpoint include:
+      - alert
+      - event
+      - metric
+      - state
+
+- name: event.outcome
+  default:
+    description: |-
+      Possible values for Endpoint include:
+      - success
+      - failure
+      - unknown
diff --git a/scripts/generate-docs/pydocgen/Readme.md b/scripts/generate-docs/pydocgen/Readme.md
new file mode 100644
index 000000000..2f5461b68
--- /dev/null
+++ b/scripts/generate-docs/pydocgen/Readme.md
@@ -0,0 +1,54 @@
+# Custom Documentation Generator
+
+## Description
+
+This module generates documentation for the custom endpoint fields defined in [custom_documentation](../../../custom_documentation/).
+
+### Background
+
+The fields defined in [custom_documentation](../../../custom_documentation/) do not have descriptions. They are simply the possible fields
+of an event, including all of the custom fields that Endpoint uses but that are not mapped.
+
+The fields defined in [package](../../../package/) are the fields that are mapped into Kibana. These fields have descriptions and documentation.
+
+### Implementation
+
+This Python module generates markdown for all of the fields in [custom_documentation](../../../custom_documentation/) by taking the following steps:
+
+1. Parses all of the mapped fields defined in [package](../../../package/), collecting descriptions, examples, and other metadata.
+
+2. Parses any override fields defined in [documentation_overrides.yaml](../../../custom_documentation/src/documentation_overrides.yaml)
+   - Overrides can be set for any field: at the event level, at the OS level, or as a default that applies to all
+     instances of that field (see the resolution sketch below).
+   - See [documentation_overrides.yaml](../../../custom_documentation/src/documentation_overrides.yaml) for the format.
+   - If overrides are updated, the documentation must be regenerated.
+
+3. Puts all of that data into a SQLite database.
+
+4. Parses all of the endpoint fields defined in [custom_documentation](../../../custom_documentation/).
+
+5. Iterates over the custom_documentation data, filling out descriptions and examples pulled from the database that was just created.
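+
+### Override resolution example
+
+For illustration, here is a minimal sketch of the precedence applied when several overrides
+match the same field: an event-level override wins over an OS-level one, which wins over the
+default. The real implementation lives in `pydocgen/database.py`; the dictionary below is
+hypothetical:
+
+```python
+overrides = {
+    "event": {"description": "event-specific text"},  # highest priority
+    "os": {"description": "os-specific text"},
+    "default": {"description": "fallback text"},      # lowest priority
+}
+
+def resolve(key: str) -> str | None:
+    # Return the first non-empty value in priority order.
+    for level in ("event", "os", "default"):
+        value = overrides.get(level, {}).get(key)
+        if value:
+            return value
+    return None
+
+assert resolve("description") == "event-specific text"
+```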
+
+### Example Usage
+`python -m pydocgen --output-dir /path/to/output`
+
+#### Help statement
+```
+usage: __main__.py [-h] [--database DATABASE] [--no-cache] [--output-dir OUTPUT_DIR] [-v] [-l {DEBUG,INFO,WARNING,ERROR,CRITICAL}] [--csv CSV]
+
+Create markdown documentation for the fields defined in custom_documentation
+
+options:
+  -h, --help            show this help message and exit
+  --database DATABASE   path to the database
+  --no-cache            do not use cached database if it exists, always regenerate the database
+  --output-dir OUTPUT_DIR
+                        output directory for markdown documentation
+  -v, --verbose         Force maximum verbosity (DEBUG level + detailed output)
+  -l {DEBUG,INFO,WARNING,ERROR,CRITICAL}, --log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
+                        Set logging verbosity level
+  --csv CSV             Path to CSV file for missing documentation fields (optional)
+
+Example usage: python -m pydocgen --output-dir /path/to/output
+```
diff --git a/scripts/generate-docs/pydocgen/__init__.py b/scripts/generate-docs/pydocgen/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/scripts/generate-docs/pydocgen/__main__.py b/scripts/generate-docs/pydocgen/__main__.py
new file mode 100644
index 000000000..74176ede5
--- /dev/null
+++ b/scripts/generate-docs/pydocgen/__main__.py
@@ -0,0 +1,110 @@
+import argparse
+import logging
+import pathlib
+import traceback
+import sys
+import tempfile
+
+from .markdown import generate_custom_documentation_markdown
+
+from typing import Literal
+
+
+def configure_logging(
+    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+    verbose: bool,
+) -> None:
+    """Configures the logging system with specified level and verbosity.
+
+    Args:
+        log_level: String representation of logging level (DEBUG, INFO, etc.)
+        verbose: Boolean flag to force maximum verbosity
+    """
+    level = getattr(logging, log_level)
+
+    # If verbose is specified, override to DEBUG level
+    if verbose:
+        level = logging.DEBUG
+
+    # Basic config with a single stream handler writing to the console
+    logging.basicConfig(
+        level=level,
+        format="%(asctime)s - %(levelname)-8s %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Create markdown documentation for the fields defined in custom_documentation",
+        epilog="Example usage: python -m pydocgen --output-dir /path/to/output",
+    )
+
+    parser.add_argument(
+        "--database",
+        default=pathlib.Path(tempfile.gettempdir()) / "generate-docs.sqlite",
+        type=pathlib.Path,
+        help="path to the database",
+    )
+
+    parser.add_argument(
+        "--no-cache",
+        action="store_true",
+        help="do not use cached database if it exists, always regenerate the database",
+    )
+
+    parser.add_argument(
+        "--output-dir",
+        default=pathlib.Path.cwd().resolve() / "output",
+        type=pathlib.Path,
+        help="output directory for markdown documentation",
+    )
+
+    parser.add_argument(
+        "-v",
+        "--verbose",
+        action="store_true",
+        help="Force maximum verbosity (DEBUG level + detailed output)",
+    )
+
+    parser.add_argument(
+        "-l",
+        "--log-level",
+        type=str.upper,
+        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+        default="INFO",
+        help="Set logging verbosity level",
+    )
+
+    parser.add_argument(
+        "--overrides",
+        type=pathlib.Path,
+        default=pathlib.Path.cwd().resolve() / "custom_documentation" / "src" / "documentation_overrides.yaml",
+        help="path to the documentation overrides yaml",
+    )
+
+    parser.add_argument(
+        "--csv",
+        type=pathlib.Path,
+        default=None,
+        help="Path to CSV file for missing documentation fields (optional)",
+    )
+
+    args = parser.parse_args()
+
+    configure_logging(args.log_level, args.verbose)
+
+    if args.no_cache and args.database.exists():
+        logging.info(f"Removing existing database {args.database} since --no-cache was specified")
+        args.database.unlink()
+
+    generate_custom_documentation_markdown(args.database, args.output_dir, args.csv)
+    logging.info(f"Generated markdown documentation to {args.output_dir}")
+
+
+if __name__ == "__main__":
+    try:
+        main()
+    except Exception:
+        traceback.print_exc()
+        sys.exit(1)
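For context, the SQLite cache built by `database.py` (next file) can also be inspected directly with sqlmodel. A minimal sketch, assuming the database has already been generated at the default temp path; `PackageField` is the model defined in the file below:

```python
import pathlib
import tempfile

from sqlmodel import Session, create_engine, select

# PackageField maps one fields.yml entry to a row in the cache database.
from pydocgen.database import PackageField

db_path = pathlib.Path(tempfile.gettempdir()) / "generate-docs.sqlite"
engine = create_engine(f"sqlite:///{db_path}")

with Session(engine) as session:
    # Look up the cached ECS metadata for a single field by name.
    row = session.exec(
        select(PackageField).where(PackageField.name == "event.action")
    ).first()
    if row:
        print(row.name, row.type, row.example)
```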
diff --git a/scripts/generate-docs/pydocgen/database.py b/scripts/generate-docs/pydocgen/database.py
new file mode 100644
index 000000000..84c084cd5
--- /dev/null
+++ b/scripts/generate-docs/pydocgen/database.py
@@ -0,0 +1,326 @@
+import pathlib
+import logging
+
+from sqlmodel import SQLModel, Field, create_engine, Session, select, Relationship
+from sqlalchemy import Engine, Column, JSON
+
+from .models.custom_documentation import DocumentationOverrideMap, OsNameList
+from .models.packages import Package, PackageList
+
+from typing import Optional
+
+
+#
+# These models represent the database tables for mapped fields
+#
+class PackageReference(SQLModel, table=True):
+    id: Optional[int] = Field(default=None, primary_key=True)
+    package_data: str = Field(default="{}", sa_column=Column(JSON))
+
+
+class PackageField(SQLModel, table=True):
+    """
+    PackageField represents a specific field as defined in package/endpoint/data_stream/{type}/fields/fields.yml;
+    each entry in fields.yml has a name and description. This class holds the name, description, and a reference
+    to the parent package. These fields provide descriptions for the fields in the custom documentation.
+
+    Note: this is the database table definition for the Package class defined in models/packages.py
+
+    Raises:
+        ValueError: if the `package` property is accessed before `package_reference` is set.
+    """
+    id: Optional[int] = Field(default=None, primary_key=True)
+    name: str
+    description: Optional[str] = None
+    example: Optional[str] = None
+    type: Optional[str] = None
+    package_reference_id: Optional[int] = Field(foreign_key="packagereference.id")
+    package_reference: Optional[PackageReference] = Relationship()
+
+    @property
+    def package(self) -> Package:
+        if not self.package_reference:
+            raise ValueError(f"PackageReference is not set for PackageField {self}")
+        return Package.model_validate_json(self.package_reference.package_data)
+
+
+#
+# These models represent the database tables for overrides
+#
+class OverrideField(SQLModel, table=True):
+    id: Optional[int] = Field(default=None, primary_key=True)
+    description: Optional[str] = None
+    example: Optional[str] = None
+    type: Optional[str] = None
+
+
+class OverrideRelationship(SQLModel, table=True):
+    id: Optional[int] = Field(default=None, primary_key=True)
+    name: str
+    event: Optional[str] = None
+    os: Optional[str] = None
+    default: bool = False
+    override_id: int = Field(foreign_key="overridefield.id")
+    override: OverrideField = Relationship(sa_relationship_kwargs={"lazy": "joined"})
+
+
+def populate_overrides(session: Session):
+    """
+    populate_overrides loads documentation_overrides.yaml and stores each override
+    with its relationship (event, os, or default) in the database
+
+    Args:
+        session: database session
+    """
+    dom = DocumentationOverrideMap.from_yaml()
+    for name, mapping in dom.items():
+        if mapping.os:
+            for os, override in mapping.os.items():
+                record = OverrideField(
+                    description=override.description,
+                    example=override.example,
+                    type=override.type,
+                )
+                session.add(record)
+                session.flush()
+
+                related_record = OverrideRelationship(
+                    name=name, os=os, override_id=record.id
+                )
+                session.add(related_record)
+
+        if mapping.event:
+            for event, override in mapping.event.items():
+                record = OverrideField(
+                    description=override.description,
+                    example=override.example,
+                    type=override.type,
+                )
+                session.add(record)
+                session.flush()
+
+                related_record = OverrideRelationship(
+                    name=name, event=event, override_id=record.id
+                )
+                session.add(related_record)
+
+        if mapping.default:
+            record = OverrideField(
+                description=mapping.default.description,
+                example=mapping.default.example,
+                type=mapping.default.type,
+            )
+            session.add(record)
+            session.flush()
+
+            related_record = OverrideRelationship(
+                name=name, default=True, override_id=record.id
+            )
+            session.add(related_record)
+
+    session.commit()
+
+
+def populate_packages_fields(session: Session):
+    """
+    populate_packages_fields populates the package fields in the database
+
+    Args:
+        session: database session
+    """
+
+    def add_to_db(field: PackageField, session: Session):
+        existing_field = session.exec(
+            select(PackageField).where(PackageField.name == field.name)
+        ).first()
+        if existing_field:
+            if existing_field.description != field.description:
+                raise ValueError(
+                    f"Field {field.name} already exists with different description"
+                )
+        else:
+            logging.debug(f"  Adding field {field.name}")
+            session.add(field)
+
+    package_list = PackageList.from_files()
+    for package in package_list:
+        logging.debug(f"Adding package fields for {package.filepath}")
+        package_ref = PackageReference(package_data=package.model_dump_json())
+        session.add(package_ref)
+        session.flush()
+        for field in package.fields:
+            if field.fields:
+                for sub_field in field.fields:
+                    name = f"{field.name}.{sub_field.name}"
+                    add_to_db(
PackageField( + name=name, + description=sub_field.description, + package_reference_id=package_ref.id, + example=sub_field.example, + type=sub_field.type, + ), + session, + ) + else: + add_to_db( + PackageField( + name=field.name, + description=field.description, + package_reference_id=package_ref.id, + example=field.example, + type=field.type, + ), + session, + ) + session.commit() + + +class OverrideQueryResult: + """ + Represents the result of querying for field overrides, prioritized by event, OS, and default. + + This class retrieves and stores a prioritized list of field overrides for a given field name, + event name, and OS name from the database. The priority order is: event-specific override (highest), + then OS-specific override, and finally the default override (lowest). + + Properties such as `description`, `example`, and `type` return the value from the highest-priority + override that provides a non-empty value, or None if none are found. + + Args: + session: SQLModel session used to query the database. + field_name: Name of the field to retrieve overrides for. + event_name: Name of the event to prioritize event-specific overrides. + os_name: Name of the OS to prioritize OS-specific overrides. + """ + + def __init__( + self, session: Session, field_name: str, event_name: str, os_names: OsNameList + ): + """ + Initialize OverrideQueryResult. + + Args: + session: SQLModel session. + field_name: Name of the field. + event_name: Name of the event. + os_name: Name of the OS. + """ + self.overrides: list[OverrideField | None] = [] + + overrides = session.exec( + select(OverrideRelationship).where(OverrideRelationship.name == field_name) + ).all() + + # + # These functions resolve the overrides for event, os, and default respectively. + # + def event_override() -> OverrideField | None: + """ + Returns the event override if it exists, otherwise None. + """ + return next((o.override for o in overrides if o.event == event_name), None) + + def os_override() -> OverrideField | None: + """ + Returns the OS Override if it exists. There can be multiple os overrides, so the relevant + ones for this document are saved in markdown table format. + """ + description = None + example = None + type = None + for o in overrides: + if o.os: + if o.os in os_names: + if o.override.description: + if not description: + description = f"|OS|Description|\n|---|---|\n" + description += f"|{o.os}|{o.override.description}|\n" + if o.override.example: + if not example: + example = f"|OS|Example|\n|---|---|\n" + example += f"|{o.os}|{o.override.example}|\n" + if o.override.type: + if not type: + type = f"|OS|Type|\n|---|---|\n" + type += f"|{o.os}|{o.override.type}|\n" + + return ( + OverrideField( + description=description, + example=example, + type=type, + ) + if any([description, example, type]) + else None + ) + + def default_override() -> OverrideField | None: + """ + Returns the default override if it exists, otherwise None. + """ + return next((o.override for o in overrides if o.default), None) + + # We save the overrides in order of priority, so that we can return the highest-priority override + self.overrides = [event_override(), os_override(), default_override()] + + @property + def description(self) -> str | None: + """ + Returns the description from the highest-priority override that provides a non-empty value, or None. 
+        """
+        for override in self.overrides:
+            if override and override.description:
+                return override.description
+        return None
+
+    @property
+    def example(self) -> str | None:
+        """
+        Returns the example from the highest-priority override that provides a non-empty value, or None.
+        """
+        for override in self.overrides:
+            if override and override.example:
+                return override.example
+        return None
+
+    @property
+    def type(self) -> str | None:
+        """
+        Returns the type from the highest-priority override that provides a non-empty value, or None.
+        """
+        for override in self.overrides:
+            if override and override.type:
+                return override.type
+        return None
+
+
+def getDatabase(db_path: pathlib.Path) -> Engine:
+    """
+    getDatabase creates a database if it does not exist, otherwise it uses the existing database
+
+    This stores the documentation in package/endpoint/data_stream in a lightweight SQLite database. We will
+    use this when generating markdown documentation for the fields defined in the custom_documentation.
+
+    Overrides are also added to the database here.
+
+    Args:
+        db_path: path to the database
+
+    Returns:
+        database Engine
+    """
+    if db_path.exists():
+        logging.info(f"Using existing database at {db_path}")
+        return create_engine(f"sqlite:///{db_path}")
+
+    logging.info(f"Creating database at {db_path}")
+    engine = create_engine(f"sqlite:///{db_path}")
+    SQLModel.metadata.create_all(engine)
+    with Session(engine) as session:
+        populate_packages_fields(session)
+        populate_overrides(session)
+        session.commit()
+    return engine
diff --git a/scripts/generate-docs/pydocgen/markdown.py b/scripts/generate-docs/pydocgen/markdown.py
new file mode 100644
index 000000000..785a8caba
--- /dev/null
+++ b/scripts/generate-docs/pydocgen/markdown.py
@@ -0,0 +1,421 @@
+import csv
+import hashlib
+import logging
+import os
+import pathlib
+from typing import TextIO
+
+from sqlmodel import Session, select
+
+from .models.custom_documentation import OsNameList
+from .database import OverrideQueryResult, PackageField, getDatabase
+from .models import CustomDocumentationList, Filter
+
+
+def quote_markdown_string(s: str) -> str:
+    """
+    quote_markdown_string prepends each line of a string with a '>' character
+
+    Args:
+        s: string to quote
+
+    Returns:
+        quoted string
+    """
+    return "\n".join(f"> {line}" for line in s.splitlines())
+
+
+def generate_random_sha256() -> str:
+    """
+    generate_random_sha256 generates a random SHA256 hash for use in example fields
+
+    Returns:
+        random SHA256 hash
+    """
+    return hashlib.sha256(os.urandom(32)).hexdigest()
+
+
+class FieldMetadata:
+    """
+    FieldMetadata contains all the information necessary to generate markdown for a field.
+    It queries the package field database for ECS metadata and the overrides database
+    for endpoint-specific metadata. It also generates a random SHA256 hash if the field is a
+    SHA256 hash and no example is provided.
+    """
+
+    def __init__(
+        self, field: str, session: Session, event_name: str, os_names: OsNameList
+    ) -> None:
+        """
+        __init__ queries the database for ECS metadata and endpoint-specific metadata. Also
+        generates a random SHA256 hash if the field is a SHA256 hash and no example is provided.
+
+        Args:
+            field: field name
+            session: active sqlmodel session for querying the database
+            event_name: name of the event
+            os_names: list of os names (e.g., ["windows", "linux", "macos"])
+        """
+        self.field = field
+        self.event_name = event_name
+        self.os_names = os_names
+
+        self.endpoint_description: str | None = None
+        self.endpoint_example: str | None = None
+        self.endpoint_type: str | None = None
+        self.ecs_description: str | None = None
+        self.ecs_example: str | None = None
+        self.ecs_type: str | None = None
+
+        self._populate_ecs_metadata(session)
+        self._populate_endpoint_metadata(session)
+
+        if not self.ecs_example and self.field.endswith(".sha256"):
+            # If the field is a SHA256 hash, generate a random example if none is provided
+            self.ecs_example = generate_random_sha256()
+
+    def _populate_ecs_metadata(self, session: Session) -> None:
+        """
+        _populate_ecs_metadata populates the ECS metadata for a field
+        based on the package field database
+
+        Args:
+            session: SQLAlchemy session
+        """
+        package_field: PackageField | None = session.exec(
+            select(PackageField).where(PackageField.name == self.field)
+        ).first()
+        if package_field:
+            #
+            # The package field description may contain newlines; they are kept
+            # as-is and quoted line by line when the markdown is written
+            #
+            self.ecs_description = package_field.description
+            self.ecs_example = package_field.example
+            self.ecs_type = package_field.type
+
+    def _populate_endpoint_metadata(self, session: Session) -> None:
+        """
+        _populate_endpoint_metadata populates the endpoint metadata for a field
+        based on the overrides database
+
+        Args:
+            session: SQLAlchemy session
+        """
+        result = OverrideQueryResult(
+            session, self.field, self.event_name, self.os_names
+        )
+        if result.description:
+            self.endpoint_description = result.description
+        if result.example:
+            self.endpoint_example = result.example
+        if result.type:
+            self.endpoint_type = result.type
+
+    def has_data(self) -> bool:
+        """
+        has_data checks if the metadata has any data populated
+
+        Returns:
+            True if any metadata is populated, False otherwise
+        """
+        return any(
+            [
+                self.ecs_description,
+                self.ecs_example,
+                self.ecs_type,
+                self.endpoint_description,
+                self.endpoint_example,
+                self.endpoint_type,
+            ]
+        )
+
+    def missing_data(self) -> bool:
+        """
+        missing_data checks if the metadata is missing any data
+
+        Returns:
+            True if any metadata is missing, False otherwise
+        """
+        return not all(
+            [
+                self.ecs_description,
+                self.ecs_example,
+                self.ecs_type,
+                self.endpoint_description,
+                self.endpoint_example,
+                self.endpoint_type,
+            ]
+        )
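+
+    # For illustration, the markdown emitted by write_markdown below looks
+    # roughly like this for a typical field (field name and values are
+    # hypothetical; the exact sections depend on which metadata was found):
+    #
+    #   ### `process.name`
+    #
+    #   **ECS Description**
+    #
+    #   > Process name.
+    #
+    #   **Example**
+    #
+    #   > ssh
+    #
+    #   <hr>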
+
+    def write_markdown(self, f: TextIO) -> None:
+        """
+        write_markdown writes the field metadata to a markdown file
+
+        Args:
+            f: file object to write to
+        """
+        f.write(f"### `{self.field}`\n\n")
+        if not self.has_data():
+            f.write("No description or example found\n\n")
+            f.write("<hr>\n\n")
+            return
+
+        if self.ecs_description:
+            f.write("**ECS Description**\n\n")
+            f.write(f"{quote_markdown_string(self.ecs_description)}\n\n")
+        if self.endpoint_description:
+            f.write("**Extended Description**\n\n")
+            f.write(f"{quote_markdown_string(self.endpoint_description)}\n\n")
+        if self.endpoint_example:
+            f.write("**Example**\n\n")
+            f.write(f"{quote_markdown_string(self.endpoint_example)}\n\n")
+        elif self.ecs_example:
+            f.write("**Example**\n\n")
+            f.write(f"{quote_markdown_string(self.ecs_example)}\n\n")
+        if self.endpoint_type:
+            f.write("**Type**\n\n")
+            f.write(f"{quote_markdown_string(self.endpoint_type)}\n\n")
+        elif self.ecs_type:
+            f.write("**Type**\n\n")
+            f.write(f"{quote_markdown_string(self.ecs_type)}\n\n")
+        f.write("<hr>\n\n")
+
+
+class MetadataCsvWriter:
+    """
+    This class will write a CSV file that contains fields
+    that are missing either a description or an example. This
+    can be imported into a spreadsheet to track missing documentation
+    """
+
+    FIELD_NAME = "Field Name"
+    FIELD_EVENT_NAME = "Event Name"
+    FIELD_HAS_ECS_DESCRIPTION = "Has ECS Description"
+    FIELD_HAS_ECS_EXAMPLE = "Has ECS Example"
+    FIELD_HAS_ECS_TYPE = "Has ECS Type"
+    FIELD_HAS_ENDPOINT_DESCRIPTION = "Has Endpoint Description"
+    FIELD_HAS_ENDPOINT_EXAMPLE = "Has Endpoint Example"
+
+    def __init__(self, csv_path: pathlib.Path):
+        self.csv_path = csv_path
+        self.fields = [
+            self.FIELD_NAME,
+            self.FIELD_EVENT_NAME,
+            self.FIELD_HAS_ECS_DESCRIPTION,
+            self.FIELD_HAS_ECS_EXAMPLE,
+            self.FIELD_HAS_ECS_TYPE,
+            self.FIELD_HAS_ENDPOINT_DESCRIPTION,
+            self.FIELD_HAS_ENDPOINT_EXAMPLE,
+        ]
+        self.rows = []
+
+    def add_row(self, field: FieldMetadata):
+        """
+        add_row adds a row to the CSV output
+
+        Args:
+            field: FieldMetadata object containing the field information
+        """
+        self.rows.append(
+            {
+                self.FIELD_NAME: field.field,
+                self.FIELD_EVENT_NAME: field.event_name,
+                self.FIELD_HAS_ECS_DESCRIPTION: bool(field.ecs_description),
+                self.FIELD_HAS_ECS_EXAMPLE: bool(field.ecs_example),
+                self.FIELD_HAS_ECS_TYPE: bool(field.ecs_type),
+                self.FIELD_HAS_ENDPOINT_DESCRIPTION: bool(field.endpoint_description),
+                self.FIELD_HAS_ENDPOINT_EXAMPLE: bool(field.endpoint_example),
+            }
+        )
+
+    def write_csv(self):
+        """
+        write_csv writes the collected rows to a CSV file
+        """
+        logging.debug(f"Generating CSV output at {self.csv_path}")
+        with self.csv_path.open("w", newline="") as csvfile:
+            writer = csv.DictWriter(csvfile, fieldnames=self.fields)
+            writer.writeheader()
+            for row in self.rows:
+                writer.writerow(row)
+
+
+def generate_custom_documentation_markdown(
+    db_path: pathlib.Path,
+    output_dir: pathlib.Path,
+    csv_path: pathlib.Path | None = None,
+):
+    """
+    Generate markdown files for custom documentation
+    """
+
+    def get_output_filepath(src_path: pathlib.Path) -> pathlib.Path:
+        """
+        get_output_filepath determines the output filename for writing markdown, based
+        on the source path of the package
+
+        Args:
+            src_path: yaml file path
+
+        Returns:
+            output filepath
+        """
+        parts = src_path.parts
+        index = parts.index("data_stream")
+        output_filename = output_dir
+        for part in parts[index + 1 : -1]:
+            output_filename = output_filename / part
+        return output_filename / parts[-1].replace(".yaml", ".md")
+
+    def get_formatted_os_name(os: str) -> str:
+        """
+        get_formatted_os_name os names in the definitions are always lowercase; this function
+        returns the correctly cased os name
+
+        Args:
+            os: os name
+
+        Returns:
+            properly cased os name
+        """
+        match os:
+            case "windows":
+                return "Windows"
+            case "linux":
+                return "Linux"
+            case "macos":
+                return "macOS"
+            case _:
+                raise ValueError(
+                    f"Unknown OS name: {os}. Please add it to the get_formatted_os_name function."
+                )
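+
+    # For illustration (hypothetical values): get_formatted_os_name("macos")
+    # returns "macOS", and get_formatted_os_string below joins a list such as
+    # ["linux", "macos"] into "Linux, macOS".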
+
+    def get_formatted_os_string(os_list: OsNameList) -> str:
+        """
+        get_formatted_os_string some documents list multiple OSs; this function formats them
+        correctly for the markdown output
+
+        Args:
+            os_list: list of os names
+
+        Returns:
+            formatted os string
+        """
+        return ", ".join(get_formatted_os_name(os) for os in os_list)
+
+    def get_kql_query_string(filter: Filter) -> str:
+        """
+        get_kql_query_string generates a KQL query string from a Filter object
+
+        Args:
+            filter: Filter object from the custom documentation
+
+        Returns:
+            KQL query string
+        """
+        queries = []
+        filter_values = filter.model_dump()
+        for field, metadata in Filter.model_fields.items():
+            field_name = metadata.alias if metadata.alias else field
+            if field in filter_values:
+                field_value = filter_values[field]
+                if not field_value:
+                    continue
+                if isinstance(field_value, list):
+                    if len(field_value) == 1:
+                        field_value = f"{field_value[0]}"
+                    else:
+                        field_value = '" or "'.join(field_value)
+                queries.append(f'{field_name} : "{field_value}"')
+        return " and ".join(queries)
+
+    #
+    # Function Begin
+    #
+
+    # Create or get the populated database
+    engine = getDatabase(db_path)
+
+    # Get the custom documentation
+    custom_docs = CustomDocumentationList.from_files()
+
+    csv_writer: MetadataCsvWriter | None = None
+    if csv_path:
+        csv_writer = MetadataCsvWriter(csv_path)
+
+    # Generate markdown for each custom document
+    with Session(engine) as session:
+        for custom_doc in custom_docs:
+
+            # Get the output filename and create the parent directories
+            output_filename = get_output_filepath(custom_doc.filepath)
+            output_filename.parent.mkdir(parents=True, exist_ok=True)
+
+            # Write the markdown file
+            with output_filename.open("w") as f:
+                f.write(f"# {custom_doc.overview.name}\n\n")
+
+                f.write("## Description\n\n")
+                f.write(f"{custom_doc.overview.description}\n\n")
+
+                f.write("## Overview\n\n")
+                f.write("<table>\n")
+                f.write("<tbody>\n")
+                f.write("<tr>\n")
+                f.write(
+                    f"<td>OS</td><td>{get_formatted_os_string(custom_doc.identification.os)}</td>\n"
+                )
+                f.write("</tr>\n")
+                f.write("<tr>\n")
+                f.write("<td>Data Stream</td>\n")
+                f.write(f"<td>{custom_doc.identification.data_stream}</td>\n")
+                f.write("</tr>\n")
+                f.write("<tr>\n")
+                f.write("<td>KQL Query</td>\n")
+                f.write(
+                    f"<td>{get_kql_query_string(custom_doc.identification.filter)}</td>\n"
+                )
+                f.write("</tr>\n")
+                f.write("</tbody>\n")
+                f.write("</table>\n\n")
+                f.write("## Fields\n\n")
+
+                #
+                # Write markdown for the individual Fields
+                #
+                for field in custom_doc.fields.endpoint:
+                    field_metadata = FieldMetadata(
+                        field=field,
+                        session=session,
+                        event_name=custom_doc.filepath.stem,
+                        os_names=custom_doc.identification.os,
+                    )
+
+                    if csv_writer:
+                        if not all(
+                            [field_metadata.ecs_description, field_metadata.ecs_example]
+                        ):
+                            csv_writer.add_row(field_metadata)
+
+                    # Check if the field we are writing is a wildcard or special field
+                    # If it is, we skip it unless it has a specific description or example
+                    # Wildcard fields are those that contain "._" or ".*"
+                    if any(["._" in field, ".*" in field]):
+                        if (
+                            custom_doc.fields.details
+                            and field in custom_doc.fields.details
+                        ):
+                            field_metadata.ecs_description = custom_doc.fields.details[
+                                field
+                            ].description
+                        else:
+                            logging.info(
+                                f"Skipping field {field} because it is a wildcard or special field that does not have a specific description or example"
+                            )
+                            continue
+                    field_metadata.write_markdown(f)
+            logging.debug(f"wrote markdown to {output_filename}")
+
+    # If we have a CSV writer, write the CSV file
+    if csv_writer:
+        csv_writer.write_csv()
diff --git a/scripts/generate-docs/pydocgen/models/__init__.py b/scripts/generate-docs/pydocgen/models/__init__.py
new file mode 100644
index 000000000..3212ffaa2
--- /dev/null
+++ b/scripts/generate-docs/pydocgen/models/__init__.py
@@ -0,0 +1,2 @@
+from .packages import *
+from .custom_documentation import *
diff --git a/scripts/generate-docs/pydocgen/models/custom_documentation.py b/scripts/generate-docs/pydocgen/models/custom_documentation.py
new file mode 100644
index 000000000..77643c8f0
--- /dev/null
+++ b/scripts/generate-docs/pydocgen/models/custom_documentation.py
@@ -0,0 +1,196 @@
+import logging
+import pathlib
+import yaml
+from pydantic import BaseModel, Field, field_validator
+from typing import List, Optional, Iterator, Dict, TypeAlias, Literal
+
+from ..paths import CUSTOM_DOCUMENTATION_DIR, DOCUMENTATION_OVERRIDE_PATH
+
+OsNameList: TypeAlias = list[Literal["windows", "linux", "macos"]]
+
+#
+# See any of the files at custom_documentation/src/endpoint/data_stream/*/*.yaml for examples
+# of the data these models parse
+#
+class Overview(BaseModel):
+    """
+    Overview of the package
+    """
+    name: str
+    description: str
+
+class Filter(BaseModel):
+    """
+    Filter for the package
+    """
+    event_dataset: str = Field(..., alias="event.dataset")
+    event_module: str = Field(..., alias="event.module")
+    event_provider: Optional[str] = Field(None, alias="event.provider")
+    host_os_type: Optional[str] = Field(None, alias="host.os.type")
+    event_code: Optional[str] = Field(None, alias="event.code")
+    event_action: Optional[List[str]] = Field(None, alias="event.action")
+
+    @field_validator("event_action", mode="before")
+    @classmethod
+    def validate_event_action(cls, value):
+        if isinstance(value, str):
+            # If input is a string, convert to a list with one item
+            return [value]
+        return value
+
+    class Config:
+        populate_by_name = True
+
+
+class Identification(BaseModel):
+    """
+    Identification of the package
+    """
+    filter: Filter
+    os: OsNameList
+    data_stream: str
+
+
+class Detail(BaseModel):
+    """
+    Details for the package
+    """
+    description: Optional[str] = None
+
+
+class Fields(BaseModel):
+    """
+    Fields for the package
+    """
+    endpoint: List[str]
+    details: Optional[Dict[str, Detail]] = None
+
+class CustomDocumentation(BaseModel):
+    """
+    Custom documentation for a package
+    """
+    overview: Overview
+    identification: Identification
+    fields: Fields
+    filepath: pathlib.Path
+
+    @classmethod
+    def from_yaml(cls, yaml_path: pathlib.Path) -> "CustomDocumentation":
+        logging.debug(f"Reading custom documentation from {yaml_path}")
+        with yaml_path.open("r") as f:
+            data = yaml.safe_load(f)
+        return cls(**data, filepath=yaml_path)
+
+
+class CustomDocumentationMap(BaseModel):
+    """
+    Map of custom documentation for a package
+    """
+    root: dict = {}
+
+    def __getitem__(self, key) -> CustomDocumentation:
+        return self.root[key]
+
+    def __iter__(self) -> Iterator[CustomDocumentation]:
+        return iter(self.root.values())
+
+    def __len__(self) -> int:
+        return len(self.root)
+
+    def append(self, package: CustomDocumentation) -> None:
+        self.root[package.overview.name] = package
+
+    @classmethod
+    def from_yaml(cls, yaml_dir: pathlib.Path) -> "CustomDocumentationMap":
+        custom_docs = cls()
+        for yaml_path in yaml_dir.rglob("*.yaml"):
+            custom_doc = CustomDocumentation.from_yaml(yaml_path)
+            custom_docs.append(custom_doc)
+
+        return custom_docs
+
+
+class CustomDocumentationList(BaseModel):
+    """
+    List of custom documentation for a package
+    """
+    root: List[CustomDocumentation] = []
+
+    def __iter__(self) -> Iterator[CustomDocumentation]:
+        return iter(self.root)
+
+    def __getitem__(self, index) -> CustomDocumentation:
+        return self.root[index]
+
+    def __len__(self) -> int:
+        return len(self.root)
+
+    def append(self, package: CustomDocumentation) -> None:
+        self.root.append(package)
+
+    @classmethod
+    def from_files(
+        cls, yaml_dir: pathlib.Path = CUSTOM_DOCUMENTATION_DIR
+    ) -> "CustomDocumentationList":
+        custom_docs = []
+        for yaml_path in yaml_dir.rglob("*.yaml"):
+            custom_docs.append(CustomDocumentation.from_yaml(yaml_path))
+
+        return cls(root=custom_docs)
+
+
+#
+# These models represent the data from custom_documentation/src/documentation_overrides.yaml
+#
+class OverrideBase(BaseModel):
+    """
+    Override for a field
+    """
+    description: Optional[str] = None
+    example: Optional[str] = None
+    type: Optional[str] = None
+
+class OverrideMapping(BaseModel):
+    """
+    Map of overrides for a field
+    """
+    default: Optional[OverrideBase] = None
+    os: Optional[Dict[str, OverrideBase]] = None
+    event: Optional[Dict[str, OverrideBase]] = None
+
+class DocumentationOverrideMap(BaseModel):
+    """
+    Map of documentation overrides for a field
+    """
+    root: dict = {}
+
+    def __getitem__(self, key) -> OverrideMapping:
+        return self.root[key]
+
+    def __iter__(self) -> Iterator[OverrideMapping]:
+        return iter(self.root.values())
+
+    def __len__(self) -> int:
+        return len(self.root)
+
+    def items(self):
+        return self.root.items()
+
+    def append(self, name: str, om: OverrideMapping) -> None:
+        self.root[name] = om
+
+    @classmethod
+    def from_yaml(
+        cls, yaml_path: pathlib.Path = DOCUMENTATION_OVERRIDE_PATH
+    ) -> "DocumentationOverrideMap":
+        logging.debug(f"Reading documentation overrides from {yaml_path}")
+        if not yaml_path.exists():
+            raise FileNotFoundError(f"Documentation override file {yaml_path} does not exist")
+        doc_overrides = cls()
+        with yaml_path.open("r") as f:
+            data = yaml.safe_load(f)
+            for item in data:
+                doc_overrides.append(item["name"], OverrideMapping(**item))
+        return doc_overrides
diff --git a/scripts/generate-docs/pydocgen/models/packages.py b/scripts/generate-docs/pydocgen/models/packages.py
new file mode 100644
index 000000000..a42f26abc
--- /dev/null
+++ b/scripts/generate-docs/pydocgen/models/packages.py
@@ -0,0 +1,153 @@
+import
logging +import json +import pathlib +import yaml +from pydantic import BaseModel, RootModel +from typing import List, Optional, Any, Dict, Iterator + +from ..paths import PACKAGES_DIR + + +# +# See any of the files at package/endpoint/data_stream/*/fields/fields.yaml for examples +# of the data these models parse +# + +class MultiField(BaseModel): + """ + fields can have a number of multi_fields + """ + + name: str + type: str + norms: Optional[bool] = None + normalizer: Optional[str] = None + ignore_above: Optional[int] = None + default_field: Optional[bool] = None + + class Config: + """ + this config setting ensures that the model will raise an error if any extra fields are present + """ + + extra = "forbid" + + +class Field(BaseModel): + """ + Field field as defined in fields.yml + """ + + name: str + title: Optional[str] = None + default_field: Optional[bool] = None + level: Optional[str] = None + type: Optional[str] = None + ignore_above: Optional[int] = None + description: Optional[str] = None + fields: Optional[List["Field"]] = None + required: Optional[bool] = None + group: Optional[int] = None + multi_fields: Optional[List[MultiField]] = None + example: Optional[Any] = None + format: Optional[str] = None + enabled: Optional[bool] = None + doc_values: Optional[bool] = None + index: Optional[bool] = None + footnote: Optional[str] = None + pattern: Optional[str] = None + path: Optional[str] = None + + class Config: + """ + this config setting ensures that the model will raise an error if any extra fields are present + """ + + extra = "forbid" + + +class Package(BaseModel): + """ + A package consists of a name, a list of fields and an optional sample event + """ + + name: str + fields: List[Field] + sample_event: Optional[Dict[Any, Any]] = None + filepath: Optional[pathlib.Path] = None + + @classmethod + def from_package_dir(cls, package_dir: pathlib.Path): + """ + takes a directory and returns a Package object + - name is the name of the directory (package) + - fields are read from fields/fields.yml + - sample_event is read from sample_event.json if it exists + + Args: + package_dir: directory holding the package data + """ + logging.debug(f"Reading package from {package_dir}") + if not package_dir.exists(): + raise ValueError(f"package directory {package_dir} does not exist") + if not package_dir.is_dir(): + raise ValueError(f"package directory {package_dir} is not a directory") + + # + # read fields from fields.yml and create Field objects + # + fields_path = package_dir / "fields" / "fields.yml" + fields_data = yaml.safe_load(fields_path.read_text()) + fields = [Field(**field) for field in fields_data] + + # + # read sample event if it exists + # + sample_event = None + sample_event_path = package_dir / "sample_event.json" + if sample_event_path.exists(): + sample_event = json.loads(sample_event_path.read_text()) + + # + # return the Package object + # + return cls( + name=package_dir.name, + fields=fields, + sample_event=sample_event, + filepath=fields_path, + ) + + +class PackageList(RootModel): + """ + PackageList is a list of packages + """ + + root: List[Package] = [] + + def __iter__(self) -> Iterator[Package]: + return iter(self.root) + + def __getitem__(self, index) -> List[Package]: + return self.root[index] + + def __len__(self) -> int: + return len(self.root) + + def append(self, package: Package) -> None: + self.root.append(package) + + @classmethod + def from_files(cls, packages_dir: pathlib.Path = PACKAGES_DIR): + """ + from_packages_dir creates a PackageList from 
a directory of packages + + Args: + packages_dir: top level directory holding the packages + """ + package_paths = list(packages_dir.glob("*")) + packages = [ + Package.from_package_dir(package_path) for package_path in package_paths + ] + return cls(root=packages) diff --git a/scripts/generate-docs/pydocgen/paths.py b/scripts/generate-docs/pydocgen/paths.py new file mode 100644 index 000000000..e3340e75e --- /dev/null +++ b/scripts/generate-docs/pydocgen/paths.py @@ -0,0 +1,15 @@ +import pathlib + +ENDPOINT_PACKAGE_DIR = pathlib.Path(__file__).resolve().parents[3] + +CUSTOM_DOCUMENTATION_DIR = ( + ENDPOINT_PACKAGE_DIR / "custom_documentation" / "src" / "endpoint" / "data_stream" +) +PACKAGES_DIR = ENDPOINT_PACKAGE_DIR / "package" / "endpoint" / "data_stream" + +DOCUMENTATION_OVERRIDE_PATH = ( + ENDPOINT_PACKAGE_DIR + / "custom_documentation" + / "src" + / "documentation_overrides.yaml" +) diff --git a/scripts/requirements.txt b/scripts/requirements.txt index 462653e44..cc08f3c7c 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -3,5 +3,8 @@ GitPython~=3.1 Jinja2~=3.0 PyYAML~=6.0 pyyaml-include~=1.2 +pydantic~=2.10.6 +SQLAlchemy~=2.0.39 +sqlmodel~=0.0.24 click xxhash
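
For a quick end-to-end check, the generator can also be driven programmatically instead of through `python -m pydocgen`. A minimal sketch, assuming `scripts/generate-docs` is on `sys.path`; the paths are illustrative:

```python
import pathlib

from pydocgen.markdown import generate_custom_documentation_markdown

generate_custom_documentation_markdown(
    db_path=pathlib.Path("/tmp/generate-docs.sqlite"),  # cached SQLite database
    output_dir=pathlib.Path("output"),                  # markdown tree is written here
    csv_path=pathlib.Path("missing.csv"),               # optional missing-docs report
)
```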