Communication
This chapter describes the modules for the asynchronous communication framework.
Pluggable Notifier
The htf.communication.PluggableNotifier
is an observer that lets subjects register tuples of name, filterExpression
and a callback.
When data is input all registered subjects are used the following way.
First the filterExpression is called with the data as a parameter.
If True is returned the callback is called with the data as a parameter.
With the htf.communication.PluggableNotifier
you can create an asynchronous communication framework easily.
- class htf.communication.PluggableNotifier(verbosity: int = 0, raise_exceptions: bool = False, filter_exception_handler: Callback | None = None, callback_exception_handler: Callback | None = None)
PluggableNotifierimplements an observer pattern. Subjects can register tuples consisting of executable filter expressions and callbacks to be notified about events.There is a method to enqueue new items and a thread to apply filters on items. Communication between method and thread is done via a queue.
PluggableNotifierextendsthreading.Thread.- Parameters:
verbosity – if set to 1 debug messages are printed to stdout
raise_exceptions – if set to
Trueexceptions are raised if a filter expression or a callback raises an exception. This may be used for debugging. In case of an exceptionstopis called so no further items can be put in.filter_exception_handler – a callback that is called to handle a raised exception in a filter expression. If the
filter_exception_handlerreturnsTruethe exception is not raised afterwards. Else the exception handler raises the catched exception and stopps the thread. The supplied handler musst acceptnameanditemarguments.nameis the callback name supplied byregisteranditemis the supplied item frominput.callback_exception_handler – a callback that is called to handle a raised exception in a callback. If the
callback_exception_handlerreturnsTruethe exception is not raised afterwards. Else the exception handler raises the catched exception and stopps the thread. The supplied handler musst acceptnameanditemarguments.nameis the callback name supplied byregisteranditemis the supplied item frominput.
- input(item: Any) None
Input an
itemto be used with all registered subjects.- Parameters:
item – an item to be processed.
- Raises:
RuntimeError – if
stopwas called before –
- join(timeout: float | None = 1.0) None
Wait until the thread ends.
- Parameters:
timeout – the timeout in seconds. Default: 1.0 seconds.
- register(name: str, filter_expression: Callback, callback: Callback) None
Register a tuple of name, filter_expression and a callback.
- Parameters:
name – a name for the tuple (for debugging, see
verbosity).filter_expression – a callable filter expression (see
htf.filtersfor more information) or a method that is called.filter_expressionis called with theitemfrominputas a parameter. Iffilter_expressionreturns True the callback is called with the same item.callback – a callable object that is called in case
filter_expression(item)returnsTrue.
- run() None
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
Notification Callbacks
Notification callbacks can be used in htf.communication.PluggableNotifier as the
callback.
For example you can enqueue an item into a queue the callee just created to asynchronously receive data.
Example:
from htf.communication import PluggableNotifier, enqueueItem
import queue # or Queue in Python 2
pn = PluggableNotifier()
q = queue.Queue()
pn.register("subject", filterExpression=lambda *args,
**kwargs: kwargs["item"]=="example item", callback=enqueueItem(q))
pn.input("example item") # is passed
item = q.get(block=True, timeout=1)
print(item) # "example item"
pn.input("another item") # is filtered out
item = q.get(block=True, timeout=1) # raises Queue.Empty
- htf.communication.notification_callbacks.enqueue_item(queue: Queue) Callable[[str, Any], None]
Enqueues
itemintoqueue. This way asynchronous communication can be set up easily.- Parameters:
queue (queue.Queue) – a queue to be used.
- Returns:
an anonymous method that can be called with an
itemthat is put intoqueue.- Return type:
callable
SSH Client
With the SSH Client you can run commands remotely via SSH.
- class htf.communication.SSHClient
SSHClientexecutes commands remotely using ssh.- connect(hostname: str, username: str | None = None, password: str | None = None) None
Establish an ssh connection.
- Parameters:
hostname – the hostname or ip address
username=None – The username to be used. If set to
Nonethe current username is used instead of using the ssh keys in your system.password=None – The password for
username. If set toNonethe password forusernameor the current username is used.
SFTP Client
With the SFTP Client it is possible to do file operations remotely via SSH.
- class htf.communication.SFTPClient
SFTPClientlets you access files via sftp protocol.- connect(hostname: str, username: str | None = None, password: str | None = None) None
Establish an sftp connection.
- Parameters:
hostname – the hostname or ip address
username=None – The username to be used. If set to
Nonethe current username is used instead of using the ssh keys in your system.password=None – The password for
username. If set toNonethe password forusernameor the current username is used.
- get(remotepath: str, localpath: str) None
Copies file identified by
remotepathtolocalpathusing sftp protocol.- Parameters:
remotepath – the remote file to be copied
localpath – the local path where the field is copied to
- put(localpath: str, remotepath: str) SFTPAttributes
Copies file identified by
localpathtoremotepathusing sftp protocol.- Parameters:
localpath – the local path to be copied
remotepath – the remote file where the field is copied to
- Returns:
the result of os.stat
- Return type:
SFTPAttributes
- remove(remotepath: str) None
Removes a file identified by
remotepath.- Parameters:
remotepath – the path for the file to be removed
- Raises:
IOError – if the path is not a file
SLIP — Serial Line Internet Protocol
SLIP contains a mixin to be used to implement SLIP-based communications.
If you don’t know about SLIP you should first read RFC 1055.
The htf.communication.SlipMixin contains two methods for encoding
and decoding SLIP-encoded communication frames.
- class htf.communication.SlipMixin
The
SlipMixinhelps to encode and decode frames transferred in the Serial Line Internet Protocol.It cannot be used standalone and must be mixed into a class that implements the
htf.communication.SlipInterface.- slip_receive_frame(timeouts: int | None = None, max_length: int = 1024) bytes | None
Receive a frame encoded in the Serial Line Internet Protocol and return the decoded frame.
- Parameters:
timeouts=None – the number of timeouts until None is returned. The timeout depends on the mixing class and underlying communication system.
max_length=1024 – maximum number of bytes that will be received, not including ESC. Prevents the method from perpetually blocking.
- Returns:
- the decoded frame if a frame was received or
Noneif no frame was received.
- Return type:
bytes or None
A class can subclass htf.communication.SlipMixin and must implement
the htf.communication.SlipInterface.
- class htf.communication.SlipInterface
In the following example one-way SLIP communication is established with a client and a server.
SLIP-Client:
from htf.communication import SlipMixin
import serial
class SlipClient(SlipMixin):
def __init__(self, comport):
self._serial = serial.Serial(comport, 115200,
timeout=1.0, rtscts=False)
def send_byte(self, byte):
self._serial.write(byte)
def receive_byte(self):
return self._serial.read(1)
def send(self, message):
if not isinstance(message, bytes):
message = message.encode()
self.slip_send_frame(message)
if __name__ == "__main__":
s = SlipClient("/dev/ttyUSB1")
message = "Hello World!"
s.send(message)
SLIP-Server:
from htf.communication import SlipMixin
import serial
class SlipServer(SlipMixin):
def __init__(self, comport):
self._serial = serial.Serial(comport, 115200,
timeout=1.0, rtscts=False)
def send_byte(self, byte):
self._serial.write(byte)
def receive_byte(self):
return self._serial.read(1)
def send(self, message):
if not isinstance(message, bytes):
message = message.encode()
self.slip_send_frame(message)
def receive(self):
message = self.slip_receive_frame(timeouts=3)
print("Received message:", message)
if __name__ == "__main__":
s = SlipServer("/dev/ttyUSB2")
while True:
s.receive()
Changelog
htf-communication-4.0.5
add support for Python 3.13
htf-communication-4.0.4
add requirements for validation purposes
htf-communication-4.0.3
add build environment data for validation purposes
htf-communication-4.0.2
htf-communicationcan now be used standalone
htf-communication-4.0.1
add support for Python 3.12
htf-communication-4.0.0
remove all camel-case methods
add type-hints
use
hlm-3.1use
oser-3.1
htf-communication-3.0.1
add support for Python 3.11
htf-communication-3.0.0
extract
htf-communicationfromhtf