def _build_entity_df_from_sources(
    config: RepoConfig,
    feature_views: List[FeatureView],
    start_date: datetime,
    end_date: datetime,
    data_source_reader: Callable[[DataSource, str], Table],
) -> pd.DataFrame:
    """Synthesize an entity dataframe from the feature views' batch sources.

    Used in "non-entity mode" (``entity_df=None``): every distinct entity-key
    combination observed in any batch source between ``start_date`` and
    ``end_date`` is collected, and a single event timestamp (``end_date``) is
    stamped on each row so the downstream point-in-time join returns the
    latest feature values as of ``end_date``.

    Args:
        config: Repo config; only ``repo_path`` is read here.
        feature_views: Feature views whose batch sources are scanned.
        start_date: Inclusive lower bound on source event timestamps (tz-aware).
        end_date: Inclusive upper bound; also used as the entity event timestamp.
        data_source_reader: Callable producing an ibis ``Table`` for a source.

    Returns:
        A DataFrame with one row per distinct entity-key combination plus a
        ``DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL`` column set to ``end_date``.
        If no feature view yields entity columns, a single-row frame with only
        the timestamp column is returned so the join still has input.
    """
    entity_dfs: List[pd.DataFrame] = []

    # Compare against naive-UTC bounds: source timestamps are assumed to be
    # stored naive-UTC.  NOTE(review): confirm behavior for tz-aware sources.
    start_naive = to_naive_utc(start_date)
    end_naive = to_naive_utc(end_date)

    for fv in feature_views:
        fv_table = data_source_reader(fv.batch_source, str(config.repo_path))

        # Apply the source's field mapping.  ibis rename({new: old}) renames
        # old -> new, so this renames each source column to its Feast name.
        for src_col, feast_col in fv.batch_source.field_mapping.items():
            if src_col in fv_table.columns:
                fv_table = fv_table.rename({feast_col: src_col})

        timestamp_field = fv.batch_source.timestamp_field
        fv_table = fv_table.filter(
            ibis.and_(
                fv_table[timestamp_field] >= ibis.literal(start_naive),
                fv_table[timestamp_field] <= ibis.literal(end_naive),
            )
        )

        # join_key_map maps source column -> entity join key; fall back to the
        # identity mapping over the view's entity columns.
        join_key_map = fv.projection.join_key_map or {
            e.name: e.name for e in fv.entity_columns
        }
        source_join_key_cols = list(join_key_map.keys())

        if source_join_key_cols:
            # Invert the map for ibis rename ({new: old}) so source columns
            # come out under their entity join-key names.
            distinct_entities = (
                fv_table.select(*source_join_key_cols)
                .rename({v: k for k, v in join_key_map.items()})
                .distinct()
                .execute()
            )
            entity_dfs.append(distinct_entities)

    if not entity_dfs:
        # No entity columns anywhere: a single timestamp-only row keeps the
        # downstream join machinery working.
        return pd.DataFrame({DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL: [end_date]})

    # drop_duplicates() over all columns deduplicates keys shared by several
    # feature views (subset=all-columns was a no-op parameter).
    combined = (
        pd.concat(entity_dfs, ignore_index=True)
        .drop_duplicates()
        .reset_index(drop=True)
    )
    combined[DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] = end_date
    return combined


def get_historical_features(
    config: RepoConfig,
    feature_views: List[FeatureView],
    feature_refs: List[str],
    entity_df: Optional[Union[pd.DataFrame, str]],
    registry: BaseRegistry,
    project: str,
    full_feature_names: bool = False,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
) -> RetrievalJob:
    """Retrieve historical features, optionally without an entity dataframe.

    When ``entity_df`` is None ("non-entity mode") an entity dataframe is
    synthesized from the feature views' batch sources:

    * ``end_date`` defaults to now (UTC).
    * ``start_date`` defaults to ``end_date`` minus the largest feature-view
      TTL, or 30 days when no view declares a TTL.

    Raises:
        ValueError: if both dates are supplied and start_date > end_date.
    """
    if entity_df is None:
        end_date = make_tzaware(end_date) if end_date else datetime.now(timezone.utc)

        if start_date is None:
            # Look back far enough to cover the widest TTL so the freshest
            # in-TTL row per entity is still inside the scan window.
            max_ttl_seconds = max(
                (
                    int(fv.ttl.total_seconds())
                    for fv in feature_views
                    if fv.ttl and isinstance(fv.ttl, timedelta)
                ),
                default=0,
            )
            lookback = (
                timedelta(seconds=max_ttl_seconds)
                if max_ttl_seconds > 0
                else timedelta(days=30)
            )
            start_date = end_date - lookback
        start_date = make_tzaware(start_date)

        # Fail fast on an inverted window instead of silently returning an
        # empty entity dataframe.
        if start_date > end_date:
            raise ValueError(
                f"start_date ({start_date}) must not be after end_date ({end_date})"
            )

        entity_df = _build_entity_df_from_sources(
            config=config,
            feature_views=feature_views,
            start_date=start_date,
            end_date=end_date,
            data_source_reader=_read_data_source,
        )

    # Delegate to the shared ibis implementation.  NOTE(review): remaining
    # keyword arguments reconstructed from the existing delegation, which is
    # truncated in this view — verify against the file.
    return get_historical_features_ibis(
        config=config,
        feature_views=feature_views,
        feature_refs=feature_refs,
        entity_df=entity_df,
        registry=registry,
        project=project,
        full_feature_names=full_feature_names,
    )
