feat: Feast-MLflow Integration #6235
| with tempfile.TemporaryDirectory() as tmp_dir: | ||
| path = os.path.join(tmp_dir, "entity_df.parquet") | ||
| entity_df.to_parquet(path, index=False) | ||
| mlflow.log_artifact(path) |
🟡 Mixing global mlflow.log_artifact() with explicit-URI client causes artifact/metadata to target different servers
In _auto_log_entity_df_info, tags and params are logged via a locally-created MlflowClient(tracking_uri=tracking_uri) (lines 304, 309-318), but the entity DataFrame artifact is uploaded via the global mlflow.log_artifact(path) (line 327). The global function uses the tracking URI set by mlflow.set_tracking_uri(), not the explicit tracking_uri from the config. If the global tracking URI is changed by another library in the process, or if _init_mlflow_tracking failed silently (caught by except Exception at sdk/python/feast/feature_store.py:249), the artifact would be uploaded to a different server than where the tags/params were logged, splitting metadata across two servers.
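One way to keep the artifact on the same server as the tags and params is to route the upload through the same explicit client. This is a minimal sketch, not the PR's code: the helper name `log_entity_df_artifact` is hypothetical, `client` is assumed to behave like the `MlflowClient(tracking_uri=tracking_uri)` already created in `_auto_log_entity_df_info`, and `run_id` is assumed to be the ID of the active run.

```python
import os
import tempfile


def log_entity_df_artifact(client, run_id, entity_df):
    """Upload entity_df as a Parquet artifact through an explicit client.

    `client` must expose log_artifact(run_id, local_path), as
    mlflow.MlflowClient does; `entity_df` must expose to_parquet(), as a
    pandas DataFrame does. The helper name is hypothetical.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "entity_df.parquet")
        entity_df.to_parquet(path, index=False)
        # The client carries its own tracking URI, so the artifact goes to
        # the same server as the tags and params logged through it.
        client.log_artifact(run_id, path)
```

Because nothing here reads the process-wide tracking URI, another library calling `mlflow.set_tracking_uri()` would not redirect the upload.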
| try: | ||
| import mlflow | ||
|
|
||
| tracking_uri = mlflow_cfg.tracking_uri or "http://127.0.0.1:5000" |
🔴 UI server ignores MLFLOW_TRACKING_URI env var, falls back to hardcoded localhost
In ui_server.py, both the /api/mlflow-runs and /api/mlflow-feature-models endpoints resolve the MLflow tracking URI using mlflow_cfg.tracking_uri or "http://127.0.0.1:5000". This reads the raw tracking_uri field from the config and falls back to a hardcoded localhost URL, completely bypassing the MLFLOW_TRACKING_URI environment variable. In contrast, the rest of the codebase (feature_store.py:243, feature_store.py:325, feature_store.py:1696, feature_store.py:2885) correctly calls mlflow_cfg.get_tracking_uri() which checks the env var via sdk/python/feast/mlflow_integration/config.py:19-29. When a user sets MLFLOW_TRACKING_URI without setting tracking_uri in YAML (which is a very common deployment pattern, and documented in the PR's own docs at docs/reference/mlflow.md:51), the UI endpoints will incorrectly connect to http://127.0.0.1:5000 instead of the env-var-specified server.
| tracking_uri = mlflow_cfg.tracking_uri or "http://127.0.0.1:5000" | |
| tracking_uri = mlflow_cfg.get_tracking_uri() or "http://127.0.0.1:5000" |
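The precedence this comment relies on can be sketched as a small standalone function. This is an illustration only: `resolve_tracking_uri` is a hypothetical name, and the real `get_tracking_uri()` in `sdk/python/feast/mlflow_integration/config.py` may differ in detail. The assumed order is YAML value first, then `MLFLOW_TRACKING_URI`, then the fallback.

```python
import os
from typing import Mapping, Optional

DEFAULT_TRACKING_URI = "http://127.0.0.1:5000"  # the UI server's fallback


def resolve_tracking_uri(
    config_uri: Optional[str],
    env: Optional[Mapping[str, str]] = None,
    default: str = DEFAULT_TRACKING_URI,
) -> str:
    """Return the tracking URI using config > env var > default precedence."""
    # Allow the environment to be injected so the function is easy to test.
    env = os.environ if env is None else env
    return config_uri or env.get("MLFLOW_TRACKING_URI") or default
```

With this shape, a deployment that only sets `MLFLOW_TRACKING_URI` reaches the intended server, and the localhost URL is used only when neither source is set.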
| try: | ||
| import mlflow | ||
|
|
||
| tracking_uri = mlflow_cfg.tracking_uri or "http://127.0.0.1:5000" |
🔴 Second UI endpoint also ignores MLFLOW_TRACKING_URI env var
Same issue as in the /api/mlflow-runs endpoint: the /api/mlflow-feature-models endpoint at sdk/python/feast/ui_server.py:234 uses mlflow_cfg.tracking_uri or "http://127.0.0.1:5000" instead of mlflow_cfg.get_tracking_uri(), causing the MLFLOW_TRACKING_URI environment variable to be ignored.
| ) | ||
|
|
||
| # Emit MLflow event for materialization (Phase 7) | ||
| _mat_duration = time.monotonic() - _retrieval_start if '_retrieval_start' in dir() else 0 |
🟡 _retrieval_start is never defined in materialize_incremental, so the logged duration is always 0
In materialize_incremental, the code at line 2115 references _retrieval_start, which is never defined in that method's scope. Called without arguments, dir() returns the names in the current local scope, so inside this method '_retrieval_start' in dir() is always False and _mat_duration is always 0. The materialization event therefore carries a meaningless duration, and the conditional only obscures that. The guard is also fragile: if a local named _retrieval_start is ever introduced in this method for a different purpose, the expression would silently compute a duration from an unrelated start time.
| _mat_duration = time.monotonic() - _retrieval_start if '_retrieval_start' in dir() else 0 | |
| _mat_duration = 0 |
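For reference, a bare `dir()` inside a function lists that function's local names, and a real duration needs a start time recorded in the same scope. The sketch below illustrates both points; `names_visible_to_dir` and `timed` are hypothetical helpers, not code from the PR.

```python
import time


def names_visible_to_dir():
    """Return what a bare dir() reports from inside a function."""
    local_marker = 1  # a local name, so dir() reports it
    return dir()


def timed(operation):
    """Run operation() and return (result, elapsed_seconds)."""
    start = time.monotonic()  # recorded in the same scope that reads it
    result = operation()
    return result, time.monotonic() - start
```

If a materialization duration is wanted later, wrapping the materialization call in something like `timed` would replace the hardcoded `0`.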
| if tracking_uri: | ||
| mlflow.set_tracking_uri(tracking_uri) | ||
|
|
||
| experiment_name = f"{project}{ops_experiment_suffix}" | ||
| mlflow.set_experiment(experiment_name) |
🔴 log_apply_to_mlflow and log_materialize_to_mlflow mutate global MLflow tracking URI and experiment, corrupting concurrent user runs
Both log_apply_to_mlflow (sdk/python/feast/mlflow_integration/logger.py:185-189) and log_materialize_to_mlflow (sdk/python/feast/mlflow_integration/logger.py:251-255) call mlflow.set_tracking_uri(tracking_uri) and mlflow.set_experiment(experiment_name) globally. These mutate process-wide global state. If a user has an active MLflow run in a different experiment (e.g. during feast apply inside a training script), these calls redirect subsequent MLflow operations to the ops experiment. While the functions try to restore mlflow.set_experiment(project) afterward (line 224, 274), the tracking URI is never restored. In the exception path, if mlflow.start_run() raises (line 202/258), the experiment remains set to the ops experiment. This corrupts any concurrent or subsequent user operations in the same process.
Prompt for agents
The functions log_apply_to_mlflow and log_materialize_to_mlflow in sdk/python/feast/mlflow_integration/logger.py mutate process-wide global state by calling mlflow.set_tracking_uri() and mlflow.set_experiment(). This corrupts any concurrent user MLflow runs in the same process.
The fix should save and restore the original tracking URI and experiment before/after the ops logging, or preferably use the MlflowClient API exclusively (which already takes tracking_uri as a parameter) instead of the global mlflow module-level functions. The MlflowClient.create_run() method allows creating runs in specific experiments without mutating global state. This pattern is already used in log_feature_retrieval_to_mlflow which correctly uses client = mlflow.MlflowClient(tracking_uri=tracking_uri) without touching global state.
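Both options described above can be sketched briefly. The function names `log_ops_run` and `restored_tracking_uri` are hypothetical. The client is passed in and is assumed to behave like `mlflow.MlflowClient`, using its `get_experiment_by_name`, `create_experiment`, `create_run` and `set_terminated` methods. The module passed to the context manager is assumed to expose `get_tracking_uri` and `set_tracking_uri`, as the `mlflow` module does.

```python
from contextlib import contextmanager


def log_ops_run(client, experiment_name, tags):
    """Create and close an ops run without touching global MLflow state."""
    experiment = client.get_experiment_by_name(experiment_name)
    if experiment is None:
        experiment_id = client.create_experiment(experiment_name)
    else:
        experiment_id = experiment.experiment_id
    # create_run targets the experiment explicitly, so the user's active
    # experiment and tracking URI are left untouched.
    run = client.create_run(experiment_id, tags=tags)
    client.set_terminated(run.info.run_id)
    return run.info.run_id


@contextmanager
def restored_tracking_uri(mlflow_module, temporary_uri):
    """Fallback: set the global tracking URI and always restore it."""
    original = mlflow_module.get_tracking_uri()
    mlflow_module.set_tracking_uri(temporary_uri)
    try:
        yield
    finally:
        # Runs on the exception path too, unlike the current restore logic.
        mlflow_module.set_tracking_uri(original)
```

The client-based variant is the safer of the two, since a save/restore window can still be observed by other threads in the same process.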
| def mlflow(self): | ||
| """Get the MLflow configuration.""" | ||
| if not self._mlflow: | ||
| if isinstance(self.mlflow_config, Dict): | ||
| from feast.mlflow_integration.config import MlflowConfig | ||
|
|
||
| self._mlflow = MlflowConfig(**self.mlflow_config) | ||
| elif self.mlflow_config: | ||
| self._mlflow = self.mlflow_config | ||
| return self._mlflow |
🟡 mlflow property returns falsy None on subsequent calls when MlflowConfig(enabled=False) is set
The mlflow property in RepoConfig (sdk/python/feast/repo_config.py:497-506) uses if not self._mlflow: as the guard. When the config is a valid MlflowConfig object with enabled=False, self._mlflow will be a truthy MlflowConfig instance (Pydantic models are truthy), so this works correctly on first access. However, if mlflow_config is None (the default), self._mlflow stays None forever and the property returns None -- which is fine. The real issue is: if mlflow_config is an empty dict {}, MlflowConfig(**{}) creates a valid config with enabled=False, and self._mlflow becomes truthy. But if mlflow_config is set to some falsy-like Pydantic object (unlikely but possible with custom subclasses), the if not self._mlflow guard would re-create it every time. This is the same pattern as openlineage property so it's consistent, but not actually a bug for standard usage.
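If the truthiness guard is ever tightened, an explicit `is None` check removes the ambiguity. The sketch below uses hypothetical stand-ins (`MlflowSettings` for `MlflowConfig`, `Repo` for `RepoConfig`) and is not the PR's implementation.

```python
from dataclasses import dataclass


@dataclass
class MlflowSettings:
    """Hypothetical stand-in for MlflowConfig."""
    enabled: bool = False


class Repo:
    """Hypothetical stand-in for RepoConfig."""

    def __init__(self, mlflow_config=None):
        self.mlflow_config = mlflow_config
        self._mlflow = None

    @property
    def mlflow(self):
        # Compare against None so a falsy-but-valid config is still cached.
        if self._mlflow is None and self.mlflow_config is not None:
            if isinstance(self.mlflow_config, dict):
                self._mlflow = MlflowSettings(**self.mlflow_config)
            else:
                self._mlflow = self.mlflow_config
        return self._mlflow
```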
| - **Online feature retrieval** -- `get_online_features()` tags the run with the same metadata | ||
| - **Entity DataFrame archival** -- optionally saves the training entity DataFrame as an MLflow artifact for full reproducibility | ||
| - **Execution context tagging** -- tags runs with where they ran (workbench, KFP pipeline, feature server, or standalone) | ||
| - **Operation logging** -- optionally logs `feast apply` and `feast materialize` to a separate MLflow experiment |
not sure this is actually useful
Actually, this one is not updated. I had planned it for backtracking from an MLflow run to the workbench experiment, but it is now omitted.
| - **Model-to-Feature resolution** -- map any MLflow model URI back to its Feast feature service | ||
| - **Training reproducibility** -- reconstruct the exact entity DataFrame from a past MLflow run | ||
| - **Training-to-prediction linkage** -- `FeastMlflowClient.load_model()` links prediction runs back to their training runs | ||
| - **Feast MLflow Client** -- a thin wrapper that eliminates direct `import mlflow` in user code |
| _consecutive_failures = 0 | ||
|
|
||
|
|
||
| def log_feature_retrieval_to_mlflow( |
I guess if we have this enabled, why do we even need the wrapper? Can't we fully handle this for the user?
I guess my question is, can't we just hide all of the mlflow client usage to the user?
The auto-logging in logger.py is fully handled for users who already use mlflow.start_run() or client.start_run(). Our FeastMlflowClient wrapper is for users who want to avoid import mlflow entirely and get additional Feast-specific behavior on top, e.g. logging a model, registering it, loading it for inference. So FeastMlflowClient is for users who want the extra lineage features without touching MLflow directly.
I would rather push all these functions to integration client as everything here is a part of mlflow feast integration?
jyejare left a comment
Comments inline:
- We also need to enable the MLflow configuration from the Feast operator; otherwise the admin has to do it explicitly.
- Also, we need to remove the duplicate mechanisms for initiating/using the MLflow client and provide one clean way, which is store.mlflow, whenever possible.
| 2. `MLFLOW_TRACKING_URI` environment variable | ||
| 3. MLflow's default (`./mlruns` local directory) | ||
|
|
||
| This means you can omit `tracking_uri` from the YAML and set `MLFLOW_TRACKING_URI` in your environment instead. |
| This means you can omit `tracking_uri` from the YAML and set `MLFLOW_TRACKING_URI` in your environment instead. | |
| This means you can omit `tracking_uri` from the YAML and set `MLFLOW_TRACKING_URI` in your environment instead; if neither is set, MLflow falls back to the local `./mlruns` directory. |
wdyt ?
|
|
||
| | Artifact | Description | | ||
| |----------|-------------| | ||
| | `required_features.json` | JSON list of feature references the model was trained on | |
Could we rename it to relate it to Feast better, so that it is self-describing in the MLflow ecosystem? Something like:
required_feast_features.json
OR
feast_features.json
| With the configuration above, feature metadata is logged whenever there is an active MLflow run: | ||
|
|
||
| ```python | ||
| import mlflow |
I think the explicit import is no longer required, as the feast module gives you the ability to call:
store.mlflow.start_run(....
| from feast import FeatureStore | ||
|
|
||
| store = FeatureStore(".") | ||
| client = store.get_mlflow_client() |
Even this has changed now. Let's keep only store.mlflow examples.
| | Condition | Error | | ||
| |-----------|-------| | ||
| | No `feature_store.yaml` in cwd and no store created | `RuntimeError` with guidance to call `feast.mlflow.init(store)` | | ||
| | `mlflow.enabled` is `false` or missing | `RuntimeError` with guidance to set `mlflow.enabled=true` | |
Even if user wants to set it to false ?
| invalid, resolution fails, or validation against the store fails. | ||
| """ | ||
| try: | ||
| import mlflow |
Same here. Create a class and initiate in integration object/class that takes store object as param.
| if mlflow_cfg is None or not mlflow_cfg.enabled: | ||
| return | ||
|
|
||
| import mlflow |
The store object is available to you here as self, so why not :
self.mlflow.set_tracking_uri(....
| client = MlflowClient(tracking_uri=tracking_uri) | ||
| refs = ["z_view:f1", "a_view:f2", "z_view:f3", "a_view:f4"] | ||
|
|
||
| with mlflow.start_run() as run: |
The tests should also be focusing only on store.mlflow... culture.
| ops_suffix = mlflow_cfg.ops_experiment_suffix | ||
|
|
||
| effective_uri = self._tracking_uri or self._mlflow.get_tracking_uri() | ||
| client = self._mlflow.MlflowClient(tracking_uri=effective_uri) |
Instead of creating a new client every time, can we initialise it once and reuse it?
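A create-once, reuse-afterwards client could look like the sketch below. `CachedClientHolder` is a hypothetical name, and `client_factory` stands in for `mlflow.MlflowClient` (any callable accepting a `tracking_uri` keyword works), so the sketch does not claim to match the PR's class layout.

```python
class CachedClientHolder:
    """Build the client on first access and reuse it afterwards."""

    def __init__(self, client_factory, tracking_uri):
        self._client_factory = client_factory
        self._tracking_uri = tracking_uri
        self._client = None

    @property
    def client(self):
        if self._client is None:
            # Constructed once; later accesses return the same instance.
            self._client = self._client_factory(tracking_uri=self._tracking_uri)
        return self._client
```

In real use this would be constructed as `CachedClientHolder(mlflow.MlflowClient, effective_uri)`, assuming the effective URI does not change during the holder's lifetime.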
| # Initialize feature service cache for performance optimization | ||
| self._feature_service_cache = {} | ||
|
|
||
| # Cache for _resolve_feature_service_name lookups |
the MLflow auto-log could resolve to a deleted or renamed feature service. _fs_name_cache and _fs_name_index caches (used by _resolve_feature_service_name) are never cleared when refresh_registry() or _clear_feature_service_cache() is called
| self._fs_name_cache = {} | ||
| self._fs_name_index_ts = time.monotonic() | ||
|
|
||
| def _resolve_feature_service_name(self, feature_refs: List[str]) -> Optional[str]: |
can't we use existing _feature_service_cache from FeatureStore?
_feature_service_cache is a forward lookup, service_name → [feature_refs], and is populated lazily on demand.
_fs_name_index is the reverse lookup, {feature_refs} → service_name, populated from all registry services. It is needed for subset matching when users pass raw refs instead of a FeatureService.
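A reverse index of this kind, with an explicit invalidation hook for registry refreshes, might be sketched as follows. The class name, the TTL default, and the rule of preferring the smallest matching service are assumptions for illustration and are not taken from the PR.

```python
import time
from typing import Dict, FrozenSet, Iterable, List, Mapping, Optional


class FeatureServiceNameIndex:
    """Reverse lookup from a set of feature refs to a service name."""

    def __init__(self, ttl_seconds: float = 60.0):
        self._ttl = ttl_seconds
        self._index: Dict[FrozenSet[str], str] = {}
        self._built_at: Optional[float] = None

    def invalidate(self) -> None:
        """Drop the index, e.g. when the registry is refreshed."""
        self._index = {}
        self._built_at = None

    def resolve(
        self, feature_refs: Iterable[str], services: Mapping[str, List[str]]
    ) -> Optional[str]:
        now = time.monotonic()
        if self._built_at is None or now - self._built_at >= self._ttl:
            # Rebuild from the forward mapping: service_name -> [feature_refs].
            self._index = {frozenset(refs): name for name, refs in services.items()}
            self._built_at = now
        wanted = frozenset(feature_refs)
        if not wanted:
            return None
        exact = self._index.get(wanted)
        if exact is not None:
            return exact
        # Subset match: pick the smallest service containing every wanted ref.
        candidates = [
            (len(refs), name) for refs, name in self._index.items() if wanted <= refs
        ]
        return min(candidates)[1] if candidates else None
```

Calling `invalidate()` from the same place that clears `_feature_service_cache` would keep a deleted or renamed service from being resolved out of a stale index.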
|
|
||
| try: | ||
| if self.mlflow is not None and self.config.mlflow.auto_log: | ||
| import asyncio |
| self._fs_name_index_ts: float = -self._FS_NAME_INDEX_TTL_SECONDS | ||
|
|
||
| self._mlflow_client: Optional[Any] = None | ||
| self._init_mlflow() |
I think better to do lazy loading here, first check the config and then load module.
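The config-first, import-second order suggested here can be sketched as below. `load_mlflow_if_enabled` is a hypothetical helper, the `enabled` attribute mirrors the `mlflow_cfg.enabled` check seen elsewhere in the diff, and the `importer` parameter exists only to make the sketch testable without MLflow installed.

```python
import importlib
from typing import Any, Callable, Optional


def load_mlflow_if_enabled(
    mlflow_cfg: Optional[Any],
    importer: Callable[[str], Any] = importlib.import_module,
) -> Optional[Any]:
    """Return the mlflow module only when the integration is enabled."""
    # Check the config first so disabled setups never pay the import cost.
    if mlflow_cfg is None or not getattr(mlflow_cfg, "enabled", False):
        return None
    try:
        return importer("mlflow")
    except ImportError:
        # MLflow is optional; a missing package disables the integration.
        return None
```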
| self._default_experiment = store.config.project | ||
|
|
||
| if self._tracking_uri: | ||
| _mlflow_mod.set_tracking_uri(self._tracking_uri) |
remove set_tracking_uri() from __init__ and instead pass tracking_uri explicitly when needed (the MlflowClient already receives it)
| ) | ||
|
|
||
| @app.get("/api/mlflow-runs") | ||
| def get_mlflow_runs(max_results: int = 50): |
should this be a registry API? Not a blocker for this PR, but architecture wise, registry is source of truth
Yes, will add this; it will open up other benefits too!
@Vperiodt going to handle this in this PR or different one?
Yes, will address this in a follow-up PR.
|
linter is failing |
fixed |
include mlflow here in ci dependencies
| <EuiFlexItem> | ||
| <EuiStat title={`${numOfFs}`} description="Consuming Services" /> | ||
| </EuiFlexItem> | ||
| <EuiFlexItem> |
This should be conditionally rendered, users who don't use MLflow will wonder what these stats mean and why they're there
| const usage: FeatureUsageEntry | undefined = | ||
| data.feature_usage?.[featureViewName]; | ||
|
|
||
| if (!usage || usage.run_count === 0) { |
same here, if mlflow is not configured, this needs to be hidden
|
@Vperiodt Guard the UI components behind a check on whether MLflow data is available. The simplest approach is to have the backend return a |
hello ! @ntkathole, Addressed all reviews |
|
@Vperiodt need to update pixi.lock file, also please rebase |
Signed-off-by: Vanshika Vanshika <vvanshik@redhat.com> rh-pre-commit.version: 2.3.2 rh-pre-commit.check-secrets: ENABLED
- refine MLflow integration, UI runs endpoint, and lockfiles
- fix issues
- handle fallback
- added model to features table
- address CI
- add docs
- fix-lint
- update docs
- update-logic
- add-integration-test
- mlflowclient wrapper
- address-reviews
- update docs
- fix-lint
- fix-auto-log
- update feastMlflowClient
- changing init
- fixing docs
- remove doc
- update format
- fix-test
- update docs
- addressed reviews
- fix: update .secrets.baseline to remove pragma-allowlisted entry
- ui-fallback
- update requirements
92892e7 to 3da673d
|
lgtm |
What this PR does / why we need it:
final_mlflow_demo.mp4
Native MLflow integration for Feast that provides automatic feature lineage tracking alongside ML experiments. When enabled via feature_store.yaml, every feature retrieval is logged to the active MLflow run.
Key Capabilities
- Auto-logging: Feature retrieval metadata is tagged on the active MLflow run (feast.feature_refs, feast.feature_views, feast.feature_service, feast.entity_count, etc.)
- Entity DataFrame archival: Optionally saves the training entity DataFrame as an MLflow artifact (entity_df.parquet) for full reproducibility
- Model-to-feature-service resolution: store.mlflow.resolve_features() maps any MLflow model URI back to its Feast feature service, enabling serving pipelines to auto-discover which features a model needs
- Entity DataFrame reconstruction: store.mlflow.get_training_entity_df() rebuilds the exact entity DataFrame from a past run's artifacts, enabling training reproducibility
- Model lifecycle lineage: log_model auto-attaches feast_features.json, register_model propagates feast.feature_service to model versions, load_model links prediction runs to training runs
- Operations audit trail: feast apply and feast materialize logged to a dedicated {project}-feast-ops experiment
- Dataset tracking: log_training_dataset() logs training DataFrames as MLflow dataset inputs
- Two access patterns: store.mlflow (explicit, multi-store safe) and feast.mlflow (drop-in replacement for import mlflow)
- Configuration: Controlled entirely via feature_store.yaml under a new mlflow: block
Demo
Full end-to-end demo notebook with all capabilities: integration-example