Nsqd client

class gnsq.Nsqd(address='127.0.0.1', tcp_port=4150, http_port=4151, timeout=60.0, client_id=None, hostname=None, heartbeat_interval=30, output_buffer_size=16384, output_buffer_timeout=250, tls_v1=False, tls_options=None, snappy=False, deflate=False, deflate_level=6, sample_rate=0, auth_secret=None, user_agent='gnsq/0.3.0')[source]

Low level object representing a TCP or HTTP connection to nsqd.

Parameters:
  • address – the host or ip address of the nsqd
  • tcp_port – the nsqd tcp port to connect to
  • http_port – the nsqd http port to connect to
  • timeout – the timeout for read/write operations (in seconds)
  • client_id – an identifier used to disambiguate this client (defaults to the first part of the hostname)
  • hostname – the hostname where the client is deployed (defaults to the clients hostname)
  • heartbeat_interval – the amount of time in seconds to negotiate with the connected producers to send heartbeats (requires nsqd 0.2.19+)
  • output_buffer_size – size of the buffer (in bytes) used by nsqd for buffering writes to this connection
  • output_buffer_timeout – timeout (in ms) used by nsqd before flushing buffered writes (set to 0 to disable). Warning: configuring clients with an extremely low (< 25ms) output_buffer_timeout has a significant effect on nsqd CPU usage (particularly with > 50 clients connected).
  • tls_v1 – enable TLS v1 encryption (requires nsqd 0.2.22+)
  • tls_options – dictionary of options to pass to ssl.wrap_socket()
  • snappy – enable Snappy stream compression (requires nsqd 0.2.23+)
  • deflate – enable deflate stream compression (requires nsqd 0.2.23+)
  • deflate_level – configure the deflate compression level for this connection (requires nsqd 0.2.23+)
  • sample_rate – take only a sample of the messages being sent to the client. Not setting this or setting it to 0 will ensure you get all the messages destined for the client. Sample rate can be greater than 0 or less than 100 and the client will receive that percentage of the message traffic. (requires nsqd 0.2.25+)
  • auth_secret – a string passed when using nsq auth (requires nsqd 0.2.29+)
  • user_agent – a string identifying the agent for this client in the spirit of HTTP (default: <client_library_name>/<version>) (requires nsqd 0.2.25+)
auth()[source]

Send authorization secret to nsqd.

close()[source]

Indicate no more messages should be sent.

close_stream()[source]

Close the underlying socket.

connect()[source]

Initialize connection to the nsqd.

create_channel(topic, channel)[source]

Create a channel for an existing topic.

create_topic(topic)[source]

Create a topic.

delete_channel(topic, channel)[source]

Delete an existing channel for an existing topic.

delete_topic(topic)[source]

Delete a topic.

empty_channel(topic, channel)[source]

Empty all the queued messages for an existing channel.

empty_topic(topic)[source]

Empty all the queued messages for an existing topic.

finish(message_id)[source]

Finish a message (indicate successful processing).

identify()[source]

Update client metadata on the server and negotiate features.

Returns:nsqd response data if there was feature negotiation, otherwise None
info()[source]

Returns version information.

is_connected

Check if the client is currently connected.

is_starved

Evaluate whether the connection is starved.

This property should be used by message handlers to reliably identify when to process a batch of messages.

listen()[source]

Listen to incoming responses until the connection closes.

multipublish(topic, messages)[source]

Publish an iterable of messages in one roundtrip.

If connected, the messages will be sent over tcp. Otherwise it will fall back to http.

multipublish_http(topic, messages)[source]

Publish an iterable of messages to the given topic over http.

multipublish_tcp(topic, messages)[source]

Publish an iterable of messages to the given topic over tcp.

nop()[source]

Send no-op to nsqd. Used to keep connection alive.

on_auth[source]

Emitted after the connection is successfully authenticated.

The signal sender is the connection and the parsed response is sent as arguments.

on_close[source]

Emitted after close_stream().

Sent after the connection socket has closed. The signal sender is the connection.

on_error[source]

Emitted when an error frame is received.

The signal sender is the connection and the error is sent as an argument.

on_finish[source]

Emitted after finish().

Sent after a message owned by this connection is successfully finished. The signal sender is the connection and the message_id is sent as an argument.

on_message[source]

Emitted when a message frame is received.

The signal sender is the connection and the message is sent as an argument.

on_requeue[source]

Emitted after requeue().

Sent after a message owned by this connection is requeued. The signal sender is the connection and the message_id, timeout and backoff flag are sent as arguments.

on_response[source]

Emitted when a response frame is received.

The signal sender is the connection and the response is sent as an argument.

pause_channel(topic, channel)[source]

Pause message flow to all channels on an existing topic.

Messages will queue at topic.

ping()[source]

Monitoring endpoint.

Returns:should return “OK”, otherwise raises an exception.
publish(topic, data)[source]

Publish a message.

If connected, the message will be sent over tcp. Otherwise it will fall back to http.

publish_http(topic, data)[source]

Publish a message to the given topic over http.

publish_tcp(topic, data)[source]

Publish a message to the given topic over tcp.

read_response()[source]

Read an individual response from nsqd.

Returns:tuple of the frame type and the processed data.
ready(count)[source]

Indicate you are ready to receive count messages.

requeue(message_id, timeout=0, backoff=True)[source]

Re-queue a message (indicate failure to process).

stats()[source]

Return internal instrumented statistics.

subscribe(topic, channel)[source]

Subscribe to a nsq topic and channel.

touch(message_id)[source]

Reset the timeout for an in-flight message.

unpause_channel(topic, channel)[source]

Resume message flow to channels of an existing, paused, topic.