a/sdk/python/tests/integration/offline_store/test_non_entity_mode.py b/sdk/python/tests/integration/offline_store/test_non_entity_mode.py index f17352fb35..9879f331a0 100644 --- a/sdk/python/tests/integration/offline_store/test_non_entity_mode.py +++ b/sdk/python/tests/integration/offline_store/test_non_entity_mode.py @@ -11,8 +11,7 @@ @pytest.mark.integration -@pytest.mark.universal_offline_stores -@pytest.mark.ray_offline_stores_only +@pytest.mark.universal_offline_stores(only=["file", "duckdb"]) def test_non_entity_mode_basic(environment, universal_data_sources): """Test historical features retrieval without entity_df (non-entity mode). diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index a2c7381ddc..342e61b5c3 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -731,7 +731,7 @@ def test_historical_features_field_mapping( @pytest.mark.integration -@pytest.mark.universal_offline_stores(only=["file"]) +@pytest.mark.universal_offline_stores(only=["file", "duckdb"]) def test_historical_features_non_entity_retrieval(environment): """Test get_historical_features with entity_df=None using start_date/end_date. 
"""Unit tests for DuckDB offline-store non-entity retrieval (entity_df=None)."""

from datetime import datetime, timedelta, timezone
from typing import Optional
from unittest.mock import MagicMock, patch

import ibis
import pandas as pd

from feast.entity import Entity
from feast.feature_view import FeatureView, Field
from feast.infra.offline_stores import duckdb as duckdb_mod
from feast.infra.offline_stores.duckdb import (
    DuckDBOfflineStore,
    DuckDBOfflineStoreConfig,
)
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.repo_config import RepoConfig
from feast.types import Float32, Int64, ValueType


def _mock_duckdb_offline_store_config() -> DuckDBOfflineStoreConfig:
    """Minimal DuckDB offline-store config."""
    return DuckDBOfflineStoreConfig(type="duckdb")


def _mock_entity():
    """Single driver_id entity shared by every test feature view."""
    return [
        Entity(
            name="driver_id",
            join_keys=["driver_id"],
            description="Driver ID",
            value_type=ValueType.INT64,
        )
    ]


def _mock_feature_view(name: str = "driver_stats", ttl: Optional[timedelta] = None):
    """Feature view exposing only conv_rate (no explicit driver_id field)."""
    return FeatureView(
        name=name,
        entities=_mock_entity(),
        schema=[Field(name="conv_rate", dtype=Float32)],
        source=FileSource(path="dummy.parquet", timestamp_field="event_timestamp"),
        ttl=ttl,
    )


def _driver_fv(
    name: str = "driver_stats",
    ttl: Optional[timedelta] = None,
    created_timestamp_column: Optional[str] = None,
    path: str = "unused",
    feature: str = "conv_rate",
) -> FeatureView:
    """Feature view with an explicit driver_id field plus one float feature."""
    source_kwargs = {"path": path, "timestamp_field": "event_timestamp"}
    if created_timestamp_column:
        source_kwargs["created_timestamp_column"] = created_timestamp_column
    return FeatureView(
        name=name,
        entities=_mock_entity(),
        schema=[
            Field(name="driver_id", dtype=Int64),
            Field(name=feature, dtype=Float32),
        ],
        source=FileSource(**source_kwargs),
        ttl=ttl,
    )


