# -*- coding: utf-8 -*-
import logging
import warnings
import gevent.queue
import gevent.pool
from gnsq.errors import NSQException
# Warning text issued (as a RuntimeWarning) by BatchHandler._run when waiting
# for the next message on the channel times out before a batch is full.
# NOTE(review): "to large" looks like a typo for "too large"; the text is
# emitted at runtime, so it is left unchanged here.
TIMEOUT_WARNING = 'batching timed out. batch size may be to large'
# Warning text issued (as a RuntimeWarning) by BatchHandler._run when the
# StopIteration sentinel is received, i.e. the consumer reported being starved
# before a batch was full.
STARVED_WARNING = 'consumer is starved. batch size may be to large'
class BatchHandler(object):
    """Batch message handler for gnsq.

    Instances are callable with ``(consumer, message)`` so they can be
    connected to a consumer's ``on_message`` signal. Each received message is
    handed to a background worker greenlet which groups messages into lists
    of up to ``batch_size`` and dispatches every list to :meth:`run_batch`.

    It is recommended to use a max inflight greater than the batch size.

    Example usage::

        >>> consumer = Consumer('topic', 'worker', max_in_flight=16)
        >>> consumer.on_message.connect(BatchHandler(8, my_handler), weak=False)

    :param batch_size: maximum number of messages collected into one batch.
    :param handle_batch: optional callable replacing :meth:`handle_batch`.
        It is stored on the instance, so it is called as
        ``handle_batch(batch)`` without ``self``.
    :param handle_message: optional callable replacing
        :meth:`handle_message`, called as ``handle_message(message)``.
    :param handle_batch_error: optional callable replacing
        :meth:`handle_batch_error`, called as
        ``handle_batch_error(error, messages, batch)``.
    :param handle_message_error: optional callable replacing
        :meth:`handle_message_error`, called as
        ``handle_message_error(error, message)``.
    :param timeout: timeout passed to the channel ``get`` while waiting for
        the next message of a batch. When it expires, whatever has been
        collected so far is dispatched.
    :param spawn: callable used to start :meth:`run_batch` for each batch.
        If an ``int`` is given, a :class:`gevent.pool.Pool` of that size is
        created and its ``spawn`` method is used.
    """

    def __init__(self, batch_size, handle_batch=None, handle_message=None,
                 handle_batch_error=None, handle_message_error=None,
                 timeout=10, spawn=gevent.spawn):
        self.logger = logging.getLogger(__name__)
        self.message_channel = gevent.queue.Channel()
        self.batch_size = batch_size
        self.timeout = timeout

        # An integer ``spawn`` means "limit concurrency to this many batches".
        if isinstance(spawn, int):
            spawn = gevent.pool.Pool(spawn).spawn
        self.spawn = spawn

        # Only override the hook methods that were explicitly supplied; the
        # class-level defaults remain in effect for the rest.
        if handle_batch is not None:
            self.handle_batch = handle_batch
        if handle_message is not None:
            self.handle_message = handle_message
        if handle_batch_error is not None:
            self.handle_batch_error = handle_batch_error
        if handle_message_error is not None:
            self.handle_message_error = handle_message_error

        # The collecting loop always runs in its own greenlet, independent of
        # the ``spawn`` callable used for the batches themselves.
        self.worker = gevent.spawn(self._run)

    def __call__(self, consumer, message):
        """Queue ``message`` for batching.

        The message is switched to async mode so that it is not responded to
        automatically, then handed to the worker through the channel. If the
        consumer reports that it is starved, a ``StopIteration`` sentinel is
        sent as well so the worker dispatches the partial batch.
        """
        message.enable_async()
        self.message_channel.put(message)

        if consumer.is_starved:
            self.message_channel.put(StopIteration)

    def _run(self):
        """Worker loop: collect messages into batches and dispatch them.

        A batch is closed when it reaches ``batch_size``, when waiting for
        the next message times out, or when the ``StopIteration`` sentinel is
        received. Empty batches are not dispatched.
        """
        while True:
            messages = []

            while len(messages) < self.batch_size:
                try:
                    message = self.message_channel.get(timeout=self.timeout)
                except gevent.queue.Empty:
                    warnings.warn(TIMEOUT_WARNING, RuntimeWarning)
                    break

                # Sentinel sent by __call__ when the consumer is starved.
                if message is StopIteration:
                    warnings.warn(STARVED_WARNING, RuntimeWarning)
                    break

                messages.append(message)

            if messages:
                self.spawn(self.run_batch, messages)

    def finish_message(self, message):
        """Finish ``message``, logging (not raising) any ``NSQException``."""
        try:
            message.finish()
        except NSQException as error:
            self.logger.warning('error finishing message (%r)', error)

    def finish_messages(self, messages):
        """Finish every message in ``messages`` that has not yet responded."""
        for message in messages:
            if message.has_responded():
                continue
            self.finish_message(message)

    def requeue_message(self, message):
        """Requeue ``message``, logging (not raising) any ``NSQException``."""
        try:
            message.requeue()
        except NSQException as error:
            self.logger.warning('error requeueing message (%r)', error)

    def requeue_messages(self, messages):
        """Requeue every message in ``messages`` that has not yet responded."""
        for message in messages:
            if message.has_responded():
                continue
            self.requeue_message(message)

    def run_batch(self, messages):
        """Process one batch of messages.

        Each message is passed through :meth:`handle_message`; a message
        whose handler raises is reported to :meth:`handle_message_error`,
        requeued and left out of the batch. The collected results are then
        passed to :meth:`handle_batch`. If that raises,
        :meth:`handle_batch_error` is called and all messages that have not
        yet responded are requeued; otherwise they are finished.
        """
        batch = []

        for message in messages:
            try:
                batch.append(self.handle_message(message))
            except Exception as error:
                self.logger.exception('caught exception while handling message')
                self.handle_message_error(error, message)
                self.requeue_message(message)

        if batch:
            try:
                self.handle_batch(batch)
            except Exception as error:
                self.logger.exception('caught exception while handling batch')
                self.handle_batch_error(error, messages, batch)
                self.requeue_messages(messages)
                return

        # Messages already requeued above have responded and are skipped.
        self.finish_messages(messages)

    def handle_message(self, message):
        """Handle a single message.

        Override this to provide some processing on an individual message.
        The result of this function is what is passed to :meth:`handle_batch`.
        This may be overridden or passed into the constructor. By default it
        simply returns the message.

        Raising an exception in :meth:`handle_message` will cause that message
        to be requeued and excluded from the batch.
        """
        return message

    def handle_batch(self, messages):
        """Handle a batch message.

        Processes a batch of messages. You must provide a :meth:`handle_batch`
        function to the constructor or override this method.

        Raising an exception in :meth:`handle_batch` will cause all messages in
        the batch to be requeued.

        :raises RuntimeError: always, in this default implementation.
        """
        raise RuntimeError('handle_batch must be overridden')

    def handle_message_error(self, error, message):
        """Handle an exception processing an individual message.

        This may be overridden or passed into the constructor. The default
        implementation does nothing.
        """
        pass

    def handle_batch_error(self, error, messages, batch):
        """Handle an exception processing a batch of messages.

        This may be overridden or passed into the constructor. The default
        implementation does nothing.
        """
        pass