Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 94 additions & 2 deletions sdk/python/feast/infra/offline_stores/duckdb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from datetime import datetime
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable, List, Optional, Union

Expand All @@ -23,8 +23,10 @@
write_logged_features_ibis,
)
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.offline_stores.offline_utils import DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL
from feast.infra.registry.base_registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.utils import make_tzaware, to_naive_utc


def _read_data_source(data_source: DataSource, repo_path: str) -> Table:
Expand Down Expand Up @@ -113,6 +115,66 @@ def _write_data_source(
)


def _build_entity_df_from_sources(
    config: RepoConfig,
    feature_views: List[FeatureView],
    start_date: datetime,
    end_date: datetime,
    data_source_reader: Callable[[DataSource, str], Table],
) -> pd.DataFrame:
    """Synthesize an entity dataframe from the feature views' batch sources.

    Used in "non-entity mode" (``entity_df=None``): reads each feature view's
    batch source, restricts rows to the ``[start_date, end_date]`` window on
    the source's timestamp field, collects the distinct entity-key values
    (renamed to the projection's entity join-key names), and unions them into
    a single dataframe stamped with ``end_date`` as the event timestamp.

    Args:
        config: Repo config; ``config.repo_path`` is passed to the reader to
            resolve relative source paths.
        feature_views: Feature views whose batch sources supply entity keys.
        start_date: Inclusive lower bound for source timestamps.
        end_date: Inclusive upper bound for source timestamps; also written
            into the event-timestamp column of the result.
        data_source_reader: Callable that loads a ``DataSource`` into an ibis
            ``Table`` (e.g. ``_read_data_source``).

    Returns:
        A dataframe with one column per entity join key plus the
        ``DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL`` column. If no feature view
        yields any join keys, a single-row dataframe containing only the
        event-timestamp column (set to ``end_date``) is returned.
    """

    entity_dfs: List[pd.DataFrame] = []

    for fv in feature_views:
        fv_table = data_source_reader(fv.batch_source, str(config.repo_path))

        # Apply the source's field_mapping. Note ibis rename takes {new: old},
        # so {new_name: old_name} renames old_name -> new_name.
        for old_name, new_name in fv.batch_source.field_mapping.items():
            if old_name in fv_table.columns:
                fv_table = fv_table.rename({new_name: old_name})

        timestamp_field = fv.batch_source.timestamp_field

        # Sources are compared against naive-UTC bounds; assumes the source
        # timestamp column is stored as naive UTC — TODO confirm for tz-aware sources.
        start_naive = to_naive_utc(start_date)
        end_naive = to_naive_utc(end_date)

        # Keep only rows inside the requested window (both bounds inclusive).
        fv_table = fv_table.filter(
            ibis.and_(
                fv_table[timestamp_field] >= ibis.literal(start_naive),
                fv_table[timestamp_field] <= ibis.literal(end_naive),
            )
        )

        # Fall back to identity mapping when the projection defines no
        # join_key_map (entity column names already match join keys).
        join_key_map = fv.projection.join_key_map or {
            e.name: e.name for e in fv.entity_columns
        }
        source_join_key_cols = list(join_key_map.keys())

        if source_join_key_cols:
            # join_key_map is {feature_key: entity_key}; ibis rename({new: old}) renames
            # old->new, so pass inverted map to rename feature columns to entity names.
            distinct_entities = (
                fv_table.select(*source_join_key_cols)
                .rename({v: k for k, v in join_key_map.items()})
                .distinct()
                .execute()
            )
            entity_dfs.append(distinct_entities)

    # No entities found anywhere: return a single timestamp-only row so the
    # downstream retrieval still has a well-formed entity dataframe.
    if not entity_dfs:
        return pd.DataFrame(
            {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]}
        )

    combined = pd.concat(entity_dfs, ignore_index=True)

    # De-duplicate across all columns (feature views may share entities).
    all_cols = list(combined.columns)
    combined = combined.drop_duplicates(subset=all_cols).reset_index(drop=True)

    # Every synthesized entity row gets the same event timestamp: end_date.
    combined[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = end_date

    return combined


class DuckDBOfflineStoreConfig(FeastConfigBaseModel):
type: StrictStr = "duckdb"
# """ Offline store type selector"""
Expand Down Expand Up @@ -154,11 +216,41 @@ def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
entity_df: Optional[Union[pd.DataFrame, str]],
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> RetrievalJob:
non_entity_mode = entity_df is None

if non_entity_mode:
end_date = (
make_tzaware(end_date) if end_date else datetime.now(timezone.utc)
)

if start_date is None:
max_ttl_seconds = 0
for fv in feature_views:
if fv.ttl and isinstance(fv.ttl, timedelta):
max_ttl_seconds = max(
max_ttl_seconds, int(fv.ttl.total_seconds())
)
if max_ttl_seconds > 0:
start_date = end_date - timedelta(seconds=max_ttl_seconds)
else:
start_date = end_date - timedelta(days=30)
start_date = make_tzaware(start_date)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 229 - Line 244 is common across the offline stores for figuring out the start_date and end_date. If it's not too much work, would it be possible to create a utility function that can be re-used?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I’ll refactor this into a reusable utility function and apply it to similar date range calculations across the codebase in a follow-up PR


entity_df = _build_entity_df_from_sources(
config=config,
feature_views=feature_views,
start_date=start_date,
end_date=end_date,
data_source_reader=_read_data_source,
)

return get_historical_features_ibis(
config=config,
feature_views=feature_views,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@


@pytest.mark.integration
@pytest.mark.universal_offline_stores
@pytest.mark.ray_offline_stores_only
@pytest.mark.universal_offline_stores(only=["file", "duckdb"])
def test_non_entity_mode_basic(environment, universal_data_sources):
"""Test historical features retrieval without entity_df (non-entity mode).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ def test_historical_features_field_mapping(


@pytest.mark.integration
@pytest.mark.universal_offline_stores(only=["file"])
@pytest.mark.universal_offline_stores(only=["file", "duckdb"])
def test_historical_features_non_entity_retrieval(environment):
"""Test get_historical_features with entity_df=None using start_date/end_date.
Expand Down
Loading