def _repo_config(project: str = "proj") -> RepoConfig:
    """Local-provider repo config pointed at the DuckDB offline store."""
    return RepoConfig(
        project=project,
        registry="unused",
        provider="local",
        offline_store=_mock_duckdb_offline_store_config(),
    )


def _mock_data_source_reader(src_df):
    """Return a data_source_reader that wraps a pandas DataFrame as an ibis memtable."""

    def reader(data_source, repo_path):
        return ibis.memtable(src_df)

    return reader


def test_duckdb_non_entity_historical_retrieval_accepts_dates(monkeypatch):
    """Non-entity mode with explicit start/end dates returns a RetrievalJob."""
    src = pd.DataFrame(
        {
            "driver_id": [1],
            "event_timestamp": pd.to_datetime(["2023-01-01T12:00:00Z"]),
            "conv_rate": [0.5],
        }
    )
    monkeypatch.setattr(duckdb_mod, "_read_data_source", _mock_data_source_reader(src))

    retrieval_job = DuckDBOfflineStore.get_historical_features(
        config=_repo_config("test_project"),
        feature_views=[_mock_feature_view()],
        feature_refs=["driver_stats:conv_rate"],
        entity_df=None,
        registry=MagicMock(),
        project="test_project",
        full_feature_names=False,
        start_date=datetime(2023, 1, 1, tzinfo=timezone.utc),
        end_date=datetime(2023, 1, 2, tzinfo=timezone.utc),
    )

    assert isinstance(retrieval_job, RetrievalJob)


