Communication

This chapter describes the modules for the asynchronous communication framework.

Pluggable Notifier

The htf.communication.PluggableNotifier implements the observer pattern. It lets subjects register tuples of a name, a filter_expression and a callback.

When an item is input, every registered subject is processed as follows. First, filter_expression is called with the item as a parameter. If it returns True, the callback is called with the same item as a parameter.
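The dispatch rule described above can be sketched in plain Python. This is a simplified, synchronous illustration and not the library's implementation; the names `Subject`, `subjects` and `dispatch` are hypothetical and do not exist in htf.communication.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical registry entry: (filter_expression, callback).
Subject = Tuple[Callable[[Any], bool], Callable[[Any], None]]


def dispatch(subjects: Dict[str, Subject], item: Any) -> List[str]:
    """Offer item to every subject and return the names that were notified."""
    notified = []
    for name, (filter_expression, callback) in sorted(subjects.items()):
        # The callback only runs if the filter accepts the item.
        if filter_expression(item):
            callback(item)
            notified.append(name)
    return notified


received: List[Any] = []
subjects: Dict[str, Subject] = {
    "even": (lambda item: item % 2 == 0, received.append),
    "large": (lambda item: item > 100, received.append),
}

print(dispatch(subjects, 4))    # ['even']
print(dispatch(subjects, 7))    # []
print(dispatch(subjects, 102))  # ['even', 'large']
```

Each subject decides independently, so one item can reach several callbacks or none at all.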

With the htf.communication.PluggableNotifier you can create an asynchronous communication framework easily.

class htf.communication.PluggableNotifier(verbosity: int = 0, raise_exceptions: bool = False, filter_exception_handler: Callback | None = None, callback_exception_handler: Callback | None = None)

PluggableNotifier implements an observer pattern. Subjects can register tuples consisting of executable filter expressions and callbacks to be notified about events.

There is a method to enqueue new items and a thread to apply filters on items. Communication between method and thread is done via a queue.

PluggableNotifier extends threading.Thread.
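The combination of an enqueueing method and a processing thread can be illustrated with the standard library alone. The sketch below is an assumption about the general shape of such a design, not the actual PluggableNotifier code; `QueueWorker` and `_STOP` are hypothetical names.

```python
import queue
import threading
from typing import Any, Callable, List

_STOP = object()  # sentinel that tells the worker thread to exit


class QueueWorker(threading.Thread):
    """Hypothetical worker: input() enqueues, run() processes items."""

    def __init__(self, handler: Callable[[Any], None]) -> None:
        super().__init__(daemon=True)
        self._queue: "queue.Queue[Any]" = queue.Queue()
        self._handler = handler

    def input(self, item: Any) -> None:
        # May be called from any thread; returns immediately.
        self._queue.put(item)

    def stop(self) -> None:
        # Items enqueued before the sentinel are still processed.
        self._queue.put(_STOP)

    def run(self) -> None:
        # Runs in the worker thread and handles items in FIFO order.
        while True:
            item = self._queue.get()
            if item is _STOP:
                break
            self._handler(item)


seen: List[Any] = []
worker = QueueWorker(seen.append)
worker.start()
worker.input("a")
worker.input("b")
worker.stop()
worker.join(timeout=1.0)
print(seen)  # ['a', 'b']
```

Because the caller only touches the queue, a slow filter or callback delays the worker thread but never the code that calls input().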

Parameters:
  • verbosity – if set to 1 debug messages are printed to stdout

  • raise_exceptions – if set to True exceptions are raised if a filter expression or a callback raises an exception. This may be used for debugging. In case of an exception stop is called so no further items can be put in.

  • filter_exception_handler – a callback that is called to handle an exception raised in a filter expression. If filter_exception_handler returns True, the exception is not raised afterwards. Otherwise the caught exception is raised and the thread is stopped. The supplied handler must accept name and item arguments. name is the callback name supplied by register and item is the supplied item from input.

  • callback_exception_handler – a callback that is called to handle an exception raised in a callback. If callback_exception_handler returns True, the exception is not raised afterwards. Otherwise the caught exception is raised and the thread is stopped. The supplied handler must accept name and item arguments. name is the callback name supplied by register and item is the supplied item from input.
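As an illustration of the handler contract described above, the following sketch shows two handlers that accept name and item and return a bool. The helper `call_guarded` is hypothetical and only mimics the documented "return True to suppress" rule; it is not how the library is known to be implemented.

```python
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("notifier-example")


def tolerant_handler(name: str, item: Any) -> bool:
    """Log the failure and return True so the exception is suppressed."""
    logger.warning("subject %r failed for item %r", name, item)
    return True


def strict_handler(name: str, item: Any) -> bool:
    """Return False so the exception is raised."""
    return False


def call_guarded(name: str, func: Callable[[Any], Any], item: Any,
                 handler: Optional[Callable[[str, Any], bool]]) -> Any:
    """Hypothetical helper mimicking the documented handler semantics."""
    try:
        return func(item)
    except Exception:
        if handler is not None and handler(name, item):
            return None  # handled: the exception is swallowed
        raise  # not handled: the caught exception propagates


def broken_filter(item: Any) -> bool:
    raise ValueError("bad item")


print(call_guarded("demo", broken_filter, 1, tolerant_handler))  # None
```

Handlers of this shape would be passed as filter_exception_handler=tolerant_handler or callback_exception_handler=strict_handler when constructing the notifier.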

get_filter_names() List[str]

Get the filter names.

Returns:

a sorted list containing the filter names.

Return type:

list of str

input(item: Any) None

Input an item to be used with all registered subjects.

Parameters:

item – an item to be processed.

Raises:

join(timeout: float | None = 1.0) None

Wait until the thread ends.

Parameters:

timeout – the timeout in seconds. Default: 1.0 seconds.

register(name: str, filter_expression: Callback, callback: Callback) None

Register a tuple of name, filter_expression and a callback.

Parameters:
  • name – a name for the tuple (for debugging, see verbosity).

  • filter_expression – a callable filter expression (see htf.filters for more information) or a method that is called. filter_expression is called with the item from input as a parameter. If filter_expression returns True the callback is called with the same item.

  • callback – a callable object that is called in case filter_expression(item) returns True.

run() None

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

stop() None

Stop the thread.

unregister(name: str) None

Unregister a callback with a given name.

Parameters:

name – the name of the callback to be unregistered.

Raises:

KeyError – if name was not found in notifications.

Notification Callbacks

Notification callbacks can be used in htf.communication.PluggableNotifier as the callback.

For example, you can enqueue an item into a queue that the caller has just created in order to receive data asynchronously.

Example:

from htf.communication import PluggableNotifier, enqueue_item
import queue

pn = PluggableNotifier()
q = queue.Queue()
# The filter only accepts the item "example item".
pn.register("subject",
            filter_expression=lambda *args, **kwargs: kwargs["item"] == "example item",
            callback=enqueue_item(q))
pn.input("example item")  # passes the filter
item = q.get(block=True, timeout=1)
print(item)  # "example item"
pn.input("another item")  # is filtered out
item = q.get(block=True, timeout=1)  # raises queue.Empty

htf.communication.notification_callbacks.enqueue_item(queue: Queue) Callable[[str, Any], None]

Enqueues item into queue. This way asynchronous communication can be set up easily.

Parameters:

queue (queue.Queue) – a queue to be used.

Returns:

an anonymous method that can be called with an item that is put into queue.

Return type:

callable
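A callback factory of this kind is typically a closure over the queue. The sketch below shows the idea with a hypothetical `make_enqueue_callback`; it assumes the callback receives the item as a single argument, whereas the real callable is documented as Callable[[str, Any], None] and may take different arguments.

```python
import queue
from typing import Any, Callable


def make_enqueue_callback(target: "queue.Queue[Any]") -> Callable[..., None]:
    """Return a callback that puts every item it receives into target."""
    def _callback(item: Any) -> None:
        # The closure keeps a reference to the caller's queue.
        target.put(item)
    return _callback


q: "queue.Queue[Any]" = queue.Queue()
callback = make_enqueue_callback(q)
callback(item="example item")  # keyword call
callback("another item")       # positional call
print(q.get_nowait())  # example item
print(q.get_nowait())  # another item
```

The consumer owns the queue and decides how long to wait for data, which keeps the notifier side free of any knowledge about the receiver.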

SSH Client

With the SSH Client you can run commands remotely via SSH.

class htf.communication.SSHClient

SSHClient executes commands remotely using ssh.

close() None

Close current ssh connection.

connect(hostname: str, username: str | None = None, password: str | None = None) None

Establish an ssh connection.

Parameters:
  • hostname – the hostname or ip address

  • username=None – The username to be used. If set to None the current username is used instead of using the ssh keys in your system.

  • password=None – The password for username. If set to None the password for username or the current username is used.

execute(*command: str) Tuple[int, bytes, bytes]

Execute a command remotely via ssh.

Parameters:

*command – the command to be executed.

Returns:

a tuple containing exitcode, stdout and stderr of the executed command.

Return type:

tuple
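Since execute returns the exit code together with stdout and stderr as bytes, callers usually check the exit code and decode the output. The helper below is a sketch built only on that documented return shape; `run_checked`, `RemoteCommandError` and `FakeClient` are hypothetical names, and the fake client stands in for a connected SSHClient.

```python
from typing import Tuple


class RemoteCommandError(RuntimeError):
    """Raised by the hypothetical helper when the exit code is non-zero."""


def run_checked(client, *command: str) -> str:
    """Run command via client.execute() and return stdout as text."""
    exit_code, stdout, stderr = client.execute(*command)
    if exit_code != 0:
        raise RemoteCommandError(
            "%r exited with %d: %s"
            % (command, exit_code, stderr.decode(errors="replace").strip())
        )
    return stdout.decode(errors="replace")


class FakeClient:
    """Stand-in with the same execute() return shape, for a dry run."""

    def execute(self, *command: str) -> Tuple[int, bytes, bytes]:
        if command[0] == "uname":
            return 0, b"Linux\n", b""
        return 127, b"", b"command not found\n"


print(run_checked(FakeClient(), "uname", "-s").strip())  # Linux
```

With a real connection, the same helper would be called with an SSHClient instance after connect. How multiple *command arguments are combined on the remote side is not specified here, so that part should be checked against the library.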

SFTP Client

With the SFTP Client it is possible to do file operations remotely via SSH.

class htf.communication.SFTPClient

SFTPClient lets you access files via sftp protocol.

close() None

Close current sftp connection.

connect(hostname: str, username: str | None = None, password: str | None = None) None

Establish an sftp connection.

Parameters:
  • hostname – the hostname or ip address

  • username=None – The username to be used. If set to None the current username is used instead of using the ssh keys in your system.

  • password=None – The password for username. If set to None the password for username or the current username is used.

get(remotepath: str, localpath: str) None

Copies the file identified by remotepath to localpath using the SFTP protocol.

Parameters:
  • remotepath – the remote file to be copied

  • localpath – the local path where the file is copied to

put(localpath: str, remotepath: str) SFTPAttributes

Copies the file identified by localpath to remotepath using the SFTP protocol.

Parameters:
  • localpath – the local file to be copied

  • remotepath – the remote path where the file is copied to

Returns:

the result of os.stat

Return type:

SFTPAttributes

remove(remotepath: str) None

Removes a file identified by remotepath.

Parameters:

remotepath – the path for the file to be removed

Raises:

IOError – if the path is not a file

rename(oldpath: str, newpath: str) None

Remotely renames a file or a folder.

Parameters:
  • oldpath – the file or folder to be renamed

  • newpath – the new file or folder name

Raises:

IOError – if something goes wrong
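The put, rename and remove methods can be combined so that a remote file only appears under its final name once the upload has finished. The following is a sketch under assumptions: `upload_via_temp_name`, the ".part" suffix and `FakeSFTP` are hypothetical, and the fake client merely imitates the documented method names so the example runs without a server.

```python
from typing import Dict


def upload_via_temp_name(client, localpath: str, remotepath: str) -> None:
    """Upload to a temporary name first, then rename to the final name."""
    temp_path = remotepath + ".part"  # hypothetical naming scheme
    client.put(localpath, temp_path)
    try:
        client.rename(temp_path, remotepath)
    except IOError:
        # Do not leave the partial upload behind if the rename fails.
        client.remove(temp_path)
        raise


class FakeSFTP:
    """In-memory stand-in offering put(), rename() and remove()."""

    def __init__(self) -> None:
        self.files: Dict[str, str] = {}

    def put(self, localpath: str, remotepath: str) -> None:
        self.files[remotepath] = localpath

    def rename(self, oldpath: str, newpath: str) -> None:
        if newpath in self.files:
            raise IOError("target exists")
        self.files[newpath] = self.files.pop(oldpath)

    def remove(self, remotepath: str) -> None:
        del self.files[remotepath]


fake = FakeSFTP()
upload_via_temp_name(fake, "report.txt", "/tmp/report.txt")
print(sorted(fake.files))  # ['/tmp/report.txt']
```

Whether a real server refuses to rename onto an existing file depends on the server, so the error branch should be verified against the target system.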

SLIP — Serial Line Internet Protocol

SLIP contains a mixin to be used to implement SLIP-based communications.

If you don’t know about SLIP you should first read RFC 1055.

The htf.communication.SlipMixin contains two methods for encoding and decoding SLIP-encoded communication frames.

class htf.communication.SlipMixin

The SlipMixin helps to encode and decode frames transferred in the Serial Line Internet Protocol.

It cannot be used standalone and must be mixed into a class that implements the htf.communication.SlipInterface.

slip_receive_frame(timeouts: int | None = None, max_length: int = 1024) bytes | None

Receive a frame encoded in the Serial Line Internet Protocol and return the decoded frame.

Parameters:
  • timeouts=None – the number of timeouts until None is returned. The timeout depends on the mixing class and underlying communication system.

  • max_length=1024 – maximum number of bytes that will be received, not including ESC. Prevents the method from perpetually blocking.

Returns:

the decoded frame if a frame was received or None if no frame was received.

Return type:

bytes or None
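To make the roles of timeouts and max_length more concrete, here is a sketch of a receive loop that decodes SLIP according to RFC 1055 from a byte source. It is an assumption about how such a loop can look, not the SlipMixin code; `receive_frame` is a hypothetical free function, and its handling of frames that reach max_length (returning what was decoded so far) is a design choice of this sketch.

```python
from typing import Callable, Optional

END, ESC, ESC_END, ESC_ESC = 0xC0, 0xDB, 0xDC, 0xDD  # RFC 1055 values


def receive_frame(receive_byte: Callable[[], Optional[bytes]],
                  timeouts: Optional[int] = None,
                  max_length: int = 1024) -> Optional[bytes]:
    """Read bytes until END and return the decoded frame, or None."""
    frame = bytearray()
    escaped = False
    remaining = timeouts
    while len(frame) < max_length:
        raw = receive_byte()
        if not raw:  # None signals a timeout
            if remaining is not None:
                remaining -= 1
                if remaining <= 0:
                    return None
            continue
        value = raw[0]
        if escaped:
            # ESC ESC_END decodes to END, ESC ESC_ESC decodes to ESC.
            frame.append({ESC_END: END, ESC_ESC: ESC}.get(value, value))
            escaped = False
        elif value == ESC:
            escaped = True
        elif value == END:
            if frame:  # skip empty frames from back-to-back END bytes
                return bytes(frame)
        else:
            frame.append(value)
    return bytes(frame)


wire = iter([b"\xc0", b"\x01", b"\xdb", b"\xdc", b"\x02", b"\xc0"])
print(receive_frame(lambda: next(wire, None), timeouts=3))  # b'\x01\xc0\x02'
```

Note that with timeouts=None this sketch keeps waiting across timeouts, so only max_length bounds the loop once data arrives.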

slip_send_frame(frame: bytes) None

Send a frame encoded in the Serial Line Internet Protocol.

Parameters:

frame – the frame to be sent.
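For readers who want to see the wire format, the encoding defined in RFC 1055 can be written in a few lines. The function `slip_encode` below is a hypothetical standalone sketch; whether SlipMixin also emits the optional leading END byte is an assumption that should be checked against the library.

```python
END, ESC, ESC_END, ESC_ESC = 0xC0, 0xDB, 0xDC, 0xDD  # RFC 1055 values


def slip_encode(frame: bytes) -> bytes:
    """Return frame wrapped in END bytes with END and ESC escaped."""
    encoded = bytearray([END])  # optional leading END flushes line noise
    for value in frame:
        if value == END:
            encoded += bytes([ESC, ESC_END])
        elif value == ESC:
            encoded += bytes([ESC, ESC_ESC])
        else:
            encoded.append(value)
    encoded.append(END)  # END terminates the frame
    return bytes(encoded)


print(slip_encode(b"\x01\xc0\xdb").hex())  # c001dbdcdbddc0
```

Escaping guarantees that the END value only ever appears on the wire as a frame delimiter, which is what allows a receiver to resynchronise after noise.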

A class can subclass htf.communication.SlipMixin and must implement the htf.communication.SlipInterface.

class htf.communication.SlipInterface

receive_byte() bytes

Receive a single byte. If no byte was received, None is returned to signal a timeout.

Returns:

a single byte that was received or None if a timeout occurred.

Return type:

bytes or None

send_byte(byte: bytes) None

Send a single byte.

Parameters:

byte – a single byte to be sent.
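For tests without serial hardware, the two interface methods can be backed by an in-memory buffer. The class below is a hypothetical sketch that follows the documented contract (one byte per call, None on timeout); it is written standalone, and combining it with SlipMixin is an untested assumption.

```python
from collections import deque
from typing import Deque, Optional


class LoopbackTransport:
    """Hypothetical in-memory transport with the two interface methods."""

    def __init__(self) -> None:
        self._buffer: Deque[int] = deque()

    def send_byte(self, byte: bytes) -> None:
        # Store exactly one byte per call.
        if len(byte) != 1:
            raise ValueError("send_byte expects exactly one byte")
        self._buffer.append(byte[0])

    def receive_byte(self) -> Optional[bytes]:
        # An empty buffer plays the role of a timeout.
        if not self._buffer:
            return None
        return bytes([self._buffer.popleft()])


transport = LoopbackTransport()
transport.send_byte(b"\x7f")
print(transport.receive_byte())  # b'\x7f'
print(transport.receive_byte())  # None
```

Everything sent is received again in order, which makes it possible to exercise framing logic in a single process.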

In the following example one-way SLIP communication is established with a client and a server.

SLIP-Client:

from htf.communication import SlipMixin
import serial


class SlipClient(SlipMixin):
    def __init__(self, comport):
        self._serial = serial.Serial(comport, 115200,
                                     timeout=1.0, rtscts=False)

    def send_byte(self, byte):
        self._serial.write(byte)

    def receive_byte(self):
        # Serial.read() returns b"" on timeout; the interface expects None.
        return self._serial.read(1) or None

    def send(self, message):
        if not isinstance(message, bytes):
            message = message.encode()
        self.slip_send_frame(message)

if __name__ == "__main__":
    s = SlipClient("/dev/ttyUSB1")
    message = "Hello World!"
    s.send(message)

SLIP-Server:

from htf.communication import SlipMixin
import serial


class SlipServer(SlipMixin):
    def __init__(self, comport):
        self._serial = serial.Serial(comport, 115200,
                                     timeout=1.0, rtscts=False)

    def send_byte(self, byte):
        self._serial.write(byte)

    def receive_byte(self):
        # Serial.read() returns b"" on timeout; the interface expects None.
        return self._serial.read(1) or None

    def send(self, message):
        if not isinstance(message, bytes):
            message = message.encode()
        self.slip_send_frame(message)

    def receive(self):
        message = self.slip_receive_frame(timeouts=3)
        print("Received message:", message)

if __name__ == "__main__":
    s = SlipServer("/dev/ttyUSB2")
    while True:
        s.receive()

Changelog

htf-communication-4.0.4

  • add requirements for validation purposes

htf-communication-4.0.3

  • add build environment data for validation purposes

htf-communication-4.0.2

  • htf-communication can now be used standalone

htf-communication-4.0.1

  • add support for Python 3.12

htf-communication-4.0.0

  • remove all camel-case methods

  • add type-hints

  • use hlm-3.1

  • use oser-3.1

htf-communication-3.0.1

  • add support for Python 3.11

htf-communication-3.0.0

  • extract htf-communication from htf