[adapters] Add retries to delta output connector.#5782
Conversation
| } | ||
|
|
||
| #[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)] | ||
| pub struct ConnectorHealth { |
There was a problem hiding this comment.
why not make this an enum with a String argument for unhealthy?
Do you ever expect Healthy to need a description?
There was a problem hiding this comment.
Or is this because you want a simpler JSON Schema?
There was a problem hiding this comment.
yes, this needs to be openapi-friendly
| Backward-incompatible Delta Lake output connector change. The new `max_retries` setting configures | ||
| the number of times the connector retries failed Delta Lake operations like writing Parquet files | ||
| and committing transactions. The setting is unset by default, causing the connector to retry | ||
| indefinitely. This behavior prevents data loss due to transient or permanent write errors. |
There was a problem hiding this comment.
"permanent" is permanent, not much you can do then...
There was a problem hiding this comment.
you can prevent the pipeline from making progress beyond the error.
| description = d.get("description") | ||
| if description is not None and not isinstance(description, str): | ||
| raise ValueError( | ||
| "invalid optional field 'health.description': expected string or null" |
There was a problem hiding this comment.
null or None?
so many languages...
| self.health: Optional[ConnectorHealth] = None | ||
|
|
||
| @classmethod | ||
| def from_dict(cls, d: Mapping[str, Any]): |
There was a problem hiding this comment.
does the test file cover all these new fields?
|
|
||
| async fn commit(&mut self, actions: &[Add]) -> AnyResult<()> { | ||
| // The snapshot version for the next commit is computed as the current version + 1. | ||
| // We need to update the current version manually, since it doesn't happen automatically. |
There was a problem hiding this comment.
I don't really understand this comment, perhaps it means that the Delta server does not change versions?
mythical-fred
left a comment
There was a problem hiding this comment.
Two issues; one is a real defect.
Blank breaking change description: Python SDK is checked in the breaking changes list but the "Describe Incompatible Changes" section is empty. The actual breaking change is the removal of feldera/connector_stats.py and the rename of InputConnectorStatus → InputEndpointStatus / OutputConnectorStatus → OutputEndpointStatus. Anyone importing from feldera.connector_stats will get an ImportError. Please describe this.
object-store logs S3 retries at INFO level resulting in very noisy logs for some pipelines. This commit sets its default log level to WARN. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
The connector used to give up if any delta-rs operation failed, e.g., because configured S3 timeouts/retry limits have expired. - Add retry loops controlled by a new `max_retries` config settings. The default setting is `None` for infinite retries. This is a non-backward compatible change, since connectors will now be stuck in retry loops instead of reporting an error and continuing to process the next change. The new behavior is a more sensible default, as it guarantees that the pipeline won't be checkpointed if some data was lost. - Add a new `health` field to connector status. This is used by the connector to report its state while it's stuck in potentially infinite retries. There are still situations where the connector can lose data even with infinite retries configured: - There's a TODO encode for a delta-rs API call that consumes its argument and therefore cannot be retried. - Encoding errors are not retryable. Going forward, we need to change the connector framework to handle data loss better: - Connector errors should identify if they cause a data loss - A per-connector setting should indicate whether the pipeline should fail in this case, preventing invalid state to get saved in the checkpoint. Another todo is that the notion of a fatal error is no longer useful and should be replaced with setting connector health status to FAILED. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
In an earlier commit I stupidly asked cursor to generate Python bindings for connector status endpoints. Without a shadow of a doubt, cursor generated new classes, instead of extending existing classes in stats.py. Not being the owner of this code, I did not notice it. This commit dedups the classes, merging new connector status fields back into existing classes. It also adds the new `health` field introduced in the previous commit. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
The module was missing from documentation. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
mythical-fred
left a comment
There was a problem hiding this comment.
LGTM — the duplicate from_dict on InputEndpointStatus is gone; the dedup commit now has one complete implementation. Docs commit and auto-format don't introduce any issues.
See commit messages.
Describe Manual Test Plan
Tested manually by directing the connector to local FS, renaming the folder (retries start) and then restoring the original folder (writing continues successfully)
Checklist
Breaking Changes?
Mark if you think the answer is yes for any of these components:
Describe Incompatible Changes