Skip to content

[adapters] Add retries to delta output connector.#5782

Merged
ryzhyk merged 6 commits intomainfrom
delta-hardening
Mar 10, 2026
Merged

[adapters] Add retries to delta output connector.#5782
ryzhyk merged 6 commits intomainfrom
delta-hardening

Conversation

@ryzhyk
Copy link
Contributor

@ryzhyk ryzhyk commented Mar 9, 2026

See commit messages.

Describe Manual Test Plan

Tested manually by directing the connector to local FS, renaming the folder (retries start) and then restoring the original folder (writing continues successfully)

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

  • Adapters: the connector retries forever by default now
  • Python: adapter_stats is gone. It was only merged earlier today, so unlikely to affect users.

@ryzhyk ryzhyk added the connectors Issues related to the adapters/connectors crate label Mar 9, 2026
}

#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
pub struct ConnectorHealth {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not make this an enum with a String argument for unhealthy?
Do you ever expect Healthy to need a description?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is this because you want a simpler JSON Schema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this needs to be openapi-friendly

Backward-incompatible Delta Lake output connector change. The new `max_retries` setting configures
the number of times the connector retries failed Delta Lake operations like writing Parquet files
and committing transactions. The setting is unset by default, causing the connector to retry
indefinitely. This behavior prevents data loss due to transient or permanent write errors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"permanent" is permanent, not much you can do then...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can prevent the pipeline from making progress beyond the error.

description = d.get("description")
if description is not None and not isinstance(description, str):
raise ValueError(
"invalid optional field 'health.description': expected string or null"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null or None?
so many languages...

self.health: Optional[ConnectorHealth] = None

@classmethod
def from_dict(cls, d: Mapping[str, Any]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the test file cover all these new fields?


async fn commit(&mut self, actions: &[Add]) -> AnyResult<()> {
// The snapshot version for the next commit is computed as the current version + 1.
// We need to update the current version manually, since it doesn't happen automatically.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand this comment, perhaps it means that the Delta server does not change versions?

@ryzhyk ryzhyk requested review from abhizer and snkas March 9, 2026 23:35
Copy link

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two issues; one is a real defect.

Blank breaking change description: Python SDK is checked in the breaking changes list but the "Describe Incompatible Changes" section is empty. The actual breaking change is the removal of feldera/connector_stats.py and the rename of InputConnectorStatusInputEndpointStatus / OutputConnectorStatusOutputEndpointStatus. Anyone importing from feldera.connector_stats will get an ImportError. Please describe this.

@ryzhyk ryzhyk requested a review from mythical-fred March 10, 2026 00:11
ryzhyk added 3 commits March 10, 2026 09:07
object-store logs S3 retries at INFO level resulting in very noisy logs for
some pipelines.  This commit sets its default log level to WARN.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
The connector used to give up if any delta-rs operation failed, e.g., because
configured S3 timeouts/retry limits have expired.

- Add retry loops controlled by a new `max_retries` config settings. The default setting
  is `None` for infinite retries. This is a non-backward compatible change, since connectors
  will now be stuck in retry loops instead of reporting an error and continuing to process
  the next change. The new behavior is a more sensible default, as it guarantees that the
  pipeline won't be checkpointed if some data was lost.

- Add a new `health` field to connector status. This is used by the connector to report its
  state while it's stuck in potentially infinite retries.

There are still situations where the connector can lose data even with infinite retries
configured:
- There's a TODO encode for a delta-rs API call that consumes its argument and
  therefore cannot be retried.
- Encoding errors are not retryable.

Going forward, we need to change the connector framework to handle data loss better:
- Connector errors should identify if they cause a data loss
- A per-connector setting should indicate whether the pipeline should fail in this case,
  preventing invalid state to get saved in the checkpoint.

Another todo is that the notion of a fatal error is no longer useful and should be
replaced with setting connector health status to FAILED.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk enabled auto-merge March 10, 2026 16:10
@ryzhyk ryzhyk added this pull request to the merge queue Mar 10, 2026
ryzhyk added 2 commits March 10, 2026 11:00
In an earlier commit I stupidly asked cursor to generate Python bindings for
connector status endpoints. Without a shadow of a doubt, cursor generated new
classes, instead of extending existing classes in stats.py. Not being the owner
of this code, I did not notice it.

This commit dedups the classes, merging new connector status fields back into
existing classes. It also adds the new `health` field introduced in the
previous commit.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
The module was missing from documentation.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
@ryzhyk ryzhyk removed this pull request from the merge queue due to a manual request Mar 10, 2026
@ryzhyk ryzhyk enabled auto-merge March 10, 2026 18:01
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
@ryzhyk ryzhyk added this pull request to the merge queue Mar 10, 2026
Copy link

@mythical-fred mythical-fred left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM — the duplicate from_dict on InputEndpointStatus is gone; the dedup commit now has one complete implementation. Docs commit and auto-format don't introduce any issues.

Merged via the queue into main with commit 48c7c22 Mar 10, 2026
1 check passed
@ryzhyk ryzhyk deleted the delta-hardening branch March 10, 2026 20:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

connectors Issues related to the adapters/connectors crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants