Reader: high-level consumer

class gnsq.Reader(topic, channel, nsqd_tcp_addresses=[], lookupd_http_addresses=[], name=None, message_handler=None, async=False, max_tries=5, max_in_flight=1, max_concurrency=0, requeue_delay=0, lookupd_poll_interval=60, lookupd_poll_jitter=0.3, low_ready_idle_timeout=10, max_backoff_duration=128, backoff_on_requeue=True, **kwargs)

High level NSQ consumer.

A Reader will connect to the nsqd tcp addresses or poll the provided nsqlookupd http addresses for the configured topic and send signals to message handlers connected to the on_message signal or provided by message_handler.

Messages will automatically be finished when the message handler returns unless the reader's async flag is set to True. If an exception occurs or gnsq.errors.NSQRequeueMessage is raised, the message will be requeued.

The Reader will handle backing off of failed messages up to a configurable max_backoff_duration as well as automatically reconnecting to dropped connections.
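
To illustrate what a capped backoff looks like, here is a minimal sketch. It is not gnsq's actual backoff algorithm, which may use a different schedule; it assumes a simple doubling delay. The function name backoff_delay and the base_interval parameter are hypothetical, and only max_backoff_duration corresponds to a Reader parameter.

```python
def backoff_delay(failures, max_backoff_duration=128, base_interval=1.0):
    """Return a delay in seconds after `failures` consecutive failures.

    Hypothetical helper: doubles the delay on each failure and caps it at
    max_backoff_duration. A cap of zero disables backoff entirely.
    """
    if max_backoff_duration <= 0 or failures <= 0:
        return 0.0
    # Bound the exponent so very long failure streaks cannot overflow.
    exponent = min(failures - 1, 30)
    return min(base_interval * 2 ** exponent, float(max_backoff_duration))
```

With the default cap of 128 seconds, this schedule reaches the cap on the eighth consecutive failure and stays there.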

Parameters:
  • topic – specifies the desired NSQ topic
  • channel – specifies the desired NSQ channel
  • nsqd_tcp_addresses – a sequence of string addresses of the nsqd instances this reader should connect to
  • lookupd_http_addresses – a sequence of string addresses of the nsqlookupd instances this reader should query for producers of the specified topic
  • name – a string that is used for logging messages (defaults to ‘gnsq.reader.topic.channel’)
  • message_handler – the callable that will be executed for each message received
  • async – consider the message handling to be async. The message will not automatically be finished after the handler returns and must be finished manually
  • max_tries – the maximum number of attempts the reader will make to process a message after which messages will be automatically discarded
  • max_in_flight – the maximum number of messages this reader will pipeline for processing. This value will be divided evenly amongst the configured/discovered nsqd producers
  • max_concurrency – the maximum number of messages that will be handled concurrently. Defaults to the number of nsqd connections. Setting max_concurrency to -1 will use the system's CPU count.
  • requeue_delay – the default delay to use when requeueing a failed message
  • lookupd_poll_interval – the amount of time in seconds between querying all of the supplied nsqlookupd instances. A random amount of time based on this value will be initially introduced in order to add jitter when multiple readers are running
  • lookupd_poll_jitter – the maximum fractional amount of jitter to add to the lookupd poll loop. This helps evenly distribute requests even if multiple consumers restart at the same time.
  • low_ready_idle_timeout – the amount of time in seconds to wait for a message from a producer when in a state where RDY counts are re-distributed (i.e. max_in_flight < num_producers)
  • max_backoff_duration – the maximum time we will allow a backoff state to last in seconds. If zero, backoff will not occur
  • backoff_on_requeue – if False, backoff will only occur on exception
  • **kwargs – passed to gnsq.Nsqd initialization
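
As a rough sketch of how these parameters fit together, the following connects a handler to on_message and starts a reader. The topic, channel, and the localhost:4150 address are placeholders, and the `received` list stands in for real processing. The sketch assumes the message object exposes its payload as `message.body`. The gnsq import sits inside main() only so that the handler can be exercised without the package installed.

```python
received = []  # bodies seen so far; a stand-in for real processing


def handle_message(reader, message):
    """Called once per message; `reader` is the signal sender."""
    if not message.body:
        # Any exception raised here causes the Reader to requeue the message.
        raise ValueError('empty message body')
    received.append(message.body)
    # Returning normally lets the Reader finish the message.


def main():
    # gnsq is a third-party package; imported lazily for this sketch.
    import gnsq

    reader = gnsq.Reader(
        'example_topic',                        # placeholder topic
        'example_channel',                      # placeholder channel
        nsqd_tcp_addresses=['localhost:4150'],  # assumed local nsqd
        max_in_flight=4,
        max_tries=5,
    )
    reader.on_message.connect(handle_message)
    reader.start()  # block=True by default, so this call blocks
```

Calling main() runs the consumer. Passing handle_message as the message_handler argument should be equivalent to connecting it to on_message by hand.
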
close()

Immediately close all connections and stop workers.

is_running

Check if reader is currently running.

is_starved

Evaluate whether any of the connections are starved.

This property should be used by message handlers to reliably identify when to process a batch of messages.
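
One way a handler might use is_starved for batching is sketched below. It assumes the reader was created with its async flag set, so that messages are not finished automatically, and that each message object provides a finish() method. The names batch, process_batch, and handle_batched_message are hypothetical.

```python
batch = []  # messages waiting to be processed together


def process_batch(messages):
    """Placeholder for real batch work (hypothetical helper)."""
    return [message.body for message in messages]


def handle_batched_message(reader, message):
    """Collect messages and flush the batch once the reader is starved."""
    batch.append(message)
    if not reader.is_starved:
        return  # connections can still deliver more; keep collecting
    process_batch(batch)
    for queued in batch:
        queued.finish()  # manual finish, needed when the async flag is set
    del batch[:]  # empty the list in place for the next batch
```

For the batch to grow beyond one message, max_in_flight needs to be larger than 1.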

join(timeout=None, raise_error=False)

Block until all connections have closed and workers stopped.

on_auth

Emitted after a connection is successfully authenticated.

The signal sender is the reader and the conn and parsed response are sent as arguments.

on_close

Emitted after close().

The signal sender is the reader.

on_error

Emitted when an error is received.

The signal sender is the reader and the error is sent as an argument.

on_exception

Emitted when an exception is caught while handling a message.

The signal sender is the reader and the message and error are sent as arguments.

on_finish

Emitted after a message is successfully finished.

The signal sender is the reader and the message_id is sent as an argument.

on_giving_up

Emitted after giving up on a message.

Emitted when a message has exceeded the maximum number of attempts (max_tries) and will no longer be requeued. This is useful to perform tasks such as writing to disk, collecting statistics etc. The signal sender is the reader and the message is sent as an argument.
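
A handler for this signal could look like the sketch below, which records and logs abandoned messages. The logger name, the `abandoned` list, and the function name are hypothetical, and the sketch assumes the payload is available as `message.body`.

```python
import logging

logger = logging.getLogger('example.giving_up')  # hypothetical logger name
abandoned = []  # bodies of messages the reader gave up on


def record_abandoned(reader, message):
    """Keep a copy of a message that exhausted max_tries, and log it."""
    abandoned.append(message.body)
    logger.warning('giving up on message: %r', message.body)
```

It would be attached with reader.on_giving_up.connect(record_abandoned).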

on_message

Emitted when a message is received.

The signal sender is the reader and the message is sent as an argument. The message_handler param is connected to this signal.

on_requeue

Emitted after a message is requeued.

The signal sender is the reader and the message_id and timeout are sent as arguments.

on_response

Emitted when a response is received.

The signal sender is the reader and the response is sent as an argument.

publish(topic, message)

Publish a message to a random connection.

start(block=True)

Start discovering and listening to connections.
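
As a sketch of non-blocking use, start(block=False) can be combined with join() and close(). The helper name run_for is hypothetical, and the sketch assumes that join() returns once its timeout elapses even if the reader is still running.

```python
def run_for(reader, seconds):
    """Run `reader` for at most `seconds`, then close it (hypothetical helper)."""
    reader.start(block=False)  # return immediately instead of blocking
    try:
        # Wait for shutdown, or (assumed) until the timeout elapses.
        reader.join(timeout=seconds)
    finally:
        reader.close()  # close all connections and stop workers
```

The try/finally ensures close() is called even if join() raises.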