Consumer: high-level message reader¶
-
class
gnsq.
Consumer
(topic, channel, nsqd_tcp_addresses=[], lookupd_http_addresses=[], name=None, message_handler=None, max_tries=5, max_in_flight=1, requeue_delay=0, lookupd_poll_interval=60, lookupd_poll_jitter=0.3, low_ready_idle_timeout=10, max_backoff_duration=128, backoff_on_requeue=True, **kwargs)[source]¶ High level NSQ consumer.
A Consumer will connect to the nsqd tcp addresses or poll the provided nsqlookupd http addresses for the configured topic and send signals to message handlers connected to the
on_message
signal or provided bymessage_handler
.Messages will automatically be finished when the message handle returns unless
message.enable_async()
is called. If an exception occurs orNSQRequeueMessage
is raised, the message will be requeued.The Consumer will handle backing off of failed messages up to a configurable
max_interval
as well as automatically reconnecting to dropped connections.Example usage:
from gnsq import Consumer consumer = gnsq.Consumer('topic', 'channel', 'localhost:4150') @consumer.on_message.connect def handler(consumer, message): print 'got message:', message.body consumer.start()
Parameters: - topic – specifies the desired NSQ topic
- channel – specifies the desired NSQ channel
- nsqd_tcp_addresses – a sequence of string addresses of the nsqd instances this consumer should connect to
- lookupd_http_addresses – a sequence of string addresses of the nsqlookupd instances this consumer should query for producers of the specified topic
- name – a string that is used for logging messages (defaults to
'gnsq.consumer.{topic}.{channel}'
) - message_handler – the callable that will be executed for each message received
- max_tries – the maximum number of attempts the consumer will make to process a message after which messages will be automatically discarded
- max_in_flight – the maximum number of messages this consumer will pipeline for processing. this value will be divided evenly amongst the configured/discovered nsqd producers
- requeue_delay – the default delay to use when requeueing a failed message
- lookupd_poll_interval – the amount of time in seconds between querying all of the supplied nsqlookupd instances. A random amount of time based on this value will be initially introduced in order to add jitter when multiple consumers are running
- lookupd_poll_jitter – the maximum fractional amount of jitter to add to the lookupd poll loop. This helps evenly distribute requests even if multiple consumers restart at the same time.
- low_ready_idle_timeout – the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (ie. max_in_flight < num_producers)
- max_backoff_duration – the maximum time we will allow a backoff state to last in seconds. If zero, backoff wil not occur
- backoff_on_requeue – if
False
, backoff will only occur on exception - **kwargs –
passed to
NsqdTCPClient
initialization
-
is_running
¶ Check if consumer is currently running.
-
is_starved
¶ Evaluate whether any of the connections are starved.
This property should be used by message handlers to reliably identify when to process a batch of messages.
-
join
(timeout=None, raise_error=False)[source]¶ Block until all connections have closed and workers stopped.
-
on_auth
[source]¶ Emitted after a connection is successfully authenticated.
The signal sender is the consumer and the
conn
and parsedresponse
are sent as arguments.
-
on_error
[source]¶ Emitted when an error is received.
The signal sender is the consumer and the
error
is sent as an argument.
-
on_exception
[source]¶ Emitted when an exception is caught while handling a message.
The signal sender is the consumer and the
message
anderror
are sent as arguments.
-
on_finish
[source]¶ Emitted after a message is successfully finished.
The signal sender is the consumer and the
message_id
is sent as an argument.
-
on_giving_up
[source]¶ Emitted after a giving up on a message.
Emitted when a message has exceeded the maximum number of attempts (
max_tries
) and will no longer be requeued. This is useful to perform tasks such as writing to disk, collecting statistics etc. The signal sender is the consumer and themessage
is sent as an argument.
-
on_message
[source]¶ Emitted when a message is received.
The signal sender is the consumer and the
message
is sent as an argument. Themessage_handler
param is connected to this signal.
-
on_requeue
[source]¶ Emitted after a message is requeued.
The signal sender is the consumer and the
message_id
andtimeout
are sent as arguments.