STOMP Client
STOMPClient is used to connect to and communicate with the message broker. Here is how to create a client object and connect it to the Broker:
Using STOMP Client
Connecting to the Broker
import stompy
port = 61613
host = "host.domain.com"
client = stompy.connect((host, port))
connect() function arguments
- stompy.connect(addr_list, **args)
Creates the client object and connects it to the Broker
- Parameters
addr_list – a single broker address as tuple (ip_address, port), or a list of tuples - addresses
login (str) – login id to use, default: None
passcode (str) – pass code to use, default: None
headers (dict) – additional headers for the CONNECT frame, default: none
- Returns
STOMPlient instance connected to the Broker
- Return type
Reading messages using iterator
import stompy
port = 61613
host = "host.domain.com"
client = stompy.connect((host, port))
client.subscribe("/queue/Q")
client.subscribe("/topic/T")
for frame in client:
command = frame.Command
headers = frame.headers() # copy of the headers array
source = frame["source"] # headers can be accessed via mapping interface
option = frame.get("option", "none")
destination = frame.destination # convenience, same as frame["destination"]
body_as_bytes = frame.Body
body_as_text = frame.text # decoded with UTF-8
# ... process frame
# iterator stops when the connection closes
Reading messages using recv()
frame = client.recv()
while frame is not None:
# ... process frame
if ...:
break
frame = client.recv()
Reading messages using callback
def process_message(client, frame):
# process message received by the client
if ...:
return frame # non-False return will stop the loop()
last_frame = client.loop(callback = process_message)
The loop
method can have zero or more positional arguments. First positional arguments will be the callback
.
The remaining positional arguments will be passed as positional arguments to the callback
in addition to the client
and the frame
.
The method accepts 2 optional keyword arguments:
- transaction
str, transaction ID to associate with all the ACKs and NACKs sent automatically during the loop
- timeout
numeric, time-out used to receive individual frames. In case of time-out, STOMPTimeout exception will be raised
Additional keyword arguments can be specified. They will be passed to the callback
as is.
For example:
client.loop() # loop until disconnection without calling any callback
def my_callback_1(client, frame): # no additional arguments
...
if ...:
return True
client.loop(my_callback_1) # loop until my_callback_1 returns True
def my_callback_2(client, frame, param1, param2, param3=None): # 2 positional and 1 keyword arguments
...
if ...:
return True
client.loop(my_callback_2, param1_value, param2_value, param3="hello")
def my_callback_3(client, frame, param1, param2, param3=None): # 2 positional and 1 keyword arguments
...
if ...:
return True
client.loop(my_callback_3, param1_value, param2_value, transaction="txn", timeout=10.0, param3="hello")
The loop() method will run the client in the loop, receiving frames from the broker, calling the callback
,
if present.
The loop() will return once the callback (if any) returns something which evaluates to True or the connection closes.
The loop() will return the last value returned by the callback or None if the loop stopped due to the
disconnection.
Waiting for a receipt
def wait_for_receipt(_, frame, receipt):
# process message received by the client
return frame is not None and frame.Command == "RECEIPT" and frame["receipt-id"] == receipt
closed = client.loop(wait_for_receipt, receipt="the-receipt") == None
Sending ACKs/NACKs
client = stompy.connect((host, port))
client.subscribe("/queue/Q", send_acks = False) # disable auto-sending ACKs
for frame in client:
if frame.Command == "MESSAGE" and "ack" in frame:
if ...:
client.ack(frame["ack"])
else:
client.nack(frame["ack"])
Sending messages and other frames
client = stompy.connect((host, port))
client.send("SEND",
destination="/queue/Q",
body="Hello there", # can by bytes or str
source=str(os.getpid()) # custom header
)
client.message("/queue/Q", "Hello there", source=str(os.getpid())) # same as above
Sending messages and waiting for receipt
client = stompy.connect((host, port))
my_receipt = client.message("/queue/Q", "Hello there", receipt=True) # will generate and return receipt-id
def wait_for_receipt(_, frame, receipt):
# process message received by the client
return frame is not None and frame.Command == "RECEIPT" and frame["receipt-id"] == receipt
if client.loop(wait_for_receipt, receipt=my_receipt):
# receipt received
else:
# connection closed
Transactions
client = stompy.connect((host, port))
transaction = client.transaction()
trnsaction.message("/queue/Q", "Message part #1")
trnsaction.message("/queue/Q", "Message part #2")
receipt = transaction.commit(receipt=True)
# wait for receipt
if client.loop(wait_for_receipt, receipt=receipt):
# receipt received
else:
# connection closed
STOMPClient object methods
- class stompy.STOMPClient
STOMPClient constructor does not have any arguments.
- connect(self, addr_list, login=None, passcode=None, headers={}, **kv_headers)
Connects to a broker. On successfull connection, sets the following attributes:
client.BrokerAddress - tuple (ip_address, port) - actual address of the broker the connection was established to clint.Connected = True
- Parameters
addr_list – a single broker address as tuple (ip_address, port), or a list of tuples - addresses
login (str) – login id to use, default: None
passcode (str) – pass code to use, default: None
headers (dict) – additional headers for the CONNECT frame, default: none
kv_headers – additional headers for the CONNECT frame
- Returns
CONNECTED frame returned by the broker
- disconnect()
Send DISCONNECT frame, wait for receipt and close the connection.
- __init__()
STOMPClient constructor does not have any arguments.
- __iter__()
The client can be used as an iterator, returning next received frame on every iteration. The iteration stops when the connection closes:
client = STOMPClient() client.connect(...) for frame in client: ... # connection closed
- ack(ack_id, transaction=None)
Send ACK frame
- Parameters
ack_id (str) – NACK id to send
transaction (str or None) – transaction id to associate the ACK with, default: None
- loop(*params, transaction=None, timeout=None, **callback_args)
The method can have zero or more positional arguments. First positional arguments will be the
callback
. The remaining positional arguments will be passed as positional arguments to thecallback
in addition to client and the frame. Keyword arguments will be passed to thecallback
excepttransaction
andtimeout
.The method will run the client in the loop, receiving frames from the broker, calling the
callback
, if present. The loop() will return once the callback (if any) returns something which evaluates to True or the connection closes. The loop() will return the last value returned by the callback or None if the loop stopped due to the disconnection.- Parameters
timeout (numeric) – read time-out in seconds, or None
transaction (str) – transaction id to associate ACKs and NACKs sent during the loop, or None
- Returns
The value returned by the last call to the
callback
. If the loop stopped due to disconnecrtion, returns None
- message(destination, body=b'', id=None, headers={}, receipt=False, transaction=None, **kv_headers)
Conventience method to send a message. Uses send().
- Parameters
destination (str) – destination to send the message to
body (bytes) – message body, default - empty
id (str or None) – add message-id header, if not None
headers (dict) – headers to add to the message, default - empty
receipt (boolean or str) – if True or non-empty string, the frame will include “receipt” header. If
receipt
is a str, it will be used as is. Ifreceipt
is True, the client will generate new receipt id. Ifreceipt
is False, do not require a receipt.transaction (str) – transaction id to associate the frame with, or None
- Returns
receipt (str) if the receipt was requested (
receipt
was not False), otherwise None
- nack(ack_id, transaction=None)
Send NACK frame
- Parameters
ack_id (str) – NACK id to send
transaction (str or None) – transaction id to associate the NACK with, default: None
- recv(*params, **args)
Receive next frame. If the next frame is RECEIPT, notify those who are waiting for it and keep receiving. Return None if the connection closed. Raise STOMPError on ERROR.
- Parameters
transaction (str or None) – transaction to associate the automatically sent ACK, or None
- Returns
frame received or None, if the connection was closed
- Return type
STOMPFrame or None
- send(command, headers={}, body=b'', transaction=None, receipt=False, **kv_headers)
Send the frame. If a receipt was requested, then the frame sent by the client will incude “receipt” header and the method will return the receipt-id:
- Parameters
command (str) – frame command
headers (dict) – frame headers, default - {}
body (bytes) – frame body, default - empty body
receipt (str or boolean) – if True or non-empty string, the frame will include “receipt” header. If receipt is a str, it will be used as is. If receipt=True, the client will generate new receipt id. If receipt=False, do not require a receipt.
kv_headers – additional headers to add to the frame
- Returns
receipt (str) if the receipt was requested (
receipt
was not False), otherwise None
- subscribe(dest, ack_mode='auto', send_acks=True)
Subscribe to messages sent to the specified destination
- Parameters
dest (str) – destination
ack_mode (str) – can be either “auto” (default), “client” or “client-individual”
send_acks (boolean) – whether the client should automatically send ACKs received on this scubscription
- Returns
subscription id
- Return type
str
- transaction(txn_id=None)
Creates and begins new transaction
- Parameters
txn_id (str or None) – transaction ID or None (default), in which case a new transaction ID will be generated
- unsubscribe(sub_id)
Remove subscription
- Parameters
sub_id (str) – subscription id
STOMPTransaction object
STOMPClient.transaction()
method returns STOMPTransaction object, which has the following methods:
- class stompy.client.STOMPTransaction(client, txn_id)
- abort(receipt=None)
Aborts the transaction.
- Parameters
receipt (str or boolean) – if True or non-empty string, the frame will include “receipt” header. If receipt is a str, it will be used as is. If receipt=True, the client will generate new receipt id. If receipt=False, do not require a receipt.
- Returns
receipt (str) if the receipt was requested (
receipt
was not False), otherwise None
- ack(ack_id, transaction=None)
Sends ACK associated with the transaction
- Parameters
string – ack id
- commit(receipt=None)
Commits the transaction.
- Parameters
receipt (str or boolean) – if True or non-empty string, the frame will include “receipt” header. If receipt is a str, it will be used as is. If receipt=True, the client will generate new receipt id. If receipt=False, do not require a receipt.
- Returns
receipt (str) the receipt was requested (
receipt
was not False), otherwise None
- message(*params, **args)
Sends MESSAGE frame and associates it with the transaction. The method has same arguments as the STOMPClient.message() method.
- nack(ack_id)
Sends NACK associated with the transaction
- Parameters
string – ack id
- recv(timeout=None)
Receives next frame from the Broker. If the subscription allows sendig ACKs, the ACK will be associated with the transaction.
- send(command, **args)
Sends a STOMP frame to the broker, associating it with the transaction.
- Parameters
command (str) – frame command
Other arguments are the same as for the STOMPClient.send()
STOMPFrame object
STOMPFrame object represents a STOMP frame received from the Broker
- class stompy.frame.STOMPFrame(command=None, body=b'', headers=None, **headers_kv)
Initializes STOMP Frame object
- Parameters
command (str) – frame command
body (str, bytes) – message body
headers (dict) – dictionary with frame headers
headers_kv (keyword) – keyword arguments will be added to the headers
- property destination
Convenience accessor for the frame destination
- get(name, default=None)
Part of mapping interface to the frame headers:
value = frame.get(“header-name”, default)
- property headers
Convenience accessor, returns copy of the frame headers dictionary
- property json
Convenience accessor to interpret the frame body as a JSON object
- property text
Convenience accessor, converting the frame body to text. Uses the encoding from the content-type header or UTF-8