[adapters] Report recent connector errors in the API #5775
| } | ||
|
|
||
| pub fn parse_error(&self, endpoint_id: EndpointId) { | ||
| pub fn parse_error( |
I would name these functions with "report_parse_error", to make it clear that they are not constructors.
| /// Stores up to MAX_CONNECTOR_ERRORS most recent errors for each tag. | ||
| /// When the number of errors for a tag exceeds MAX_CONNECTOR_ERRORS, the oldest error is removed. | ||
| #[derive(Debug)] | ||
| pub struct ConnectorErrorList { |
Should the constant MAX_CONNECTOR_ERRORS be part of this class?
Alternatively, it should be a constructor parameter if not.
Also, this class could have "Recent" in its name.
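To make the suggestion concrete, here is a minimal sketch of a per-tag list of recent errors with the limit passed to the constructor. The names `RecentConnectorErrors`, `report`, and `to_vec`, and the simplified `ConnectorError` fields, are assumptions for illustration and not the code in this PR.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical, simplified error record (the PR's `ConnectorError` may differ).
#[derive(Debug, Clone, PartialEq)]
pub struct ConnectorError {
    /// Sequence number of the error within its connector.
    pub index: u64,
    /// Optional tag used to group errors by type.
    pub tag: Option<&'static str>,
    /// Error message.
    pub message: String,
}

/// Keeps up to `capacity` most recent errors for each tag.
#[derive(Debug)]
pub struct RecentConnectorErrors {
    capacity: usize,
    errors: BTreeMap<Option<&'static str>, VecDeque<ConnectorError>>,
}

impl RecentConnectorErrors {
    /// The limit is a constructor parameter instead of a global constant.
    pub fn new(capacity: usize) -> Self {
        Self { capacity, errors: BTreeMap::new() }
    }

    /// Records an error, evicting the oldest error with the same tag
    /// (FIFO order) when that tag's queue is full.
    pub fn report(&mut self, error: ConnectorError) {
        if self.capacity == 0 {
            return;
        }
        let queue = self.errors.entry(error.tag).or_default();
        if queue.len() == self.capacity {
            queue.pop_front();
        }
        queue.push_back(error);
    }

    /// Returns all retained errors across tags, ordered by index.
    pub fn to_vec(&self) -> Vec<ConnectorError> {
        let mut all: Vec<ConnectorError> =
            self.errors.values().flatten().cloned().collect();
        all.sort_by_key(|e| e.index);
        all
    }
}
```

With a capacity of 2, reporting three errors under the same tag keeps only the last two, while errors under other tags are unaffected.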
| /// If `full` is false, generates a trimmed version of the endpoint status, | ||
| /// not including the list of errors. This status can be returned via the | ||
| /// `/stats` endpoint. | ||
| pub fn to_api_type(&self, full: bool) -> ExternalOutputEndpointStatus { |
I would rename "full" to "include_errors"
We may add more optional fields in the future. I think full is adequate.
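As a rough illustration of the `full` flag discussed here, the sketch below drops the error list from the trimmed status and includes it in the full one. `EndpointState`, `EndpointStatus`, and their fields are hypothetical stand-ins, not the PR's actual types.

```rust
/// Hypothetical API-facing status (stand-in for `ExternalOutputEndpointStatus`).
#[derive(Debug, Clone, PartialEq)]
pub struct EndpointStatus {
    /// Total number of parse errors ever reported.
    pub num_parse_errors: u64,
    /// The first fatal error that occurred at the endpoint.
    pub fatal_error: Option<String>,
    /// Recent errors; `None` in the trimmed version.
    pub parse_errors: Option<Vec<String>>,
}

/// Hypothetical internal endpoint state.
pub struct EndpointState {
    pub num_parse_errors: u64,
    pub fatal_error: Option<String>,
    pub recent_parse_errors: Vec<String>,
}

impl EndpointState {
    /// If `full` is false, the list of errors is omitted so the result
    /// stays small enough for a global stats response.
    pub fn to_api_type(&self, full: bool) -> EndpointStatus {
        EndpointStatus {
            num_parse_errors: self.num_parse_errors,
            fatal_error: self.fatal_error.clone(),
            // `bool::then` yields `Some(..)` only when `full` is true.
            parse_errors: full.then(|| self.recent_parse_errors.clone()),
        }
    }
}
```

Any optional field added later could be gated on the same flag, which is the argument for keeping the more general name.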
|
TODOs:
Not in scope: system table for errors. |
|
System tables would be tricky if the pipeline needs to essentially block. If it can't run any more steps it won't be able to write to any output connectors. |
| /// The first fatal error that occurred at the endpoint. | ||
| pub fatal_error: Option<String>, | ||
| #[serde(default, skip_serializing_if = "Option::is_none")] | ||
| /// Recent encoding errors on this endpoint. |
Comments above decorators make the whole field more readable
| /// Optional tag for the error. | ||
| /// | ||
| /// The tag is used to group errors by their type. | ||
| pub tag: Option<String>, |
An optional tag is not pretty, this forces clients to come up with own strategy to treat tag=null records. Would it be meaningful to have an "Other" or "Generic" tag assigned by the pipeline-manager, making this field required in the API?
It's easy to substitute "other" if a tag is missing; it's harder to go in the other direction. In any case, I don't think we have much use for the tag, you may want to just ignore it in the webconsole
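A client that does want a required tag can apply the substitution itself. The sketch below assumes a hypothetical `group_by_tag` helper and an `"other"` fallback label; neither is part of the API in this PR.

```rust
use std::collections::BTreeMap;

/// Hypothetical client-side label for errors reported without a tag.
const FALLBACK_TAG: &str = "other";

/// Groups error messages by tag, filing untagged errors under `FALLBACK_TAG`.
/// Each input item is a `(tag, message)` pair.
pub fn group_by_tag<'a>(
    errors: &[(Option<&'a str>, &'a str)],
) -> BTreeMap<&'a str, Vec<&'a str>> {
    let mut groups: BTreeMap<&'a str, Vec<&'a str>> = BTreeMap::new();
    for &(tag, message) in errors {
        groups
            .entry(tag.unwrap_or(FALLBACK_TAG))
            .or_default()
            .push(message);
    }
    groups
}
```

Going the other way, from a mandatory `"other"` tag back to "no tag", would require the client to know which label the server reserves, which is the asymmetry pointed out above.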
| /// The first fatal error that occurred at the endpoint. | ||
| pub fatal_error: Option<String>, | ||
| #[serde(default)] | ||
| pub parse_errors: Option<Vec<ConnectorError>>, |
Since you introduce error lists in Input and Output Status, do you deprecate metrics.num_*_errors in both?
No, because these error lists only contain recent errors, the error count should account for all errors that have ever occurred in this pipeline.
| pub tag: Option<String>, | ||
|
|
||
| /// Error message. | ||
| pub error: String, |
message is more fitting and consistent with other errors?
LGTM. Maybe we should store 1000, since it should be cheap, and use HTTP pagination headers to retrieve them: https://docs.github.com/en/rest/using-the-rest-api/using-pagination-in-the-rest-api?apiVersion=2022-11-28
(might be a future PR if too hard)
| fn parse_error(&self) { | ||
| self.metrics.num_parse_errors.fetch_add(1, Ordering::AcqRel); | ||
| fn parse_error(&self, tag: Option<&'static str>, error: &ParseError) { | ||
| let last_error = self.metrics.num_parse_errors.fetch_add(1, Ordering::AcqRel); |
it's odd that you need both parse_errors and num_parse_errors
num errors accounts for all errors generated by the connector. The error list only lists recent errors. @mihaibudiu suggested renaming it to recent_parse_errors.
But note that we include error index to help track gaps in the list of reported errors.
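To illustrate how the index helps, the sketch below assumes the index is the value returned by `fetch_add` on the error counter, as the diff above suggests, and shows how a client could find gaps in the reported list. The helper names `next_error_index` and `missing_ranges` are hypothetical.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// `fetch_add` returns the counter's previous value, so the first error
/// gets index 0, the second index 1, and so on.
pub fn next_error_index(counter: &AtomicU64) -> u64 {
    counter.fetch_add(1, Ordering::AcqRel)
}

/// Given the indices of the errors present in a response, sorted in
/// ascending order, returns the inclusive ranges of indices missing
/// between them (errors that were evicted or never retained).
pub fn missing_ranges(indices: &[u64]) -> Vec<(u64, u64)> {
    indices
        .windows(2)
        .filter(|w| w[1] > w[0] + 1)
        .map(|w| (w[0] + 1, w[1] - 1))
        .collect()
}
```

For example, a response containing indices 3, 4, 7, 8, and 12 tells the client that errors 5 to 6 and 9 to 11 occurred but are no longer in the list.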
| /// When the number of errors for a tag exceeds MAX_CONNECTOR_ERRORS, the oldest error is removed. | ||
| #[derive(Debug)] | ||
| pub struct ConnectorErrorList { | ||
| errors: BTreeMap<Option<&'static str>, VecDeque<ConnectorError>>, |
Would it be better if this were ConcurrentHashMap<String, Mutex<VecDeque>>?
I don't think it matters. These are per-connector, so there's not gonna be a lot of contention. And in any case if the connector is producing so many errors of different types, lock contention is probably not your biggest problem.
| /// Optional tag for the error. | ||
| /// | ||
| /// The tag is used to group errors by their type. | ||
| pub tag: Option<String>, |
Mihai asked for skip_serializing_if here and ryzhyk agreed ("yes!"), but it wasn't added. tag: None will serialize as "tag": null in the JSON response. Should be:
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tag: Option<String>,
435ed07 to 49c6acb
mythical-fred
left a comment
Good fixes: both the tag bug and the comment on semantics are addressed. One remaining item: tag still lacks #[serde(skip_serializing_if = "Option::is_none")]. Approving, but worth a quick fix before merge.
| /// | ||
| /// The tag is used to group errors by their type. | ||
| pub tag: Option<String>, | ||
|
|
Missing #[serde(skip_serializing_if = "Option::is_none")]. Mihai flagged this earlier and you agreed — every tagless error will still serialize as "tag": null in the API response. One-liner fix.
We had duplicate code converting connector status to the REST API format. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
mythical-fred
left a comment
LGTM on the rebase — same fixes as before. The tag/skip_serializing_if comment above still stands.
Partially fixes #1108. Today, connector errors are only reported in the log where they can be difficult to find or get lost completely if the log is noisy. This commit exposes connector errors via the REST API:
- We store up to a fixed number (100) most recent errors of each kind (transport/encoder/parser) and for each tag (we reuse the same tags that were recently introduced for log throttling).
- Old errors are evicted in FIFO order
- The error list is returned via /tables/table_name/connectors/connector_name/stats endpoint, but not via global /stats to avoid bloating the output of that endpoint.
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
- Add Python bindings for querying input/output connector statuses.
- Add a test that validates all connector fields and specifically tests input/output metric reporting.
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Per-table/view status endpoints now return strongly typed results of type `InputConnectorStatus`/`OutputConnectorStatus`. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
Partially fixes #1108.
Today, connector errors are only reported in the log where they can be
difficult to find or get lost completely if the log is noisy.
This commit exposes connector errors via the REST API:
- We store up to a fixed number (100) most recent errors of each kind
  (transport/encoder/parser) and for each tag (we reuse the same tags that were
  recently introduced for log throttling).
- Old errors are evicted in FIFO order
- The error list is returned via
  /tables/table_name/connectors/connector_name/stats endpoint, but not via
  global /stats to avoid bloating the output of that endpoint.