From 068c3887c8a05854f5b000ba6d9a763f2ad52f2a Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 12 May 2026 01:35:16 -0700 Subject: [PATCH 1/2] fix: Apply field mapping to join keys in local compute engine nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a batch source defines a `field_mapping` that renames an entity join key (e.g. `USERID` -> `user_id`), the source-read node renames the columns on the pulled Arrow table to their mapped names. Downstream `LocalDedupNode` and `LocalJoinNode` then look up the *pre-mapping* names from `column_info.join_keys`, which raises `KeyError: Index(['USERID'])` during materialization (or returns an empty join). Add a `join_keys_columns` property on `ColumnInfo` that mirrors the existing `timestamp_column` / `created_timestamp_column` properties — returning join keys translated through `field_mapping` — and use it from the dedup and join nodes. Fixes #5942. Signed-off-by: 1fanwang <1fannnw@gmail.com> --- .../infra/compute_engines/dag/context.py | 12 +++++ .../infra/compute_engines/local/nodes.py | 14 ++++-- .../infra/compute_engines/local/test_nodes.py | 50 +++++++++++++++++++ 3 files changed, 72 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/compute_engines/dag/context.py b/sdk/python/feast/infra/compute_engines/dag/context.py index 46eda356223..38b9a788872 100644 --- a/sdk/python/feast/infra/compute_engines/dag/context.py +++ b/sdk/python/feast/infra/compute_engines/dag/context.py @@ -42,6 +42,18 @@ def created_timestamp_column(self) -> Optional[str]: """ return self._get_mapped_column(self.created_ts_col) + @property + def join_keys_columns(self) -> List[str]: + """ + Get the join keys, mapped through field_mapping to their post-rename + column names. Use this when looking up columns on a DataFrame that has + already had its source columns renamed (e.g. inside DAG nodes that + consume the output of a source-read node). + """ + if not self.field_mapping: + return list(self.join_keys) + return [self.field_mapping.get(key, key) for key in self.join_keys] + def _get_mapped_column(self, column: Optional[str]) -> Optional[str]: """ Helper method to get the mapped column name if it exists in field_mapping. diff --git a/sdk/python/feast/infra/compute_engines/local/nodes.py b/sdk/python/feast/infra/compute_engines/local/nodes.py index 808231419cd..3274568671b 100644 --- a/sdk/python/feast/infra/compute_engines/local/nodes.py +++ b/sdk/python/feast/infra/compute_engines/local/nodes.py @@ -79,6 +79,10 @@ def execute(self, context: ExecutionContext) -> ArrowTableValue: for val in input_values: val.assert_format(DAGFormat.ARROW) + # The upstream source-read node has already renamed columns via + # field_mapping, so use the mapped join keys for joining (see #5942). + join_keys = self.column_info.join_keys_columns + # Convert all upstream ArrowTables to backend DataFrames joined_df = self.backend.from_arrow(input_values[0].data) for val in input_values[1:]: @@ -86,7 +90,7 @@ def execute(self, context: ExecutionContext) -> ArrowTableValue: joined_df = self.backend.join( joined_df, next_df, - on=self.column_info.join_keys, + on=join_keys, how=self.how, ) @@ -105,7 +109,7 @@ def execute(self, context: ExecutionContext) -> ArrowTableValue: joined_df = self.backend.join( entity_df, joined_df, - on=self.column_info.join_keys, + on=join_keys, how="left", ) @@ -193,8 +197,10 @@ def execute(self, context: ExecutionContext) -> ArrowTableValue: # Extract join_keys, timestamp, and created_ts from context - # Dedup strategy: sort and drop_duplicates - dedup_keys = self.column_info.join_keys + # Dedup strategy: sort and drop_duplicates. Use the mapped join key + # names so we look up the columns that the source-read node has + # already renamed (see issue #5942). + dedup_keys = self.column_info.join_keys_columns if dedup_keys: sort_keys = [self.column_info.timestamp_column] if ( diff --git a/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py b/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py index 897211b374e..6d79000b979 100644 --- a/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py +++ b/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py @@ -186,6 +186,56 @@ def test_local_dedup_node(): assert set(df_result["entity_id"]) == {1, 2} +def test_local_dedup_node_with_field_mapping_on_join_key(): + """Regression test for materialization failure when a join key has a field mapping. + + The source-read node renames columns via field_mapping (e.g. ``USERID`` -> ``user_id``) + before passing the table to downstream nodes. Without mapping ``column_info.join_keys`` + the dedup node would look up the pre-mapping name and raise ``KeyError(['USERID'])``. + + See https://github.com/feast-dev/feast/issues/5942. + """ + # Simulate a source-read node output: columns already renamed to the mapped names. + df = pd.DataFrame( + { + "user_id": [1, 1, 2], + "value": [100, 200, 300], + "event_timestamp": [ + now - timedelta(seconds=1), + now, + now, + ], + } + ) + + context = create_context( + node_outputs={"source": ArrowTableValue(pa.Table.from_pandas(df))} + ) + + node = LocalDedupNode( + name="dedup", + backend=backend, + column_info=ColumnInfo( + # The raw join key matches the source column name; field_mapping maps + # it to the user-facing name that the source-read node has already + # renamed the column to. + join_keys=["USERID"], + feature_cols=["value"], + ts_col="EVENT_TIMESTAMP", + created_ts_col=None, + field_mapping={"USERID": "user_id", "EVENT_TIMESTAMP": "event_timestamp"}, + ), + ) + node.add_input(MagicMock()) + node.inputs[0].name = "source" + + result = node.execute(context) + + df_result = result.data.to_pandas() + assert df_result.shape[0] == 2 + assert set(df_result["user_id"]) == {1, 2} + + def test_local_transformation_node(): context = create_context( node_outputs={"source": ArrowTableValue(pa.Table.from_pandas(sample_df))} From 61e440f05e78e38ded075f6835dc1f7c48dda292 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 12 May 2026 02:01:56 -0700 Subject: [PATCH 2/2] test: also cover LocalJoinNode field_mapping case Signed-off-by: 1fanwang <1fannnw@gmail.com> --- .../infra/compute_engines/local/test_nodes.py | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py b/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py index 6d79000b979..872dd978c9e 100644 --- a/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py +++ b/sdk/python/tests/unit/infra/compute_engines/local/test_nodes.py @@ -236,6 +236,72 @@ def test_local_dedup_node_with_field_mapping_on_join_key(): assert set(df_result["user_id"]) == {1, 2} +def test_local_join_node_with_field_mapping_on_join_key(): + """Regression test for materialization failure when a join key has a field mapping. + + The source-read node renames columns via field_mapping (e.g. ``USERID`` -> ``user_id``) + before passing the table to downstream nodes. Without mapping ``column_info.join_keys`` + the join node would call ``backend.join(..., on=["USERID"], ...)`` and raise + ``KeyError(['USERID'])`` because the columns have already been renamed. + + See https://github.com/feast-dev/feast/issues/5942. + """ + # Simulate two source-read node outputs: columns already renamed to the mapped names. + left_df = pd.DataFrame( + { + "user_id": [1, 2], + "value": [10, 20], + "event_timestamp": [now, now], + } + ) + right_df = pd.DataFrame( + { + "user_id": [1, 2], + "other_value": [100, 200], + "event_timestamp": [now, now], + } + ) + + context = create_context( + node_outputs={ + "left": ArrowTableValue(pa.Table.from_pandas(left_df)), + "right": ArrowTableValue(pa.Table.from_pandas(right_df)), + } + ) + # Bypass the trailing entity_df join — this test exercises the input-table + # join path that consumed the raw (unmapped) join keys before the fix. + context.entity_df = None + + join_node = LocalJoinNode( + name="join", + backend=backend, + column_info=ColumnInfo( + # Raw join key matches the source column name; field_mapping maps it + # to the user-facing name that the source-read node has already + # renamed the column to. + join_keys=["USERID"], + feature_cols=["value", "other_value"], + ts_col="EVENT_TIMESTAMP", + created_ts_col=None, + field_mapping={"USERID": "user_id", "EVENT_TIMESTAMP": "event_timestamp"}, + ), + ) + left_input = MagicMock() + left_input.name = "left" + right_input = MagicMock() + right_input.name = "right" + join_node.add_input(left_input) + join_node.add_input(right_input) + + result = join_node.execute(context) + + df_result = result.data.to_pandas() + assert df_result.shape[0] == 2 + assert set(df_result["user_id"]) == {1, 2} + assert "value" in df_result.columns + assert "other_value" in df_result.columns + + def test_local_transformation_node(): context = create_context( node_outputs={"source": ArrowTableValue(pa.Table.from_pandas(sample_df))}