Consumer: high-level message reader

class gnsq.Consumer(topic, channel, nsqd_tcp_addresses=[], lookupd_http_addresses=[], name=None, message_handler=None, max_tries=5, max_in_flight=1, requeue_delay=0, lookupd_poll_interval=60, lookupd_poll_jitter=0.3, low_ready_idle_timeout=10, max_backoff_duration=128, backoff_on_requeue=True, **kwargs)[source]

High level NSQ consumer.

A Consumer will connect to the nsqd tcp addresses or poll the provided nsqlookupd http addresses for the configured topic and send signals to message handlers connected to the on_message signal or provided by message_handler.

Messages will automatically be finished when the message handle returns unless message.enable_async() is called. If an exception occurs or NSQRequeueMessage is raised, the message will be requeued.

The Consumer will handle backing off of failed messages up to a configurable max_interval as well as automatically reconnecting to dropped connections.

Example usage:

from gnsq import Consumer

consumer = gnsq.Consumer('topic', 'channel', 'localhost:4150')

@consumer.on_message.connect
def handler(consumer, message):
    print 'got message:', message.body

consumer.start()
Parameters:
  • topic – specifies the desired NSQ topic
  • channel – specifies the desired NSQ channel
  • nsqd_tcp_addresses – a sequence of string addresses of the nsqd instances this consumer should connect to
  • lookupd_http_addresses – a sequence of string addresses of the nsqlookupd instances this consumer should query for producers of the specified topic
  • name – a string that is used for logging messages (defaults to 'gnsq.consumer.{topic}.{channel}')
  • message_handler – the callable that will be executed for each message received
  • max_tries – the maximum number of attempts the consumer will make to process a message after which messages will be automatically discarded
  • max_in_flight – the maximum number of messages this consumer will pipeline for processing. this value will be divided evenly amongst the configured/discovered nsqd producers
  • requeue_delay – the default delay to use when requeueing a failed message
  • lookupd_poll_interval – the amount of time in seconds between querying all of the supplied nsqlookupd instances. A random amount of time based on this value will be initially introduced in order to add jitter when multiple consumers are running
  • lookupd_poll_jitter – the maximum fractional amount of jitter to add to the lookupd poll loop. This helps evenly distribute requests even if multiple consumers restart at the same time.
  • low_ready_idle_timeout – the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (ie. max_in_flight < num_producers)
  • max_backoff_duration – the maximum time we will allow a backoff state to last in seconds. If zero, backoff wil not occur
  • backoff_on_requeue – if False, backoff will only occur on exception
  • **kwargs

    passed to NsqdTCPClient initialization

close()[source]

Immediately close all connections and stop workers.

is_running

Check if consumer is currently running.

is_starved

Evaluate whether any of the connections are starved.

This property should be used by message handlers to reliably identify when to process a batch of messages.

join(timeout=None, raise_error=False)[source]

Block until all connections have closed and workers stopped.

on_auth[source]

Emitted after a connection is successfully authenticated.

The signal sender is the consumer and the conn and parsed response are sent as arguments.

on_close[source]

Emitted after close().

The signal sender is the consumer.

on_error[source]

Emitted when an error is received.

The signal sender is the consumer and the error is sent as an argument.

on_exception[source]

Emitted when an exception is caught while handling a message.

The signal sender is the consumer and the message and error are sent as arguments.

on_finish[source]

Emitted after a message is successfully finished.

The signal sender is the consumer and the message_id is sent as an argument.

on_giving_up[source]

Emitted after a giving up on a message.

Emitted when a message has exceeded the maximum number of attempts (max_tries) and will no longer be requeued. This is useful to perform tasks such as writing to disk, collecting statistics etc. The signal sender is the consumer and the message is sent as an argument.

on_message[source]

Emitted when a message is received.

The signal sender is the consumer and the message is sent as an argument. The message_handler param is connected to this signal.

on_requeue[source]

Emitted after a message is requeued.

The signal sender is the consumer and the message_id and timeout are sent as arguments.

on_response[source]

Emitted when a response is received.

The signal sender is the consumer and the response is sent as an argument.

start(block=True)[source]

Start discovering and listing to connections.