From bc97a4c4259bb7cfe2dc08961e5569783b151756 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Wed, 22 Apr 2026 20:45:08 +0530 Subject: [PATCH 1/5] fix(spark): Pre-create S3A event log dir before SparkContext init to prevent silent materialize failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spark's EventLogFileWriter.requireLogBaseDirAsDirectory() is called inside SparkContext.__init__. When spark.eventLog.dir points to an S3A path that doesn't exist yet (S3 has no real directories), SparkContext fails to initialise — silently from Feast's perspective because _materialize_one() catches the exception and returns an ERROR job. Add _ensure_s3a_event_log_dir() to utils.py: before building the SparkSession, check if the S3A prefix exists and write a zero-byte placeholder if it doesn't. Uses boto3 (already a Feast dep via S3 offline store). Non-fatal: logs a warning and lets Spark surface its own error if the write fails. Signed-off-by: abhijeet-dhumal --- .../infra/compute_engines/spark/utils.py | 68 +++++++++++++++++ .../tests/component/spark/test_compute.py | 74 ++++++++++++++++++- 2 files changed, 141 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/compute_engines/spark/utils.py b/sdk/python/feast/infra/compute_engines/spark/utils.py index 1234f895464..7951956288c 100644 --- a/sdk/python/feast/infra/compute_engines/spark/utils.py +++ b/sdk/python/feast/infra/compute_engines/spark/utils.py @@ -1,3 +1,5 @@ +import logging +import os from typing import Dict, Iterable, Literal, Optional import pandas as pd @@ -9,6 +11,68 @@ from feast.infra.common.serde import SerializedArtifacts from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping +logger = logging.getLogger(__name__) + + +def _ensure_s3a_event_log_dir(spark_config: Dict[str, str]) -> None: + """Pre-create the S3A event log prefix before SparkContext initialisation. + + Spark's EventLogFileWriter.requireLogBaseDirAsDirectory() is called inside + SparkContext.__init__ and crashes if the S3A path doesn't exist yet (S3 has no + real directories, so an empty prefix returns a 404). This function writes a + zero-byte placeholder so the prefix exists before SparkContext is built. + + This is only attempted when: + - spark.eventLog.enabled == "true" + - spark.eventLog.dir starts with "s3a://" + Failures are non-fatal: Spark will surface its own error if the dir is still missing. + """ + if spark_config.get("spark.eventLog.enabled", "false").lower() != "true": + return + event_dir = spark_config.get("spark.eventLog.dir", "") + if not event_dir.startswith("s3a://"): + return + + path = event_dir[len("s3a://") :] + bucket, _, prefix = path.partition("/") + prefix = prefix.rstrip("/") + "/" + placeholder_key = prefix + ".keep" + + endpoint = spark_config.get( + "spark.hadoop.fs.s3a.endpoint", + os.environ.get("FEAST_S3A_ENDPOINT", ""), + ) + access_key = os.environ.get("AWS_ACCESS_KEY_ID", "") + secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "") + + try: + import boto3 + from botocore.client import Config + + s3 = boto3.client( + "s3", + endpoint_url=endpoint if endpoint else None, + aws_access_key_id=access_key or None, + aws_secret_access_key=secret_key or None, + config=Config(signature_version="s3v4"), + ) + resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1) + if resp.get("KeyCount", 0) == 0: + s3.put_object(Bucket=bucket, Key=placeholder_key, Body=b"") + logger.debug( + "Created S3A event log dir placeholder: s3a://%s/%s", + bucket, + placeholder_key, + ) + except Exception as exc: + logger.warning( + "Could not pre-create S3A event log dir s3a://%s/%s — " + "SparkContext may fail if the path still doesn't exist: %s", + bucket, + prefix, + exc, + ) + def get_or_create_new_spark_session( spark_config: Optional[Dict[str, str]] = None, @@ -17,6 +81,10 @@ def get_or_create_new_spark_session( if not spark_session: spark_builder = SparkSession.builder if spark_config: + # Spark's EventLogFileWriter.requireLogBaseDirAsDirectory() is called + # during SparkContext.__init__ and will crash if the S3A event log + # prefix doesn't exist yet. Ensure the prefix exists first. + _ensure_s3a_event_log_dir(spark_config) spark_builder = spark_builder.config( conf=SparkConf().setAll([(k, v) for k, v in spark_config.items()]) ) diff --git a/sdk/python/tests/component/spark/test_compute.py b/sdk/python/tests/component/spark/test_compute.py index 803cd505513..90eed0dde03 100644 --- a/sdk/python/tests/component/spark/test_compute.py +++ b/sdk/python/tests/component/spark/test_compute.py @@ -1,6 +1,6 @@ from datetime import timedelta from typing import cast -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest from pyspark.sql import DataFrame @@ -15,6 +15,7 @@ from feast.infra.common.retrieval_task import HistoricalRetrievalTask from feast.infra.compute_engines.spark.compute import SparkComputeEngine from feast.infra.compute_engines.spark.job import SparkDAGRetrievalJob +from feast.infra.compute_engines.spark.utils import _ensure_s3a_event_log_dir from feast.infra.offline_stores.contrib.spark_offline_store.spark import ( SparkOfflineStore, ) @@ -192,5 +193,76 @@ def tqdm_builder(length): spark_environment.teardown() +# --------------------------------------------------------------------------- +# Unit tests for _ensure_s3a_event_log_dir — no Spark dependency needed +# --------------------------------------------------------------------------- + + +def _base_conf(event_log_dir: str) -> dict: + return { + "spark.eventLog.enabled": "true", + "spark.eventLog.dir": event_log_dir, + "spark.hadoop.fs.s3a.endpoint": "http://minio:9000", + } + + +@patch("feast.infra.compute_engines.spark.utils.boto3") +def test_ensure_s3a_event_log_dir_creates_placeholder_when_empty(mock_boto3): + """S3A prefix doesn't exist → placeholder object is written.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 0} + + _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/spark-events/")) + + s3.list_objects_v2.assert_called_once_with( + Bucket="my-bucket", Prefix="spark-events/", MaxKeys=1 + ) + s3.put_object.assert_called_once_with( + Bucket="my-bucket", Key="spark-events/.keep", Body=b"" + ) + + +@patch("feast.infra.compute_engines.spark.utils.boto3") +def test_ensure_s3a_event_log_dir_skips_when_prefix_exists(mock_boto3): + """S3A prefix already has objects → no placeholder written.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 3} + + _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/spark-events/")) + + s3.put_object.assert_not_called() + + +@patch("feast.infra.compute_engines.spark.utils.boto3") +def test_ensure_s3a_event_log_dir_noop_when_event_log_disabled(mock_boto3): + """spark.eventLog.enabled != true → boto3 never called.""" + _ensure_s3a_event_log_dir( + {"spark.eventLog.enabled": "false", "spark.eventLog.dir": "s3a://b/p/"} + ) + mock_boto3.client.assert_not_called() + + +@patch("feast.infra.compute_engines.spark.utils.boto3") +def test_ensure_s3a_event_log_dir_noop_for_non_s3a_path(mock_boto3): + """Non-S3A paths (hdfs://, file://, etc.) are left untouched.""" + _ensure_s3a_event_log_dir( + {"spark.eventLog.enabled": "true", "spark.eventLog.dir": "hdfs:///spark-logs"} + ) + mock_boto3.client.assert_not_called() + + +@patch("feast.infra.compute_engines.spark.utils.boto3") +def test_ensure_s3a_event_log_dir_non_fatal_on_s3_error(mock_boto3): + """boto3 errors are swallowed — SparkContext will surface its own error.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.side_effect = Exception("connection refused") + + # Must not raise + _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/spark-events/")) + + if __name__ == "__main__": test_spark_compute_engine_get_historical_features() From c58657720119c0d75e16fbb03d9cdec18f87404a Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Fri, 24 Apr 2026 13:44:22 +0530 Subject: [PATCH 2/5] fix(spark): handle bucket-root S3A paths, read credentials from spark config, add session token support Signed-off-by: abhijeet-dhumal --- .../infra/compute_engines/spark/utils.py | 34 ++- .../tests/component/spark/test_compute.py | 72 ------- .../tests/component/spark/test_spark_utils.py | 200 ++++++++++++++++++ 3 files changed, 225 insertions(+), 81 deletions(-) create mode 100644 sdk/python/tests/component/spark/test_spark_utils.py diff --git a/sdk/python/feast/infra/compute_engines/spark/utils.py b/sdk/python/feast/infra/compute_engines/spark/utils.py index 7951956288c..60a862be99f 100644 --- a/sdk/python/feast/infra/compute_engines/spark/utils.py +++ b/sdk/python/feast/infra/compute_engines/spark/utils.py @@ -11,6 +11,13 @@ from feast.infra.common.serde import SerializedArtifacts from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping +try: + import boto3 + from botocore.client import Config as BotoConfig +except ImportError: + boto3 = None # type: ignore[assignment] + BotoConfig = None # type: ignore[assignment,misc] + logger = logging.getLogger(__name__) @@ -35,26 +42,38 @@ def _ensure_s3a_event_log_dir(spark_config: Dict[str, str]) -> None: path = event_dir[len("s3a://") :] bucket, _, prefix = path.partition("/") - prefix = prefix.rstrip("/") + "/" + prefix = prefix.rstrip("/") + prefix = (prefix + "/") if prefix else prefix placeholder_key = prefix + ".keep" endpoint = spark_config.get( "spark.hadoop.fs.s3a.endpoint", os.environ.get("FEAST_S3A_ENDPOINT", ""), ) - access_key = os.environ.get("AWS_ACCESS_KEY_ID", "") - secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "") + access_key = spark_config.get( + "spark.hadoop.fs.s3a.access.key", + os.environ.get("AWS_ACCESS_KEY_ID", ""), + ) + secret_key = spark_config.get( + "spark.hadoop.fs.s3a.secret.key", + os.environ.get("AWS_SECRET_ACCESS_KEY", ""), + ) + session_token = spark_config.get( + "spark.hadoop.fs.s3a.session.token", + os.environ.get("AWS_SESSION_TOKEN", ""), + ) or None try: - import boto3 - from botocore.client import Config + if boto3 is None: + raise ImportError("boto3 is not installed") s3 = boto3.client( "s3", endpoint_url=endpoint if endpoint else None, aws_access_key_id=access_key or None, aws_secret_access_key=secret_key or None, - config=Config(signature_version="s3v4"), + aws_session_token=session_token, + config=BotoConfig(signature_version="s3v4"), ) resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1) if resp.get("KeyCount", 0) == 0: @@ -81,9 +100,6 @@ def get_or_create_new_spark_session( if not spark_session: spark_builder = SparkSession.builder if spark_config: - # Spark's EventLogFileWriter.requireLogBaseDirAsDirectory() is called - # during SparkContext.__init__ and will crash if the S3A event log - # prefix doesn't exist yet. Ensure the prefix exists first. _ensure_s3a_event_log_dir(spark_config) spark_builder = spark_builder.config( conf=SparkConf().setAll([(k, v) for k, v in spark_config.items()]) diff --git a/sdk/python/tests/component/spark/test_compute.py b/sdk/python/tests/component/spark/test_compute.py index 90eed0dde03..4085d1ff737 100644 --- a/sdk/python/tests/component/spark/test_compute.py +++ b/sdk/python/tests/component/spark/test_compute.py @@ -15,7 +15,6 @@ from feast.infra.common.retrieval_task import HistoricalRetrievalTask from feast.infra.compute_engines.spark.compute import SparkComputeEngine from feast.infra.compute_engines.spark.job import SparkDAGRetrievalJob -from feast.infra.compute_engines.spark.utils import _ensure_s3a_event_log_dir from feast.infra.offline_stores.contrib.spark_offline_store.spark import ( SparkOfflineStore, ) @@ -193,76 +192,5 @@ def tqdm_builder(length): spark_environment.teardown() -# --------------------------------------------------------------------------- -# Unit tests for _ensure_s3a_event_log_dir — no Spark dependency needed -# --------------------------------------------------------------------------- - - -def _base_conf(event_log_dir: str) -> dict: - return { - "spark.eventLog.enabled": "true", - "spark.eventLog.dir": event_log_dir, - "spark.hadoop.fs.s3a.endpoint": "http://minio:9000", - } - - -@patch("feast.infra.compute_engines.spark.utils.boto3") -def test_ensure_s3a_event_log_dir_creates_placeholder_when_empty(mock_boto3): - """S3A prefix doesn't exist → placeholder object is written.""" - s3 = MagicMock() - mock_boto3.client.return_value = s3 - s3.list_objects_v2.return_value = {"KeyCount": 0} - - _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/spark-events/")) - - s3.list_objects_v2.assert_called_once_with( - Bucket="my-bucket", Prefix="spark-events/", MaxKeys=1 - ) - s3.put_object.assert_called_once_with( - Bucket="my-bucket", Key="spark-events/.keep", Body=b"" - ) - - -@patch("feast.infra.compute_engines.spark.utils.boto3") -def test_ensure_s3a_event_log_dir_skips_when_prefix_exists(mock_boto3): - """S3A prefix already has objects → no placeholder written.""" - s3 = MagicMock() - mock_boto3.client.return_value = s3 - s3.list_objects_v2.return_value = {"KeyCount": 3} - - _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/spark-events/")) - - s3.put_object.assert_not_called() - - -@patch("feast.infra.compute_engines.spark.utils.boto3") -def test_ensure_s3a_event_log_dir_noop_when_event_log_disabled(mock_boto3): - """spark.eventLog.enabled != true → boto3 never called.""" - _ensure_s3a_event_log_dir( - {"spark.eventLog.enabled": "false", "spark.eventLog.dir": "s3a://b/p/"} - ) - mock_boto3.client.assert_not_called() - - -@patch("feast.infra.compute_engines.spark.utils.boto3") -def test_ensure_s3a_event_log_dir_noop_for_non_s3a_path(mock_boto3): - """Non-S3A paths (hdfs://, file://, etc.) are left untouched.""" - _ensure_s3a_event_log_dir( - {"spark.eventLog.enabled": "true", "spark.eventLog.dir": "hdfs:///spark-logs"} - ) - mock_boto3.client.assert_not_called() - - -@patch("feast.infra.compute_engines.spark.utils.boto3") -def test_ensure_s3a_event_log_dir_non_fatal_on_s3_error(mock_boto3): - """boto3 errors are swallowed — SparkContext will surface its own error.""" - s3 = MagicMock() - mock_boto3.client.return_value = s3 - s3.list_objects_v2.side_effect = Exception("connection refused") - - # Must not raise - _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/spark-events/")) - - if __name__ == "__main__": test_spark_compute_engine_get_historical_features() diff --git a/sdk/python/tests/component/spark/test_spark_utils.py b/sdk/python/tests/component/spark/test_spark_utils.py new file mode 100644 index 00000000000..75f3cc62344 --- /dev/null +++ b/sdk/python/tests/component/spark/test_spark_utils.py @@ -0,0 +1,200 @@ +from unittest.mock import MagicMock, patch + +from feast.infra.compute_engines.spark.utils import _ensure_s3a_event_log_dir + +BOTO3_PATH = "feast.infra.compute_engines.spark.utils.boto3" +BOTOCONFIG_PATH = "feast.infra.compute_engines.spark.utils.BotoConfig" + + +def _base_conf(event_log_dir: str) -> dict: + return { + "spark.eventLog.enabled": "true", + "spark.eventLog.dir": event_log_dir, + "spark.hadoop.fs.s3a.endpoint": "http://minio:9000", + } + + +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_creates_placeholder_when_empty(mock_boto3): + """S3A prefix doesn't exist -> placeholder object is written.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 0} + + _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/spark-events/")) + + s3.list_objects_v2.assert_called_once_with( + Bucket="my-bucket", Prefix="spark-events/", MaxKeys=1 + ) + s3.put_object.assert_called_once_with( + Bucket="my-bucket", Key="spark-events/.keep", Body=b"" + ) + + +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_skips_when_prefix_exists(mock_boto3): + """S3A prefix already has objects -> no placeholder written.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 3} + + _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/spark-events/")) + + s3.put_object.assert_not_called() + + +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_noop_when_event_log_disabled(mock_boto3): + """spark.eventLog.enabled != true -> boto3 never called.""" + _ensure_s3a_event_log_dir( + {"spark.eventLog.enabled": "false", "spark.eventLog.dir": "s3a://b/p/"} + ) + mock_boto3.client.assert_not_called() + + +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_noop_for_non_s3a_path(mock_boto3): + """Non-S3A paths (hdfs://, file://, etc.) are left untouched.""" + _ensure_s3a_event_log_dir( + {"spark.eventLog.enabled": "true", "spark.eventLog.dir": "hdfs:///spark-logs"} + ) + mock_boto3.client.assert_not_called() + + +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_non_fatal_on_s3_error(mock_boto3): + """boto3 errors are swallowed -> SparkContext will surface its own error.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.side_effect = Exception("connection refused") + + _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/spark-events/")) + + +# --------------------------------------------------------------------------- +# Bucket-root edge cases (s3a://bucket, s3a://bucket/) +# --------------------------------------------------------------------------- + + +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_bucket_root_no_trailing_slash(mock_boto3): + """s3a://bucket (no path) -> .keep at bucket root, not /.keep.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 0} + + _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket")) + + s3.list_objects_v2.assert_called_once_with( + Bucket="my-bucket", Prefix="", MaxKeys=1 + ) + s3.put_object.assert_called_once_with( + Bucket="my-bucket", Key=".keep", Body=b"" + ) + + +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_bucket_root_trailing_slash(mock_boto3): + """s3a://bucket/ (trailing slash, empty prefix) -> .keep at bucket root.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 0} + + _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/")) + + s3.list_objects_v2.assert_called_once_with( + Bucket="my-bucket", Prefix="", MaxKeys=1 + ) + s3.put_object.assert_called_once_with( + Bucket="my-bucket", Key=".keep", Body=b"" + ) + + +# --------------------------------------------------------------------------- +# Credentials from spark config / env var fallback +# --------------------------------------------------------------------------- + + +@patch.dict( + "os.environ", + { + "AWS_ACCESS_KEY_ID": "env-ak", + "AWS_SECRET_ACCESS_KEY": "env-sk", + "AWS_SESSION_TOKEN": "env-st", + }, +) +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_uses_spark_config_credentials(mock_boto3): + """Credentials in spark config take precedence over env vars.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 1} + + conf = { + **_base_conf("s3a://my-bucket/logs/"), + "spark.hadoop.fs.s3a.access.key": "spark-ak", + "spark.hadoop.fs.s3a.secret.key": "spark-sk", + "spark.hadoop.fs.s3a.session.token": "spark-st", + } + _ensure_s3a_event_log_dir(conf) + + mock_boto3.client.assert_called_once() + kwargs = mock_boto3.client.call_args + assert kwargs.kwargs["aws_access_key_id"] == "spark-ak" + assert kwargs.kwargs["aws_secret_access_key"] == "spark-sk" + assert kwargs.kwargs["aws_session_token"] == "spark-st" + + +@patch.dict( + "os.environ", + { + "AWS_ACCESS_KEY_ID": "env-ak", + "AWS_SECRET_ACCESS_KEY": "env-sk", + "AWS_SESSION_TOKEN": "env-st", + }, +) +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_falls_back_to_env_credentials(mock_boto3): + """Without spark config keys, env vars are used.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 1} + + _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/logs/")) + + mock_boto3.client.assert_called_once() + kwargs = mock_boto3.client.call_args + assert kwargs.kwargs["aws_access_key_id"] == "env-ak" + assert kwargs.kwargs["aws_secret_access_key"] == "env-sk" + assert kwargs.kwargs["aws_session_token"] == "env-st" + + +@patch.dict("os.environ", {}, clear=True) +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_no_credentials_passes_none(mock_boto3): + """No credentials anywhere -> None passed to boto3 (anonymous / instance role).""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 1} + + conf = { + "spark.eventLog.enabled": "true", + "spark.eventLog.dir": "s3a://my-bucket/logs/", + } + _ensure_s3a_event_log_dir(conf) + + mock_boto3.client.assert_called_once() + kwargs = mock_boto3.client.call_args + assert kwargs.kwargs["aws_access_key_id"] is None + assert kwargs.kwargs["aws_secret_access_key"] is None + assert kwargs.kwargs["aws_session_token"] is None From c7b74dbf4050c0540f21daa11e7676ec359167b0 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Mon, 27 Apr 2026 13:47:29 +0530 Subject: [PATCH 3/5] fix(spark): use AWS_ENDPOINT_URL, support path-style addressing, fix linting Signed-off-by: abhijeet-dhumal --- .../infra/compute_engines/spark/utils.py | 27 +++-- .../tests/component/spark/test_compute.py | 2 +- .../tests/component/spark/test_spark_utils.py | 98 ++++++++++++++++--- 3 files changed, 108 insertions(+), 19 deletions(-) diff --git a/sdk/python/feast/infra/compute_engines/spark/utils.py b/sdk/python/feast/infra/compute_engines/spark/utils.py index 60a862be99f..8c84c9f17a6 100644 --- a/sdk/python/feast/infra/compute_engines/spark/utils.py +++ b/sdk/python/feast/infra/compute_engines/spark/utils.py @@ -48,7 +48,7 @@ def _ensure_s3a_event_log_dir(spark_config: Dict[str, str]) -> None: endpoint = spark_config.get( "spark.hadoop.fs.s3a.endpoint", - os.environ.get("FEAST_S3A_ENDPOINT", ""), + os.environ.get("AWS_ENDPOINT_URL", ""), ) access_key = spark_config.get( "spark.hadoop.fs.s3a.access.key", @@ -58,22 +58,37 @@ def _ensure_s3a_event_log_dir(spark_config: Dict[str, str]) -> None: "spark.hadoop.fs.s3a.secret.key", os.environ.get("AWS_SECRET_ACCESS_KEY", ""), ) - session_token = spark_config.get( - "spark.hadoop.fs.s3a.session.token", - os.environ.get("AWS_SESSION_TOKEN", ""), - ) or None + session_token = ( + spark_config.get( + "spark.hadoop.fs.s3a.session.token", + os.environ.get("AWS_SESSION_TOKEN", ""), + ) + or None + ) try: if boto3 is None: raise ImportError("boto3 is not installed") + addressing_style = ( + "path" + if spark_config.get( + "spark.hadoop.fs.s3a.path.style.access", "false" + ).lower() + == "true" + else "auto" + ) + s3 = boto3.client( "s3", endpoint_url=endpoint if endpoint else None, aws_access_key_id=access_key or None, aws_secret_access_key=secret_key or None, aws_session_token=session_token, - config=BotoConfig(signature_version="s3v4"), + config=BotoConfig( + signature_version="s3v4", + s3={"addressing_style": addressing_style}, + ), ) resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1) if resp.get("KeyCount", 0) == 0: diff --git a/sdk/python/tests/component/spark/test_compute.py b/sdk/python/tests/component/spark/test_compute.py index 4085d1ff737..803cd505513 100644 --- a/sdk/python/tests/component/spark/test_compute.py +++ b/sdk/python/tests/component/spark/test_compute.py @@ -1,6 +1,6 @@ from datetime import timedelta from typing import cast -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock import pytest from pyspark.sql import DataFrame diff --git a/sdk/python/tests/component/spark/test_spark_utils.py b/sdk/python/tests/component/spark/test_spark_utils.py index 75f3cc62344..8650c62f011 100644 --- a/sdk/python/tests/component/spark/test_spark_utils.py +++ b/sdk/python/tests/component/spark/test_spark_utils.py @@ -91,12 +91,8 @@ def test_ensure_s3a_event_log_dir_bucket_root_no_trailing_slash(mock_boto3): _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket")) - s3.list_objects_v2.assert_called_once_with( - Bucket="my-bucket", Prefix="", MaxKeys=1 - ) - s3.put_object.assert_called_once_with( - Bucket="my-bucket", Key=".keep", Body=b"" - ) + s3.list_objects_v2.assert_called_once_with(Bucket="my-bucket", Prefix="", MaxKeys=1) + s3.put_object.assert_called_once_with(Bucket="my-bucket", Key=".keep", Body=b"") @patch(BOTOCONFIG_PATH, MagicMock()) @@ -109,12 +105,8 @@ def test_ensure_s3a_event_log_dir_bucket_root_trailing_slash(mock_boto3): _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/")) - s3.list_objects_v2.assert_called_once_with( - Bucket="my-bucket", Prefix="", MaxKeys=1 - ) - s3.put_object.assert_called_once_with( - Bucket="my-bucket", Key=".keep", Body=b"" - ) + s3.list_objects_v2.assert_called_once_with(Bucket="my-bucket", Prefix="", MaxKeys=1) + s3.put_object.assert_called_once_with(Bucket="my-bucket", Key=".keep", Body=b"") # --------------------------------------------------------------------------- @@ -198,3 +190,85 @@ def test_ensure_s3a_event_log_dir_no_credentials_passes_none(mock_boto3): assert kwargs.kwargs["aws_access_key_id"] is None assert kwargs.kwargs["aws_secret_access_key"] is None assert kwargs.kwargs["aws_session_token"] is None + + +# --------------------------------------------------------------------------- +# Path-style addressing (MinIO / S3-compatible) +# --------------------------------------------------------------------------- + + +@patch(BOTOCONFIG_PATH) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_path_style_when_enabled(mock_boto3, mock_config_cls): + """spark.hadoop.fs.s3a.path.style.access=true -> addressing_style='path'.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 1} + + conf = { + **_base_conf("s3a://my-bucket/logs/"), + "spark.hadoop.fs.s3a.path.style.access": "true", + } + _ensure_s3a_event_log_dir(conf) + + mock_config_cls.assert_called_once() + config_kwargs = mock_config_cls.call_args + assert config_kwargs.kwargs["s3"] == {"addressing_style": "path"} + + +@patch(BOTOCONFIG_PATH) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_virtual_hosted_style_by_default( + mock_boto3, mock_config_cls +): + """No path.style.access config -> addressing_style='auto'.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 1} + + _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/logs/")) + + mock_config_cls.assert_called_once() + config_kwargs = mock_config_cls.call_args + assert config_kwargs.kwargs["s3"] == {"addressing_style": "auto"} + + +# --------------------------------------------------------------------------- +# Endpoint env var fallback (AWS_ENDPOINT_URL) +# --------------------------------------------------------------------------- + + +@patch.dict("os.environ", {"AWS_ENDPOINT_URL": "http://localhost:9000"}, clear=True) +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_endpoint_from_env(mock_boto3): + """AWS_ENDPOINT_URL env var is used when spark config has no endpoint.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 1} + + conf = { + "spark.eventLog.enabled": "true", + "spark.eventLog.dir": "s3a://my-bucket/logs/", + } + _ensure_s3a_event_log_dir(conf) + + mock_boto3.client.assert_called_once() + kwargs = mock_boto3.client.call_args + assert kwargs.kwargs["endpoint_url"] == "http://localhost:9000" + + +@patch.dict("os.environ", {"AWS_ENDPOINT_URL": "http://env-endpoint:9000"}, clear=True) +@patch(BOTOCONFIG_PATH, MagicMock()) +@patch(BOTO3_PATH) +def test_ensure_s3a_event_log_dir_spark_endpoint_over_env(mock_boto3): + """spark.hadoop.fs.s3a.endpoint takes precedence over AWS_ENDPOINT_URL.""" + s3 = MagicMock() + mock_boto3.client.return_value = s3 + s3.list_objects_v2.return_value = {"KeyCount": 1} + + _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/logs/")) + + mock_boto3.client.assert_called_once() + kwargs = mock_boto3.client.call_args + assert kwargs.kwargs["endpoint_url"] == "http://minio:9000" From 46094c415bcd1148533acc1e9fa328b388fcfa9a Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Mon, 27 Apr 2026 13:58:58 +0530 Subject: [PATCH 4/5] fix: allow sample aws secret and fix linting issue Signed-off-by: abhijeet-dhumal --- sdk/python/tests/component/spark/test_spark_utils.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/python/tests/component/spark/test_spark_utils.py b/sdk/python/tests/component/spark/test_spark_utils.py index 8650c62f011..2ff77a7268e 100644 --- a/sdk/python/tests/component/spark/test_spark_utils.py +++ b/sdk/python/tests/component/spark/test_spark_utils.py @@ -118,7 +118,7 @@ def test_ensure_s3a_event_log_dir_bucket_root_trailing_slash(mock_boto3): "os.environ", { "AWS_ACCESS_KEY_ID": "env-ak", - "AWS_SECRET_ACCESS_KEY": "env-sk", + "AWS_SECRET_ACCESS_KEY": "env-sk", # pragma: allowlist secret "AWS_SESSION_TOKEN": "env-st", }, ) @@ -133,7 +133,7 @@ def test_ensure_s3a_event_log_dir_uses_spark_config_credentials(mock_boto3): conf = { **_base_conf("s3a://my-bucket/logs/"), "spark.hadoop.fs.s3a.access.key": "spark-ak", - "spark.hadoop.fs.s3a.secret.key": "spark-sk", + "spark.hadoop.fs.s3a.secret.key": "spark-sk", # pragma: allowlist secret "spark.hadoop.fs.s3a.session.token": "spark-st", } _ensure_s3a_event_log_dir(conf) @@ -141,7 +141,7 @@ def test_ensure_s3a_event_log_dir_uses_spark_config_credentials(mock_boto3): mock_boto3.client.assert_called_once() kwargs = mock_boto3.client.call_args assert kwargs.kwargs["aws_access_key_id"] == "spark-ak" - assert kwargs.kwargs["aws_secret_access_key"] == "spark-sk" + assert kwargs.kwargs["aws_secret_access_key"] == "spark-sk" # pragma: allowlist secret assert kwargs.kwargs["aws_session_token"] == "spark-st" @@ -149,7 +149,7 @@ def test_ensure_s3a_event_log_dir_uses_spark_config_credentials(mock_boto3): "os.environ", { "AWS_ACCESS_KEY_ID": "env-ak", - "AWS_SECRET_ACCESS_KEY": "env-sk", + "AWS_SECRET_ACCESS_KEY": "env-sk", # pragma: allowlist secret "AWS_SESSION_TOKEN": "env-st", }, ) @@ -166,7 +166,7 @@ def test_ensure_s3a_event_log_dir_falls_back_to_env_credentials(mock_boto3): mock_boto3.client.assert_called_once() kwargs = mock_boto3.client.call_args assert kwargs.kwargs["aws_access_key_id"] == "env-ak" - assert kwargs.kwargs["aws_secret_access_key"] == "env-sk" + assert kwargs.kwargs["aws_secret_access_key"] == "env-sk" # pragma: allowlist secret assert kwargs.kwargs["aws_session_token"] == "env-st" From 70215e2b675e76c4d2c47e41fd9c3db04ae71bb9 Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Mon, 27 Apr 2026 14:56:32 +0530 Subject: [PATCH 5/5] fix: fix linting issue Signed-off-by: abhijeet-dhumal --- .../tests/component/spark/test_spark_utils.py | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/sdk/python/tests/component/spark/test_spark_utils.py b/sdk/python/tests/component/spark/test_spark_utils.py index 2ff77a7268e..84a680e7a25 100644 --- a/sdk/python/tests/component/spark/test_spark_utils.py +++ b/sdk/python/tests/component/spark/test_spark_utils.py @@ -139,10 +139,10 @@ def test_ensure_s3a_event_log_dir_uses_spark_config_credentials(mock_boto3): _ensure_s3a_event_log_dir(conf) mock_boto3.client.assert_called_once() - kwargs = mock_boto3.client.call_args - assert kwargs.kwargs["aws_access_key_id"] == "spark-ak" - assert kwargs.kwargs["aws_secret_access_key"] == "spark-sk" # pragma: allowlist secret - assert kwargs.kwargs["aws_session_token"] == "spark-st" + kw = mock_boto3.client.call_args.kwargs + assert kw["aws_access_key_id"] == "spark-ak" + assert kw["aws_secret_access_key"] == "spark-sk" # pragma: allowlist secret + assert kw["aws_session_token"] == "spark-st" @patch.dict( @@ -164,10 +164,10 @@ def test_ensure_s3a_event_log_dir_falls_back_to_env_credentials(mock_boto3): _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/logs/")) mock_boto3.client.assert_called_once() - kwargs = mock_boto3.client.call_args - assert kwargs.kwargs["aws_access_key_id"] == "env-ak" - assert kwargs.kwargs["aws_secret_access_key"] == "env-sk" # pragma: allowlist secret - assert kwargs.kwargs["aws_session_token"] == "env-st" + kw = mock_boto3.client.call_args.kwargs + assert kw["aws_access_key_id"] == "env-ak" + assert kw["aws_secret_access_key"] == "env-sk" # pragma: allowlist secret + assert kw["aws_session_token"] == "env-st" @patch.dict("os.environ", {}, clear=True) @@ -186,10 +186,10 @@ def test_ensure_s3a_event_log_dir_no_credentials_passes_none(mock_boto3): _ensure_s3a_event_log_dir(conf) mock_boto3.client.assert_called_once() - kwargs = mock_boto3.client.call_args - assert kwargs.kwargs["aws_access_key_id"] is None - assert kwargs.kwargs["aws_secret_access_key"] is None - assert kwargs.kwargs["aws_session_token"] is None + kw = mock_boto3.client.call_args.kwargs + assert kw["aws_access_key_id"] is None + assert kw["aws_secret_access_key"] is None + assert kw["aws_session_token"] is None # --------------------------------------------------------------------------- @@ -254,8 +254,8 @@ def test_ensure_s3a_event_log_dir_endpoint_from_env(mock_boto3): _ensure_s3a_event_log_dir(conf) mock_boto3.client.assert_called_once() - kwargs = mock_boto3.client.call_args - assert kwargs.kwargs["endpoint_url"] == "http://localhost:9000" + kw = mock_boto3.client.call_args.kwargs + assert kw["endpoint_url"] == "http://localhost:9000" @patch.dict("os.environ", {"AWS_ENDPOINT_URL": "http://env-endpoint:9000"}, clear=True) @@ -270,5 +270,5 @@ def test_ensure_s3a_event_log_dir_spark_endpoint_over_env(mock_boto3): _ensure_s3a_event_log_dir(_base_conf("s3a://my-bucket/logs/")) mock_boto3.client.assert_called_once() - kwargs = mock_boto3.client.call_args - assert kwargs.kwargs["endpoint_url"] == "http://minio:9000" + kw = mock_boto3.client.call_args.kwargs + assert kw["endpoint_url"] == "http://minio:9000"