[adapters] Report recent connector errors in the API #5775

Merged
ryzhyk merged 5 commits into main from issue-1108
Mar 9, 2026

@ryzhyk ryzhyk commented Mar 6, 2026

Partially fixes #1108.

Today, connector errors are only reported in the log where they can be
difficult to find or get lost completely if the log is noisy.

This commit exposes connector errors via the REST API:

  • We store up to a fixed number (100) of the most recent errors of each kind
    (transport/encoder/parser) and for each tag (we reuse the same tags that were
    recently introduced for log throttling).

  • Old errors are evicted in FIFO order.

  • The error list is returned via
    /tables/table_name/connectors/connector_name/stats endpoint, but not via
    global /stats to avoid bloating the output of that endpoint.
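
The retention policy described above (a bounded list per tag, oldest entries evicted first) can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `RecentErrors`, `RecentError`, `MAX_ERRORS_PER_TAG`, and the method names are hypothetical, and only the map-of-queues shape and the limit of 100 come from the discussion.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical cap; the PR description mentions 100 errors per kind and tag.
const MAX_ERRORS_PER_TAG: usize = 100;

/// Illustrative error record (field names are assumptions).
#[derive(Debug, Clone, PartialEq)]
pub struct RecentError {
    pub tag: Option<&'static str>,
    pub message: String,
}

/// Keeps the most recent errors for each tag, evicting the oldest first.
#[derive(Debug, Default)]
pub struct RecentErrors {
    by_tag: BTreeMap<Option<&'static str>, VecDeque<RecentError>>,
}

impl RecentErrors {
    /// Records an error under `tag`, dropping the oldest entry for that tag
    /// once the per-tag queue is full.
    pub fn push(&mut self, tag: Option<&'static str>, message: impl Into<String>) {
        let queue = self.by_tag.entry(tag).or_default();
        if queue.len() == MAX_ERRORS_PER_TAG {
            queue.pop_front();
        }
        queue.push_back(RecentError {
            tag,
            message: message.into(),
        });
    }

    /// Returns the retained errors for `tag`, oldest first.
    pub fn for_tag(&self, tag: Option<&'static str>) -> Vec<RecentError> {
        self.by_tag
            .get(&tag)
            .map(|queue| queue.iter().cloned().collect())
            .unwrap_or_default()
    }
}
```

Because each tag has its own queue, a burst of errors under one tag cannot push out the retained errors of another tag.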

Describe Manual Test Plan

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

@ryzhyk ryzhyk requested a review from Karakatiza666 March 6, 2026 23:54
@ryzhyk ryzhyk added the connectors Issues related to the adapters/connectors crate label Mar 6, 2026
}

pub fn parse_error(&self, endpoint_id: EndpointId) {
pub fn parse_error(
Contributor

I would name these functions with "report_parse_error", to make it clear that they are not constructors.

/// Stores up to MAX_CONNECTOR_ERRORS most recent errors for each tag.
/// When the number of errors for a tag exceeds MAX_CONNECTOR_ERRORS, the oldest error is removed.
#[derive(Debug)]
pub struct ConnectorErrorList {
Contributor

Should the constant MAX_CONNECTOR_ERRORS be part of this class?
Alternatively, it should be a constructor parameter if not.
Also, this class could have "Recent" in its name.

/// If `full` is false, generates a trimmed version of the endpoint status,
/// not including the list of errors. This status can be returned via the
/// `/stats` endpoint.
pub fn to_api_type(&self, full: bool) -> ExternalOutputEndpointStatus {
Contributor

I would rename "full" to "include_errors"

Contributor Author

We may add more optional fields in the future. I think full is adequate.
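
For readers following the `full` flag discussion, here is a small sketch of how one boolean can select between a trimmed and a full status view. The types `EndpointState` and `EndpointStatusView` and their fields are simplified stand-ins invented for this example. Only the `to_api_type(&self, full: bool)` shape is taken from the diff above.

```rust
/// Simplified stand-in for the API status type (hypothetical fields).
#[derive(Debug, Clone, PartialEq)]
pub struct EndpointStatusView {
    pub num_parse_errors: u64,
    /// `None` in the trimmed view, `Some(..)` in the full view.
    pub recent_parse_errors: Option<Vec<String>>,
}

/// Simplified stand-in for the internal endpoint state (hypothetical).
pub struct EndpointState {
    pub num_parse_errors: u64,
    pub recent_parse_errors: Vec<String>,
}

impl EndpointState {
    /// With `full == false` the error list is omitted, which keeps the
    /// output small enough for a global stats listing.
    pub fn to_api_type(&self, full: bool) -> EndpointStatusView {
        EndpointStatusView {
            num_parse_errors: self.num_parse_errors,
            recent_parse_errors: full.then(|| self.recent_parse_errors.clone()),
        }
    }
}
```

Any optional field added later could be gated on the same flag, which is the argument made above for keeping the name `full`.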

ryzhyk commented Mar 7, 2026

TODOs:

  • tests
  • checkpointing error list
  • webconsole support

Not in scope: system table for errors.

@mihaibudiu
Contributor

System tables would be tricky if the pipeline needs to essentially block. If it can't run any more steps, it won't be able to write to any output connectors.

/// The first fatal error that occurred at the endpoint.
pub fatal_error: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
/// Recent encoding errors on this endpoint.
Contributor

Comments above decorators make the whole field more readable

Contributor Author

good catch

/// Optional tag for the error.
///
/// The tag is used to group errors by their type.
pub tag: Option<String>,
Contributor

An optional tag is not pretty, this forces clients to come up with own strategy to treat tag=null records. Would it be meaningful to have an "Other" or "Generic" tag assigned by the pipeline-manager, making this field required in the API?

Contributor Author

It's easy to substitute "other" if a tag is missing; it's harder to go in the other direction. In any case, I don't think we have much use for the tag, you may want to just ignore it in the webconsole
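
As an illustration of the point that a missing tag is easy to default on the client side, a consumer could do something like the following. `ErrorRecord` is a hypothetical client-side mirror of the API record, and the fallback string `"other"` is just the value floated in this thread.

```rust
/// Hypothetical client-side view of one error record from the API.
pub struct ErrorRecord {
    pub tag: Option<String>,
    pub message: String,
}

/// Returns the tag to display, falling back to "other" when none is set.
pub fn display_tag(error: &ErrorRecord) -> &str {
    error.tag.as_deref().unwrap_or("other")
}
```

Going the other way, recovering "no tag" from a server-assigned placeholder, would require clients to know which placeholder value means "missing".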

/// The first fatal error that occurred at the endpoint.
pub fatal_error: Option<String>,
#[serde(default)]
pub parse_errors: Option<Vec<ConnectorError>>,
Contributor

Since you introduce error lists in Input and Output Status, do you deprecate metrics.num_*_errors in both?

Contributor Author

No, because these error lists only contain recent errors, the error count should account for all errors that have ever occurred in this pipeline.

pub tag: Option<String>,

/// Error message.
pub error: String,
Contributor

message is more fitting and consistent with other errors?

Contributor Author

ok!

@ryzhyk ryzhyk marked this pull request as ready for review March 7, 2026 19:34
@ryzhyk ryzhyk requested review from Karakatiza666 and gz March 7, 2026 20:10
Contributor

@gz gz left a comment

lgtm maybe we should store 1000 since it should be cheap and use http pagination headers to retrieve them https://docs.github.com/en/rest/using-the-rest-api/using-pagination-in-the-rest-api?apiVersion=2022-11-28

(might be a future PR if too hard)

fn parse_error(&self) {
self.metrics.num_parse_errors.fetch_add(1, Ordering::AcqRel);
fn parse_error(&self, tag: Option<&'static str>, error: &ParseError) {
let last_error = self.metrics.num_parse_errors.fetch_add(1, Ordering::AcqRel);
Contributor

it's odd that you need both parse_errors and num_parse_errors

Contributor Author

num errors accounts for all errors generated by the connector. The error list only lists recent errors. @mihaibudiu suggested renaming it to recent_parse_errors.

Contributor Author

But note that we include error index to help track gaps in the list of reported errors.
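
The relationship between the total counter and the bounded list can be sketched as below, assuming the index stored with each error is the counter value returned by `fetch_add`. The names `ParseErrorTracker`, `IndexedError`, and `MAX_RECENT` are made up for this example, the limit is kept tiny for readability, and the sketch ignores the window between incrementing the counter and taking the lock.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

/// Tiny limit for illustration only; the PR keeps 100 per tag.
const MAX_RECENT: usize = 3;

/// Hypothetical record: the error message plus its position in the
/// sequence of all errors ever reported.
pub struct IndexedError {
    pub index: u64,
    pub message: String,
}

pub struct ParseErrorTracker {
    /// Counts every error ever reported; never decreases.
    num_parse_errors: AtomicU64,
    /// Holds only the most recent errors.
    recent: Mutex<VecDeque<IndexedError>>,
}

impl ParseErrorTracker {
    pub fn new() -> Self {
        Self {
            num_parse_errors: AtomicU64::new(0),
            recent: Mutex::new(VecDeque::new()),
        }
    }

    pub fn report(&self, message: &str) {
        // `fetch_add` returns the previous value, which serves as a
        // zero-based index of this error.
        let index = self.num_parse_errors.fetch_add(1, Ordering::AcqRel);
        let mut recent = self.recent.lock().unwrap();
        if recent.len() == MAX_RECENT {
            recent.pop_front();
        }
        recent.push_back(IndexedError {
            index,
            message: message.to_string(),
        });
    }

    pub fn total(&self) -> u64 {
        self.num_parse_errors.load(Ordering::Acquire)
    }

    pub fn recent_indexes(&self) -> Vec<u64> {
        self.recent.lock().unwrap().iter().map(|e| e.index).collect()
    }
}
```

A client that sees the retained list start at an index greater than the last one it read can tell that some errors were evicted in between, while `total()` still reflects every error.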

/// When the number of errors for a tag exceeds MAX_CONNECTOR_ERRORS, the oldest error is removed.
#[derive(Debug)]
pub struct ConnectorErrorList {
errors: BTreeMap<Option<&'static str>, VecDeque<ConnectorError>>,
Contributor

would it be better if this is ConcurrentHashMap<string, Mutex<VecDeque>>

Contributor Author

@ryzhyk ryzhyk Mar 7, 2026

I don't think it matters. These are per-connector, so there's not gonna be a lot of contention. And in any case if the connector is producing so many errors of different types, lock contention is probably not your biggest problem.

/// Optional tag for the error.
///
/// The tag is used to group errors by their type.
pub tag: Option<String>,

Mihai asked for skip_serializing_if here and ryzhyk agreed ("yes!"), but it wasn't added. tag: None will serialize as "tag": null in the JSON response. Should be:

#[serde(default, skip_serializing_if = "Option::is_none")]
pub tag: Option<String>,

@ryzhyk ryzhyk force-pushed the issue-1108 branch 4 times, most recently from 435ed07 to 49c6acb Compare March 8, 2026 05:15
@mythical-fred mythical-fred left a comment

Good fixes — both the tag bug and the comment on semantics are addressed. One remaining item: still lacks . Approving, but worth a quick fix before merge.

///
/// The tag is used to group errors by their type.
pub tag: Option<String>,

Missing #[serde(skip_serializing_if = "Option::is_none")]. Mihai flagged this earlier and you agreed — every tagless error will still serialize as "tag": null in the API response. One-liner fix.

///
/// The tag is used to group errors by their type.
pub tag: Option<String>,

Missing #[serde(skip_serializing_if = "Option::is_none")] here. Mihai flagged this earlier and you agreed — every tagless error will still serialize as "tag": null in the API response. One-liner fix.

We had duplicate code converting connector status to the REST API format.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@mythical-fred mythical-fred left a comment

LGTM on the rebase — same fixes as before. The tag/skip_serializing_if comment above still stands.

ryzhyk added 3 commits March 9, 2026 09:30
Partially fixes #1108.

Today, connector errors are only reported in the log where they can be
difficult to find or get lost completely if the log is noisy.

This commit exposes connector errors via the REST API:
- We store up to a fixed number (100) of the most recent errors of each kind
  (transport/encoder/parser) and for each tag (we reuse the same tags that were
  recently introduced for log throttling).

- Old errors are evicted in FIFO order.

- The error list is returned via
  /tables/table_name/connectors/connector_name/stats endpoint, but not via
  global /stats to avoid bloating the output of that endpoint.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
- Add Python bindings for querying input/output connector statuses.
- Add a test that validates all connector fields and specifically tests
  input/output metric reporting.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Per-table/view status endpoints now return strongly typed results of type
`InputConnectorStatus`/`OutputConnectorStatus`.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
@ryzhyk ryzhyk added this pull request to the merge queue Mar 9, 2026
Merged via the queue into main with commit c8a71c4 Mar 9, 2026
1 check passed
@ryzhyk ryzhyk deleted the issue-1108 branch March 9, 2026 18:14
Development

Successfully merging this pull request may close these issues.

[RFC] Connector error reporting.

6 participants