Changes from 4 commits
2 changes: 2 additions & 0 deletions sdk/python/feast/templates/spark/bootstrap.py
@@ -21,6 +21,7 @@ def bootstrap():
    driver_stats_df.to_parquet(
        path=str(data_path / "driver_hourly_stats.parquet"),
        allow_truncated_timestamps=True,
        coerce_timestamps="us",
    )

    customer_entities = [201, 202, 203]
@@ -30,6 +31,7 @@ def bootstrap():
    customer_profile_df.to_parquet(
        path=str(data_path / "customer_daily_profile.parquet"),
        allow_truncated_timestamps=True,
        coerce_timestamps="us",
    )


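Note on the bootstrap.py change above: pandas forwards extra to_parquet keyword arguments to the pyarrow writer, and coerce_timestamps="us" truncates nanosecond timestamps to microseconds so that Spark's parquet reader, which does not accept nanosecond timestamps, can load the generated files. A minimal stand-alone sketch of the same pattern (the DataFrame contents and output path are illustrative, not taken from the template):

# Illustrative sketch only: write a pandas DataFrame to parquet with
# microsecond-precision timestamps, mirroring the bootstrap.py change above.
from datetime import datetime

import pandas as pd

stats_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
        ],
        "conv_rate": [0.5, 0.7],
    }
)
stats_df.to_parquet(
    "driver_hourly_stats.parquet",
    # Both keyword arguments are passed through to pyarrow's parquet writer:
    # coerce_timestamps="us" downcasts nanosecond timestamps to microseconds,
    # and allow_truncated_timestamps=True silences the precision-loss error.
    allow_truncated_timestamps=True,
    coerce_timestamps="us",
)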
43 changes: 33 additions & 10 deletions sdk/python/feast/templates/spark/feature_repo/example_repo.py
@@ -5,25 +5,21 @@
from datetime import timedelta
from pathlib import Path

from feast import Entity, FeatureService, FeatureView, Field
import pandas as pd
from feast.on_demand_feature_view import on_demand_feature_view
from feast import Entity, FeatureService, FeatureView, Field, RequestSource
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)
from feast.types import Float32, Int64
from feast.types import Float32, Float64, Int64

# Constants related to the generated data sets
CURRENT_DIR = Path(__file__).parent


# Entity definitions
driver = Entity(
    name="driver",
    description="driver id",
)
customer = Entity(
    name="customer",
    description="customer id",
)
driver = Entity(name="driver", description="driver id", join_keys=["driver_id"])
customer = Entity(name="customer", description="customer id", join_keys=["customer_id"])

# Sources
driver_hourly_stats = SparkSource(
@@ -55,6 +51,17 @@
    source=driver_hourly_stats,
    tags={},
)

# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
    name="vals_to_add",
    schema=[
        Field(name="val_to_add", dtype=Int64),
        Field(name="val_to_add_2", dtype=Int64),
    ],
)

customer_daily_profile_view = FeatureView(
    name="customer_daily_profile",
    entities=[customer],
@@ -73,3 +80,19 @@
    name="driver_activity",
    features=[driver_hourly_stats_view, customer_daily_profile_view],
)


# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
    sources=[driver_hourly_stats_view, input_request],
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame()
    df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
    df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
    return df
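A note on the new RequestSource and on demand feature view above: the two val_to_add fields are not stored in any offline or online store; they must be supplied in the entity rows at retrieval time, and transformed_conv_rate computes its outputs from them on the fly. A minimal sketch of how that looks, assuming the Spark template repo is the current directory (the entity values are illustrative):

# Illustrative sketch only: fetch the on demand features defined above.
from feast import FeatureStore

store = FeatureStore(repo_path=".")
online_features = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
    entity_rows=[
        # val_to_add / val_to_add_2 come from the RequestSource, so they are
        # passed alongside the join key at request time.
        {"driver_id": 1001, "val_to_add": 1000, "val_to_add_2": 2000},
    ],
).to_dict()
print(online_features)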
29 changes: 3 additions & 26 deletions sdk/python/feast/templates/spark/feature_repo/test_workflow.py
@@ -4,7 +4,6 @@
import pandas as pd

from feast import FeatureStore
from feast.data_source import PushMode


def run_demo():
@@ -27,27 +26,6 @@ def run_demo():
    print("\n--- Online features retrieved (instead) through a feature service---")
    fetch_online_features(store, use_feature_service=True)

    print("\n--- Simulate a stream event ingestion of the hourly stats df ---")

Review comment: why remove online feature retrieval?

    event_df = pd.DataFrame.from_dict(
        {
            "driver_id": [1001],
            "event_timestamp": [
                datetime(2021, 5, 13, 10, 59, 42),
            ],
            "created": [
                datetime(2021, 5, 13, 10, 59, 42),
            ],
            "conv_rate": [1.0],
            "acc_rate": [1.0],
            "avg_daily_trips": [1000],
        }
    )
    print(event_df)
    store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE)

    print("\n--- Online features again with updated values from a stream push---")
    fetch_online_features(store, use_feature_service=True)

    print("\n--- Run feast teardown ---")
    subprocess.run(["feast", "teardown"])

@@ -82,8 +60,6 @@ def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring:
            "driver_hourly_stats:conv_rate",
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips",
            "transformed_conv_rate:conv_rate_plus_val1",

Review comment: did you mean to remove these? I think you use them later.

            "transformed_conv_rate:conv_rate_plus_val2",
        ],
    ).to_df()
    print(training_df.head())
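If the transformed_conv_rate features are requested in a historical retrieval like the one above, the entity dataframe has to carry the request-time columns as well as the join keys and event timestamps. A minimal sketch, assuming the Spark template repo is the current directory (the values are illustrative):

# Illustrative sketch only: an entity dataframe for get_historical_features
# that includes the RequestSource columns used by transformed_conv_rate.
from datetime import datetime

import pandas as pd
from feast import FeatureStore

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
        ],
        # Request-time inputs consumed by the on demand feature view.
        "val_to_add": [1, 2],
        "val_to_add_2": [10, 20],
    }
)
store = FeatureStore(repo_path=".")
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "transformed_conv_rate:conv_rate_plus_val1",
    ],
).to_df()
print(training_df.head())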
@@ -94,23 +70,24 @@ def fetch_online_features(store, use_feature_service: bool):
        # {join_key: entity_value}
        {
            "driver_id": 1001,
            "customer_id": 201,
            "val_to_add": 1000,
            "val_to_add_2": 2000,
        },
        {
            "driver_id": 1002,
            "customer_id": 202,
            "val_to_add": 1001,
            "val_to_add_2": 2002,
        },
    ]
    if use_feature_service:
        features_to_fetch = store.get_feature_service("driver_activity_v1")
        features_to_fetch = store.get_feature_service("driver_activity")
    else:
        features_to_fetch = [
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips",
            "transformed_conv_rate:conv_rate_plus_val1",

Review comment: Oh, these didn't work?

Author (@Felix-neko, May 31, 2025): This on-the-fly feature was not declared in the example feature repo for Spark, so I have just removed it from this example.

Review comment: Looking at the code, aws/feature_repo/example_repo.py had this ODFV:

# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
    sources=[driver_stats_fv, input_request],
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame()
    df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
    df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
    return df

Author (@Felix-neko): Great, it works. I've restored it.

            "transformed_conv_rate:conv_rate_plus_val2",
        ]
    returned_features = store.get_online_features(
        features=features_to_fetch,
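For completeness, a minimal sketch of the feature-service path that the renamed "driver_activity" reference points at: get_online_features also accepts a FeatureService object instead of a list of feature references. The repo path and entity rows below are illustrative.

# Illustrative sketch only: online retrieval through the FeatureService
# named "driver_activity" in example_repo.py.
from feast import FeatureStore

store = FeatureStore(repo_path=".")
feature_service = store.get_feature_service("driver_activity")
returned_features = store.get_online_features(
    features=feature_service,
    entity_rows=[
        {"driver_id": 1001, "customer_id": 201},
    ],
).to_dict()
for key, value in sorted(returned_features.items()):
    print(key, ":", value)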