Source code for gnsq.producer

# -*- coding: utf-8 -*-
from __future__ import absolute_import, division

import logging
from collections import defaultdict, deque

import blinker
import gevent

from gevent.event import AsyncResult
from gevent.pool import Group
from gevent.queue import Queue, Empty

from . import protocol as nsq

from .backofftimer import BackoffTimer
from .decorators import cached_property
from .errors import NSQException, NSQNoConnections
from .nsqd import NsqdTCPClient
from .states import INIT, RUNNING, CLOSED
from .util import parse_nsqds


class Producer(object):
    """High level NSQ producer.

    A Producer will connect to the nsqd tcp addresses and support async
    publishing (``PUB`` & ``MPUB`` & ``DPUB``) of messages to `nsqd` over the
    TCP protocol.

    Example publishing a message::

        from gnsq import Producer

        producer = Producer('localhost:4150')
        producer.start()
        producer.publish('topic', b'hello world')

    :param nsqd_tcp_addresses: a sequence of string addresses of the nsqd
        instances this producer should connect to

    :param max_backoff_duration: the maximum time we will allow a backoff
        state to last in seconds. If zero, backoff will not occur

    :param **kwargs: passed to :class:`~gnsq.NsqdTCPClient` initialization
    """
    def __init__(self, nsqd_tcp_addresses=[], max_backoff_duration=128,
                 **kwargs):
        if not nsqd_tcp_addresses:
            raise ValueError('must specify at least one nsqd or lookupd')

        self.nsqd_tcp_addresses = parse_nsqds(nsqd_tcp_addresses)
        self.max_backoff_duration = max_backoff_duration
        self.conn_kwargs = kwargs
        self.logger = logging.getLogger(__name__)

        self._state = INIT
        self._connections = Queue()
        self._connection_backoffs = defaultdict(self._create_backoff)
        self._response_queues = {}
        self._workers = Group()
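For illustration only (not part of the module), a producer might be
constructed with several nsqd addresses and a tighter backoff cap; the
addresses and the value ``64`` below are arbitrary examples::

    from gnsq import Producer

    producer = Producer(
        nsqd_tcp_addresses=['localhost:4150', 'localhost:4250'],
        max_backoff_duration=64,
    )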
    @cached_property
    def on_response(self):
        """Emitted when a response is received.

        The signal sender is the producer and the ``response`` is sent as an
        argument.
        """
        return blinker.Signal(doc='Emitted when a response is received.')
    @cached_property
    def on_error(self):
        """Emitted when an error is received.

        The signal sender is the producer and the ``error`` is sent as an
        argument.
        """
        return blinker.Signal(doc='Emitted when an error is received.')
    @cached_property
    def on_auth(self):
        """Emitted after a connection is successfully authenticated.

        The signal sender is the producer and the ``conn`` and parsed
        ``response`` are sent as arguments.
        """
        return blinker.Signal(
            doc='Emitted after a connection is successfully authenticated.')
    @cached_property
    def on_close(self):
        """Emitted after :meth:`close`.

        The signal sender is the producer.
        """
        return blinker.Signal(doc='Emitted after the producer is closed.')
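A sketch of how these blinker signals can be used; the handler names below
are hypothetical. A receiver is called with the sending producer plus the
keyword arguments documented above::

    def log_publish_error(producer, error):
        # ``error`` is the error object passed to on_error.send()
        print('publish failed: %r' % (error,))

    def on_producer_close(producer):
        print('producer closed')

    producer.on_error.connect(log_publish_error)
    producer.on_close.connect(on_producer_close)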
    def start(self):
        """Start connecting to nsqd and listening for responses."""
        if self._state == CLOSED:
            raise NSQException('producer already closed')

        if self.is_running:
            self.logger.warn('producer already started')
            return

        self.logger.debug('starting producer...')
        self._state = RUNNING

        for address in self.nsqd_tcp_addresses:
            address, port = address.split(':')
            self.connect_to_nsqd(address, int(port))
    def close(self):
        """Immediately close all connections and stop workers."""
        if not self.is_running:
            return

        self._state = CLOSED
        self.logger.debug('closing connection(s)')

        while True:
            try:
                conn = self._connections.get(block=False)
            except Empty:
                break

            conn.close_stream()

        self.on_close.send(self)
    def join(self, timeout=None, raise_error=False):
        """Block until all connections have closed and workers stopped."""
        self._workers.join(timeout, raise_error)
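Taken together, a typical lifecycle looks like the snippet below (the topic
name and payload are placeholders)::

    producer = Producer('localhost:4150')
    producer.start()

    producer.publish('topic', b'hello world')

    producer.close()
    producer.join()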
    @property
    def is_running(self):
        """Check if the producer is currently running."""
        return self._state == RUNNING

    def connect_to_nsqd(self, address, port):
        if not self.is_running:
            return

        conn = NsqdTCPClient(address, port, **self.conn_kwargs)
        self.logger.debug('[%s] connecting...', conn)

        conn.on_response.connect(self.handle_response)
        conn.on_error.connect(self.handle_error)
        conn.on_auth.connect(self.handle_auth)

        try:
            conn.connect()
            conn.identify()
        except NSQException as error:
            self.logger.warn('[%s] connection failed (%r)', conn, error)
            self.handle_connection_failure(conn)
            return

        # Check if we've closed since we started
        if not self.is_running:
            self.handle_connection_failure(conn)
            return

        self.logger.info('[%s] connection successful', conn)
        self.handle_connection_success(conn)

    def _listen(self, conn):
        try:
            conn.listen()
        except NSQException as error:
            self.logger.warning('[%s] connection lost (%r)', conn, error)

        self.handle_connection_failure(conn)

    def handle_connection_success(self, conn):
        self._response_queues[conn] = deque()
        self._put_connection(conn)
        self._workers.spawn(self._listen, conn)
        self._connection_backoffs[conn].success()

    def handle_connection_failure(self, conn):
        conn.close_stream()
        self._clear_responses(conn, NSQException('connection closed'))

        if not self.is_running:
            return

        seconds = self._connection_backoffs[conn].failure().get_interval()
        self.logger.debug('[%s] retrying in %ss', conn, seconds)

        gevent.spawn_later(
            seconds, self.connect_to_nsqd, conn.address, conn.port)

    def handle_auth(self, conn, response):
        metadata = []
        if response.get('identity'):
            metadata.append("Identity: %r" % response['identity'])

        if response.get('permission_count'):
            metadata.append("Permissions: %d" % response['permission_count'])

        if response.get('identity_url'):
            metadata.append(response['identity_url'])

        self.logger.info('[%s] AUTH accepted %s', conn, ' '.join(metadata))
        self.on_auth.send(self, conn=conn, response=response)

    def handle_response(self, conn, response):
        self.logger.debug('[%s] response: %s', conn, response)

        if response == nsq.OK:
            result = self._response_queues[conn].popleft()
            result.set(response)

        self.on_response.send(self, response=response)

    def handle_error(self, conn, error):
        self.logger.debug('[%s] error: %s', conn, error)
        self._clear_responses(conn, error)
        self.on_error.send(self, error=error)

    def _create_backoff(self):
        return BackoffTimer(max_interval=self.max_backoff_duration)

    def _clear_responses(self, conn, error):
        # All relevant errors are fatal
        for result in self._response_queues.pop(conn, []):
            result.set_exception(error)

    def _get_connection(self, block=True, timeout=None):
        if not self.is_running:
            raise NSQException('producer not running')

        while True:
            try:
                conn = self._connections.get(block=block, timeout=timeout)
            except Empty:
                raise NSQNoConnections

            if conn.is_connected:
                return conn

            # Discard closed connections

    def _put_connection(self, conn):
        if not self.is_running:
            return
        self._connections.put(conn)
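The reconnect delay used in ``handle_connection_failure`` comes from gnsq's
``BackoffTimer``. A rough sketch of how repeated failures stretch the retry
interval up to ``max_backoff_duration`` (the exact values depend on the
timer's internals)::

    from gnsq.backofftimer import BackoffTimer

    timer = BackoffTimer(max_interval=128)
    for _ in range(5):
        # each failure lengthens the interval, capped at max_interval
        print(timer.failure().get_interval())

    # success() is what the producer calls after a connection succeeds
    timer.success()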
    def publish(self, topic, data, defer=None, block=True, timeout=None,
                raise_error=True):
        """Publish a message to the given topic.

        :param topic: the topic to publish to

        :param data: bytestring data to publish

        :param defer: duration in milliseconds to defer before publishing
            (requires nsq 0.3.6)

        :param block: wait for a connection to become available before
            publishing the message. If block is `False` and no connections
            are available, :class:`~gnsq.errors.NSQNoConnections` is raised

        :param timeout: if timeout is a positive number, it blocks at most
            ``timeout`` seconds before raising
            :class:`~gnsq.errors.NSQNoConnections`

        :param raise_error: if ``True``, it blocks until a response is
            received from the nsqd server, and any error response is raised.
            Otherwise an :class:`~gevent.event.AsyncResult` is returned
        """
        result = AsyncResult()
        conn = self._get_connection(block=block, timeout=timeout)

        try:
            self._response_queues[conn].append(result)
            conn.publish(topic, data, defer=defer)
        finally:
            self._put_connection(conn)

        if raise_error:
            return result.get()

        return result
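A sketch of the ``raise_error=False`` path described above; the topic names
and payloads are placeholders::

    # returns immediately with a gevent AsyncResult instead of waiting
    # for nsqd's response
    result = producer.publish('topic', b'payload', raise_error=False)

    # ... do other work ...

    # collect the response later; an error response raises here
    result.get()

    # deferred publish: ask nsqd to hold the message for 5000 ms
    # (requires nsq 0.3.6)
    producer.publish('topic', b'later', defer=5000)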
    def multipublish(self, topic, messages, block=True, timeout=None,
                     raise_error=True):
        """Publish an iterable of messages to the given topic.

        :param topic: the topic to publish to

        :param messages: iterable of bytestrings to publish

        :param block: wait for a connection to become available before
            publishing the message. If block is `False` and no connections
            are available, :class:`~gnsq.errors.NSQNoConnections` is raised

        :param timeout: if timeout is a positive number, it blocks at most
            ``timeout`` seconds before raising
            :class:`~gnsq.errors.NSQNoConnections`

        :param raise_error: if ``True``, it blocks until a response is
            received from the nsqd server, and any error response is raised.
            Otherwise an :class:`~gevent.event.AsyncResult` is returned
        """
        result = AsyncResult()
        conn = self._get_connection(block=block, timeout=timeout)

        try:
            self._response_queues[conn].append(result)
            conn.multipublish(topic, messages)
        finally:
            self._put_connection(conn)

        if raise_error:
            return result.get()

        return result
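And a corresponding ``multipublish`` sketch, batching several bytestrings
into a single ``MPUB`` (the messages shown are placeholders)::

    messages = [b'event-1', b'event-2', b'event-3']
    producer.multipublish('topic', messages)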