From e6ba344dd2f3176d0c7edd6489cfc5f405b624ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sat, 27 Mar 2021 18:03:11 +0000 Subject: glucomenareo: new driver added. This driver supports GlucoMen areo by A. Menarini Diagnostics. It possibly works with the GlucoMen Areo 2K but it is untested with it, and does not support reporting Ketone readings. --- glucometerutils/drivers/glucomenareo.py | 222 ++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 glucometerutils/drivers/glucomenareo.py (limited to 'glucometerutils/drivers') diff --git a/glucometerutils/drivers/glucomenareo.py b/glucometerutils/drivers/glucomenareo.py new file mode 100644 index 0000000..ca903e0 --- /dev/null +++ b/glucometerutils/drivers/glucomenareo.py @@ -0,0 +1,222 @@ +# SPDX-FileCopyrightText: © 2021 The glucometerutils Authors +# SPDX-License-Identifier: MIT + +"""Driver for GlucoMen Areo devices. + +Supported features: + - get readings, including pre-/post-meal notes and comments; + - set date and time. + +Expected device path: /dev/ttyUSB0 or similar serial port device. +""" + +import dataclasses +import datetime +import logging +from typing import Generator, Iterator, List, Mapping, NoReturn, Sequence, Union + +import crcmod.predefined +import serial as pyserial + +from glucometerutils import common, driver, exceptions +from glucometerutils.support import serial + +_crc8_maxim = crcmod.predefined.mkPredefinedCrcFun("crc-8-maxim") + +_CMD_GET_INFO = b"\xa2" + +_CMD_SET_DATETIME = b"\xc2\xa1" + +_CMD_GET_READINGS = b"\x80" + +_UNITS_MAPPING = { + "mmol/L": common.Unit.MMOL_L, + "mg/dL": common.Unit.MG_DL, +} + +_MARKINGS_MAPPING: Mapping[str, Union[str, common.Meal]] = { + "00": "", + "01": "Check Mark", + "02": common.Meal.BEFORE, + "04": common.Meal.AFTER, + "08": "Exercise", +} + + +@dataclasses.dataclass(frozen=True) +class _Reading: + reading_type: str + value_string: str + unit_string: str + marking_string: str + date: str + time: str + + @property + def value(self) -> float: + return float(self.value_string) + + @property + def unit(self) -> common.Unit: + return _UNITS_MAPPING[self.unit_string] + + @property + def _marking(self) -> Union[str, common.Meal]: + return _MARKINGS_MAPPING[self.marking_string] + + @property + def meal(self) -> common.Meal: + if isinstance(self._marking, common.Meal): + return self._marking + else: + return common.Meal.NONE + + @property + def comment(self) -> str: + if not isinstance(self._marking, common.Meal): + return self._marking + else: + return "" + + @property + def timestamp(self) -> datetime.datetime: + return datetime.datetime.strptime(f"{self.date},{self.time}", "%y%m%d,%H%M") + + +class Device(serial.SerialDevice, driver.GlucometerDevice): + BAUDRATE = 9600 + PARITY = pyserial.PARITY_ODD + DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable. + + def connect(self) -> None: # pylint: disable=no-self-use + pass + + def disconnect(self) -> None: # pylint: disable=no-self-use + pass + + def _readline(self) -> bytes: + line = self.serial_.readline() + logging.debug(f"Read line: {line!r}") + return line + + def _read_text_response(self) -> Sequence[bytes]: + all_lines: List[bytes] = [] + + while True: + line = self._readline() + if not line.endswith(b"\r\n"): + raise exceptions.InvalidResponse(f"Corrupted response line: {line!r}") + all_lines.append(line) + + if line == b"]\r\n": + break + + if all_lines[0] != b"[\r\n": + raise exceptions.InvalidResponse( + f"Unexpected first response line: {all_lines!r}" + ) + + wire_checksum = int(all_lines[-2][:-2], base=16) + calculated_checksum = _crc8_maxim(b"".join(all_lines[:-2])) + + if wire_checksum != calculated_checksum: + raise exceptions.InvalidChecksum(wire_checksum, calculated_checksum) + + return [line[:-2] for line in all_lines[1:-2]] + + def _send_command(self, command: bytes) -> None: + logging.debug(f"sending command: {command!r}") + self.serial_.write(command) + + def _get_meter_info(self) -> Sequence[str]: + self._send_command(_CMD_GET_INFO) + get_info_response = list(self._read_text_response()) + if len(get_info_response) != 1: + raise exceptions.InvalidResponse( + f"Multiple lines returned, when one expected: {get_info_response!r}" + ) + info = get_info_response[0].split(b",") + if len(info) != 5: + raise exceptions.InvalidResponse( + f"Incomplete information response received: {get_info_response!r}" + ) + + return [component.decode("ascii") for component in info] + + def get_serial_number(self) -> str: + return self._get_meter_info()[3].strip() + + def get_version_info(self) -> Sequence[str]: + info = self._get_meter_info() + return (info[4].strip(),) + + def get_meter_info(self) -> common.MeterInfo: + return common.MeterInfo( + "GlucoMen areo", + serial_number=self.get_serial_number(), + version_info=self.get_version_info(), + native_unit=self.get_glucose_unit(), + ) + + def get_datetime(self) -> NoReturn: # pylint: disable=no-self-use + raise NotImplementedError + + def zero_log(self) -> NoReturn: + raise NotImplementedError + + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: + datetime_representation = date.strftime("%y%m%d%H%M").encode("ascii") + command_string = b"[\r\n" + datetime_representation + b"\r\n" + + checksum = _crc8_maxim(command_string) + assert 0 <= checksum <= 255 + + command_string += f"{checksum:02X}".encode("ascii") + b"\r\n]\r\n" + + command = _CMD_SET_DATETIME + command_string + self._send_command(command) + response = self.serial_.read() + if response == b"P": + return date + else: + raise exceptions.InvalidResponse(f"Unexpected response {response!r}.") + + def _get_raw_readings(self) -> Iterator[_Reading]: + self._send_command(_CMD_GET_READINGS) + response = list(self._read_text_response()) + if response[0] == b"\x90\x3d": + logging.debug("No readings available on the meter.") + return + + for reading in response: + yield _Reading(*reading.decode("ascii").split(",")) + + def get_glucose_unit(self) -> common.Unit: + for reading in self._get_raw_readings(): + if reading.reading_type != "Glu": + continue + return reading.unit + else: + logging.debug("No readings in the device, cannot guess glucose unit.") + return common.Unit.MG_DL + + def get_readings(self) -> Generator[common.AnyReading, None, None]: + for reading in self._get_raw_readings(): + if reading.reading_type != "Glu": + logging.warning( + f"Unsupported reading type {reading.reading_type!r}. Please file an issue at https://github.com/glucometers-tech/glucometerutils/issues" + ) + continue + + mgdl_value = common.convert_glucose_unit( + reading.value, + from_unit=reading.unit, + to_unit=common.Unit.MG_DL, + ) + + yield common.GlucoseReading( + reading.timestamp, + mgdl_value, + meal=reading.meal, + comment=reading.comment, + ) -- cgit v1.2.3