summaryrefslogtreecommitdiffstats
path: root/glucometerutils/drivers/glucomenareo.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/drivers/glucomenareo.py')
-rw-r--r--glucometerutils/drivers/glucomenareo.py222
1 files changed, 222 insertions, 0 deletions
diff --git a/glucometerutils/drivers/glucomenareo.py b/glucometerutils/drivers/glucomenareo.py
new file mode 100644
index 0000000..ca903e0
--- /dev/null
+++ b/glucometerutils/drivers/glucomenareo.py
@@ -0,0 +1,222 @@
+# SPDX-FileCopyrightText: © 2021 The glucometerutils Authors
+# SPDX-License-Identifier: MIT
+
+"""Driver for GlucoMen Areo devices.
+
+Supported features:
+ - get readings, including pre-/post-meal notes and comments;
+ - set date and time.
+
+Expected device path: /dev/ttyUSB0 or similar serial port device.
+"""
+
+import dataclasses
+import datetime
+import logging
+from typing import Generator, Iterator, List, Mapping, NoReturn, Sequence, Union
+
+import crcmod.predefined
+import serial as pyserial
+
+from glucometerutils import common, driver, exceptions
+from glucometerutils.support import serial
+
+_crc8_maxim = crcmod.predefined.mkPredefinedCrcFun("crc-8-maxim")
+
+_CMD_GET_INFO = b"\xa2"
+
+_CMD_SET_DATETIME = b"\xc2\xa1"
+
+_CMD_GET_READINGS = b"\x80"
+
+_UNITS_MAPPING = {
+ "mmol/L": common.Unit.MMOL_L,
+ "mg/dL": common.Unit.MG_DL,
+}
+
+_MARKINGS_MAPPING: Mapping[str, Union[str, common.Meal]] = {
+ "00": "",
+ "01": "Check Mark",
+ "02": common.Meal.BEFORE,
+ "04": common.Meal.AFTER,
+ "08": "Exercise",
+}
+
+
+@dataclasses.dataclass(frozen=True)
+class _Reading:
+ reading_type: str
+ value_string: str
+ unit_string: str
+ marking_string: str
+ date: str
+ time: str
+
+ @property
+ def value(self) -> float:
+ return float(self.value_string)
+
+ @property
+ def unit(self) -> common.Unit:
+ return _UNITS_MAPPING[self.unit_string]
+
+ @property
+ def _marking(self) -> Union[str, common.Meal]:
+ return _MARKINGS_MAPPING[self.marking_string]
+
+ @property
+ def meal(self) -> common.Meal:
+ if isinstance(self._marking, common.Meal):
+ return self._marking
+ else:
+ return common.Meal.NONE
+
+ @property
+ def comment(self) -> str:
+ if not isinstance(self._marking, common.Meal):
+ return self._marking
+ else:
+ return ""
+
+ @property
+ def timestamp(self) -> datetime.datetime:
+ return datetime.datetime.strptime(f"{self.date},{self.time}", "%y%m%d,%H%M")
+
+
+class Device(serial.SerialDevice, driver.GlucometerDevice):
+ BAUDRATE = 9600
+ PARITY = pyserial.PARITY_ODD
+ DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable.
+
+ def connect(self) -> None: # pylint: disable=no-self-use
+ pass
+
+ def disconnect(self) -> None: # pylint: disable=no-self-use
+ pass
+
+ def _readline(self) -> bytes:
+ line = self.serial_.readline()
+ logging.debug(f"Read line: {line!r}")
+ return line
+
+ def _read_text_response(self) -> Sequence[bytes]:
+ all_lines: List[bytes] = []
+
+ while True:
+ line = self._readline()
+ if not line.endswith(b"\r\n"):
+ raise exceptions.InvalidResponse(f"Corrupted response line: {line!r}")
+ all_lines.append(line)
+
+ if line == b"]\r\n":
+ break
+
+ if all_lines[0] != b"[\r\n":
+ raise exceptions.InvalidResponse(
+ f"Unexpected first response line: {all_lines!r}"
+ )
+
+ wire_checksum = int(all_lines[-2][:-2], base=16)
+ calculated_checksum = _crc8_maxim(b"".join(all_lines[:-2]))
+
+ if wire_checksum != calculated_checksum:
+ raise exceptions.InvalidChecksum(wire_checksum, calculated_checksum)
+
+ return [line[:-2] for line in all_lines[1:-2]]
+
+ def _send_command(self, command: bytes) -> None:
+ logging.debug(f"sending command: {command!r}")
+ self.serial_.write(command)
+
+ def _get_meter_info(self) -> Sequence[str]:
+ self._send_command(_CMD_GET_INFO)
+ get_info_response = list(self._read_text_response())
+ if len(get_info_response) != 1:
+ raise exceptions.InvalidResponse(
+ f"Multiple lines returned, when one expected: {get_info_response!r}"
+ )
+ info = get_info_response[0].split(b",")
+ if len(info) != 5:
+ raise exceptions.InvalidResponse(
+ f"Incomplete information response received: {get_info_response!r}"
+ )
+
+ return [component.decode("ascii") for component in info]
+
+ def get_serial_number(self) -> str:
+ return self._get_meter_info()[3].strip()
+
+ def get_version_info(self) -> Sequence[str]:
+ info = self._get_meter_info()
+ return (info[4].strip(),)
+
+ def get_meter_info(self) -> common.MeterInfo:
+ return common.MeterInfo(
+ "GlucoMen areo",
+ serial_number=self.get_serial_number(),
+ version_info=self.get_version_info(),
+ native_unit=self.get_glucose_unit(),
+ )
+
+ def get_datetime(self) -> NoReturn: # pylint: disable=no-self-use
+ raise NotImplementedError
+
+ def zero_log(self) -> NoReturn:
+ raise NotImplementedError
+
+ def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime:
+ datetime_representation = date.strftime("%y%m%d%H%M").encode("ascii")
+ command_string = b"[\r\n" + datetime_representation + b"\r\n"
+
+ checksum = _crc8_maxim(command_string)
+ assert 0 <= checksum <= 255
+
+ command_string += f"{checksum:02X}".encode("ascii") + b"\r\n]\r\n"
+
+ command = _CMD_SET_DATETIME + command_string
+ self._send_command(command)
+ response = self.serial_.read()
+ if response == b"P":
+ return date
+ else:
+ raise exceptions.InvalidResponse(f"Unexpected response {response!r}.")
+
+ def _get_raw_readings(self) -> Iterator[_Reading]:
+ self._send_command(_CMD_GET_READINGS)
+ response = list(self._read_text_response())
+ if response[0] == b"\x90\x3d":
+ logging.debug("No readings available on the meter.")
+ return
+
+ for reading in response:
+ yield _Reading(*reading.decode("ascii").split(","))
+
+ def get_glucose_unit(self) -> common.Unit:
+ for reading in self._get_raw_readings():
+ if reading.reading_type != "Glu":
+ continue
+ return reading.unit
+ else:
+ logging.debug("No readings in the device, cannot guess glucose unit.")
+ return common.Unit.MG_DL
+
+ def get_readings(self) -> Generator[common.AnyReading, None, None]:
+ for reading in self._get_raw_readings():
+ if reading.reading_type != "Glu":
+ logging.warning(
+ f"Unsupported reading type {reading.reading_type!r}. Please file an issue at https://github.com/glucometers-tech/glucometerutils/issues"
+ )
+ continue
+
+ mgdl_value = common.convert_glucose_unit(
+ reading.value,
+ from_unit=reading.unit,
+ to_unit=common.Unit.MG_DL,
+ )
+
+ yield common.GlucoseReading(
+ reading.timestamp,
+ mgdl_value,
+ meal=reading.meal,
+ comment=reading.comment,
+ )