From 20539cf3151a93bd5e1875f03cc1e2c85ebf37af Mon Sep 17 00:00:00 2001 From: Himanshu Singh Date: Sun, 24 May 2026 23:45:47 +0530 Subject: [PATCH] feat(cassandra): add multi-DC support via per-datacenter execution profiles MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a `datacenters` config list as an alternative to the flat `hosts` field. Each entry specifies the contact points, local_dc, and optional replication metadata for one datacenter. On connect, a named Cassandra execution profile is registered per DC (profile key = local_dc), enabling callers to route driver-level queries to a specific datacenter. The default profile is pinned to `load_balancing.local_dc`, or the first DC when load_balancing is absent. All existing `hosts` / `secure_bundle_path` configs are fully backward compatible — the original code path is untouched. Also updates docs/reference/online-stores/cassandra.md with a multi-DC feature_store.yaml example. Signed-off-by: singhhimanshu0811 email : itssinghhimanshu@gmail.com Signed-off-by: Himanshu Singh --- docs/reference/online-stores/cassandra.md | 38 +++++ .../cassandra_online_store.py | 142 ++++++++++++++++++ 2 files changed, 180 insertions(+) diff --git a/docs/reference/online-stores/cassandra.md b/docs/reference/online-stores/cassandra.md index 198f15ca47f..81be835e49b 100644 --- a/docs/reference/online-stores/cassandra.md +++ b/docs/reference/online-stores/cassandra.md @@ -37,6 +37,44 @@ online_store: ``` {% endcode %} +### Example (Cassandra — multi-DC) + +Use `datacenters` instead of `hosts` when your cluster spans multiple datacenters. +Each entry gets a named Cassandra **execution profile** (keyed by `local_dc`), enabling +per-DC routing. The default profile is set by `load_balancing.local_dc` (or the first +datacenter if `load_balancing` is absent). The keyspace must already exist. + +{% code title="feature_store.yaml" %} +```yaml +project: my_feature_repo +registry: data/registry.db +provider: local +online_store: + type: cassandra + keyspace: KeyspaceName + datacenters: + - local_dc: dc1 + hosts: + - 192.168.1.1 + - 192.168.1.2 + replication_factor: 3 # optional, informational + replication_strategy: NetworkTopologyStrategy # optional, informational + - local_dc: dc2 + hosts: + - 10.0.0.1 + replication_factor: 2 # optional, informational + port: 9042 # optional + username: user # optional + password: secret # optional + protocol_version: 5 # optional + load_balancing: # optional + local_dc: 'dc1' # selects the default DC profile + load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional + read_concurrency: 100 # optional + write_concurrency: 100 # optional +``` +{% endcode %} + ### Example (Astra DB) {% code title="feature_store.yaml" %} diff --git a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py index 0870bc709db..fc201b3526f 100644 --- a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py @@ -70,6 +70,10 @@ E_CASSANDRA_UNKNOWN_LB_POLICY = ( "Unknown/unsupported Load Balancing Policy name in Cassandra configuration" ) +E_CASSANDRA_DC_DEFAULT_NOT_FOUND = ( + "Cassandra multi-DC: 'load_balancing.local_dc' does not match any entry " + "in the 'datacenters' list" +) # CQL command templates (that is, before replacing schema names) INSERT_CQL_4_TEMPLATE = ( @@ -153,6 +157,61 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel): request_timeout: Optional[StrictFloat] = None """Request timeout in seconds.""" + class CassandraDatacenterConfig(FeastConfigBaseModel): + """ + Per-datacenter configuration for multi-DC Cassandra deployments. + + When the top-level ``datacenters`` list is used, each entry defines + the contact points and replication settings for one datacenter. + A named Cassandra execution profile (keyed by ``local_dc``) is created + for every entry, enabling targeted per-DC routing via the driver's + execution-profile mechanism. + """ + + local_dc: StrictStr + """ + Datacenter name as reported by the cluster (e.g. ``datacenter1``). + Also used as the named execution-profile key. + """ + + hosts: List[StrictStr] + """Contact-point host addresses belonging to this datacenter.""" + + replication_factor: Optional[StrictInt] = None + """Replication factor for this datacenter (informational; keyspace must be pre-created).""" + + replication_strategy: Optional[StrictStr] = None + """ + Replication strategy class (e.g. ``'SimpleStrategy'``, + ``'NetworkTopologyStrategy'``). Informational; keyspace must be pre-created. + """ + + datacenters: Optional[List[CassandraDatacenterConfig]] = None + """ + Per-datacenter configuration enabling multi-DC Cassandra support. + + When set, a named Cassandra execution profile is registered for each + datacenter (profile name = ``local_dc``). The default profile is + determined by ``load_balancing.local_dc``; if ``load_balancing`` is + absent the first datacenter in the list is used as default. + + Mutually exclusive with ``hosts`` and ``secure_bundle_path``. + The keyspace must already exist; Feast does not create it automatically. + + Example:: + + datacenters: + - local_dc: dc1 + hosts: [192.168.1.1, 192.168.1.2] + replication_factor: 3 + - local_dc: dc2 + hosts: [10.0.0.1] + replication_factor: 2 + load_balancing: + local_dc: dc1 + load_balancing_policy: TokenAwarePolicy(DCAwareRoundRobinPolicy) + """ + class CassandraLoadBalancingPolicy(FeastConfigBaseModel): """ Configuration block related to the Cluster's load-balancing policy. @@ -173,6 +232,9 @@ class CassandraLoadBalancingPolicy(FeastConfigBaseModel): """ Details on the load-balancing policy: it will be wrapped into an execution profile if present. + + In multi-DC mode (``datacenters`` list), ``local_dc`` also selects + which datacenter's profile becomes the default execution profile. """ read_concurrency: Optional[StrictInt] = 100 @@ -208,6 +270,14 @@ class CassandraOnlineStore(OnlineStore): _session: Session = None _keyspace: str = "feast_keyspace" _prepared_statements: Dict[str, PreparedStatement] = {} + _dc_execution_profiles: List[str] = [] + """ + Named execution-profile keys registered in multi-DC mode, one per + datacenter (e.g. ``["dc1", "dc2"]``). Empty in single-DC / Astra mode. + Use these names to route driver-level queries to a specific datacenter:: + + session.execute(stmt, params, execution_profile="dc2") + """ def _get_session(self, config: RepoConfig): """ @@ -223,6 +293,78 @@ def _get_session(self, config: RepoConfig): if self._session: return self._session + + # ------------------------------------------------------------------ + # Branch B: multi-DC mode — one named execution profile per datacenter + # ------------------------------------------------------------------ + if online_store_config.datacenters: + port = online_store_config.port or 9042 + keyspace = online_store_config.keyspace + username = online_store_config.username + password = online_store_config.password + protocol_version = online_store_config.protocol_version + + if (username is None) ^ (password is None): + raise CassandraInvalidConfig(E_CASSANDRA_INCONSISTENT_AUTH) + + auth_provider = ( + PlainTextAuthProvider(username=username, password=password) + if username is not None + else None + ) + + _lbp_name = ( + online_store_config.load_balancing.load_balancing_policy + if online_store_config.load_balancing + else "DCAwareRoundRobinPolicy" + ) + + execution_profiles: Dict[Any, ExecutionProfile] = {} + all_hosts: List[str] = [] + for dc in online_store_config.datacenters: + all_hosts.extend(dc.hosts) + if _lbp_name == "DCAwareRoundRobinPolicy": + lb_policy = DCAwareRoundRobinPolicy(local_dc=dc.local_dc) + elif _lbp_name == "TokenAwarePolicy(DCAwareRoundRobinPolicy)": + lb_policy = TokenAwarePolicy( + DCAwareRoundRobinPolicy(local_dc=dc.local_dc) + ) + else: + raise CassandraInvalidConfig(E_CASSANDRA_UNKNOWN_LB_POLICY) + execution_profiles[dc.local_dc] = ExecutionProfile( + request_timeout=online_store_config.request_timeout, + load_balancing_policy=lb_policy, + ) + self._dc_execution_profiles.append(dc.local_dc) + + default_dc = ( + online_store_config.load_balancing.local_dc + if online_store_config.load_balancing + else online_store_config.datacenters[0].local_dc + ) + if default_dc not in execution_profiles: + raise CassandraInvalidConfig(E_CASSANDRA_DC_DEFAULT_NOT_FOUND) + execution_profiles[EXEC_PROFILE_DEFAULT] = execution_profiles[default_dc] + + cluster_kwargs = { + k: v + for k, v in {"protocol_version": protocol_version}.items() + if v is not None + } + self._cluster = Cluster( + all_hosts, + port=port, + auth_provider=auth_provider, + execution_profiles=execution_profiles, + **cluster_kwargs, + ) + self._keyspace = keyspace + self._session = self._cluster.connect(self._keyspace) + return self._session + + # ------------------------------------------------------------------ + # Branch A: original single-DC / Astra DB (unchanged) + # ------------------------------------------------------------------ if not self._session: # configuration consistency checks hosts = online_store_config.hosts