class TestNonEntityRetrieval:
    """Test suite for non-entity retrieval functionality (entity_df=None) in DuckDB offline store."""

    def test_duckdb_non_entity_snapshot_ttl_and_dedup(self, monkeypatch):
        """Latest row per entity within TTL wins; created_ts breaks event-ts ties."""
        src = pd.DataFrame(
            {
                "driver_id": [1, 1, 1, 2, 2],
                "event_timestamp": pd.to_datetime(
                    [
                        "2025-01-01T10:00:00Z",
                        "2025-01-01T10:00:00Z",
                        "2024-12-30T10:00:00Z",
                        "2025-01-01T12:00:00Z",
                        "2025-01-02T11:00:00Z",
                    ]
                ),
                "created_ts": pd.to_datetime(
                    [
                        "2025-01-01T10:00:01Z",
                        "2025-01-01T10:00:02Z",
                        "2024-12-30T10:00:00Z",
                        "2025-01-01T12:00:00Z",
                        "2025-01-02T11:00:00Z",
                    ]
                ),
                "conv_rate": [0.1, 0.2, 0.05, 0.3, 0.4],
            }
        )
        monkeypatch.setattr(
            duckdb_mod, "_read_data_source", _mock_data_source_reader(src)
        )

        fv = _driver_fv(ttl=timedelta(days=1), created_timestamp_column="created_ts")
        end = datetime(2025, 1, 2, 10, 0, tzinfo=timezone.utc)

        job = DuckDBOfflineStore.get_historical_features(
            config=_repo_config(),
            feature_views=[fv],
            feature_refs=["driver_stats:conv_rate"],
            entity_df=None,
            registry=MagicMock(),
            project="proj",
            full_feature_names=False,
            start_date=end - timedelta(days=7),
            end_date=end,
        )

        df = job.to_df()

        assert set(df["driver_id"]) == {1, 2}
        out = df.set_index("driver_id")["conv_rate"].to_dict()
        # driver 1: both 10:00 rows are in-TTL; created_ts tie-break picks 0.2.
        assert out[1] == 0.2
        # driver 2: the 2025-01-02T11:00 row is after end_date and excluded.
        assert out[2] == 0.3

    def test_non_entity_mode_with_both_dates_retrieves_data(self, monkeypatch):
        """Rows outside [start_date, end_date] never contribute entities."""
        src = pd.DataFrame(
            {
                "driver_id": [1, 1, 1, 2],
                "event_timestamp": pd.to_datetime(
                    [
                        "2023-01-01T08:00:00Z",
                        "2023-01-03T10:00:00Z",
                        "2023-01-05T12:00:00Z",
                        "2023-01-08T14:00:00Z",
                    ]
                ),
                "conv_rate": [0.1, 0.2, 0.3, 0.4],
            }
        )
        monkeypatch.setattr(
            duckdb_mod, "_read_data_source", _mock_data_source_reader(src)
        )

        fv = _driver_fv(name="test_fv")
        start_date = datetime(2023, 1, 2, tzinfo=timezone.utc)
        end_date = datetime(2023, 1, 7, tzinfo=timezone.utc)

        job = DuckDBOfflineStore.get_historical_features(
            config=_repo_config("test_project"),
            feature_views=[fv],
            feature_refs=["test_fv:conv_rate"],
            entity_df=None,
            registry=MagicMock(),
            project="test_project",
            start_date=start_date,
            end_date=end_date,
        )

        df = job.to_df()

        # The synthesized entity df stamps end_date on every row.
        assert job.metadata.min_event_timestamp == end_date
        assert job.metadata.max_event_timestamp == end_date

        assert len(df) >= 1
        # driver 2's only row (2023-01-08) is after end_date.
        assert set(df["driver_id"]) == {1}
        driver1_data = df[df["driver_id"] == 1]
        assert 0.3 in driver1_data["conv_rate"].values

    def test_non_entity_mode_with_end_date_only_calculates_start_from_ttl(
        self, monkeypatch
    ):
        """With only end_date, the start is derived from the view's TTL."""
        src = pd.DataFrame(
            {
                "driver_id": [1, 1, 1],
                "event_timestamp": pd.to_datetime(
                    [
                        "2023-01-05T08:00:00Z",
                        "2023-01-06T10:00:00Z",
                        "2023-01-07T12:00:00Z",
                    ]
                ),
                "conv_rate": [0.1, 0.2, 0.3],
            }
        )
        monkeypatch.setattr(
            duckdb_mod, "_read_data_source", _mock_data_source_reader(src)
        )

        fv = _driver_fv(name="test_fv", ttl=timedelta(days=1))
        end_date = datetime(2023, 1, 7, 12, 0, tzinfo=timezone.utc)

        job = DuckDBOfflineStore.get_historical_features(
            config=_repo_config("test_project"),
            feature_views=[fv],
            feature_refs=["test_fv:conv_rate"],
            entity_df=None,
            registry=MagicMock(),
            project="test_project",
            end_date=end_date,
        )

        df = job.to_df()

        driver1_data = df[df["driver_id"] == 1]
        assert len(driver1_data) >= 1
        assert 0.3 in driver1_data["conv_rate"].values

    @patch("feast.infra.offline_stores.duckdb.datetime")
    def test_no_dates_provided_defaults_to_current_time_and_filters_data(
        self, mock_datetime, monkeypatch
    ):
        """With no dates, end defaults to now() (patched) and TTL sets start."""
        fixed_now = datetime(2023, 1, 7, 12, 0, 0, tzinfo=timezone.utc)
        # The store calls datetime.now(timezone.utc); pin it for determinism.
        mock_datetime.now.return_value = fixed_now

        src = pd.DataFrame(
            {
                "driver_id": [1, 1, 1],
                "event_timestamp": pd.to_datetime(
                    [
                        "2023-01-05T12:00:00Z",
                        "2023-01-06T18:00:00Z",
                        "2023-01-07T11:00:00Z",
                    ]
                ),
                "conv_rate": [0.1, 0.2, 0.3],
            }
        )
        monkeypatch.setattr(
            duckdb_mod, "_read_data_source", _mock_data_source_reader(src)
        )

        fv = _driver_fv(name="test_fv", ttl=timedelta(days=1))

        job = DuckDBOfflineStore.get_historical_features(
            config=_repo_config("test_project"),
            feature_views=[fv],
            feature_refs=["test_fv:conv_rate"],
            entity_df=None,
            registry=MagicMock(),
            project="test_project",
        )

        df = job.to_df()

        driver1_data = df[df["driver_id"] == 1]
        assert len(driver1_data) >= 1
        assert 0.3 in driver1_data["conv_rate"].values

    def test_ttl_filtering_excludes_old_rows(self, monkeypatch):
        """Rows older than TTL relative to end_date are not returned."""
        src = pd.DataFrame(
            {
                "driver_id": [1, 1, 1],
                "event_timestamp": pd.to_datetime(
                    [
                        "2025-01-01T08:00:00Z",
                        "2025-01-01T09:30:00Z",
                        "2025-01-01T10:00:00Z",
                    ]
                ),
                "conv_rate": [0.1, 0.2, 0.3],
            }
        )
        monkeypatch.setattr(
            duckdb_mod, "_read_data_source", _mock_data_source_reader(src)
        )

        fv = _driver_fv(ttl=timedelta(hours=1))
        end = datetime(2025, 1, 1, 10, 0, tzinfo=timezone.utc)

        job = DuckDBOfflineStore.get_historical_features(
            config=_repo_config(),
            feature_views=[fv],
            feature_refs=["driver_stats:conv_rate"],
            entity_df=None,
            registry=MagicMock(),
            project="proj",
            full_feature_names=False,
            start_date=end - timedelta(days=1),
            end_date=end,
        )

        df = job.to_df()

        driver1_data = df[df["driver_id"] == 1]
        assert len(driver1_data) >= 1
        assert (
            0.2 in driver1_data["conv_rate"].values
            or 0.3 in driver1_data["conv_rate"].values
        )
        # 08:00 is more than one hour (TTL) before end_date.
        assert 0.1 not in driver1_data["conv_rate"].values

    def test_multiple_feature_views_with_different_ttls(self, monkeypatch):
        """Each feature view's own TTL governs which of its rows survive."""
        src1 = pd.DataFrame(
            {
                "driver_id": [1, 1],
                "event_timestamp": pd.to_datetime(
                    ["2025-01-01T08:00:00Z", "2025-01-01T09:30:00Z"]
                ),
                "age": [25, 26],
            }
        )
        src2 = pd.DataFrame(
            {
                "driver_id": [1, 1],
                "event_timestamp": pd.to_datetime(
                    ["2025-01-01T08:00:00Z", "2025-01-01T09:30:00Z"]
                ),
                "total_trips": [100, 101],
            }
        )

        def mock_read_data_source(data_source, repo_path):
            # Dispatch by the FileSource path each view was created with.
            if data_source.path == "unused1":
                return ibis.memtable(src1)
            return ibis.memtable(src2)

        monkeypatch.setattr(duckdb_mod, "_read_data_source", mock_read_data_source)

        fv1 = _driver_fv(
            name="user_fv", ttl=timedelta(hours=1), path="unused1", feature="age"
        )
        fv2 = _driver_fv(
            name="transaction_fv",
            ttl=timedelta(days=1),
            path="unused2",
            feature="total_trips",
        )

        end = datetime(2025, 1, 1, 10, 0, tzinfo=timezone.utc)

        job = DuckDBOfflineStore.get_historical_features(
            config=_repo_config(),
            feature_views=[fv1, fv2],
            feature_refs=["user_fv:age", "transaction_fv:total_trips"],
            entity_df=None,
            registry=MagicMock(),
            project="proj",
            full_feature_names=False,
            start_date=end - timedelta(days=1),
            end_date=end,
        )

        df = job.to_df()

        assert "age" in df.columns or "user_fv__age" in df.columns
        assert (
            "total_trips" in df.columns or "transaction_fv__total_trips" in df.columns
        )

        if "age" in df.columns:
            age_values = df["age"].values
            assert 26 in age_values
            # The 08:00 row is outside user_fv's 1-hour TTL.
            assert 25 not in age_values

        if "total_trips" in df.columns:
            trips_values = df["total_trips"].values
            assert 101 in trips_values

    def test_entity_df_still_works(self, monkeypatch):
        """Verify standard entity_df path is not broken by the changes."""
        src = pd.DataFrame(
            {
                "driver_id": [1, 2],
                "event_timestamp": pd.to_datetime(
                    ["2025-01-01T10:00:00Z", "2025-01-01T11:00:00Z"]
                ),
                "conv_rate": [0.5, 0.6],
            }
        )
        monkeypatch.setattr(
            duckdb_mod, "_read_data_source", _mock_data_source_reader(src)
        )

        fv = _driver_fv(ttl=timedelta(days=1))
        entity_df = pd.DataFrame(
            {
                "driver_id": [1, 2],
                "event_timestamp": pd.to_datetime(
                    ["2025-01-01T12:00:00Z", "2025-01-01T12:00:00Z"]
                ),
            }
        )

        job = DuckDBOfflineStore.get_historical_features(
            config=_repo_config(),
            feature_views=[fv],
            feature_refs=["driver_stats:conv_rate"],
            entity_df=entity_df,
            registry=MagicMock(),
            project="proj",
            full_feature_names=False,
        )

        df = job.to_df()

        assert set(df["driver_id"]) == {1, 2}
        assert len(df) == 2
List[Tuple[str, Type[DataSourceCreator]]] = [