HART

This section deals with device communication using the HART industrial automation protocol. Familiarity with this protocol and general topology is required to understand the examples given. The usefulness of this section to a general audience will be limited.

You can find more information in the HART documentation.

Warning

This HART demo is not usable with the htf-community version. Please contact us for a demo license.

Basic HART Communication

For a simple example of how sending and receiving HART commands works see hart/test_basic_hart_communication.py.

Before running you have to edit the file to fit to your needs. Enter a valid value for COM_PORT.

# -*- coding: utf-8 -*-
#
# Copyright (c) 2020, HILSTER Testing Solutions GmbH, Germany - https://hilster.io
# All rights reserved.
#

import htf
from htf.communication.hart import HartDeviceCommunication, HartFrame

COM_PORT = 'ENTER_VALID_COM_PORT'  # Replace this with the COM port your HART modem is connected to.


@htf.fixture('test')
def hart_device():
    com = HartDeviceCommunication(COM_PORT)
    com.find_device()
    yield com
    com.close()


"""
These tests show different ways to send and receive HART commands. It is only intended to be a high-level
introduction to how communication can be achieved. Please check the documentation if lower-level access is needed.
"""


def test_query(hart_device):
    response = hart_device.query(HartFrame(1))  # Command 1 - Read Primary Variable
    print('Response:')
    print(response)


def test_query_context():
    with HartDeviceCommunication(COM_PORT) as com:
        response = com.query(HartFrame(1))  # Command 1 - Read Primary Variable
        print('Response:')
        print(response)


def test_request_payload(hart_device):
    request = HartFrame(18)  # Command 18 - Write Tag, Descriptor, Date
    request.payload.tag.set(b'TESTTAG')
    print('Request:')
    print(request)
    response = hart_device.query(request)
    print('Response:')
    print(response)


if __name__ == '__main__':
    htf.main()

To execute the demo, run

htf -o hart/test_basic_hart_communication.py

Device Specific Commands

In order to use device specific or custom HART commands, a specialized HartFrame is needed. See hart/test_custom_commands.py for an example.

# -*- coding: utf-8 -*-
#
# Copyright (c) 2020, HILSTER Testing Solutions GmbH, Germany - https://hilster.io
# All rights reserved.
#

import htf
import oser
from htf.communication.hart import HartFrame, ResponseCode, FieldDeviceStatus
from htf.communication.hart.data_types import HartResponse

# Let's define a custom command to read random bytes from a devices memory
#
# The command number shall be 123 and have the following payloads:
# Request
#         - one unsigned 8-bit Integer specifying the number of bytes to read
# Response
#         - one unsigned 8-bit Integer to echo the number of requested bytes
#         - a byte string of variable length, depending on the number of requested bytes
#
#         Possible error response codes: passed parameter too large (to disallow requests of over 127 bytes)
#                                        too few data bytes received (when the number of bytes parameter is missing)


# Defining request and response data bytes using the oser package


class Command123_ReadRandomBytesRequest(oser.ByteStruct):
    def __init__(self):
        super(Command123_ReadRandomBytesRequest, self).__init__()
        self.number_of_bytes = oser.UBInt8()


class Command123_ReadRandomBytesResponse(HartResponse):
    def __init__(self):
        super(Command123_ReadRandomBytesResponse, self).__init__()
        self.response_code = ResponseCode(error_passed_parameter_too_large=3,
                                          error_too_few_data_bytes_received=5)
        self.field_device_status = FieldDeviceStatus()

        self.number_of_bytes = oser.UBInt8()
        self.read_bytes = oser.Data(length=lambda self: self.number_of_bytes.get())


# To use these payloads a custom HartFrame is required to map request and response data bytes to specific commands.
# Universal and common practice commands are supported by the HartFrame base class, so only additional commands
# need to be added here.

class MyDeviceHartFrame(HartFrame):

    def requests_generator(self):
        """
        This generator function maps request data bytes to their respective command numbers
        """
        yield 123, oser.Lazy(Command123_ReadRandomBytesRequest)  # wrapping the class with oser.Lazy() is not required,
                                                                 # but can improve performance when a large number of
    def responses_generator(self):                               # commands are supported
        """
        This generator function maps response data bytes to their respective command numbers
        """
        yield 123, oser.Lazy(Command123_ReadRandomBytesResponse)


