summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDiego Elio Pettenò <flameeyes@flameeyes.eu>2018-01-01 13:46:48 +0100
committerDiego Elio Pettenò <flameeyes@flameeyes.eu>2018-01-01 13:46:48 +0100
commit7f6a3eeca6aff196e68215d99bb6d48d57d069c3 (patch)
tree99839fbaba25bb087762190eaac19ff314725628
parentotultraeasy: factor out the construct Timestamp implementation. (diff)
downloadglucometerutils-7f6a3eeca6aff196e68215d99bb6d48d57d069c3.tar
glucometerutils-7f6a3eeca6aff196e68215d99bb6d48d57d069c3.tar.gz
glucometerutils-7f6a3eeca6aff196e68215d99bb6d48d57d069c3.tar.bz2
glucometerutils-7f6a3eeca6aff196e68215d99bb6d48d57d069c3.tar.lz
glucometerutils-7f6a3eeca6aff196e68215d99bb6d48d57d069c3.tar.xz
glucometerutils-7f6a3eeca6aff196e68215d99bb6d48d57d069c3.tar.zst
glucometerutils-7f6a3eeca6aff196e68215d99bb6d48d57d069c3.zip
-rw-r--r--README30
-rw-r--r--glucometerutils/drivers/otverio2015.py436
-rw-r--r--setup.py2
3 files changed, 232 insertions, 236 deletions
diff --git a/README b/README
index 947348c..2e7a460 100644
--- a/README
+++ b/README
@@ -32,21 +32,21 @@ $ . glucometerutils-venv/bin/activate
Please see the following table for the driver for each device that is known and
supported.
-| Manufacturer | Model Name | Driver | Dependencies |
-| --- | --- | --- | --- |
-| LifeScan | OneTouch Ultra 2 | `otultra2` | [pyserial] |
-| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [construct] [pyserial] |
-| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [construct] [pyserial] |
-| LifeScan | OneTouch Verio (USB) | `otverio2015` | [python-scsi] |
-| LifeScan | OneTouch Select Plus | `otverio2015` | [python-scsi] |
-| Abbott | FreeStyle InsuLinx† | `fsinsulinx` | [construct] [hidapi]‡ |
-| Abbott | FreeStyle Libre | `fslibre` | [construct] [hidapi]‡ |
-| Abbott | FreeStyle Optium | `fsoptium` | [pyserial] |
-| Abbott | FreeStyle Precision Neo | `fsprecisionneo` | [construct] [hidapi]‡ |
-| Abbott | FreeStyle Optium Neo | `fsprecisionneo` | [construct] [hidapi]‡ |
-| Abbott | FreeStyle Optium Neo H | `fsprecisionneo` | [construct] [hidapi]‡ |
-| Roche | Accu-Chek Mobile | `accuchek_reports` | |
-| SD Biosensor | SD CodeFree | `sdcodefree` | [construct] [pyserial] |
+| Manufacturer | Model Name | Driver | Dependencies |
+| --- | --- | --- | --- |
+| LifeScan | OneTouch Ultra 2 | `otultra2` | [pyserial] |
+| LifeScan | OneTouch Ultra Easy | `otultraeasy` | [construct] [pyserial] |
+| LifeScan | OneTouch Ultra Mini | `otultraeasy` | [construct] [pyserial] |
+| LifeScan | OneTouch Verio (USB) | `otverio2015` | [construct] [python-scsi] |
+| LifeScan | OneTouch Select Plus | `otverio2015` | [construct] [python-scsi] |
+| Abbott | FreeStyle InsuLinx† | `fsinsulinx` | [construct] [hidapi]‡ |
+| Abbott | FreeStyle Libre | `fslibre` | [construct] [hidapi]‡ |
+| Abbott | FreeStyle Optium | `fsoptium` | [pyserial] |
+| Abbott | FreeStyle Precision Neo | `fsprecisionneo` | [construct] [hidapi]‡ |
+| Abbott | FreeStyle Optium Neo | `fsprecisionneo` | [construct] [hidapi]‡ |
+| Abbott | FreeStyle Optium Neo H | `fsprecisionneo` | [construct] [hidapi]‡ |
+| Roche | Accu-Chek Mobile | `accuchek_reports` | |
+| SD Biosensor | SD CodeFree | `sdcodefree` | [construct] [pyserial] |
† Untested.
‡ Optional dependency on Linux; required on other operating systems.
diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py
index 5925a58..8d87409 100644
--- a/glucometerutils/drivers/otverio2015.py
+++ b/glucometerutils/drivers/otverio2015.py
@@ -28,246 +28,242 @@ __license__ = 'MIT'
import binascii
import datetime
import logging
-import struct
+import construct
from pyscsi.pyscsi.scsi import SCSI
from pyscsi.pyscsi.scsi_device import SCSIDevice
from glucometerutils import common
from glucometerutils import exceptions
+from glucometerutils.support import construct_extras
from glucometerutils.support import lifescan
-# Match the same values in the otultraeasy driver.
-_STX = 0x02
-_ETX = 0x03
-
# This device uses SCSI blocks as registers.
_REGISTER_SIZE = 512
-_STRUCT_PREAMBLE = struct.Struct('<BH')
-_STRUCT_CODA = _STRUCT_PREAMBLE # they are actually the same, mirrored.
-
-_STRUCT_UINT16 = struct.Struct('<H')
-_STRUCT_UINT32 = struct.Struct('<I')
-
-_STRUCT_CHECKSUM = _STRUCT_UINT16
-_STRUCT_TIMESTAMP = _STRUCT_UINT32
-_STRUCT_RECORDID = _STRUCT_UINT16
-_STRUCT_READING = _STRUCT_UINT32
-_STRUCT_RECORD = struct.Struct('<BBHBHIHBBBBB')
-
-_QUERY_REQUEST = b'\x04\xe6\x02'
-_QUERY_KEY_SERIAL = b'\x00'
-_QUERY_KEY_MODEL = b'\x01'
-_QUERY_KEY_SOFTWARE = b'\x02'
-
-_READ_PARAMETER_REQUEST = b'\x04'
-_PARAMETER_KEY_UNIT = b'\x04'
-
-_READ_RTC_REQUEST = b'\x04\x20\x02'
-_WRITE_RTC_REQUEST = b'\x04\x20\x01'
-# All timestamp reported by this device are seconds since this date.
-_EPOCH_BASE = 946684800 # 2010-01-01 00:00
-
-_READ_RECORD_COUNT_REQUEST = b'\x04\x27\x00'
-_READ_RECORD_REQUEST_PREFIX = b'\x04\x31\x02'
-_READ_RECORD_REQUEST_SUFFIX = b'\x00'
-
-_MEMORY_ERASE_REQUEST = b'\x04\x1a'
-
-_MEAL_CODES = {
- 0x00: common.Meal.NONE,
- 0x01: common.Meal.BEFORE,
- 0x02: common.Meal.AFTER,
+_PACKET = construct.Padded(
+ 512, construct.Struct(
+ construct.RawCopy(
+ construct.Embedded(
+ construct.Struct(
+ construct.Const(b'\x02'), # stx
+ 'length' / construct.Rebuild(
+ construct.Int16ul, lambda ctx: len(ctx.message) + 6),
+ 'message' / construct.Bytes(
+ length=lambda ctx: ctx.length - 6),
+ construct.Const(b'\x03'), # etx
+ ),
+ ),
+ ),
+ 'checksum' / construct.Checksum(
+ construct.Int16ul, lifescan.crc_ccitt, construct.this.data),
+ ),
+)
+
+_COMMAND_SUCCESS = construct.Const(b'\x04\x06')
+
+# Device-specific timestamp. All timestamp reported by this device are seconds
+# since this date.
+_TIMESTAMP = construct_extras.Timestamp(
+ construct.Int32ul, epoch=946684800) # 2010-01-01 00:00
+
+_QUERY_REQUEST = construct.Struct(
+ construct.Const(b'\x04\xe6\x02'),
+ 'selector' / construct.Enum(
+ construct.Byte, serial=0x00, model=0x01, software=0x02),
+)
+
+_QUERY_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ # This should be an UTF-16L CString, but construct does not support it.
+ 'value' / construct.GreedyString(encoding='utf-16-le'),
+)
+
+_READ_PARAMETER_REQUEST = construct.Struct(
+ construct.Const(b'\x04'),
+ 'selector' / construct.Enum(
+ construct.Byte, unit=0x04),
+)
+
+_GLUCOSE_MAPPING = {
+ common.Unit.MG_DL: 0x00,
+ common.Unit.MMOL_L: 0x01,
}
-def _extract_message(register):
- """Parse the message preamble and verify checksums."""
- stx, length = _STRUCT_PREAMBLE.unpack_from(register)
- if stx != _STX:
- raise lifescan.MalformedCommand(
- 'invalid STX byte: %02x' % stx)
- if length > _REGISTER_SIZE:
- raise lifescan.MalformedCommand(
- 'invalid length: %d > REGISTER_SIZE' % length)
+_READ_UNIT_RESPONSE = construct.Struct(
+ construct.Const(b'\x03\x06'), # different from _COMMAND_SUCCESS
+ 'unit' / construct.SymmetricMapping(
+ construct.Byte, _GLUCOSE_MAPPING),
+ construct.Padding(3),
+)
- # 2 is the length of the checksum, so it should be ignored.
- calculated_checksum = lifescan.crc_ccitt(register[:(length-2)])
+_READ_RTC_REQUEST = construct.Const(b'\x04\x20\x02')
- coda_offset = length - _STRUCT_CODA.size
- etx, encoded_checksum = _STRUCT_CODA.unpack_from(register[coda_offset:])
- if etx != _ETX:
- raise lifescan.MalformedCommand(
- 'invalid ETX byte: %02x' % etx)
- if encoded_checksum != calculated_checksum:
- raise exceptions.InvalidChecksum(encoded_checksum, calculated_checksum)
+_READ_RTC_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'timestamp' / _TIMESTAMP,
+)
- response = register[_STRUCT_PREAMBLE.size:coda_offset]
+_WRITE_RTC_REQUEST = construct.Struct(
+ construct.Const(b'\x04\x20\x01'),
+ 'timestamp' / _TIMESTAMP,
+)
- logging.debug('Read packet: %s' % binascii.hexlify(response))
- return response
+_MEMORY_ERASE_REQUEST = construct.Const(b'\x04\x1a')
-def _encode_message(cmd):
- """Add message preamble and calculate checksum, add padding."""
- length = len(cmd) + _STRUCT_PREAMBLE.size + _STRUCT_CODA.size
- preamble = _STRUCT_PREAMBLE.pack(_STX, length)
- message = preamble + cmd + bytes((_ETX,))
- checksum = _STRUCT_CHECKSUM.pack(lifescan.crc_ccitt(message))
- message += checksum
+_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x04\x27\x00')
- logging.debug('Sending packet: %s' % binascii.hexlify(message))
+_READ_RECORD_COUNT_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'count' / construct.Int16ul,
+)
- # Pad the message to match the size of the register.
- return message + bytes(_REGISTER_SIZE - len(message))
+_READ_RECORD_REQUEST = construct.Struct(
+ construct.Const(b'\x04\x31\x02'),
+ 'record_id' / construct.Int16ul,
+ construct.Const(b'\x00'),
+)
+
+_MEAL_FLAG = {
+ common.Meal.NONE: 0x00,
+ common.Meal.BEFORE: 0x01,
+ common.Meal.AFTER: 0x02,
+}
-def _convert_timestamp(timestamp):
- return datetime.datetime.utcfromtimestamp(timestamp + _EPOCH_BASE)
+_READ_RECORD_RESPONSE = construct.Struct(
+ _COMMAND_SUCCESS,
+ 'inverse_counter' / construct.Int16ul,
+ construct.Padding(1),
+ 'lifetime_counter' / construct.Int16ul,
+ 'timestamp' / _TIMESTAMP,
+ 'value' / construct.Int16ul,
+ 'meal' / construct.SymmetricMapping(
+ construct.Byte, _MEAL_FLAG),
+ construct.Padding(4),
+)
class Device(object):
- def __init__(self, device):
- if not device:
- raise exceptions.CommandLineError(
- '--device parameter is required, should point to the disk device '
- 'representing the meter.')
-
- self.device_name_ = device
- self.scsi_device_ = SCSIDevice(device, readwrite=True)
- self.scsi_ = SCSI(self.scsi_device_)
- self.scsi_.blocksize = _REGISTER_SIZE
-
- def _send_message(self, cmd, lba):
- """Send a request to the meter, and read its response.
-
- Args:
- cmd: (bytes) the raw command to send the device, without
- preamble or checksum.
- lba: (int) the address of the block register to use, known
- valid addresses are 3, 4 and 5.
-
- Returns:
- (bytes) The raw response from the meter. No preamble or coda is
- present, and the checksum has already been validated.
- """
- self.scsi_.write10(lba, 1, _encode_message(cmd))
- response = self.scsi_.read10(lba, 1)
- # TODO: validate that the response is valid.
- return _extract_message(response.datain)
-
- def connect(self):
- inq = self.scsi_.inquiry()
- vendor = inq.result['t10_vendor_identification'][:32]
- if vendor != b'LifeScan':
- raise exceptions.ConnectionFailed(
- 'Device %s is not a LifeScan glucometer.' % self.device_name_)
-
- def disconnect(self):
- return
-
- def get_meter_info(self):
- return common.MeterInfo(
- 'OneTouch %s glucometer' % self._query_string(_QUERY_KEY_MODEL),
- serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
-
- def _query_string(self, query_key):
- response = self._send_message(_QUERY_REQUEST + query_key, 3)
- if response[0:2] != b'\x04\06':
- raise lifescan.MalformedCommand(
- 'invalid response, expected 04 06, received %02x %02x' % (
- response[0], response[1]))
- # Strings are encoded in wide characters (LE), but they should
- # only contain ASCII characters. Note that the string is
- # null-terminated, so the last character should be dropped.
- return response[2:].decode('utf-16-le')[:-1]
-
- def _read_parameter(self, parameter_key):
- response = self._send_message(
- _READ_PARAMETER_REQUEST + parameter_key, 4)
- if response[0:2] != b'\x03\x06':
- raise lifescan.MalformedCommand(
- 'invalid response, expected 03 06, received %02x %02x' % (
- response[0], response[1]))
- return response[2:]
-
- def get_serial_number(self):
- return self._query_string(_QUERY_KEY_SERIAL)
-
- def get_version(self):
- return self._query_string(_QUERY_KEY_SOFTWARE)
-
- def get_datetime(self):
- response = self._send_message(_READ_RTC_REQUEST, 3)
- if response[0:2] != b'\x04\06':
- raise lifescan.MalformedCommand(
- 'invalid response, expected 04 06, received %02x %02x' % (
- response[0], response[1]))
- (timestamp,) = _STRUCT_TIMESTAMP.unpack(response[2:])
- return _convert_timestamp(timestamp)
-
- def set_datetime(self, date=datetime.datetime.now()):
- epoch = datetime.datetime.utcfromtimestamp(_EPOCH_BASE)
- delta = date - epoch
- timestamp = int(delta.total_seconds())
-
- timestamp_bytes = _STRUCT_TIMESTAMP.pack(timestamp)
- response = self._send_message(_WRITE_RTC_REQUEST + timestamp_bytes, 3)
-
- if response[0:2] != b'\x04\06':
- raise lifescan.MalformedCommand(
- 'invalid response, expected 04 06, received %02x %02x' % (
- response[0], response[1]))
-
- # The device does not return the new datetime, so confirm by
- # calling READ RTC again.
- return self.get_datetime()
-
- def zero_log(self):
- response = self._send_message(_MEMORY_ERASE_REQUEST, 3)
- if response[0:2] != b'\x04\06':
- raise lifescan.MalformedCommand(
- 'invalid response, expected 04 06, received %02x %02x' % (
- response[0], response[1]))
-
- def _get_reading_count(self):
- response = self._send_message(_READ_RECORD_COUNT_REQUEST, 3)
- if response[0:2] != b'\x04\06':
- raise lifescan.MalformedCommand(
- 'invalid response, expected 04 06, received %02x %02x' % (
- response[0], response[1]))
-
- (record_count,) = _STRUCT_RECORDID.unpack(response[2:])
- return record_count
-
- def get_glucose_unit(self):
- unit_value = self._read_parameter(_PARAMETER_KEY_UNIT)
- if unit_value == b'\x00\x00\x00\x00':
- return common.Unit.MG_DL
- elif unit_value == b'\x01\x00\x00\x00':
- return common.Unit.MMOL_L
- else:
- raise exceptions.InvalidGlucoseUnit('%r' % unit_value)
-
- def _get_reading(self, record_number):
- request = (_READ_RECORD_REQUEST_PREFIX +
- _STRUCT_RECORDID.pack(record_number) +
- _READ_RECORD_REQUEST_SUFFIX)
- response = self._send_message(request, 3)
- if response[0:2] != b'\x04\06':
- raise lifescan.MalformedCommand(
- 'invalid response, expected 04 06, received %02x %02x' % (
- response[0], response[1]))
-
- (unused_const1, unused_const2, unused_counter, unused_const3,
- unused_counter2, timestamp, value, meal_flag, unused_const4, unused_flags,
- unused_const5, unused_const6) = _STRUCT_RECORD.unpack(
- response)
-
- return common.GlucoseReading(
- _convert_timestamp(timestamp), float(value), meal=_MEAL_CODES[meal_flag])
-
- def get_readings(self):
- record_count = self._get_reading_count()
- for record_number in range(record_count):
- yield self._get_reading(record_number)
+ def __init__(self, device):
+ if not device:
+ raise exceptions.CommandLineError(
+ '--device parameter is required, should point to the disk '
+ 'device representing the meter.')
+
+ self.device_name_ = device
+ self.scsi_device_ = SCSIDevice(device, readwrite=True)
+ self.scsi_ = SCSI(self.scsi_device_)
+ self.scsi_.blocksize = _REGISTER_SIZE
+
+ def connect(self):
+ inq = self.scsi_.inquiry()
+ logging.debug('Device connected: %r', inq.result)
+ vendor = inq.result['t10_vendor_identification'][:32]
+ if vendor != b'LifeScan':
+ raise exceptions.ConnectionFailed(
+ 'Device %s is not a LifeScan glucometer.' % self.device_name_)
+
+ def disconnect(self):
+ return
+
+ def _send_request(self, lba, request_format, request_obj, response_format):
+ """Send a request to the meter, and read its response.
+
+ Args:
+ lba: (int) the address of the block register to use, known
+ valid addresses are 3, 4 and 5.
+ request_format: a construct format identifier of the request to send
+ request_obj: the object to format with the provided identifier
+ response_format: a construct format identifier to parse the returned
+ message with.
+
+ Returns:
+ The Container object parsed from the response received by the meter.
+
+ Raises:
+ lifescan.MalformedCommand if Construct fails to build the request or
+ parse the response.
+
+ """
+ try:
+ request = request_format.build(request_obj)
+ request_raw = _PACKET.build({'value': {'message': request}})
+ logging.debug(
+ 'Request sent: %s', binascii.hexlify(request_raw))
+ self.scsi_.write10(lba, 1, request_raw)
+
+ response_raw = self.scsi_.read10(lba, 1)
+ logging.debug(
+ 'Response received: %s', binascii.hexlify(response_raw.datain))
+ response_pkt = _PACKET.parse(response_raw.datain)
+ logging.debug('Response packet: %r', response_pkt)
+
+ response = response_format.parse(response_pkt.value.message)
+ logging.debug('Response parsed: %r', response)
+
+ return response
+ except construct.ConstructError as e:
+ raise lifescan.MalformedCommand(str(e))
+
+ def _query_string(self, selector):
+ response = self._send_request(
+ 3, _QUERY_REQUEST, {'selector': selector}, _QUERY_RESPONSE)
+
+ # Unfortunately the CString implementation in construct does not support
+ # multi-byte encodings, so we need to discard the terminating null byte
+ # ourself.
+ return response.value[:-1]
+
+ def get_meter_info(self):
+ return common.MeterInfo(
+ 'OneTouch %s glucometer' % self._query_string('model'),
+ serial_number=self.get_serial_number(),
+ version_info=(
+ 'Software version: ' + self.get_version(),),
+ native_unit=self.get_glucose_unit())
+
+ def get_serial_number(self):
+ return self._query_string('serial')
+
+ def get_version(self):
+ return self._query_string('software')
+
+ def get_datetime(self):
+ response = self._send_request(
+ 3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
+ return response.timestamp
+
+ def set_datetime(self, date=datetime.datetime.now()):
+ self._send_request(
+ 3, _WRITE_RTC_REQUEST, {'timestamp': date},
+ _COMMAND_SUCCESS)
+
+ # The device does not return the new datetime, so confirm by calling
+ # READ RTC again.
+ return self.get_datetime()
+
+ def zero_log(self):
+ self._send_request(
+ 3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
+
+ def _get_reading_count(self):
+ response = self._send_request(
+ 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE)
+ return response.count
+
+ def get_glucose_unit(self):
+ response = self._send_request(
+ 4, _READ_PARAMETER_REQUEST, {'selector': 'unit'},
+ _READ_UNIT_RESPONSE)
+ return response.unit
+
+ def _get_reading(self, record_id):
+ response = self._send_request(
+ 3, _READ_RECORD_REQUEST, {'record_id': record_id},
+ _READ_RECORD_RESPONSE)
+ return common.GlucoseReading(
+ response.timestamp, float(response.value), meal=response.meal)
+
+ def get_readings(self):
+ record_count = self._get_reading_count()
+ for record_id in range(record_count):
+ yield self._get_reading(record_id)
diff --git a/setup.py b/setup.py
index 3be4b94..212425a 100644
--- a/setup.py
+++ b/setup.py
@@ -50,7 +50,7 @@ setup(
# listed as mandatory for the feature.
'otultra2': ['pyserial'],
'otultraeasy': ['construct', 'pyserial'],
- 'otverio2015': ['python-scsi'],
+ 'otverio2015': ['construct', 'python-scsi'],
'fsinsulinx': ['construct', 'hidapi'],
'fslibre': ['construct', 'hidapi'],
'fsoptium': ['pyserial'],