Binary Data and Communication Protocols
Oser helps you to work with binary data structures. You can easily build and read binary data files, communication protocols, eeprom contents, etc.
More information can be found in the oser documentation.
Data structures and their dependencies are easily described. You do not need to count bits and bytes anymore.
Communication Protocol
Assume you have a binary communication protocol for a safety-relevant system.
Each message consists of a header and one or more data objects. Each data object consists of a type and a type-dependent payload. The consistency of the header and of every data object is assured with a CRC.
All data is big endian.
The header is specified in the following table.
Byte |
Name |
Type |
Description |
---|---|---|---|
0 |
service_id |
UInt8 |
The service id (Enum) |
1..4 |
timestamp |
UInt32 |
timestamp as unix epoch |
5 |
number_of_data_objects |
UInt8 |
the number of data objects |
6..9 |
crc |
UInt32 |
32 bit checksum with polynomial 0x04C11DB7 |
10..N |
data_objects |
Array |
The array of data objects |
The enumeration for the service_id is found in the following table.
Name |
Value |
---|---|
Unicast |
0x00 |
Broadcast |
0xFF |
The data object structure is specified in the following table.
Byte |
Name |
Type |
Description |
---|---|---|---|
0 |
data_object_id |
UInt8 |
The data object id (Enum) |
1..N |
payload |
Object |
The specific payload |
N+1..N+4 |
crc |
UInt32 |
32 bit checksum with polynomial 0x04C11DB7 |
The enumeration for the data_object_id can be found in the following table.
Name |
Value |
---|---|
U32 |
0x00 |
Float[] |
0x01 |
Status |
0xFF |
The U32 payload is described in the following table.
Byte |
Name |
Type |
Description |
---|---|---|---|
1..4 |
value |
UInt32 |
The value |
The Float[] payload is described in the following table.
Byte |
Name |
Type |
Description |
---|---|---|---|
1 |
number_of_values |
UInt8 |
The number of float values |
2..N |
values |
Float list |
List of float values |
The Status payload is described in the following table.
Byte |
Name |
Type |
Description |
---|---|---|---|
1..4 |
uptime |
UInt32 |
Uptime in seconds |
5 |
status |
UInt8 |
Status bits |
The implementation of the protocol is located in binary_data/protocol.py.
#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#
import oser # type: ignore
class U32Payload(oser.ByteStruct):
    """
    Payload of a ``U32`` data object: one unsigned big-endian 32 bit value.

    Byte offsets are counted from the start of the enclosing data object.

    +----------+------------------------+--------+----------------------------+
    | Byte     | Name                   | Type   | Description                |
    +==========+========================+========+============================+
    | 1..4     | value                  | UInt32 | The value                  |
    +----------+------------------------+--------+----------------------------+
    """

    def __init__(self) -> None:
        super().__init__()
        self.value = oser.UBInt32()
class FloatListPayload(oser.ByteStruct):
    """
    Payload of a ``Float[]`` data object: a count followed by that many floats.

    Byte offsets are counted from the start of the enclosing data object.

    +----------+------------------------+--------+----------------------------+
    | Byte     | Name                   | Type   | Description                |
    +==========+========================+========+============================+
    | 1        | number_of_values       | UInt8  | The number of float values |
    +----------+------------------------+--------+----------------------------+
    | 2..N     | values                 | Float  | List of float values       |
    |          |                        | list   |                            |
    +----------+------------------------+--------+----------------------------+
    """

    def __init__(self) -> None:
        super().__init__()
        self.number_of_values = oser.UBInt8()
        # The array length is not fixed: it is read from ``number_of_values``
        # of the surrounding struct each time the callback is evaluated.
        self.values = oser.Array(
            length=lambda ctx: ctx.number_of_values.get(),
            prototype=oser.BFloat,
        )
class StatusPayload(oser.ByteStruct):
    """
    Payload of a ``Status`` data object: an uptime counter and a status byte.

    Byte offsets are counted from the start of the enclosing data object,
    matching the other payload tables (the payload starts at byte 1).

    +----------+------------------------+--------+----------------------------+
    | Byte     | Name                   | Type   | Description                |
    +==========+========================+========+============================+
    | 1..4     | uptime                 | UInt32 | Uptime in seconds          |
    +----------+------------------------+--------+----------------------------+
    | 5        | status                 | UInt8  | Status bits                |
    +----------+------------------------+--------+----------------------------+
    """

    def __init__(self) -> None:
        super().__init__()
        self.uptime = oser.UBInt32()
        self.status = oser.UBInt8()
