
[FEAT] Add configurable RabbitMQ HA mode with quorum queues#1840

Merged
chandrasekharan-zipstack merged 14 commits into main from
feat/rabbitmq-ha-quorum-queues
Mar 12, 2026

Conversation

@johnyrahul
Contributor

Summary

  • When RABBITMQ_HA_ENABLED=true, all application Celery queues are declared with x-queue-type: quorum for RabbitMQ HA compatibility
  • Overrides qos_semantics_matches_spec since quorum queues don't support global (channel-level) QoS
  • Internal Celery queues (pidbox, reply) stay classic as they require auto-delete
  • Default is false — no behavioral change unless explicitly enabled

Changes by component

  • backend/backend/celery_config.py: Added is_rabbitmq_ha_enabled(), a make_queue() helper, quorum queue declarations for all 4 backend queues, and the QoS override
  • workers/shared/models/worker_models.py: Added _add_quorum_queue_configuration() to declare worker queues as quorum when HA is enabled
  • workers/shared/infrastructure/config/builder.py: Added the QoS semantics override in build_celery_app()
  • backend/sample.env, workers/sample.env, docker/sample.env: Added RABBITMQ_HA_ENABLED=false

Why

Celery/kombu sends x-queue-type: classic by default when declaring queues, overriding the RabbitMQ server-side default_queue_type = quorum setting, so explicit per-queue quorum declarations are required.
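The declaration this PR adds can be sketched without kombu installed; the queue_arguments map is what kombu's Queue(name, queue_arguments={"x-queue-type": "quorum"}) would send on queue.declare. make_queue here is a simplified stand-in for the helper named in the diff summary, not the actual implementation:

```python
import os

def is_rabbitmq_ha_enabled() -> bool:
    # Mirrors the env flag added in this PR; default is off.
    return os.environ.get("RABBITMQ_HA_ENABLED", "").lower() == "true"

def make_queue(name: str) -> dict:
    # Stand-in for the PR's make_queue(): with kombu this would return
    # Queue(name, queue_arguments={"x-queue-type": "quorum"}).
    return {"name": name, "queue_arguments": {"x-queue-type": "quorum"}}

os.environ["RABBITMQ_HA_ENABLED"] = "true"
task_queues = []
if is_rabbitmq_ha_enabled():
    task_queues = [make_queue(q) for q in ("celery", "celery_api_deployments")]
print(task_queues[0]["queue_arguments"])  # → {'x-queue-type': 'quorum'}
```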

Related PRs

  • Zipstack/unstract-llm-whisperer#599
  • Zipstack/unstract-cloud#1345

Test plan

  • Deploy with RABBITMQ_HA_ENABLED=false — verify no behavioral change
  • Deploy with RABBITMQ_HA_ENABLED=true and RabbitMQ cluster — verify quorum queues created
  • Verify rabbitmqctl list_queues name type shows quorum type for application queues
  • Verify internal Celery queues (pidbox, etc.) remain classic
  • Verify end-to-end task processing works with quorum queues
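The rabbitmqctl check in the plan can be scripted; a sketch assuming the default tab-separated output of rabbitmqctl list_queues name type (the sample listing below is illustrative, not captured from a real broker):

```python
# Illustrative sample of `rabbitmqctl list_queues name type` output.
SAMPLE = """\
celery\tquorum
file_processing\tquorum
celeryev.3f2a\tclassic
celery@worker1.celery.pidbox\tclassic
"""

def classify(listing: str) -> dict:
    # Parse "name<TAB>type" lines into a name -> queue-type mapping.
    result = {}
    for line in listing.strip().splitlines():
        name, qtype = line.split("\t")
        result[name] = qtype
    return result

def non_quorum_app_queues(types: dict) -> list:
    # Internal Celery queues (events, pidbox) are expected to stay classic;
    # anything else that is not quorum is a misconfigured application queue.
    def internal(n: str) -> bool:
        return n.startswith("celeryev.") or n.endswith(".pidbox")
    return [n for n, t in types.items() if t != "quorum" and not internal(n)]

print(non_quorum_app_queues(classify(SAMPLE)))  # → []
```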

🤖 Generated with Claude Code

When RABBITMQ_HA_ENABLED=true, all application Celery queues are
declared with x-queue-type: quorum for RabbitMQ HA compatibility.
This is necessary because Celery/kombu sends x-queue-type: classic
by default, overriding the server-side default_queue_type setting.

Also overrides qos_semantics_matches_spec since quorum queues do
not support global (channel-level) QoS.

Internal Celery queues (pidbox, reply) are unaffected as they are
managed by Celery internals and require auto-delete (incompatible
with quorum queues).

Related: Zipstack/unstract-llm-whisperer#599
Related: Zipstack/unstract-cloud#1345

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai
Contributor

coderabbitai bot commented Mar 10, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds RabbitMQ HA support: when RABBITMQ_HA_ENABLED is true, worker startup replaces Celery task_queues with quorum queues and monkey-patches pyamqp.Transport.qos_semantics_matches_spec to avoid NOT_IMPLEMENTED errors.
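The QoS half of that change can be sketched with a stand-in class (kombu is not imported here; on the real kombu.transport.pyamqp.Transport the override is the same one-line class attribute assignment):

```python
import os

class Transport:
    """Stand-in for kombu.transport.pyamqp.Transport."""
    def qos_semantics_matches_spec(self, connection) -> bool:
        # The real method probes the broker; quorum queues reject the
        # resulting global (channel-level) basic.qos with NOT_IMPLEMENTED.
        return False

def patch_qos_for_quorum_queues(transport_cls: type = Transport) -> None:
    # Class-level override so prefetch is applied per consumer instead,
    # as described in this PR; only active when the HA flag is set.
    if os.environ.get("RABBITMQ_HA_ENABLED", "").lower() == "true":
        transport_cls.qos_semantics_matches_spec = lambda _self, _conn: True

os.environ["RABBITMQ_HA_ENABLED"] = "true"
patch_qos_for_quorum_queues()
print(Transport().qos_semantics_matches_spec(None))  # → True
```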

Changes

  • Backend Celery Configuration (backend/backend/celery_config.py): Adds is_rabbitmq_ha_enabled() and make_queue(); when HA is enabled, builds task_queues as quorum queues at import time and includes the QoS-semantics override logic.
  • Worker Infrastructure, HA helper (workers/shared/infrastructure/config/rabbitmq_ha.py): New module apply_rabbitmq_ha(app, worker_celery_config) that replaces app.conf.task_queues with quorum queues (sorted) and patches pyamqp.Transport.qos_semantics_matches_spec.
  • Worker Builder Hooks (workers/shared/infrastructure/config/builder.py, workers/shared/worker_builder.py): Call apply_rabbitmq_ha(app, worker_celery_config) after the Celery config overrides so quorum-queue declarations and QoS patching apply at runtime.

Sequence Diagram(s)

mermaid
sequenceDiagram
participant Builder as Builder / startup
participant CeleryApp as Celery App
participant HAConfig as rabbitmq_ha.apply_rabbitmq_ha
participant PyAMQP as pyamqp.Transport
participant RabbitMQ as RabbitMQ

Builder->>CeleryApp: apply celery_config overrides
Note right of CeleryApp: merged app.conf ready
Builder->>HAConfig: apply_rabbitmq_ha(app, worker_celery_config)
HAConfig->>CeleryApp: set app.conf.task_queues (quorum queues)
HAConfig->>PyAMQP: set qos_semantics_matches_spec -> true (monkey-patch)
CeleryApp->>RabbitMQ: declare quorum queues on connect

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Description check ⚠️ Warning: The PR description is comprehensive, with a clear summary, component changes, rationale, and test plan, but it is missing several required template sections, including Env Config, Database Migrations, Dependencies Versions, Checklist, and the required section on breaking changes. Resolution: fill in all required template sections (add an explicit Env Config section documenting RABBITMQ_HA_ENABLED, confirm no database migrations are needed, list dependency versions if applicable, confirm no breaking changes, and complete the contribution guidelines checklist).

✅ Passed checks (2 passed)

  • Title check ✅ Passed: The title clearly summarizes the main change: adding configurable RabbitMQ HA mode with quorum queues. It is specific, concise, and directly related to the primary objective of the changeset.
  • Docstring Coverage ✅ Passed: Docstring coverage is 100.00%, which is sufficient; the required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


pre-commit-ci bot and others added 3 commits March 10, 2026 09:45
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This env var is only used in K8s deployments and is injected
via Helm values in unstract-cloud. No need to expose it in OSS.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@johnyrahul johnyrahul changed the title feat: Add configurable RabbitMQ HA mode with quorum queues [FEAT] Add configurable RabbitMQ HA mode with quorum queues Mar 10, 2026
@johnyrahul johnyrahul marked this pull request as ready for review March 10, 2026 10:15
Contributor

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
workers/shared/infrastructure/config/builder.py (1)

99-106: QoS override logic is correct but duplicated with celery_config.py.

The monkey-patch correctly forces per-consumer prefetch semantics for quorum queue compatibility. The unused lambda arguments (self, conn) are intentional since the override unconditionally returns True.

However, this logic is duplicated in backend/backend/celery_config.py (lines 84-90). Consider extracting a shared utility to avoid drift:

# shared/utils/rabbitmq_ha.py
def patch_qos_for_quorum_queues():
    if os.environ.get("RABBITMQ_HA_ENABLED", "").lower() == "true":
        from kombu.transport import pyamqp
        pyamqp.Transport.qos_semantics_matches_spec = lambda self, conn: True

This is acceptable for now since backend and workers may be deployed separately, but the duplication increases maintenance burden.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/shared/infrastructure/config/builder.py` around lines 99 - 106,
Extract the duplicated RabbitMQ HA monkey-patch into a shared helper (e.g.
patch_qos_for_quorum_queues) and call it from both
workers/shared/infrastructure/config/builder.py and
backend/backend/celery_config.py; specifically move the logic that sets
pyamqp.Transport.qos_semantics_matches_spec = lambda self, conn: True behind a
single function (ensuring it still checks
os.environ.get("RABBITMQ_HA_ENABLED").lower() == "true") and replace the inline
monkey-patch in both places with an import and invocation of that helper.
backend/backend/celery_config.py (1)

69-82: Hardcoded queue list may drift from actual queue usage.

The _BACKEND_QUEUES list is manually maintained. If new queues are added to backend task routing elsewhere, they must also be added here for HA compatibility. Consider:

  1. Adding a comment noting where this list should be kept in sync with
  2. Or, deriving the list from a single source of truth (e.g., a shared constant or Django settings)

The commit history shows dashboard_metric_events was added in a follow-up fix, indicating this list can be easily missed during updates.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/backend/celery_config.py` around lines 69 - 82, The hardcoded
_BACKEND_QUEUES list used when is_rabbitmq_ha_enabled() runs can drift from
actual task routing; replace or centralize it by deriving the queue names from a
single source of truth and update CeleryConfig.task_queues = [make_queue(q) for
q in _BACKEND_QUEUES] accordingly: either import the canonical queue list from
existing routing/constants in your project (e.g., a shared QUEUES constant or
Django settings) and use that instead of the inline _BACKEND_QUEUES, or add a
clear comment pointing to the canonical location and a test to assert parity
between the canonical list and _BACKEND_QUEUES (reference symbols:
_BACKEND_QUEUES, is_rabbitmq_ha_enabled, CeleryConfig.task_queues, make_queue).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9903fddb-438f-4989-af6b-73a3c56a13fb

📥 Commits

Reviewing files that changed from the base of the PR and between d645846 and 4c0f82c.

📒 Files selected for processing (3)
  • backend/backend/celery_config.py
  • workers/shared/infrastructure/config/builder.py
  • workers/shared/models/worker_models.py

@johnyrahul johnyrahul requested a review from ritwik-g March 10, 2026 10:37
@johnyrahul
Contributor Author

@muhammad-ali-e @chandrasekharan-zipstack

Quorum Queues (41 queues) - working as expected:

  • Application queues: celery, celery_api_deployments, celery_log_task_queue, celery_periodic_logs
  • Worker queues: file_processing, file_processing_callback, api_file_processing, api_file_processing_callback
  • Notification queues: notifications, notifications_email, notifications_sms, notifications_webhook, notifications_priority
  • Scheduler: scheduler
  • Delayed queues: celery_delayed_0 through celery_delayed_27 (28 queues for retry/ETA support)

Classic Queues (14 queues) - expected, these are Celery internals:

  • Pidbox queues (6): Control/management queues per worker (e.g., celery@...celery.pidbox) — these are ephemeral and must remain
    classic
  • Celery event queues (6): celeryev.* — ephemeral event monitoring queues, also must remain classic
  • file_processing_priority (1): This one is classic but probably shouldn't be — it's an application queue that may have been
    missed

Issue Found:

file_processing_priority is the only application queue still declared as classic. It's likely missing from the worker's quorum
queue configuration. You may want to check if this queue needs to be added to the quorum queue declarations in the worker
config.


@johnyrahul
Contributor Author

@muhammad-ali-e what are these delayed queues?

@greptile-apps
Contributor

greptile-apps bot commented Mar 12, 2026

Greptile Summary

This PR adds configurable RabbitMQ HA support using quorum queues across the backend and worker services. When RABBITMQ_HA_ENABLED=true, application Celery queues are explicitly declared with x-queue-type: quorum, and pyamqp.Transport.qos_semantics_matches_spec is patched to use per-consumer QoS (required because quorum queues reject channel-level global=True QoS calls). Internal Celery queues (pidbox, reply queues) are intentionally left as classic queues since they require auto-delete.

Key design points:

  • Backend (celery_config.py) applies configuration at module import time via a module-level if block, consistent with its config_from_object pattern.
  • Workers (rabbitmq_ha.py + both WorkerBuilder variants) apply configuration at build time, correctly placed after all override_config overrides have been merged into app.conf, preventing any override from silently replacing the quorum queue declarations.
  • Queue ordering is deterministic via sorted() in apply_rabbitmq_ha.
  • The make_queue() helper in celery_config.py retains an unnecessary kwargs: dict intermediate variable — a leftover from a removed routing_key parameter — that can be simplified.
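The ordering point above (HA configuration applied after override_config, so its declarations win) can be sketched with a stand-in app object; apply_rabbitmq_ha here models the PR's function using (name, arguments) tuples in place of kombu Queue objects:

```python
import os
from types import SimpleNamespace

def apply_rabbitmq_ha(app, queue_names) -> None:
    # Stand-in for the PR's apply_rabbitmq_ha(): replace task_queues with
    # quorum declarations, sorted for deterministic ordering.
    if os.environ.get("RABBITMQ_HA_ENABLED", "").lower() != "true":
        return
    app.conf.task_queues = [
        (name, {"x-queue-type": "quorum"}) for name in sorted(queue_names)
    ]

os.environ["RABBITMQ_HA_ENABLED"] = "true"
app = SimpleNamespace(conf=SimpleNamespace(task_queues=[]))
app.conf.task_queues = [("scheduler", {})]          # override_config merged first
apply_rabbitmq_ha(app, {"scheduler", "file_processing"})  # HA applied last, wins
print(app.conf.task_queues)
```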

Confidence Score: 4/5

  • This PR is safe to merge; the HA path is opt-in and the default (RABBITMQ_HA_ENABLED=false) preserves existing behaviour exactly.
  • The implementation is well-structured: HA logic is centralized in rabbitmq_ha.py, quorum queue declarations are applied after overrides to prevent silent replacement, and sorted() ensures deterministic queue ordering. The one outstanding minor issue is a redundant kwargs variable in make_queue. No logic errors or security concerns were found; the only deductions are for the minor style issue and the inherent class-level monkey-patch on pyamqp.Transport (an acknowledged trade-off already discussed).
  • backend/backend/celery_config.py — the import-time class-level mutation of CeleryConfig.task_queues and pyamqp.Transport warrants care in test environments where RABBITMQ_HA_ENABLED may vary between test runs.

Important Files Changed

  • backend/backend/celery_config.py: Adds is_rabbitmq_ha_enabled(), make_queue(), and an import-time HA configuration block that declares backend queues as quorum and patches pyamqp.Transport.qos_semantics_matches_spec. The kwargs dict intermediate in make_queue is a leftover from the removed routing_key parameter and could be simplified. The docstring still describes conditional behaviour, but the function now always creates quorum queues unconditionally.
  • workers/shared/infrastructure/config/rabbitmq_ha.py: New centralized module for applying quorum-queue and QoS configuration. Uses sorted() to guarantee deterministic queue ordering and is called after all config overrides have been applied, correctly addressing previous concerns about ordering and override precedence.
  • workers/shared/infrastructure/config/builder.py: Correctly integrates apply_rabbitmq_ha after app.conf.update(celery_config), ensuring HA quorum declarations always take precedence over any override_config values.
  • workers/shared/worker_builder.py: Mirrors infrastructure/config/builder.py with the same apply_rabbitmq_ha call placed correctly after app.conf.update. Consistent with the newer builder implementation.

Sequence Diagram

sequenceDiagram
    participant ENV as Env (RABBITMQ_HA_ENABLED)
    participant Backend as celery_config.py (import time)
    participant Builder as WorkerBuilder.build_celery_app()
    participant HA as rabbitmq_ha.apply_rabbitmq_ha()
    participant App as Celery app.conf
    participant RMQ as RabbitMQ

    note over Backend: Module imported
    Backend->>ENV: is_rabbitmq_ha_enabled()?
    alt HA enabled
        Backend->>Backend: Set CeleryConfig.task_queues (quorum Queues)
        Backend->>Backend: Patch pyamqp.Transport.qos_semantics_matches_spec
    end

    note over Builder: build_celery_app() called
    Builder->>App: app.conf.update(celery_config)
    Builder->>App: app.conf.update(override_config) [if any]
    Builder->>HA: apply_rabbitmq_ha(app, worker_celery_config)
    HA->>ENV: RABBITMQ_HA_ENABLED == "true"?
    alt HA enabled
        HA->>App: app.conf.task_queues = sorted quorum Queues
        HA->>HA: Patch pyamqp.Transport.qos_semantics_matches_spec
    end

    note over App,RMQ: Worker starts
    App->>RMQ: Declare queues with x-queue-type: quorum
    RMQ-->>App: Queues confirmed as quorum type
Prompt To Fix All With AI
This is a comment left during a code review.
Path: backend/backend/celery_config.py
Line: 30-31

Comment:
**Unnecessary intermediate `kwargs` variable**

After the `routing_key` parameter was removed, the `kwargs` dict is now created only to be immediately unpacked in the very next line. There's no longer a reason for this indirection — the queue can be constructed directly:

```suggestion
    return Queue(name, queue_arguments={"x-queue-type": "quorum"})
```

How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: 11e6805

johnyrahul and others added 3 commits March 12, 2026 14:04
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Signed-off-by: Rahul Johny <116638720+johnyrahul@users.noreply.github.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Signed-off-by: Rahul Johny <116638720+johnyrahul@users.noreply.github.com>
@johnyrahul johnyrahul requested a review from ritwik-g March 12, 2026 08:40
Contributor

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
backend/backend/celery_config.py (1)

81-87: Duplicated QoS override logic with builder.py.

This identical monkey-patch exists in workers/shared/infrastructure/config/builder.py:99-107. While double-patching the same value is harmless, maintaining duplicate logic increases risk of divergence. Consider extracting to a shared utility (e.g., a function in a common module) that both locations can call.

The static analysis warnings about unused lambda arguments (self, conn) are false positives—these parameters are required by the method signature being overridden.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/backend/celery_config.py` around lines 81 - 87, The QoS monkey-patch
(pyamqp.Transport.qos_semantics_matches_spec = lambda self, conn: True) is
duplicated in builder.py; extract it to a single shared helper (e.g., a function
like enable_quorum_qos_compatibility() in a common utilities module) and replace
both occurrences to call that helper from celery_config.py and
workers/shared/infrastructure/config/builder.py; ensure the helper performs the
same override using the full method signature (accepting self and conn) to avoid
static analysis false positives and keep behavior identical.
workers/shared/models/worker_models.py (1)

505-525: Consider consolidating quorum queue utilities.

This method duplicates the environment check and quorum queue creation logic found in backend/backend/celery_config.py (is_rabbitmq_ha_enabled() and make_queue()). If a shared utilities module is feasible between backend and workers, consolidating this logic would reduce maintenance burden and ensure consistent behavior.

The implementation is correct—the lazy import (line 519) and sorted queue list (line 524) are good practices.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/shared/models/worker_models.py` around lines 505 - 525, This
duplicates the RabbitMQ HA env check and quorum queue creation logic; extract
the logic into a shared helper (e.g., is_rabbitmq_ha_enabled() and
make_queue(queue_name) used by backend) and call it from
_add_quorum_queue_configuration: replace the inline os.environ check with
is_rabbitmq_ha_enabled(), import and use make_queue(queue_name) (or a shared
wrapper that returns a kombu.Queue with {"x-queue-type":"quorum"}) for each name
from self.queue_config.all_queues(), preserving the lazy import behavior and
sorted order.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 474311d3-d564-4cd5-9c1a-5cc3abb76157

📥 Commits

Reviewing files that changed from the base of the PR and between 4c0f82c and 5a233f1.

📒 Files selected for processing (2)
  • backend/backend/celery_config.py
  • workers/shared/models/worker_models.py

…rations

Move quorum queue and QoS configuration from WorkerCeleryConfig into
both builder modules, applied directly on app.conf after override_config
has been merged. This ensures HA declarations always win when
RABBITMQ_HA_ENABLED=true, regardless of what override_config contains.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
workers/shared/worker_builder.py (2)

88-88: Use underscore prefix for intentionally unused lambda parameters.

The lambda arguments are unused because this is a method signature override. Using underscore prefix makes the intent explicit and silences linters (ARG005).

✏️ Suggested fix
-            pyamqp.Transport.qos_semantics_matches_spec = lambda self, conn: True
+            pyamqp.Transport.qos_semantics_matches_spec = lambda _self, _conn: True
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/shared/worker_builder.py` at line 88, The override of
pyamqp.Transport.qos_semantics_matches_spec uses an unused lambda parameter
named conn; change the lambda to use an underscore-prefixed parameter (e.g.,
_conn) to signal intentional non-use and satisfy linters (ARG005) when
overriding qos_semantics_matches_spec on the pyamqp.Transport class.

75-88: Duplicated HA configuration logic across modules.

This block is identical to lines 99-112 in workers/shared/infrastructure/config/builder.py. Both files implement the same quorum queue and QoS override logic. Additionally, backend/backend/celery_config.py already defines is_rabbitmq_ha_enabled() and make_queue() helpers that should be reused.

Consider extracting a shared helper to avoid drift between these implementations:

♻️ Suggested approach: create shared helper

Create a shared utility in the workers package:

# workers/shared/utils/rabbitmq_ha.py
import os
from kombu import Queue

def is_rabbitmq_ha_enabled() -> bool:
    return os.environ.get("RABBITMQ_HA_ENABLED", "").lower() == "true"

def configure_quorum_queues(app, queue_names: set[str]) -> None:
    """Configure quorum queues and QoS semantics for HA mode."""
    if not is_rabbitmq_ha_enabled():
        return
    
    from kombu.transport import pyamqp
    
    quorum_args = {"x-queue-type": "quorum"}
    app.conf.task_queues = [
        Queue(q, queue_arguments=quorum_args)
        for q in sorted(queue_names)
    ]
    pyamqp.Transport.qos_semantics_matches_spec = lambda _self, _conn: True

Then call configure_quorum_queues(app, worker_celery_config.queue_config.all_queues()) in both builder modules.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@workers/shared/worker_builder.py` around lines 75 - 88, Extract the
duplicated RabbitMQ HA logic into a shared helper (e.g., is_rabbitmq_ha_enabled
and configure_quorum_queues) and replace the inline block in worker_builder.py
that declares quorum queues and overrides QoS with a call to that helper;
specifically, move the quorum_args/Queue creation and the
pyamqp.Transport.qos_semantics_matches_spec override into
configure_quorum_queues(app, queue_names) and call it with
worker_celery_config.queue_config.all_queues(), or reuse the existing
is_rabbitmq_ha_enabled()/make_queue() helpers from backend.backend.celery_config
if preferred, ensuring the original behavior (sorting queue names, setting
app.conf.task_queues, and setting qos_semantics_matches_spec) is preserved.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 26dd76c8-c9e5-4628-8ebc-09822fbd74ec

📥 Commits

Reviewing files that changed from the base of the PR and between 5a233f1 and c20421d.

📒 Files selected for processing (2)
  • workers/shared/infrastructure/config/builder.py
  • workers/shared/worker_builder.py

johnyrahul and others added 2 commits March 12, 2026 14:59
Both WorkerBuilder implementations had an identical HA configuration
block. Extract it into a single apply_rabbitmq_ha() function in
shared/infrastructure/config/rabbitmq_ha.py so future changes only
need to be made in one place.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@workers/shared/infrastructure/config/rabbitmq_ha.py`:
- Lines 30-34: Don't replace app.conf.task_queues with newly built Queue
objects from worker_celery_config.queue_config.all_queues(). Instead, iterate
the existing app.conf.task_queues and, for each Queue, merge
{"x-queue-type": "quorum"} into queue.queue_arguments while preserving its
existing exchange, routing_key, and other arguments. Then, for any queue name
returned by worker_celery_config.queue_config.all_queues() that isn't present
in the existing app.conf.task_queues, create a new Queue(name,
queue_arguments={"x-queue-type": "quorum"}) and append it.
- Lines 24-25: The monkey-patch of pyamqp.Transport.qos_semantics_matches_spec
must save the original method and restore it when HA is disabled or during app
teardown. Before replacing it, store the reference (e.g., orig_qos =
pyamqp.Transport.qos_semantics_matches_spec) and apply the patch only when
os.environ.get("RABBITMQ_HA_ENABLED", "").lower() == "true". When the flag is
false, or in the shutdown/cleanup path, set
pyamqp.Transport.qos_semantics_matches_spec back to orig_qos (or avoid global
monkey-patching entirely by composing a per-instance wrapper used by the HA
code path) so the global kombu state is not permanently modified.
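Both fixes can be sketched together. The Queue and Transport classes below are stand-ins for their kombu counterparts so the example runs standalone, and merge_quorum_queue_type / set_ha_patch are hypothetical helper names:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Queue:
    """Stand-in for kombu.Queue."""
    name: str
    routing_key: str | None = None
    queue_arguments: dict = field(default_factory=dict)


class Transport:
    """Stand-in for kombu.transport.pyamqp.Transport."""

    def qos_semantics_matches_spec(self, connection):
        return False


_orig_qos = Transport.qos_semantics_matches_spec  # saved for restoration


def merge_quorum_queue_type(existing_queues, all_queue_names):
    """Merge x-queue-type into existing queues instead of rebuilding them,
    preserving exchange/routing_key; append any queue not yet declared."""
    present = {q.name for q in existing_queues}
    for q in existing_queues:
        q.queue_arguments = {**(q.queue_arguments or {}), "x-queue-type": "quorum"}
    for name in all_queue_names:
        if name not in present:
            existing_queues.append(
                Queue(name, queue_arguments={"x-queue-type": "quorum"})
            )
    return existing_queues


def set_ha_patch(enabled: bool) -> None:
    """Apply the QoS monkey-patch only when HA is on; restore otherwise,
    so global kombu state is never left permanently modified."""
    if enabled:
        Transport.qos_semantics_matches_spec = lambda _conn: True
    else:
        Transport.qos_semantics_matches_spec = _orig_qos


queues = [Queue("file_processing", routing_key="file_processing")]
merge_quorum_queue_type(queues, ["file_processing", "callback"])
```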

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bf3426b6-0462-4f84-831d-02fc986fb189

📥 Commits

Reviewing files that changed from the base of the PR and between c20421d and 1e6b4b1.

📒 Files selected for processing (3)
  • workers/shared/infrastructure/config/builder.py
  • workers/shared/infrastructure/config/rabbitmq_ha.py
  • workers/shared/worker_builder.py

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Signed-off-by: Rahul Johny <116638720+johnyrahul@users.noreply.github.com>
@github-actions

Test Results

Summary
  • Runner Tests: 11 passed, 0 failed (11 total)

Runner Tests - Full Report

| filepath | function | passed | SUBTOTAL |
|---|---|---|---|
| runner/src/unstract/runner/clients/test_docker.py | test_logs | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_cleanup | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_cleanup_skip | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_client_init | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image_exists | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_container_run_config | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_container_run_config_without_mount | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_run_container | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_get_image_for_sidecar | 1 | 1 |
| runner/src/unstract/runner/clients/test_docker.py | test_sidecar_container | 1 | 1 |
| TOTAL | | 11 | 11 |

@sonarqubecloud

@chandrasekharan-zipstack chandrasekharan-zipstack merged commit 09d852c into main Mar 12, 2026
6 checks passed
@chandrasekharan-zipstack chandrasekharan-zipstack deleted the feat/rabbitmq-ha-quorum-queues branch March 12, 2026 09:50
Comment on lines +30 to +31 of backend/backend/celery_config.py:

```python
    kombu Queue instance with quorum queue arguments.
    """
```

Unnecessary intermediate kwargs variable

After the routing_key parameter was removed, the kwargs dict is now created only to be immediately unpacked on the very next line. There's no longer a reason for this indirection; the queue can be constructed directly:

Suggested change

```suggestion
    return Queue(name, queue_arguments={"x-queue-type": "quorum"})
```
