From cf1fae549065d52e77b82ade55cc02b926a625c9 Mon Sep 17 00:00:00 2001
From: Max Ostapenko <1611259+max-ostapenko@users.noreply.github.com>
Date: Mon, 30 Jun 2025 23:14:44 +0200
Subject: [PATCH 1/3] report adjusted

---
 .../output/reports/tech_report_categories.js | 65 ++++++++-----------
 1 file changed, 27 insertions(+), 38 deletions(-)

diff --git a/definitions/output/reports/tech_report_categories.js b/definitions/output/reports/tech_report_categories.js
index f12357f..c675dc4 100644
--- a/definitions/output/reports/tech_report_categories.js
+++ b/definitions/output/reports/tech_report_categories.js
@@ -8,9 +8,11 @@ publish('tech_report_categories', {
 WITH pages AS (
   SELECT DISTINCT
     client,
-    root_page,
-    technologies
-  FROM ${ctx.ref('crawl', 'pages')}
+    category,
+    root_page
+  FROM ${ctx.ref('crawl', 'pages')} AS pages
+  INNER JOIN pages.technologies AS tech
+  INNER JOIN tech.categories AS category
   WHERE
     date = '${pastMonth}'
     ${constants.devRankFilter}
@@ -36,7 +38,7 @@ crux AS (
 merged_pages AS (
   SELECT DISTINCT
     client,
-    technologies,
+    category,
     root_page
   FROM pages
   INNER JOIN crux
@@ -45,46 +47,40 @@ merged_pages AS (
 
 category_stats AS (
   SELECT
+    client,
     category,
-    STRUCT(
-      MAX(IF(client = 'desktop', origins, 0)) AS desktop,
-      MAX(IF(client = 'mobile', origins, 0)) AS mobile
-    ) AS origins
-  FROM (
-    SELECT
-      client,
-      category,
-      COUNT(DISTINCT root_page) AS origins
-    FROM merged_pages
-    INNER JOIN merged_pages.technologies AS tech
-    INNER JOIN tech.categories AS category
-    WHERE
-      category IS NOT NULL
-    GROUP BY
-      client,
-      category
-  )
-  GROUP BY category
+    COUNT(DISTINCT root_page) AS origins
+  FROM merged_pages
+  GROUP BY
+    client,
+    category
 ),
 
 technology_stats AS (
   SELECT
+    client,
     technology,
     category_obj AS categories,
-    origins.mobile AS mobile_origins
-  FROM ${ctx.ref('reports', 'tech_report_technologies')}
+    IF(client = 'mobile', origins.mobile, origins.desktop) AS origins
+  FROM ${ctx.ref('reports', 'tech_report_technologies')},
+    UNNEST(ARRAY['desktop', 'mobile']) AS client
 )
 
 SELECT
+  client,
   category,
   description,
   origins,
-  ARRAY_AGG(technology IGNORE NULLS ORDER BY technology_stats.mobile_origins DESC) AS technologies
+  IF(
+    client = 'mobile',
+    ARRAY_AGG(technology IGNORE NULLS ORDER BY technology_stats.origins.mobile DESC),
+    ARRAY_AGG(technology IGNORE NULLS ORDER BY technology_stats.origins.desktop DESC)
+  ) AS technologies
 FROM category_stats
 INNER JOIN technology_stats
 ON category_stats.category IN UNNEST(technology_stats.categories)
 INNER JOIN category_descriptions
-USING (category)
+USING (category, client)
 GROUP BY
   category,
   description,
@@ -93,20 +89,13 @@ GROUP BY
 UNION ALL
 
 SELECT
+  client,
   'ALL' AS category,
   NULL AS description,
-  STRUCT(
-    MAX(IF(client = 'desktop', origins, 0)) AS desktop,
-    MAX(IF(client = 'mobile', origins, 0)) AS mobile
-  ) AS origins,
+  COUNT(DISTINCT root_page) AS origins,
   NULL AS technologies
-FROM (
-  SELECT
-    client,
-    COUNT(DISTINCT root_page) AS origins
-  FROM merged_pages
-  GROUP BY client
-)
+FROM merged_pages
+GROUP BY client
 `).postOps(ctx => `
 SELECT
   reports.run_export_job(

From 7173b0a02c662aa7e67acdff3fa028536c5c0689 Mon Sep 17 00:00:00 2001
From: Max Ostapenko <1611259+max-ostapenko@users.noreply.github.com>
Date: Mon, 30 Jun 2025 23:23:11 +0200
Subject: [PATCH 2/3] cleanup

---
 infra/bigquery_export_spark/Dockerfile       |  37 -----
 infra/bigquery_export_spark/requirements.txt |   2 -
 infra/bigquery_export_spark/src/firestore.py | 158 ------------------
 3 files changed, 197 deletions(-)
 delete mode 100644 infra/bigquery_export_spark/Dockerfile
 delete mode 100644 infra/bigquery_export_spark/requirements.txt
 delete mode 100644 infra/bigquery_export_spark/src/firestore.py

diff --git a/infra/bigquery_export_spark/Dockerfile b/infra/bigquery_export_spark/Dockerfile
deleted file mode 100644
index 49b16fd..0000000
--- a/infra/bigquery_export_spark/Dockerfile
+++ /dev/null
@@ -1,37 +0,0 @@
-# Dataproc image example: https://cloud.google.com/dataproc-serverless/docs/guides/custom-containers
-# Recommendation: Use Debian 12.
-FROM debian:12-slim
-# python:3.12-slim
-
-# Suppress interactive prompts
-ENV DEBIAN_FRONTEND=noninteractive
-
-# Install utilities required by Spark scripts.
-RUN apt-get update && apt-get install -y procps=\* tini=\* libjemalloc2=\* \
-  && apt-get clean \
-  && rm -rf /var/lib/apt/lists/*
-
-# Enable jemalloc2 as default memory allocator
-ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
-
-# Install and configure Miniconda3.
-ENV CONDA_HOME=/opt/miniforge3
-ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
-ENV PATH=${CONDA_HOME}/bin:${PATH}
-ADD https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-Linux-x86_64.sh .
-RUN bash Miniforge3-Linux-x86_64.sh -b -p /opt/miniforge3 \
-  && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
-  && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
-  && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict
-
-WORKDIR /app
-COPY requirements.txt .
-
-# Install pip packages.
-RUN ${PYSPARK_PYTHON} -m pip install --no-cache-dir -r requirements.txt
-
-# Create the 'spark' group/user.
-# The GID and UID must be 1099. Home directory is required.
-RUN groupadd -g 1099 spark
-RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
-USER spark
diff --git a/infra/bigquery_export_spark/requirements.txt b/infra/bigquery_export_spark/requirements.txt
deleted file mode 100644
index be4c0b9..0000000
--- a/infra/bigquery_export_spark/requirements.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-google-cloud-firestore==2.20.1
-# pyspark==3.5.5
diff --git a/infra/bigquery_export_spark/src/firestore.py b/infra/bigquery_export_spark/src/firestore.py
deleted file mode 100644
index 0e6d1e0..0000000
--- a/infra/bigquery_export_spark/src/firestore.py
+++ /dev/null
@@ -1,158 +0,0 @@
-"""This module processes Firestore documents from BigQuery using Spark."""
-
-import json
-import os
-
-from google.cloud import firestore  # type: ignore
-from pyspark.sql import SparkSession  # type: ignore
-
-PROJECT = "httparchive"
-DATABASE = "tech-report-api"
-
-
-# pylint: disable=too-many-instance-attributes
-class FirestoreBatch:
-    """Handles Firestore data batching from BigQuery using Spark."""
-
-    def __init__(self, export_config):
-        """Initialize FirestoreBatch with default settings."""
-        self.config = {
-            "collection_name": export_config["name"],
-            "date": getattr(export_config, "date", ""),
-            "collection_type": export_config["type"],
-        }
-        self.firestore = firestore.Client(project=PROJECT, database=DATABASE)
-        self.batch_size = 500
-        self.max_concurrent_batches = 200
-        self.current_batch = []
-        self.batch_promises = []
-        self.spark = SparkSession.builder.appName(
-            "FirestoreBatchProcessor"
-        ).getOrCreate()
-
-    def queue_batch(self, operation):
-        """Queue a batch commit operation for Firestore."""
-        batch = self.firestore.batch()
-
-        for doc in self.current_batch:
-            if operation == "delete":
-                batch.delete(doc.reference)
-            elif operation == "set":
-                doc_ref = self.firestore.collection(
self.config["collection_name"] - ).document() - batch.set(doc_ref, doc) - else: - raise ValueError("Invalid operation") - self.batch_promises.append(batch.commit()) - self.current_batch = [] - - def commit_batches(self): - """Commit all queued batch promises.""" - print( - f"Committing {len(self.batch_promises)} " - f"batches to {self.config['collection_name']}" - ) - for batch_promise in self.batch_promises: - try: - batch_promise - except Exception as e: - print(f"Error committing batch: {e}") - raise - self.batch_promises = [] - - def final_flush(self, operation): - """Flush any pending batch operations.""" - if self.current_batch: - self.queue_batch(operation) - if self.batch_promises: - self.commit_batches() - - def batch_delete(self): - """Delete Firestore documents in batches.""" - print("Starting batch deletion...") - start_time = self.spark.sparkContext.startTime - self.current_batch = [] - self.batch_promises = [] - total_docs_deleted = 0 - - collection_ref = self.firestore.collection(self.config["collection_name"]) - if self.config["collection_type"] == "report": - print( - f"Deleting documents from {self.config['collection_name']} " - f"for date {self.config['date']}" - ) - collection_query = collection_ref.where("date", "==", self.config["date"]) - elif self.config["collection_type"] == "dict": - print(f"Deleting documents from {self.config['collection_name']}") - collection_query = collection_ref - else: - raise ValueError("Invalid collection type") - while True: - docs = list( - collection_query.limit( - self.batch_size * self.max_concurrent_batches - ).stream() - ) - if not docs: - break - - for doc in docs: - self.current_batch.append(doc) - if len(self.current_batch) >= self.batch_size: - self.queue_batch("delete") - if len(self.batch_promises) >= self.max_concurrent_batches: - self.commit_batches() - total_docs_deleted += 1 - - self.final_flush("delete") - duration = (self.spark.sparkContext.startTime - start_time) / 1000 - print( - f"Deletion complete. " - f"Total docs deleted: {total_docs_deleted}. " - f"Time: {duration} seconds" - ) - - def stream_from_bigquery(self, query_str): - """Stream data from BigQuery to Firestore.""" - print("Starting BigQuery to Firestore transfer...") - start_time = self.spark.sparkContext.startTime - total_rows_processed = 0 - - df = self.spark.read.format("bigquery").option("query", query_str).load() - - for row in df.collect(): - self.current_batch.append(row.asDict()) - if len(self.current_batch) >= self.batch_size: - self.queue_batch("set") - if len(self.batch_promises) >= self.max_concurrent_batches: - self.commit_batches() - total_rows_processed += 1 - - self.final_flush("set") - duration = (self.spark.sparkContext.startTime - start_time) / 1000 - print( - f"Transfer to {self.config['collection_name']} " - f"complete. " - f"Total rows processed: " - f"{total_rows_processed}. 
" - f"Time: {duration} " - f"seconds" - ) - - def export(self, query_str): - """Export data from BigQuery to Firestore.""" - - self.batch_delete() - self.stream_from_bigquery(query_str) - - -if __name__ == "__main__": - # config_data = json.loads('{"name": "technologies", "type": "dict", "environment": "dev"}') - # QUERY_STR = str(json.loads("SELECT * FROM report.tech_report_technologies")) - - config_data = json.loads(os.environ["BIGQUERY_PROC_PARAM.export_config"]) - QUERY_STR = str(json.loads(os.environ["BIGQUERY_PROC_PARAM.query"])) - - processor = FirestoreBatch(config_data) - processor.export(QUERY_STR) From 7683f23e8ef2d582425a2c77a48efc8fc297af96 Mon Sep 17 00:00:00 2001 From: Max Ostapenko <1611259+max-ostapenko@users.noreply.github.com> Date: Tue, 1 Jul 2025 00:15:30 +0200 Subject: [PATCH 3/3] flattenned results --- .../output/reports/tech_report_categories.js | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/definitions/output/reports/tech_report_categories.js b/definitions/output/reports/tech_report_categories.js index c675dc4..5e73144 100644 --- a/definitions/output/reports/tech_report_categories.js +++ b/definitions/output/reports/tech_report_categories.js @@ -58,12 +58,14 @@ category_stats AS ( technology_stats AS ( SELECT - client, - technology, - category_obj AS categories, - IF(client = 'mobile', origins.mobile, origins.desktop) AS origins + category, + STRUCT( + ARRAY_AGG(technology IGNORE NULLS ORDER BY origins.mobile DESC) AS mobile, + ARRAY_AGG(technology IGNORE NULLS ORDER BY origins.desktop DESC) AS desktop + ) AS technologies FROM ${ctx.ref('reports', 'tech_report_technologies')}, - UNNEST(ARRAY['desktop', 'mobile']) AS client + UNNEST(category_obj) AS category + GROUP BY category ) SELECT @@ -73,18 +75,14 @@ SELECT origins, IF( client = 'mobile', - ARRAY_AGG(technology IGNORE NULLS ORDER BY technology_stats.origins.mobile DESC), - ARRAY_AGG(technology IGNORE NULLS ORDER BY technology_stats.origins.desktop DESC) + technologies.mobile, + technologies.desktop ) AS technologies FROM category_stats INNER JOIN technology_stats -ON category_stats.category IN UNNEST(technology_stats.categories) +USING (category) INNER JOIN category_descriptions -USING (category, client) -GROUP BY - category, - description, - origins +USING (category) UNION ALL