# -*- coding: utf-8 -*-
import itertools
import json
import sys
import gnsq
[docs]class LogGiveupHandler(object):
"""Log messages on giveup.
Writes the message body to the log. This can be customized by subclassing
and implementing :meth:`format_message`. Assuming messages do not requeued
using the `to_nsq` utility.
Example usage::
>>> fp = open('topic.__BURY__.log', 'w')
>>> consumer.on_giving_up.connect(
... LogGiveupHandler(fp.write), weak=False)
"""
def __init__(self, log=sys.stdout.write, newline='\n'):
self.log = log
self.newline = newline
def format_message(self, message):
return message.body
def __call__(self, consumer, message):
self.log(self.format_message(message) + self.newline)
[docs]class JSONLogGiveupHandler(LogGiveupHandler):
"""Log messages as json on giveup.
Works like :class:`LogGiveupHandler` but serializes the message details as
json before writing to the log.
Example usage::
>>> fp = open('topic.__BURY__.log', 'w')
>>> consumer.on_giving_up.connect(
... JSONLogGiveupHandler(fp.write), weak=False)
"""
def format_message(self, message):
return json.dumps({
'timestamp': message.timestamp,
'attempts': message.attempts,
'id': message.id,
'body': message.body,
})
[docs]class NsqdGiveupHandler(object):
"""Send messages by to nsq on giveup.
Forwards the message body to the given topic where it can be inspected and
requeued. This can be customized by subclassing and implementing
:meth:`format_message`. Messages can be requeued with the `nsq_to_nsq`
utility.
Example usage::
>>> giveup_handler = NsqdGiveupHandler('topic.__BURY__')
>>> consumer.on_giving_up.connect(giveup_handler)
"""
def __init__(self, topic, nsqd_hosts=['localhost'],
nsqd_class=gnsq.NsqdHTTPClient):
if not nsqd_hosts:
raise ValueError('at least one nsqd host is required')
self.topic = topic
self.nsqds = itertools.cycle([nsqd_class(host) for host in nsqd_hosts])
def format_message(self, message):
return message.body
def __call__(self, consumer, message):
nsq = self.nsqds.next()
nsq.publish(self.topic, self.format_message(message))