This document describes the current stable version of Celery (5.6). For development docs, go here.
celery.worker.consumer¶
Worker consumer.
- class celery.worker.consumer.Agent(c, **kwargs)[source]¶
Agent starts https://pypi.org/project/cell/ actors.
- conditional = True¶
- name = 'celery.worker.consumer.agent.Agent'¶
- requires = (step:celery.worker.consumer.connection.Connection{()},)¶
- class celery.worker.consumer.Connection(c, **kwargs)[source]¶
Service managing the consumer broker connection.
- name = 'celery.worker.consumer.connection.Connection'¶
- class celery.worker.consumer.Consumer(on_task_request, init_callback=<function noop>, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)[source]¶
Consumer blueprint.
- class Blueprint(steps=None, name=None, on_start=None, on_close=None, on_stopped=None)[source]¶
Consumer blueprint.
- default_steps = ['celery.worker.consumer.connection:Connection', 'celery.worker.consumer.mingle:Mingle', 'celery.worker.consumer.events:Events', 'celery.worker.consumer.gossip:Gossip', 'celery.worker.consumer.heart:Heart', 'celery.worker.consumer.control:Control', 'celery.worker.consumer.tasks:Tasks', 'celery.worker.consumer.delayed_delivery:DelayedDelivery', 'celery.worker.consumer.consumer:Evloop', 'celery.worker.consumer.agent:Agent']¶
- name = 'Consumer'¶
- Strategies¶
alias of dict
- broker_connection_retry_attempt = 0¶
Counter tracking the number of connection retry attempts to the broker. It is reset to 0 once a connection succeeds.
- cancel_active_requests()[source]¶
Cancel active requests during shutdown.
Cancels all active requests that either do not require late acknowledgments or, if they do, have not been acknowledged yet.
Does not cancel successful tasks, even if they have not been acknowledged yet.
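The selection rule described for cancel_active_requests() can be sketched as a small predicate. This is an illustration only, not Celery's implementation: FakeRequest, its fields (acks_late, acknowledged, succeeded), and the should_cancel helper are hypothetical names introduced for the example.

```python
from dataclasses import dataclass


@dataclass
class FakeRequest:
    """Hypothetical stand-in for an active request; not Celery's Request class."""
    task_id: str
    acks_late: bool = False      # task requires late acknowledgment
    acknowledged: bool = False   # message has already been acknowledged
    succeeded: bool = False      # task finished successfully


def should_cancel(req: FakeRequest) -> bool:
    # Successful tasks are never cancelled, even if still unacknowledged.
    if req.succeeded:
        return False
    # Cancel when late acks are not required, or when they are required
    # but the message has not been acknowledged yet.
    return (not req.acks_late) or (not req.acknowledged)


active = [
    FakeRequest("a", acks_late=False, acknowledged=True),
    FakeRequest("b", acks_late=True, acknowledged=False),
    FakeRequest("c", acks_late=True, acknowledged=True),
    FakeRequest("d", acks_late=True, acknowledged=False, succeeded=True),
]
cancelled = [req.task_id for req in active if should_cancel(req)]
```

Here requests "a" and "b" are selected for cancellation, while "c" (late ack already sent) and "d" (already successful) are left alone.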
- connect()[source]¶
Establish the broker connection used for consuming tasks.
Retries establishing the connection if the broker_connection_retry setting is enabled.
- first_connection_attempt = True¶
This flag will be turned off after the first failed connection attempt.
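How connect(), broker_connection_retry_attempt, and first_connection_attempt relate can be sketched with a small retry loop. This is a minimal model under stated assumptions, not the code Celery runs: ConnectorSketch, open_connection, retry_enabled, max_retries, and delay are hypothetical names, and the retry cap is an assumption that the text above does not specify.

```python
import time


class ConnectorSketch:
    """Hypothetical model of a retrying connect loop; not Celery's Consumer."""

    def __init__(self, open_connection, retry_enabled=True, max_retries=5, delay=0.0):
        self.open_connection = open_connection  # callable that may raise ConnectionError
        self.retry_enabled = retry_enabled      # stands in for broker_connection_retry
        self.max_retries = max_retries          # assumed cap, not taken from the docs
        self.delay = delay                      # seconds to wait between attempts
        self.broker_connection_retry_attempt = 0
        self.first_connection_attempt = True

    def connect(self):
        while True:
            try:
                conn = self.open_connection()
            except ConnectionError:
                # The flag is turned off after the first failed attempt.
                self.first_connection_attempt = False
                out_of_retries = self.broker_connection_retry_attempt >= self.max_retries
                if not self.retry_enabled or out_of_retries:
                    raise
                self.broker_connection_retry_attempt += 1
                time.sleep(self.delay)
            else:
                # The counter is reset to 0 once the connection succeeds.
                self.broker_connection_retry_attempt = 0
                return conn


# A fake broker that refuses the first two attempts, then accepts.
failures = iter([ConnectionError("refused"), ConnectionError("refused")])


def flaky_open():
    exc = next(failures, None)
    if exc is not None:
        raise exc
    return "connected"


connector = ConnectorSketch(flaky_open)
result = connector.connect()
```

With retry_enabled=False the first ConnectionError would propagate to the caller instead of being retried.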
- init_callback = None¶
Optional callback called the first time the worker is ready to receive tasks.
- property max_prefetch_count¶
- on_decode_error(message, exc)[source]¶
Callback called if an error occurs while decoding a message.
Simply logs the error and acknowledges the message so it doesn’t enter a loop.
- Parameters:
message (kombu.Message) – The message received.
exc (Exception) – The exception being handled.
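The log-then-acknowledge behaviour of on_decode_error() can be sketched as follows. FakeMessage is a hypothetical stand-in for kombu.Message that only models ack(), and the logger name and log wording are assumptions; the point is that acknowledging the undecodable message stops the broker from redelivering it indefinitely.

```python
import json
import logging

logger = logging.getLogger("consumer_sketch")  # hypothetical logger name


class FakeMessage:
    """Hypothetical stand-in for kombu.Message; only models ack()."""

    def __init__(self, body):
        self.body = body
        self.acknowledged = False

    def ack(self):
        self.acknowledged = True


def on_decode_error(message, exc):
    # Log the failure, then acknowledge so the message is not redelivered
    # and decoded (unsuccessfully) again and again.
    logger.error("Can't decode message body: %r (body: %r)", exc, message.body)
    message.ack()


msg = FakeMessage("{not valid json")
try:
    json.loads(msg.body)
except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
    on_decode_error(msg, exc)
```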
- pool = None¶
The current worker pool instance.
- restart_count = -1¶
- timer = None¶
A timer used for high-priority internal tasks, such as sending heartbeats.
- class celery.worker.consumer.Control(c, **kwargs)[source]¶
Remote control command service.
- include_if(c)[source]¶
Return true if bootstep should be included.
You can define this as an optional predicate that decides whether this step should be created.
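The role of include_if as a predicate can be illustrated with a simplified model of step creation. Everything here is hypothetical (StepSketch, RemoteControlSketch, build_steps, and the remote_control_enabled attribute); it is a sketch of the idea, not the bootsteps machinery itself.

```python
from types import SimpleNamespace


class StepSketch:
    """Hypothetical base step: included unless a subclass says otherwise."""

    def include_if(self, parent):
        return True

    def create(self, parent):
        return f"{type(self).__name__} service"


class RemoteControlSketch(StepSketch):
    def include_if(self, parent):
        # Only create this step when the (hypothetical) parent option is set.
        return parent.remote_control_enabled


def build_steps(steps, parent):
    # Create a service only for steps whose predicate returns true.
    return [step.create(parent) for step in steps if step.include_if(parent)]


steps = [StepSketch(), RemoteControlSketch()]
enabled = build_steps(steps, SimpleNamespace(remote_control_enabled=True))
disabled = build_steps(steps, SimpleNamespace(remote_control_enabled=False))
```

When the predicate returns false the step is skipped entirely, so no service object is created for it.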
- name = 'celery.worker.consumer.control.Control'¶
- requires = (step:celery.worker.consumer.tasks.Tasks{(step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)},)¶
- class celery.worker.consumer.Events(c, task_events=True, without_heartbeat=False, without_gossip=False, **kwargs)[source]¶
Service used for sending monitoring events.
- name = 'celery.worker.consumer.events.Events'¶
- requires = (step:celery.worker.consumer.connection.Connection{()},)¶
- class celery.worker.consumer.Gossip(c, without_gossip=False, interval=5.0, heartbeat_interval=2.0, **kwargs)[source]¶
Bootstep consuming events from other workers.
This keeps the logical clock value up to date.
- compatible_transports = {'amqp', 'redis'}¶
- label = 'Gossip'¶
- name = 'celery.worker.consumer.gossip.Gossip'¶
- requires = (step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)¶
- class celery.worker.consumer.Heart(c, without_heartbeat=False, heartbeat_interval=None, **kwargs)[source]¶
Bootstep sending event heartbeats.
This service sends a worker-heartbeat message every n seconds.
Note
Not to be confused with AMQP protocol level heartbeats.
- name = 'celery.worker.consumer.heart.Heart'¶
- requires = (step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)¶
- shutdown(c)¶
- class celery.worker.consumer.Mingle(c, without_mingle=False, **kwargs)[source]¶
Bootstep syncing state with neighbor workers.
At startup, or upon consumer restart, this will:
Sync logical clocks.
Sync revoked tasks.
- compatible_transports = {'amqp', 'gcpubsub', 'redis'}¶
- label = 'Mingle'¶
- name = 'celery.worker.consumer.mingle.Mingle'¶
- requires = (step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)¶
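The two sync operations Mingle performs can be sketched with a Lamport-style logical clock and a set of revoked task ids. This is a simplified model, not Celery's code: LamportClockSketch, sync_with_neighbors, and the shape of the replies dictionary are assumptions made for the example.

```python
class LamportClockSketch:
    """Hypothetical minimal logical clock."""

    def __init__(self, value=0):
        self.value = value

    def adjust(self, other):
        # Move past the larger of the local and the remote clock value.
        self.value = max(self.value, other) + 1
        return self.value


def sync_with_neighbors(clock, revoked, replies):
    """Merge the state reported by each neighbor into the local state."""
    for hostname, reply in replies.items():
        clock.adjust(reply["clock"])      # sync logical clocks
        revoked.update(reply["revoked"])  # sync revoked tasks


clock = LamportClockSketch(3)
revoked = {"task-1"}
replies = {
    "worker2@example": {"clock": 10, "revoked": ["task-2"]},
    "worker3@example": {"clock": 4, "revoked": ["task-1", "task-3"]},
}
sync_with_neighbors(clock, revoked, replies)
```

After the sync the local clock is ahead of every value a neighbor reported, and the local revoked set is the union of all reported sets.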
- class celery.worker.consumer.Tasks(c, **kwargs)[source]¶
Bootstep starting the task message consumer.
- name = 'celery.worker.consumer.tasks.Tasks'¶
- requires = (step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)¶
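The requires attributes listed on this page form a dependency graph between the consumer steps. As an illustration, the sketch below feeds those edges to the standard library's graphlib.TopologicalSorter (Python 3.9+) to obtain one valid start order. Celery resolves step order with its own blueprint code, so this only shows the idea; the short step names are abbreviations of the full dotted names above.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it requires, as listed on this page.
requires = {
    "Connection": set(),
    "Events": {"Connection"},
    "Mingle": {"Events"},
    "Gossip": {"Mingle"},
    "Heart": {"Events"},
    "Tasks": {"Mingle"},
    "Control": {"Tasks"},
    "Agent": {"Connection"},
}

# static_order() yields every step after all of the steps it requires.
order = list(TopologicalSorter(requires).static_order())
print(order)
```

Several orders satisfy the constraints, so the exact output may vary, but Connection always comes first because every other step depends on it directly or indirectly.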