Skip to content

add un00 - engineering units #107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions streaming_data_types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

__version__ = version

from streaming_data_types.units_un00 import serialise_un00, deserialise_un00

SERIALISERS = {
"an44": serialise_an44,
"ev42": serialise_ev42,
Expand Down Expand Up @@ -71,6 +73,7 @@
"ad00": serialise_ad00,
"da00": serialise_da00,
"ar51": serialise_ar51,
"un00": serialise_un00,
}


Expand Down Expand Up @@ -103,4 +106,5 @@
"ad00": deserialise_ad00,
"da00": deserialise_da00,
"ar51": deserialise_ar51,
"un00": deserialise_un00,
}
80 changes: 80 additions & 0 deletions streaming_data_types/fbschemas/units_un00/Units.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# automatically generated by the FlatBuffers compiler, do not modify

# namespace:

import flatbuffers
from flatbuffers.compat import import_numpy
np = import_numpy()

class Units(object):
__slots__ = ['_tab']

@classmethod
def GetRootAs(cls, buf, offset=0):
n = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset)
x = Units()
x.Init(buf, n + offset)
return x

@classmethod
def GetRootAsUnits(cls, buf, offset=0):
"""This method is deprecated. Please switch to GetRootAs."""
return cls.GetRootAs(buf, offset)
@classmethod
def UnitsBufferHasIdentifier(cls, buf, offset, size_prefixed=False):
return flatbuffers.util.BufferHasIdentifier(buf, offset, b"\x75\x6E\x30\x30", size_prefixed=size_prefixed)

# Units
def Init(self, buf, pos):
self._tab = flatbuffers.table.Table(buf, pos)

# Units
def SourceName(self):
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(4))
if o != 0:
return self._tab.String(o + self._tab.Pos)
return None

# Units
def Timestamp(self):
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(6))
if o != 0:
return self._tab.Get(flatbuffers.number_types.Int64Flags, o + self._tab.Pos)
return 0

# Units
def Units(self):
o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(8))
if o != 0:
return self._tab.String(o + self._tab.Pos)
return None

def UnitsStart(builder):
builder.StartObject(3)

def Start(builder):
UnitsStart(builder)

def UnitsAddSourceName(builder, sourceName):
builder.PrependUOffsetTRelativeSlot(0, flatbuffers.number_types.UOffsetTFlags.py_type(sourceName), 0)

def AddSourceName(builder, sourceName):
UnitsAddSourceName(builder, sourceName)

def UnitsAddTimestamp(builder, timestamp):
builder.PrependInt64Slot(1, timestamp, 0)

def AddTimestamp(builder, timestamp):
UnitsAddTimestamp(builder, timestamp)

def UnitsAddUnits(builder, units):
builder.PrependUOffsetTRelativeSlot(2, flatbuffers.number_types.UOffsetTFlags.py_type(units), 0)

def AddUnits(builder, units):
UnitsAddUnits(builder, units)

def UnitsEnd(builder):
return builder.EndObject()

def End(builder):
return UnitsEnd(builder)
Empty file.
41 changes: 41 additions & 0 deletions streaming_data_types/units_un00.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from collections import namedtuple
from typing import Optional

import flatbuffers

from streaming_data_types.fbschemas.units_un00 import Units
from streaming_data_types.utils import check_schema_identifier

FILE_IDENTIFIER = b"un00"

UnitInfo = namedtuple("UnitInfo", ("source", "timestamp_ns", "units"))


def deserialise_un00(buffer) -> UnitInfo:
check_schema_identifier(buffer, FILE_IDENTIFIER)
units = Units.Units.GetRootAsUnits(buffer, 0)

return UnitInfo(
units.SourceName().decode("utf-8") if units.SourceName() else "",
units.Timestamp(),
units.Units().decode("utf-8") if units.Units() is not None else None,
)


def serialise_un00(
source: str, timestamp_ns: int, units: Optional[str]
) -> bytes:
builder = flatbuffers.Builder(128)
if units is not None:
units_offset = builder.CreateString(units)
source_offset = builder.CreateString(source)

Units.UnitsStart(builder)
Units.UnitsAddSourceName(builder, source_offset)
Units.UnitsAddTimestamp(builder, timestamp_ns)
if units is not None:
Units.UnitsAddUnits(builder, units_offset)
_units = Units.UnitsEnd(builder)

builder.Finish(_units, file_identifier=FILE_IDENTIFIER)
return bytes(builder.Output())
43 changes: 43 additions & 0 deletions tests/test_un00.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pytest

from streaming_data_types import DESERIALISERS, SERIALISERS
from streaming_data_types.exceptions import WrongSchemaException
from streaming_data_types.units_un00 import deserialise_un00, serialise_un00


class TestSerialisationUn00:
def test_serialises_and_deserialises_un00_message_correctly(self):
"""
Round-trip to check what we serialise is what we get back.
"""
buf = serialise_un00("some_source", 1234567890, "Some unit")
entry = deserialise_un00(buf)

assert entry.source == "some_source"
assert entry.timestamp_ns == 1234567890
assert entry.units == "Some unit"

def test_serialises_and_deserialises_un00_message_correctly_with_none_as_unit(self):
"""
Round-trip to check what we serialise is what we get back with None specified as a unit.
"""
buf = serialise_un00("some_source", 1234567890, None)
entry = deserialise_un00(buf)

assert entry.source == "some_source"
assert entry.timestamp_ns == 1234567890
assert entry.units is None

def test_if_buffer_has_wrong_id_then_throws(self):
buf = serialise_un00("some_source", 1234567890, "Some unit")

# Manually hack the id
buf = bytearray(buf)
buf[4:8] = b"1234"

with pytest.raises(WrongSchemaException):
deserialise_un00(buf)

def test_schema_type_is_in_global_serialisers_list(self):
assert "un00" in SERIALISERS
assert "un00" in DESERIALISERS