
[RFC][Connectors]: Add native s2 input and output connector support in Feldera #5728

Open
Mrhs121 wants to merge 9 commits into feldera:main from Mrhs121:s2-connector-draft

Mrhs121 commented Mar 2, 2026

Describe Manual Test Plan

RFC: #5726

Checklist

  • Unit tests added/updated
  • Integration tests added/updated
  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

Describe Incompatible Changes

Collaborator

mythical-fred left a comment

This is a solid start as an RFC — the fault-tolerance wiring, replay semantics, and connector plumbing all look structurally correct. A few things to address before this can merge:

s2-sdk dependency
MIT license, fine. But the crate is very young: first published Feb 2026 (~3 weeks before this PR), 2152 total downloads across all versions, pre-1.0 with multiple breaking version bumps per week (0.22 → 0.23 → 0.24 in six weeks). The GitHub repo has 441 stars and is actively pushed to by the S2 team — not abandoned — but this is not "widely used" by any meaningful measure. Feldera would be an early adopter. That's a product decision, not a hard technical block, but the 0.x churn risk is real.

Missing integration tests
Config serde tests are good but insufficient for a new connector. Needs actual stream read tests. Look at NATS or PubSub for the pattern; S2 supports a local emulator which should make this straightforward.

No documentation
User-visible connector with no entry in docs.feldera.com. Required before merge.

Commit message
The message "support s2 native connector" is too terse for a 500-line change. It should explain the motivation and the approach.

Ok(batch) => {
    for record in &batch.records {
        info!("Got record #{}", record.seq_num);
        next_seq.store(record.seq_num + 1, Ordering::Release);
Collaborator

Per-record info! will spam logs at any meaningful throughput. Downgrade to trace!, or drop it — the queue/extend logs already give sufficient visibility.

}
Err(error) => {
    consumer.error(false, anyhow!("S2 error: {error}"), Some("s2-input"));
}
Collaborator

Non-fatal error with no backoff. If S2 returns errors continuously (network blip, auth expiry, rate limit), this spin-calls consumer.error(false, ...) in a tight loop. Other connectors add exponential backoff before retrying.
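For illustration only, here is a minimal sketch of the capped exponential backoff this comment asks for. It reuses the RETRY_BASE_DELAY and RETRY_MAX_DELAY values quoted from the diff later in this thread; the helper name backoff_delay is hypothetical and is not taken from the PR or from any other Feldera connector. It omits jitter, which a production retry loop would normally add.

```rust
use std::time::Duration;

// Values quoted from the PR diff; the helper below is a hypothetical sketch.
const RETRY_BASE_DELAY: Duration = Duration::from_millis(500);
const RETRY_MAX_DELAY: Duration = Duration::from_secs(30);

/// Delay to sleep before retry number `attempt` (0-based):
/// base * 2^attempt, capped at RETRY_MAX_DELAY.
fn backoff_delay(attempt: u32) -> Duration {
    // Clamp the exponent so the shift itself cannot overflow a u32.
    let factor = 1u32 << attempt.min(16);
    RETRY_BASE_DELAY
        .saturating_mul(factor)
        .min(RETRY_MAX_DELAY)
}
```

A read loop would sleep for backoff_delay(attempt) after each consecutive error and reset attempt to 0 after the first successful batch, so a long outage settles at one retry every 30 seconds instead of a tight loop.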

pub stream: String,
/// S2 authentication token.
pub auth_token: String,
/// Custom S2 endpoint URL (e.g., "http://localhost:8080").
Collaborator

auth_token appears in plain connector config and will surface in API responses. At minimum, document clearly that this is a secret and advise users not to embed raw tokens in SQL. Longer-term, consider a secret-scrubbing annotation consistent with how other connectors handle credentials.
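One possible shape for such an annotation, sketched under assumptions: a wrapper type whose Debug output never contains the token. The Secret type and the trimmed-down S2Config below are hypothetical and are not how Feldera handles credentials today. The sketch covers only Debug formatting (logs, panics); keeping the value out of API responses would additionally need matching handling in the serialization layer.

```rust
use std::fmt;

/// Hypothetical wrapper: holds a token but never prints it.
pub struct Secret(String);

impl Secret {
    pub fn new(value: impl Into<String>) -> Self {
        Secret(value.into())
    }

    /// Explicit accessor for the one call site that needs the raw value.
    pub fn expose(&self) -> &str {
        &self.0
    }
}

impl fmt::Debug for Secret {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Always print a fixed placeholder instead of the contents.
        f.write_str("Secret(***)")
    }
}

/// Trimmed-down stand-in for the connector config in this PR.
#[derive(Debug)]
pub struct S2Config {
    pub stream: String,
    pub auth_token: Secret,
}
```

Because the raw value is reachable only through expose(), accidental leaks through `{:?}` formatting of the whole config are ruled out by construction.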


quettabit left a comment

hey @Mrhs121! thank you for your contribution. i'm from team S2, and i just left some comments regarding the usage of the SDK. i'll defer to feldera folks regarding other parts.

Comment on lines +32 to +33
/// S2 authentication token.
pub auth_token: String,

nit: can we call it access_token? coz that's the terminology we use

Comment on lines +35 to +37
/// If not set, uses the default S2 cloud endpoint.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub endpoint: Option<String>,

as you might have noticed in https://s2.dev/docs/api/endpoints, S2 has account endpoint and basin endpoint. for complete configurability and the efficiency that the basin endpoint offers, i think we should propagate both account_endpoint and basin_endpoint.

Comment on lines +320 to +321
const RETRY_BASE_DELAY: Duration = Duration::from_millis(500);
const RETRY_MAX_DELAY: Duration = Duration::from_secs(30);

i'm not totally sure of the motivation behind this dedicated retry loop. i can see the control in terms of logging etc, but ICYMI, the SDK by default supports exponential backoff with jitter. you can adjust the knobs if you want. https://docs.rs/s2-sdk/latest/s2_sdk/types/struct.RetryConfig.html

Author

Thanks for the suggestion! I originally added the retry mechanism because when the S2 service goes down, the Feldera pipeline fails immediately due to a connection error. To handle this, I implemented a manual retry at the application layer.
After reviewing https://s2.dev/docs/sdk/retries-timeouts, I’ve updated the implementation to leverage the SDK’s built-in retry support instead.

Contributor

gz commented Mar 3, 2026

thank you for your input @quettabit we appreciate it

Author

Mrhs121 commented Mar 3, 2026

Thank you all for the reviews. We’re currently building a new data system, and during our technology evaluation we were drawn to Feldera’s incremental computation capabilities and S2’s stateless, lightweight design. That’s why we plan to integrate S2 as a native connector in Feldera. This PR is still in an early stage and should be considered a first draft, with a lot more work still needed.

Contributor

gz commented Mar 3, 2026

@Mrhs121 absolutely, we're happy to help you get this merged, let us know if you have any questions etc.

Mrhs121 force-pushed the s2-connector-draft branch 4 times, most recently from 5fbbc8a to 2db735a, on March 12, 2026 14:04
Mrhs121 changed the title from "[RFC][Connectors][Draft]: Add native s2 input connector support in Feldera" to "[RFC][Connectors][Draft]: Add native s2 input and output connector support in Feldera" on Mar 13, 2026
Collaborator

mythical-fred left a comment

Nice progress — output connector looks clean and config unit tests are solid. A few issues to address before merge-ready.

// Use a conservative limit to leave room for framing overhead.
1_000_000
}

Collaborator

push_buffer submits and immediately awaits the ack before returning — one round-trip per buffer. For real throughput: pipeline submits and wait for acks at flush/disconnect. Not a hard blocker for a first pass, but add a TODO so it isn't forgotten.
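To make the suggestion concrete, here is a rough sketch of the pipelined shape using only the standard library. Everything in it is a stand-in: submit simulates a remote append with a thread and a channel, and PipelinedWriter is a hypothetical type, not the PR's output endpoint or the S2 SDK's append API. The point is only the control flow: push_buffer queues an ack handle and returns, and flush waits for all outstanding acks.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Stand-in for a remote append: returns at once with a handle that
/// later yields the ack. Hypothetical; the real SDK call will differ.
fn submit(buffer: Vec<u8>) -> Receiver<usize> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        // Pretend the server acknowledged `buffer.len()` bytes.
        let _ = tx.send(buffer.len());
    });
    rx
}

/// Writer that queues ack handles instead of awaiting each one.
#[derive(Default)]
struct PipelinedWriter {
    pending: Vec<Receiver<usize>>,
}

impl PipelinedWriter {
    /// Submit and return immediately; the ack is collected in `flush`.
    fn push_buffer(&mut self, buffer: &[u8]) {
        self.pending.push(submit(buffer.to_vec()));
    }

    /// Wait for every outstanding ack; returns the total acknowledged bytes.
    fn flush(&mut self) -> Result<usize, String> {
        let mut acked = 0;
        for rx in self.pending.drain(..) {
            acked += rx.recv().map_err(|e| format!("ack lost: {e}"))?;
        }
        Ok(acked)
    }
}
```

A real implementation would also bound the number of in-flight buffers so that a slow or unreachable service cannot grow the pending queue without limit.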

Contributor

gz commented Mar 13, 2026

@Mrhs121 mythical-fred is our automated reviewer, if his comments are not correct you can ignore him :)

Mrhs121 force-pushed the s2-connector-draft branch from eacaa53 to 9a04d1a on March 18, 2026 11:36
Collaborator

mythical-fred left a comment

Both blockers addressed: info! downgraded to trace!, and the double-parse moved to the error path only. Error handling via SDK RetryConfig is also correct. LGTM.

Collaborator

mythical-fred left a comment

Both blockers addressed. LGTM.

Collaborator

mythical-fred left a comment

Both hard blockers resolved: info→trace fixed, double-parse moved to error path. The push_buffer one-ack-per-buffer issue remains (soft), and auth_token still lacks a secret annotation; both are worth a follow-up issue.

Mrhs121 force-pushed the s2-connector-draft branch from dba7b1b to 0e590c2 on March 19, 2026 14:31
Mrhs121 force-pushed the s2-connector-draft branch from 0e590c2 to feedd68 on March 23, 2026 12:20
Mrhs121 changed the title from "[RFC][Connectors][Draft]: Add native s2 input and output connector support in Feldera" to "[RFC][Connectors]: Add native s2 input and output connector support in Feldera" on Mar 24, 2026
4 participants