Contrib modules

Patterns and best practices for gnsq made code.

Batching messages

class gnsq.contrib.batch.BatchHandler(batch_size, handle_batch=None, handle_message=None, handle_batch_error=None, handle_message_error=None, timeout=10, spawn=<bound method Greenlet.spawn of <class 'gevent._greenlet.Greenlet'>>)[source]

Batch message handler for gnsq.

It is recommended to use a max inflight greater than the batch size.

Example usage:

>>> consumer = Consumer('topic', 'worker', max_in_flight=16)
>>> consumer.on_message.connect(BatchHandler(8, my_handler), weak=False)
handle_batch(messages)[source]

Handle a batch message.

Processes a batch of messages. You must provide a handle_batch() function to the constructor or override this method.

Raising an exception in handle_batch() will cause all messages in the batch to be requeued.

handle_batch_error(error, messages, batch)[source]

Handle an exception processsing a batch of messages.

This may be overridden or passed into the constructor.

handle_message(message)[source]

Handle a single message.

Over ride this to provide some processing and an individual message. The result of this function is what is passed to handle_batch(). This may be overridden or passed into the constructor. By default it simply returns the message.

Raising an exception in handle_message() will cause that message to be requeued and excluded from the batch.

handle_message_error(error, message)[source]

Handle an exception processesing an individual message.

This may be overridden or passed into the constructor.

Giveup handlers

class gnsq.contrib.giveup.LogGiveupHandler(log=<built-in method write of _io.TextIOWrapper object>, newline='n')[source]

Log messages on giveup.

Writes the message body to the log. This can be customized by subclassing and implementing format_message(). Assuming messages do not requeued using the to_nsq utility.

Example usage:

>>> fp = open('topic.__BURY__.log', 'w')
>>> consumer.on_giving_up.connect(
...     LogGiveupHandler(fp.write), weak=False)
class gnsq.contrib.giveup.JSONLogGiveupHandler(log=<built-in method write of _io.TextIOWrapper object>, newline='n')[source]

Log messages as json on giveup.

Works like LogGiveupHandler but serializes the message details as json before writing to the log.

Example usage:

>>> fp = open('topic.__BURY__.log', 'w')
>>> consumer.on_giving_up.connect(
...     JSONLogGiveupHandler(fp.write), weak=False)
class gnsq.contrib.giveup.NsqdGiveupHandler(topic, nsqd_hosts=['localhost'], nsqd_class=<class 'gnsq.nsqd.NsqdHTTPClient'>)[source]

Send messages by to nsq on giveup.

Forwards the message body to the given topic where it can be inspected and requeued. This can be customized by subclassing and implementing format_message(). Messages can be requeued with the nsq_to_nsq utility.

Example usage:

>>> giveup_handler = NsqdGiveupHandler('topic.__BURY__')
>>> consumer.on_giving_up.connect(giveup_handler)

Concurrency

class gnsq.contrib.queue.QueueHandler[source]

Iterator like api for gnsq.

Example usage:

>>> queue = QueueHandler()
>>> consumer = Consumer('topic', 'worker', max_in_flight=16)
>>> consumer.on_message.connect(queue)
>>> consumer.start(block=False)
>>> for message in queue:
...     print(message.body)
...     message.finish()

Or give it to a pool:

>>> gevent.pool.Pool().map(queue, my_handler)
Parameters:maxsize – maximum number of messages that can be queued. If less than or equal to zero or None, the queue size is infinite.
empty

Return True if the queue is empty, False otherwise.

full

Return True if the queue is full, False otherwise.

Queue(None) is never full.

get

Remove and return an item from the queue.

If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

get_nowait

Remove and return an item from the queue without blocking.

Only get an item if one is immediately available. Otherwise raise the Empty exception.

peek

Return an item from the queue without removing it.

If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

peek_nowait

Return an item from the queue without blocking.

Only return an item if one is immediately available. Otherwise raise the Empty exception.

qsize

Return the size of the queue.

class gnsq.contrib.queue.ChannelHandler[source]

Iterator like api for gnsq.

Like QueueHandler with a maxsize of 1.

Error logging

class gnsq.contrib.sentry.SentryExceptionHandler(client)[source]

Log gnsq exceptions to sentry.

Example usage:

>>> from raven import Sentry
>>> sentry = Sentry()
>>> consumer.on_exception.connect(
...     SentryExceptionHandler(sentry), weak=False)