Export access logs from CloudWatch to Parquet files#921
Conversation
- Created Python package in management_tools/pulp-access-logs-exporter/
- Added pyproject.toml with CalVer versioning (2026.2.0)
- Dependencies: boto3, pyarrow, pandas (for test validation)
- Created module stubs: cli.py, cloudwatch.py, writer.py
- Added test structure with conftest.py and test_export.py
- Comprehensive README with usage examples and architecture

Package will export Pulp PyPI access logs from CloudWatch to Parquet format for analytics and monitoring.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
CloudWatch client (cloudwatch.py):
- Build CloudWatch Logs Insights query with server-side parsing
- Fetch logs with time-based chunking (5-minute windows) for >10K results
- Poll async query until complete
- Convert "-" to None for proper null handling
- Parse all log fields: user, org_id, domain, distribution, package, status_code, user_agent, x_forwarded_for
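The time-based chunking above can be sketched as follows — a minimal illustration using only the standard library; the function name and window handling are assumptions, not the package's actual API:

```python
from datetime import datetime, timedelta

def iter_time_chunks(start, end, window_minutes=5):
    """Yield (chunk_start, chunk_end) pairs covering [start, end) in fixed windows.

    Splitting the range keeps each CloudWatch Logs Insights query under the
    10,000-result cap by querying one small window at a time.
    """
    step = timedelta(minutes=window_minutes)
    current = start
    while current < end:
        chunk_end = min(current + step, end)
        yield current, chunk_end
        current = chunk_end

# Example: a one-hour range yields twelve 5-minute windows.
chunks = list(iter_time_chunks(datetime(2026, 2, 6, 14, 0), datetime(2026, 2, 6, 15, 0)))
print(len(chunks))  # 12
```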
Parquet writer (writer.py):
- Write PyArrow Table to Parquet with snappy compression
- Support both local file and S3 output (detect s3:// prefix)
- Print statistics (record count, file size)
CLI interface (cli.py):
- Parse command-line arguments with argparse
- Support ISO timestamps and relative times ("1 hour ago", "now")
- Orchestrate full pipeline: CloudWatch → PyArrow → Parquet
- Print detailed progress and statistics
Tests:
- Add pandas import for test validation
- Integration tests for full pipeline
- Unit tests for schema validation
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Added export-access-logs-hourly CronJob to ClowdApp spec:
- Schedule: 15 * * * * (every hour at :15 for 15-min log buffer)
- Image: UBI9 Python 3.12
- Install package from Pulp PyPI at runtime
- Export logs from CloudWatch to S3 bucket
- Time range: 1 hour ago to 15 minutes ago
- Resources: 250m CPU, 512Mi memory (expandable to 1Gi)
- Concurrency: Forbid (prevent overlapping runs)
Added template parameters:
- PULP_PYPI_URL: Internal Pulp PyPI instance URL
- S3_BUCKET_NAME: S3 bucket for log archives
Environment variables:
- AWS_REGION: us-east-1
- CLOUDWATCH_LOG_GROUP: /aws/containerinsights/${ENV_NAME}/application
- ACCESS_LOGS_OUTPUT_PATH: ${S3_BUCKET_NAME}/pypi-logs/
- PULP_PYPI_URL: Package installation URL
AWS credentials provided automatically via IRSA/ServiceAccount.
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Reviewer's Guide

Sequence diagram for hourly access log export run:

sequenceDiagram
participant CronJob as CronJob_export_access_logs_hourly
participant Pod as Exporter_Pod
participant CLI as Exporter_CLI
participant CW as CloudWatch_Logs
participant S3 as S3_Bucket
CronJob->>Pod: Start Pod on schedule 15 * * * *
Pod->>Pod: pip install pulp-access-logs-exporter from PULP_PYPI_URL
Pod->>CLI: Invoke export-access-logs with env and args
CLI->>CLI: parse_args()
CLI->>CLI: parse_time(start_time, end_time)
CLI->>CLI: build_query(filter_paths, exclude_paths)
CLI->>CW: fetch_cloudwatch_logs(log_group, query, start_time, end_time, region)
activate CW
loop 5-minute_chunks
CW-->>CLI: chunk_results (parsed fields)
end
deactivate CW
CLI->>CLI: convert_to_arrow_table(results)
CLI->>S3: write_parquet(table, output_path)
S3-->>CLI: confirm write
CLI-->>Pod: exit code 0
Pod-->>CronJob: Job completion status
Class diagram for pulp-access-logs-exporter package structure:

classDiagram
class CliModule {
+parse_args(args)
+parse_time(time_str)
+main() int
}
class CloudwatchModule {
+build_query(filter_paths, exclude_paths) str
+fetch_cloudwatch_logs(log_group, query, start_time, end_time, region) List~Dict~
+convert_to_arrow_table(results) Table
}
class WriterModule {
+SCHEMA
+write_parquet(table, output_path) void
}
class PackageInit {
+__version__ : str
}
class PackageMain {
+__main__() void
}
CliModule ..> CloudwatchModule : uses
CliModule ..> WriterModule : uses
PackageMain ..> CliModule : calls main
PackageInit <.. CliModule : imports version (project metadata)
Hey - I've found 8 issues, and left some high level feedback:
- The `build_query` function accepts `filter_paths` and `exclude_paths` but currently ignores them and uses hard-coded filters; consider interpolating these values into the query or removing the arguments to avoid a misleading CLI/API surface.
- The CLI exposes `--cloudwatch-stream` but this parameter is not used anywhere in the CloudWatch query/execution path; either wire it into the query logic or drop the option to keep the interface consistent with behavior.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `build_query` function accepts `filter_paths` and `exclude_paths` but currently ignores them and uses hard-coded filters; consider interpolating these values into the query or removing the arguments to avoid a misleading CLI/API surface.
- The CLI exposes `--cloudwatch-stream` but this parameter is not used anywhere in the CloudWatch query/execution path; either wire it into the query logic or drop the option to keep the interface consistent with behavior.
## Individual Comments
### Comment 1
<location> `management_tools/pulp-access-logs-exporter/src/pulp_access_logs_exporter/cloudwatch.py:11-20` </location>
<code_context>
+def build_query(filter_paths: str = "/api/pypi/", exclude_paths: str = "/livez,/status") -> str:
</code_context>
<issue_to_address>
**issue (bug_risk):** Filter/exclude path arguments are currently ignored in the query construction.
`filter_paths` and `exclude_paths` are accepted by the CLI and passed into `build_query`, but the query is currently hard‑coded to `/api/pypi` and ignores these arguments. This makes the options misleading and blocks path-based filtering. Either incorporate these parameters into the query (e.g., via `LIKE` / `NOT LIKE` on the given prefixes) or remove them from the CLI and function if configurability isn’t needed.
</issue_to_address>
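One way to interpolate the parameters, as the comment suggests, could look like the following — a sketch only; the real `build_query` in cloudwatch.py uses different parse/filter clauses, and the clause syntax here is illustrative:

```python
def build_query(filter_paths: str = "/api/pypi/", exclude_paths: str = "/livez,/status") -> str:
    """Build a CloudWatch Logs Insights query that honors both path arguments."""
    lines = [
        "fields @timestamp, @message",
        f"| filter @message like '{filter_paths}'",
    ]
    # One negative filter clause per comma-separated exclude path.
    for path in filter(None, (p.strip() for p in exclude_paths.split(","))):
        lines.append(f"| filter @message not like '{path}'")
    lines.append("| sort @timestamp asc")
    return "\n".join(lines)

query = build_query(exclude_paths="/livez,/status")
# Each excluded health-check path gets its own "not like" clause.
```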
### Comment 2
<location> `management_tools/pulp-access-logs-exporter/src/pulp_access_logs_exporter/cloudwatch.py:160-161` </location>
<code_context>
+ org_id = None
+
+ # Parse timestamp from ISO format
+ timestamp_str = result.get('@timestamp', '')
+ timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
+
+ # Convert status_code to int
</code_context>
<issue_to_address>
**issue (bug_risk):** Potential mismatch between timezone-aware timestamps and the PyArrow schema.
`datetime.fromisoformat(... '+00:00')` returns timezone-aware datetimes, while `writer.SCHEMA` uses `pa.timestamp('ns')` (tz-naive). Passing tz-aware values into a tz-naive field can fail or behave differently across PyArrow versions. Either strip the timezone and store naive UTC (e.g., `datetime.fromisoformat(...).replace(tzinfo=None)`), or change the schema to `pa.timestamp('ns', tz='UTC')` and keep the datetimes tz-aware.
</issue_to_address>
### Comment 3
<location> `management_tools/pulp-access-logs-exporter/src/pulp_access_logs_exporter/cloudwatch.py:93-107` </location>
<code_context>
+ query_id = response['queryId']
+
+ # Poll until complete
+ while True:
+ result = logs_client.get_query_results(queryId=query_id)
+ status = result['status']
+
+ if status == 'Complete':
+ break
+ elif status in ['Failed', 'Cancelled']:
+ raise RuntimeError(f"Query {query_id} failed with status: {status}")
+
+ time.sleep(2) # Poll every 2 seconds
+
+ # Process results
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Polling loop for query completion lacks timeout and backoff.
This loop can run indefinitely if the status never reaches one of the handled terminal states (e.g., remains `Running` or a new non-terminal status is introduced), causing the job to hang. Add a maximum wait time or retry limit, and consider exponential backoff, so the job eventually fails with a clear error instead of hanging.
```suggestion
query_id = response['queryId']
# Poll until complete with timeout and backoff
max_wait_seconds = 300 # Maximum total wait time (5 minutes)
poll_interval = 2 # Initial poll interval in seconds
max_poll_interval = 30 # Maximum poll interval in seconds
start_time = time.time()
while True:
if time.time() - start_time > max_wait_seconds:
raise TimeoutError(
f"Timed out after {max_wait_seconds} seconds waiting for query {query_id} to complete"
)
result = logs_client.get_query_results(queryId=query_id)
status = result['status']
if status == 'Complete':
break
elif status in ['Failed', 'Cancelled']:
raise RuntimeError(f"Query {query_id} failed with status: {status}")
time.sleep(poll_interval)
poll_interval = min(poll_interval * 2, max_poll_interval)
# Process results
```
</issue_to_address>
### Comment 4
<location> `management_tools/pulp-access-logs-exporter/src/pulp_access_logs_exporter/cli.py:16-23` </location>
<code_context>
+ description="Export Pulp access logs from CloudWatch to Parquet format"
+ )
+
+ parser.add_argument(
+ "--cloudwatch-group",
+ required=True,
+ help="CloudWatch log group name (e.g., /aws/containerinsights/cluster/application)",
+ )
+
+ parser.add_argument(
+ "--cloudwatch-stream",
+ help="CloudWatch stream name (optional)",
+ )
</code_context>
<issue_to_address>
**issue (bug_risk):** Defined `--cloudwatch-stream` argument is not used anywhere.
The option is parsed but never used in the query or `fetch_cloudwatch_logs` call, so it’s effectively a no-op and may mislead users. Please either integrate it into the log retrieval (e.g., filter by stream) or remove it until it’s supported.
</issue_to_address>
### Comment 5
<location> `management_tools/pulp-access-logs-exporter/src/pulp_access_logs_exporter/writer.py:57-58` </location>
<code_context>
+ print(f"Successfully wrote {len(table)} records to S3")
+ else:
+ # Local file
+ print(f"Writing to local file: {output_path}")
+ pq.write_table(table, output_path, compression='snappy')
+
+ # Print file size
</code_context>
<issue_to_address>
**suggestion:** Local write assumes the output directory already exists.
If `output_path` includes non-existent parent directories, `pq.write_table` will raise an error. In the local branch, you could create the parent directory first (e.g., `Path(output_path).parent.mkdir(parents=True, exist_ok=True)`) before writing, while ensuring this logic is only applied for true local paths, not S3-style URIs.
Suggested implementation:
```python
else:
# Local file
print(f"Writing to local file: {output_path}")
import os
# Ensure parent directory exists for local file paths
parent_dir = os.path.dirname(output_path)
if parent_dir:
os.makedirs(parent_dir, exist_ok=True)
pq.write_table(table, output_path, compression='snappy')
# Print file size
file_size = os.path.getsize(output_path)
file_size_kb = file_size / 1024
print(f"Successfully wrote {len(table)} records ({file_size_kb:.2f} KB)")
```
None required, assuming this is the only place local files are written in this module. If there are other local write paths, you may want to apply the same parent-directory creation logic there for consistency.
</issue_to_address>
### Comment 6
<location> `management_tools/pulp-access-logs-exporter/tests/test_export.py:12` </location>
<code_context>
+pytestmark = pytest.mark.integration
+
+
+def test_cloudwatch_to_parquet_conversion(sample_cloudwatch_results, sample_parquet_path):
+ """Test conversion from CloudWatch results to Parquet file."""
+ from pulp_access_logs_exporter.cloudwatch import convert_to_arrow_table
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for convert_to_arrow_table with empty results to lock in schema and null-handling behavior
Currently only non-empty `sample_cloudwatch_results` are tested. Please add a unit test for `convert_to_arrow_table([])` that verifies it returns a table without raising, uses `SCHEMA` as the schema, and has zero rows, so the empty-CloudWatch-window behavior is locked in for future changes.
</issue_to_address>
### Comment 7
<location> `management_tools/pulp-access-logs-exporter/tests/test_export.py:102` </location>
<code_context>
+ assert actual_status == expected_status, f"Status code mismatch: {actual_status} != {expected_status}"
+
+
+def test_query_building():
+ """Test CloudWatch Logs Insights query construction."""
+ from pulp_access_logs_exporter.cloudwatch import build_query
</code_context>
<issue_to_address>
**suggestion (testing):** Strengthen query-building tests to assert key parsing and filtering semantics
The current `test_query_building` only checks for a substring, so it may miss regressions in how the query is parsed and filtered. Consider asserting that the built query includes the specific parse/field expressions relied on downstream, such as:
- `parse` patterns for `user` and `org_id`.
- Parsing of domain/distribution/package paths.
- The `coalesce(client_ip, xff) as x_forwarded_for` expression.
- Exclusion of `django.request:WARNING` and health-check paths when `exclude_paths` is set.
It would also help to add a test that calls `build_query(filter_paths='foo', exclude_paths='bar,baz')` and asserts those values are reflected, so refactors can’t silently drop these parameters.
</issue_to_address>
### Comment 8
<location> `management_tools/pulp-access-logs-exporter/tests/test_export.py:121-122` </location>
<code_context>
+
+
+@pytest.mark.unit
+def test_schema_definition():
+ """Test that schema is properly defined."""
+ from pulp_access_logs_exporter.writer import SCHEMA
</code_context>
<issue_to_address>
**suggestion (testing):** Extend schema test to assert nullability of user/org_id and match the Parquet schema contract more tightly
Since this test already checks field presence and some types, consider also:
- Asserting `SCHEMA.field('user').nullable` and `SCHEMA.field('org_id').nullable` are `True`, as documented for unauthenticated users.
- Optionally asserting that other core fields (`timestamp`, `message`, `domain`, `distribution`, `package`, `status_code`, `user_agent`, `x_forwarded_for`) are non-nullable, if that’s the intended contract.
This will better enforce the documented Parquet schema and catch accidental nullability changes that could affect downstream analytics.
```suggestion
# Check schema has expected fields
field_names = [field.name for field in SCHEMA]
# Enforce Parquet schema nullability contract
# user and org_id are nullable to support unauthenticated users
assert SCHEMA.field('user').nullable is True
assert SCHEMA.field('org_id').nullable is True
# Core analytical fields should be non-nullable
for name in [
'timestamp',
'message',
'domain',
'distribution',
'package',
'status_code',
'user_agent',
'x_forwarded_for',
]:
field = SCHEMA.field(name)
assert field.nullable is False
```
</issue_to_address>
Critical bug fixes:
1. Fix build_query to use filter_paths and exclude_paths parameters
- Previously these arguments were ignored
- Now properly generates dynamic query with custom paths
- Escapes forward slashes for CloudWatch Logs Insights regex
2. Remove unused --cloudwatch-stream CLI argument
- Was defined but never used in the code
- Removed to avoid misleading users
3. Fix timezone handling in timestamp parsing
- Convert timezone-aware datetime to timezone-naive UTC
- PyArrow schema uses pa.timestamp('ns') which is timezone-naive
- Prevents timezone mismatch errors
4. Add timeout and exponential backoff to polling loop
- Prevents hanging if query status never completes
- 5-minute timeout with exponential backoff (2s → 30s max)
- Raises TimeoutError with clear message
Improvements:
5. Create parent directories for local file writes
- Prevents errors when output path has non-existent parents
- Uses os.makedirs with exist_ok=True
6. Add test for empty CloudWatch results
- Verifies empty results produce zero-row table with correct schema
- Ensures schema is preserved even with no data
7. Strengthen query building tests
- Assert key parsing expressions (user, org_id, domain, etc.)
- Assert coalesce expression for x_forwarded_for
- Test custom filter_paths and exclude_paths
- Verify exclude filters are applied
8. Extend schema test to assert nullability
- Verify user and org_id fields are nullable
- Document nullability contract for unauthenticated users
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
✅ Addressed Sourcery AI Review Feedback

Thanks @sourcery-ai for the thorough review! I've addressed all 8 issues in commit 96245a9.

Critical Bug Fixes

1. ✅ Fixed `build_query` to use the `filter_paths` and `exclude_paths` parameters
2. ✅ Removed unused `--cloudwatch-stream` CLI argument
3. ✅ Fixed timezone mismatch in timestamp parsing
4. ✅ Added timeout and exponential backoff to polling loop

Improvements

5. ✅ Create parent directories for local file writes
6. ✅ Added test for empty CloudWatch results
7. ✅ Strengthened query building tests
8. ✅ Extended schema test to assert nullability

Test Coverage

All tests pass with these changes. The code is now more robust and all edge cases are properly handled.
@sourcery-ai review |
SourceryAI left a comment:
Hey - I've found 2 issues, and left some high level feedback:
- In `build_query`, some `parse` statements reference the `message` field instead of `@message`, which is likely invalid in CloudWatch Logs Insights and should be made consistent with the earlier `fields @timestamp, @message` usage.
- The CronJob currently writes to a static S3 URI (`${S3_BUCKET_NAME}/pypi-logs/`) without an hour-specific filename, so each run will overwrite the same object; consider incorporating the time window (e.g., hour) into the `--output-path` to avoid data loss.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `build_query`, some `parse` statements reference the `message` field instead of `@message`, which is likely invalid in CloudWatch Logs Insights and should be made consistent with the earlier `fields @timestamp, @message` usage.
- The CronJob currently writes to a static S3 URI (`${S3_BUCKET_NAME}/pypi-logs/`) without an hour-specific filename, so each run will overwrite the same object; consider incorporating the time window (e.g., hour) into the `--output-path` to avoid data loss.
## Individual Comments
### Comment 1
<location> `deploy/clowdapp.yaml:988-989` </location>
<code_context>
+ value: "us-east-1"
+ - name: CLOUDWATCH_LOG_GROUP
+ value: "/aws/containerinsights/${ENV_NAME}/application"
+ - name: ACCESS_LOGS_OUTPUT_PATH
+ value: "${S3_BUCKET_NAME}/pypi-logs/"
+ - name: PULP_PYPI_URL
+ value: "${PULP_PYPI_URL}"
</code_context>
<issue_to_address>
**question (bug_risk):** S3 output path looks like a prefix rather than a concrete object key, which may cause overwrites or ambiguous layout.
The exporter treats `output_path` as a concrete file path or S3 URI (`s3://bucket/key`), not a directory. With `${S3_BUCKET_NAME}/pypi-logs/`, each run will write to the same key and overwrite previous data. If you want to retain historical runs, include a varying component (e.g. timestamped filename or date-partitioned path like `.../pypi-logs/YYYY/MM/DD/HH/export.parquet`).
</issue_to_address>
### Comment 2
<location> `management_tools/pulp-access-logs-exporter/tests/test_export.py:125` </location>
<code_context>
+
+
+@pytest.mark.unit
+def test_query_building():
+ """Test CloudWatch Logs Insights query construction."""
+ from pulp_access_logs_exporter.cloudwatch import build_query
</code_context>
<issue_to_address>
**suggestion (testing):** Extend query-building tests to cover edge cases like empty or None exclude_paths and regex escaping
To more thoroughly validate `build_query`, please add tests that cover:
- `exclude_paths=""` and `exclude_paths=None`, ensuring no extra `filter @message not like` clauses are added beyond the Django warning filter.
- `filter_paths` values with multiple `/` or other regex-significant characters (e.g. `/api/pypi/foo.bar/`) to confirm they’re escaped correctly.
These cases will better exercise the string/regex handling and guard against regressions for unusual inputs.
</issue_to_address>Hi @decko! 👋
Thanks for trying out Sourcery by commenting with @sourcery-ai review! 🚀
Install the sourcery-ai bot to get automatic code reviews on every pull request ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.|
|
Hey - I've found 3 issues, and left some high level feedback:
- In `build_query`, some `parse` expressions reference `message` instead of `@message` (e.g. parsing status_code, user_agent, x_forwarded_for), which may not work as expected in CloudWatch Logs Insights and should be aligned to use the correct field name consistently.
- The `export-access-logs-hourly` CronJob writes to a fixed `ACCESS_LOGS_OUTPUT_PATH` (e.g. `s3://.../pypi-logs/`), which will cause each run to overwrite the same S3 object; consider incorporating the hour (or full timestamp/partitioning) into the output path to keep historical hourly exports.
- The `pyproject.toml` declares `pandas` as a core dependency even though it appears to be used only in tests; you could move it into the `test` optional-dependency group to keep the runtime package lighter.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `build_query`, some `parse` expressions reference `message` instead of `@message` (e.g. parsing status_code, user_agent, x_forwarded_for), which may not work as expected in CloudWatch Logs Insights and should be aligned to use the correct field name consistently.
- The `export-access-logs-hourly` CronJob writes to a fixed `ACCESS_LOGS_OUTPUT_PATH` (e.g. `s3://.../pypi-logs/`), which will cause each run to overwrite the same S3 object; consider incorporating the hour (or full timestamp/partitioning) into the output path to keep historical hourly exports.
- The `pyproject.toml` declares `pandas` as a core dependency even though it appears to be used only in tests; you could move it into the `test` optional-dependency group to keep the runtime package lighter.
## Individual Comments
### Comment 1
<location> `management_tools/pulp-access-logs-exporter/src/pulp_access_logs_exporter/cli.py:73-82` </location>
<code_context>
+ """
+ # Handle "now"
+ if time_str.lower() == "now":
+ return datetime.utcnow()
+
+ # Handle relative times like "1 hour ago", "15 minutes ago"
+ relative_match = re.match(r'(\d+)\s+(minute|hour|day)s?\s+ago', time_str.lower())
+ if relative_match:
+ value = int(relative_match.group(1))
+ unit = relative_match.group(2)
+
+ if unit == 'minute':
+ delta = timedelta(minutes=value)
+ elif unit == 'hour':
+ delta = timedelta(hours=value)
+ elif unit == 'day':
+ delta = timedelta(days=value)
+
+ return datetime.utcnow() - delta
+
+ # Try to parse as ISO format
+ try:
+ # Handle both with and without 'Z' suffix
+ if time_str.endswith('Z'):
+ return datetime.fromisoformat(time_str.replace('Z', '+00:00'))
+ else:
+ return datetime.fromisoformat(time_str)
</code_context>
<issue_to_address>
**issue (bug_risk):** Mixing naive UTC datetimes with `fromisoformat` may lead to subtle timezone inconsistencies when computing epoch seconds.
`now`/relative paths return naive UTC (`datetime.utcnow()`), while `fromisoformat` can return offset-aware datetimes (e.g., when a `Z` or offset is present). `int(start_dt.timestamp())` will then interpret naive values in the local timezone, which can diverge from UTC (DST, non-UTC containers). To avoid this, normalize all values to UTC-aware datetimes (e.g., `datetime.now(timezone.utc)` and `.astimezone(timezone.utc)` on parsed values) before calling `timestamp()`.
</issue_to_address>
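The normalization the comment recommends could be sketched like this — the helper name is illustrative, not part of the package:

```python
from datetime import datetime, timezone

def to_utc_aware(dt: datetime) -> datetime:
    """Return a timezone-aware UTC datetime regardless of input awareness."""
    if dt.tzinfo is None:
        # Treat naive values as UTC rather than the local timezone,
        # so .timestamp() yields the intended epoch seconds.
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)

naive = datetime(2026, 2, 6, 14, 0)
aware = datetime.fromisoformat("2026-02-06T14:00:00+02:00")
print(int(to_utc_aware(naive).timestamp()))  # epoch seconds, interpreted as UTC
print(to_utc_aware(aware).hour)              # 12
```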
### Comment 2
<location> `management_tools/pulp-access-logs-exporter/pyproject.toml:14` </location>
<code_context>
+dependencies = [
+ "boto3>=1.26.0",
+ "pyarrow>=14.0.0",
+ "pandas>=2.0.0",
+]
+
</code_context>
<issue_to_address>
**suggestion:** `pandas` is declared as a dependency but is not used in the implementation.
Since the code only uses `pyarrow`, keeping `pandas` as a runtime dependency adds install size and time without benefit. Unless you plan to use it soon, consider removing it from `dependencies`.
</issue_to_address>
### Comment 3
<location> `deploy/clowdapp.yaml:975` </location>
<code_context>
+ command: ['/bin/bash', '-c']
+ args:
+ - |
+ pip install --index-url ${PULP_PYPI_URL}/simple/ \
+ pulp-access-logs-exporter && \
+ export-access-logs \
</code_context>
<issue_to_address>
**suggestion (performance):** Installing the package on every cron run may be unnecessarily slow and adds external dependencies at runtime.
Installing `pulp-access-logs-exporter` on every hourly run adds latency, depends on the internal PyPI being up, and makes behavior vary with package updates. Prefer building a dedicated image (or layer) with the tool preinstalled and version-pinned, so the cron job only runs the CLI.
Suggested implementation:
```
podSpec:
# Image with pulp-access-logs-exporter preinstalled and version-pinned
image: quay.io/pulp/pulp-access-logs-exporter:<PINNED_VERSION>
command:
- export-access-logs
args:
- --cloudwatch-group
- ${CLOUDWATCH_LOG_GROUP}
- --start-time
- "1 hour ago"
- --end-time
- "15 minutes ago"
- --output-path
- ${ACCESS_LOGS_OUTPUT_PATH}
- --aws-region
- ${AWS_REGION}
```
1. Create and publish a dedicated image (e.g. `quay.io/pulp/pulp-access-logs-exporter`) that:
- Uses an appropriate base image (ubi9/python-312 or similar).
- Installs `pulp-access-logs-exporter` with a pinned version (e.g. via `pip install pulp-access-logs-exporter==X.Y.Z`).
- Optionally sets `export-access-logs` as the container entrypoint.
2. Replace `<PINNED_VERSION>` with a specific, maintained version tag of that image to avoid implicit upgrades.
3. If the image sets `export-access-logs` as `ENTRYPOINT`, you can simplify the pod spec further by removing `command:` and only keeping the `args:` list.
</issue_to_address>
About the parse statements referencing
1. Move pandas to test-only dependency
- pandas is only used in tests for DataFrame validation
- Not needed in production runtime
- Reduces install size and time
- Added comment explaining test-only usage
2. Fix CronJob to use timestamped S3 paths
- Previously used static path that would overwrite data hourly
- Now generates unique path with Hive-style partitioning:
s3://bucket/pypi-logs/year=YYYY/month=MM/day=DD/hour=HH/logs.parquet
- Uses the hour being exported (1 hour ago) for the path
- Compatible with AWS Athena, Presto, and other analytics tools
- Prevents data loss from overwrites
Example output paths:
- s3://bucket/pypi-logs/year=2026/month=02/day=04/hour=14/logs.parquet
- s3://bucket/pypi-logs/year=2026/month=02/day=04/hour=15/logs.parquet
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Changes:
- Changed CLOUDWATCH_LOG_GROUP from a hardcoded pattern to a template parameter
- Removed hardcoded default values for deployment parameters
- Parameters will be injected during the deployment process

Based on CloudWatch console URL analysis:
- Production log group: crcp01ue1.pulp-prod
- Staging log group: crcp01ue1.pulp-stage (inferred)
- Not using the /aws/containerinsights/ pattern

Parameters to inject:
- CLOUDWATCH_LOG_GROUP: Log group name (e.g., crcp01ue1.pulp-prod)
- PULP_PYPI_URL: Internal PyPI URL for package installation
- S3_BUCKET_NAME: S3 bucket for Parquet file storage

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
CloudWatch Logs was auto-extracting fields like 'user', 'org_id', etc. from the log entries, causing conflicts when our query tried to parse and create fields with the same names. The error was: "Ephemeral field is already defined: user"

Solution: Use unique intermediate field names (parsed_*) during parsing, then alias them to the expected names in the final fields statement.

Changes:
- user -> parsed_user -> user (in final fields)
- org_id -> parsed_org_id -> org_id
- domain -> parsed_domain -> domain
- distribution -> parsed_distribution -> distribution
- package -> parsed_package -> package
- status_code -> parsed_status_code -> status_code
- user_agent -> parsed_user_agent -> user_agent

This allows the query to work regardless of CloudWatch's automatic field detection settings.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
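The aliasing pattern can be sketched as below. The `parse` regex and field list are illustrative (the real log format has more fields), not the exact query built in cloudwatch.py:

```python
def build_insights_query() -> str:
    """Build a CloudWatch Logs Insights query that avoids clashes with
    auto-extracted field names by parsing into parsed_* intermediates."""
    # Illustrative parse statement; the real log format is more involved.
    parse_stmt = (
        "parse @message /user=(?<parsed_user>\\S+) "
        "org_id=(?<parsed_org_id>\\S+) status=(?<parsed_status_code>\\d+)/"
    )
    # Alias the intermediates back to the expected column names.
    fields_stmt = (
        "fields @timestamp, @message, parsed_user as user, "
        "parsed_org_id as org_id, parsed_status_code as status_code"
    )
    return f"{parse_stmt}\n| {fields_stmt}\n| sort @timestamp asc"
```

Because the intermediates never collide with auto-extracted names like `user`, the "Ephemeral field is already defined" error cannot occur, and the aliases restore the expected output columns.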
Updated test assertions to check for the parsed_* intermediate field names and the final field aliasing in the query.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Changed from YYYY.MM.MICRO to YYYY.0M.0D.MICRO format:
- Before: 2026.2.0 (no day information)
- After: 2026.02.06.0 (includes day, allows multiple releases per day)

Benefits:
- Clear indication of release date
- Support for multiple releases per day (increment MICRO: .0, .1, .2, etc.)
- Follows common CalVer practices for frequently updated tools

Examples:
- 2026.02.06.0 - First release on February 6, 2026
- 2026.02.06.1 - Second release on February 6, 2026
- 2026.02.07.0 - First release on February 7, 2026

Updated README to reflect the new version format.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Changed from YYYY.0M.0D.MICRO to YYYYMMDD.MICRO format per calver.org:
- Before: 2026.02.06.0 (normalized to 2026.2.6.0 by PEP 440)
- After: 20260206.0 (no normalization issues)

Benefits:
- Follows the CalVer standard more closely
- No PEP 440 normalization conflicts
- Unambiguous date representation
- Similar to youtube-dl's format (YYYY.0M.0D)
- Shorter, cleaner version strings

Examples:
- 20260206.0 - First release on February 6, 2026
- 20260206.1 - Second release on February 6, 2026
- 20260207.0 - First release on February 7, 2026

Reference: https://calver.org/

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Changed S3 output path from partitioned directories to a flat structure with CalVer-style filenames:
- Before: s3://bucket/pypi-logs/year=2026/month=02/day=10/hour=15/logs.parquet
- After: s3://bucket/pypi-logs/pulp-access-logs-20260210T150000Z.parquet

Benefits:
- Simpler directory structure
- Easier to list and browse files
- Filesystem-safe (no special characters)
- Lexicographically sortable
- Still contains full timestamp information
- No need for partition discovery in analytics tools

Filename format: pulp-access-logs-YYYYMMDDTHHMMSSZ.parquet
- Uses ISO 8601 basic format (no colons)
- Z indicates UTC timezone
- Minutes/seconds set to 00 (hourly exports)

Examples:
- pulp-access-logs-20260210T150000Z.parquet (Feb 10, 2026 at 15:00 UTC)
- pulp-access-logs-20260210T160000Z.parquet (Feb 10, 2026 at 16:00 UTC)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
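The filename scheme can be derived from the export hour roughly like this (a sketch; `build_output_key` and the `prefix` default are hypothetical names, not the actual code):

```python
from datetime import datetime, timezone


def build_output_key(export_hour: datetime, prefix: str = "pypi-logs") -> str:
    """Build a flat, lexicographically sortable S3 key for an hourly export.

    Minutes and seconds are zeroed because exports align to hour boundaries.
    """
    hour = export_hour.astimezone(timezone.utc).replace(
        minute=0, second=0, microsecond=0
    )
    return f"{prefix}/pulp-access-logs-{hour.strftime('%Y%m%dT%H%M%S')}Z.parquet"
```

Since the timestamp uses fixed-width fields in most-significant-first order, a plain `aws s3 ls` already lists files in chronological order.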
Added CLI arguments and environment variable support for S3 credentials that differ from CloudWatch credentials, enabling cross-account usage.

CLI changes:
- Added --s3-access-key-id argument
- Added --s3-secret-access-key argument
- Added --s3-session-token argument (optional, for temporary creds)

Writer changes:
- write_parquet() now accepts an s3_credentials parameter
- PyArrow S3FileSystem uses explicit credentials if provided
- Falls back to default credentials (env vars/IAM) if not provided

CronJob changes:
- Passes S3 credentials to the CLI if env vars are set
- Uses bash parameter expansion for optional arguments
- Added commented examples for configuring S3 credentials via secrets

Use cases:
1. Same account: Leave S3 credentials unset, uses default AWS credentials
2. Cross-account: Set S3_ACCESS_KEY_ID and S3_SECRET_ACCESS_KEY env vars
3. Temporary creds: Also set S3_SESSION_TOKEN for STS credentials

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Adds a new --s3-endpoint-url CLI argument to allow specifying a custom S3 endpoint (e.g. MinIO in ephemeral environments). The endpoint is passed through to PyArrow's S3FileSystem via endpoint_override.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Add a standalone upload-parquet command for uploading local parquet files to S3, enabling a write-local-then-upload-to-multiple-buckets workflow for redundancy. Uses boto3 instead of PyArrow S3FileSystem to avoid region auto-detection issues with multipart uploads.

Also adds --s3-region to both the export-access-logs and upload-parquet CLIs, and only passes optional S3FileSystem kwargs when explicitly set, to avoid overriding PyArrow's auto-detection with None values.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
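The "only pass kwargs when explicitly set" approach might look like this (a sketch; `make_s3_filesystem_kwargs` is a hypothetical helper, though the kwarg names match `pyarrow.fs.S3FileSystem`):

```python
from typing import Optional


def make_s3_filesystem_kwargs(
    access_key: Optional[str] = None,
    secret_key: Optional[str] = None,
    session_token: Optional[str] = None,
    endpoint_url: Optional[str] = None,
    region: Optional[str] = None,
) -> dict:
    """Build kwargs for pyarrow.fs.S3FileSystem, omitting unset values so
    PyArrow's own auto-detection (region, default credential chain) still applies."""
    candidates = {
        "access_key": access_key,
        "secret_key": secret_key,
        "session_token": session_token,
        "endpoint_override": endpoint_url,
        "region": region,
    }
    return {key: value for key, value in candidates.items() if value is not None}
```

Passing `region=None` explicitly would suppress PyArrow's region lookup, so dropping unset keys (rather than forwarding `None`) is what keeps the default behavior intact.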
- Write parquet locally first, then upload to both S3 destinations
- Upload to dataverse bucket (pulp-dataverse-s3 secret)
- Upload to Clowder objectStore bucket (cdappconfig.json, redundancy)
- Use exact hour boundaries for CloudWatch query window
- Install uv + jq at runtime (matching tested pattern)
- Gate execution to pulp* clusters only (skip crc* ephemeral)
- Use UV_INDEX for private PyPI with credentials from env vars
- CloudWatch credentials from cloudwatch-pulp-log-access-read-only secret
- Add CLUSTER_NAME, S3_BUCKET_PREFIX, PULP_PYPI_HOST, PYPI_USERNAME,
PYPI_PASSWORD parameters
- Fix env var format to use ${{PARAM}} matching file conventions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Implements hourly export of Pulp PyPI access logs from CloudWatch Logs to Parquet format for analytics and monitoring.
Architecture
Implementation
Python Package: `pulp-access-logs-exporter`

Created a new Python package in `management_tools/pulp-access-logs-exporter/` that exports access logs for `/api/pypi/*` endpoints (excludes health checks).

Key Features:
OpenShift CronJob

Added an `export-access-logs-hourly` CronJob to `deploy/clowdapp.yaml`:
- Schedule: `15 * * * *` (every hour at :15, allowing a 15-minute log ingestion buffer)

Parquet Schema
Exported fields matching Grafana dashboard usage:

- `timestamp` - Event timestamp (timestamp[ns])
- `message` - Full raw log entry (preserved for ad-hoc analysis)
- `user` - Username (nullable for unauthenticated)
- `org_id` - Organization ID (nullable for unauthenticated)
- `domain` - Pulp domain name
- `distribution` - Distribution name
- `package` - Package name
- `status_code` - HTTP status code (int16)
- `user_agent` - User-Agent header with metadata
- `x_forwarded_for` - Client IP (first IP from header)

Package Structure
Usage
CLI:
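The original CLI example did not survive extraction. An invocation consistent with this PR might look like the following sketch; the `export-access-logs` and `upload-parquet` command names and `--s3-region` flag come from the commits above, but `--log-group`, `--start`, `--end`, `--output`, the argument order, and the bucket/log-group values are all assumptions:

```shell
# Export the last hour of access logs to a local Parquet file,
# then upload it to S3 (flag names hypothetical except --s3-region).
export-access-logs \
  --log-group crcp01ue1.pulp-prod \
  --start "1 hour ago" \
  --end "now" \
  --output /tmp/logs.parquet

upload-parquet \
  --s3-region us-east-1 \
  /tmp/logs.parquet \
  s3://my-bucket/pypi-logs/pulp-access-logs-20260210T150000Z.parquet
```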
CronJob: Runs automatically every hour at :15
Testing Plan
Rollback Strategy
oc patch cronjob export-access-logs-hourly -p '{"spec":{"suspend":true}}'

Benefits
Related Files
- management_tools/pulp-access-logs-exporter/
- deploy/clowdapp.yaml (added CronJob + parameters)
- deploy/clowdapp.yaml line 289
- deploy/dashboards/grafana-dashboard-*.configmap.yaml

🤖 Generated with Claude Code
Summary by Sourcery
Introduce a Python-based tool and scheduled job to export Pulp PyPI access logs from CloudWatch Logs to Parquet files for analytics and monitoring.
New Features:
Enhancements:
Deployment:
Tests: