Communication
This chapter describes the modules for the asynchronous communication framework.
Pluggable Notifier
The htf.communication.PluggableNotifier is an observer that lets subjects register tuples of name, filter_expression and callback.
When data is input, every registered subject is processed as follows.
First, filter_expression is called with the data as a parameter.
If it returns True, the callback is called with the data as a parameter.
With htf.communication.PluggableNotifier you can easily create an asynchronous communication framework.
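The register/input flow described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the htf implementation: MiniNotifier, its stop method and the _STOP sentinel are hypothetical names, and the filter and callback are assumed to receive the item as a single positional argument.

```python
import queue
import threading
from typing import Any, Callable, List, Tuple

_STOP = object()  # hypothetical sentinel that tells the worker thread to finish


class MiniNotifier(threading.Thread):
    """Hypothetical stand-in that mimics the register/input flow."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._items: "queue.Queue[Any]" = queue.Queue()
        self._subjects: List[Tuple[str, Callable[[Any], bool], Callable[[Any], None]]] = []
        self._lock = threading.Lock()

    def register(self, name: str, filter_expression, callback) -> None:
        with self._lock:
            self._subjects.append((name, filter_expression, callback))

    def input(self, item: Any) -> None:
        # Only enqueue here; filters run in the worker thread.
        self._items.put(item)

    def stop(self) -> None:
        self._items.put(_STOP)

    def run(self) -> None:
        while True:
            item = self._items.get()
            if item is _STOP:
                break
            with self._lock:
                subjects = list(self._subjects)
            for _name, filter_expression, callback in subjects:
                if filter_expression(item):
                    callback(item)


received: "queue.Queue[Any]" = queue.Queue()
notifier = MiniNotifier()
notifier.register("even numbers", lambda item: item % 2 == 0, received.put)
notifier.start()
for value in (1, 2, 3, 4):
    notifier.input(value)
notifier.stop()
notifier.join(timeout=1.0)
passed = [received.get(timeout=1.0), received.get(timeout=1.0)]
print(passed)  # [2, 4]
```

Because input only enqueues, the caller never blocks on a slow filter or callback; all filtering happens in the worker thread.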
- class htf.communication.PluggableNotifier(verbosity: int = 0, raise_exceptions: bool = False, filter_exception_handler: Callback | None = None, callback_exception_handler: Callback | None = None)
PluggableNotifier implements an observer pattern. Subjects can register tuples consisting of executable filter expressions and callbacks to be notified about events. There is a method to enqueue new items and a thread to apply filters on items. Communication between method and thread is done via a queue.
PluggableNotifier extends threading.Thread.
- Parameters:
verbosity – if set to 1, debug messages are printed to stdout.
raise_exceptions – if set to True, exceptions are raised if a filter expression or a callback raises an exception. This may be used for debugging. In case of an exception, stop is called so no further items can be put in.
filter_exception_handler – a callback that is called to handle an exception raised in a filter expression. If filter_exception_handler returns True, the exception is not raised afterwards. Otherwise the caught exception is raised and the thread is stopped. The supplied handler must accept name and item arguments. name is the callback name supplied by register and item is the item supplied to input.
callback_exception_handler – a callback that is called to handle an exception raised in a callback. If callback_exception_handler returns True, the exception is not raised afterwards. Otherwise the caught exception is raised and the thread is stopped. The supplied handler must accept name and item arguments. name is the callback name supplied by register and item is the item supplied to input.
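The handler contract described above (return True to suppress the exception, anything else to let it propagate) can be illustrated with a small stand-alone sketch. run_filter and log_and_suppress are hypothetical helpers, not part of htf. Passing name and item as keyword arguments and re-raising when no handler is given are both assumptions.

```python
from typing import Any, Callable, List, Optional, Tuple


def run_filter(name: str, filter_expression: Callable[[Any], bool], item: Any,
               handler: Optional[Callable[..., bool]] = None) -> bool:
    """Evaluate a filter and let an optional handler decide whether an error propagates."""
    try:
        return bool(filter_expression(item))
    except Exception:
        # Assumption: the handler receives name and item and returns True to suppress.
        if handler is not None and handler(name=name, item=item):
            return False  # suppressed: treat the item as filtered out
        raise


seen: List[Tuple[str, Any]] = []


def log_and_suppress(name: str, item: Any) -> bool:
    # Record which subject failed on which item, then suppress the exception.
    seen.append((name, item))
    return True


# len(42) raises TypeError inside the filter; the handler suppresses it.
result = run_filter("length check", lambda item: len(item) > 3, 42, log_and_suppress)
print(result, seen)
```

A handler of this shape is a convenient place to log the offending item without stopping the processing thread.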
- input(item: Any) → None
Input an item to be used with all registered subjects.
- Parameters:
item – an item to be processed.
- Raises:
RuntimeError – if stop was called before.
- join(timeout: float | None = 1.0) → None
Wait until the thread ends.
- Parameters:
timeout – the timeout in seconds. Default: 1.0 seconds.
- register(name: str, filter_expression: Callback, callback: Callback) → None
Register a tuple of name, filter_expression and callback.
- Parameters:
name – a name for the tuple (for debugging, see verbosity).
filter_expression – a callable filter expression (see htf.filters for more information) or a method. filter_expression is called with the item from input as a parameter. If filter_expression returns True, the callback is called with the same item.
callback – a callable object that is called if filter_expression(item) returns True.
- run() → None
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
Notification Callbacks
Notification callbacks can be used in htf.communication.PluggableNotifier as the callback.
For example, you can enqueue each item into a queue you just created in order to receive data asynchronously.
Example:
from htf.communication import PluggableNotifier
from htf.communication.notification_callbacks import enqueue_item
import queue

pn = PluggableNotifier()
q = queue.Queue()
pn.register("subject",
            filter_expression=lambda *args, **kwargs: kwargs["item"] == "example item",
            callback=enqueue_item(q))
pn.input("example item")  # is passed
item = q.get(block=True, timeout=1)
print(item)  # "example item"
pn.input("another item")  # is filtered out
item = q.get(block=True, timeout=1)  # raises queue.Empty
- htf.communication.notification_callbacks.enqueue_item(queue: Queue) → Callable[[str, Any], None]
Enqueues item into queue. This way asynchronous communication can be set up easily.
- Parameters:
queue (queue.Queue) – a queue to be used.
- Returns:
an anonymous function that can be called with an item, which is then put into queue.
- Return type:
callable
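A callback factory of this kind is easy to picture as a closure over the queue. The following is a minimal sketch, not the htf source: make_enqueue_callback is a hypothetical name, and the (name, item) parameter order is an assumption inferred from the Callable[[str, Any], None] return type.

```python
import queue
from typing import Any, Callable


def make_enqueue_callback(target: "queue.Queue[Any]") -> Callable[[str, Any], None]:
    """Return a callback that puts every item it receives into target."""

    def _callback(name: str, item: Any) -> None:
        # name identifies the registered subject; only the item is stored.
        target.put(item)

    return _callback


inbox: "queue.Queue[Any]" = queue.Queue()
callback = make_enqueue_callback(inbox)
callback("subject", "example item")
first = inbox.get(timeout=1)
print(first)  # example item
```

Since queue.Queue is thread-safe, the consumer can block on get in one thread while the callback runs in another.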
SSH Client
With the SSH Client you can run commands remotely via SSH.
- class htf.communication.SSHClient
SSHClient executes commands remotely using ssh.
- connect(hostname: str, username: str | None = None, password: str | None = None) → None
Establish an ssh connection.
- Parameters:
hostname – the hostname or IP address.
username=None – the username to be used. If set to None the current username is used instead of using the ssh keys in your system.
password=None – the password for username. If set to None the password for username or the current username is used.
SFTP Client
With the SFTP Client it is possible to do file operations remotely via SSH.
- class htf.communication.SFTPClient
SFTPClient lets you access files via the sftp protocol.
- connect(hostname: str, username: str | None = None, password: str | None = None) → None
Establish an sftp connection.
- Parameters:
hostname – the hostname or IP address.
username=None – the username to be used. If set to None the current username is used instead of using the ssh keys in your system.
password=None – the password for username. If set to None the password for username or the current username is used.
- get(remotepath: str, localpath: str) → None
Copies the file identified by remotepath to localpath using the sftp protocol.
- Parameters:
remotepath – the remote file to be copied
localpath – the local path where the file is copied to
- put(localpath: str, remotepath: str) → SFTPAttributes
Copies the file identified by localpath to remotepath using the sftp protocol.
- Parameters:
localpath – the local file to be copied
remotepath – the remote path where the file is copied to
- Returns:
the result of os.stat
- Return type:
SFTPAttributes
- remove(remotepath: str) → None
Removes the file identified by remotepath.
- Parameters:
remotepath – the path of the file to be removed
- Raises:
IOError – if the path is not a file
SLIP — Serial Line Internet Protocol
SLIP contains a mixin to be used to implement SLIP-based communications.
If you don’t know about SLIP you should first read RFC 1055.
The htf.communication.SlipMixin contains two methods for encoding and decoding SLIP-encoded communication frames.
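For readers who want to see the framing itself, the byte stuffing defined in RFC 1055 can be sketched as two plain functions. slip_encode and slip_decode are hypothetical names for illustration and are not part of htf. The leading END byte is the optional variant mentioned in the RFC.

```python
END = 0xC0      # frame delimiter
ESC = 0xDB      # escape marker
ESC_END = 0xDC  # ESC followed by ESC_END stands for a literal END byte
ESC_ESC = 0xDD  # ESC followed by ESC_ESC stands for a literal ESC byte


def slip_encode(payload: bytes) -> bytes:
    """Wrap payload in END bytes and escape END/ESC bytes inside it."""
    out = bytearray([END])  # leading END flushes any line noise
    for byte in payload:
        if byte == END:
            out += bytes([ESC, ESC_END])
        elif byte == ESC:
            out += bytes([ESC, ESC_ESC])
        else:
            out.append(byte)
    out.append(END)
    return bytes(out)


def slip_decode(frame: bytes) -> bytes:
    """Return the payload of the first non-empty frame found in frame."""
    out = bytearray()
    escaped = False
    for byte in frame:
        if escaped:
            # Unknown escaped bytes are passed through unchanged, as in the RFC sample code.
            out.append({ESC_END: END, ESC_ESC: ESC}.get(byte, byte))
            escaped = False
        elif byte == ESC:
            escaped = True
        elif byte == END:
            if out:
                break  # end of a non-empty frame
        else:
            out.append(byte)
    return bytes(out)


encoded = slip_encode(b"\x01\xc0\xdb\x02")
print(encoded.hex())  # c001dbdcdbdd02c0
decoded = slip_decode(encoded)
```

Decoding the encoded frame yields the original four payload bytes again.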
- class htf.communication.SlipMixin
The SlipMixin helps to encode and decode frames transferred in the Serial Line Internet Protocol. It cannot be used standalone and must be mixed into a class that implements the htf.communication.SlipInterface.
- slip_receive_frame(timeouts: int | None = None, max_length: int = 1024) → bytes | None
Receive a frame encoded in the Serial Line Internet Protocol and return the decoded frame.
- Parameters:
timeouts=None – the number of timeouts until None is returned. The timeout depends on the mixing class and underlying communication system.
max_length=1024 – maximum number of bytes that will be received, not including ESC. Prevents the method from perpetually blocking.
- Returns:
the decoded frame if a frame was received, or None if no frame was received.
- Return type:
bytes or None
A class can subclass htf.communication.SlipMixin and must implement the htf.communication.SlipInterface.
- class htf.communication.SlipInterface
In the following example one-way SLIP communication is established with a client and a server.
SLIP-Client:
from htf.communication import SlipMixin
import serial


class SlipClient(SlipMixin):
    def __init__(self, comport):
        self._serial = serial.Serial(comport, 115200,
                                     timeout=1.0, rtscts=False)

    def send_byte(self, byte):
        self._serial.write(byte)

    def receive_byte(self):
        return self._serial.read(1)

    def send(self, message):
        # SLIP frames carry bytes, so encode text first
        if not isinstance(message, bytes):
            message = message.encode()
        self.slip_send_frame(message)


if __name__ == "__main__":
    s = SlipClient("/dev/ttyUSB1")
    message = "Hello World!"
    s.send(message)
SLIP-Server:
from htf.communication import SlipMixin
import serial


class SlipServer(SlipMixin):
    def __init__(self, comport):
        self._serial = serial.Serial(comport, 115200,
                                     timeout=1.0, rtscts=False)

    def send_byte(self, byte):
        self._serial.write(byte)

    def receive_byte(self):
        return self._serial.read(1)

    def send(self, message):
        # SLIP frames carry bytes, so encode text first
        if not isinstance(message, bytes):
            message = message.encode()
        self.slip_send_frame(message)

    def receive(self):
        # returns None after three read timeouts without a complete frame
        message = self.slip_receive_frame(timeouts=3)
        print("Received message:", message)


if __name__ == "__main__":
    s = SlipServer("/dev/ttyUSB2")
    while True:
        s.receive()
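The client and server examples in this section need serial hardware. To experiment with the mixin idea without a device, the following stand-alone sketch uses an in-memory loopback instead. SketchSlipMixin and LoopbackLink are hypothetical classes and do not reproduce the htf implementation. The sketch assumes that send_byte receives a single-byte bytes object, that receive_byte returns empty bytes on a timeout, that timeouts=None means waiting indefinitely, and that a partial frame is returned once max_length is reached.

```python
from collections import deque
from typing import Deque, Optional

END, ESC, ESC_END, ESC_ESC = 0xC0, 0xDB, 0xDC, 0xDD  # RFC 1055 special bytes


class SketchSlipMixin:
    """Hypothetical mixin; the host class must provide send_byte and receive_byte."""

    def slip_send_frame(self, frame: bytes) -> None:
        self.send_byte(bytes([END]))
        for byte in frame:
            if byte == END:
                self.send_byte(bytes([ESC]))
                self.send_byte(bytes([ESC_END]))
            elif byte == ESC:
                self.send_byte(bytes([ESC]))
                self.send_byte(bytes([ESC_ESC]))
            else:
                self.send_byte(bytes([byte]))
        self.send_byte(bytes([END]))

    def slip_receive_frame(self, timeouts: Optional[int] = None,
                           max_length: int = 1024) -> Optional[bytes]:
        frame = bytearray()
        escaped = False
        remaining = timeouts
        while len(frame) < max_length:
            data = self.receive_byte()
            if not data:  # an empty read counts as one timeout
                if remaining is not None:
                    remaining -= 1
                    if remaining <= 0:
                        return None
                continue
            byte = data[0]
            if escaped:
                frame.append({ESC_END: END, ESC_ESC: ESC}.get(byte, byte))
                escaped = False
            elif byte == ESC:
                escaped = True
            elif byte == END:
                if frame:
                    return bytes(frame)  # complete, non-empty frame
            else:
                frame.append(byte)
        return bytes(frame)  # max_length reached (assumed behaviour)


class LoopbackLink(SketchSlipMixin):
    """In-memory transport: every byte sent can be read back in order."""

    def __init__(self) -> None:
        self._wire: Deque[int] = deque()

    def send_byte(self, byte: bytes) -> None:
        self._wire.extend(byte)

    def receive_byte(self) -> bytes:
        return bytes([self._wire.popleft()]) if self._wire else b""


link = LoopbackLink()
link.slip_send_frame(b"Hello\xc0World!")
message = link.slip_receive_frame(timeouts=3)  # the frame sent above
nothing = link.slip_receive_frame(timeouts=3)  # wire is empty, so None
print(message, nothing)
```

Keeping the transport behind send_byte and receive_byte is what allows the same framing code to run over a serial port or over a test double like this one.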
Changelog
htf-communication-4.0.5
add support for Python 3.13
htf-communication-4.0.4
add requirements for validation purposes
htf-communication-4.0.3
add build environment data for validation purposes
htf-communication-4.0.2
htf-communication can now be used standalone
htf-communication-4.0.1
add support for Python 3.12
htf-communication-4.0.0
remove all camel-case methods
add type-hints
use hlm-3.1
use oser-3.1
htf-communication-3.0.1
add support for Python 3.11
htf-communication-3.0.0
extract htf-communication from htf