Binary Data and Communication Protocols

Oser helps you work with binary data structures. You can easily build and read binary data files, communication protocols, EEPROM contents, etc.

More information can be found in the oser documentation.

Data structures and their dependencies are easily described. You do not need to count bits and bytes anymore.

Communication Protocol

Assume you have a binary communication protocol for a safety-relevant system.

Each message consists of a header and one or more data objects. Each data object consists of a type and a type-dependent payload. The header and every data object are each protected by their own CRC.

All data is big-endian.
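As a minimal illustration of what this byte order means for the fields below, the following sketch uses Python's standard struct module (not oser) to pack a 32-bit value with the most significant byte first.

```python
import struct

# ">" selects big-endian byte order, "I" an unsigned 32-bit integer.
packed = struct.pack(">I", 0x1234)
print(packed.hex())  # 00001234

# Unpacking with the same format string restores the value.
(value,) = struct.unpack(">I", packed)
```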

The header is specified in the following table.

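+----------+------------------------+--------+----------------------------+
| Byte     | Name                   | Type   | Description                |
+==========+========================+========+============================+
| 0        | service_id             | UInt8  | The service id (Enum)      |
+----------+------------------------+--------+----------------------------+
| 1..4     | timestamp              | UInt32 | timestamp as unix epoch    |
+----------+------------------------+--------+----------------------------+
| 5        | number_of_data_objects | UInt8  | the number of data objects |
+----------+------------------------+--------+----------------------------+
| 6..9     | crc                    | UInt32 | 32 bit checksum with       |
|          |                        |        | polynomial 0x04C11DB7      |
+----------+------------------------+--------+----------------------------+
| 10..N    | data_objects           | Array  | The array of data objects  |
+----------+------------------------+--------+----------------------------+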

The enumeration for the service_id is found in the following table.

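+-----------+------------------------+
| Name      | Value                  |
+===========+========================+
| Unicast   | 0x00                   |
+-----------+------------------------+
| Broadcast | 0xFF                   |
+-----------+------------------------+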
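The header layout and the service_id enumeration can be cross-checked with a small standard-library sketch. It does not use oser; the names HEADER, ServiceId and parse_header are hypothetical, and the input is the first ten bytes of the sample message from binary_data/parse_message.py.

```python
import enum
import struct

# service_id (UInt8), timestamp (UInt32), number_of_data_objects (UInt8),
# crc (UInt32), all big-endian: 1 + 4 + 1 + 4 = 10 bytes.
HEADER = struct.Struct(">BIBI")


class ServiceId(enum.IntEnum):
    UNICAST = 0x00
    BROADCAST = 0xFF


def parse_header(data: bytes) -> dict:
    """Split the first 10 bytes of a message into the header fields."""
    service_id, timestamp, count, crc = HEADER.unpack_from(data, 0)
    return {
        "service_id": ServiceId(service_id),
        "timestamp": timestamp,
        "number_of_data_objects": count,
        "crc": crc,
    }


header = parse_header(bytes.fromhex("005ec4f1910aaf6b6556"))
print(header)
```

Because the header has a fixed size of 10 bytes, the data objects always start at byte 10, as stated in the header table.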

The data object structure is specified in the following table.

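+----------+------------------------+--------+----------------------------+
| Byte     | Name                   | Type   | Description                |
+==========+========================+========+============================+
| 0        | data_object_id         | UInt8  | The data object id (Enum)  |
+----------+------------------------+--------+----------------------------+
| 1..N     | payload                | Object | The specific payload       |
+----------+------------------------+--------+----------------------------+
| N+1..N+4 | crc                    | UInt32 | 32 bit checksum with       |
|          |                        |        | polynomial 0x04C11DB7      |
+----------+------------------------+--------+----------------------------+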
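To make the crc field concrete, here is a hedged sketch of a bitwise CRC-32 over the bytes that precede the crc field. The polynomial comes from the table; the remaining parameters (initial value 0, no bit reflection, no final XOR) are an assumption inferred from the sample message in binary_data/parse_message.py, not taken from the oser documentation. The names crc32_msb and encode_u32_object are hypothetical.

```python
import struct

POLYNOMIAL = 0x04C11DB7


def crc32_msb(data: bytes, crc: int = 0) -> int:
    """Bitwise CRC-32, most significant bit first, without final XOR."""
    for byte in data:
        crc ^= byte << 24
        for _ in range(8):
            if crc & 0x80000000:
                crc = ((crc << 1) ^ POLYNOMIAL) & 0xFFFFFFFF
            else:
                crc = (crc << 1) & 0xFFFFFFFF
    return crc


def encode_u32_object(value: int) -> bytes:
    """Encode a U32 data object: id 0x00, UInt32 value, CRC over both."""
    body = struct.pack(">BI", 0x00, value)
    return body + struct.pack(">I", crc32_msb(body))


# The third data object of the sample message carries the value 4.
print(encode_u32_object(4).hex())  # 0000000004130476dc
```

Under these assumptions, the same function also reproduces the header CRC of the sample message when applied to its first six bytes.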

The enumeration for the data_object_id can be found in the following table.

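+-----------+------------------------+
| Name      | Value                  |
+===========+========================+
| U32       | 0x00                   |
+-----------+------------------------+
| Float[]   | 0x01                   |
+-----------+------------------------+
| Status    | 0xFF                   |
+-----------+------------------------+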

The U32 payload is described in the following table.

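+----------+------------------------+--------+----------------------------+
| Byte     | Name                   | Type   | Description                |
+==========+========================+========+============================+
| 1..4     | value                  | UInt32 | The value                  |
+----------+------------------------+--------+----------------------------+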

The Float[] payload is described in the following table.

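+----------+------------------------+--------+----------------------------+
| Byte     | Name                   | Type   | Description                |
+==========+========================+========+============================+
| 1        | number_of_values       | UInt8  | The number of float values |
+----------+------------------------+--------+----------------------------+
| 2..N     | values                 | Float  | List of float values       |
|          |                        | list   |                            |
+----------+------------------------+--------+----------------------------+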
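The length-prefixed layout of the Float[] payload can be sketched with the standard library as follows. The sketch assumes that each value is a 4-byte IEEE 754 single-precision float in big-endian order; the table does not state the width. The function names are hypothetical.

```python
import struct


def encode_float_list(values: list[float]) -> bytes:
    """number_of_values (UInt8) followed by that many big-endian floats."""
    return struct.pack(f">B{len(values)}f", len(values), *values)


def decode_float_list(payload: bytes) -> list[float]:
    """Read the count first, then exactly that many 4-byte floats."""
    (count,) = struct.unpack_from(">B", payload, 0)
    return list(struct.unpack_from(f">{count}f", payload, 1))


payload = encode_float_list([1.5**i for i in range(4)])
print(len(payload))                # 17
print(decode_float_list(payload))  # [1.0, 1.5, 2.25, 3.375]
```

The count has to be read before the values because the length of the list is not known in advance. This is the same dependency that the oser implementation expresses with a length callback.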

The Status payload is described in the following table.

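+----------+------------------------+--------+----------------------------+
| Byte     | Name                   | Type   | Description                |
+==========+========================+========+============================+
| 0..3     | uptime                 | UInt32 | Uptime in seconds          |
+----------+------------------------+--------+----------------------------+
| 4        | status                 | UInt8  | Status bits                |
+----------+------------------------+--------+----------------------------+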
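The table does not define the individual status bits. As a sketch only, the following standard-library snippet unpacks a Status payload and lists which bit positions are set; the function name is hypothetical and no meaning is assigned to any bit.

```python
import struct


def decode_status(payload: bytes) -> tuple[int, list[int]]:
    """Return uptime in seconds and the positions of the set status bits."""
    uptime, status = struct.unpack(">IB", payload)
    set_bits = [bit for bit in range(8) if status & (1 << bit)]
    return uptime, set_bits


# One day of uptime and the upper four status bits set.
payload = struct.pack(">IB", 24 * 60 * 60, 0b11110000)
print(decode_status(payload))  # (86400, [4, 5, 6, 7])
```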

The implementation of the protocol is located in binary_data/protocol.py.

#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#

import oser  # type: ignore


class U32Payload(oser.ByteStruct):
    """
    +----------+------------------------+--------+----------------------------+
    | Byte     | Name                   | Type   | Description                |
    +==========+========================+========+============================+
    | 1..4     | value                  | UInt32 | The value                  |
    +----------+------------------------+--------+----------------------------+
    """

    def __init__(self) -> None:
        super(U32Payload, self).__init__()
        self.value = oser.UBInt32()


class FloatListPayload(oser.ByteStruct):
    """
    +----------+------------------------+--------+----------------------------+
    | Byte     | Name                   | Type   | Description                |
    +==========+========================+========+============================+
    | 1        | number_of_values       | UInt8  | The number of float values |
    +----------+------------------------+--------+----------------------------+
    | 2..N     | values                 | Float  | List of float values       |
    |          |                        | list   |                            |
    +----------+------------------------+--------+----------------------------+
    """

    def __init__(self) -> None:
        super(FloatListPayload, self).__init__()
        self.number_of_values = oser.UBInt8()
        self.values = oser.Array(length=lambda ctx: ctx.number_of_values.get(), prototype=oser.BFloat)


class StatusPayload(oser.ByteStruct):
    """
    +----------+------------------------+--------+----------------------------+
    | Byte     | Name                   | Type   | Description                |
    +==========+========================+========+============================+
    | 0..3     | uptime                 | UInt32 | Uptime in seconds          |
    +----------+------------------------+--------+----------------------------+
    | 4        | status                 | UInt8  | Status bits                |
    +----------+------------------------+--------+----------------------------+
    """

    def __init__(self) -> None:
        super(StatusPayload, self).__init__()
        self.uptime = oser.UBInt32()
        self.status = oser.UBInt8()


PAYLOAD = {
    "U32": (0x00, U32Payload),
    "Float[]": (0x01, FloatListPayload),
    "Status": (0xFF, StatusPayload),
}


def DataObjectId() -> oser.Enum:  # noqa: N802
    """
    +-----------+------------------------+
    | Name      | Value                  |
    +===========+========================+
    | U32       | 0x00                   |
    +-----------+------------------------+
    | Float[]   | 0x01                   |
    +-----------+------------------------+
    | Status    | 0xFF                   |
    +-----------+------------------------+
    """
    return oser.Enum(prototype=oser.UBInt8, values={key: value[0] for key, value in PAYLOAD.items()})


class DataObject(oser.ByteStruct):
    """
    +----------+------------------------+--------+----------------------------+
    | Byte     | Name                   | Type   | Description                |
    +==========+========================+========+============================+
    | 0        | data_object_id         | UInt8  | The data object id (Enum)  |
    +----------+------------------------+--------+----------------------------+
    | 1..N     | payload                | Object | The specific payload       |
    +----------+------------------------+--------+----------------------------+
    | N+1..N+4 | crc                    | UInt32 | 32 bit checksum with       |
    |          |                        |        | polynomial 0x04C11DB7      |
    +----------+------------------------+--------+----------------------------+
    """

    def __init__(self) -> None:
        super(DataObject, self).__init__()

        self.data_object_id = DataObjectId()
        self.payload = oser.Switch(
            condition=lambda ctx: ctx.data_object_id.get(),
            values={key: value[1]() for key, value in PAYLOAD.items()},
            default=oser.Null(),
        )
        self.crc = oser.CRCB32(polynomial=0x04C11DB7, strict=True)


def ServiceId() -> oser.Enum:  # noqa: N802
    """
    +-----------+------------------------+
    | Name      | Value                  |
    +===========+========================+
    | Unicast   | 0x00                   |
    +-----------+------------------------+
    | Broadcast | 0xFF                   |
    +-----------+------------------------+
    """
    return oser.Enum(
        prototype=oser.UBInt8,
        values={
            "Unicast": 0x00,
            "Broadcast": 0xFF,
        },
        value="Unicast",
    )


class Message(oser.ByteStruct):
    """
    +----------+------------------------+--------+----------------------------+
    | Byte     | Name                   | Type   | Description                |
    +==========+========================+========+============================+
    | 0        | service_id             | UInt8  | The service id (Enum)      |
    +----------+------------------------+--------+----------------------------+
    | 1..4     | timestamp              | UInt32 | timestamp as unix epoch    |
    +----------+------------------------+--------+----------------------------+
    | 5        | number_of_data_objects | UInt8  | the number of data objects |
    +----------+------------------------+--------+----------------------------+
    | 6..9     | crc                    | UInt32 | 32 bit checksum with       |
    |          |                        |        | polynomial 0x04C11DB7      |
    +----------+------------------------+--------+----------------------------+
    | 10..N    | data_objects           | Array  | The array of data objects  |
    +----------+------------------------+--------+----------------------------+
    """

    def __init__(self) -> None:
        super(Message, self).__init__()

        self.service_id = ServiceId()
        self.timestamp = oser.UBInt32()
        self.number_of_data_objects = oser.UBInt8()
        self.crc = oser.CRCB32(polynomial=0x04C11DB7, strict=True)

        self.data_objects = oser.Array(length=lambda ctx: ctx.number_of_data_objects.get(), prototype=DataObject)

An example of how to create a specific message is located in binary_data/create_message.py.

#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#

import time

import oser  # type: ignore
from protocol import Message  # type: ignore


if __name__ == "__main__":
    # instantiate a message
    message = Message()

    # set service id
    message.service_id.set("Broadcast")

    # set timestamp
    message.timestamp.set(int(time.time()))

    # add 3 data objects
    message.number_of_data_objects.set(3)

    # set data object 0 to U32
    data_object = message.data_objects[0]
    data_object.data_object_id.set("U32")
    data_object.payload.value.set(0x1234)

    # set data object 1 to Float[] with 4 floats
    data_object = message.data_objects[1]
    data_object.data_object_id.set("Float[]")
    data_object.payload.number_of_values.set(4)
    data_object.payload.values[:] = (1.5**i for i in range(4))

    # set data object 2 to Status
    data_object = message.data_objects[2]
    data_object.data_object_id.set("Status")
    data_object.payload.uptime.set(24 * 60 * 60)  # 1 day
    data_object.payload.status.set(0b11110000)

    # encode message to binary
    binary_message = message.encode()

    # print binary message
    print(">>> Message as hex <<<")
    print(oser.to_hex(binary_message))

    # print message
    print("\n>>> Message default print <<<")
    print(message)

    # print message with introspection
    print("\n>>> Message introspection <<<")
    print(message.introspect())

An example of how to parse a specific message and extract data is located in binary_data/parse_message.py.

#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#

from protocol import Message  # type: ignore

binary_message = (
    b"\x00\x5e\xc4\xf1\x91\x0a\xaf\x6b\x65\x56\x00\x00\x00\x00\x00\x00"
    b"\x00\x00\x00\x00\x00\x00\x00\x01\x04\xc1\x1d\xb7\x00\x00\x00\x00"
    b"\x04\x13\x04\x76\xdc\x00\x00\x00\x00\x09\x22\xc9\xf0\x0f\x00\x00"
    b"\x00\x00\x10\x4c\x11\xdb\x70\x00\x00\x00\x00\x19\x6e\xd8\x2b\x7f"
    b"\x00\x00\x00\x00\x24\x8b\x27\xc0\x3c\x00\x00\x00\x00\x31\xd0\xf3"
    b"\x70\x27\x00\x00\x00\x00\x40\x34\x86\x70\x77\x00\x00\x00\x00\x51"
    b"\x7c\x56\xb6\xb0"
)


if __name__ == "__main__":
    # the above binary_message can be decoded as a Message()
    message = Message()
    message.decode(binary_message)

    # what is the message type?
    print("Message type:", message.service_id.get())

    # how many data objects does the message have?
    print("Number of data objects:", message.number_of_data_objects.get())

    # what is the content of the 3rd data object?
    print("3rd value:", message.data_objects[2].payload.value.get())

    # change the crc at the end and try to decode the message again. What happens?
    broken_binary_message = binary_message[:-1] + b"\xb1"
    # message.decode(broken_binary_message)  # please uncomment this line and run this script again

You will also find an example of a CRC checksum error at the end of the last example.
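The effect of such a CRC check can be illustrated without oser. The sketch below walks the sample message by hand, under the assumptions that every data object in it is a U32 object (id 0x00, 9 bytes) and that the CRC uses initial value 0 with no reflection and no final XOR; both are inferred from the sample bytes. All function names are hypothetical, and the ValueError stands in for whatever error oser reports.

```python
import struct

SAMPLE = bytes.fromhex(
    "005ec4f1910aaf6b6556"  # header
    "000000000000000000" "000000000104c11db7" "0000000004130476dc"
    "000000000922c9f00f" "00000000104c11db70" "00000000196ed82b7f"
    "00000000248b27c03c" "0000000031d0f37027" "000000004034867077"
    "00000000517c56b6b0"
)


def crc32_msb(data: bytes) -> int:
    """Bitwise CRC-32 with polynomial 0x04C11DB7, MSB first, init 0."""
    crc = 0
    for byte in data:
        crc ^= byte << 24
        for _ in range(8):
            if crc & 0x80000000:
                crc = ((crc << 1) ^ 0x04C11DB7) & 0xFFFFFFFF
            else:
                crc = (crc << 1) & 0xFFFFFFFF
    return crc


def check(chunk: bytes) -> None:
    """Raise ValueError if the trailing UInt32 is not the CRC of the rest."""
    (expected,) = struct.unpack(">I", chunk[-4:])
    if crc32_msb(chunk[:-4]) != expected:
        raise ValueError("CRC mismatch")


def parse_u32_message(data: bytes) -> list[int]:
    """Verify the header and each 9-byte U32 object, return the values."""
    check(data[:10])
    count = data[5]  # number_of_data_objects
    values = []
    for index in range(count):
        start = 10 + 9 * index
        chunk = data[start:start + 9]
        check(chunk)
        values.append(struct.unpack(">I", chunk[1:5])[0])
    return values


values = parse_u32_message(SAMPLE)
print(values[2])  # 4

# Flip the last CRC byte, as in parse_message.py.
try:
    parse_u32_message(SAMPLE[:-1] + b"\xb1")
except ValueError as error:
    print("rejected:", error)
```

Only the last data object fails its check in the corrupted message; the header and the first nine objects still verify, which shows how per-object CRCs localize an error.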