[FEAT] Add configurable RabbitMQ HA mode with quorum queues #1840
chandrasekharan-zipstack merged 14 commits into main
When RABBITMQ_HA_ENABLED=true, all application Celery queues are declared with x-queue-type: quorum for RabbitMQ HA compatibility. This is necessary because Celery/kombu sends x-queue-type: classic by default, overriding the server-side default_queue_type setting.

Also overrides qos_semantics_matches_spec since quorum queues do not support global (channel-level) QoS.

Internal Celery queues (pidbox, reply) are unaffected as they are managed by Celery internals and require auto-delete (incompatible with quorum queues).

Related: Zipstack/unstract-llm-whisperer#599
Related: Zipstack/unstract-cloud#1345
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
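The gating described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `APP_QUEUES` and `queue_declarations` are hypothetical names, and the declarations are modeled as plain dicts so the sketch runs without kombu. In a real worker the resulting arguments would be passed as `queue_arguments` when constructing a `kombu.Queue`.

```python
import os

# Hypothetical queue names, for illustration only.
APP_QUEUES = ["celery", "file_processing"]


def is_rabbitmq_ha_enabled(environ=os.environ) -> bool:
    """Return True only when RABBITMQ_HA_ENABLED is 'true' (case-insensitive)."""
    return environ.get("RABBITMQ_HA_ENABLED", "").lower() == "true"


def queue_declarations(names, environ=os.environ):
    """Map each queue name to the arguments it should be declared with."""
    if not is_rabbitmq_ha_enabled(environ):
        # HA disabled: add nothing, so existing defaults stay untouched.
        return {name: {} for name in names}
    # HA enabled: request a quorum queue explicitly for every application queue.
    return {name: {"x-queue-type": "quorum"} for name in names}
```

Passing the environment as a parameter is a design choice made for this sketch so the behavior can be checked without mutating `os.environ`.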
Walkthrough
Adds RabbitMQ HA support: when RABBITMQ_HA_ENABLED is true, worker startup replaces Celery task_queues with quorum queues and monkey-patches pyamqp.Transport.qos_semantics_matches_spec to avoid NOT_IMPLEMENTED errors.
Estimated code review effort: 3 (Moderate), ~20 minutes
Pre-merge checks: 2 passed, 1 failed (1 warning)
for more information, see https://pre-commit.ci
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This env var is only used in K8s deployments and is injected via Helm values in unstract-cloud. No need to expose it in OSS. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
🧹 Nitpick comments (2)
workers/shared/infrastructure/config/builder.py (1)
99-106: QoS override logic is correct but duplicated with `celery_config.py`.

The monkey-patch correctly forces per-consumer prefetch semantics for quorum queue compatibility. The unused lambda arguments (`self`, `conn`) are intentional since the override unconditionally returns `True`.

However, this logic is duplicated in `backend/backend/celery_config.py` (lines 84-90). Consider extracting a shared utility to avoid drift:

    # shared/utils/rabbitmq_ha.py
    import os

    def patch_qos_for_quorum_queues():
        if os.environ.get("RABBITMQ_HA_ENABLED", "").lower() == "true":
            from kombu.transport import pyamqp
            pyamqp.Transport.qos_semantics_matches_spec = lambda self, conn: True

This is acceptable for now since backend and workers may be deployed separately, but the duplication increases maintenance burden.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/shared/infrastructure/config/builder.py` around lines 99 - 106, extract the duplicated RabbitMQ HA monkey-patch into a shared helper (e.g. patch_qos_for_quorum_queues) and call it from both workers/shared/infrastructure/config/builder.py and backend/backend/celery_config.py; specifically move the logic that sets pyamqp.Transport.qos_semantics_matches_spec = lambda self, conn: True behind a single function (ensuring it still checks os.environ.get("RABBITMQ_HA_ENABLED", "").lower() == "true") and replace the inline monkey-patch in both places with an import and invocation of that helper.

backend/backend/celery_config.py (1)
69-82: Hardcoded queue list may drift from actual queue usage.

The `_BACKEND_QUEUES` list is manually maintained. If new queues are added to backend task routing elsewhere, they must also be added here for HA compatibility. Consider:
- Adding a comment noting where this list should be kept in sync with
- Or, deriving the list from a single source of truth (e.g., a shared constant or Django settings)

The commit history shows `dashboard_metric_events` was added in a follow-up fix, indicating this list can be easily missed during updates.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/backend/celery_config.py` around lines 69 - 82, The hardcoded _BACKEND_QUEUES list used when is_rabbitmq_ha_enabled() runs can drift from actual task routing; replace or centralize it by deriving the queue names from a single source of truth and update CeleryConfig.task_queues = [make_queue(q) for q in _BACKEND_QUEUES] accordingly: either import the canonical queue list from existing routing/constants in your project (e.g., a shared QUEUES constant or Django settings) and use that instead of the inline _BACKEND_QUEUES, or add a clear comment pointing to the canonical location and a test to assert parity between the canonical list and _BACKEND_QUEUES (reference symbols: _BACKEND_QUEUES, is_rabbitmq_ha_enabled, CeleryConfig.task_queues, make_queue).
📒 Files selected for processing (3)
- backend/backend/celery_config.py
- workers/shared/infrastructure/config/builder.py
- workers/shared/models/worker_models.py
@muhammad-ali-e @chandrasekharan-zipstack Quorum Queues (41 queues) - working as expected:
Classic Queues (14 queues) - expected, these are Celery internals:
Issue Found: file_processing_priority is the only application queue still declared as classic. It's likely missing from the worker's quorum
@muhammad-ali-e what are these delayed queues?
Greptile Summary
This PR adds configurable RabbitMQ HA support using quorum queues across the backend and worker services. When
Key design points:
Confidence Score: 4/5
Important Files Changed
Sequence Diagram
sequenceDiagram
participant ENV as Env (RABBITMQ_HA_ENABLED)
participant Backend as celery_config.py (import time)
participant Builder as WorkerBuilder.build_celery_app()
participant HA as rabbitmq_ha.apply_rabbitmq_ha()
participant App as Celery app.conf
participant RMQ as RabbitMQ
note over Backend: Module imported
Backend->>ENV: is_rabbitmq_ha_enabled()?
alt HA enabled
Backend->>Backend: Set CeleryConfig.task_queues (quorum Queues)
Backend->>Backend: Patch pyamqp.Transport.qos_semantics_matches_spec
end
note over Builder: build_celery_app() called
Builder->>App: app.conf.update(celery_config)
Builder->>App: app.conf.update(override_config) [if any]
Builder->>HA: apply_rabbitmq_ha(app, worker_celery_config)
HA->>ENV: RABBITMQ_HA_ENABLED == "true"?
alt HA enabled
HA->>App: app.conf.task_queues = sorted quorum Queues
HA->>HA: Patch pyamqp.Transport.qos_semantics_matches_spec
end
note over App,RMQ: Worker starts
App->>RMQ: Declare queues with x-queue-type: quorum
RMQ-->>App: Queues confirmed as quorum type
Prompt To Fix All With AI
This is a comment left during a code review.
Path: backend/backend/celery_config.py
Line: 30-31
Comment:
**Unnecessary intermediate `kwargs` variable**
After the `routing_key` parameter was removed, the `kwargs` dict is now created only to be immediately unpacked in the very next line. There's no longer a reason for this indirection — the queue can be constructed directly:
```suggestion
return Queue(name, queue_arguments={"x-queue-type": "quorum"})
```
How can I resolve this? If you propose a fix, please make it concise.
Last reviewed commit: 11e6805
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Signed-off-by: Rahul Johny <116638720+johnyrahul@users.noreply.github.com>
🧹 Nitpick comments (2)
backend/backend/celery_config.py (1)
81-87: Duplicated QoS override logic with builder.py.

This identical monkey-patch exists in `workers/shared/infrastructure/config/builder.py:99-107`. While double-patching the same value is harmless, maintaining duplicate logic increases risk of divergence. Consider extracting to a shared utility (e.g., a function in a common module) that both locations can call.

The static analysis warnings about unused lambda arguments (`self`, `conn`) are false positives: these parameters are required by the method signature being overridden.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/backend/celery_config.py` around lines 81 - 87, The QoS monkey-patch (pyamqp.Transport.qos_semantics_matches_spec = lambda self, conn: True) is duplicated in builder.py; extract it to a single shared helper (e.g., a function like enable_quorum_qos_compatibility() in a common utilities module) and replace both occurrences to call that helper from celery_config.py and workers/shared/infrastructure/config/builder.py; ensure the helper performs the same override using the full method signature (accepting self and conn) to avoid static analysis false positives and keep behavior identical.

workers/shared/models/worker_models.py (1)
505-525: Consider consolidating quorum queue utilities.

This method duplicates the environment check and quorum queue creation logic found in `backend/backend/celery_config.py` (`is_rabbitmq_ha_enabled()` and `make_queue()`). If a shared utilities module is feasible between backend and workers, consolidating this logic would reduce maintenance burden and ensure consistent behavior.

The implementation is correct: the lazy import (line 519) and sorted queue list (line 524) are good practices.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/shared/models/worker_models.py` around lines 505 - 525, This duplicates the RabbitMQ HA env check and quorum queue creation logic; extract the logic into a shared helper (e.g., is_rabbitmq_ha_enabled() and make_queue(queue_name) used by backend) and call it from _add_quorum_queue_configuration: replace the inline os.environ check with is_rabbitmq_ha_enabled(), import and use make_queue(queue_name) (or a shared wrapper that returns a kombu.Queue with {"x-queue-type":"quorum"}) for each name from self.queue_config.all_queues(), preserving the lazy import behavior and sorted order.
📒 Files selected for processing (2)
- backend/backend/celery_config.py
- workers/shared/models/worker_models.py
…rations Move quorum queue and QoS configuration from WorkerCeleryConfig into both builder modules, applied directly on app.conf after override_config has been merged. This ensures HA declarations always win when RABBITMQ_HA_ENABLED=true, regardless of what override_config contains. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
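The ordering in this commit message matters because later configuration updates overwrite earlier ones. A minimal sketch, with a plain dict standing in for `app.conf` and all function and parameter names being illustrative assumptions rather than the project's code:

```python
def build_conf(base, override, queue_names, ha_enabled):
    """Merge config layers, applying HA queue declarations last so they win."""
    conf = dict(base)
    conf.update(override or {})  # override_config is merged first
    if ha_enabled:
        # Applied after the override, so the override cannot undo it.
        conf["task_queues"] = [
            {"name": name, "queue_arguments": {"x-queue-type": "quorum"}}
            for name in sorted(queue_names)
        ]
    return conf
```

If the HA step ran before `conf.update(override)`, an override containing its own `task_queues` would silently replace the quorum declarations.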
🧹 Nitpick comments (2)
workers/shared/worker_builder.py (2)
88-88: Use underscore prefix for intentionally unused lambda parameters.

The lambda arguments are unused because this is a method signature override. Using underscore prefix makes the intent explicit and silences linters (ARG005).

✏️ Suggested fix

    - pyamqp.Transport.qos_semantics_matches_spec = lambda self, conn: True
    + pyamqp.Transport.qos_semantics_matches_spec = lambda _self, _conn: True

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/shared/worker_builder.py` at line 88, The override of pyamqp.Transport.qos_semantics_matches_spec uses an unused lambda parameter named conn; change the lambda to use an underscore-prefixed parameter (e.g., _conn) to signal intentional non-use and satisfy linters (ARG005) when overriding qos_semantics_matches_spec on the pyamqp.Transport class.
75-88: Duplicated HA configuration logic across modules.

This block is identical to lines 99-112 in `workers/shared/infrastructure/config/builder.py`. Both files implement the same quorum queue and QoS override logic. Additionally, `backend/backend/celery_config.py` already defines `is_rabbitmq_ha_enabled()` and `make_queue()` helpers that should be reused.

Consider extracting a shared helper to avoid drift between these implementations:

♻️ Suggested approach: create shared helper

Create a shared utility in the workers package:

    # workers/shared/utils/rabbitmq_ha.py
    import os

    from kombu import Queue


    def is_rabbitmq_ha_enabled() -> bool:
        return os.environ.get("RABBITMQ_HA_ENABLED", "").lower() == "true"


    def configure_quorum_queues(app, queue_names: set[str]) -> None:
        """Configure quorum queues and QoS semantics for HA mode."""
        if not is_rabbitmq_ha_enabled():
            return

        from kombu.transport import pyamqp

        quorum_args = {"x-queue-type": "quorum"}
        app.conf.task_queues = [
            Queue(q, queue_arguments=quorum_args) for q in sorted(queue_names)
        ]
        pyamqp.Transport.qos_semantics_matches_spec = lambda _self, _conn: True

Then call `configure_quorum_queues(app, worker_celery_config.queue_config.all_queues())` in both builder modules.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/shared/worker_builder.py` around lines 75 - 88, Extract the duplicated RabbitMQ HA logic into a shared helper (e.g., is_rabbitmq_ha_enabled and configure_quorum_queues) and replace the inline block in worker_builder.py that declares quorum queues and overrides QoS with a call to that helper; specifically, move the quorum_args/Queue creation and the pyamqp.Transport.qos_semantics_matches_spec override into configure_quorum_queues(app, queue_names) and call it with worker_celery_config.queue_config.all_queues(), or reuse the existing is_rabbitmq_ha_enabled()/make_queue() helpers from backend.backend.celery_config if preferred, ensuring the original behavior (sorting queue names, setting app.conf.task_queues, and setting qos_semantics_matches_spec) is preserved.
📒 Files selected for processing (2)
- workers/shared/infrastructure/config/builder.py
- workers/shared/worker_builder.py
Both WorkerBuilder implementations had an identical HA configuration block. Extract it into a single apply_rabbitmq_ha() function in shared/infrastructure/config/rabbitmq_ha.py so future changes only need to be made in one place. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
for more information, see https://pre-commit.ci
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@workers/shared/infrastructure/config/rabbitmq_ha.py`:
- Around line 30-34: Don't replace app.conf.task_queues with newly built Queue
objects from worker_celery_config.queue_config.all_queues(); instead iterate the
existing app.conf.task_queues and for each Queue object merge or set
queue.queue_arguments to include {"x-queue-type": "quorum"} while preserving its
existing exchange, routing_key, and other arguments; then for any queue name
returned by worker_celery_config.queue_config.all_queues() that isn't present in
the existing app.conf.task_queues, create a new Queue(name,
queue_arguments={"x-queue-type":"quorum"}) and append it. Ensure you reference
app.conf.task_queues, Queue, and worker_celery_config.queue_config.all_queues()
when implementing the change.
- Around line 24-25: The monkey-patch of
pyamqp.Transport.qos_semantics_matches_spec must save the original method and
restore it when HA is disabled or during app teardown: before replacing
pyamqp.Transport.qos_semantics_matches_spec, store the reference (e.g. orig_qos
= pyamqp.Transport.qos_semantics_matches_spec) and only apply the patch when
os.environ.get("RABBITMQ_HA_ENABLED", "").lower() == "true"; when the flag is
false or in your shutdown/cleanup path, set
pyamqp.Transport.qos_semantics_matches_spec back to orig_qos (or avoid global
monkey-patching by composing a per-instance wrapper used by the HA code path) so
the global kombu state is not permanently modified.
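The first suggestion above (merge into the existing queues instead of replacing them) can be sketched as follows. `QueueSpec` is a hypothetical stand-in for `kombu.Queue`, carrying only the fields used here so the example runs without kombu; `apply_quorum` is likewise an illustrative name, and only the merge logic is the point.

```python
from dataclasses import dataclass, field


@dataclass
class QueueSpec:
    """Hypothetical stand-in for kombu.Queue with only the fields used here."""
    name: str
    routing_key: str = ""
    queue_arguments: dict = field(default_factory=dict)


def apply_quorum(existing, all_queue_names):
    """Tag existing queues as quorum in place, then add any missing names."""
    for queue in existing:
        # Keep routing_key and other arguments; only set the queue type.
        queue.queue_arguments = {**queue.queue_arguments, "x-queue-type": "quorum"}
    known = {queue.name for queue in existing}
    for name in sorted(set(all_queue_names) - known):
        existing.append(QueueSpec(name, queue_arguments={"x-queue-type": "quorum"}))
    return existing
```

Compared with rebuilding the list from names alone, this keeps any custom routing keys or arguments that were already configured on a queue.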
📒 Files selected for processing (3)
- workers/shared/infrastructure/config/builder.py
- workers/shared/infrastructure/config/rabbitmq_ha.py
- workers/shared/worker_builder.py
Test Results
Summary
Runner Tests - Full Report
| kombu Queue instance with quorum queue arguments. | ||
| """ |
Unnecessary intermediate kwargs variable
After the routing_key parameter was removed, the kwargs dict is now created only to be immediately unpacked in the very next line. There's no longer a reason for this indirection — the queue can be constructed directly:
| kombu Queue instance with quorum queue arguments. | |
| """ | |
| return Queue(name, queue_arguments={"x-queue-type": "quorum"}) |


Summary
- When `RABBITMQ_HA_ENABLED=true`, all application Celery queues are declared with `x-queue-type: quorum` for RabbitMQ HA compatibility
- Overrides `qos_semantics_matches_spec` since quorum queues don't support global (channel-level) QoS
- Internal Celery queues (pidbox, reply) are unaffected since they require `auto-delete`
- Defaults to `false`: no behavioral change unless explicitly enabled

Changes by component
- `backend/backend/celery_config.py`: `is_rabbitmq_ha_enabled()`, `make_queue()` helper, quorum queue declarations for all 4 backend queues, and QoS override
- `workers/shared/models/worker_models.py`: `_add_quorum_queue_configuration()` to declare worker queues as quorum when HA enabled
- `workers/shared/infrastructure/config/builder.py`: `build_celery_app()`
- `backend/sample.env`, `workers/sample.env`, `docker/sample.env`: `RABBITMQ_HA_ENABLED=false`

Why
Celery/kombu sends `x-queue-type: classic` by default when declaring queues, overriding the RabbitMQ server-side `default_queue_type = quorum` setting. Explicit queue declarations are required.

Related PRs
- `RABBITMQ_HA_ENABLED` env var)

Test plan
- `RABBITMQ_HA_ENABLED=false`: verify no behavioral change
- `RABBITMQ_HA_ENABLED=true` and RabbitMQ cluster: verify quorum queues created
- `rabbitmqctl list_queues name type` shows quorum type for application queues

🤖 Generated with Claude Code
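The last test-plan item could be automated along these lines. This is a sketch under assumptions: it expects the tab-separated output of `rabbitmqctl list_queues name type`, treats queue names containing `pidbox` or `reply` as Celery internals, and uses made-up sample output. The function name and sample queue names are illustrative.

```python
def classic_app_queues(output, internal_markers=("pidbox", "reply")):
    """Return names of non-internal queues whose type is not 'quorum'."""
    offenders = []
    for line in output.splitlines():
        parts = line.split("\t")
        if len(parts) != 2 or parts == ["name", "type"]:
            continue  # skip banner and header lines
        name, qtype = parts
        if any(marker in name for marker in internal_markers):
            continue  # Celery-internal queues are expected to stay classic
        if qtype != "quorum":
            offenders.append(name)
    return offenders


# Hypothetical sample output, for illustration only.
SAMPLE = (
    "Listing queues for vhost / ...\n"
    "name\ttype\n"
    "celery\tquorum\n"
    "file_processing_priority\tclassic\n"
    "celery@worker1.celery.pidbox\tclassic\n"
)
```

Against the sample above, the function reports only `file_processing_priority`, which mirrors the kind of leftover classic application queue flagged earlier in the conversation.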