NATS Input Connector: Bug fix, reliability & test improvements (#5761)
blp merged 18 commits into feldera:main
Conversation
Force-pushed from 18d92a2 to 5d11eef
mythical-fred
left a comment
Clean fix. The deferred JetStream init and the two-probe inactivity check together solve both the startup stall and the silent-loop bug. The Retryable/Fatal distinction and the lifecycle state machine make the error handling legible.
One implicit constraint worth documenting somewhere: a fatal validation error (resume cursor before stream's first available sequence) stops the worker task and is not self-healing — the pipeline would need an operator reset. This is the right behavior, but operators hitting it for the first time may be surprised.
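The Retryable/Fatal split the review praises can be summarized in a few lines. This is an illustrative sketch: the enum name `ConnectorError` comes from the PR's commit messages, but the string payloads and the `is_retryable` helper are hypothetical.

```rust
// Illustrative sketch of the Retryable/Fatal error classification.
// The payload shape and helper are hypothetical, not the PR's exact API.
#[derive(Debug)]
enum ConnectorError {
    /// Transient failure (connection refused, timeout): the worker
    /// should enter the retry loop and attempt to reconnect.
    Retryable(String),
    /// Non-recoverable failure (e.g. resume cursor before the stream's
    /// first sequence): the worker stops and an operator must intervene.
    Fatal(String),
}

impl ConnectorError {
    fn is_retryable(&self) -> bool {
        matches!(self, ConnectorError::Retryable(_))
    }
}

fn main() {
    let transient = ConnectorError::Retryable("connection refused".into());
    let permanent = ConnectorError::Fatal("resume cursor precedes stream".into());
    assert!(transient.is_retryable());
    assert!(!permanent.is_retryable());
}
```

The point of the classification is exactly the operator-facing constraint noted above: `Retryable` keeps the worker alive, `Fatal` does not self-heal.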
```rust
if stream_state.messages == 0 {
    return Err(anyhow!(
        "Resume sequence {resume_cursor} is invalid for stream '{stream_name}': stream is empty"
    ));
}
```
Fatal here means the worker exits immediately on the next loop iteration (Stopped => break). Worth a brief doc comment on validate_resume_position noting that a compacted stream is a non-recoverable error and callers should treat it as such — particularly since NATS stream retention policies can make this happen silently in production.
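A minimal sketch of the documented check this comment asks for, using only std types. The function name `validate_resume_position` appears in the thread; `first_sequence` and `messages` stand in for fields of the stream-info the connector fetches, and the error strings are illustrative.

```rust
// Hedged sketch: validate a resume cursor against stream state, treating a
// compacted/purged stream as a non-recoverable (fatal) condition.

/// Validates that `resume_cursor` still points inside the stream.
///
/// Returns an error when the stream is empty or has been purged/compacted
/// past the cursor. This is FATAL: the worker exits, the pipeline needs an
/// operator reset, and it is not self-healing.
fn validate_resume_position(
    resume_cursor: u64,
    first_sequence: u64,
    messages: u64,
) -> Result<(), String> {
    if messages == 0 {
        return Err(format!(
            "Resume sequence {resume_cursor} is invalid: stream is empty"
        ));
    }
    if resume_cursor < first_sequence {
        return Err(format!(
            "Resume sequence {resume_cursor} precedes first available sequence \
             {first_sequence}: stream was purged or compacted"
        ));
    }
    Ok(())
}

fn main() {
    assert!(validate_resume_position(5, 3, 10).is_ok());
    assert!(validate_resume_position(2, 3, 10).is_err()); // compacted past cursor
    assert!(validate_resume_position(5, 1, 0).is_err()); // empty stream
}
```

NATS retention policies (limits, interest, work-queue) can trigger the second branch silently in production, which is why the doc comment matters.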
Will add a section/note about fatal errors.
```rust
/// Returns `Ok(())` if the server and stream are healthy (caller should
/// continue its loop), or an error describing the stall and the failed check.
async fn check_inactivity_health(
    jetstream: &jetstream::Context,
```
Nice two-probe approach. One small edge case: if fetch_stream_state returns an error because the existing JetStream context has a stale connection and the fallback reconnect succeeds, we return Ok(()) and resume reading. But the original nats_messages stream is still bound to the stale connection — the health check passing does not heal the stream. In that case the inactivity timeout will fire again on the next cycle, triggering another health check → reconnect detection → Retryable error → retry loop. So correctness is preserved, just slightly delayed. No action needed, but worth a comment.
Good catch on the edge case. But the NATS consumer is an ordered consumer, and it self-heals here: when the underlying TCP connection drops and async_nats auto-reconnects, the ordered consumer detects the lost heartbeats and automatically recreates its subscription on the new connection. So the nats_messages stream resumes producing messages without intervention; we won't actually see the extra timeout cycle of delay described in the old comment.
I've updated the doc comment to reflect this.
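The two-tier probe under discussion can be sketched with the probes abstracted as closures, so the control flow is visible without the async JetStream machinery. Everything here is illustrative; the real code makes async calls against the JetStream context.

```rust
// Sketch of the two-tier inactivity probe: a cheap stream-info fetch first,
// then a full reconnect probe as fallback. Probe bodies are stand-ins.
fn check_inactivity_health(
    cheap_stream_info: impl Fn() -> Result<(), String>,
    full_reconnect: impl Fn() -> Result<(), String>,
) -> Result<(), String> {
    // Tier 1: cheap probe on the existing JetStream context.
    if cheap_stream_info().is_ok() {
        return Ok(());
    }
    // Tier 2: full reconnect probe. If this succeeds the server is healthy,
    // and (per the discussion above) the ordered consumer recreates its
    // subscription after the client reconnects, so the reader self-heals.
    full_reconnect().map_err(|e| format!("stalled: {e}"))
}

fn main() {
    // Healthy stream: tier 1 passes, tier 2 is never consulted.
    assert!(check_inactivity_health(|| Ok(()), || panic!("not reached")).is_ok());
    // Stale context but healthy server: tier 2 passes.
    assert!(check_inactivity_health(|| Err("stale".into()), || Ok(())).is_ok());
    // Both probes fail: report the stall to the caller.
    assert!(check_inactivity_health(|| Err("x".into()), || Err("down".into())).is_err());
}
```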
@kfollesdal Wow, this is super comprehensive. It took me quite a while just to read the commits with some degree of care. I'll approve it to run in the CI. Thanks a lot, those tests are amazing.

@kfollesdal Oh, BTW, in case it wasn't obvious, Fred is the review LLM. I hope he's helpful; if he's just wrong about things, sorry about that.
I don't understand the failures in CI, they are odd. I'll check them locally.

I take it back, I was only looking at the first one.

@kfollesdal, you're welcome to annotate these with
Sorry about that, I forgot to run clippy this time. I thought I had remembered everything 😊 I will fix.

@blp the easiest way to test / reproduce pre-merge queue failures locally is to run

He he .. there were some clues ;-) Good feedback from him.

Thanks, got some errors unrelated to my PR. But otherwise this should be good now.
Force-pushed from 7a9fc12 to ec39d21
Force-pushed from ec39d21 to b54d28f
@kfollesdal Would you mind rebasing to fix the conflict? It should be trivial. I updated a NATS test on main because it used a function that changed signature, and you deleted the file, so the solution at the conflict point is just to reaffirm the deletion.

@blp, will do. Give me a couple of minutes 👍
…ze tests by type

Reorganize NATS input tests from a single test.rs file into a modular test directory structure:

- test/mod.rs: shared types (NatsTestRecord) and utility functions
- test/mock_framework.rs: declarative NatsMockAction test runner for unit-level tests using mock_input_pipeline
- test/controller_framework.rs: declarative NatsControllerAction test runner for fault-tolerance tests using the full Controller
- test/mock_tests.rs: mock-based tests (basic ingestion)
- test/control_tests.rs: controller-based FT tests
- test/custom_tests.rs: standalone tests without test framework

All existing tests are ported to the new framework-based style. No new test scenarios are introduced in this commit.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
…config fields

Add two new configuration fields to NatsInputConfig:

- inactivity_timeout_secs: maximum time to wait for the next message before running a stream/server health check (default: 10s)
- retry_interval_secs: delay between automatic reconnect attempts while in retry mode (default: 5s)

Both fields are validated to be at least 1 second during endpoint construction.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
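The two fields and their minimum-value check can be sketched like this. The field names come from the commit message; the struct shape and error strings are illustrative stand-ins for the real config type.

```rust
// Sketch of the new config fields plus the >= 1 second validation
// performed at endpoint construction time (shape is illustrative).
struct NatsInputConfig {
    inactivity_timeout_secs: u64, // default: 10
    retry_interval_secs: u64,     // default: 5
}

impl NatsInputConfig {
    fn validate(&self) -> Result<(), String> {
        if self.inactivity_timeout_secs < 1 {
            return Err("inactivity_timeout_secs must be at least 1 second".into());
        }
        if self.retry_interval_secs < 1 {
            return Err("retry_interval_secs must be at least 1 second".into());
        }
        Ok(())
    }
}

fn main() {
    let defaults = NatsInputConfig { inactivity_timeout_secs: 10, retry_interval_secs: 5 };
    assert!(defaults.validate().is_ok());
    let bad = NatsInputConfig { inactivity_timeout_secs: 0, retry_interval_secs: 5 };
    assert!(bad.validate().is_err()); // rejected at pipeline creation
}
```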
Replace eager connect-and-verify in the constructor with lazy initialization via initialize_jetstream(). Each replay and extend cycle now connects on demand with a combined timeout, allowing the worker to retry connections instead of failing the entire endpoint at construction time.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
…infrastructure

Add building blocks for the upcoming retry state machine:

- ConnectorError enum (Retryable/Fatal) for classifying errors
- verify_server_and_stream_health() for full reconnect probes
- check_inactivity_health() with two-tier probe (cheap stream info fetch, then full reconnect fallback)
- StreamState and fetch_stream_state() for querying stream metadata

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Introduce a ReaderLifecycleState (Running/ErrorRetrying/Paused/Stopped) that drives the worker task's main loop:

- Running: monitors the reader error channel and commands concurrently
- ErrorRetrying: waits for a retry timer, then attempts to start a new reader via try_start_stream_reader()
- Paused/Stopped: awaits commands or exits

Key changes:

- try_start_stream_reader() encapsulates connect + consumer creation + spawn_nats_reader() with ConnectorError classification
- spawn_nats_reader() now reports errors via a channel instead of calling consumer.error() directly, letting the state machine decide whether to retry or stop
- consume_nats_messages_until() and spawn_nats_reader() gain inactivity timeout support with check_inactivity_health() probes
- Replay loop retries on transient errors with command interleaving
- Errors are classified as Retryable or Fatal throughout

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
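The lifecycle states above can be condensed into a small transition table. This is a simplified sketch: the state names match the commit, but the event names and the exact transition set are illustrative (the real loop also interleaves commands and timers).

```rust
// Condensed sketch of the reader lifecycle state machine.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ReaderLifecycleState {
    Running,
    ErrorRetrying,
    Paused,
    Stopped,
}

#[derive(Debug, Clone, Copy)]
enum Event {
    RetryableError,
    FatalError,
    RetrySucceeded,
    Pause,
    Resume,
    Stop,
}

fn next_state(state: ReaderLifecycleState, event: Event) -> ReaderLifecycleState {
    use Event::*;
    use ReaderLifecycleState::*;
    match (state, event) {
        (Running, RetryableError) => ErrorRetrying,
        (_, FatalError) | (_, Stop) => Stopped, // fatal => break, not self-healing
        (ErrorRetrying, RetrySucceeded) => Running,
        (Running | ErrorRetrying, Pause) => Paused,
        (Paused, Resume) => Running,
        (s, _) => s, // everything else: stay put
    }
}

fn main() {
    use Event::*;
    use ReaderLifecycleState::*;
    let mut s = Running;
    for e in [RetryableError, RetrySucceeded, RetryableError, Pause, Resume, FatalError] {
        s = next_state(s, e);
    }
    assert_eq!(s, Stopped);
}
```

Keeping the transitions in one place is what makes the error handling "legible", as the review put it: every error path ends in either ErrorRetrying or Stopped, never a silent loop.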
Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Add comprehensive mock-based tests for the new NATS input behaviors:

Retry and recovery:
- test_nats_retry_loop_when_server_unavailable
- test_nats_resume_after_server_becomes_available
- test_nats_pause_during_retry_loop
- test_nats_no_stale_error_after_pause_extend
- test_nats_mid_run_server_restart_recovers_no_fatal
- test_nats_connection_refused_enters_retry_loop
- test_nats_startup_connection_refused_retries_repeatedly
- test_nats_stream_not_found_enters_retry_loop
- test_nats_connection_timeout
- test_nats_pause_interrupts_inflight_retry_attempt

Inactivity timeout:
- test_nats_quiet_but_healthy_no_false_alarm
- test_nats_inactivity_timeout_config_is_honored

Configuration validation:
- test_nats_retry_interval_config_is_honored
- test_nats_retry_interval_zero_rejected

Replay:
- test_nats_replay_basic
- test_nats_replay_partial_range
- test_nats_replay_then_extend
- test_nats_replay_empty_range
- test_nats_replay_multiple_ranges
- test_nats_replay_stream_deleted_retries_and_recovers
- test_nats_replay_server_killed_retries_non_fatal
- test_nats_replay_after_purge_errors
- test_nats_replay_end_after_last_sequence_fails_fast
- test_nats_replay_start_before_first_sequence_fails_fast
- test_nats_replay_empty_stream_fails_fast

Disconnect:
- test_nats_disconnect_stops_delivery

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Add new controller-based fault tolerance tests:

- test_nats_ft_replay_after_stream_purge: replay fails when checkpointed messages have been purged
- test_nats_ft_stream_deletion_and_recreation: replay fails after stream delete+recreate since old sequences are gone
- test_nats_ft_stream_deletion_after_full_checkpoint: startup fails when resume cursor points at deleted stream
- test_nats_ft_startup_retries_when_stream_missing: startup enters retry mode when configured stream doesn't exist yet

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Add test_nats_inactivity_timeout_zero_rejected to verify that inactivity_timeout_secs=0 is rejected during pipeline creation.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Demote high-frequency log statements to lower levels to avoid flooding
logs during normal operation:
- Got message #N: info -> trace (emitted per message)
- Queued {N} records: info -> debug (emitted every flush cycle)
- Queued 0 records: debug -> trace (empty flushes are noise)
Lifecycle events (connection, recovery, errors) remain at INFO/ERROR.
Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
…e root cause
Add retry_count tracker to the reader lifecycle state machine:
- First error: 'entered ERROR state, will retry in {interval}'
- Subsequent retries: 'retry #{N} failed, next attempt in {interval}'
using error.root_cause() instead of the full context chain
- Recovery: 'recovered after {N} retries, resuming from {cursor}'
- Counter resets on recovery, pause, and successful extend
Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
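The three message shapes listed in this commit can be sketched as plain formatting helpers. The wording mirrors the commit description; the helper functions themselves are hypothetical, not the connector's actual logging API.

```rust
// Sketch of the retry-counter log messages described above.
fn retry_log(retry_count: u32, interval_secs: u64, root_cause: &str) -> String {
    if retry_count == 0 {
        // First error: announce the state change.
        format!("entered ERROR state ({root_cause}), will retry in {interval_secs}s")
    } else {
        // Subsequent retries: count attempts, show only the root cause.
        format!("retry #{retry_count} failed ({root_cause}), next attempt in {interval_secs}s")
    }
}

fn recovery_log(retries: u32, cursor: u64) -> String {
    // Counter resets after this (and on pause / successful extend).
    format!("recovered after {retries} retries, resuming from {cursor}")
}

fn main() {
    assert!(retry_log(0, 5, "connection refused").starts_with("entered ERROR state"));
    assert!(retry_log(3, 5, "connection refused").starts_with("retry #3 failed"));
    assert_eq!(
        recovery_log(4, 100),
        "recovered after 4 retries, resuming from 100"
    );
}
```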
Trim the verbose nested error chains in check_inactivity_health() by
using root_cause() instead of the full anyhow context chain ({:#}).
This makes the initial 'stalled' error message much more readable.
Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
The ordered consumer automatically detects lost heartbeats and recreates its subscription on a reconnected client, so a stale connection is not an edge case that requires caller intervention.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Introduce three small structs to group related parameters:

- StreamContext: connection config, stream name, inactivity timeout
- ReaderState: shared next_sequence and queue
- MessagePipeline: consumer and parser pair

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
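A skeleton of the three parameter-group structs this commit introduces. The struct names match the commit; every field type here is a std stand-in (String, u64, VecDeque) for the connector's real connection, consumer, and parser types.

```rust
// Skeleton of the parameter-grouping structs; field types are stand-ins.
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Connection config, stream name, and inactivity timeout.
struct StreamContext {
    server_url: String,
    stream_name: String,
    inactivity_timeout_secs: u64,
}

/// Shared cursor and buffered records between reader and flush task.
struct ReaderState {
    next_sequence: Arc<Mutex<u64>>,
    queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}

/// Consumer handle plus the parser it feeds (both stand-ins here).
struct MessagePipeline {
    consumer_name: String,
    parse: fn(&[u8]) -> usize,
}

fn main() {
    let ctx = StreamContext {
        server_url: "nats://localhost:4222".into(),
        stream_name: "events".into(),
        inactivity_timeout_secs: 10,
    };
    let state = ReaderState {
        next_sequence: Arc::new(Mutex::new(1)),
        queue: Arc::new(Mutex::new(VecDeque::new())),
    };
    let pipeline = MessagePipeline { consumer_name: "c1".into(), parse: |b| b.len() };
    assert_eq!(ctx.stream_name, "events");
    assert_eq!(*state.next_sequence.lock().unwrap(), 1);
    assert_eq!((pipeline.parse)(b"abc"), 3);
}
```

Grouping the parameters this way shortens the signatures of the reader functions without changing behavior.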
Detail specific fatal error scenarios: stream deleted/recreated, stream purged, stream emptied, and unexpected sequence during replay. Add recovery tip for resetting pipeline checkpoint state.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Force-pushed from b54d28f to af9acf2
@blp, rebased and resolved conflicts. I have run tests and pre-commit, but the

@blp I think you need to review/approve before it can get merged

The mysterious disappearing test binary problem is back. Requeuing.
Summary
Fixes a critical bug where the NATS consumer could stall in an infinite silent loop, making the pipeline unreliable and unstable. This PR also improves overall connector reliability through better retry logic, lifecycle management, and pause/resume support.
Changes

- retry_interval_secs, error counters, and concise root-cause reporting

New config fields

- inactivity_timeout_secs — health check timeout for stalled consumers
- retry_interval_secs — interval between connection retries

Describe Manual Test Plan
I have run the full pipeline in the pipeline manager locally; I have not tested enterprise features. But FT should be covered by the tests in code.
Checklist
Breaking Changes?
Mark if you think the answer is yes for any of these components:
Describe Incompatible Changes