[RFC][Connectors]: Add native s2 input and output connector support in Feldera#5728
[RFC][Connectors]: Add native s2 input and output connector support in Feldera#5728Mrhs121 wants to merge 9 commits intofeldera:mainfrom
Conversation
mythical-fred
left a comment
There was a problem hiding this comment.
This is a solid start as an RFC — the fault-tolerance wiring, replay semantics, and connector plumbing all look structurally correct. A few things to address before this can merge:
s2-sdk dependency
MIT license, fine. But the crate is very young: first published Feb 2026 (~3 weeks before this PR), 2152 total downloads across all versions, pre-1.0 with multiple breaking version bumps per week (0.22 → 0.23 → 0.24 in six weeks). The GitHub repo has 441 stars and is actively pushed to by the S2 team — not abandoned — but this is not "widely used" by any meaningful measure. Feldera would be an early adopter. That's a product decision, not a hard technical block, but the 0.x churn risk is real.
Missing integration tests
Config serde tests are good but insufficient for a new connector. Needs actual stream read tests. Look at NATS or PubSub for the pattern; S2 supports a local emulator which should make this straightforward.
No documentation
User-visible connector with no entry in docs.feldera.com. Required before merge.
Commit message
support s2 native connector is too terse for a 500-line change. Should explain motivation and approach.
| Ok(batch) => { | ||
| for record in &batch.records { | ||
| info!("Got record #{}", record.seq_num); | ||
| next_seq.store(record.seq_num + 1, Ordering::Release); |
There was a problem hiding this comment.
Per-record info! will spam logs at any meaningful throughput. Downgrade to trace!, or drop it — the queue/extend logs already give sufficient visibility.
| } | ||
| Err(error) => { | ||
| consumer.error(false, anyhow!("S2 error: {error}"), Some("s2-input")); | ||
| } |
There was a problem hiding this comment.
Non-fatal error with no backoff. If S2 returns errors continuously (network blip, auth expiry, rate limit), this spin-calls consumer.error(false, ...) in a tight loop. Other connectors add exponential backoff before retrying.
| pub stream: String, | ||
| /// S2 authentication token. | ||
| pub auth_token: String, | ||
| /// Custom S2 endpoint URL (e.g., "http://localhost:8080"). |
There was a problem hiding this comment.
auth_token appears in plain connector config and will surface in API responses. At minimum, document clearly that this is a secret and advise users not to embed raw tokens in SQL. Longer-term, consider a secret-scrubbing annotation consistent with how other connectors handle credentials.
| /// S2 authentication token. | ||
| pub auth_token: String, |
There was a problem hiding this comment.
nit: can we call it as access_token? coz thats the terminology we use
| /// If not set, uses the default S2 cloud endpoint. | ||
| #[serde(default, skip_serializing_if = "Option::is_none")] | ||
| pub endpoint: Option<String>, |
There was a problem hiding this comment.
as you might have noticed in https://s2.dev/docs/api/endpoints, S2 has account endpoint and basin endpoint. for complete configurability and the efficiency that the basin endpoint offers, i think we should propagate both account_endpoint and basin_endpoint.
| const RETRY_BASE_DELAY: Duration = Duration::from_millis(500); | ||
| const RETRY_MAX_DELAY: Duration = Duration::from_secs(30); |
There was a problem hiding this comment.
i'm not totally sure of the motivation behind this dedicated retry loop. i can see the control in terms of logging etc, but ICYMI, the SDK by default supports exponential backoff with jitter. you can adjust the knobs if you want. https://docs.rs/s2-sdk/latest/s2_sdk/types/struct.RetryConfig.html
There was a problem hiding this comment.
Thanks for the suggestion! I originally added the retry mechanism because when the S2 service goes down, the Feldera pipeline fails immediately due to a connection error. To handle this, I implemented a manual retry at the application layer.
After reviewing https://s2.dev/docs/sdk/retries-timeouts
, I’ve updated the implementation to leverage the SDK’s built-in retry support instead.
|
thank you for your input @quettabit we appreciate it |
|
Thank you all for the reviews. We’re currently building a new data system, and during our technology evaluation we were drawn to Feldera’s incremental computation capabilities and S2’s stateless, lightweight design. That’s why we plan to integrate S2 as a native connector in Feldera. This PR is still in an early stage and should be considered a first draft, with a lot more work still needed. |
|
@Mrhs121 absolutely, we're happy to help you get this merged, let us know if you have any questions etc. |
5fbbc8a to
2db735a
Compare
mythical-fred
left a comment
There was a problem hiding this comment.
Nice progress — output connector looks clean and config unit tests are solid. A few issues to address before merge-ready.
| // Use a conservative limit to leave room for framing overhead. | ||
| 1_000_000 | ||
| } | ||
|
|
There was a problem hiding this comment.
push_buffer submits and immediately awaits the ack before returning — one round-trip per buffer. For real throughput: pipeline submits and wait for acks at flush/disconnect. Not a hard blocker for a first pass, but add a TODO so it isn't forgotten.
|
@Mrhs121 mythical-fred is our automated reviewer, if his comments are not correct you can ignore him :) |
eacaa53 to
9a04d1a
Compare
mythical-fred
left a comment
There was a problem hiding this comment.
Both blockers addressed: info! downgraded to trace!, and the double-parse moved to the error path only. Error handling via SDK RetryConfig is also correct. LGTM.
mythical-fred
left a comment
There was a problem hiding this comment.
Both blockers addressed. LGTM.
mythical-fred
left a comment
There was a problem hiding this comment.
Both hard blockers resolved — info→trace fixed, double-parse moved to error path. The push_buffer one-ack-per-record issue remains (soft), and auth_token still lacks a secret annotation — worth a follow-up issue.
dba7b1b to
0e590c2
Compare
0e590c2 to
feedd68
Compare
Describe Manual Test Plan
RFC : #5726
Checklist
Breaking Changes?
Mark if you think the answer is yes for any of these components:
Describe Incompatible Changes