summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDiego Elio Pettenò <flameeyes@flameeyes.eu>2018-01-06 22:37:30 +0100
committerDiego Elio Pettenò <flameeyes@flameeyes.eu>2018-01-06 22:37:30 +0100
commit57d719974e9acb40146e41906b3184b2feac7091 (patch)
treeafc4548d4465448d5ad363b9a98b96d23d06f4ab
parentlifescan_binary_protocol: factor out glucose unit mappings. (diff)
downloadglucometerutils-57d719974e9acb40146e41906b3184b2feac7091.tar
glucometerutils-57d719974e9acb40146e41906b3184b2feac7091.tar.gz
glucometerutils-57d719974e9acb40146e41906b3184b2feac7091.tar.bz2
glucometerutils-57d719974e9acb40146e41906b3184b2feac7091.tar.lz
glucometerutils-57d719974e9acb40146e41906b3184b2feac7091.tar.xz
glucometerutils-57d719974e9acb40146e41906b3184b2feac7091.tar.zst
glucometerutils-57d719974e9acb40146e41906b3184b2feac7091.zip
-rw-r--r--glucometerutils/drivers/otverioiq.py194
1 files changed, 194 insertions, 0 deletions
diff --git a/glucometerutils/drivers/otverioiq.py b/glucometerutils/drivers/otverioiq.py
new file mode 100644
index 0000000..9cd9e16
--- /dev/null
+++ b/glucometerutils/drivers/otverioiq.py
@@ -0,0 +1,194 @@
+# -*- coding: utf-8 -*-
+"""Driver for LifeScan OneTouch Verio IQ devices.
+
+Currently work in progress, untested.
+
+Expected device path: /dev/ttyUSB0 or similar serial port device. Device should
+be auto-detected if not provided.
+"""
+
+__author__ = 'Diego Elio Pettenò'
+__email__ = 'flameeyes@flameeyes.eu'
+__copyright__ = 'Copyright © 2018, Diego Elio Pettenò'
+__license__ = 'MIT'
+
+import binascii
+import datetime
+import logging
+
+import construct
+
+from glucometerutils import common
+from glucometerutils.support import construct_extras
+from glucometerutils.support import lifescan
+from glucometerutils.support import lifescan_binary_protocol
+from glucometerutils.support import serial
+
+_COMMAND_SUCCESS = construct.Const(b'\x04\x06')
+
+_VERSION_REQUEST = construct.Const(b'\x04\x0d\x02') # Untested
+
+_VERSION_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'version' / construct.PascalString(construct.Byte, encoding='ascii'),
+)
+
+_SERIAL_NUMBER_REQUEST = construct.Const(
+ b'\x04\x0b\x00\x02')
+
+_SERIAL_NUMBER_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'serial_number' / construct.CString(encoding='ascii'),
+)
+
+_READ_RTC_REQUEST = construct.Const(b'\x04\x20\x02')
+
+_READ_RTC_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP,
+)
+
+_WRITE_RTC_REQUEST = construct.Struct(
+ construct.Const(b'\x04\x20\x01'),
+ 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP,
+)
+
+_GLUCOSE_UNIT_REQUEST = construct.Const(
+ b'\x04\x09\x02\x02')
+
+
+_GLUCOSE_UNIT_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT,
+ construct.Padding(3),
+)
+
+_MEMORY_ERASE_REQUEST = construct.Const(b'\x04\x1a') # Untested
+
+_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x04\x27\x00')
+
+_READ_RECORD_COUNT_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'count' / construct.Int16ul,
+)
+
+_READ_RECORD_REQUEST = construct.Struct(
+ construct.Const(b'\x04\x21'),
+ 'record_id' / construct.Int16ul,
+)
+
+_READING_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'timestamp' / construct_extras.Timestamp(construct.Int32ul),
+ 'value' / construct.Int32ul,
+ 'control' / construct.Byte, # Unknown value
+)
+
+
+class Device(serial.SerialDevice):
+ BAUDRATE = 9600
+ DEFAULT_CABLE_ID = '10c4:85a7' # Specific ID for embedded cp210x
+ TIMEOUT = 0.5
+
+ def __init__(self, device):
+ super(Device, self).__init__(device)
+ self.buffered_reader_ = construct.Rebuffered(
+ lifescan_binary_protocol.PACKET, tailcutoff=1024)
+
+ def connect(self):
+ pass
+
+ def disconnect(self):
+ pass
+
+ def _send_packet(self, message):
+ pkt = lifescan_binary_protocol.PACKET.build(
+ {'value': {
+ 'message': request,
+ 'link_control': {}, # Verio does not use link_control.
+ }})
+ logging.debug('sending packet: %s', binascii.hexlify(pkt))
+
+ self.serial_.write(pkt)
+ self.serial_.flush()
+
+ def _read_packet(self):
+ raw_pkt = self.buffered_reader_.parse_stream(self.serial_)
+ logging.debug('received packet: %r', raw_pkt)
+
+ # discard the checksum and copy
+ pkt = raw_pkt.value
+
+ return pkt
+
+ def _send_request(self, request_format, request_obj, response_format):
+ try:
+ request = request_format.build(request_obj)
+ self._send_packet(request)
+
+ response_pkt = self._read_packet()
+
+ return response_format.parse(response_pkt.message)
+ except construct.ConstructError as e:
+ raise lifescan.MalformedCommand(str(e))
+
+ def get_meter_info(self):
+ return common.MeterInfo(
+ 'OneTouch Verio IQ glucometer',
+ serial_number=self.get_serial_number(),
+ version_info=(
+ 'Software version: ' + self.get_version(),),
+ native_unit=self.get_glucose_unit())
+
+ def get_version(self):
+ response = self._send_request(
+ _VERSION_REQUEST, None, _VERSION_RESPONSE)
+
+ return response.version
+
+ def get_serial_number(self):
+ response = self._send_request(
+ _SERIAL_NUMBER_REQUEST, None, _SERIAL_NUMBER_RESPONSE)
+
+ return response.serial_number
+
+ def get_datetime(self):
+ response = self._send_request(
+ _READ_RTC_REQUEST, _READ_RTC_RESPONSE)
+
+ return response.timestamp
+
+ def set_datetime(self, date=datetime.datetime.now()):
+ response = self._send_request(
+ _WRITE_RTC_REQUEST, {
+ 'timestamp': date,
+ }, _READ_RTC_RESPONSE)
+
+ return response.timestamp
+
+ def zero_log(self):
+ self._send_request(_MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
+
+ def get_glucose_unit(self):
+ response = self._send_request(
+ _GLUCOSE_UNIT_REQUEST, None, _GLUCOSE_UNIT_RESPONSE)
+
+ return response.unit
+
+ def _get_reading_count(self):
+ response = self._send_request(
+ _READ_RECORD_REQUEST, {'record_id': _INVALID_RECORD},
+ _READING_COUNT_RESPONSE)
+ return response.count
+
+ def _get_reading(self, record_id):
+ response = self._send_request(
+ _READ_RECORD_REQUEST, {'record_id': record_id}, _READING_RESPONSE)
+
+ return common.GlucoseReading(
+ response.timestamp, float(response.value))
+
+ def get_readings(self):
+ record_count = self._get_reading_count()
+ for record_id in range(record_count):
+ yield self._get_reading(record_id)