Skip to content

py: modify kafka integration test to support multi variant test methods#5809

Merged
rivudhk merged 3 commits intomainfrom
kafka_avro2
Mar 18, 2026
Merged

py: modify kafka integration test to support multi variant test methods#5809
rivudhk merged 3 commits intomainfrom
kafka_avro2

Conversation

@rivudhk
Copy link
Contributor

@rivudhk rivudhk commented Mar 12, 2026

This PR modifies the Kafka Avro integration tests as follows:

  1. Introduces multiple tests per pipeline variant.
  2. Generates connector configurations from predefined variant settings.
  3. Separates SQL definitions into different functions instead of a single string.
  4. Adds helper functions for polling and loopback validation.
  5. Adds support for multiple test variants in the file
  6. Adds automatic Kafka topic creation when required
  7. Renames extract_kafka_avro_artifacts to extract_kafka_schema_artifacts
  8. Renames cleanup_kafka to cleanup_kafka_schema_artifacts
  9. Adds TCP and HTTP checks for Kafka broker and Schema Registry availability
  10. Adds RUN_ID mechanism to allow selective execution of test variants

Checklist

  • Integration tests added/updated

@rivudhk rivudhk requested a review from blp March 12, 2026 16:17
@rivudhk rivudhk changed the title draft PR: modify kafka integration test to support multi variant test methods py: modify kafka integration test to support multi variant test methods Mar 15, 2026
@rivudhk rivudhk marked this pull request as ready for review March 15, 2026 19:18
# export SCHEMA_REGISTRY_URL= http://localhost:8081


KAFKA_BOOTSTRAP = env(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KAFKA_BOOTSTRAP, SCHEMA_REGISTRY, and KAFKA_ADMIN are all evaluated at module load time (lines 47-56). Importing this file during pytest discovery or linting triggers live socket/HTTP connections to the CI Kafka broker and Schema Registry. A missing connection will fail the import and break any test run that includes this file, even for unrelated tests.

Consider lazy-initialising these (e.g., in setUpClass or via a module-level None populated on first use) or guard the connectivity checks behind an explicit flag/env var.

TEST_CONFIGS = [
{"id": 0, "limit": 10, "partitions": [0], "sync": False},
{
"id": 1,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variant 0 has "partitions": [0] and "sync": False but no "start_from". The old code passed "start_from": "earliest" unconditionally. sql_loopback_table only adds start_from if v.start_from is set, so variant 0 uses whatever the connector default is. If the default is "latest" the loopback table gets 0 rows and the hash validation fails. Please verify and add "start_from": "earliest" to variant 0 if needed.

@blp
Copy link
Member

blp commented Mar 16, 2026

When I run this locally, it fails because topics my_topic_avro_0 and my_topic_avro2_0 don't exist. If I create them manually, it passes, and then it fails if I rerun it (because the first run deleted the topics?).

Since the topic names are hardcoded, I think that multiple runs at the same time are going to interfere with each other.

Copy link
Member

@blp blp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you make the change we discussed, then this is fine, so I will approve it in anticipation of that.

rivudhk added 3 commits March 18, 2026 03:11
- Introduces multiple test methods per pipeline variant.
- Generates connector configurations from predefined variant settings.
- Inherits from SharedTestPipeline instead of unittest.TestCase to allow single SQL compilation.
- Separates SQL definitions into different functions instead of a single string.
- Adds helper functions for polling and loopback validation.

Signed-off-by: rivudhk <rivudhkr@gmail.com>
- Removed the SharedTestPipeline framework because the tests were accessing topics not assigned to them
- Introduced automatic Kafka topic creation when needed.
- Renamed the method 'extract_kafka_avro_artifacts' to 'extract_kafka_schema_artifacts' for clarity.
- Renamed the method 'cleanup_kafka' to 'cleanup_kafka_schema_artifacts' for clarity.
- Added TCP and HTTP checks to verify the availability of the Kafka broker and Schema Registry before tests run.
- Added a RUN_ID mechanism, allowing individual test variants to be executed selectively.

Signed-off-by: rivudhk <rivudhkr@gmail.com>
… connections

- Replace run-id based topic names by unique UUID based topic names to avoid collisions across test runs
- Lazy-initialize Kafka and Schema registry connections to prevent import-time network calls

Signed-off-by: rivudhk <rivudhkr@gmail.com>
@rivudhk rivudhk enabled auto-merge March 17, 2026 21:28
@rivudhk rivudhk added this pull request to the merge queue Mar 17, 2026
Merged via the queue into main with commit 4369457 Mar 18, 2026
1 check passed
@rivudhk rivudhk deleted the kafka_avro2 branch March 18, 2026 02:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants