
NATS Input Connector: Bug fix, reliability & test improvements#5761

Merged
blp merged 18 commits into feldera:main from fornybar:nats-bug-and-improve2
Mar 13, 2026

Conversation

@kfollesdal
Contributor

@kfollesdal kfollesdal commented Mar 5, 2026

Summary
Fixes a critical bug where the NATS consumer could stall in an infinite silent loop, making the pipeline unreliable and unstable. This PR also improves overall connector reliability through better retry logic, lifecycle management, and pause/resume support.

Changes

  • Bug fix: Resolve stalled consumer entering an infinite silent loop by deferring JetStream initialization to the worker task and adding an inactivity health check
  • Pause/resume: Make pause/start connector lifecycle work correctly via a reader retry state machine
  • Retry improvements: Add structured retry with configurable retry_interval_secs, error counters, and concise root-cause reporting
  • Logging: Reduce per-message and per-flush log verbosity; improve retry/error log messages
  • Testing: Introduce mock and controller test frameworks for NATS; add tests for retry, recovery, replay, disconnect, lifecycle, and config validation

New config fields

  • inactivity_timeout_secs — health check timeout for stalled consumers
  • retry_interval_secs — interval between connection retries
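For reference, a minimal sketch of how these two fields could sit on the connector config. The field names come from this PR and the defaults (10s, 5s) from the commit messages; the struct shape and accessor methods here are illustrative guesses, not the actual Feldera definition:

```rust
use std::time::Duration;

// Hypothetical sketch: field names from this PR, defaults from the commit
// messages. Not the actual Feldera NatsInputConfig definition.
#[derive(Debug, Clone)]
pub struct NatsInputConfig {
    /// Max time to wait for the next message before running a health check.
    pub inactivity_timeout_secs: u64,
    /// Delay between automatic reconnect attempts while in retry mode.
    pub retry_interval_secs: u64,
}

impl Default for NatsInputConfig {
    fn default() -> Self {
        Self {
            inactivity_timeout_secs: 10,
            retry_interval_secs: 5,
        }
    }
}

impl NatsInputConfig {
    pub fn inactivity_timeout(&self) -> Duration {
        Duration::from_secs(self.inactivity_timeout_secs)
    }
    pub fn retry_interval(&self) -> Duration {
        Duration::from_secs(self.retry_interval_secs)
    }
}

fn main() {
    let cfg = NatsInputConfig::default();
    println!("{:?} {:?}", cfg.inactivity_timeout(), cfg.retry_interval());
}
```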

Describe Manual Test Plan

Ran the full pipeline in the pipeline manager locally; enterprise features have not been tested, but fault tolerance should be covered by the tests in code.

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

@kfollesdal force-pushed the nats-bug-and-improve2 branch from 18d92a2 to 5d11eef on March 5, 2026 at 15:36

@mythical-fred mythical-fred left a comment


Clean fix. The deferred JetStream init and the two-probe inactivity check together solve both the startup stall and the silent-loop bug. The Retryable/Fatal distinction and the lifecycle state machine make the error handling legible.

One implicit constraint worth documenting somewhere: a fatal validation error (resume cursor before stream's first available sequence) stops the worker task and is not self-healing — the pipeline would need an operator reset. This is the right behavior, but operators hitting it for the first time may be surprised.

if stream_state.messages == 0 {
    return Err(anyhow!(
        "Resume sequence {resume_cursor} is invalid for stream '{stream_name}': stream is empty"
    ));
}


Fatal here means the worker exits immediately on the next loop iteration (Stopped => break). Worth a brief doc comment on validate_resume_position noting that a compacted stream is a non-recoverable error and callers should treat it as such — particularly since NATS stream retention policies can make this happen silently in production.
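The Retryable/Fatal split under discussion can be sketched roughly as follows. Names follow the PR description, but the shapes and the validation helper are simplified stand-ins for illustration, not the crate's actual types:

```rust
// Hypothetical sketch of the error classification; the real ConnectorError
// in the PR carries richer error payloads.
#[derive(Debug, PartialEq)]
enum ConnectorError {
    /// Transient failure: the worker enters the retry loop.
    Retryable(String),
    /// Non-recoverable failure: the worker stops; an operator must
    /// reset the pipeline (e.g. clear the checkpointed resume cursor).
    Fatal(String),
}

/// Validate a resume cursor against the stream's first available sequence.
/// A cursor pointing before the stream (compaction, purge, delete+recreate)
/// is Fatal: the data it references no longer exists on the server.
fn validate_resume_position(
    first_available_seq: u64,
    resume_cursor: u64,
) -> Result<(), ConnectorError> {
    if resume_cursor < first_available_seq {
        return Err(ConnectorError::Fatal(format!(
            "resume sequence {resume_cursor} precedes first available sequence {first_available_seq}"
        )));
    }
    Ok(())
}

fn main() {
    assert!(validate_resume_position(100, 50).is_err());
    assert!(validate_resume_position(100, 150).is_ok());
}
```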

Contributor Author


Will add a section/note about fatal errors.

/// Returns `Ok(())` if the server and stream are healthy (caller should
/// continue its loop), or an error describing the stall and the failed check.
async fn check_inactivity_health(
jetstream: &jetstream::Context,


Nice two-probe approach. One small edge case: if fetch_stream_state returns an error because the existing JetStream context has a stale connection and the fallback reconnect succeeds, we return Ok(()) and resume reading. But the original nats_messages stream is still bound to the stale connection — the health check passing does not heal the stream. In that case the inactivity timeout will fire again on the next cycle, triggering another health check → reconnect detection → Retryable error → retry loop. So correctness is preserved, just slightly delayed. No action needed, but worth a comment.
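The two-tier probe being discussed can be sketched synchronously like this. The function and error names are stand-ins; the real check is async and probes JetStream directly:

```rust
#[derive(Debug)]
enum HealthError {
    Stalled(String),
}

/// Two-tier probe: first a cheap stream-info fetch over the existing
/// connection; if that fails, fall back to a full reconnect-and-verify.
/// Ok(()) means "quiet but healthy": the caller keeps waiting for messages.
fn check_inactivity_health(
    fetch_stream_state: impl Fn() -> Result<u64, String>,
    reconnect_and_verify: impl Fn() -> Result<(), String>,
) -> Result<(), HealthError> {
    match fetch_stream_state() {
        Ok(_messages) => Ok(()),
        Err(cheap_err) => reconnect_and_verify().map_err(|full_err| {
            HealthError::Stalled(format!(
                "stream probe failed ({cheap_err}); reconnect failed ({full_err})"
            ))
        }),
    }
}

fn main() {
    // Healthy but quiet stream: cheap probe succeeds.
    assert!(check_inactivity_health(|| Ok(0), || Err("unused".into())).is_ok());
    // Stale context but server reachable: fallback reconnect succeeds.
    assert!(check_inactivity_health(|| Err("timeout".into()), || Ok(())).is_ok());
    // Server truly down: both probes fail, so the consumer is stalled.
    assert!(check_inactivity_health(|| Err("timeout".into()), || Err("refused".into())).is_err());
}
```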

Contributor Author


Good catch on the edge case. But the NATS consumer is an ordered consumer, and it self-heals here: when the underlying TCP connection drops and async_nats auto-reconnects, the ordered consumer detects the lost heartbeats and automatically recreates its subscription on the new connection. So the nats_messages stream resumes producing messages without intervention, and we won't actually see the extra timeout cycle of delay described in the old comment.

I've updated the doc comment to reflect this.

@blp
Member

blp commented Mar 5, 2026

@kfollesdal Wow, this is super comprehensive. It took me quite a while just to read the commits with some degree of care. I'll approve it to run in the CI. Thanks a lot, those tests are amazing.

@blp
Member

blp commented Mar 5, 2026

@kfollesdal Oh, BTW, in case it wasn't obvious, Fred is the review LLM. I hope he's helpful; if he's just wrong about things, sorry about that.

@blp
Member

blp commented Mar 5, 2026

I don't understand the failures in CI, they are odd. I'll check them locally.

@blp
Member

blp commented Mar 5, 2026

I don't understand the failures in CI, they are odd. I'll check them locally.

I take it back, I was only looking at the cargo doc errors at the end (which apparently we ignore?). These are the relevant clippy errors:

error: this function has too many arguments (8/7)
   --> crates/adapters/src/transport/nats/input.rs:273:5
    |
273 | /     async fn try_start_stream_reader(
274 | |         config: &Arc<NatsInputConfig>,
275 | |         nats_consumer_config: &NatsConsumerConfig,
276 | |         resume_cursor: Arc<AtomicU64>,
...   |
281 | |         reader_error_sender: UnboundedSender<ConnectorError>,
282 | |     ) -> Result<Canceller, ConnectorError> {
    | |__________________________________________^
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/rust-1.91.0/index.html#too_many_arguments
    = note: `-D clippy::too-many-arguments` implied by `-D warnings`
    = help: to override `-D warnings` add `#[allow(clippy::too_many_arguments)]`

error: this function has too many arguments (8/7)
   --> crates/adapters/src/transport/nats/input.rs:743:1
    |
743 | / async fn consume_nats_messages_until(
744 | |     jetstream: &jetstream::Context,
745 | |     nats_consumer: NatsConsumer,
746 | |     last_message_sequence: u64,
...   |
751 | |     mut parser: Box<dyn Parser>,
752 | | ) -> Result<(Xxh3Default, BufferSize), ConnectorError> {
    | |______________________________________________________^
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/rust-1.91.0/index.html#too_many_arguments

error: this function has too many arguments (10/7)
   --> crates/adapters/src/transport/nats/input.rs:834:1
    |
834 | / async fn spawn_nats_reader(
835 | |     jetstream: jetstream::Context,
836 | |     nats_consumer: NatsConsumer,
837 | |     next_sequence: Arc<AtomicU64>,
...   |
844 | |     reader_error_sender: UnboundedSender<ConnectorError>,
845 | | ) -> AnyResult<Canceller> {
    | |_________________________^
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/rust-1.91.0/index.html#too_many_arguments

@kfollesdal, you're welcome to annotate these with #[allow(clippy::too_many_arguments)] if you think that is the best way to resolve them; we do need to resolve them one way or another.
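For readers unfamiliar with the lint, the suggested targeted opt-out looks like this (toy function for illustration; the PR ultimately resolved the lint by grouping arguments into structs instead):

```rust
// Toy example of the suggested annotation: it silences
// clippy::too_many_arguments for this one function only, instead of
// disabling the lint crate-wide.
#[allow(clippy::too_many_arguments)]
fn sum8(a: u32, b: u32, c: u32, d: u32, e: u32, f: u32, g: u32, h: u32) -> u32 {
    a + b + c + d + e + f + g + h
}

fn main() {
    println!("{}", sum8(1, 2, 3, 4, 5, 6, 7, 8)); // prints 36
}
```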

@kfollesdal
Contributor Author

I take it back, I was only looking at the cargo doc errors at the end (which apparently we ignore?). These are the relevant clippy errors: […] @kfollesdal, you're welcome to annotate these with #[allow(clippy::too_many_arguments)] if you think that is the best way to resolve them; we do need to resolve them one way or another.

Sorry about that, I forgot to run clippy this time. I thought I had remembered everything 😊 I will fix.

@gz
Copy link
Contributor

gz commented Mar 6, 2026

@blp the easiest way to test / reproduce pre-merge queue failures locally is to run

pre-commit run --show-diff-on-failure --color=always --all-files

@kfollesdal
Contributor Author

@kfollesdal Oh, BTW, in case it wasn't obvious, Fred is the review LLM. I hope he's helpful; if he's just wrong about things, sorry about that.

Heh, there were some clues ;-) Good feedback from him.

@kfollesdal
Contributor Author

@blp the easiest way to test / reproduce pre-merge queue failures locally is to run

pre-commit run --show-diff-on-failure --color=always --all-files

Thanks, got some errors unrelated to my PR, but otherwise this should be good now.

@ryanjdillon force-pushed the nats-bug-and-improve2 branch from 7a9fc12 to ec39d21 on March 9, 2026 at 15:16
@kfollesdal force-pushed the nats-bug-and-improve2 branch from ec39d21 to b54d28f on March 12, 2026 at 13:44
@blp
Member

blp commented Mar 13, 2026

@kfollesdal Would you mind rebasing to fix the conflict? It should be trivial. I updated a NATS test on main because it used a function that changed signature, and you deleted the file, so the solution at the conflict point is just to reaffirm the deletion.

@kfollesdal
Contributor Author

@kfollesdal Would you mind rebasing to fix the conflict? It should be trivial. I updated a NATS test on main because it used a function that changed signature, and you deleted the file, so the solution at the conflict point is just to reaffirm the deletion.

@blp, will do. Give me a couple of minutes 👍

…ze tests by type

Reorganize NATS input tests from a single test.rs file into a modular
test directory structure:

- test/mod.rs: shared types (NatsTestRecord) and utility functions
- test/mock_framework.rs: declarative NatsMockAction test runner for
  unit-level tests using mock_input_pipeline
- test/controller_framework.rs: declarative NatsControllerAction test
  runner for fault-tolerance tests using the full Controller
- test/mock_tests.rs: mock-based tests (basic ingestion)
- test/control_tests.rs: controller-based FT tests
- test/custom_tests.rs: standalone tests without test framework

All existing tests are ported to the new framework-based style.
No new test scenarios are introduced in this commit.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
…config fields

Add two new configuration fields to NatsInputConfig:

- inactivity_timeout_secs: maximum time to wait for the next message
  before running a stream/server health check (default: 10s)
- retry_interval_secs: delay between automatic reconnect attempts
  while in retry mode (default: 5s)

Both fields are validated to be at least 1 second during endpoint
construction.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
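The validation described in this commit might reduce to a check like the following (hypothetical helper; the crate's actual error type and call site differ):

```rust
/// Hypothetical sketch of the ">= 1 second" validation performed during
/// endpoint construction; the real code returns the crate's error type.
fn validate_interval_secs(field: &str, value: u64) -> Result<(), String> {
    if value < 1 {
        return Err(format!("{field} must be at least 1 second (got {value})"));
    }
    Ok(())
}

fn main() {
    assert!(validate_interval_secs("inactivity_timeout_secs", 0).is_err());
    assert!(validate_interval_secs("retry_interval_secs", 5).is_ok());
}
```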
Replace eager connect-and-verify in the constructor with lazy
initialization via initialize_jetstream(). Each replay and extend
cycle now connects on demand with a combined timeout, allowing the
worker to retry connections instead of failing the entire endpoint
at construction time.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
…infrastructure

Add building blocks for the upcoming retry state machine:

- ConnectorError enum (Retryable/Fatal) for classifying errors
- verify_server_and_stream_health() for full reconnect probes
- check_inactivity_health() with two-tier probe (cheap stream info
  fetch, then full reconnect fallback)
- StreamState and fetch_stream_state() for querying stream metadata

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Introduce a ReaderLifecycleState (Running/ErrorRetrying/Paused/Stopped)
that drives the worker task's main loop:

- Running: monitors the reader error channel and commands concurrently
- ErrorRetrying: waits for a retry timer, then attempts to start a new
  reader via try_start_stream_reader()
- Paused/Stopped: awaits commands or exits

Key changes:
- try_start_stream_reader() encapsulates connect + consumer creation +
  spawn_nats_reader() with ConnectorError classification
- spawn_nats_reader() now reports errors via a channel instead of
  calling consumer.error() directly, letting the state machine decide
  whether to retry or stop
- consume_nats_messages_until() and spawn_nats_reader() gain inactivity
  timeout support with check_inactivity_health() probes
- Replay loop retries on transient errors with command interleaving
- Errors are classified as Retryable or Fatal throughout

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
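A condensed sketch of the transitions this commit describes. The state names come from the commit message; the event names and transition table are simplified guesses (the real loop also interleaves commands and retry timers):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum ReaderLifecycleState {
    Running,
    ErrorRetrying,
    Paused,
    Stopped,
}

#[derive(Debug, Clone, Copy)]
enum ReaderEvent {
    RetryableError,
    FatalError,
    RetrySucceeded,
    Pause,
    Stop,
}

/// One step of the state machine: Retryable errors move Running into
/// ErrorRetrying, Fatal errors stop the worker for good, and a
/// successful retry returns to Running.
fn step(state: ReaderLifecycleState, event: ReaderEvent) -> ReaderLifecycleState {
    use ReaderEvent::*;
    use ReaderLifecycleState::*;
    match (state, event) {
        (Running, RetryableError) => ErrorRetrying,
        (_, FatalError) | (_, Stop) => Stopped,
        (ErrorRetrying, RetrySucceeded) => Running,
        (Running | ErrorRetrying, Pause) => Paused,
        (s, _) => s, // all other events leave the state unchanged
    }
}

fn main() {
    use ReaderEvent::*;
    use ReaderLifecycleState::*;
    let s = step(Running, RetryableError);
    assert_eq!(s, ErrorRetrying);
    assert_eq!(step(s, RetrySucceeded), Running);
    assert_eq!(step(Running, FatalError), Stopped);
}
```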
Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Add comprehensive mock-based tests for the new NATS input behaviors:

Retry and recovery:
- test_nats_retry_loop_when_server_unavailable
- test_nats_resume_after_server_becomes_available
- test_nats_pause_during_retry_loop
- test_nats_no_stale_error_after_pause_extend
- test_nats_mid_run_server_restart_recovers_no_fatal
- test_nats_connection_refused_enters_retry_loop
- test_nats_startup_connection_refused_retries_repeatedly
- test_nats_stream_not_found_enters_retry_loop
- test_nats_connection_timeout
- test_nats_pause_interrupts_inflight_retry_attempt

Inactivity timeout:
- test_nats_quiet_but_healthy_no_false_alarm
- test_nats_inactivity_timeout_config_is_honored

Configuration validation:
- test_nats_retry_interval_config_is_honored
- test_nats_retry_interval_zero_rejected

Replay:
- test_nats_replay_basic
- test_nats_replay_partial_range
- test_nats_replay_then_extend
- test_nats_replay_empty_range
- test_nats_replay_multiple_ranges
- test_nats_replay_stream_deleted_retries_and_recovers
- test_nats_replay_server_killed_retries_non_fatal
- test_nats_replay_after_purge_errors
- test_nats_replay_end_after_last_sequence_fails_fast
- test_nats_replay_start_before_first_sequence_fails_fast
- test_nats_replay_empty_stream_fails_fast

Disconnect:
- test_nats_disconnect_stops_delivery

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Add new controller-based fault tolerance tests:

- test_nats_ft_replay_after_stream_purge: replay fails when
  checkpointed messages have been purged
- test_nats_ft_stream_deletion_and_recreation: replay fails after
  stream delete+recreate since old sequences are gone
- test_nats_ft_stream_deletion_after_full_checkpoint: startup fails
  when resume cursor points at deleted stream
- test_nats_ft_startup_retries_when_stream_missing: startup enters
  retry mode when configured stream doesn't exist yet

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Add test_nats_inactivity_timeout_zero_rejected to verify that
inactivity_timeout_secs=0 is rejected during pipeline creation.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Demote high-frequency log statements to lower levels to avoid flooding
logs during normal operation:
- Got message #N: info -> trace (emitted per message)
- Queued {N} records: info -> debug (emitted every flush cycle)
- Queued 0 records: debug -> trace (empty flushes are noise)

Lifecycle events (connection, recovery, errors) remain at INFO/ERROR.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
…e root cause

Add retry_count tracker to the reader lifecycle state machine:
- First error: 'entered ERROR state, will retry in {interval}'
- Subsequent retries: 'retry #{N} failed, next attempt in {interval}'
  using error.root_cause() instead of the full context chain
- Recovery: 'recovered after {N} retries, resuming from {cursor}'
- Counter resets on recovery, pause, and successful extend

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Trim the verbose nested error chains in check_inactivity_health() by
using root_cause() instead of the full anyhow context chain ({:#}).
This makes the initial 'stalled' error message much more readable.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
The ordered consumer automatically detects lost heartbeats and recreates
its subscription on a reconnected client, so a stale connection is not
an edge case that requires caller intervention.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
Introduce three small structs to group related parameters:
- StreamContext: connection config, stream name, inactivity timeout
- ReaderState: shared next_sequence and queue
- MessagePipeline: consumer and parser pair

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
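The refactor reads roughly like this. The struct names are from the commit message, but the fields are guesses chosen to illustrate the grouping; the actual definitions in the crate differ:

```rust
use std::sync::{atomic::{AtomicU64, Ordering}, Arc};

// Struct names from the commit message; fields are illustrative guesses.
struct StreamContext {
    stream_name: String,
    inactivity_timeout_secs: u64,
}

struct ReaderState {
    next_sequence: Arc<AtomicU64>,
}

// Before: the signature took stream_name, inactivity_timeout_secs,
// next_sequence, ... as ten loose arguments, tripping
// clippy::too_many_arguments. After: related parameters travel together.
fn describe_reader(ctx: &StreamContext, state: &ReaderState) -> String {
    format!(
        "stream={} timeout={}s next_seq={}",
        ctx.stream_name,
        ctx.inactivity_timeout_secs,
        state.next_sequence.load(Ordering::Relaxed)
    )
}

fn main() {
    let ctx = StreamContext {
        stream_name: "events".into(),
        inactivity_timeout_secs: 10,
    };
    let state = ReaderState {
        next_sequence: Arc::new(AtomicU64::new(42)),
    };
    println!("{}", describe_reader(&ctx, &state));
}
```

Grouping also means future parameters can be added to one struct without widening every call site.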
Detail specific fatal error scenarios: stream deleted/recreated,
stream purged, stream emptied, and unexpected sequence during replay.
Add recovery tip for resetting pipeline checkpoint state.

Signed-off-by: kfollesdal <kristoffer.follesdal@eviny.no>
@kfollesdal force-pushed the nats-bug-and-improve2 branch from b54d28f to af9acf2 on March 13, 2026 at 18:03
@kfollesdal
Contributor Author

@blp, rebased and resolved the conflicts. I have run the tests and pre-commit, but the Cargo Machete step in pre-commit wanted to make a lot of changes to Cargo.toml files; that is unrelated to this PR. Let's see how it behaves in the PR checks.

@blp blp enabled auto-merge March 13, 2026 18:25
@gz
Contributor

gz commented Mar 13, 2026

@blp I think you need to review/approve before it can get merged

@blp blp added this pull request to the merge queue Mar 13, 2026
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Mar 13, 2026
@blp
Member

blp commented Mar 13, 2026

The mysterious disappearing test binary problem is back. Requeuing.

@blp blp added this pull request to the merge queue Mar 13, 2026
Merged via the queue into feldera:main with commit b81033a Mar 13, 2026
1 check passed