diff options
-rw-r--r-- | glucometerutils/drivers/otultraeasy.py | 19 | ||||
-rw-r--r-- | glucometerutils/support/construct_extras.py | 33 | ||||
-rw-r--r-- | test-requirements.txt | 1 | ||||
-rw-r--r-- | test/test_construct_extras.py | 75 |
4 files changed, 115 insertions, 13 deletions
diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py index 579c07c..0f95e80 100644 --- a/glucometerutils/drivers/otultraeasy.py +++ b/glucometerutils/drivers/otultraeasy.py @@ -25,18 +25,13 @@ import logging import construct from glucometerutils import common +from glucometerutils.support import construct_extras from glucometerutils.support import lifescan from glucometerutils.support import serial _INVALID_RECORD = 501 -_EPOCH = datetime.datetime.utcfromtimestamp(0) - -def datetime_to_timestamp(date): - delta = date - _EPOCH - return int(delta.total_seconds()) - _PACKET = construct.Struct( construct.RawCopy( @@ -63,10 +58,6 @@ _PACKET = construct.Struct( ) _COMMAND_SUCCESS = construct.Const(b'\x05\x06') -_TIMESTAMP_ADAPTER = construct.ExprAdapter( - construct.Int32ul, - encoder=lambda obj, ctx: datetime_to_timestamp(obj), - decoder=lambda obj, ctx: datetime.datetime.fromtimestamp(obj)) _VERSION_REQUEST = construct.Const(b'\x05\x0d\x02') @@ -86,12 +77,14 @@ _SERIAL_NUMBER_RESPONSE = construct.Struct( _DATETIME_REQUEST = construct.Struct( construct.Const(b'\x05\x20'), # 0x20 is the datetime 'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02), - 'timestamp' / construct.Default(_TIMESTAMP_ADAPTER, _EPOCH), + 'timestamp' / construct.Default( + construct_extras.Timestamp(construct.Int32ul), + datetime.datetime(1970, 1, 1, 0, 0)), ) _DATETIME_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / _TIMESTAMP_ADAPTER, + 'timestamp' / construct_extras.Timestamp(construct.Int32ul), ) _GLUCOSE_UNIT_REQUEST = construct.Const( @@ -123,7 +116,7 @@ _READ_RECORD_REQUEST = construct.Struct( _READING_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / _TIMESTAMP_ADAPTER, + 'timestamp' / construct_extras.Timestamp(construct.Int32ul), 'value' / construct.Int32ul, ) diff --git a/glucometerutils/support/construct_extras.py b/glucometerutils/support/construct_extras.py new file mode 100644 index 0000000..cb42105 --- /dev/null +++ b/glucometerutils/support/construct_extras.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +"""Extra classes for Construct.""" + +__author__ = 'Diego Elio Pettenò' +__email__ = 'flameeyes@flameeyes.eu' +__copyright__ = 'Copyright © 2018, Diego Elio Pettenò' +__license__ = 'MIT' + +import datetime + +import construct + +class Timestamp(construct.Adapter): + """Adapter for converting datetime object into timestamps. + + Take two parameters: the subcon object to output the resulting timestamp as, + and an optional epoch offset to the UNIX Epoch. + + """ + __slots__ = ["epoch"] + + def __init__(self, subcon, epoch=0): + super(Timestamp, self).__init__(subcon) + self.epoch = epoch + + def _encode(self, obj, context): + assert isinstance(obj, datetime.datetime) + epoch_date = datetime.datetime.utcfromtimestamp(self.epoch) + delta = obj - epoch_date + return int(delta.total_seconds()) + + def _decode(self, obj, context): + return datetime.datetime.utcfromtimestamp(obj + self.epoch) diff --git a/test-requirements.txt b/test-requirements.txt index 9f7c85b..29ac573 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,4 +1,5 @@ absl-py +construct pytest pytest-timeout pyserial diff --git a/test/test_construct_extras.py b/test/test_construct_extras.py new file mode 100644 index 0000000..faccabf --- /dev/null +++ b/test/test_construct_extras.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +"""Tests for the common routines.""" + +__author__ = 'Diego Elio Pettenò' +__email__ = 'flameeyes@flameeyes.eu' +__copyright__ = 'Copyright © 2018, Diego Elio Pettenò' +__license__ = 'MIT' + +import datetime +import os +import sys +import unittest + +import construct + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from glucometerutils.support import construct_extras + + +_TEST_DATE1 = datetime.datetime(1970, 1, 2, 0, 0) +_TEST_DATE2 = datetime.datetime(1971, 1, 1, 0, 0) +_TEST_DATE3 = datetime.datetime(1970, 1, 1, 0, 0) + +_NEW_EPOCH = 31536000 # datetime.datetime(1971, 1, 1, 0, 0) + +class TestTimestamp(unittest.TestCase): + + def test_build_unix_epoch(self): + self.assertEqual( + construct_extras.Timestamp(construct.Int32ul).build(_TEST_DATE1), + b'\x80\x51\x01\x00') + + def test_parse_unix_epoch(self): + self.assertEqual( + construct_extras.Timestamp(construct.Int32ul).parse( + b'\x803\xe1\x01'), + _TEST_DATE2) + + def test_build_custom_epoch(self): + self.assertEqual( + construct_extras.Timestamp( + construct.Int32ul, epoch=_NEW_EPOCH).build(_TEST_DATE2), + b'\x00\x00\x00\x00') + + def test_parse_custom_epoch(self): + self.assertEqual( + construct_extras.Timestamp( + construct.Int32ul, epoch=_NEW_EPOCH).parse( + b'\x00\x00\x00\x00'), + _TEST_DATE2) + + def test_build_custom_epoch_negative_failure(self): + with self.assertRaises(construct.core.FieldError): + construct_extras.Timestamp( + construct.Int32ul, epoch=_NEW_EPOCH).build(_TEST_DATE1) + + def test_build_custom_epoch_negative_success(self): + self.assertEqual( + construct_extras.Timestamp( + construct.Int32sl, epoch=_NEW_EPOCH).build(_TEST_DATE1), + b'\x00\x1e\x20\xfe') + + def test_build_varint(self): + self.assertEqual( + construct_extras.Timestamp(construct.VarInt).build(_TEST_DATE3), + b'\x00') + + def test_invalid_value(self): + with self.assertRaises(AssertionError): + construct_extras.Timestamp(construct.Int32ul).build('foo') + + +if __name__ == '__main__': + unittest.main() |