diff --git a/sdk/python/feast/audit/__init__.py b/sdk/python/feast/audit/__init__.py new file mode 100644 index 00000000000..7b5d8ba7bb5 --- /dev/null +++ b/sdk/python/feast/audit/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2025 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/sdk/python/feast/audit/audit_logger.py b/sdk/python/feast/audit/audit_logger.py new file mode 100644 index 00000000000..783a947a61a --- /dev/null +++ b/sdk/python/feast/audit/audit_logger.py @@ -0,0 +1,347 @@ +# Copyright 2025 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Structured audit logging for the Feast feature server. + +Emits JSONL audit events for MCP tool calls, REST requests, and +authentication/authorization decisions. Sensitive payloads (tokens, +entity rows, feature values) are never included. +""" + +import abc +import logging +import sys +import threading +import uuid +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Audit event schema +# --------------------------------------------------------------------------- + + +class AuditPrincipal(BaseModel): + username: str = "" + roles: List[str] = Field(default_factory=list) + auth_type: str = "" + + +class AuditSource(BaseModel): + ip: str = "" + transport: str = "" + + +class AuditAction(BaseModel): + mcp_tool: str = "" + path: str = "" + + +class AuditResource(BaseModel): + type: str = "" + name: str = "" + actions: List[str] = Field(default_factory=list) + + +class AuditEvent(BaseModel): + """A single structured audit log entry. + + Fields follow the schema proposed in feast-dev/feast#6452. + No sensitive payloads (tokens, entity rows, feature values) are stored. + When OpenTelemetry is active, ``trace_id`` and ``span_id`` are populated + automatically for correlation with distributed traces. + """ + + event_type: str + timestamp: str = "" + request_id: str = "" + trace_id: Optional[str] = None + span_id: Optional[str] = None + jsonrpc_id: Optional[str] = None + principal: AuditPrincipal = Field(default_factory=AuditPrincipal) + source: AuditSource = Field(default_factory=AuditSource) + action: AuditAction = Field(default_factory=AuditAction) + resource: AuditResource = Field(default_factory=AuditResource) + outcome: str = "" + duration_ms: Optional[float] = None + detail: str = "" + + def to_jsonl(self) -> str: + return self.model_dump_json(exclude_none=True) + + +# --------------------------------------------------------------------------- +# Sink abstraction +# --------------------------------------------------------------------------- + + +class AuditSink(abc.ABC): + """Base class for audit event sinks.""" + + @abc.abstractmethod + def emit(self, event: AuditEvent) -> None: ... + + def close(self) -> None: + pass + + +class StdoutAuditSink(AuditSink): + """Write JSONL events to stdout (the default).""" + + def emit(self, event: AuditEvent) -> None: + sys.stdout.write(event.to_jsonl() + "\n") + sys.stdout.flush() + + +class FileAuditSink(AuditSink): + """Append JSONL events to a local file.""" + + def __init__(self, file_path: str) -> None: + self._file_path = file_path + self._fh = open(file_path, "a", buffering=1) + + def emit(self, event: AuditEvent) -> None: + self._fh.write(event.to_jsonl() + "\n") + + def close(self) -> None: + self._fh.close() + + +class LoggerAuditSink(AuditSink): + """Emit audit events through Python's ``logging`` module at INFO level.""" + + def __init__(self, logger_name: str = "feast.audit") -> None: + self._logger = logging.getLogger(logger_name) + + def emit(self, event: AuditEvent) -> None: + self._logger.info(event.to_jsonl()) + + +# --------------------------------------------------------------------------- +# AuditLogger — the main entry point +# --------------------------------------------------------------------------- + + +class AuditLogger: + """Central audit logger that routes events to the configured sink. + + Instantiate once during feature-server startup and share via + ``app.state.audit_logger``. + """ + + def __init__( + self, + sink: AuditSink, + *, + log_successful_reads: bool = True, + ) -> None: + self._sink = sink + self._log_successful_reads = log_successful_reads + self._lock = threading.Lock() + + # -- helpers ----------------------------------------------------------- + + @staticmethod + def new_request_id() -> str: + return str(uuid.uuid4()) + + @staticmethod + def _utcnow_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + + @staticmethod + def _inject_otel_context(event: AuditEvent) -> None: + """Populate trace_id/span_id from the active OpenTelemetry span, if any.""" + if event.trace_id is not None: + return + try: + from opentelemetry import trace as otel_trace + + span = otel_trace.get_current_span() + ctx = span.get_span_context() + if ctx and ctx.is_valid: + event.trace_id = format(ctx.trace_id, "032x") + event.span_id = format(ctx.span_id, "016x") + except ImportError: + pass + except Exception: + pass + + # -- public API -------------------------------------------------------- + + def log(self, event: AuditEvent) -> None: + if not event.timestamp: + event.timestamp = self._utcnow_iso() + if not event.request_id: + event.request_id = self.new_request_id() + + self._inject_otel_context(event) + + if event.outcome == "success" and not self._log_successful_reads: + read_events = {"mcp.tools.call", "http.request"} + if event.event_type in read_events: + read_actions = {"READ_ONLINE", "READ_OFFLINE"} + resource_actions = set(event.resource.actions) + if resource_actions and resource_actions.issubset(read_actions): + return + + try: + with self._lock: + self._sink.emit(event) + except Exception: + logger.exception("Failed to emit audit event") + + def log_mcp_call( + self, + *, + request_id: str, + tool_name: str, + path: str = "", + principal: Optional[AuditPrincipal] = None, + source: Optional[AuditSource] = None, + resource: Optional[AuditResource] = None, + outcome: str = "success", + duration_ms: Optional[float] = None, + detail: str = "", + ) -> None: + self.log( + AuditEvent( + event_type="mcp.tools.call", + request_id=request_id, + principal=principal or AuditPrincipal(), + source=source or AuditSource(transport="mcp-http"), + action=AuditAction(mcp_tool=tool_name, path=path), + resource=resource or AuditResource(), + outcome=outcome, + duration_ms=duration_ms, + detail=detail, + ) + ) + + def log_http_request( + self, + *, + request_id: str, + method: str, + path: str, + principal: Optional[AuditPrincipal] = None, + source: Optional[AuditSource] = None, + resource: Optional[AuditResource] = None, + outcome: str = "success", + duration_ms: Optional[float] = None, + status_code: int = 200, + ) -> None: + self.log( + AuditEvent( + event_type="http.request", + request_id=request_id, + principal=principal or AuditPrincipal(), + source=source or AuditSource(transport="http"), + action=AuditAction(path=path), + resource=resource or AuditResource(), + outcome=outcome, + duration_ms=duration_ms, + detail=f"{method} {path} -> {status_code}", + ) + ) + + def log_authn( + self, + *, + request_id: str, + outcome: str, + principal: Optional[AuditPrincipal] = None, + source: Optional[AuditSource] = None, + detail: str = "", + ) -> None: + event_type = "authn.success" if outcome == "success" else "authn.failure" + self.log( + AuditEvent( + event_type=event_type, + request_id=request_id, + principal=principal or AuditPrincipal(), + source=source or AuditSource(), + outcome=outcome, + detail=detail, + ) + ) + + def log_authz( + self, + *, + request_id: str, + outcome: str, + principal: Optional[AuditPrincipal] = None, + resource: Optional[AuditResource] = None, + detail: str = "", + ) -> None: + self.log( + AuditEvent( + event_type="authz.decision", + request_id=request_id, + principal=principal or AuditPrincipal(), + resource=resource or AuditResource(), + outcome=outcome, + detail=detail, + ) + ) + + def close(self) -> None: + self._sink.close() + + +# --------------------------------------------------------------------------- +# Factory +# --------------------------------------------------------------------------- + +_SINK_FACTORIES = { + "stdout": lambda cfg: StdoutAuditSink(), + "file": lambda cfg: FileAuditSink(cfg.get("file_path", "feast_audit.log")), + "logger": lambda cfg: LoggerAuditSink(cfg.get("logger_name", "feast.audit")), +} + + +def create_audit_logger_from_config( + audit_cfg: Any, +) -> Optional[AuditLogger]: + """Build an ``AuditLogger`` from an ``AuditLoggingConfig`` pydantic model. + + Returns ``None`` when audit logging is disabled. + """ + if audit_cfg is None or not getattr(audit_cfg, "enabled", False): + return None + + sink_type = getattr(audit_cfg, "sink", "stdout") + raw: Dict[str, Any] = {} + if hasattr(audit_cfg, "file_path"): + raw["file_path"] = audit_cfg.file_path + if hasattr(audit_cfg, "logger_name"): + raw["logger_name"] = audit_cfg.logger_name + + factory = _SINK_FACTORIES.get(sink_type) + if factory is None: + logger.warning("Unknown audit sink %r, falling back to stdout", sink_type) + factory = _SINK_FACTORIES["stdout"] + + sink = factory(raw) + return AuditLogger( + sink, + log_successful_reads=getattr(audit_cfg, "log_successful_reads", True), + ) diff --git a/sdk/python/feast/audit/audit_middleware.py b/sdk/python/feast/audit/audit_middleware.py new file mode 100644 index 00000000000..3ea84d2c655 --- /dev/null +++ b/sdk/python/feast/audit/audit_middleware.py @@ -0,0 +1,135 @@ +# Copyright 2025 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +FastAPI middleware for structured audit logging of REST endpoints. + +``AuditLoggingMiddleware`` logs every HTTP request/response on the REST +endpoints (``/get-online-features``, ``/push``, etc.) with principal, +resource, outcome, and duration. + +MCP tool-call auditing is handled at the protocol layer by wrapping +the ``tools/call`` handler inside ``add_mcp_support_to_app()`` (see +``feast.infra.mcp_servers.mcp_server``). + +The middleware is added only when ``audit_logging.enabled`` is ``true`` +in the feature-server configuration. +""" + +import logging +import time +from typing import Optional + +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.requests import Request +from starlette.responses import Response + +from feast.audit.audit_logger import ( + AuditLogger, + AuditPrincipal, + AuditResource, + AuditSource, +) + +logger = logging.getLogger(__name__) + +# REST paths that correspond to read/write actions +_PATH_RESOURCE_MAP = { + "/get-online-features": ("feature_service", ["READ_ONLINE"]), + "/retrieve-online-documents": ("feature_service", ["READ_ONLINE"]), + "/push": ("push_source", ["WRITE_ONLINE", "WRITE_OFFLINE"]), + "/write-to-online-store": ("feature_view", ["WRITE_ONLINE"]), + "/materialize": ("feature_view", ["WRITE_ONLINE"]), + "/materialize-incremental": ("feature_view", ["WRITE_ONLINE"]), +} + + +def _extract_client_ip(request: Request) -> str: + forwarded = request.headers.get("x-forwarded-for", "") + if forwarded: + return forwarded.split(",")[0].strip() + if request.client: + return request.client.host + return "" + + +def _principal_from_request(request: Request) -> AuditPrincipal: + """Build an ``AuditPrincipal`` from the security manager's current user.""" + try: + from feast.permissions.security_manager import get_security_manager + + sm = get_security_manager() + if sm and sm.current_user: + user = sm.current_user + return AuditPrincipal( + username=user.username, + roles=list(user.roles) if user.roles else [], + auth_type=request.headers.get("x-feast-auth-type", ""), + ) + except Exception: + pass + return AuditPrincipal() + + +class AuditLoggingMiddleware(BaseHTTPMiddleware): + """Emit ``http.request`` audit events for REST endpoints.""" + + async def dispatch(self, request: Request, call_next): # type: ignore[override] + audit: Optional[AuditLogger] = getattr(request.app.state, "audit_logger", None) + if audit is None: + return await call_next(request) + + path = request.url.path + # Skip health and static endpoints + if path in ("/health", "/docs", "/openapi.json") or path.startswith("/static"): + return await call_next(request) + + # Skip MCP endpoints — handled by McpAuditMiddleware + if path.startswith("/mcp"): + return await call_next(request) + + request_id = request.headers.get("x-request-id", audit.new_request_id()) + request.state.audit_request_id = request_id + + start = time.monotonic() + response: Response + outcome = "success" + status_code = 200 + try: + response = await call_next(request) + status_code = response.status_code + if status_code >= 400: + outcome = "failure" + except Exception: + outcome = "error" + status_code = 500 + raise + finally: + duration_ms = (time.monotonic() - start) * 1000.0 + resource_info = _PATH_RESOURCE_MAP.get(path, ("", [])) + audit.log_http_request( + request_id=request_id, + method=request.method, + path=path, + principal=_principal_from_request(request), + source=AuditSource(ip=_extract_client_ip(request), transport="http"), + resource=AuditResource( + type=resource_info[0], actions=list(resource_info[1]) + ), + outcome=outcome, + duration_ms=round(duration_ms, 2), + status_code=status_code, + ) + + return response diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 81359222797..2861db883eb 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -349,11 +349,27 @@ def async_refresh(): active_timer = threading.Timer(registry_ttl_sec, async_refresh) active_timer.start() + # --- Audit logging setup --- + audit_logging_cfg = getattr(fs_cfg, "audit_logging", None) + audit_logger_instance = None + if audit_logging_cfg is not None and getattr(audit_logging_cfg, "enabled", False): + from feast.audit.audit_logger import create_audit_logger_from_config + + audit_logger_instance = create_audit_logger_from_config(audit_logging_cfg) + if audit_logger_instance: + logger.info( + "Structured audit logging is ENABLED (sink=%s)", + getattr(audit_logging_cfg, "sink", "stdout"), + ) + @asynccontextmanager async def lifespan(app: FastAPI): # Load static artifacts before initializing store await load_static_artifacts(app, store) + if audit_logger_instance is not None: + app.state.audit_logger = audit_logger_instance + await store.initialize() async_refresh() try: @@ -362,10 +378,19 @@ async def lifespan(app: FastAPI): stop_refresh() if offline_batcher is not None: offline_batcher.shutdown() + if audit_logger_instance is not None: + audit_logger_instance.close() await store.close() app = FastAPI(lifespan=lifespan) + # Add audit logging middleware when enabled (REST only; + # MCP audit is handled at the protocol layer in mcp_server.py) + if audit_logger_instance is not None: + from feast.audit.audit_middleware import AuditLoggingMiddleware + + app.add_middleware(AuditLoggingMiddleware) + @app.post( "/get-online-features", dependencies=[Depends(inject_user_details)], @@ -740,12 +765,12 @@ async def websocket_endpoint(websocket: WebSocket): app.mount("/static", StaticFiles(directory=static_dir), name="static") # Add MCP support if enabled in feature server configuration - _add_mcp_support_if_enabled(app, store) + _add_mcp_support_if_enabled(app, store, audit_logger_instance) return app -def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"): +def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore", audit_logger=None): """Add MCP support to the FastAPI app if enabled in configuration.""" mcp_transport_not_supported_error = None try: @@ -767,7 +792,9 @@ def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"): logger.error(f"Error checking/adding MCP support: {e}") return - mcp_server = add_mcp_support_to_app(app, store, store.config.feature_server) + mcp_server = add_mcp_support_to_app( + app, store, store.config.feature_server, audit_logger=audit_logger + ) if mcp_server: logger.info("MCP support has been enabled for the Feast feature server") diff --git a/sdk/python/feast/infra/feature_servers/base_config.py b/sdk/python/feast/infra/feature_servers/base_config.py index df324dc57d3..a85e1a8b2c4 100644 --- a/sdk/python/feast/infra/feature_servers/base_config.py +++ b/sdk/python/feast/infra/feature_servers/base_config.py @@ -83,6 +83,27 @@ class MetricsConfig(FeastConfigBaseModel): (feast_feature_freshness_seconds).""" +class AuditLoggingConfig(FeastConfigBaseModel): + """Structured audit logging configuration for the feature server. + + Emits JSONL audit events for MCP tool calls, REST requests, + and authentication/authorization decisions. + """ + + enabled: StrictBool = False + """Whether structured audit logging is enabled.""" + + sink: str = "stdout" + """Audit event sink: ``stdout``, ``file``, or ``logger``.""" + + file_path: str = "feast_audit.log" + """File path when ``sink`` is ``file``.""" + + log_successful_reads: StrictBool = True + """Emit audit events for successful read operations. Set to ``False`` + to reduce log volume in high-throughput read-heavy deployments.""" + + class BaseFeatureServerConfig(FeastConfigBaseModel): """Base Feature Server config that should be extended""" @@ -96,6 +117,10 @@ class BaseFeatureServerConfig(FeastConfigBaseModel): feature_logging: Optional[FeatureLoggingConfig] = None """ Feature logging configuration """ + audit_logging: Optional[AuditLoggingConfig] = None + """Structured audit logging configuration. Emits JSONL audit events + for MCP tool calls, REST requests, and auth decisions.""" + offline_push_batching_enabled: Optional[StrictBool] = None """Whether to batch writes to the offline store via the `/push` endpoint.""" diff --git a/sdk/python/feast/infra/mcp_servers/mcp_server.py b/sdk/python/feast/infra/mcp_servers/mcp_server.py index 972023cdd12..c5e93df3225 100644 --- a/sdk/python/feast/infra/mcp_servers/mcp_server.py +++ b/sdk/python/feast/infra/mcp_servers/mcp_server.py @@ -3,10 +3,16 @@ This module provides MCP support for Feast by integrating with fastapi_mcp to expose Feast functionality through the Model Context Protocol. + +When audit logging is enabled, the ``tools/call`` handler on the low-level +MCP ``Server`` is wrapped so that every tool invocation is logged with +typed tool name, outcome, duration, and principal — without parsing raw +JSON-RPC bodies. """ import logging -from typing import Optional +import time +from typing import Any, Optional from feast.feature_store import FeatureStore @@ -30,7 +36,12 @@ class McpTransportNotSupportedError(RuntimeError): pass -def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastApiMCP"]: +def add_mcp_support_to_app( + app, + store: FeatureStore, + config, + audit_logger: Optional[Any] = None, +) -> Optional["FastApiMCP"]: """Add MCP support to the FastAPI app if enabled in configuration.""" if not MCP_AVAILABLE: logger.warning("MCP support requested but fastapi_mcp is not available") @@ -68,6 +79,9 @@ def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastAp f"Unsupported mcp_transport={transport!r}. Expected 'sse' or 'http'." ) + if audit_logger is not None: + _wrap_call_tool_handler(mcp, audit_logger) + logger.info( "MCP support has been enabled for the Feast feature server at /mcp endpoint" ) @@ -83,3 +97,90 @@ def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastAp except Exception as e: logger.error(f"Failed to initialize MCP integration: {e}", exc_info=True) return None + + +# --------------------------------------------------------------------------- +# Audit-logging wrapper for the MCP tools/call handler +# --------------------------------------------------------------------------- + + +def _principal_from_mcp_context(ctx: Any) -> Any: + """Extract an ``AuditPrincipal`` from the MCP request context's HTTP headers. + + Unlike REST endpoints the ``SecurityManager`` ``ContextVar`` is never + populated for MCP requests, so we read directly from the HTTP headers + that ``fastapi_mcp`` forwards into the request context. + """ + from feast.audit.audit_logger import AuditPrincipal + + try: + request = getattr(ctx, "request", None) + if request is None: + return AuditPrincipal() + headers: dict[str, str] = {} + if hasattr(request, "headers"): + headers = dict(request.headers) + auth_type = headers.get("x-feast-auth-type", "") + has_auth = bool(headers.get("authorization", "")) + return AuditPrincipal( + username="(authenticated)" if has_auth else "", + auth_type=auth_type, + ) + except Exception: + return AuditPrincipal() + + +def _wrap_call_tool_handler(mcp: "FastApiMCP", audit: Any) -> None: + """Wrap the MCP server's ``tools/call`` handler with audit logging. + + Operates at the protocol layer so that ``tool_name`` and error status + come as typed Python objects — no JSON-RPC body parsing required. + """ + from feast.audit.audit_logger import AuditAction, AuditEvent, AuditSource + + handlers = getattr(mcp.server, "_request_handlers", None) + if handlers is None: + logger.warning("Cannot wrap MCP call_tool handler: _request_handlers not found") + return + + original = handlers.get("tools/call") + if original is None: + logger.debug("No tools/call handler registered; skipping audit wrapper") + return + + async def audited_call_tool(ctx: Any, params: Any) -> Any: + tool_name = getattr(params, "name", "") if params else "" + request_id = audit.new_request_id() + jsonrpc_id: Optional[str] = None + if hasattr(ctx, "request_id"): + jsonrpc_id = str(ctx.request_id) + + start = time.monotonic() + outcome = "success" + error_detail = "" + try: + result = await original(ctx, params) + if hasattr(result, "isError") and result.isError: + outcome = "mcp_error" + return result + except Exception as exc: + outcome = "error" + error_detail = str(exc)[:200] + raise + finally: + duration_ms = (time.monotonic() - start) * 1000.0 + audit.log( + AuditEvent( + event_type="mcp.tools.call", + request_id=request_id, + jsonrpc_id=jsonrpc_id, + principal=_principal_from_mcp_context(ctx), + source=AuditSource(transport="mcp-http"), + action=AuditAction(mcp_tool=tool_name), + outcome=outcome, + duration_ms=round(duration_ms, 2), + detail=error_detail, + ) + ) + + handlers["tools/call"] = audited_call_tool diff --git a/sdk/python/tests/unit/audit/__init__.py b/sdk/python/tests/unit/audit/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/tests/unit/audit/test_audit_logger.py b/sdk/python/tests/unit/audit/test_audit_logger.py new file mode 100644 index 00000000000..7b85a8222da --- /dev/null +++ b/sdk/python/tests/unit/audit/test_audit_logger.py @@ -0,0 +1,403 @@ +import json +import os +import tempfile +import threading +import unittest +from unittest.mock import MagicMock, patch + +from feast.audit.audit_logger import ( + AuditEvent, + AuditLogger, + AuditPrincipal, + AuditResource, + AuditSink, + AuditSource, + FileAuditSink, + LoggerAuditSink, + StdoutAuditSink, + create_audit_logger_from_config, +) + + +class InMemorySink(AuditSink): + """Test sink that captures events in a list.""" + + def __init__(self): + self.events: list[AuditEvent] = [] + + def emit(self, event: AuditEvent) -> None: + self.events.append(event) + + +class TestAuditEvent(unittest.TestCase): + def test_event_to_jsonl(self): + event = AuditEvent( + event_type="mcp.tools.call", + timestamp="2026-05-28T12:00:00.000Z", + request_id="abc-123", + principal=AuditPrincipal( + username="jane@co.com", roles=["reader"], auth_type="oidc" + ), + source=AuditSource(ip="10.0.0.1", transport="mcp-http"), + outcome="success", + duration_ms=42.0, + ) + line = event.to_jsonl() + parsed = json.loads(line) + + self.assertEqual(parsed["event_type"], "mcp.tools.call") + self.assertEqual(parsed["principal"]["username"], "jane@co.com") + self.assertEqual(parsed["source"]["ip"], "10.0.0.1") + self.assertEqual(parsed["duration_ms"], 42.0) + self.assertNotIn("\n", line) + + def test_event_excludes_none_duration(self): + event = AuditEvent(event_type="authn.success") + parsed = json.loads(event.to_jsonl()) + self.assertNotIn("duration_ms", parsed) + + def test_event_includes_trace_id_and_span_id(self): + event = AuditEvent( + event_type="mcp.tools.call", + trace_id="0" * 32, + span_id="f" * 16, + ) + parsed = json.loads(event.to_jsonl()) + self.assertEqual(parsed["trace_id"], "0" * 32) + self.assertEqual(parsed["span_id"], "f" * 16) + + def test_event_excludes_none_trace_fields(self): + event = AuditEvent(event_type="test") + parsed = json.loads(event.to_jsonl()) + self.assertNotIn("trace_id", parsed) + self.assertNotIn("span_id", parsed) + self.assertNotIn("jsonrpc_id", parsed) + + def test_event_includes_jsonrpc_id(self): + event = AuditEvent(event_type="mcp.tools.call", jsonrpc_id="42") + parsed = json.loads(event.to_jsonl()) + self.assertEqual(parsed["jsonrpc_id"], "42") + + +class TestAuditLogger(unittest.TestCase): + def test_log_populates_timestamp_and_request_id(self): + sink = InMemorySink() + al = AuditLogger(sink) + al.log(AuditEvent(event_type="test")) + + self.assertEqual(len(sink.events), 1) + event = sink.events[0] + self.assertTrue(event.timestamp.endswith("Z")) + self.assertTrue(len(event.request_id) > 0) + + def test_log_mcp_call(self): + sink = InMemorySink() + al = AuditLogger(sink) + al.log_mcp_call( + request_id="r1", + tool_name="get_online_features", + path="/get-online-features", + outcome="success", + duration_ms=10.5, + ) + + self.assertEqual(len(sink.events), 1) + event = sink.events[0] + self.assertEqual(event.event_type, "mcp.tools.call") + self.assertEqual(event.action.mcp_tool, "get_online_features") + self.assertEqual(event.outcome, "success") + + def test_log_http_request(self): + sink = InMemorySink() + al = AuditLogger(sink) + al.log_http_request( + request_id="r2", + method="POST", + path="/push", + status_code=200, + ) + + self.assertEqual(len(sink.events), 1) + event = sink.events[0] + self.assertEqual(event.event_type, "http.request") + self.assertIn("POST /push -> 200", event.detail) + + def test_log_authn_success(self): + sink = InMemorySink() + al = AuditLogger(sink) + al.log_authn(request_id="r3", outcome="success") + self.assertEqual(sink.events[0].event_type, "authn.success") + + def test_log_authn_failure(self): + sink = InMemorySink() + al = AuditLogger(sink) + al.log_authn(request_id="r4", outcome="failure", detail="bad token") + event = sink.events[0] + self.assertEqual(event.event_type, "authn.failure") + self.assertEqual(event.detail, "bad token") + + def test_log_authz_decision(self): + sink = InMemorySink() + al = AuditLogger(sink) + al.log_authz( + request_id="r5", + outcome="denied", + resource=AuditResource( + type="feature_service", name="driver_fs", actions=["READ_ONLINE"] + ), + ) + event = sink.events[0] + self.assertEqual(event.event_type, "authz.decision") + self.assertEqual(event.resource.name, "driver_fs") + + def test_log_successful_reads_suppressed(self): + sink = InMemorySink() + al = AuditLogger(sink, log_successful_reads=False) + al.log_http_request( + request_id="r6", + method="POST", + path="/get-online-features", + resource=AuditResource(type="feature_service", actions=["READ_ONLINE"]), + outcome="success", + status_code=200, + ) + # Successful read should be suppressed + self.assertEqual(len(sink.events), 0) + + def test_log_successful_reads_not_suppressed_for_writes(self): + sink = InMemorySink() + al = AuditLogger(sink, log_successful_reads=False) + al.log_http_request( + request_id="r7", + method="POST", + path="/push", + resource=AuditResource(type="push_source", actions=["WRITE_ONLINE"]), + outcome="success", + status_code=200, + ) + self.assertEqual(len(sink.events), 1) + + def test_log_failed_reads_not_suppressed(self): + sink = InMemorySink() + al = AuditLogger(sink, log_successful_reads=False) + al.log_http_request( + request_id="r8", + method="POST", + path="/get-online-features", + resource=AuditResource(type="feature_service", actions=["READ_ONLINE"]), + outcome="failure", + status_code=500, + ) + self.assertEqual(len(sink.events), 1) + + def test_emit_exception_does_not_raise(self): + bad_sink = MagicMock(spec=AuditSink) + bad_sink.emit.side_effect = RuntimeError("disk full") + al = AuditLogger(bad_sink) + al.log(AuditEvent(event_type="test")) + + def test_otel_context_injected_when_active(self): + import sys + import types + + mock_ctx = MagicMock() + mock_ctx.is_valid = True + mock_ctx.trace_id = 0xABCDEF1234567890ABCDEF1234567890 + mock_ctx.span_id = 0x1234567890ABCDEF + + mock_span = MagicMock() + mock_span.get_span_context.return_value = mock_ctx + + mock_otel_trace = types.ModuleType("opentelemetry.trace") + mock_otel_trace.get_current_span = lambda: mock_span # type: ignore[attr-defined] + + mock_otel = types.ModuleType("opentelemetry") + mock_otel.trace = mock_otel_trace # type: ignore[attr-defined] + + saved_otel = sys.modules.get("opentelemetry") + saved_trace = sys.modules.get("opentelemetry.trace") + sys.modules["opentelemetry"] = mock_otel + sys.modules["opentelemetry.trace"] = mock_otel_trace + try: + sink = InMemorySink() + al = AuditLogger(sink) + al.log(AuditEvent(event_type="test")) + event = sink.events[0] + self.assertEqual(event.trace_id, "abcdef1234567890abcdef1234567890") + self.assertEqual(event.span_id, "1234567890abcdef") + finally: + if saved_otel is None: + sys.modules.pop("opentelemetry", None) + else: + sys.modules["opentelemetry"] = saved_otel + if saved_trace is None: + sys.modules.pop("opentelemetry.trace", None) + else: + sys.modules["opentelemetry.trace"] = saved_trace + + def test_otel_not_available_no_crash(self): + sink = InMemorySink() + al = AuditLogger(sink) + al.log(AuditEvent(event_type="test")) + event = sink.events[0] + self.assertIsNone(event.trace_id) + self.assertIsNone(event.span_id) + + def test_otel_skipped_when_trace_id_already_set(self): + sink = InMemorySink() + al = AuditLogger(sink) + al.log(AuditEvent(event_type="test", trace_id="preexisting")) + event = sink.events[0] + self.assertEqual(event.trace_id, "preexisting") + + def test_audit_logger_has_lock(self): + sink = InMemorySink() + al = AuditLogger(sink) + self.assertIsInstance(al._lock, type(threading.Lock())) + + +class TestStdoutSink(unittest.TestCase): + @patch("sys.stdout") + def test_emit_writes_to_stdout(self, mock_stdout): + sink = StdoutAuditSink() + event = AuditEvent(event_type="test", timestamp="t", request_id="r") + sink.emit(event) + mock_stdout.write.assert_called_once() + written = mock_stdout.write.call_args[0][0] + self.assertIn('"event_type":"test"', written) + self.assertTrue(written.endswith("\n")) + + +class TestFileAuditSink(unittest.TestCase): + def test_emit_appends_to_file(self): + with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as tmp: + path = tmp.name + + try: + sink = FileAuditSink(path) + event = AuditEvent(event_type="test", timestamp="t", request_id="r") + sink.emit(event) + sink.close() + + with open(path) as f: + lines = f.readlines() + self.assertEqual(len(lines), 1) + parsed = json.loads(lines[0]) + self.assertEqual(parsed["event_type"], "test") + finally: + os.unlink(path) + + +class TestLoggerAuditSink(unittest.TestCase): + @patch("logging.getLogger") + def test_emit_logs_at_info(self, mock_get_logger): + mock_logger = MagicMock() + mock_get_logger.return_value = mock_logger + + sink = LoggerAuditSink("feast.audit") + event = AuditEvent(event_type="test", timestamp="t", request_id="r") + sink.emit(event) + mock_logger.info.assert_called_once() + + +class TestCreateAuditLoggerFromConfig(unittest.TestCase): + def test_returns_none_when_disabled(self): + from types import SimpleNamespace + + cfg = SimpleNamespace(enabled=False) + self.assertIsNone(create_audit_logger_from_config(cfg)) + + def test_returns_none_when_none(self): + self.assertIsNone(create_audit_logger_from_config(None)) + + def test_creates_stdout_logger(self): + from types import SimpleNamespace + + cfg = SimpleNamespace(enabled=True, sink="stdout", log_successful_reads=True) + al = create_audit_logger_from_config(cfg) + self.assertIsNotNone(al) + self.assertIsInstance(al._sink, StdoutAuditSink) + + def test_creates_file_logger(self): + from types import SimpleNamespace + + with tempfile.NamedTemporaryFile(suffix=".log", delete=False) as tmp: + path = tmp.name + + try: + cfg = SimpleNamespace( + enabled=True, sink="file", file_path=path, log_successful_reads=True + ) + al = create_audit_logger_from_config(cfg) + self.assertIsNotNone(al) + self.assertIsInstance(al._sink, FileAuditSink) + al.close() + finally: + os.unlink(path) + + def test_creates_logger_sink(self): + from types import SimpleNamespace + + cfg = SimpleNamespace(enabled=True, sink="logger", log_successful_reads=True) + al = create_audit_logger_from_config(cfg) + self.assertIsNotNone(al) + self.assertIsInstance(al._sink, LoggerAuditSink) + + def test_unknown_sink_falls_back_to_stdout(self): + from types import SimpleNamespace + + cfg = SimpleNamespace(enabled=True, sink="kafka", log_successful_reads=True) + al = create_audit_logger_from_config(cfg) + self.assertIsNotNone(al) + self.assertIsInstance(al._sink, StdoutAuditSink) + + +class TestAuditLoggingConfig(unittest.TestCase): + def test_config_defaults(self): + from feast.infra.feature_servers.base_config import AuditLoggingConfig + + cfg = AuditLoggingConfig() + self.assertFalse(cfg.enabled) + self.assertEqual(cfg.sink, "stdout") + self.assertEqual(cfg.file_path, "feast_audit.log") + self.assertTrue(cfg.log_successful_reads) + + def test_config_custom(self): + from feast.infra.feature_servers.base_config import AuditLoggingConfig + + cfg = AuditLoggingConfig( + enabled=True, + sink="file", + file_path="/var/log/feast_audit.jsonl", + log_successful_reads=False, + ) + self.assertTrue(cfg.enabled) + self.assertEqual(cfg.sink, "file") + self.assertEqual(cfg.file_path, "/var/log/feast_audit.jsonl") + self.assertFalse(cfg.log_successful_reads) + + def test_base_feature_server_config_includes_audit_logging(self): + from feast.infra.feature_servers.base_config import ( + AuditLoggingConfig, + BaseFeatureServerConfig, + ) + + cfg = BaseFeatureServerConfig( + audit_logging=AuditLoggingConfig(enabled=True, sink="stdout") + ) + self.assertIsNotNone(cfg.audit_logging) + self.assertTrue(cfg.audit_logging.enabled) + + def test_mcp_config_includes_audit_logging(self): + from feast.infra.feature_servers.base_config import AuditLoggingConfig + from feast.infra.mcp_servers.mcp_config import McpFeatureServerConfig + + cfg = McpFeatureServerConfig( + mcp_enabled=True, + audit_logging=AuditLoggingConfig( + enabled=True, sink="file", file_path="/tmp/audit.log" + ), + ) + self.assertIsNotNone(cfg.audit_logging) + self.assertTrue(cfg.audit_logging.enabled) + self.assertEqual(cfg.audit_logging.sink, "file") diff --git a/sdk/python/tests/unit/audit/test_audit_middleware.py b/sdk/python/tests/unit/audit/test_audit_middleware.py new file mode 100644 index 00000000000..dd9feca7f8b --- /dev/null +++ b/sdk/python/tests/unit/audit/test_audit_middleware.py @@ -0,0 +1,132 @@ +import unittest + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from feast.audit.audit_logger import ( + AuditEvent, + AuditLogger, + AuditSink, +) +from feast.audit.audit_middleware import AuditLoggingMiddleware + + +class InMemorySink(AuditSink): + def __init__(self): + self.events: list[AuditEvent] = [] + + def emit(self, event: AuditEvent) -> None: + self.events.append(event) + + +def _make_app(audit_logger=None): + """Create a minimal FastAPI app with audit middleware for testing.""" + app = FastAPI() + app.state.audit_logger = audit_logger + + app.add_middleware(AuditLoggingMiddleware) + + @app.post("/get-online-features") + async def get_online_features(): + return {"result": "ok"} + + @app.post("/push") + async def push(): + return {"result": "ok"} + + @app.get("/health") + async def health(): + return {"status": "ok"} + + @app.post("/mcp") + async def mcp(): + return {"jsonrpc": "2.0", "result": "ok", "id": 1} + + @app.post("/error-endpoint") + async def error_endpoint(): + raise ValueError("test error") + + return app + + +class TestAuditLoggingMiddleware(unittest.TestCase): + def test_logs_http_request(self): + sink = InMemorySink() + audit = AuditLogger(sink) + app = _make_app(audit) + client = TestClient(app, raise_server_exceptions=False) + + resp = client.post("/get-online-features") + self.assertEqual(resp.status_code, 200) + + http_events = [e for e in sink.events if e.event_type == "http.request"] + self.assertEqual(len(http_events), 1) + event = http_events[0] + self.assertEqual(event.outcome, "success") + self.assertIn("/get-online-features", event.detail) + self.assertIsNotNone(event.duration_ms) + self.assertGreaterEqual(event.duration_ms, 0) + + def test_skips_health_endpoint(self): + sink = InMemorySink() + audit = AuditLogger(sink) + app = _make_app(audit) + client = TestClient(app, raise_server_exceptions=False) + + client.get("/health") + http_events = [e for e in sink.events if e.event_type == "http.request"] + self.assertEqual(len(http_events), 0) + + def test_skips_mcp_endpoint(self): + sink = InMemorySink() + audit = AuditLogger(sink) + app = _make_app(audit) + client = TestClient(app, raise_server_exceptions=False) + + client.post("/mcp", json={"jsonrpc": "2.0", "method": "tools/list"}) + http_events = [e for e in sink.events if e.event_type == "http.request"] + self.assertEqual(len(http_events), 0) + + def test_logs_failure_on_error(self): + sink = InMemorySink() + audit = AuditLogger(sink) + app = _make_app(audit) + client = TestClient(app, raise_server_exceptions=False) + + resp = client.post("/error-endpoint") + self.assertEqual(resp.status_code, 500) + + http_events = [e for e in sink.events if e.event_type == "http.request"] + self.assertEqual(len(http_events), 1) + self.assertEqual(http_events[0].outcome, "error") + + def test_no_logging_when_audit_logger_is_none(self): + app = _make_app(audit_logger=None) + client = TestClient(app, raise_server_exceptions=False) + + resp = client.post("/get-online-features") + self.assertEqual(resp.status_code, 200) + + def test_uses_x_request_id_header(self): + sink = InMemorySink() + audit = AuditLogger(sink) + app = _make_app(audit) + client = TestClient(app, raise_server_exceptions=False) + + client.post( + "/get-online-features", + headers={"x-request-id": "custom-id-123"}, + ) + http_events = [e for e in sink.events if e.event_type == "http.request"] + self.assertEqual(http_events[0].request_id, "custom-id-123") + + def test_resource_mapping(self): + sink = InMemorySink() + audit = AuditLogger(sink) + app = _make_app(audit) + client = TestClient(app, raise_server_exceptions=False) + + client.post("/push") + http_events = [e for e in sink.events if e.event_type == "http.request"] + self.assertEqual(http_events[0].resource.type, "push_source") + self.assertIn("WRITE_ONLINE", http_events[0].resource.actions) diff --git a/sdk/python/tests/unit/audit/test_mcp_audit_handler.py b/sdk/python/tests/unit/audit/test_mcp_audit_handler.py new file mode 100644 index 00000000000..39f86b78c45 --- /dev/null +++ b/sdk/python/tests/unit/audit/test_mcp_audit_handler.py @@ -0,0 +1,241 @@ +"""Tests for MCP protocol-layer audit logging (handler wrapping). + +Validates ``_wrap_call_tool_handler`` and ``_principal_from_mcp_context`` +from ``feast.infra.mcp_servers.mcp_server``. +""" + +import asyncio +import unittest +from types import SimpleNamespace +from unittest.mock import MagicMock + +from feast.audit.audit_logger import ( + AuditEvent, + AuditLogger, + AuditSink, +) +from feast.infra.mcp_servers.mcp_server import ( + _principal_from_mcp_context, + _wrap_call_tool_handler, +) + + +class InMemorySink(AuditSink): + def __init__(self): + self.events: list[AuditEvent] = [] + + def emit(self, event: AuditEvent) -> None: + self.events.append(event) + + +def _run(coro): + """Helper to run an async function synchronously in tests.""" + return asyncio.get_event_loop().run_until_complete(coro) + + +def _make_fake_mcp(handler=None): + """Build a fake FastApiMCP with a minimal ``server`` that has ``_request_handlers``.""" + + async def default_handler(ctx, params): + return SimpleNamespace(isError=False) + + handlers = {"tools/call": handler or default_handler} + server = SimpleNamespace(_request_handlers=handlers) + return SimpleNamespace(server=server) + + +class TestWrapCallToolHandler(unittest.TestCase): + def test_successful_call_logs_success(self): + sink = InMemorySink() + audit = AuditLogger(sink) + + async def handler(ctx, params): + return SimpleNamespace(isError=False) + + mcp = _make_fake_mcp(handler) + _wrap_call_tool_handler(mcp, audit) + + ctx = SimpleNamespace(request_id=42) + params = SimpleNamespace(name="get_online_features") + _run(mcp.server._request_handlers["tools/call"](ctx, params)) + + self.assertEqual(len(sink.events), 1) + event = sink.events[0] + self.assertEqual(event.event_type, "mcp.tools.call") + self.assertEqual(event.action.mcp_tool, "get_online_features") + self.assertEqual(event.outcome, "success") + self.assertEqual(event.jsonrpc_id, "42") + self.assertEqual(event.source.transport, "mcp-http") + self.assertIsNotNone(event.duration_ms) + self.assertGreaterEqual(event.duration_ms, 0) + + def test_handler_exception_logs_error(self): + sink = InMemorySink() + audit = AuditLogger(sink) + + async def handler(ctx, params): + raise RuntimeError("tool exploded") + + mcp = _make_fake_mcp(handler) + _wrap_call_tool_handler(mcp, audit) + + ctx = SimpleNamespace(request_id=7) + params = SimpleNamespace(name="failing_tool") + with self.assertRaises(RuntimeError): + _run(mcp.server._request_handlers["tools/call"](ctx, params)) + + self.assertEqual(len(sink.events), 1) + event = sink.events[0] + self.assertEqual(event.outcome, "error") + self.assertEqual(event.action.mcp_tool, "failing_tool") + self.assertIn("tool exploded", event.detail) + + def test_isError_result_logs_mcp_error(self): + sink = InMemorySink() + audit = AuditLogger(sink) + + async def handler(ctx, params): + return SimpleNamespace(isError=True) + + mcp = _make_fake_mcp(handler) + _wrap_call_tool_handler(mcp, audit) + + ctx = SimpleNamespace(request_id=99) + params = SimpleNamespace(name="bad_tool") + _run(mcp.server._request_handlers["tools/call"](ctx, params)) + + self.assertEqual(len(sink.events), 1) + self.assertEqual(sink.events[0].outcome, "mcp_error") + + def test_result_without_isError_attr_logs_success(self): + sink = InMemorySink() + audit = AuditLogger(sink) + + async def handler(ctx, params): + return {"content": "plain dict result"} + + mcp = _make_fake_mcp(handler) + _wrap_call_tool_handler(mcp, audit) + + ctx = SimpleNamespace(request_id=1) + params = SimpleNamespace(name="simple_tool") + _run(mcp.server._request_handlers["tools/call"](ctx, params)) + + self.assertEqual(sink.events[0].outcome, "success") + + def test_no_handler_skips_wrapping(self): + sink = InMemorySink() + audit = AuditLogger(sink) + + server = SimpleNamespace(_request_handlers={}) + mcp = SimpleNamespace(server=server) + _wrap_call_tool_handler(mcp, audit) + + self.assertNotIn("tools/call", mcp.server._request_handlers) + + def test_no_request_handlers_attr_is_safe(self): + sink = InMemorySink() + audit = AuditLogger(sink) + + mcp = SimpleNamespace(server=SimpleNamespace()) + _wrap_call_tool_handler(mcp, audit) + + def test_jsonrpc_id_from_ctx_request_id(self): + sink = InMemorySink() + audit = AuditLogger(sink) + + async def handler(ctx, params): + return SimpleNamespace(isError=False) + + mcp = _make_fake_mcp(handler) + _wrap_call_tool_handler(mcp, audit) + + ctx = SimpleNamespace(request_id="req-abc-123") + params = SimpleNamespace(name="some_tool") + _run(mcp.server._request_handlers["tools/call"](ctx, params)) + + self.assertEqual(sink.events[0].jsonrpc_id, "req-abc-123") + + def test_jsonrpc_id_none_when_ctx_has_no_request_id(self): + sink = InMemorySink() + audit = AuditLogger(sink) + + async def handler(ctx, params): + return SimpleNamespace(isError=False) + + mcp = _make_fake_mcp(handler) + _wrap_call_tool_handler(mcp, audit) + + ctx = SimpleNamespace() + params = SimpleNamespace(name="tool") + _run(mcp.server._request_handlers["tools/call"](ctx, params)) + + self.assertIsNone(sink.events[0].jsonrpc_id) + + def test_params_none_uses_empty_tool_name(self): + sink = InMemorySink() + audit = AuditLogger(sink) + + async def handler(ctx, params): + return SimpleNamespace(isError=False) + + mcp = _make_fake_mcp(handler) + _wrap_call_tool_handler(mcp, audit) + + ctx = SimpleNamespace(request_id=1) + _run(mcp.server._request_handlers["tools/call"](ctx, None)) + + self.assertEqual(sink.events[0].action.mcp_tool, "") + + def test_original_handler_result_is_returned(self): + sink = InMemorySink() + audit = AuditLogger(sink) + sentinel = object() + + async def handler(ctx, params): + return sentinel + + mcp = _make_fake_mcp(handler) + _wrap_call_tool_handler(mcp, audit) + + ctx = SimpleNamespace(request_id=1) + params = SimpleNamespace(name="tool") + result = _run(mcp.server._request_handlers["tools/call"](ctx, params)) + self.assertIs(result, sentinel) + + +class TestPrincipalFromMcpContext(unittest.TestCase): + def test_extracts_auth_type_header(self): + request = MagicMock() + request.headers = {"x-feast-auth-type": "oidc", "authorization": "Bearer tok"} + ctx = SimpleNamespace(request=request) + + principal = _principal_from_mcp_context(ctx) + self.assertEqual(principal.auth_type, "oidc") + self.assertEqual(principal.username, "(authenticated)") + + def test_no_auth_header_returns_empty_username(self): + request = MagicMock() + request.headers = {} + ctx = SimpleNamespace(request=request) + + principal = _principal_from_mcp_context(ctx) + self.assertEqual(principal.username, "") + self.assertEqual(principal.auth_type, "") + + def test_no_request_returns_empty_principal(self): + ctx = SimpleNamespace() + principal = _principal_from_mcp_context(ctx) + self.assertEqual(principal.username, "") + + def test_none_ctx_returns_empty_principal(self): + principal = _principal_from_mcp_context(None) + self.assertEqual(principal.username, "") + + def test_exception_in_headers_returns_empty_principal(self): + request = MagicMock() + request.headers = property(lambda self: (_ for _ in ()).throw(RuntimeError)) + ctx = SimpleNamespace(request=request) + + principal = _principal_from_mcp_context(ctx) + self.assertEqual(principal.username, "")