Nsqd clients

class gnsq.NsqdHTTPClient(host='localhost', port=4151, **kwargs)[source]

Low level http client for nsqd.

Parameters:
  • host – nsqd host address (default: localhost)
  • port – nsqd http port (default: 4151)
  • useragent – useragent sent to nsqd (default: <client_library_name>/<version>)
  • connection_class – override the http connection class
create_channel(topic, channel)[source]

Create a channel for an existing topic.

create_topic(topic)[source]

Create a topic.

delete_channel(topic, channel)[source]

Delete an existing channel for an existing topic.

delete_topic(topic)[source]

Delete a topic.
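
The create and delete calls above pair naturally for short-lived topics, for example in tests. A minimal sketch, assuming a client object that exposes the four methods documented here; the helper name `temporary_channel` and the topic and channel names are hypothetical, and a `Mock` stands in for the real client so the example runs without an nsqd.

```python
from contextlib import contextmanager
from unittest import mock


@contextmanager
def temporary_channel(client, topic, channel):
    """Create a topic and channel, yield, then delete both (channel first)."""
    client.create_topic(topic)
    client.create_channel(topic, channel)
    try:
        yield
    finally:
        # Clean up even if the body of the with-block raised.
        client.delete_channel(topic, channel)
        client.delete_topic(topic)


# Stand-in for gnsq.NsqdHTTPClient() (assumption: same method names).
client = mock.Mock()
with temporary_channel(client, "events", "archive"):
    pass  # publish or inspect the topic here

calls = [c[0] for c in client.method_calls]
```

With a real nsqd, `client` would be replaced by `gnsq.NsqdHTTPClient(host, port)`.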

empty_channel(topic, channel)[source]

Empty all the queued messages for an existing channel.

empty_topic(topic)[source]

Empty all the queued messages for an existing topic.

classmethod from_url(url, **kwargs)

Create a client from a url.

info()[source]

Returns version information.

multipublish(topic, messages, binary=False)[source]

Publish an iterable of messages to the given topic over http.

Parameters:
  • topic – the topic to publish to
  • messages – iterable of bytestrings to publish
  • binary – enable binary mode (default: False; requires nsq 1.0.0)

By default multipublish expects messages to be delimited by "\n". Use the binary flag to enable binary mode, where the POST body is instead expected to be in nsqd's binary wire protocol format.
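
The difference between the two modes can be sketched with the standard library alone. This assumes the binary layout described for nsqd's /mpub endpoint: a 4-byte big-endian message count followed by each message prefixed with its 4-byte size. The function names are hypothetical and are not part of gnsq, which builds the body itself.

```python
import struct


def newline_body(messages):
    """Default mode: messages joined by a newline byte."""
    return b"\n".join(messages)


def binary_body(messages):
    """Binary mode (assumed layout): message count, then size-prefixed messages."""
    messages = list(messages)
    body = struct.pack(">l", len(messages))
    for message in messages:
        body += struct.pack(">l", len(message)) + message
    return body


messages = [b"a", b"b\nc"]

# A newline inside a message makes the default body ambiguous:
# splitting it again yields three messages instead of two.
ambiguous = newline_body(messages).split(b"\n")

# The size prefixes keep the embedded newline inside the second message.
framed = binary_body(messages)
```

This is why binary mode is the safer choice when message bodies may contain newline bytes.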

pause_channel(topic, channel)[source]

Pause message flow to consumers of an existing channel.

Messages will queue at the channel.

pause_topic(topic)[source]

Pause message flow to all channels on an existing topic.

Messages will queue at the topic.

ping()[source]

Monitoring endpoint.

Returns: "OK" on success; otherwise raises an exception.
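
Because ping() raises on failure, a health check can be reduced to a boolean. A minimal sketch; the helper name `is_healthy` is hypothetical, the broad `except Exception` is an assumption since the exception type is not specified here, and `Mock` objects stand in for real clients.

```python
from unittest import mock


def is_healthy(client):
    """Return True if ping() succeeds, False if it raises."""
    try:
        client.ping()
    except Exception:  # assumption: exact exception type is not documented
        return False
    return True


# Stand-ins for gnsq.NsqdHTTPClient instances.
up = mock.Mock()
down = mock.Mock()
down.ping.side_effect = ConnectionError("nsqd unreachable")

results = (is_healthy(up), is_healthy(down))
```
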
publish(topic, data, defer=None)[source]

Publish a message to the given topic over http.

Parameters:
  • topic – the topic to publish to
  • data – bytestring data to publish
  • defer – duration in milliseconds to defer before publishing (requires nsq 0.3.6)
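
Since defer is expressed in milliseconds, converting from a `timedelta` avoids unit mistakes. A minimal sketch; the helper name `publish_later` and the topic name are hypothetical, and a `Mock` stands in for the client.

```python
from datetime import timedelta
from unittest import mock


def publish_later(client, topic, data, delay):
    """Publish data after the given timedelta, converting it to milliseconds."""
    defer_ms = int(delay.total_seconds() * 1000)
    return client.publish(topic, data, defer=defer_ms)


# Stand-in for gnsq.NsqdHTTPClient() (assumption: same publish signature).
client = mock.Mock()
publish_later(client, "events", b"reminder", timedelta(seconds=90))
```
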
stats(topic=None, channel=None, text=False)[source]

Return internal instrumented statistics.

Parameters:
  • topic – (optional) filter to topic
  • channel – (optional) filter to channel
  • text – return the stats as a string (default: False)
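
With text=False the statistics can be walked as ordinary data. The sketch below assumes the result is a dict shaped like nsqd's JSON stats output, with a "topics" list whose entries carry "topic_name" and a "channels" list with "channel_name" and "depth". The sample dict is hand-written for illustration, not real output, and `channel_depths` is a hypothetical helper.

```python
def channel_depths(stats):
    """Map (topic, channel) to queue depth from a stats dict (assumed shape)."""
    depths = {}
    for topic in stats.get("topics", []):
        for channel in topic.get("channels", []):
            key = (topic["topic_name"], channel["channel_name"])
            depths[key] = channel["depth"]
    return depths


# Hand-written sample in the assumed shape; real output has more fields.
sample = {
    "topics": [
        {
            "topic_name": "events",
            "depth": 0,
            "channels": [{"channel_name": "archive", "depth": 12}],
        }
    ]
}
depths = channel_depths(sample)
```
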
unpause_channel(topic, channel)[source]

Resume message flow to consumers of an existing, paused, channel.

unpause_topic(topic)[source]

Resume message flow to channels of an existing, paused, topic.
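
Pausing and unpausing are usually paired around some maintenance step, and the unpause should happen even if that step fails. A minimal sketch; `run_while_paused` and the names used are hypothetical, and a `Mock` stands in for the client.

```python
from unittest import mock


def run_while_paused(client, topic, channel, task):
    """Pause a channel, run task(), and always unpause afterwards."""
    client.pause_channel(topic, channel)
    try:
        return task()
    finally:
        client.unpause_channel(topic, channel)


def failing_task():
    raise RuntimeError("maintenance failed")


# Stand-in for gnsq.NsqdHTTPClient().
client = mock.Mock()
try:
    run_while_paused(client, "events", "archive", failing_task)
except RuntimeError:
    pass  # the channel has still been unpaused

calls = [c[0] for c in client.method_calls]
```

The same pattern applies to pause_topic() and unpause_topic().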

class gnsq.NsqdTCPClient(address='127.0.0.1', port=4150, timeout=60.0, client_id=None, hostname=None, heartbeat_interval=30, output_buffer_size=16384, output_buffer_timeout=250, tls_v1=False, tls_options=None, snappy=False, deflate=False, deflate_level=6, sample_rate=0, auth_secret=None, user_agent='gnsq/1.0.1')[source]

Low level object representing a TCP connection to nsqd.

Parameters:
  • address – the host or ip address of the nsqd
  • port – the nsqd tcp port to connect to
  • timeout – the timeout for read/write operations (in seconds)
  • client_id – an identifier used to disambiguate this client (defaults to the first part of the hostname)
  • hostname – the hostname where the client is deployed (defaults to the client's hostname)
  • heartbeat_interval – the amount of time in seconds to negotiate with the connected producers to send heartbeats (requires nsqd 0.2.19+)
  • output_buffer_size – size of the buffer (in bytes) used by nsqd for buffering writes to this connection
  • output_buffer_timeout – timeout (in ms) used by nsqd before flushing buffered writes (set to 0 to disable). Warning: configuring clients with an extremely low (< 25ms) output_buffer_timeout has a significant effect on nsqd CPU usage (particularly with > 50 clients connected).
  • tls_v1 – enable TLS v1 encryption (requires nsqd 0.2.22+)
  • tls_options – dictionary of options to pass to ssl.wrap_socket()
  • snappy – enable Snappy stream compression (requires nsqd 0.2.23+)
  • deflate – enable deflate stream compression (requires nsqd 0.2.23+)
  • deflate_level – configure the deflate compression level for this connection (requires nsqd 0.2.23+)
  • sample_rate – take only a sample of the messages being sent to the client. Not setting this or setting it to 0 will ensure you get all the messages destined for the client. Sample rate can be greater than 0 and less than 100, and the client will receive that percentage of the message traffic. (requires nsqd 0.2.25+)
  • auth_secret – a string passed when using nsq auth (requires nsqd 0.2.29+)
  • user_agent – a string identifying the agent for this client in the spirit of HTTP (default: <client_library_name>/<version>) (requires nsqd 0.2.25+)
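
Two of the parameters above carry constraints that are easy to check before connecting. A rough sketch that encodes only what this list states; `check_tcp_options` is a hypothetical helper, not part of gnsq, and nsqd may enforce its own limits.

```python
def check_tcp_options(sample_rate=0, output_buffer_timeout=250):
    """Return a list of problems with the given options (empty if none)."""
    problems = []
    # 0 disables sampling; otherwise the rate is a percentage from 1 to 99.
    if not 0 <= sample_rate < 100:
        problems.append("sample_rate must be 0 (disabled) or between 1 and 99")
    # 0 disables the timeout; very low non-zero values load nsqd's CPU.
    if 0 < output_buffer_timeout < 25:
        problems.append("output_buffer_timeout under 25 ms raises nsqd CPU usage")
    return problems


defaults_ok = check_tcp_options()
problems = check_tcp_options(sample_rate=100, output_buffer_timeout=5)
```

Options that pass could then be forwarded as keyword arguments to `gnsq.NsqdTCPClient`.
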
auth()[source]

Send authorization secret to nsqd.

close()[source]

Indicate no more messages should be sent.

close_stream()[source]

Close the underlying socket.

connect()[source]

Initialize connection to the nsqd.

finish(message_id)[source]

Finish a message (indicate successful processing).

identify()[source]

Update client metadata on the server and negotiate features.

Returns: nsqd response data if there was feature negotiation, otherwise None
is_connected

Check if the client is currently connected.

is_starved

Evaluate whether the connection is starved.

This property should be used by message handlers to reliably identify when to process a batch of messages.
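
One way to use is_starved is to buffer messages and flush the buffer once the connection reports that it is starved. A minimal sketch, assuming messages expose `id` and `body` attributes and that the handler is called with the connection and the message; `BatchHandler` is hypothetical, and `Mock` and `SimpleNamespace` objects stand in for a real connection and messages.

```python
from types import SimpleNamespace
from unittest import mock


class BatchHandler:
    """Buffer messages and process them as a batch when the connection starves."""

    def __init__(self, process_batch):
        self.process_batch = process_batch
        self.pending = []

    def __call__(self, conn, message):
        self.pending.append(message)
        if not conn.is_starved:
            return  # more messages are still in flight; keep buffering
        batch, self.pending = self.pending, []
        self.process_batch([m.body for m in batch])
        for m in batch:
            conn.finish(m.id)


batches = []
handler = BatchHandler(batches.append)

# Stand-in for gnsq.NsqdTCPClient (assumption: is_starved and finish()).
conn = mock.Mock(is_starved=False)
handler(conn, SimpleNamespace(id="1", body=b"a"))
conn.is_starved = True
handler(conn, SimpleNamespace(id="2", body=b"b"))
```
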

listen()[source]

Listen to incoming responses until the connection closes.
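
The methods of this class are typically called in a fixed order when consuming. The sketch below shows one plausible sequence, assuming on_message accepts handlers through a `connect()` method; `consume` is a hypothetical helper, and a `Mock` stands in for the connection so the order can be inspected without an nsqd.

```python
from unittest import mock


def consume(conn, topic, channel, handler, ready_count=1):
    """Wire a handler, open the connection, subscribe, and block in listen()."""
    conn.on_message.connect(handler)  # register before messages can arrive
    conn.connect()
    conn.identify()
    conn.subscribe(topic, channel)
    conn.ready(ready_count)  # allow nsqd to send this many messages
    conn.listen()  # returns when the connection closes


# Stand-in for gnsq.NsqdTCPClient().
conn = mock.Mock()
consume(conn, "events", "archive", handler=print)

steps = [c[0] for c in conn.method_calls]
```

For most applications a higher-level consumer is likely a better fit; this only illustrates how the low-level calls relate.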

multipublish(topic, messages)[source]

Publish an iterable of messages to the given topic over tcp.

Parameters:
  • topic – the topic to publish to
  • messages – iterable of bytestrings to publish
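
Publishing over TCP needs an open connection first. A minimal sketch of a one-shot batch publish, assuming identify() is wanted before publishing and that close_stream() is the right way to release the socket afterwards; `publish_all` is hypothetical, and a `Mock` stands in for the connection.

```python
from unittest import mock


def publish_all(conn, topic, messages):
    """Connect, publish all messages in one call, and always close the socket."""
    conn.connect()
    try:
        conn.identify()
        conn.multipublish(topic, list(messages))
    finally:
        conn.close_stream()


# Stand-in for gnsq.NsqdTCPClient().
conn = mock.Mock()
publish_all(conn, "events", (b"one", b"two"))

steps = [c[0] for c in conn.method_calls]
```
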
nop()[source]

Send no-op to nsqd. Used to keep connection alive.

on_auth[source]

Emitted after the connection is successfully authenticated.

The signal sender is the connection and the parsed response is sent as arguments.

on_close[source]

Emitted after close_stream().

Sent after the connection socket has closed. The signal sender is the connection.

on_error[source]

Emitted when an error frame is received.

The signal sender is the connection and the error is sent as an argument.

on_finish[source]

Emitted after finish().

Sent after a message owned by this connection is successfully finished. The signal sender is the connection and the message_id is sent as an argument.

on_message[source]

Emitted when a message frame is received.

The signal sender is the connection and the message is sent as an argument.

on_requeue[source]

Emitted after requeue().

Sent after a message owned by this connection is requeued. The signal sender is the connection and the message_id, timeout and backoff flag are sent as arguments.

on_response[source]

Emitted when a response frame is received.

The signal sender is the connection and the response is sent as an argument.

publish(topic, data, defer=None)[source]

Publish a message to the given topic over tcp.

Parameters:
  • topic – the topic to publish to
  • data – bytestring data to publish
  • defer – duration in milliseconds to defer before publishing (requires nsq 0.3.6)
read_response()[source]

Read an individual response from nsqd.

Returns: tuple of the frame type and the processed data.
ready(count)[source]

Indicate you are ready to receive count messages.

requeue(message_id, timeout=0, backoff=True)[source]

Re-queue a message (indicate failure to process).
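
finish() and requeue() are the two outcomes of handling a message. A minimal sketch, assuming messages expose `id` and `body` and that the requeue timeout is in milliseconds as in the nsq protocol; `handle` and `reject_empty` are hypothetical, and a `Mock` stands in for the connection.

```python
from types import SimpleNamespace
from unittest import mock


def handle(conn, message, work, retry_delay=5000):
    """Finish the message if work() succeeds, otherwise requeue it."""
    try:
        work(message.body)
    except Exception:
        # retry_delay is assumed to be in milliseconds.
        conn.requeue(message.id, timeout=retry_delay, backoff=True)
    else:
        conn.finish(message.id)


def reject_empty(body):
    if not body:
        raise ValueError("empty message")


# Stand-in for gnsq.NsqdTCPClient().
conn = mock.Mock()
handle(conn, SimpleNamespace(id="1", body=b"ok"), reject_empty)
handle(conn, SimpleNamespace(id="2", body=b""), reject_empty)
```
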

subscribe(topic, channel)[source]

Subscribe to an nsq topic and channel.

touch(message_id)[source]

Reset the timeout for an in-flight message.
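
For long-running work, touch() can be called between steps so the message does not time out while it is still being processed. A rough sketch, assuming the work can be split into chunks and that messages expose an `id`; `process_in_chunks` is hypothetical, and a `Mock` stands in for the connection.

```python
from types import SimpleNamespace
from unittest import mock


def process_in_chunks(conn, message, chunks, work):
    """Run work() on each chunk, touching the message after every chunk."""
    for chunk in chunks:
        work(chunk)
        conn.touch(message.id)  # reset the in-flight timeout
    conn.finish(message.id)


done = []
# Stand-in for gnsq.NsqdTCPClient().
conn = mock.Mock()
process_in_chunks(conn, SimpleNamespace(id="1"), [b"a", b"b", b"c"], done.append)
```
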

class gnsq.Nsqd(address='127.0.0.1', tcp_port=4150, http_port=4151, **kwargs)[source]

Use NsqdTCPClient or NsqdHTTPClient instead.

Deprecated since version 1.0.0.

multipublish_http(topic, messages, **kwargs)[source]

Use NsqdHTTPClient.multipublish() instead.

Deprecated since version 1.0.0.

multipublish_tcp(topic, messages, **kwargs)[source]

Use NsqdTCPClient.multipublish() instead.

Deprecated since version 1.0.0.

publish_http(topic, data, **kwargs)[source]

Use NsqdHTTPClient.publish() instead.

Deprecated since version 1.0.0.

publish_tcp(topic, data, **kwargs)[source]

Use NsqdTCPClient.publish() instead.

Deprecated since version 1.0.0.