Communication¶
This chapter describes the modules for the asynchronous communication framework.
Pluggable Notifier¶
The htf.communication.PluggableNotifier
is an observer that lets subjects register tuples of name
, filterExpression
and a callback
.
When data is input all registered subjects are used the following way.
First the filterExpression
is called with the data as a parameter.
If True
is returned the callback is called with the data as a parameter.
With the htf.communication.PluggableNotifier
you can create an asynchronous communication framework easily.
-
class
htf.communication.
PluggableNotifier
(verbosity=0, raise_exceptions=False, filter_exception_handler=None, callback_exception_handler=None)¶ PluggableNotifier
implements an observer pattern. Subjects can register tuples consisting of executable filter expressions and callbacks to be notified about events.There is a method to enqueue new items and a thread to apply filters on items. Communication between method and thread is done via a queue.
PluggableNotififer
extendsthreading.Thread
.- Parameters
verbosity (int) – if set to 1 debug messages are printed to stdout
raise_exceptions (bool) – if set to
True
exceptions are raised if a filter expression or a callback raises an exception. This may be used for debugging. In case of an exceptionstop
is called so no further items can be put in.filter_exception_handler (None or callable) – a callback that is called to handle a raised exception in a filter expression. If the
filter_exception_handler
returnsTrue
the exception is not raised afterwards. Else the exception handler raises the catched exception and stopps the thread. The supplied handler musst acceptname
anditem
arguments.name
is the callback name supplied byregister
anditem
is the supplied item frominput
.callback_exception_handler (None or callable) – a callback that is called to handle a raised exception in a callback. If the
callback_exception_handler
returnsTrue
the exception is not raised afterwards. Else the exception handler raises the catched exception and stopps the thread. The supplied handler musst acceptname
anditem
arguments.name
is the callback name supplied byregister
anditem
is the supplied item frominput
.
-
get_filter_names
()¶ Get the filter names.
- Returns
a sorted list containing the filter names.
- Return type
list of str
-
input
(item)¶ Input an
item
to be used with all registered subjects.- Parameters
item – an item to be processed.
- Raises
RuntimeError – if
stop
was called before –
-
is_alive
()¶ Return whether the thread is alive.
This method returns True just before the run() method starts until just after the run() method terminates. The module function enumerate() returns a list of all alive threads.
-
join
(timeout=1.0)¶ Wait until the thread ends.
- Parameters
timeout (float) – the timeout in seconds. Default: 1.0 seconds.
-
register
(name, filter_expression, callback)¶ Register a tuple of name, filter_expression and a callback.
- Parameters
name (str) – a name for the tuple (for debugging, see
verbosity
).filter_expression (callable) – a callable filter expression (see
htf.filters
for more information) or a method that is called.filter_expression
is called with theitem
frominput
as a parameter. Iffilter_expression
returns True the callback is called with the same item.callback (callable) – a callable object that is called in case
filter_expression(item)
returnsTrue
.
-
run
()¶ Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
-
start
()¶ Start the thread’s activity.
It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the same thread object.
-
stop
()¶ Stop the thread.
Notification Callbacks¶
Notification callbacks can be used in htf.communication.PluggableNotifier
as the
callback.
For example you can enqueue an item into a queue the callee just created to asynchronously receive data.
Example:
from htf.communication import PluggableNotifier, enqueueItem
import queue # or Queue in Python 2
pn = PluggableNotifier()
q = queue.Queue()
pn.register("subject", filterExpression=lambda *args,
**kwargs: kwargs["item"]=="example item", callback=enqueueItem(q))
pn.input("example item") # is passed
item = q.get(block=True, timeout=1)
print(item) # "example item"
pn.input("another item") # is filtered out
item = q.get(block=True, timeout=1) # raises Queue.Empty
-
htf.communication.notification_callbacks.
enqueueItem
(queue)¶ Enqueues
item
intoqueue
. This way asynchronous communication can be set up easily.- Parameters
queue (queue.Queue) – a queue to be used.
- Returns
an anonymous method that can be called with an
item
that is put intoqueue
.- Return type
callable
SSH Client¶
With the SSH Client you can run commands remotely via SSH.
-
class
htf.communication.
SSHClient
¶ SSHClient
executes commands remotely using ssh.-
close
()¶ Close current ssh connection.
-
connect
(hostname, username=None, password=None)¶ Establish an ssh connection.
- Parameters
hostname (str) – the hostname or ip address
username=None (str) – The username to be used. If set to
None
the current username is used instead of using the ssh keys in your system.password=None (str) – The password for
username
. If set toNone
the password forusername
or the current username is used.
-
SFTP Client¶
With the SFTP Client it is possible to do file operations remotely via SSH.
-
class
htf.communication.
SFTPClient
¶ SFTPClient
lets you access files via sftp protocol.-
close
()¶ Close current sftp connection.
-
connect
(hostname, username=None, password=None)¶ Establish an sftp connection.
- Parameters
hostname (str) – the hostname or ip address
username=None (str) – The username to be used. If set to
None
the current username is used instead of using the ssh keys in your system.password=None (str) – The password for
username
. If set toNone
the password forusername
or the current username is used.
-
execute
(*command)¶ Execute a command remotely via ssh.
- Parameters
*command (tuple of str) – the command to be executed.
- Returns
a tuple containing exitcode, stdout and stderr of the executed command.
- Return type
tuple (exitcode, stdout, stderr)
-
get
(remotepath, localpath)¶ Copies file identified by
remotepath
tolocalpath
using sftp protocol.
-
put
(localpath, remotepath)¶ Copies file identified by
localpath
toremotepath
using sftp protocol.
-
remove
(remotepath)¶ Removes a file identified by
remotepath
.
-
rename
(oldpath, newpath)¶ Remotely renames a file or a folder.
-
SLIP — Serial Line Internet Protocol¶
SLIP contains a mixin to be used to implement SLIP-based communications.
If you don’t know about SLIP you should first read RFC 1055.
The htf.communication.SlipMixin
contains two methods for encoding
and decoding SLIP-encoded communication frames.
-
class
htf.communication.
SlipMixin
¶ The
SlipMixin
helps to encode and decode frames transferred in the Serial Line Internet Protocol.It cannot be used standalone and must be mixed into a class that implements the
htf.communication.SlipInterface
.-
slip_receive_frame
(timeouts=None, max_length=1024)¶ Receive a frame encoded in the Serial Line Internet Protocol and return the decoded frame.
- Parameters
- Returns
- the decoded frame if a frame was received or
None
if no frame was received.
- Return type
-
A class can subclass htf.communication.SlipMixin
and must implement
the htf.communication.SlipInterface
.
-
class
htf.communication.
SlipInterface
¶ -
receive_byte
()¶ Receive a single byte. If no byte was received,
None
is returned to tell about a timeout.
-
In the following example one-way SLIP communication is established with a client and a server.
SLIP-Client:
from __future__ import \
absolute_import, division, print_function, unicode_literals
from htf.communication import SlipMixin
import serial
class SlipClient(SlipMixin):
def __init__(self, comport):
self._serial = serial.Serial(comport, 115200,
timeout=1.0, rtscts=False)
def send_byte(self, byte):
self._serial.write(byte)
def receive_byte(self):
return self._serial.read(1)
def send(self, message):
if not isinstance(message, bytes):
message = message.encode()
self.slip_send_frame(message)
if __name__ == "__main__":
s = SlipClient("/dev/ttyUSB1")
message = "Hello World!"
s.send(message)
SLIP-Server:
from __future__ import \
absolute_import, division, print_function, unicode_literals
from htf.communication import SlipMixin
import serial
class SlipServer(SlipMixin):
def __init__(self, comport):
self._serial = serial.Serial(comport, 115200,
timeout=1.0, rtscts=False)
def send_byte(self, byte):
self._serial.write(byte)
def receive_byte(self):
return self._serial.read(1)
def send(self, message):
if not isinstance(message, bytes):
message = message.encode()
self.slip_send_frame(message)
def receive(self):
message = self.slip_receive_frame(timeouts=3)
print("Received message:", message)
if __name__ == "__main__":
s = SlipServer("/dev/ttyUSB2")
while True:
s.receive()