301 changes: 301 additions & 0 deletions data_manager/uv.lock

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion datasets/gdp_grid/readme.md
@@ -1,4 +1,10 @@
# NOAA GDP Grid
NOAA's Global Drifter Program (GDP) provides an array of ~1300 *"satellite-tracked surface drifting buoys"* and a system framework to process the collected data. Additional program information is available on NOAA's [website](https://www.aoml.noaa.gov/global-drifter-program/).

Download data from:
https://www.ngdc.noaa.gov/eog/data/GDP/

Ungzip data
Potential secondary source to download data here:
https://www.aoml.noaa.gov/phod/gdp/real-time_data.php

Unzip data
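As a rough illustration of that last step, here is a minimal sketch for decompressing the downloaded `.gz` files (the `raw` directory name is an assumption, not part of this repo):

```python
# Minimal sketch: decompress every downloaded .gz file alongside the original.
# The "raw" directory name is illustrative only.
import gzip
import shutil
from pathlib import Path

for gz_path in Path("raw").glob("*.gz"):
    with gzip.open(gz_path, "rb") as src, open(gz_path.with_suffix(""), "wb") as dst:
        shutil.copyfileobj(src, dst)
```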
6 changes: 6 additions & 0 deletions datasets/ookla_speedtest/README.md
@@ -0,0 +1,6 @@
# Ookla Speedtest Dataset
---

The Global Broadband Speed Test dataset is collected by Ookla, which gathers data on mobile and fixed broadband network speeds.

The dataset contains geospatial data of fixed and mobile performance statistics by quarter, ranging from 2019 to 2024. More information may be found on Ookla's Open Data Initiative website [here](https://www.ookla.com/ookla-for-good/open-data). Ookla's datasets are publicly accessible via AWS S3 buckets; additional information on the data's location may be found [here](https://registry.opendata.aws/speedtest-global-performance/).
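As a quick sketch of what that public access looks like (the bucket name `ookla-open-data`, region `us-west-2`, and key layout are taken from `src/download_dataset.py` and `src/helpers.py` below):

```python
# List the parquet objects for fixed-line 2019 data, anonymously (unsigned).
import boto3
from botocore import UNSIGNED
from botocore.config import Config

s3 = boto3.client("s3", region_name="us-west-2", config=Config(signature_version=UNSIGNED))
resp = s3.list_objects_v2(
    Bucket="ookla-open-data",
    Prefix="parquet/performance/type=fixed/year=2019/",
)
for obj in resp.get("Contents", []):
    print(obj["Key"])
```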
37 changes: 37 additions & 0 deletions datasets/ookla_speedtest/config.toml
@@ -0,0 +1,37 @@
# Paths of input and output directories
# raw_dir = "/sciclone/aiddata10/REU/geo/raw/ookla_speedtest"
# process_dir = "/tmp"
# output_dir = "/sciclone/aiddata10/REU/geo/data/rasters/ookla_speedtest"

years = [2019]
# remaining years: 2020, 2021, 2022, 2023, 2024

# Overwrite existing files?
overwrite_download = false
overwrite_processing = false


[run]
backend = "local"
task_runner = "concurrent"
run_parallel = false
max_workers = 4
bypass_error_wrapper = true
# log_dir = "/sciclone/aiddata10/REU/geo/raw/ookla_speedtest/logs"
log_dir = "./logs"


[repo]
url = "https://github.com/aiddata/geo-datasets.git"
branch = "master"
directory = "datasets/ookla_speedtest"


[deploy]
deployment_name = "ookla_speedtest"
image_tag = "05dea6e"
version = 3
flow_file_name = "main"
flow_name = "ookla_speedtest"
work_pool = "geodata-pool"
data_manager_version = "0.4.0"
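For reference, the file parses as plain TOML; a minimal sketch of reading it directly (tomllib is stdlib from Python 3.11; the pipeline itself presumably goes through data_manager's `get_config` instead):

```python
import tomllib

# Load the configuration exactly as written above.
with open("datasets/ookla_speedtest/config.toml", "rb") as f:
    cfg = tomllib.load(f)

print(cfg["years"])              # [2019]
print(cfg["run"]["backend"])     # "local"
print(cfg["deploy"]["version"])  # 3
```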
108 changes: 108 additions & 0 deletions datasets/ookla_speedtest/main.py
@@ -0,0 +1,108 @@
from pathlib import Path
from typing import List
import os
from data_manager import BaseDatasetConfiguration, Dataset, get_config
from src.download_dataset import download_files
from src.helpers import GEOPARQUET_DIR, RASTER_OUTPUT_DIR


class OoklaSpeedtestConfiguration(BaseDatasetConfiguration):
raw_dir: str = GEOPARQUET_DIR
output_dir: str = RASTER_OUTPUT_DIR
years: List[int] = [2019, 2020, 2021, 2022, 2023, 2024]
overwrite_download: bool
overwrite_processing: bool


class OoklaSpeedtest(Dataset):
name = "Ookla Speedtest"

def __init__(self, config: OoklaSpeedtestConfiguration):
self.raw_dir = Path(config.raw_dir)
self.output_dir = Path(config.output_dir)
self.years = config.years
self.overwrite_download = config.overwrite_download
self.overwrite_processing = config.overwrite_processing

def download(self, year: int, output_file_path=None) -> List[Path]:
"""
Downloading a single year's worth of data
"""
logger = self.get_logger()
logger.info(f"Downloading parquet files for year {year}")
return download_files(year)

def process(self, input_path: Path, output_path: Path):
logger = self.get_logger()

        if self.overwrite_download and not self.overwrite_processing:
            logger.warning("overwrite_download is set without overwrite_processing; new downloads may not be reprocessed.")
if output_path.exists() and not self.overwrite_processing:
logger.info(f"Processed layer exists: {output_path}")
else:
            logger.info(
                f"Processing file: {input_path}. Output will be saved to: {output_path}"
            )

# importing from src files
from src.helpers import GRID_SIZE, NUM_BANDS, BAND16_COLS, BAND32_COLS
from src.transform_populate import read_parquet
from src.generate_raster import (
make_raster_profile,
write_multiband_raster_chunks,
)

# going through pipeline processing steps
gdf = read_parquet(str(input_path))
profile = make_raster_profile(num_bands=NUM_BANDS, grid_size=GRID_SIZE)

write_multiband_raster_chunks(
gdf=gdf,
band32_cols=BAND32_COLS,
band16_cols=BAND16_COLS,
profile=profile,
output_path=output_path,
)
return

def main(self):
logger = self.get_logger()

# os.makedirs(self.raw_dir / "compressed", exist_ok=True)
# os.makedirs(self.raw_dir / "uncompressed", exist_ok=True)
        os.makedirs(self.raw_dir, exist_ok=True)  # ensure the raw directory exists before downloads run

# Download data
logger.info("Running data download")
download = self.run_tasks(self.download, [[y] for y in self.years])
self.log_run(download)

os.makedirs(self.output_dir, exist_ok=True)

# Process data
logger.info("Running processing")
all_downloads = [f for result in download.results() for f in result]
        # ensure each output filename matches its input file's stem
process_inputs = [
(input_path, self.output_dir / f"{input_path.stem}.tif")
for input_path in all_downloads
]
process = self.run_tasks(self.process, process_inputs)
self.log_run(process)


# ---- BEGIN BOILERPLATE ----
try:
from prefect import flow
except Exception:
pass
else:

@flow
def ookla_speedtest(config: OoklaSpeedtestConfiguration):
OoklaSpeedtest(config).run(config.run)


if __name__ == "__main__":
config = get_config(OoklaSpeedtestConfiguration)
OoklaSpeedtest(config).run(config.run)
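For reference, the `process_inputs` pairing in `main()` maps each downloaded parquet file onto a GeoTIFF with the same base name; a standalone sketch with an illustrative path:

```python
# Illustrative only: how one downloaded parquet path maps to its output raster.
from pathlib import Path

input_path = Path("data/datasets/2019-01-01_performance_fixed_tiles.parquet")
output_dir = Path("visualizations")
output_path = output_dir / f"{input_path.stem}.tif"
print(output_path)  # visualizations/2019-01-01_performance_fixed_tiles.tif
```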
69 changes: 69 additions & 0 deletions datasets/ookla_speedtest/src/download_dataset.py
@@ -0,0 +1,69 @@
import boto3
import os
import logging
from pathlib import Path
from typing import List
from botocore import UNSIGNED
from botocore.client import BaseClient
from botocore.config import Config
from src.helpers import GEOPARQUET_DIR, BUCKET_NAME, REGION_NAME, QUARTERS

# Configure logging (less noise from botocore)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger("botocore").setLevel(
logging.WARNING
) # suppressing botocore checksum spam

# Making the directory
# os.makedirs(GEOPARQUET_DIR, exist_ok=True)


# Create an S3 client with anonymous (unsigned) access
def create_s3_client() -> BaseClient:
return boto3.client(
"s3", region_name=REGION_NAME, config=Config(signature_version=UNSIGNED)
)


# Download a single file, skipping it if it already exists locally
def prepare_download(
s3_client: BaseClient,
s3_key: str,
output_path: str = GEOPARQUET_DIR,
target_bucket_name=BUCKET_NAME,
) -> None:
filename = os.path.basename(s3_key)
# checking if directory exists
os.makedirs(output_path, exist_ok=True)
local_file_path = os.path.join(output_path, filename)
if os.path.exists(local_file_path):
logger.info(f"File {local_file_path} already exists. Skipping download.")
return
logger.info(f"Downloading s3://{target_bucket_name}/{s3_key} to {local_file_path}")
try:
s3_client.download_file(target_bucket_name, s3_key, local_file_path)
logger.info(f"Successfully downloaded {local_file_path}")
except Exception as e:
logger.error(f"Error downloading s3://{target_bucket_name}/{s3_key}: {e}")


# Compose the S3 keys for one year and download each file
def download_files(year: int, quarters: dict = QUARTERS) -> List[Path]:
    """
    Downloads the performance data files from the target Ookla S3 bucket for one year to a local directory.
    Note: download_file (singular) is the boto3 client method called in prepare_download() and cannot be
    renamed; download_files (plural) is this module's wrapper around it. Please note the difference.
    Returns the list of local paths where the downloaded files are expected to land.
    """
s3_client = create_s3_client()
downloaded_files = []
formats = ["parquet"] # "shapefiles" is an option to download too
service_types = ["mobile", "fixed"]
for quarter, month in quarters.items():
for format_type in formats:
for service_type in service_types:
filename = f"{year}-{month}-01_performance_{service_type}_tiles.parquet"
s3_key = f"{format_type}/performance/type={service_type}/year={year}/quarter={quarter}/{filename}"
prepare_download(s3_client, s3_key, GEOPARQUET_DIR, BUCKET_NAME)
local_file_path = Path(GEOPARQUET_DIR) / filename
downloaded_files.append(local_file_path)
return downloaded_files
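For reference, the key composed by `download_files` for Q1 2019 fixed-line data looks like this:

```python
# Illustrative: reproducing the S3 key layout used above for one file.
year, quarter, month = 2019, 1, "01"
format_type, service_type = "parquet", "fixed"
filename = f"{year}-{month}-01_performance_{service_type}_tiles.parquet"
s3_key = f"{format_type}/performance/type={service_type}/year={year}/quarter={quarter}/{filename}"
print(s3_key)
# parquet/performance/type=fixed/year=2019/quarter=1/2019-01-01_performance_fixed_tiles.parquet
```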
104 changes: 104 additions & 0 deletions datasets/ookla_speedtest/src/generate_raster.py
@@ -0,0 +1,104 @@
from src.helpers import (
GRID_SIZE,
NUM_BANDS,
OUTPUT_RASTER_FILE,
MAP_BOUNDS,
BAND16_COLS,
BAND32_COLS,
)
from src.transform_populate import create_band_array
from rasterio.transform import from_bounds
from rasterio.crs import CRS
import rasterio
import geopandas as gpd
import numpy as np
import logging
import sys
import gc
from typing import Dict, List

# configure module-level logging
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)


def make_raster_profile(
num_bands: int = NUM_BANDS,
grid_size: int = GRID_SIZE,
map_bounds: tuple[float, float, float, float] = MAP_BOUNDS,
) -> Dict[str, object]:
"""
Creates the raster metadata profile for writing the GeoTIFF file.
Profile includes CRS, transform, and dtype information.
---
Args:
grid_size (int): dimensions of the raster grid (width x height)
num_bands (int): number of raster bands included in the raster
map_bounds (tuple): bounds in projected coordinates
Returns:
Dict[str, object]: raster metadata profile
"""
left, bottom, right, top = map_bounds
transform = from_bounds(left, bottom, right, top, grid_size, grid_size)
profile = {
"driver": "GTiff",
"count": num_bands,
"dtype": "uint32",
"crs": CRS.from_epsg(3857),
"compress": "lzw",
"transform": transform,
"width": grid_size,
"height": grid_size,
}
return profile


def write_multiband_raster_chunks(
gdf: gpd.GeoDataFrame,
profile: dict,
output_path: str = OUTPUT_RASTER_FILE,
band32_cols: List[str] = BAND32_COLS,
band16_cols: List[str] = BAND16_COLS,
) -> None:
"""
    Writes a multiband GeoTIFF raster file from the GeoDataFrame. Bands are written
    one at a time and freed immediately afterwards to limit peak memory usage.
    Each band is created by create_band_array(), converted to a dense 2D array,
    flipped vertically for a top-left origin, and written to the output raster file.
---
Args:
gdf (gpd.GeoDataFrame) is the input GeoDataFrame
profile (dict) is the profile outputted from make_raster_profile()
output_path (str) the target file path for saving the GeoTIFF
band32_cols (List[str]) are the column names for the uint32 bands
band16_cols (List[str]) are the column names for the uint16 bands
Returns: None
"""
total_bands = len(band32_cols) + len(band16_cols)
profile = profile.copy()
profile["count"] = total_bands
profile["BIGTIFF"] = "YES"
profile["dtype"] = "uint32" # this is a placeholder

# Writing the uint32 bands
with rasterio.open(output_path, "w", **profile) as dst:
for i, col in enumerate(band32_cols):
logger.info(f"Writing uint32 band {i + 1}: {col}")
band_sparse = create_band_array(gdf, col, dtype=np.uint32)
band_dense = band_sparse.toarray()
band_dense = np.flip(band_dense, axis=0)
dst.write(band_dense.astype(np.uint32), i + 1)
del band_sparse, band_dense
gc.collect()
# Then writing in the uint16 bands
for j, col in enumerate(band16_cols):
band_index = len(band32_cols) + j + 1
logger.info(f"Writing uint16 band {band_index}: {col}")
band_sparse = create_band_array(gdf, col, dtype=np.uint16)
band_dense = band_sparse.toarray()
band_dense = np.flip(band_dense, axis=0)
dst.write(band_dense.astype(np.uint16), band_index)
del band_sparse, band_dense
gc.collect()
logger.info(f"All {total_bands} bands written to {output_path}.")
return None
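A quick check of the grid geometry used by `make_raster_profile`: at zoom level 16, the Web Mercator world square splits into 65536 × 65536 cells of roughly 611.5 m:

```python
# Verify the per-pixel resolution implied by GRID_SIZE and MAP_BOUNDS.
from rasterio.transform import from_bounds

BOUND = 20037508.342789244
GRID = 2 ** 16
transform = from_bounds(-BOUND, -BOUND, BOUND, BOUND, GRID, GRID)
print(transform.a)   # pixel width in meters: ~611.496
print(-transform.e)  # pixel height in meters: ~611.496
```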
38 changes: 38 additions & 0 deletions datasets/ookla_speedtest/src/helpers.py
@@ -0,0 +1,38 @@
import os

# Constants
BUCKET_NAME = "ookla-open-data"
REGION_NAME = "us-west-2"
QUARTERS = {1: "01", 2: "04", 3: "07", 4: "10"}

# Paths (Relative to Project Root)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

GEOPARQUET_DIR = os.path.join(BASE_DIR, "data", "datasets")
RASTER_OUTPUT_DIR = os.path.join(BASE_DIR, "visualizations")

os.makedirs(GEOPARQUET_DIR, exist_ok=True)
os.makedirs(RASTER_OUTPUT_DIR, exist_ok=True)

# TODO: develop a naming convention, or replace TEST_PARQUET_FILE and
# OUTPUT_RASTER_FILE, before finalizing
# Other Constants
TEST_PARQUET_FILE = "2019-01-01_performance_fixed_tiles.parquet"
OUTPUT_RASTER_FILE = os.path.join(
RASTER_OUTPUT_DIR, TEST_PARQUET_FILE.replace(".parquet", ".tif")
)

# Raster Processing Constants
ZOOM_LEVEL = 16
GRID_SIZE = 2**ZOOM_LEVEL
BAND32_COLS = ["avg_d_kbps", "avg_u_kbps"]
BAND16_COLS = ["avg_lat_ms", "tests", "devices"]
NUM_BANDS = 5

# For EPSG 3857
MAP_BOUNDS = (
-20037508.342789244,
-20037508.342789244,
20037508.342789244,
20037508.342789244,
)
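A small sanity check tying these constants together (assuming the module is importable as `src.helpers`):

```python
from src.helpers import BAND16_COLS, BAND32_COLS, GRID_SIZE, NUM_BANDS

assert NUM_BANDS == len(BAND32_COLS) + len(BAND16_COLS)  # 2 + 3 = 5
assert GRID_SIZE == 65536                                # 2 ** 16
```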