diff --git a/changelog.txt b/changelog.txt
new file mode 100644
index 0000000..e18ddfe
--- /dev/null
+++ b/changelog.txt
@@ -0,0 +1,10 @@
+# Translating Feast to Featureform
+
+Key Differences:
+[] docker/setup/apply.py -> Can be replaced by the `featureform apply` command
+[] docker/setup/materialize.py -> Not needed; Featureform takes care of materialization upon apply, and scheduling is included
+[] feature_store/data_fetcher.py -> Serving features for training & inference is handled by the ServingClient
+[] create.py + features.py -> Can be combined into a single definitions.py
+
+Specifically, because Featureform leverages Kubernetes for orchestration, Cloud Functions and Cloud Scheduler aren't needed.
+Featureform also doesn't need to store the definitions in a third backing store such as a GCS bucket.
\ No newline at end of file
diff --git a/docker/setup/FF__setup.sh b/docker/setup/FF__setup.sh
new file mode 100755
index 0000000..c1dca1a
--- /dev/null
+++ b/docker/setup/FF__setup.sh
@@ -0,0 +1,107 @@
+# Auth
+gcloud auth activate-service-account $SERVICE_ACCOUNT_EMAIL \
+    --key-file=$GOOGLE_APPLICATION_CREDENTIALS \
+    --project=$PROJECT_ID
+
+# Setup GCP Project Name
+echo project_id = $PROJECT_ID > ~/.bigqueryrc
+
+# Enable APIs
+echo "\nEnabling GCP APIs"
+gcloud services enable artifactregistry.googleapis.com
+gcloud services enable ml.googleapis.com
+gcloud services enable aiplatform.googleapis.com
+gcloud services enable bigquery.googleapis.com
+
+# TODO: Check if this is correct
+# https://docs.github.com/en/actions/deployment/deploying-to-your-cloud-provider/deploying-to-google-kubernetes-engine
+gcloud services enable containerregistry.googleapis.com
+gcloud services enable container.googleapis.com
+
+
+# Create Cloud Storage Bucket
+echo "\nCreating cloud storage bucket"
+gsutil ls -b gs://$BUCKET_NAME || gsutil mb gs://$BUCKET_NAME
+
+# Create BigQuery Dataset
+echo "\nCreating BigQuery dataset"
+bq --location=us mk --dataset $PROJECT_ID:gcp_feast_demo
+
+# TODO: Make sure this is the correct way to create a Kubernetes cluster
+# TODO: What additional setup & info do I need to pass?
+## TODO: How do I get featureform into the cluster (helm chart, etc)?
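+# NOTE (answering the TODO above; an assumption, not verified in this repo):
+# Featureform is normally installed into the cluster with its Helm chart once the
+# GKE cluster created below exists and kubectl is pointed at it, roughly:
+#   gcloud container clusters get-credentials $GKE_CLUSTER --zone=$GKE_ZONE --project=$GKE_PROJECT
+#   helm repo add jetstack https://charts.jetstack.io
+#   helm repo add featureform https://storage.googleapis.com/featureform-helm/
+#   helm repo update
+#   helm install certmgr jetstack/cert-manager --set installCRDs=true --namespace cert-manager --create-namespace
+#   helm install featureform featureform/featureform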
+echo "\nCreating Kubernetes cluster" +gcloud container clusters create $GKE_CLUSTER --project=$GKE_PROJECT --zone=$GKE_ZONE + + + +# TODO: Replace with Featureform specific scripts +# Create & Apply the Feature Store +echo "\nCreating Feature Store" +python setup/FF_apply.py + +## Create Artifact Registry +echo "\nCreating GCP Artifact Repository for Custom Triton Serving Container" +ARTIFACT_REPOSITORY_NAME=nvidia-triton + +gcloud artifacts repositories create $ARTIFACT_REPOSITORY_NAME \ + --repository-format=docker \ + --location=$GCP_REGION \ + --description="NVIDIA Triton Docker repository" + +# Setup Vertex AI and Triton +echo "\nUploading Triton Models to Cloud Storage" +CONTAINER_IMAGE_URI=$GCP_REGION-docker.pkg.dev/$PROJECT_ID/$ARTIFACT_REPOSITORY_NAME/vertex-triton-inference +NGC_TRITON_IMAGE_URI=ghcr.io/redisventures/tritonserver-python-fil:22.11-py3 +MODEL_STORAGE_URI=gs://$BUCKET_NAME/models + +## Upload Triton Model Repository Contents +gsutil -m cp -r ./triton/models gs://$BUCKET_NAME/ +gsutil rm $MODEL_STORAGE_URI/ensemble/1/.gitkeep + +# Pull and Upload Triton Image +echo "\nPulling Triton Docker Image" +docker pull $NGC_TRITON_IMAGE_URI +docker tag $NGC_TRITON_IMAGE_URI $CONTAINER_IMAGE_URI + +echo "\nPushing Triton Docker Image to GCP" +gcloud auth configure-docker $GCP_REGION-docker.pkg.dev --quiet +docker push $CONTAINER_IMAGE_URI + +# Create Vertex AI Model +echo "\nCreating Vertex AI Model" +ENDPOINT_NAME=vaccine-predictor-endpoint +DEPLOYED_MODEL_NAME=vaccine-predictor + +gcloud ai models upload \ + --region=$GCP_REGION \ + --display-name=$DEPLOYED_MODEL_NAME \ + --container-image-uri=$CONTAINER_IMAGE_URI \ + --artifact-uri=$MODEL_STORAGE_URI \ + --container-env-vars="REDIS_CONNECTION_STRING=$REDIS_CONNECTION_STRING","REDIS_PASSWORD=$REDIS_PASSWORD","PROJECT_ID=$PROJECT_ID","GCP_REGION=$GCP_REGION","BUCKET_NAME=$BUCKET_NAME" + +# Create Endpoint +echo "\nCreating Vertex AI Endpoint" +gcloud ai endpoints create \ + --region=$GCP_REGION \ + --display-name=$ENDPOINT_NAME + +## Lookup Endpoint and Model IDs +echo "\nDeploying Model to Endpoint" +ENDPOINT_ID=$(gcloud ai endpoints list \ + --region=$GCP_REGION \ + --filter=display_name=$ENDPOINT_NAME \ + --format="value(name)") + +MODEL_ID=$(gcloud ai models list \ + --region=$GCP_REGION \ + --filter=display_name=$DEPLOYED_MODEL_NAME \ + --format="value(name)") + +# Deploy Model to the Endpoint on Vertex +gcloud ai endpoints deploy-model $ENDPOINT_ID \ + --region=$GCP_REGION \ + --model=$MODEL_ID \ + --display-name=$DEPLOYED_MODEL_NAME \ + --machine-type=n1-standard-2 \ + --service-account=$SERVICE_ACCOUNT_EMAIL \ No newline at end of file diff --git a/docker/setup/FF_apply.py b/docker/setup/FF_apply.py new file mode 100644 index 0000000..2884f7b --- /dev/null +++ b/docker/setup/FF_apply.py @@ -0,0 +1,39 @@ +# Import the configs for featureform +import featureform as ff + +from feature_store.repo import ( + FF_0_config as config, + FF_1_providers as providers, + FF_4_transformations as transformations, + FF_5_registering_sets as reg_sets, +) +from feature_store.utils import logger, storage + + +if __name__ == "__main__": + # Setup logger + logging = logger.get_logger() + + # Connect to featureform host + logging.info("Connecting to GKE cluster with Featureform") + client = ff.ResourceClient(f"{config.FEATUREFORM_HOST}") + + # TODO: Register providers (Bigquery & Redis) with Featureform + logging.info("Register BigQuery & Redis as providers with Featureform") + bigquery, redis = providers.register_providers(ff) + + # TODO: 
Register Sets with Featureform + logging.info("Registering entity with Featureform") + # Define an entity for the state. + # You can think of an entity as a primary key used to + state = ff.register_entity("state") + + # TODO: Defining & Registering Transformations with Featureform + transformations.register_vaccine_search_trends(bigquery, redis, state) + transformations.register_vaccine_counts(bigquery, redis, state) + + logging.info("Registering training & serving sets with Featureform") + reg_sets.register_sets(ff) + + # TODO: Apply + client.apply() diff --git a/docker/setup/FF_create.py b/docker/setup/FF_create.py new file mode 100644 index 0000000..64c77d4 --- /dev/null +++ b/docker/setup/FF_create.py @@ -0,0 +1,47 @@ +from feast import RepoConfig +from google.cloud import bigquery +from feature_store.utils import logger, storage +from feature_store.repo import config, features + + +if __name__ == "__main__": + # Setup logger + logging = logger.get_logger() + + # Create a feature store repo config + logging.info("Creating Feast repo configuration") + repo_config = RepoConfig( + project=config.FEAST_PROJECT, + # Cloud Storage Blob for the Registry + registry=f"gs://{config.BUCKET_NAME}/data/registry.db", + # Google Cloud Project -- GCP + provider="gcp", + # Redis Enterprise as the Online Store + online_store={ + "type": "redis", + "connection_string": f"{config.REDIS_CONNECTION_STRING},password={config.REDIS_PASSWORD}", + }, + entity_key_serialization_version=2, + ) + + # Host the config in cloud storage + logging.info("Uploading repo config to cloud storage bucket") + storage.upload_pkl(repo_config, config.BUCKET_NAME, config.REPO_CONFIG) + + # Generate initial features data in offline store + logging.info("Generating initial vaccine features in GCP") + client = bigquery.Client() + + features.generate_vaccine_counts( + logging, + client, + f"{config.PROJECT_ID}.{config.BIGQUERY_DATASET_NAME}.{config.WEEKLY_VACCINATIONS_TABLE}", + ) + + features.generate_vaccine_search_trends( + logging, + client, + f"{config.PROJECT_ID}.{config.BIGQUERY_DATASET_NAME}.{config.VACCINE_SEARCH_TRENDS_TABLE}", + ) + + logging.info("Done") diff --git a/docker/setup/FF_teardown.py b/docker/setup/FF_teardown.py new file mode 100644 index 0000000..c8bbab9 --- /dev/null +++ b/docker/setup/FF_teardown.py @@ -0,0 +1,19 @@ +from feature_store.repo import config +from feature_store.utils import logger, storage + + +if __name__ == "__main__": + # Setup logging + logging = logger.get_logger() + + # Create FeatureStore + logging.info("Fetching feature store") + store = storage.get_feature_store( + config_path=config.REPO_CONFIG, bucket_name=config.BUCKET_NAME + ) + + # Teardown + logging.info("Tearing down feature store") + store.teardown() + + logging.info("Done") diff --git a/docker/setup/FF_teardown.sh b/docker/setup/FF_teardown.sh new file mode 100755 index 0000000..1d78259 --- /dev/null +++ b/docker/setup/FF_teardown.sh @@ -0,0 +1,52 @@ +# Auth +gcloud auth activate-service-account $SERVICE_ACCOUNT_EMAIL \ + --key-file=$GOOGLE_APPLICATION_CREDENTIALS \ + --project=$PROJECT_ID + +# Cleanup BigQuery +bq rm -t -f "gcp_feast_demo.vaccine_search_trends" +bq rm -t -f "gcp_feast_demo.us_weekly_vaccinations" +bq rm -r -f -d "gcp_feast_demo" + + +# TODO: Need to also teardown Kubernetes cluster + +# Teardown Vertex AI Stuff +ENDPOINT_NAME=vaccine-predictor-endpoint +DEPLOYED_MODEL_NAME=vaccine-predictor +ARTIFACT_REPOSITORY_NAME=nvidia-triton + +ENDPOINT_ID=$(gcloud ai endpoints list \ + --region=$GCP_REGION \ + 
--filter=display_name=$ENDPOINT_NAME \ + --format="value(name)") + +DEPLOYED_MODEL_ID=$(gcloud ai endpoints describe $ENDPOINT_ID \ + --region=$GCP_REGION \ + --format="value(deployedModels.id)") + +gcloud ai endpoints undeploy-model $ENDPOINT_ID \ + --region=$GCP_REGION \ + --deployed-model-id=$DEPLOYED_MODEL_ID + +gcloud ai endpoints delete $ENDPOINT_ID \ + --region=$GCP_REGION \ + --quiet + +MODEL_ID=$(gcloud ai models list \ +--region=$GCP_REGION \ +--filter=display_name=$DEPLOYED_MODEL_NAME \ +--format="value(name)") + +gcloud ai models delete $MODEL_ID \ + --region=$GCP_REGION \ + --quiet + +gcloud artifacts repositories delete $ARTIFACT_REPOSITORY_NAME \ + --location=$GCP_REGION \ + --quiet + +# TODO: Replace with Featureform specific scripts +# Teardown Feast +echo "Tearing down Featureform infrastructure" +python setup/teardown.py \ No newline at end of file diff --git a/docker/triton/models/fetch-vaccine-features/1/model.py b/docker/triton/models/fetch-vaccine-features/1/model.py index 82aa8e5..4b40699 100644 --- a/docker/triton/models/fetch-vaccine-features/1/model.py +++ b/docker/triton/models/fetch-vaccine-features/1/model.py @@ -9,16 +9,11 @@ # and converting Triton input/output types to numpy types. import triton_python_backend_utils as pb_utils from feature_store.repo import config -from feature_store.utils import ( - DataFetcher, - logger, - storage -) +from feature_store.utils import DataFetcher, logger, storage logging = logger.get_logger() - class TritonPythonModel: """Your Python model must use the same class name. Every Python model that is created must have "TritonPythonModel" as the class name. @@ -42,20 +37,21 @@ def initialize(self, args): """ # You must parse model_config. JSON string is not parsed here - self.model_config = model_config = json.loads(args['model_config']) + self.model_config = model_config = json.loads(args["model_config"]) # Get OUTPUT0 configuration output0_config = pb_utils.get_output_config_by_name( - model_config, "feature_values") + model_config, "feature_values" + ) # Convert Triton types to numpy types self.output0_dtype = pb_utils.triton_string_to_numpy( - output0_config['data_type']) + output0_config["data_type"] + ) logging.info("Loading feature store") self.fs = storage.get_feature_store( - config_path=config.REPO_CONFIG, - bucket_name=config.BUCKET_NAME + config_path=config.REPO_CONFIG, bucket_name=config.BUCKET_NAME ) logging.info("Loading feature store") self.data_fetcher = DataFetcher(self.fs) @@ -95,16 +91,17 @@ def execute(self, requests): logging.info(state) # Fetch feature data from Feast db - feature_vector = self.data_fetcher.get_online_data(state=state[0].decode('utf-8')) + feature_vector = self.data_fetcher.get_online_data( + state=state[0].decode("utf-8") + ) feature_out = feature_vector.to_numpy().reshape(-1, 8) logging.info(feature_vector) # Create InferenceResponse inference_response = pb_utils.InferenceResponse( - output_tensors=[pb_utils.Tensor( - "feature_values", - feature_out.astype(output0_dtype) - )] + output_tensors=[ + pb_utils.Tensor("feature_values", feature_out.astype(output0_dtype)) + ] ) responses.append(inference_response) @@ -116,4 +113,4 @@ def finalize(self): Implementing `finalize` function is OPTIONAL. This function allows the model to perform any necessary clean ups before exit. 
""" - logging.info('Cleaning up...') \ No newline at end of file + logging.info("Cleaning up...") diff --git a/feature_store/repo/FF_0_config.py b/feature_store/repo/FF_0_config.py new file mode 100644 index 0000000..d63ea84 --- /dev/null +++ b/feature_store/repo/FF_0_config.py @@ -0,0 +1,27 @@ +import os + +PROJECT_ID = os.environ["PROJECT_ID"] +GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") + +REDIS_CONNECTION_STRING = os.getenv("REDIS_CONNECTION_STRING", "localhost:6379") +REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "") + +BUCKET_NAME = os.getenv("BUCKET_NAME", "gcp-feast-demo") +GCP_REGION = os.getenv("GCP_REGION", "us-east1") + +BIGQUERY_DATASET_NAME = "gcp_feast_demo" + +MODEL_NAME = "predict-vaccine-counts" +MODEL_FILENAME = "xgboost.json" + +VACCINE_SEARCH_TRENDS_TABLE = "vaccine_search_trends" +WEEKLY_VACCINATIONS_TABLE = "us_weekly_vaccinations" +DAILY_VACCINATIONS_CSV_URL = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/vaccinations/us_state_vaccinations.csv" + +# Featureform Configuration +FEATUREFORM_HOST = os.getenv("FEATUREFORM_HOST") + +REDIS_TEAM = os.getenv("REDIS_TEAM") +REDIS_HOST = os.getenv("REDIS_HOST") +REDIS_PORT = os.getenv("REDIS_PORT") +REDIS_DB = os.getenv("REDIS_DB") diff --git a/feature_store/repo/FF_1_providers.py b/feature_store/repo/FF_1_providers.py new file mode 100644 index 0000000..5c19368 --- /dev/null +++ b/feature_store/repo/FF_1_providers.py @@ -0,0 +1,27 @@ +from feature_store.repo import FF_0_config as config + +# TODO: Check if the parameters are correct +# TODO: Check if this is the best way to structure the providers +def register_providers(ff): + # TODO: Check if the credentials_path should be pulling from somewhere else + # # i.e. in the redis_feast project it's a var but we pass in a path as a string + bigquery = ff.register_bigquery( + name="bigquery-ff", + description="A BigQuery deployment we created for the Covid-19 vaccine demo.", + project_id=f"{config.PROJECT_ID}", + dataset_id=f"{config.BIGQUERY_DATASET_NAME}", + credentials_path=f"{config.GOOGLE_APPLICATION_CREDENTIALS}", + ) + + # Redis Registeration + redis = ff.register_redis( + name=f"{config.PROJECT_ID}", + description="A Redis Deployment we created for the Covid-19 vaccine demo.", + team=f"{config.REDIS_TEAM}", + host=f"{config.REDIS_HOST}", # The internal dns name for redis - "quickstart-redis", + port=f"{config.REDIS_PORT}", + password=f"{config.REDIS_PASSWORD}", + db=f"{config.REDIS_DB}", + ) + + return bigquery, redis diff --git a/feature_store/repo/FF_4_transformations.py b/feature_store/repo/FF_4_transformations.py new file mode 100644 index 0000000..fc89254 --- /dev/null +++ b/feature_store/repo/FF_4_transformations.py @@ -0,0 +1,206 @@ +import pandas as pd +import tempfile + +from feature_store.repo import config +from feature_store.utils import storage, logger + + +logging = logger.get_logger() + + +def register_vaccine_search_trends(offline_store, online_store, entity): + @offline_store.sql_transformation( + name="generate_vaccine_search_trends", + variant="default", + description="Generate and upload weekly vaccine search trends features derived from a public Google dataset stored in BigQuery.", + ) + def generate_vaccine_search_trends(): + return f""" + WITH vaccine_trends AS ( + SELECT + date, + sub_region_1 as state, + avg(sni_covid19_vaccination) as lag_1_vaccine_interest, + avg(sni_vaccination_intent) as lag_1_vaccine_intent, + avg(sni_safety_side_effects) as lag_1_vaccine_safety + FROM + 
`bigquery-public-data.covid19_vaccination_search_insights.covid19_vaccination_search_insights` + GROUP BY + date, state + ), + weekly_trends AS ( + SELECT + TIMESTAMP(date) as date, + state, + lag_1_vaccine_interest, + lag(lag_1_vaccine_interest) + over (partition by state order by date ASC) as lag_2_vaccine_interest, + lag_1_vaccine_intent, + lag(lag_1_vaccine_intent) + over (partition by state order by date ASC) as lag_2_vaccine_intent, + lag_1_vaccine_safety, + lag(lag_1_vaccine_safety) + over (partition by state order by date ASC) as lag_2_vaccine_safety + FROM + vaccine_trends + ) + SELECT + date, + state, + lag_1_vaccine_interest, + lag_2_vaccine_interest, + lag_1_vaccine_intent, + lag_2_vaccine_intent, + lag_1_vaccine_safety, + lag_2_vaccine_safety + FROM + weekly_trends + WHERE + state IS NOT NULL AND + lag_1_vaccine_interest IS NOT NULL AND + lag_2_vaccine_interest IS NOT NULL AND + lag_1_vaccine_intent IS NOT NULL AND + lag_2_vaccine_intent IS NOT NULL AND + lag_1_vaccine_safety IS NOT NULL AND + lag_2_vaccine_safety IS NOT NULL + ORDER BY + date ASC, + state; + """ + + generate_vaccine_search_trends.register_resources( + name="vaccine_search_trends", + entity=entity, + entity_column="customerid", + inference_store=online_store, + features=[ + { + "name": "lag_1_vaccine_interest", + "column": "lag_1_vaccine_interest", + "type": "float32", + }, + { + "name": "lag_2_vaccine_interest", + "column": "lag_2_vaccine_interest", + "type": "float32", + }, + { + "name": "lag_1_vaccine_intent", + "column": "lag_1_vaccine_intent", + "type": "float32", + }, + { + "name": "lag_2_vaccine_intent", + "column": "lag_2_vaccine_intent", + "type": "float32", + }, + { + "name": "lag_1_vaccine_safety", + "column": "lag_1_vaccine_safety", + "type": "float32", + }, + { + "name": "lag_2_vaccine_safety", + "column": "lag_2_vaccine_safety", + "type": "float32", + }, + ], + ) + + +def register_vaccine_counts(offline_store, online_store, entity): + @offline_store.df_transformation( + name="generate_vaccine_counts", + variant="default", + description="Generate vaccine count features.", + ) + def generate_vaccine_counts(): + # Generate temp dir + tmpdir = tempfile.gettempdir() + input_filename = f"{tmpdir}/us_state_vaccinations.csv" + + # Download the CSV file from URL + storage.download_file_url( + filename=input_filename, url=config.DAILY_VACCINATIONS_CSV_URL + ) + + logging.info("Loading us_state_vaccinations.csv") + df = pd.read_csv(input_filename)[["date", "location", "daily_vaccinations"]] + logging.info(f"Loaded {len(df)} daily vaccination records") + + logging.info("Cleaning dataset") + df["date"] = df["date"].astype("datetime64[ns]") + + logging.info("Truncating records and filling NaNs") + df = df[ + (~df.location.isin(["United States", "Long Term Care"])) + & (df.date >= "2021-1-1") + ].fillna(0) + logging.info(f"{len(df)} daily records remaining") + + logging.info("Rolling up counts into weeks starting on Mondays") + df = ( + df.groupby([pd.Grouper(freq="W-Mon", key="date"), "location"])[ + "daily_vaccinations" + ] + .sum() + .reset_index() + ) + df.rename( + columns={ + "daily_vaccinations": "lag_1_weekly_vaccinations_count", + "location": "state", + }, + inplace=True, + ) + logging.info( + f"{len(df)} weekly vaccine count records for {len(df.state.value_counts())} total states & territories" + ) + + logging.info("Creating lagged features") + df["weekly_vaccinations_count"] = df.groupby( + "state" + ).lag_1_weekly_vaccinations_count.shift(periods=-1) + df["lag_2_weekly_vaccinations_count"] = 
df.groupby( + "state" + ).lag_1_weekly_vaccinations_count.shift(periods=1) + df.sort_values(["date", "state"], inplace=True) + + logging.info("Saving dataframe...") + df["weekly_vaccinations_count"] = df["weekly_vaccinations_count"].astype( + "Int64", errors="ignore" + ) + df["lag_1_weekly_vaccinations_count"] = df[ + "lag_1_weekly_vaccinations_count" + ].astype("Int64", errors="ignore") + df["lag_2_weekly_vaccinations_count"] = df[ + "lag_2_weekly_vaccinations_count" + ].astype("Int64", errors="ignore") + df["date"] = df["date"].dt.strftime("%Y-%m-%d %H:%M:%S") + + logging.info("Generated weekly vaccine count features") + return df + + generate_vaccine_counts.register_resources( + name="weekly_vaccinations", + entity=entity, + entity_column="customerid", + inference_store=online_store, + features=[ + { + "name": "lag_1_weekly_vaccinations_count", + "column": "lag_1_weekly_vaccinations_count", + "type": "float32", + }, + { + "name": "lag_2_weekly_vaccinations_count", + "column": "lag_2_weekly_vaccinations_count", + "type": "float32", + }, + { + "name": "weekly_vaccinations_count", + "column": "weekly_vaccinations_count", + "type": "float32", + }, + ], + ) diff --git a/feature_store/repo/FF_5_registering_sets.py b/feature_store/repo/FF_5_registering_sets.py new file mode 100644 index 0000000..53dc573 --- /dev/null +++ b/feature_store/repo/FF_5_registering_sets.py @@ -0,0 +1,38 @@ +import FF_4_transformations as transformations + + +def register_sets(ff): + # Register the training set + ff.register_training_set( + name="training_features", + variant="default", + features=[ + ("lag_1_vaccine_interest"), + ("lag_2_vaccine_interest"), + ("lag_1_vaccine_intent"), + ("lag_2_vaccine_intent"), + ("lag_1_vaccine_safety"), + ("lag_2_vaccine_safety"), + ("lag_1_weekly_vaccinations_count"), + ("lag_2_weekly_vaccinations_count"), + ], + labels=[ + ("weekly_vaccinations_count"), + ], + ) + + # TODO: There's got to be a way better way to do this + ff.register_training_set( + name="serving_features", + variant="default", + features=[ + ("lag_1_vaccine_interest"), + ("lag_2_vaccine_interest"), + ("lag_1_vaccine_intent"), + ("lag_2_vaccine_intent"), + ("lag_1_vaccine_safety"), + ("lag_2_vaccine_safety"), + ("lag_1_weekly_vaccinations_count"), + ("lag_2_weekly_vaccinations_count"), + ], + ) diff --git a/feature_store/repo/FF_6_data_fetch.py b/feature_store/repo/FF_6_data_fetch.py new file mode 100644 index 0000000..0d92d0d --- /dev/null +++ b/feature_store/repo/FF_6_data_fetch.py @@ -0,0 +1,36 @@ +import pandas as pd + +import featureform as ff + +from typing import Optional + +from feature_store.repo import FF_0_config as config + + +class DataFetcher: + X_cols = [ + "lag_1_vaccine_interest", + "lag_2_vaccine_interest", + "lag_1_vaccine_intent", + "lag_2_vaccine_intent", + "lag_1_vaccine_safety", + "lag_2_vaccine_safety", + "lag_1_weekly_vaccinations_count", + "lag_2_weekly_vaccinations_count", + ] + + y_col = ["weekly_vaccinations_count"] + + # TODO: Replace with Featureform get functions + def __init__(self, ff): + serving = ff.ServingClient(host=f"{config.FEATUREFORM_HOST}") + + # TODO: Make this actually work + def get_online_data(self, **entities) -> pd.DataFrame: + dataset = serving.training_set("fraud_training", "default") + return dataset + + # TODO: Make this actually work + def get_training_data(self) -> pd.DataFrame: + dataset = serving.training_set("fraud_training", "default") + return dataset diff --git a/feature_store/repo/features.py b/feature_store/repo/features.py index 932c3f8..d4bcf22 
100644 --- a/feature_store/repo/features.py +++ b/feature_store/repo/features.py @@ -3,17 +3,8 @@ from datetime import timedelta from google.cloud import bigquery -from feast import ( - BigQuerySource, - Entity, - FeatureService, - FeatureView, - Field -) -from feast.types import ( - Float32, - Int64 -) +from feast import BigQuerySource, Entity, FeatureService, FeatureView, Field +from feast.types import Float32, Int64 from feature_store.repo import config from feature_store.utils import storage @@ -30,7 +21,7 @@ table=f"{config.PROJECT_ID}.{config.BIGQUERY_DATASET_NAME}.{config.VACCINE_SEARCH_TRENDS_TABLE}", # The event timestamp is used for point-in-time joins and for ensuring only # features within the TTL are returned - timestamp_field="date" + timestamp_field="date", ) # Feature views are a grouping based on how features are stored in either the @@ -59,7 +50,7 @@ Field(name="lag_1_vaccine_intent", dtype=Float32), Field(name="lag_2_vaccine_intent", dtype=Float32), Field(name="lag_1_vaccine_safety", dtype=Float32), - Field(name="lag_2_vaccine_safety", dtype=Float32) + Field(name="lag_2_vaccine_safety", dtype=Float32), ], source=vaccine_search_trends_src, ) @@ -68,7 +59,7 @@ weekly_vaccinations_src = BigQuerySource( name="weekly_vaccinations_src", table=f"{config.PROJECT_ID}.{config.BIGQUERY_DATASET_NAME}.{config.WEEKLY_VACCINATIONS_TABLE}", - timestamp_field="date" + timestamp_field="date", ) weekly_vaccinations_fv = FeatureView( @@ -78,7 +69,7 @@ schema=[ Field(name="lag_1_weekly_vaccinations_count", dtype=Int64), Field(name="lag_2_weekly_vaccinations_count", dtype=Int64), - Field(name="weekly_vaccinations_count", dtype=Int64) + Field(name="weekly_vaccinations_count", dtype=Int64), ], source=weekly_vaccinations_src, ) @@ -88,27 +79,19 @@ name="serving_features", features=[ vaccine_search_trends_fv, - weekly_vaccinations_fv[[ - "lag_1_weekly_vaccinations_count", - "lag_2_weekly_vaccinations_count" - ]] + weekly_vaccinations_fv[ + ["lag_1_weekly_vaccinations_count", "lag_2_weekly_vaccinations_count"] + ], ], ) training_features = FeatureService( name="training_features", - features=[ - vaccine_search_trends_fv, - weekly_vaccinations_fv - ], + features=[vaccine_search_trends_fv, weekly_vaccinations_fv], ) -def generate_vaccine_search_trends( - logging, - client: bigquery.Client, - table_id: str -): +def generate_vaccine_search_trends(logging, client: bigquery.Client, table_id: str): """ Generate and upload weekly vaccine search trends features derived from a public Google dataset stored in BigQuery. @@ -118,8 +101,7 @@ def generate_vaccine_search_trends( table_id (str): Table ID for this feature set. """ job_config = bigquery.QueryJobConfig( - destination=table_id, - write_disposition='WRITE_TRUNCATE' + destination=table_id, write_disposition="WRITE_TRUNCATE" ) sql = f""" WITH vaccine_trends AS ( @@ -177,11 +159,8 @@ def generate_vaccine_search_trends( query_job.result() logging.info("Generated weekly vaccine search trends features") -def generate_vaccine_counts( - logging, - client: bigquery.Client, - table_id: str -): + +def generate_vaccine_counts(logging, client: bigquery.Client, table_id: str): """ Generate and upload vaccine count features from a CSV to BigQuery. 
@@ -197,36 +176,62 @@ def generate_vaccine_counts( # Download the CSV file from URL storage.download_file_url( - filename=input_filename, - url=config.DAILY_VACCINATIONS_CSV_URL + filename=input_filename, url=config.DAILY_VACCINATIONS_CSV_URL ) logging.info("Loading us_state_vaccinations.csv") - df = pd.read_csv(input_filename)[['date', 'location', 'daily_vaccinations']] + df = pd.read_csv(input_filename)[["date", "location", "daily_vaccinations"]] logging.info(f"Loaded {len(df)} daily vaccination records") logging.info("Cleaning dataset") - df['date'] = df['date'].astype('datetime64[ns]') + df["date"] = df["date"].astype("datetime64[ns]") logging.info("Truncating records and filling NaNs") - df = df[(~df.location.isin(['United States', 'Long Term Care'])) & (df.date >= '2021-1-1')].fillna(0) + df = df[ + (~df.location.isin(["United States", "Long Term Care"])) + & (df.date >= "2021-1-1") + ].fillna(0) logging.info(f"{len(df)} daily records remaining") logging.info("Rolling up counts into weeks starting on Mondays") - df = df.groupby([pd.Grouper(freq='W-Mon', key='date'), 'location'])['daily_vaccinations'].sum().reset_index() - df.rename(columns={'daily_vaccinations': 'lag_1_weekly_vaccinations_count', 'location': 'state'}, inplace=True) - logging.info(f"{len(df)} weekly vaccine count records for {len(df.state.value_counts())} total states & territories") + df = ( + df.groupby([pd.Grouper(freq="W-Mon", key="date"), "location"])[ + "daily_vaccinations" + ] + .sum() + .reset_index() + ) + df.rename( + columns={ + "daily_vaccinations": "lag_1_weekly_vaccinations_count", + "location": "state", + }, + inplace=True, + ) + logging.info( + f"{len(df)} weekly vaccine count records for {len(df.state.value_counts())} total states & territories" + ) logging.info("Creating lagged features") - df['weekly_vaccinations_count'] = df.groupby('state').lag_1_weekly_vaccinations_count.shift(periods=-1) - df['lag_2_weekly_vaccinations_count'] = df.groupby('state').lag_1_weekly_vaccinations_count.shift(periods=1) - df.sort_values(['date', 'state'], inplace=True) + df["weekly_vaccinations_count"] = df.groupby( + "state" + ).lag_1_weekly_vaccinations_count.shift(periods=-1) + df["lag_2_weekly_vaccinations_count"] = df.groupby( + "state" + ).lag_1_weekly_vaccinations_count.shift(periods=1) + df.sort_values(["date", "state"], inplace=True) logging.info("Saving dataframe...") - df['weekly_vaccinations_count'] = df['weekly_vaccinations_count'].astype('Int64', errors='ignore') - df['lag_1_weekly_vaccinations_count'] = df['lag_1_weekly_vaccinations_count'].astype('Int64', errors='ignore') - df['lag_2_weekly_vaccinations_count'] = df['lag_2_weekly_vaccinations_count'].astype('Int64', errors='ignore') - df['date'] = df['date'].dt.strftime("%Y-%m-%d %H:%M:%S") + df["weekly_vaccinations_count"] = df["weekly_vaccinations_count"].astype( + "Int64", errors="ignore" + ) + df["lag_1_weekly_vaccinations_count"] = df[ + "lag_1_weekly_vaccinations_count" + ].astype("Int64", errors="ignore") + df["lag_2_weekly_vaccinations_count"] = df[ + "lag_2_weekly_vaccinations_count" + ].astype("Int64", errors="ignore") + df["date"] = df["date"].dt.strftime("%Y-%m-%d %H:%M:%S") logging.info("Uploading CSV") # Save back to tempfile @@ -236,7 +241,7 @@ def generate_vaccine_counts( storage.upload_file( local_filename=output_filename, remote_filename=output_storage_filename, - bucket_name=config.BUCKET_NAME + bucket_name=config.BUCKET_NAME, ) # Load bq job config @@ -246,20 +251,20 @@ def generate_vaccine_counts( bigquery.SchemaField("state", 
"STRING"), bigquery.SchemaField("lag_1_weekly_vaccinations_count", "INTEGER"), bigquery.SchemaField("weekly_vaccinations_count", "INTEGER"), - bigquery.SchemaField("lag_2_weekly_vaccinations_count", "INTEGER") + bigquery.SchemaField("lag_2_weekly_vaccinations_count", "INTEGER"), ], skip_leading_rows=1, max_bad_records=2, source_format=bigquery.SourceFormat.CSV, - write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE + write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, ) # Start the job logging.info("Running query") load_job = client.load_table_from_uri( f"gs://{config.BUCKET_NAME}/{output_storage_filename}", table_id, - job_config=job_config + job_config=job_config, ) # Wait for job to complete load_job.result() - logging.info("Generated weekly vaccine count features") \ No newline at end of file + logging.info("Generated weekly vaccine count features") diff --git a/feature_store/utils/FF_storage.py b/feature_store/utils/FF_storage.py new file mode 100644 index 0000000..6e20c5c --- /dev/null +++ b/feature_store/utils/FF_storage.py @@ -0,0 +1,98 @@ +import requests +import pickle + + +from google.cloud import storage +from typing import Any + + +def get_feature_store(config_path: str, bucket_name: str) -> FeatureStore: + """ + Fetch the Feast Feature Store using the repo config stored + in GCS. + + Returns: + FeatureStore: Feast FeatureStore + """ + return FeatureStore( + config=fetch_pkl(remote_filename=config_path, bucket_name=bucket_name) + ) + + +def get_blob(remote_filename: str, bucket_name: str): + """ + Grab a pointer to the GCS blob in bucket. + + Args: + remote_filename (str): Path to the remote file within the GCS bucket. + bucket_name (str): Name of the GCS bucket. + """ + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(remote_filename) + return blob + + +def upload_file(local_filename: str, bucket_name: str, remote_filename: str) -> None: + """ + Upload a local file to GCS (Google Cloud Storage) bucket + + Args: + local_filename (str): Path to the local file to upload to GCS. + bucket_name (str): Name of the GCS bucket. + remote_filename (str): Path to the remote file within the GCS bucket. + """ + blob = get_blob(remote_filename, bucket_name) + blob.upload_from_filename(local_filename) + + +def upload_pkl( + obj: Any, + bucket_name: str, + remote_filename: str, +) -> None: + """ + Upload an object to GCS as a pickle file. + + Args: + obj (Any): Some object. + bucket_name (str): Name of the GCS bucket. + remote_filename (str): Path to the remote file within the GCS bucket. + """ + blob = get_blob(remote_filename, bucket_name) + pickle_out = pickle.dumps(obj) + blob.upload_from_string(pickle_out) + + +def fetch_pkl(bucket_name: str, remote_filename: str) -> Any: + """ + Fetch a pickled object from GCS. + + Args: + bucket_name (str): Name of the GCS bucket. + remote_filename (str): Path to the remote file within the GCS bucket. + + Returns: + Any: Some object. + """ + # Get the blob and download + blob = get_blob(remote_filename, bucket_name) + pickle_in = blob.download_as_string() + obj = pickle.loads(pickle_in) + return obj + + +def download_file_url(filename: str, url: str): + """ + Download a file by iterating over chunks of content and + saving it to a local file. + + Args: + filename (str): Filename to store the resulting data in. + url (str): URL to fetch the file from. 
+ """ + with requests.get(url, stream=True) as r: + r.raise_for_status() + with open(filename, "wb") as f: + for chunk in r.iter_content(chunk_size=8192): + f.write(chunk) diff --git a/feature_store/utils/data_fetcher.py b/feature_store/utils/data_fetcher.py index e5eacfe..5e8da39 100644 --- a/feature_store/utils/data_fetcher.py +++ b/feature_store/utils/data_fetcher.py @@ -6,17 +6,17 @@ class DataFetcher: X_cols = [ - 'lag_1_vaccine_interest', - 'lag_2_vaccine_interest', - 'lag_1_vaccine_intent', - 'lag_2_vaccine_intent', - 'lag_1_vaccine_safety', - 'lag_2_vaccine_safety', - 'lag_1_weekly_vaccinations_count', - 'lag_2_weekly_vaccinations_count' + "lag_1_vaccine_interest", + "lag_2_vaccine_interest", + "lag_1_vaccine_intent", + "lag_2_vaccine_intent", + "lag_1_vaccine_safety", + "lag_2_vaccine_safety", + "lag_1_weekly_vaccinations_count", + "lag_2_weekly_vaccinations_count", ] - y_col = ['weekly_vaccinations_count'] + y_col = ["weekly_vaccinations_count"] def __init__(self, fs: FeatureStore): """ @@ -39,8 +39,7 @@ def get_online_data(self, **entities) -> pd.DataFrame: """ try: features = self._fs.get_online_features( - features=self.serving_feature_svc, - entity_rows=[entities] + features=self.serving_feature_svc, entity_rows=[entities] ).to_df() return features[self.X_cols] except Exception as why: @@ -49,7 +48,7 @@ def get_online_data(self, **entities) -> pd.DataFrame: def get_training_data( self, entity_df: Optional[pd.DataFrame] = None, - entity_query: Optional[str] = None + entity_query: Optional[str] = None, ) -> pd.DataFrame: """ Fetch point-in-time correct ML Features from the @@ -65,14 +64,12 @@ def get_training_data( try: if entity_df: return self._fs.get_historical_features( - features=self.training_feature_svc, - entity_df=entity_df + features=self.training_feature_svc, entity_df=entity_df ).to_df() if entity_query: # Otherwise query the offline source of record return self._fs.get_historical_features( - features=self.training_feature_svc, - entity_df=entity_query + features=self.training_feature_svc, entity_df=entity_query ).to_df() except Exception as why: print(why)
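Side note on feature_store/repo/FF_6_data_fetch.py above: its TODOs leave the ServingClient in a local variable and still reference the "fraud_training" set from the Featureform quickstart. Below is one way those TODOs could be resolved. It is an assumption-level sketch, not the author's implementation: it relies on the ServingClient calls already used elsewhere in this diff (ff.ServingClient(host=...), .features(), .training_set()), on the feature and label names from FF_4_transformations.py, on the "training_features" set from FF_5_registering_sets.py, and on a "default" variant for every resource.

import pandas as pd
import featureform as ff

from feature_store.repo import FF_0_config as config


class FeatureformDataFetcher:
    """Hypothetical Featureform-backed replacement for feature_store/utils/data_fetcher.py."""

    X_cols = [
        "lag_1_vaccine_interest",
        "lag_2_vaccine_interest",
        "lag_1_vaccine_intent",
        "lag_2_vaccine_intent",
        "lag_1_vaccine_safety",
        "lag_2_vaccine_safety",
        "lag_1_weekly_vaccinations_count",
        "lag_2_weekly_vaccinations_count",
    ]
    y_col = ["weekly_vaccinations_count"]

    def __init__(self):
        # Keep the client on self so the methods below can reach it
        # (the current FF_6_data_fetch.py drops it in a local variable).
        self.serving = ff.ServingClient(host=config.FEATUREFORM_HOST)

    def get_online_data(self, **entities) -> pd.DataFrame:
        # Look up the latest feature values for one entity, e.g. state="Alaska".
        # Assumes each feature was registered under the "default" variant.
        values = self.serving.features(
            [(name, "default") for name in self.X_cols], entities
        )
        return pd.DataFrame([values], columns=self.X_cols)

    def get_training_data(self) -> pd.DataFrame:
        # Pull the "training_features" set registered in FF_5_registering_sets.py
        # (rather than the leftover "fraud_training" name) and flatten it.
        dataset = self.serving.training_set("training_features", "default")
        rows = [list(row.features()) + [row.label()] for row in dataset]
        return pd.DataFrame(rows, columns=self.X_cols + self.y_col)

Usage would then mirror the existing DataFetcher, e.g. FeatureformDataFetcher().get_online_data(state="Alaska").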