From 9ccf61f917fb90070b47d5611a97fb9d54fc4b29 Mon Sep 17 00:00:00 2001 From: Zach Barnett Date: Wed, 18 Mar 2026 21:31:44 -0500 Subject: [PATCH] fix: Add drain deadline inside of while loop for online_write_batch --- .../cassandra_online_store/cassandra_online_store.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py index 176066268b0..83158083e82 100644 --- a/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/cassandra_online_store/cassandra_online_store.py @@ -627,9 +627,19 @@ def on_failure(exc, concurrent_queue): logger.warning( f"Waiting for futures. Pending are {concurrent_queue.qsize()}" ) + # Drain timeout: the original loop had no timeout, so a + # future that never completes (e.g. silent connection drop) + # would cause the Spark task to hang forever, blocking the + # entire streaming micro-batch. + drain_deadline = time.monotonic() + 300 # 5 minute safety valve while not concurrent_queue.empty(): if ex: raise ex + if time.monotonic() > drain_deadline: + raise Exception( + "Timed out waiting for Cassandra futures to drain. " + f"Pending: {concurrent_queue.qsize()}" + ) time.sleep(0.001) if ex: raise ex