# -*- coding: utf-8 -*-
from __future__ import absolute_import
import json
import time
import blinker
from gevent import socket
from . import protocol as nsq
from . import errors
from .decorators import cached_property, deprecated
from .httpclient import HTTPClient, USERAGENT
from .message import Message
from .states import CONNECTED, DISCONNECTED, INIT
from .stream import Stream
HOSTNAME = socket.gethostname()
SHORTNAME = HOSTNAME.split('.')[0]
[docs]class NsqdTCPClient(object):
"""Low level object representing a TCP connection to nsqd.
:param address: the host or ip address of the nsqd
:param port: the nsqd tcp port to connect to
:param timeout: the timeout for read/write operations (in seconds)
:param client_id: an identifier used to disambiguate this client (defaults
to the first part of the hostname)
:param hostname: the hostname where the client is deployed (defaults to the
clients hostname)
:param heartbeat_interval: the amount of time in seconds to negotiate with
the connected producers to send heartbeats (requires nsqd 0.2.19+)
:param output_buffer_size: size of the buffer (in bytes) used by nsqd for
buffering writes to this connection
:param output_buffer_timeout: timeout (in ms) used by nsqd before flushing
buffered writes (set to 0 to disable). Warning: configuring clients with
an extremely low (< 25ms) output_buffer_timeout has a significant effect
on nsqd CPU usage (particularly with > 50 clients connected).
:param tls_v1: enable TLS v1 encryption (requires nsqd 0.2.22+)
:param tls_options: dictionary of options to pass to `ssl.wrap_socket()
<http://docs.python.org/2/library/ssl.html#ssl.wrap_socket>`_
:param snappy: enable Snappy stream compression (requires nsqd 0.2.23+)
:param deflate: enable deflate stream compression (requires nsqd 0.2.23+)
:param deflate_level: configure the deflate compression level for this
connection (requires nsqd 0.2.23+)
:param sample_rate: take only a sample of the messages being sent to the
client. Not setting this or setting it to 0 will ensure you get all the
messages destined for the client. Sample rate can be greater than 0 or
less than 100 and the client will receive that percentage of the message
traffic. (requires nsqd 0.2.25+)
:param auth_secret: a string passed when using nsq auth (requires
nsqd 0.2.29+)
:param user_agent: a string identifying the agent for this client in the
spirit of HTTP (default: ``<client_library_name>/<version>``) (requires
nsqd 0.2.25+)
"""
def __init__(
self,
address='127.0.0.1',
port=4150,
timeout=60.0,
client_id=None,
hostname=None,
heartbeat_interval=30,
output_buffer_size=16 * 1024,
output_buffer_timeout=250,
tls_v1=False,
tls_options=None,
snappy=False,
deflate=False,
deflate_level=6,
sample_rate=0,
auth_secret=None,
user_agent=USERAGENT,
):
self.address = address
self.port = port
self.timeout = timeout
self.client_id = client_id or SHORTNAME
self.hostname = hostname or HOSTNAME
self.heartbeat_interval = 1000 * heartbeat_interval
self.output_buffer_size = output_buffer_size
self.output_buffer_timeout = output_buffer_timeout
self.tls_v1 = tls_v1
self.tls_options = tls_options
self.snappy = snappy
self.deflate = deflate
self.deflate_level = deflate_level
self.sample_rate = sample_rate
self.auth_secret = auth_secret
self.user_agent = user_agent
self.state = INIT
self.last_response = time.time()
self.last_message = time.time()
self.ready_count = 0
self.in_flight = 0
self.max_ready_count = 2500
self._frame_handlers = {
nsq.FRAME_TYPE_RESPONSE: self.handle_response,
nsq.FRAME_TYPE_ERROR: self.handle_error,
nsq.FRAME_TYPE_MESSAGE: self.handle_message
}
[docs] @cached_property
def on_message(self):
"""Emitted when a message frame is received.
The signal sender is the connection and the ``message`` is sent as an
argument.
"""
return blinker.Signal(doc='Emitted when a message frame is received.')
[docs] @cached_property
def on_response(self):
"""Emitted when a response frame is received.
The signal sender is the connection and the ``response`` is sent as an
argument.
"""
return blinker.Signal(doc='Emitted when a response frame is received.')
[docs] @cached_property
def on_error(self):
"""Emitted when an error frame is received.
The signal sender is the connection and the ``error`` is sent as an
argument.
"""
return blinker.Signal(doc='Emitted when a error frame is received.')
[docs] @cached_property
def on_finish(self):
"""Emitted after :meth:`finish`.
Sent after a message owned by this connection is successfully finished.
The signal sender is the connection and the ``message_id`` is sent as an
argument.
"""
return blinker.Signal(doc='Emitted after the a message is finished.')
[docs] @cached_property
def on_requeue(self):
"""Emitted after :meth:`requeue`.
Sent after a message owned by this connection is requeued. The signal
sender is the connection and the ``message_id``, ``timeout`` and
``backoff`` flag are sent as arguments.
"""
return blinker.Signal(doc='Emitted after the a message is requeued.')
[docs] @cached_property
def on_auth(self):
"""Emitted after the connection is successfully authenticated.
The signal sender is the connection and the parsed ``response`` is sent
as arguments.
"""
return blinker.Signal(
doc='Emitted after the connection is successfully authenticated.')
[docs] @cached_property
def on_close(self):
"""Emitted after :meth:`close_stream`.
Sent after the connection socket has closed. The signal sender is the
connection.
"""
return blinker.Signal(doc='Emitted after the connection is closed.')
@property
def is_connected(self):
"""Check if the client is currently connected."""
return self.state == CONNECTED
@property
def is_starved(self):
"""Evaluate whether the connection is starved.
This property should be used by message handlers to reliably identify
when to process a batch of messages.
"""
return self.in_flight >= max(self.ready_count * 0.85, 1)
[docs] def connect(self):
"""Initialize connection to the nsqd."""
if self.state == DISCONNECTED:
raise errors.NSQException('connection already closed')
if self.is_connected:
return
stream = Stream(self.address, self.port, self.timeout)
stream.connect()
self.stream = stream
self.state = CONNECTED
self.send(nsq.MAGIC_V2)
[docs] def close_stream(self):
"""Close the underlying socket."""
if not self.is_connected:
return
self.stream.close()
self.state = DISCONNECTED
self.on_close.send(self)
def send(self, data):
try:
return self.stream.send(data)
except Exception:
self.close_stream()
raise
def _read_response(self):
try:
size = nsq.unpack_size(self.stream.read(4))
return self.stream.read(size)
except Exception:
self.close_stream()
raise
[docs] def read_response(self):
"""Read an individual response from nsqd.
:returns: tuple of the frame type and the processed data.
"""
response = self._read_response()
frame, data = nsq.unpack_response(response)
self.last_response = time.time()
if frame not in self._frame_handlers:
raise errors.NSQFrameError('unknown frame {}'.format(frame))
frame_handler = self._frame_handlers[frame]
processed_data = frame_handler(data)
return frame, processed_data
def handle_response(self, data):
if data == nsq.HEARTBEAT:
self.nop()
self.on_response.send(self, response=data)
return data
def handle_error(self, data):
error = errors.make_error(data)
self.on_error.send(self, error=error)
if error.fatal:
self.close_stream()
return error
def handle_message(self, data):
self.last_message = time.time()
self.in_flight += 1
message = Message(*nsq.unpack_message(data))
message.on_finish.connect(self.handle_finish)
message.on_requeue.connect(self.handle_requeue)
message.on_touch.connect(self.handle_touch)
self.on_message.send(self, message=message)
return message
def handle_finish(self, message):
self.finish(message.id)
def handle_requeue(self, message, timeout, backoff):
self.requeue(message.id, timeout, backoff)
def handle_touch(self, message):
self.touch(message.id)
def finish_inflight(self):
self.in_flight -= 1
[docs] def listen(self):
"""Listen to incoming responses until the connection closes."""
while self.is_connected:
self.read_response()
def check_ok(self, expected=nsq.OK):
frame, data = self.read_response()
if frame == nsq.FRAME_TYPE_ERROR:
raise data
if frame != nsq.FRAME_TYPE_RESPONSE:
raise errors.NSQException('expected response frame')
if data != expected:
raise errors.NSQException('unexpected response {!r}'.format(data))
def upgrade_to_tls(self):
self.stream.upgrade_to_tls(**self.tls_options)
self.check_ok()
def upgrade_to_snappy(self):
self.stream.upgrade_to_snappy()
self.check_ok()
def upgrade_to_defalte(self):
self.stream.upgrade_to_defalte(self.deflate_level)
self.check_ok()
[docs] def identify(self):
"""Update client metadata on the server and negotiate features.
:returns: nsqd response data if there was feature negotiation,
otherwise ``None``
"""
self.send(nsq.identify({
# nsqd 0.2.28+
'client_id': self.client_id,
'hostname': self.hostname,
# nsqd 0.2.19+
'feature_negotiation': True,
'heartbeat_interval': self.heartbeat_interval,
# nsqd 0.2.21+
'output_buffer_size': self.output_buffer_size,
'output_buffer_timeout': self.output_buffer_timeout,
# nsqd 0.2.22+
'tls_v1': self.tls_v1,
# nsqd 0.2.23+
'snappy': self.snappy,
'deflate': self.deflate,
'deflate_level': self.deflate_level,
# nsqd nsqd 0.2.25+
'sample_rate': self.sample_rate,
'user_agent': self.user_agent,
}))
frame, data = self.read_response()
if frame == nsq.FRAME_TYPE_ERROR:
raise data
if data == nsq.OK:
return
try:
data = json.loads(data.decode('utf-8'))
except ValueError:
self.close_stream()
raise errors.NSQException(
'failed to parse IDENTIFY response JSON from nsqd: '
'{!r}'.format(data))
self.max_ready_count = data.get('max_rdy_count', self.max_ready_count)
if self.tls_v1 and data.get('tls_v1'):
self.upgrade_to_tls()
if self.snappy and data.get('snappy'):
self.upgrade_to_snappy()
elif self.deflate and data.get('deflate'):
self.deflate_level = data.get('deflate_level', self.deflate_level)
self.upgrade_to_defalte()
if self.auth_secret and data.get('auth_required'):
self.auth()
return data
[docs] def auth(self):
"""Send authorization secret to nsqd."""
self.send(nsq.auth(self.auth_secret))
frame, data = self.read_response()
if frame == nsq.FRAME_TYPE_ERROR:
raise data
try:
response = json.loads(data.decode('utf-8'))
except ValueError:
self.close_stream()
raise errors.NSQException(
'failed to parse AUTH response JSON from nsqd: '
'{!r}'.format(data))
self.on_auth.send(self, response=response)
return response
[docs] def subscribe(self, topic, channel):
"""Subscribe to a nsq `topic` and `channel`."""
self.send(nsq.subscribe(topic, channel))
[docs] def publish(self, topic, data, defer=None):
"""Publish a message to the given topic over tcp.
:param topic: the topic to publish to
:param data: bytestring data to publish
:param defer: duration in milliseconds to defer before publishing
(requires nsq 0.3.6)
"""
if defer is None:
self.send(nsq.publish(topic, data))
else:
self.send(nsq.deferpublish(topic, data, defer))
[docs] def multipublish(self, topic, messages):
"""Publish an iterable of messages to the given topic over http.
:param topic: the topic to publish to
:param messages: iterable of bytestrings to publish
"""
self.send(nsq.multipublish(topic, messages))
[docs] def ready(self, count):
"""Indicate you are ready to receive ``count`` messages."""
self.ready_count = count
self.send(nsq.ready(count))
[docs] def finish(self, message_id):
"""Finish a message (indicate successful processing)."""
self.send(nsq.finish(message_id))
self.finish_inflight()
self.on_finish.send(self, message_id=message_id)
[docs] def requeue(self, message_id, timeout=0, backoff=True):
"""Re-queue a message (indicate failure to process)."""
self.send(nsq.requeue(message_id, timeout))
self.finish_inflight()
self.on_requeue.send(
self,
message_id=message_id,
timeout=timeout,
backoff=backoff
)
[docs] def touch(self, message_id):
"""Reset the timeout for an in-flight message."""
self.send(nsq.touch(message_id))
[docs] def close(self):
"""Indicate no more messages should be sent."""
self.send(nsq.close())
[docs] def nop(self):
"""Send no-op to nsqd. Used to keep connection alive."""
self.send(nsq.nop())
def __str__(self):
return '{}:{}'.format(self.address, self.port)
def __hash__(self):
return hash(str(self))
def __eq__(self, other):
return isinstance(other, type(self)) and str(self) == str(other)
def __cmp__(self, other):
return hash(self) - hash(other)
def __lt__(self, other):
return hash(self) < hash(other)
[docs]class NsqdHTTPClient(HTTPClient):
"""Low level http client for nsqd.
:param host: nsqd host address (default: localhost)
:param port: nsqd http port (default: 4151)
:param useragent: useragent sent to nsqd (default:
``<client_library_name>/<version>``)
:param connection_class: override the http connection class
"""
def __init__(self, host='localhost', port=4151, **kwargs):
super(NsqdHTTPClient, self).__init__(host, port, **kwargs)
[docs] def publish(self, topic, data, defer=None):
"""Publish a message to the given topic over http.
:param topic: the topic to publish to
:param data: bytestring data to publish
:param defer: duration in millisconds to defer before publishing
(requires nsq 0.3.6)
"""
nsq.assert_valid_topic_name(topic)
fields = {'topic': topic}
if defer is not None:
fields['defer'] = '{}'.format(defer)
return self._request('POST', '/pub', fields=fields, body=data)
def _validate_mpub_message(self, message):
if b'\n' not in message:
return message
raise errors.NSQException(
'newlines are not allowed in http multipublish')
[docs] def multipublish(self, topic, messages, binary=False):
"""Publish an iterable of messages to the given topic over http.
:param topic: the topic to publish to
:param messages: iterable of bytestrings to publish
:param binary: enable binary mode. defaults to False
(requires nsq 1.0.0)
By default multipublish expects messages to be delimited by ``"\\n"``,
use the binary flag to enable binary mode where the POST body is
expected to be in the following wire protocol format.
"""
nsq.assert_valid_topic_name(topic)
fields = {'topic': topic}
if binary:
fields['binary'] = 'true'
body = nsq.multipublish_body(messages)
else:
body = b'\n'.join(self._validate_mpub_message(m) for m in messages)
return self._request('POST', '/mpub', fields=fields, body=body)
[docs] def create_topic(self, topic):
"""Create a topic."""
nsq.assert_valid_topic_name(topic)
return self._request('POST', '/topic/create', fields={'topic': topic})
[docs] def delete_topic(self, topic):
"""Delete a topic."""
nsq.assert_valid_topic_name(topic)
return self._request('POST', '/topic/delete', fields={'topic': topic})
[docs] def create_channel(self, topic, channel):
"""Create a channel for an existing topic."""
nsq.assert_valid_topic_name(topic)
nsq.assert_valid_channel_name(channel)
return self._request('POST', '/channel/create',
fields={'topic': topic, 'channel': channel})
[docs] def delete_channel(self, topic, channel):
"""Delete an existing channel for an existing topic."""
nsq.assert_valid_topic_name(topic)
nsq.assert_valid_channel_name(channel)
return self._request('POST', '/channel/delete',
fields={'topic': topic, 'channel': channel})
[docs] def empty_topic(self, topic):
"""Empty all the queued messages for an existing topic."""
nsq.assert_valid_topic_name(topic)
return self._request('POST', '/topic/empty', fields={'topic': topic})
[docs] def empty_channel(self, topic, channel):
"""Empty all the queued messages for an existing channel."""
nsq.assert_valid_topic_name(topic)
nsq.assert_valid_channel_name(channel)
return self._request('POST', '/channel/empty',
fields={'topic': topic, 'channel': channel})
[docs] def pause_topic(self, topic):
"""Pause message flow to all channels on an existing topic.
Messages will queue at topic.
"""
nsq.assert_valid_topic_name(topic)
return self._request('POST', '/topic/pause', fields={'topic': topic})
[docs] def unpause_topic(self, topic):
"""Resume message flow to channels of an existing, paused, topic."""
nsq.assert_valid_topic_name(topic)
return self._request('POST', '/topic/unpause', fields={'topic': topic})
[docs] def pause_channel(self, topic, channel):
"""Pause message flow to consumers of an existing channel.
Messages will queue at channel.
"""
nsq.assert_valid_topic_name(topic)
nsq.assert_valid_channel_name(channel)
return self._request('POST', '/channel/pause',
fields={'topic': topic, 'channel': channel})
[docs] def unpause_channel(self, topic, channel):
"""Resume message flow to consumers of an existing, paused, channel."""
nsq.assert_valid_topic_name(topic)
nsq.assert_valid_channel_name(channel)
return self._request('POST', '/channel/unpause',
fields={'topic': topic, 'channel': channel})
[docs] def stats(self, topic=None, channel=None, text=False):
"""Return internal instrumented statistics.
:param topic: (optional) filter to topic
:param channel: (optional) filter to channel
:param text: return the stats as a string (default: ``False``)
"""
if text:
fields = {'format': 'text'}
else:
fields = {'format': 'json'}
if topic:
nsq.assert_valid_topic_name(topic)
fields['topic'] = topic
if channel:
nsq.assert_valid_channel_name(channel)
fields['channel'] = channel
return self._request('GET', '/stats', fields=fields)
[docs] def ping(self):
"""Monitoring endpoint.
:returns: should return ``"OK"``, otherwise raises an exception.
"""
return self._request('GET', '/ping')
[docs] def info(self):
"""Returns version information."""
return self._request('GET', '/info')
[docs]class Nsqd(object):
"""Use :class:`NsqdTCPClient` or :class:`NsqdHTTPClient` instead.
.. deprecated:: 1.0.0
"""
@deprecated
def __init__(self, address='127.0.0.1', tcp_port=4150, http_port=4151,
**kwargs):
"""Use :class:`NsqdTCPClient` or :class:`NsqdHTTPClient` instead.
.. deprecated:: 1.0.0
"""
self.address = address
self.tcp_port = tcp_port
self.http_port = http_port
self.__tcp_client = NsqdTCPClient(address, tcp_port, **kwargs)
self.__http_client = NsqdHTTPClient(address, http_port)
@property
def base_url(self):
return 'http://{}:{}/'.format(self.address, self.http_port)
[docs] @deprecated
def publish_tcp(self, topic, data, **kwargs):
"""Use :meth:`NsqdTCPClient.publish` instead.
.. deprecated:: 1.0.0
"""
return self.__tcp_client.publish(topic, data, **kwargs)
[docs] @deprecated
def publish_http(self, topic, data, **kwargs):
"""Use :meth:`NsqdHTTPClient.publish` instead.
.. deprecated:: 1.0.0
"""
self.__http_client.publish(topic, data, **kwargs)
def publish(self, topic, data, *args, **kwargs):
if self.__tcp_client.is_connected:
return self.__tcp_client.publish(topic, data, *args, **kwargs)
else:
return self.__http_client.publish(topic, data, *args, **kwargs)
[docs] @deprecated
def multipublish_tcp(self, topic, messages, **kwargs):
"""Use :meth:`NsqdTCPClient.multipublish` instead.
.. deprecated:: 1.0.0
"""
return self.__tcp_client.multipublish(topic, messages, **kwargs)
[docs] @deprecated
def multipublish_http(self, topic, messages, **kwargs):
"""Use :meth:`NsqdHTTPClient.multipublish` instead.
.. deprecated:: 1.0.0
"""
return self.__http_client.multipublish(topic, messages, **kwargs)
def multipublish(self, topic, messages, *args, **kwargs):
if self.__tcp_client.is_connected:
return self.__tcp_client.multipublish(
topic, messages, *args, **kwargs)
else:
return self.__http_client.multipublish(
topic, messages, *args, **kwargs)
def __getattr__(self, name):
for client in (self.__tcp_client, self.__http_client):
try:
return getattr(client, name)
except AttributeError:
pass
return super(Nsqd, self).__getattr__(name)
def __str__(self):
return '{}:{}'.format(self.address, self.tcp_port)
def __hash__(self):
return hash(str(self))
def __eq__(self, other):
return isinstance(other, type(self)) and str(self) == str(other)
def __cmp__(self, other):
return hash(self) - hash(other)
def __lt__(self, other):
return hash(self) < hash(other)