-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat: DuckDB historical retrieval without entity dataframe #6108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
7661335
317244c
7bb36ee
72c1b91
da34d49
8c3b25a
7ec2d22
fd71654
6dba432
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| import os | ||
| from datetime import datetime | ||
| from datetime import datetime, timedelta, timezone | ||
| from pathlib import Path | ||
| from typing import Any, Callable, List, Optional, Union | ||
|
|
||
|
|
@@ -23,8 +23,10 @@ | |
| write_logged_features_ibis, | ||
| ) | ||
| from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob | ||
| from feast.infra.offline_stores.offline_utils import DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL | ||
| from feast.infra.registry.base_registry import BaseRegistry | ||
| from feast.repo_config import FeastConfigBaseModel, RepoConfig | ||
| from feast.utils import make_tzaware, to_naive_utc | ||
|
|
||
|
|
||
| def _read_data_source(data_source: DataSource, repo_path: str) -> Table: | ||
|
|
@@ -113,6 +115,66 @@ def _write_data_source( | |
| ) | ||
|
|
||
|
|
||
def _build_entity_df_from_sources(
    config: RepoConfig,
    feature_views: List[FeatureView],
    start_date: datetime,
    end_date: datetime,
    data_source_reader: Callable[[DataSource, str], Table],
) -> pd.DataFrame:
    """Synthesize an entity dataframe by scanning the batch sources of the
    given feature views.

    For each feature view, the batch source is read, field mappings are
    applied, rows are restricted to ``[start_date, end_date]`` on the
    source's timestamp field, and the distinct entity-key combinations are
    collected. The union of all distinct keys is returned with the standard
    entity-dataframe event timestamp column fixed to ``end_date``.

    Args:
        config: Repo config; ``config.repo_path`` is used to resolve sources.
        feature_views: Feature views whose batch sources supply entity keys.
        start_date: Inclusive lower bound applied to each source's timestamp
            field (converted to naive UTC before comparison).
        end_date: Inclusive upper bound; also used as the event timestamp of
            every row in the returned dataframe.
        data_source_reader: Callable mapping ``(data_source, repo_path)`` to
            an ibis ``Table``.

    Returns:
        A pandas DataFrame of distinct entity keys plus the entity-dataframe
        event timestamp column. If no feature view yields any entity
        columns, a single-row frame with only the timestamp column is
        returned so the downstream join still has a timestamp to anchor on.
    """
    # The timestamp bounds are loop-invariant; convert once rather than
    # once per feature view.
    start_naive = to_naive_utc(start_date)
    end_naive = to_naive_utc(end_date)

    entity_dfs: List[pd.DataFrame] = []

    for fv in feature_views:
        fv_table = data_source_reader(fv.batch_source, str(config.repo_path))

        # Apply field mappings. field_mapping maps source column -> feature
        # column; ibis rename({new: old}) renames old -> new.
        for old_name, new_name in fv.batch_source.field_mapping.items():
            if old_name in fv_table.columns:
                fv_table = fv_table.rename({new_name: old_name})

        timestamp_field = fv.batch_source.timestamp_field

        fv_table = fv_table.filter(
            ibis.and_(
                fv_table[timestamp_field] >= ibis.literal(start_naive),
                fv_table[timestamp_field] <= ibis.literal(end_naive),
            )
        )

        # join_key_map is {feature_key: entity_key}; fall back to the raw
        # entity column names when the projection defines no mapping.
        join_key_map = fv.projection.join_key_map or {
            e.name: e.name for e in fv.entity_columns
        }
        source_join_key_cols = list(join_key_map.keys())

        if source_join_key_cols:
            # ibis rename({new: old}) renames old -> new, so pass the
            # inverted map to rename feature columns to entity names.
            distinct_entities = (
                fv_table.select(*source_join_key_cols)
                .rename({v: k for k, v in join_key_map.items()})
                .distinct()
                .execute()
            )
            entity_dfs.append(distinct_entities)

    if not entity_dfs:
        # No entity columns discovered: return a minimal single-row frame.
        return pd.DataFrame(
            {DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]}
        )

    # NOTE(review): feature views with differing entity columns produce NaN
    # cells after concat — confirm mixed-entity retrieval is intended.
    combined = pd.concat(entity_dfs, ignore_index=True)

    # drop_duplicates() already considers all columns by default; passing
    # subset=<all columns> was redundant.
    combined = combined.drop_duplicates().reset_index(drop=True)

    combined[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = end_date

    return combined
|
|
||
|
|
||
| class DuckDBOfflineStoreConfig(FeastConfigBaseModel): | ||
| type: StrictStr = "duckdb" | ||
| # """ Offline store type selector""" | ||
|
|
@@ -154,11 +216,41 @@ def get_historical_features( | |
| config: RepoConfig, | ||
| feature_views: List[FeatureView], | ||
| feature_refs: List[str], | ||
| entity_df: Union[pd.DataFrame, str], | ||
| entity_df: Optional[Union[pd.DataFrame, str]], | ||
| registry: BaseRegistry, | ||
| project: str, | ||
| full_feature_names: bool = False, | ||
| start_date: Optional[datetime] = None, | ||
| end_date: Optional[datetime] = None, | ||
| ) -> RetrievalJob: | ||
| non_entity_mode = entity_df is None | ||
|
|
||
| if non_entity_mode: | ||
| end_date = ( | ||
| make_tzaware(end_date) if end_date else datetime.now(timezone.utc) | ||
| ) | ||
|
|
||
| if start_date is None: | ||
| max_ttl_seconds = 0 | ||
| for fv in feature_views: | ||
| if fv.ttl and isinstance(fv.ttl, timedelta): | ||
| max_ttl_seconds = max( | ||
| max_ttl_seconds, int(fv.ttl.total_seconds()) | ||
| ) | ||
| if max_ttl_seconds > 0: | ||
| start_date = end_date - timedelta(seconds=max_ttl_seconds) | ||
| else: | ||
| start_date = end_date - timedelta(days=30) | ||
| start_date = make_tzaware(start_date) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Lines 229–244 are common across the offline stores for figuring out the start_date and end_date. If it's not too much, is it possible to create a utility function that can be re-used?
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I'll refactor this into a reusable utility function and apply it to similar date-range calculations across the codebase in a follow-up PR. |
||
|
|
||
| entity_df = _build_entity_df_from_sources( | ||
| config=config, | ||
| feature_views=feature_views, | ||
| start_date=start_date, | ||
| end_date=end_date, | ||
| data_source_reader=_read_data_source, | ||
| ) | ||
|
|
||
| return get_historical_features_ibis( | ||
| config=config, | ||
| feature_views=feature_views, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add docstring for this