Contrib modules¶
Patterns and best practices for gnsq made code.
Batching messages¶
-
class
gnsq.contrib.batch.
BatchHandler
(batch_size, handle_batch=None, handle_message=None, handle_batch_error=None, handle_message_error=None, timeout=10, spawn=<bound method Greenlet.spawn of <class 'gevent._greenlet.Greenlet'>>)[source]¶ Batch message handler for gnsq.
It is recommended to use a max inflight greater than the batch size.
Example usage:
>>> consumer = Consumer('topic', 'worker', max_in_flight=16) >>> consumer.on_message.connect(BatchHandler(8, my_handler), weak=False)
-
handle_batch
(messages)[source]¶ Handle a batch message.
Processes a batch of messages. You must provide a
handle_batch()
function to the constructor or override this method.Raising an exception in
handle_batch()
will cause all messages in the batch to be requeued.
-
handle_batch_error
(error, messages, batch)[source]¶ Handle an exception processsing a batch of messages.
This may be overridden or passed into the constructor.
-
handle_message
(message)[source]¶ Handle a single message.
Over ride this to provide some processing and an individual message. The result of this function is what is passed to
handle_batch()
. This may be overridden or passed into the constructor. By default it simply returns the message.Raising an exception in
handle_message()
will cause that message to be requeued and excluded from the batch.
-
Giveup handlers¶
-
class
gnsq.contrib.giveup.
LogGiveupHandler
(log=<built-in method write of _io.TextIOWrapper object>, newline='n')[source]¶ Log messages on giveup.
Writes the message body to the log. This can be customized by subclassing and implementing
format_message()
. Assuming messages do not requeued using the to_nsq utility.Example usage:
>>> fp = open('topic.__BURY__.log', 'w') >>> consumer.on_giving_up.connect( ... LogGiveupHandler(fp.write), weak=False)
-
class
gnsq.contrib.giveup.
JSONLogGiveupHandler
(log=<built-in method write of _io.TextIOWrapper object>, newline='n')[source]¶ Log messages as json on giveup.
Works like
LogGiveupHandler
but serializes the message details as json before writing to the log.Example usage:
>>> fp = open('topic.__BURY__.log', 'w') >>> consumer.on_giving_up.connect( ... JSONLogGiveupHandler(fp.write), weak=False)
-
class
gnsq.contrib.giveup.
NsqdGiveupHandler
(topic, nsqd_hosts=['localhost'], nsqd_class=<class 'gnsq.nsqd.NsqdHTTPClient'>)[source]¶ Send messages by to nsq on giveup.
Forwards the message body to the given topic where it can be inspected and requeued. This can be customized by subclassing and implementing
format_message()
. Messages can be requeued with the nsq_to_nsq utility.Example usage:
>>> giveup_handler = NsqdGiveupHandler('topic.__BURY__') >>> consumer.on_giving_up.connect(giveup_handler)
Concurrency¶
-
class
gnsq.contrib.queue.
QueueHandler
[source]¶ Iterator like api for gnsq.
Example usage:
>>> queue = QueueHandler() >>> consumer = Consumer('topic', 'worker', max_in_flight=16) >>> consumer.on_message.connect(queue) >>> consumer.start(block=False) >>> for message in queue: ... print(message.body) ... message.finish()
Or give it to a pool:
>>> gevent.pool.Pool().map(queue, my_handler)
Parameters: maxsize – maximum number of messages that can be queued. If less than or equal to zero or None, the queue size is infinite. -
empty
¶ Return
True
if the queue is empty,False
otherwise.
-
full
¶ Return
True
if the queue is full,False
otherwise.Queue(None)
is never full.
-
get
¶ Remove and return an item from the queue.
If optional args block is true and timeout is
None
(the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises theEmpty
exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise theEmpty
exception (timeout is ignored in that case).
-
get_nowait
¶ Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise raise the
Empty
exception.
-
peek
¶ Return an item from the queue without removing it.
If optional args block is true and timeout is
None
(the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises theEmpty
exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise theEmpty
exception (timeout is ignored in that case).
-
peek_nowait
¶ Return an item from the queue without blocking.
Only return an item if one is immediately available. Otherwise raise the
Empty
exception.
-
qsize
¶ Return the size of the queue.
-
-
class
gnsq.contrib.queue.
ChannelHandler
[source]¶ Iterator like api for gnsq.
Like
QueueHandler
with amaxsize
of1
.