From cfbf51d6a090626accfc8437f5bd586112178a34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Mon, 1 Jan 2018 11:51:23 +0000 Subject: otultraeasy: factor out the construct Timestamp implementation. This adds tests to ensure this works right in the general case, so that it can be used with different parameters. The adapter will be reused in the otverio2015 driver. --- glucometerutils/drivers/otultraeasy.py | 19 +++----- glucometerutils/support/construct_extras.py | 33 +++++++++++++ test-requirements.txt | 1 + test/test_construct_extras.py | 75 +++++++++++++++++++++++++++++ 4 files changed, 115 insertions(+), 13 deletions(-) create mode 100644 glucometerutils/support/construct_extras.py create mode 100644 test/test_construct_extras.py diff --git a/glucometerutils/drivers/otultraeasy.py b/glucometerutils/drivers/otultraeasy.py index 579c07c..0f95e80 100644 --- a/glucometerutils/drivers/otultraeasy.py +++ b/glucometerutils/drivers/otultraeasy.py @@ -25,18 +25,13 @@ import logging import construct from glucometerutils import common +from glucometerutils.support import construct_extras from glucometerutils.support import lifescan from glucometerutils.support import serial _INVALID_RECORD = 501 -_EPOCH = datetime.datetime.utcfromtimestamp(0) - -def datetime_to_timestamp(date): - delta = date - _EPOCH - return int(delta.total_seconds()) - _PACKET = construct.Struct( construct.RawCopy( @@ -63,10 +58,6 @@ _PACKET = construct.Struct( ) _COMMAND_SUCCESS = construct.Const(b'\x05\x06') -_TIMESTAMP_ADAPTER = construct.ExprAdapter( - construct.Int32ul, - encoder=lambda obj, ctx: datetime_to_timestamp(obj), - decoder=lambda obj, ctx: datetime.datetime.fromtimestamp(obj)) _VERSION_REQUEST = construct.Const(b'\x05\x0d\x02') @@ -86,12 +77,14 @@ _SERIAL_NUMBER_RESPONSE = construct.Struct( _DATETIME_REQUEST = construct.Struct( construct.Const(b'\x05\x20'), # 0x20 is the datetime 'request_type' / construct.Enum(construct.Byte, write=0x01, read=0x02), - 'timestamp' / construct.Default(_TIMESTAMP_ADAPTER, _EPOCH), + 'timestamp' / construct.Default( + construct_extras.Timestamp(construct.Int32ul), + datetime.datetime(1970, 1, 1, 0, 0)), ) _DATETIME_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / _TIMESTAMP_ADAPTER, + 'timestamp' / construct_extras.Timestamp(construct.Int32ul), ) _GLUCOSE_UNIT_REQUEST = construct.Const( @@ -123,7 +116,7 @@ _READ_RECORD_REQUEST = construct.Struct( _READING_RESPONSE = construct.Struct( _COMMAND_SUCCESS, - 'timestamp' / _TIMESTAMP_ADAPTER, + 'timestamp' / construct_extras.Timestamp(construct.Int32ul), 'value' / construct.Int32ul, ) diff --git a/glucometerutils/support/construct_extras.py b/glucometerutils/support/construct_extras.py new file mode 100644 index 0000000..cb42105 --- /dev/null +++ b/glucometerutils/support/construct_extras.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +"""Extra classes for Construct.""" + +__author__ = 'Diego Elio Pettenò' +__email__ = 'flameeyes@flameeyes.eu' +__copyright__ = 'Copyright © 2018, Diego Elio Pettenò' +__license__ = 'MIT' + +import datetime + +import construct + +class Timestamp(construct.Adapter): + """Adapter for converting datetime object into timestamps. + + Take two parameters: the subcon object to output the resulting timestamp as, + and an optional epoch offset to the UNIX Epoch. + + """ + __slots__ = ["epoch"] + + def __init__(self, subcon, epoch=0): + super(Timestamp, self).__init__(subcon) + self.epoch = epoch + + def _encode(self, obj, context): + assert isinstance(obj, datetime.datetime) + epoch_date = datetime.datetime.utcfromtimestamp(self.epoch) + delta = obj - epoch_date + return int(delta.total_seconds()) + + def _decode(self, obj, context): + return datetime.datetime.utcfromtimestamp(obj + self.epoch) diff --git a/test-requirements.txt b/test-requirements.txt index 9f7c85b..29ac573 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,4 +1,5 @@ absl-py +construct pytest pytest-timeout pyserial diff --git a/test/test_construct_extras.py b/test/test_construct_extras.py new file mode 100644 index 0000000..faccabf --- /dev/null +++ b/test/test_construct_extras.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +"""Tests for the common routines.""" + +__author__ = 'Diego Elio Pettenò' +__email__ = 'flameeyes@flameeyes.eu' +__copyright__ = 'Copyright © 2018, Diego Elio Pettenò' +__license__ = 'MIT' + +import datetime +import os +import sys +import unittest + +import construct + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from glucometerutils.support import construct_extras + + +_TEST_DATE1 = datetime.datetime(1970, 1, 2, 0, 0) +_TEST_DATE2 = datetime.datetime(1971, 1, 1, 0, 0) +_TEST_DATE3 = datetime.datetime(1970, 1, 1, 0, 0) + +_NEW_EPOCH = 31536000 # datetime.datetime(1971, 1, 1, 0, 0) + +class TestTimestamp(unittest.TestCase): + + def test_build_unix_epoch(self): + self.assertEqual( + construct_extras.Timestamp(construct.Int32ul).build(_TEST_DATE1), + b'\x80\x51\x01\x00') + + def test_parse_unix_epoch(self): + self.assertEqual( + construct_extras.Timestamp(construct.Int32ul).parse( + b'\x803\xe1\x01'), + _TEST_DATE2) + + def test_build_custom_epoch(self): + self.assertEqual( + construct_extras.Timestamp( + construct.Int32ul, epoch=_NEW_EPOCH).build(_TEST_DATE2), + b'\x00\x00\x00\x00') + + def test_parse_custom_epoch(self): + self.assertEqual( + construct_extras.Timestamp( + construct.Int32ul, epoch=_NEW_EPOCH).parse( + b'\x00\x00\x00\x00'), + _TEST_DATE2) + + def test_build_custom_epoch_negative_failure(self): + with self.assertRaises(construct.core.FieldError): + construct_extras.Timestamp( + construct.Int32ul, epoch=_NEW_EPOCH).build(_TEST_DATE1) + + def test_build_custom_epoch_negative_success(self): + self.assertEqual( + construct_extras.Timestamp( + construct.Int32sl, epoch=_NEW_EPOCH).build(_TEST_DATE1), + b'\x00\x1e\x20\xfe') + + def test_build_varint(self): + self.assertEqual( + construct_extras.Timestamp(construct.VarInt).build(_TEST_DATE3), + b'\x00') + + def test_invalid_value(self): + with self.assertRaises(AssertionError): + construct_extras.Timestamp(construct.Int32ul).build('foo') + + +if __name__ == '__main__': + unittest.main() -- cgit v1.2.3