# The new command is now ready to be used and can be passed to HART communication functions as the 'decoder_generator'
# argument.
#
# Example:
#     with HartDeviceCommunication(comport='COM1', decoder_generator=MyDeviceHartFrame) as com:
#         response = com.query(MyDeviceHartFrame(123))
#         print(response)


def test_custom_command():
    request = MyDeviceHartFrame(123)
    print('Request:')
    print(request)
    response = MyDeviceHartFrame(123)
    response.delimiter.frame_type.set('ack')
    print('Response:')
    print(response)


if __name__ == '__main__':
    htf.main()

To execute the demo, run

htf -o hart/test_custom_commands.py

HART Slave Device Simulation

When testing HART master devices it can be difficult to replicate certain test conditions, for example when a slave device is misbehaving or faulty or when a large number of slave devices are required. In these cases it is often easier to use device simulators instead of physical devices. See hart/test_slave_simulator.py for an example.

# -*- coding: utf-8 -*-
#
# Copyright (c) 2020, HILSTER Testing Solutions GmbH, Germany - https://hilster.io
# All rights reserved.
#
from random import randint

import htf
from htf.communication.hart.slave_simulator import HartSlaveSimulator
from unittest.mock import patch
from hart.test_custom_commands import MyDeviceHartFrame


# This HART slave simulator will respond to the custom command 123 from the previous example

class SlaveSimulator(HartSlaveSimulator):
    def __init__(self, comport):
        super(SlaveSimulator, self).__init__(comport=comport,
                                             decoder_generator=MyDeviceHartFrame,
                                             device_type=0x1234,
                                             device_id=0x56)
        self.device_memory = b"This is my memory. There's lots of data in it. " * 50

    def get_random_bytes(self, number_of_bytes):
        start = randint(0, len(self.device_memory) - number_of_bytes)
        end = start + number_of_bytes
        return self.device_memory[start:end+1]

    def handle_command123(self, request):
        response = self._create_response(request)
        number_of_bytes_requested = request.payload.number_of_bytes.get()
        if number_of_bytes_requested > 127:  # only 127 bytes are allowed to be read at one time
            response.payload.response_code.set('error_passed_parameter_too_large')
        else:
            response.payload.number_of_bytes.set(number_of_bytes_requested)  # echo number of bytes
            random_bytes = self.get_random_bytes(number_of_bytes_requested)  # read random bytes from memory
            response.payload.read_bytes.set(random_bytes)
        return response


"""
This test will start the slave simulator with a mocked interface to simulated a request.
"""

@patch('htf.communication.hart.slave_simulator.HartInterface')
def test_start_simulator_success(mock_interface_class):
    mock_interface = mock_interface_class.return_value

    request = MyDeviceHartFrame(123)
    request.address.address.set(0x1234000056)  # the slave will only respond to commands addressed to it
    request.payload.number_of_bytes.set(25)  # read 25 bytes
    mock_interface.read.return_value = request.encode()  # set the request for the slave to read
    print('Request:')
    print(request)

    sim = SlaveSimulator(comport='')
    sim.run(1)  # run until one request has been answered
    response = MyDeviceHartFrame()
    response.decode(mock_interface.write.call_args[0][0])  # decode the response written to the mocked interface
    print('Response:')
    print(response)

    htf.assert_equal(response.payload.response_code.get(), 'success')  # expect a successful response
    htf.assert_equal(response.payload.number_of_bytes.get(), 25)  # number of requested bytes should be echoed
    htf.assert_equal(len(response.payload.read_bytes.get()), 25)  # the actual length of read bytes should match

    sim.close()


@patch('htf.communication.hart.slave_simulator.HartInterface')
def test_start_simulator_error_response(mock_interface_class):
    mock_interface = mock_interface_class.return_value
    request = MyDeviceHartFrame(123)
    request.address.address.set(0x1234000056)  # the slave will only respond to commands addressed to it
    request.payload.number_of_bytes.set(128)  # read 128 bytes, violating the threshold
    mock_interface.read.return_value = request.encode()
    print('Request:')
    print(request)

    sim = SlaveSimulator(comport='')
    sim.run(1)  # run until one request has been answered
    response = MyDeviceHartFrame()
    response.decode(mock_interface.write.call_args[0][0])  # decode the response written to the mocked interface
    print('Response:')
    print(response)

    # expect an error response for requesting more than 127 bytes
    htf.assert_equal(response.payload.response_code.get(), 'error_passed_parameter_too_large')

    sim.close()


if __name__ == '__main__':
    htf.main()

To execute the demo, run

htf -o hart/test_slave_simulator.py