# Registry of the known data object types.
# Each entry maps the data object name to a pair of
# (data_object_id value, payload class). DataObjectId() takes the first
# element of each pair for its enum values; DataObject instantiates the
# second element as the payload selected for that name.
PAYLOAD = {
    "U32": (0x00, U32Payload),
    "Float[]": (0x01, FloatListPayload),
    "Status": (0xFF, StatusPayload),
}
def DataObjectId() -> oser.Enum:  # noqa: N802
    """
    Create a fresh one-byte enum holding the id of a data object.

    The name/value pairs are derived from ``PAYLOAD`` and currently are:

    +-----------+------------------------+
    | Name      | Value                  |
    +===========+========================+
    | U32       | 0x00                   |
    +-----------+------------------------+
    | Float[]   | 0x01                   |
    +-----------+------------------------+
    | Status    | 0xFF                   |
    +-----------+------------------------+
    """
    # Only the numeric id of each PAYLOAD entry is needed here; the payload
    # class stored next to it is ignored.
    id_by_name = {name: object_id for name, (object_id, _payload_cls) in PAYLOAD.items()}
    return oser.Enum(prototype=oser.UBInt8, values=id_by_name)
class DataObject(oser.ByteStruct):
    """
    One data object of a message: an id, an id-dependent payload and a CRC.

    +----------+------------------------+--------+----------------------------+
    | Byte     | Name                   | Type   | Description                |
    +==========+========================+========+============================+
    | 0        | data_object_id         | UInt8  | The data object id (Enum)  |
    +----------+------------------------+--------+----------------------------+
    | 1..N     | payload                | Object | The specific payload       |
    +----------+------------------------+--------+----------------------------+
    | N+1..N+4 | crc                    | UInt32 | 32 bit checksum with       |
    |          |                        |        | polynomial 0x04C11DB7      |
    +----------+------------------------+--------+----------------------------+

    The payload type is selected by the current ``data_object_id`` using the
    entries of ``PAYLOAD``; an id without an entry selects ``oser.Null()``.
    """

    def __init__(self) -> None:
        super(DataObject, self).__init__()
        self.data_object_id = DataObjectId()
        self.payload = oser.Switch(
            # Read the id from the context handed to the callback, like the
            # ``length`` callbacks of the arrays in this module do, instead of
            # from the ``self`` captured when this instance was constructed.
            condition=lambda ctx: ctx.data_object_id.get(),
            # One payload instance per known data object name.
            values={key: value[1]() for key, value in PAYLOAD.items()},
            default=oser.Null(),
        )
        self.crc = oser.CRCB32(polynomial=0x04C11DB7, strict=True)
def ServiceId() -> oser.Enum:  # noqa: N802
    """
    Create a fresh one-byte enum holding the service id of a message.

    The enum is initialised to ``Unicast``.

    +-----------+------------------------+
    | Name      | Value                  |
    +===========+========================+
    | Unicast   | 0x00                   |
    +-----------+------------------------+
    | Broadcast | 0xFF                   |
    +-----------+------------------------+
    """
    service_ids = {
        "Unicast": 0x00,
        "Broadcast": 0xFF,
    }
    return oser.Enum(prototype=oser.UBInt8, values=service_ids, value="Unicast")
class Message(oser.ByteStruct):
    """
    A complete protocol message: a CRC-protected header followed by data objects.

    +----------+------------------------+--------+----------------------------+
    | Byte     | Name                   | Type   | Description                |
    +==========+========================+========+============================+
    | 0        | service_id             | UInt8  | The service id (Enum)      |
    +----------+------------------------+--------+----------------------------+
    | 1..4     | timestamp              | UInt32 | timestamp as unix epoch    |
    +----------+------------------------+--------+----------------------------+
    | 5        | number_of_data_objects | UInt8  | the number of data objects |
    +----------+------------------------+--------+----------------------------+
    | 6..9     | crc                    | UInt32 | 32 bit checksum with       |
    |          |                        |        | polynomial 0x04C11DB7      |
    +----------+------------------------+--------+----------------------------+
    | 10..N    | data_objects           | Array  | The array of data objects  |
    +----------+------------------------+--------+----------------------------+
    """

    def __init__(self) -> None:
        super().__init__()

        # Header fields, declared in the order given by the table above.
        self.service_id = ServiceId()
        self.timestamp = oser.UBInt32()
        self.number_of_data_objects = oser.UBInt8()
        self.crc = oser.CRCB32(polynomial=0x04C11DB7, strict=True)

        # The number of DataObject entries is read from the header field
        # ``number_of_data_objects`` each time the callback is evaluated.
        self.data_objects = oser.Array(
            length=lambda ctx: ctx.number_of_data_objects.get(),
            prototype=DataObject,
        )
