HART
This section deals with device communication using the HART industrial automation protocol. Familiarity with this protocol and general topology is required to understand the examples given. The usefulness of this section to a general audience will be limited.
You can find more information in the HART documentation.
Warning
This HART demo is not usable with the htf-community version. Please contact us for a demo license.
Basic HART Communication
For a simple example of how sending and receiving HART commands works
see hart/test_basic_hart_communication.py
.
Before running you have to edit the file to fit to your needs.
Enter a valid value for COM_PORT
.
#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#
from typing import Generator
import htf
from htf.hart import HartDeviceCommunication, HartFrame
COM_PORT = "ENTER_VALID_COM_PORT" # Replace this with the COM port your HART modem is connected to.
@htf.fixture("test")
def hart_device() -> Generator[HartDeviceCommunication, None, None]:
com = HartDeviceCommunication(COM_PORT)
com.find_device()
yield com
com.close()
"""
These tests show different ways to send and receive HART commands. It is only intended to be a high-level
introduction to how communication can be achieved. Please check the documentation if lower-level access is needed.
"""
def test_query(hart_device: HartDeviceCommunication) -> None:
response = hart_device.query(HartFrame(1)) # Command 1 - Read Primary Variable
print("Response:")
print(response)
def test_query_context() -> None:
with HartDeviceCommunication(COM_PORT) as com:
response = com.query(HartFrame(1)) # Command 1 - Read Primary Variable
print("Response:")
print(response)
def test_request_payload(hart_device: HartDeviceCommunication) -> None:
request = HartFrame(18) # Command 18 - Write Tag, Descriptor, Date
request.payload.tag.set(b"TESTTAG")
print("Request:")
print(request)
response = hart_device.query(request)
print("Response:")
print(response)
if __name__ == "__main__":
htf.main()
To execute the demo, run
htf -o hart/test_basic_hart_communication.py
Device Specific Commands
In order to use device specific or custom HART commands, a specialized HartFrame is needed.
See hart/test_custom_commands.py
for an example.
#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#
from typing import Generator, Tuple
import htf
import oser # type: ignore
from htf.hart import HartFrame, ResponseCode, FieldDeviceStatus
from htf.hart.data_types import HartResponse
# Let's define a custom command to read random bytes from a devices memory
#
# The command number shall be 123 and have the following payloads:
# Request
# - one unsigned 8-bit Integer specifying the number of bytes to read
# Response
# - one unsigned 8-bit Integer to echo the number of requested bytes
# - a byte string of variable length, depending on the number of requested bytes
#
# Possible error response codes: passed parameter too large (to disallow requests of over 127 bytes)
# too few data bytes received (when the number of bytes parameter is missing)
# Defining request and response data bytes using the oser package
class Command123_ReadRandomBytesRequest(oser.ByteStruct): # noqa: N801
def __init__(self) -> None:
super(Command123_ReadRandomBytesRequest, self).__init__()
self.number_of_bytes = oser.UBInt8()
class Command123_ReadRandomBytesResponse(HartResponse): # noqa: N801
def __init__(self) -> None:
super(Command123_ReadRandomBytesResponse, self).__init__()
self.response_code = ResponseCode(error_passed_parameter_too_large=3, error_too_few_data_bytes_received=5)
self.field_device_status = FieldDeviceStatus()
self.number_of_bytes = oser.UBInt8()
self.read_bytes = oser.Data(length=lambda self: self.number_of_bytes.get())
# To use these payloads a custom HartFrame is required to map request and response data bytes to specific commands.
# Universal and common practice commands are supported by the HartFrame base class, so only additional commands
# need to be added here.
class MyDeviceHartFrame(HartFrame):
def requests_generator(self) -> Generator[Tuple[int, oser.Lazy], None, None]:
"""
This generator function maps request data bytes to their respective command numbers
"""
yield 123, oser.Lazy(Command123_ReadRandomBytesRequest) # wrapping the class with oser.Lazy() is not required,
# but can improve performance when a large number of
def responses_generator(self) -> Generator[Tuple[int, oser.Lazy], None, None]:
"""
This generator function maps response data bytes to their respective command numbers
"""
yield 123, oser.Lazy(Command123_ReadRandomBytesResponse)
# The new command is now ready to be used and can be passed to HART communication functions as the 'decoder_generator'
# argument.
#
# Example:
# with HartDeviceCommunication(comport='COM1', decoder_generator=MyDeviceHartFrame) as com:
# response = com.query(MyDeviceHartFrame(123))
# print(response)
def test_custom_command() -> None:
request = MyDeviceHartFrame(123)
print("Request:")
print(request)
response = MyDeviceHartFrame(123)
response.delimiter.frame_type.set("ack")
print("Response:")
print(response)
if __name__ == "__main__":
htf.main()
To execute the demo, run
htf -o hart/test_custom_commands.py
HART Slave Device Simulation
When testing HART master devices it can be difficult to replicate certain test conditions, for example when a slave device
is misbehaving or faulty or when a large number of slave devices are required. In these cases it is often easier to
use device simulators instead of physical devices.
See hart/test_slave_simulator.py
for an example.
#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#
from typing import Optional
import htf
from htf.hart import HartFrame
from htf.hart.slave_simulator import HartSlaveSimulator
from hart.test_custom_commands import MyDeviceHartFrame
from random import randint
from unittest.mock import patch
# This HART slave simulator will respond to the custom command 123 from the previous example
class SlaveSimulator(HartSlaveSimulator):
def __init__(self, comport: str) -> None:
super(SlaveSimulator, self).__init__(
comport=comport, decoder_generator=MyDeviceHartFrame, device_type=0x1234, device_id=0x56
)
self.device_memory = b"This is my memory. There's lots of data in it. " * 50
def get_random_bytes(self, number_of_bytes: int) -> bytes:
start = randint(0, len(self.device_memory) - number_of_bytes)
end = start + number_of_bytes
return self.device_memory[start : end + 1]
def handle_command123(self, request: HartFrame) -> Optional[HartFrame]:
response = self._create_response(request)
number_of_bytes_requested = request.payload.number_of_bytes.get()
if number_of_bytes_requested > 127: # only 127 bytes are allowed to be read at one time
response.payload.response_code.set("error_passed_parameter_too_large")
else:
response.payload.number_of_bytes.set(number_of_bytes_requested) # echo number of bytes
random_bytes = self.get_random_bytes(number_of_bytes_requested) # read random bytes from memory
response.payload.read_bytes.set(random_bytes)
return response
"""
This test will start the slave simulator with a mocked interface to simulated a request.
"""
def test_start_simulator_success() -> None:
with patch("htf.hart.slave_simulator.HartInterface") as mock_interface_class:
mock_interface = mock_interface_class.return_value
request = MyDeviceHartFrame(123)
request.address.address.set(0x1234000056) # the slave will only respond to commands addressed to it
request.payload.number_of_bytes.set(25) # read 25 bytes
mock_interface.read.return_value = request.encode() # set the request for the slave to read
print("Request:")
print(request)
sim = SlaveSimulator(comport="")
sim.run(1) # run until one request has been answered
response = MyDeviceHartFrame()
response.decode(mock_interface.write.call_args[0][0]) # decode the response written to the mocked interface
print("Response:")
print(response)
htf.assert_equal(response.payload.response_code.get(), "success") # expect a successful response
htf.assert_equal(response.payload.number_of_bytes.get(), 25) # number of requested bytes should be echoed
htf.assert_equal(len(response.payload.read_bytes.get()), 25) # the actual length of read bytes should match
sim.close()
def test_start_simulator_error_response() -> None:
with patch("htf.hart.slave_simulator.HartInterface") as mock_interface_class:
mock_interface = mock_interface_class.return_value
request = MyDeviceHartFrame(123)
request.address.address.set(0x1234000056) # the slave will only respond to commands addressed to it
request.payload.number_of_bytes.set(128) # read 128 bytes, violating the threshold
mock_interface.read.return_value = request.encode()
print("Request:")
print(request)
sim = SlaveSimulator(comport="")
sim.run(1) # run until one request has been answered
response = MyDeviceHartFrame()
response.decode(mock_interface.write.call_args[0][0]) # decode the response written to the mocked interface
print("Response:")
print(response)
# expect an error response for requesting more than 127 bytes
htf.assert_equal(response.payload.response_code.get(), "error_passed_parameter_too_large")
sim.close()
if __name__ == "__main__":
htf.main()
To execute the demo, run
htf -o hart/test_slave_simulator.py
HART Simulation/Hardware Tests
When writing tests to check device behavior you may want to run them against a simulator and your device without having to develop the same test twice. This can be accomplished by using a combination of fixtures and tags. Once the test is written you can select the target by passing the desired fixture tag to the runner.
See hart/test_read_primary_variable.py
for an example.
#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#
from typing import Generator, Any, Optional
import htf
from unittest.mock import patch
from htf.hart import HartFrame, HartDeviceCommunication
from htf.hart.slave_simulator import HartSlaveSimulator
COM_PORT = "/dev/ttyUSB0" # !!! Change this to the port your device is connected to
class PrimaryVariableSimulator(HartSlaveSimulator):
def __init__(self, comport: str) -> None:
super(PrimaryVariableSimulator, self).__init__(
comport=comport, decoder_generator=HartFrame, device_type=0x1234, device_id=0x56
)
self.broken = False
def handle_command1(self, request: HartFrame) -> Optional[HartFrame]:
response = self._create_response(request)
response.payload.primary_variable.set(123.45)
if self.broken:
response.payload.response_code.set("error_device_specific_command_error")
return response
@htf.tags("simulator")
@htf.fixture(scope="test", name="device")
def device_simulator() -> Generator[PrimaryVariableSimulator, None, None]:
patch("htf.hart.slave_simulator.HartInterface")
sim = PrimaryVariableSimulator("/dev/ttyUSB0")
sim.run(2)
yield sim
@htf.tags("hardware")
@htf.fixture(scope="test", name="device")
def device_hardware() -> Generator[Any, None, None]:
yield object() # Replace this with an object controlling your device
@htf.tags("simulator")
@htf.fixture(scope="test", name="interface")
def interface_simulator(device: Any) -> Generator["SimulatorInterface", None, None]: # type: ignore # noqa: F821
class SimulatorInterface:
def query(self, request: HartFrame) -> Optional[HartFrame]:
return device._get_response_from_handler(request) # type: ignore
yield SimulatorInterface()
@htf.tags("hardware")
@htf.fixture(scope="test", name="interface")
def interface_hardware(device: Any) -> Generator[HartDeviceCommunication, None, None]:
yield HartDeviceCommunication(COM_PORT)
def test_read_primary_variable(
device: Any,
interface: "SimulatorInterface", # type: ignore # noqa: F821
step: htf.fixtures.step,
) -> None:
with step("Read primary variable"):
request = HartFrame(1)
request.address.address.set(0x1234000056) # the slave will only respond to commands addressed to it
print("Request:")
print(request)
response = interface.query(request)
print("Response:")
print(response)
with step("Validate primary variable and response code"):
htf.assert_equal(response.payload.response_code.get(), "success") # expect a successful response
htf.assert_almost_equal(
response.payload.primary_variable.get(), 123.45
) # number of requested bytes should be echoed
with step("Break device"):
device.broken = True
with step("Read primary variable again"):
print("Request:")
print(request)
response = interface.query(request)
print("Response:")
print(response)
with step("Validate response code indicates device specific error"):
htf.assert_equal(response.payload.response_code.get(), "error_device_specific_command_error") # expect an error
To execute the demo against the simulator, run
htf -o -F simulator hart/test_read_primary_variable.py
likewise, to use a real device as the target run
htf -o -F hardware hart/test_read_primary_variable.py
Attention: You will need to change the ``COM_PORT` variable at the top of the test module to the port your device is connected to.