diff --git a/sdk/python/feast/templates/spark/bootstrap.py b/sdk/python/feast/templates/spark/bootstrap.py
index fc0be4ea0ab..7a397e31571 100644
--- a/sdk/python/feast/templates/spark/bootstrap.py
+++ b/sdk/python/feast/templates/spark/bootstrap.py
@@ -21,6 +21,7 @@ def bootstrap():
     driver_stats_df.to_parquet(
         path=str(data_path / "driver_hourly_stats.parquet"),
         allow_truncated_timestamps=True,
+        coerce_timestamps="us",
     )

     customer_entities = [201, 202, 203]
@@ -30,6 +31,7 @@ def bootstrap():
     customer_profile_df.to_parquet(
         path=str(data_path / "customer_daily_profile.parquet"),
         allow_truncated_timestamps=True,
+        coerce_timestamps="us",
     )


diff --git a/sdk/python/feast/templates/spark/feature_repo/example_repo.py b/sdk/python/feast/templates/spark/feature_repo/example_repo.py
index 8ad48f53fc4..826e9db0d0c 100644
--- a/sdk/python/feast/templates/spark/feature_repo/example_repo.py
+++ b/sdk/python/feast/templates/spark/feature_repo/example_repo.py
@@ -5,25 +5,21 @@
 from datetime import timedelta
 from pathlib import Path

-from feast import Entity, FeatureService, FeatureView, Field
+import pandas as pd
+from feast.on_demand_feature_view import on_demand_feature_view
+from feast import Entity, FeatureService, FeatureView, Field, RequestSource
 from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
     SparkSource,
 )
-from feast.types import Float32, Int64
+from feast.types import Float32, Float64, Int64

 # Constants related to the generated data sets
 CURRENT_DIR = Path(__file__).parent


 # Entity definitions
-driver = Entity(
-    name="driver",
-    description="driver id",
-)
-customer = Entity(
-    name="customer",
-    description="customer id",
-)
+driver = Entity(name="driver", description="driver id", join_keys=["driver_id"])
+customer = Entity(name="customer", description="customer id", join_keys=["customer_id"])

 # Sources
 driver_hourly_stats = SparkSource(
@@ -55,6 +51,17 @@
     source=driver_hourly_stats,
     tags={},
 )
+
+# Define a request data source which encodes features / information only
+# available at request time (e.g. part of the user initiated HTTP request)
+input_request = RequestSource(
+    name="vals_to_add",
+    schema=[
+        Field(name="val_to_add", dtype=Int64),
+        Field(name="val_to_add_2", dtype=Int64),
+    ],
+)
+
 customer_daily_profile_view = FeatureView(
     name="customer_daily_profile",
     entities=[customer],
@@ -73,3 +80,19 @@
     name="driver_activity",
     features=[driver_hourly_stats_view, customer_daily_profile_view],
 )
+
+
+# Define an on demand feature view which can generate new features based on
+# existing feature views and RequestSource features
+@on_demand_feature_view(
+    sources=[driver_hourly_stats_view, input_request],
+    schema=[
+        Field(name="conv_rate_plus_val1", dtype=Float64),
+        Field(name="conv_rate_plus_val2", dtype=Float64),
+    ],
+)
+def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
+    df = pd.DataFrame()
+    df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
+    df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
+    return df
diff --git a/sdk/python/feast/templates/spark/feature_repo/test_workflow.py b/sdk/python/feast/templates/spark/feature_repo/test_workflow.py
index 08d493fc54f..484b91eea6f 100644
--- a/sdk/python/feast/templates/spark/feature_repo/test_workflow.py
+++ b/sdk/python/feast/templates/spark/feature_repo/test_workflow.py
@@ -4,7 +4,6 @@

 import pandas as pd
 from feast import FeatureStore
-from feast.data_source import PushMode


 def run_demo():
@@ -27,27 +26,6 @@
     print("\n--- Online features retrieved (instead) through a feature service---")
     fetch_online_features(store, use_feature_service=True)

-    print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
-    event_df = pd.DataFrame.from_dict(
-        {
-            "driver_id": [1001],
-            "event_timestamp": [
-                datetime(2021, 5, 13, 10, 59, 42),
-            ],
-            "created": [
-                datetime(2021, 5, 13, 10, 59, 42),
-            ],
-            "conv_rate": [1.0],
-            "acc_rate": [1.0],
-            "avg_daily_trips": [1000],
-        }
-    )
-    print(event_df)
-    store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE)
-
-    print("\n--- Online features again with updated values from a stream push---")
-    fetch_online_features(store, use_feature_service=True)
-
     print("\n--- Run feast teardown ---")
     subprocess.run(["feast", "teardown"])

@@ -82,8 +60,6 @@ def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring:
             "driver_hourly_stats:conv_rate",
             "driver_hourly_stats:acc_rate",
             "driver_hourly_stats:avg_daily_trips",
-            "transformed_conv_rate:conv_rate_plus_val1",
-            "transformed_conv_rate:conv_rate_plus_val2",
         ],
     ).to_df()
     print(training_df.head())
@@ -94,23 +70,24 @@
         # {join_key: entity_value}
         {
             "driver_id": 1001,
+            "customer_id": 201,
             "val_to_add": 1000,
             "val_to_add_2": 2000,
         },
         {
             "driver_id": 1002,
+            "customer_id": 202,
             "val_to_add": 1001,
             "val_to_add_2": 2002,
         },
     ]
     if use_feature_service:
-        features_to_fetch = store.get_feature_service("driver_activity_v1")
+        features_to_fetch = store.get_feature_service("driver_activity")
     else:
         features_to_fetch = [
             "driver_hourly_stats:acc_rate",
             "driver_hourly_stats:avg_daily_trips",
             "transformed_conv_rate:conv_rate_plus_val1",
-            "transformed_conv_rate:conv_rate_plus_val2",
        ]
     returned_features = store.get_online_features(
         features=features_to_fetch,
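
For reference, a minimal sketch of how the updated template's online path is exercised once `feast apply` has run. The snippet is illustrative and not part of this patch: it assumes it is run from the template's feature_repo/ directory (so `repo_path="."` resolves to the generated feature_store.yaml), and it mirrors the entity rows used in test_workflow.py above, where the new customer_id join key and the request-time values consumed by transformed_conv_rate must both be supplied.

    from feast import FeatureStore

    # Assumption: working directory is the template's feature_repo/
    # and `feast apply` has already registered the definitions above.
    store = FeatureStore(repo_path=".")

    online = store.get_online_features(
        features=[
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips",
            "transformed_conv_rate:conv_rate_plus_val1",
        ],
        entity_rows=[
            # Join keys plus the RequestSource values; both val_to_add and
            # val_to_add_2 are needed because the on-demand view reads both.
            {
                "driver_id": 1001,
                "customer_id": 201,
                "val_to_add": 1000,
                "val_to_add_2": 2000,
            },
        ],
    ).to_dict()
    print(online)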