An example of how to create a specific message is located in binary_data/create_message.py.
#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#
import time
import oser # type: ignore
from protocol import Message # type: ignore
if __name__ == "__main__":
    # Header: a broadcast message stamped with the current unix time that
    # announces three data objects.
    message = Message()
    message.service_id.set("Broadcast")
    message.timestamp.set(int(time.time()))
    message.number_of_data_objects.set(3)

    # Data object 0 carries a single U32 value.
    u32_object = message.data_objects[0]
    u32_object.data_object_id.set("U32")
    u32_object.payload.value.set(0x1234)

    # Data object 1 carries a Float[] with 4 floats (powers of 1.5).
    float_object = message.data_objects[1]
    float_object.data_object_id.set("Float[]")
    float_object.payload.number_of_values.set(4)
    float_object.payload.values[:] = (1.5**exponent for exponent in range(4))

    # Data object 2 carries a Status with an uptime of one day.
    one_day_in_seconds = 24 * 60 * 60
    status_object = message.data_objects[2]
    status_object.data_object_id.set("Status")
    status_object.payload.uptime.set(one_day_in_seconds)
    status_object.payload.status.set(0b11110000)

    # Serialize the message and show it as a hex dump.
    binary_message = message.encode()
    print(">>> Message as hex <<<")
    print(oser.to_hex(binary_message))

    # Show the message in its default textual form.
    print("\n>>> Message default print <<<")
    print(message)

    # Show the message with introspection.
    print("\n>>> Message introspection <<<")
    print(message.introspect())
An example of how to parse a specific message and extract data is located in binary_data/parse_message.py.
#
# Copyright (c) 2023, HILSTER - https://hilster.io
# All rights reserved.
#
from protocol import Message # type: ignore
# A pre-encoded example message of 100 bytes.
# Read against the protocol tables it consists of a 10 byte header
# (service_id 0x00, timestamp 0x5ec4f191, number_of_data_objects 0x0a,
# crc 0xaf6b6556) followed by ten 9 byte data objects. Each data object has
# data_object_id 0x00 (U32), a 4 byte value and a 4 byte crc; the values are
# 0, 1, 4, 9, 16, 25, 36, 49, 64 and 81.
binary_message = (
    b"\x00\x5e\xc4\xf1\x91\x0a\xaf\x6b\x65\x56\x00\x00\x00\x00\x00\x00"
    b"\x00\x00\x00\x00\x00\x00\x00\x01\x04\xc1\x1d\xb7\x00\x00\x00\x00"
    b"\x04\x13\x04\x76\xdc\x00\x00\x00\x00\x09\x22\xc9\xf0\x0f\x00\x00"
    b"\x00\x00\x10\x4c\x11\xdb\x70\x00\x00\x00\x00\x19\x6e\xd8\x2b\x7f"
    b"\x00\x00\x00\x00\x24\x8b\x27\xc0\x3c\x00\x00\x00\x00\x31\xd0\xf3"
    b"\x70\x27\x00\x00\x00\x00\x40\x34\x86\x70\x77\x00\x00\x00\x00\x51"
    b"\x7c\x56\xb6\xb0"
)
if __name__ == "__main__":
    # Decode the binary_message defined above into a Message().
    message = Message()
    message.decode(binary_message)

    # What is the message type?
    service = message.service_id.get()
    print("Message type:", service)

    # How many data objects does the message have?
    object_count = message.number_of_data_objects.get()
    print("Number of data objects:", object_count)

    # What is the content of the 3rd data object?
    third_object = message.data_objects[2]
    print("3rd value:", third_object.payload.value.get())

    # Replace the last byte (part of the final crc) with 0xb1 and try to
    # decode the message again. What happens?
    broken_binary_message = binary_message[:-1] + b"\xb1"
    # message.decode(broken_binary_message)  # please uncomment this line and run this script again
You will also find an example of a CRC checksum error at the end of the last example.