summaryrefslogtreecommitdiffstats
path: root/glucometerutils/drivers/otverio2015.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/drivers/otverio2015.py')
-rw-r--r--glucometerutils/drivers/otverio2015.py123
1 files changed, 57 insertions, 66 deletions
diff --git a/glucometerutils/drivers/otverio2015.py b/glucometerutils/drivers/otverio2015.py
index bde0af3..b8429ea 100644
--- a/glucometerutils/drivers/otverio2015.py
+++ b/glucometerutils/drivers/otverio2015.py
@@ -29,67 +29,61 @@ import construct
from pyscsi.pyscsi.scsi import SCSI
from pyscsi.pyscsi.scsi_device import SCSIDevice
-from glucometerutils import common
-from glucometerutils import exceptions
+from glucometerutils import common, exceptions
from glucometerutils.support import driver_base, lifescan, lifescan_binary_protocol
# This device uses SCSI blocks as registers.
_REGISTER_SIZE = 512
_PACKET = construct.Padded(
- _REGISTER_SIZE,
- lifescan_binary_protocol.LifeScanPacket(False))
+ _REGISTER_SIZE, lifescan_binary_protocol.LifeScanPacket(False)
+)
-_COMMAND_SUCCESS = construct.Const(b'\x03\x06')
+_COMMAND_SUCCESS = construct.Const(b"\x03\x06")
_QUERY_REQUEST = construct.Struct(
- construct.Const(b'\x03\xe6\x02'),
- 'selector' / construct.Enum(
- construct.Byte, serial=0x00, model=0x01, software=0x02),
+ construct.Const(b"\x03\xe6\x02"),
+ "selector" / construct.Enum(construct.Byte, serial=0x00, model=0x01, software=0x02),
)
_QUERY_RESPONSE = construct.Struct(
- construct.Const(b'\x03\x06'),
- 'value' / construct.CString(encoding='utf-16-le'),
+ construct.Const(b"\x03\x06"), "value" / construct.CString(encoding="utf-16-le"),
)
_READ_PARAMETER_REQUEST = construct.Struct(
- construct.Const(b'\x03'),
- 'selector' / construct.Enum(
- construct.Byte, unit=0x04),
+ construct.Const(b"\x03"), "selector" / construct.Enum(construct.Byte, unit=0x04),
)
_READ_UNIT_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'unit' / lifescan_binary_protocol.GLUCOSE_UNIT,
+ "unit" / lifescan_binary_protocol.GLUCOSE_UNIT,
construct.Padding(3),
)
-_READ_RTC_REQUEST = construct.Const(b'\x03\x20\x02')
+_READ_RTC_REQUEST = construct.Const(b"\x03\x20\x02")
_READ_RTC_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
_WRITE_RTC_REQUEST = construct.Struct(
- construct.Const(b'\x03\x20\x01'),
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ construct.Const(b"\x03\x20\x01"),
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
)
-_MEMORY_ERASE_REQUEST = construct.Const(b'\x03\x1a')
+_MEMORY_ERASE_REQUEST = construct.Const(b"\x03\x1a")
-_READ_RECORD_COUNT_REQUEST = construct.Const(b'\x03\x27\x00')
+_READ_RECORD_COUNT_REQUEST = construct.Const(b"\x03\x27\x00")
_READ_RECORD_COUNT_RESPONSE = construct.Struct(
- _COMMAND_SUCCESS,
- 'count' / construct.Int16ul,
+ _COMMAND_SUCCESS, "count" / construct.Int16ul,
)
_READ_RECORD_REQUEST = construct.Struct(
- construct.Const(b'\x03\x31\x02'),
- 'record_id' / construct.Int16ul,
- construct.Const(b'\x00'),
+ construct.Const(b"\x03\x31\x02"),
+ "record_id" / construct.Int16ul,
+ construct.Const(b"\x00"),
)
_MEAL_FLAG = {
@@ -100,13 +94,12 @@ _MEAL_FLAG = {
_READ_RECORD_RESPONSE = construct.Struct(
_COMMAND_SUCCESS,
- 'inverse_counter' / construct.Int16ul,
+ "inverse_counter" / construct.Int16ul,
construct.Padding(1),
- 'lifetime_counter' / construct.Int16ul,
- 'timestamp' / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
- 'value' / construct.Int16ul,
- 'meal' / construct.Mapping(
- construct.Byte, _MEAL_FLAG),
+ "lifetime_counter" / construct.Int16ul,
+ "timestamp" / lifescan_binary_protocol.VERIO_TIMESTAMP, # type: ignore
+ "value" / construct.Int16ul,
+ "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
construct.Padding(4),
)
@@ -115,8 +108,9 @@ class Device(driver_base.GlucometerDriver):
def __init__(self, device):
if not device:
raise exceptions.CommandLineError(
- '--device parameter is required, should point to the disk '
- 'device representing the meter.')
+ "--device parameter is required, should point to the disk "
+ "device representing the meter."
+ )
self.device_name_ = device
self.scsi_device_ = SCSIDevice(device, readwrite=True)
@@ -125,11 +119,12 @@ class Device(driver_base.GlucometerDriver):
def connect(self):
inq = self.scsi_.inquiry()
- logging.debug('Device connected: %r', inq.result)
- vendor = inq.result['t10_vendor_identification'][:32]
- if vendor != b'LifeScan':
+ logging.debug("Device connected: %r", inq.result)
+ vendor = inq.result["t10_vendor_identification"][:32]
+ if vendor != b"LifeScan":
raise exceptions.ConnectionFailed(
- 'Device %s is not a LifeScan glucometer.' % self.device_name_)
+ "Device %s is not a LifeScan glucometer." % self.device_name_
+ )
def disconnect(self): # pylint: disable=no-self-use
return
@@ -155,21 +150,19 @@ class Device(driver_base.GlucometerDriver):
"""
try:
request = request_format.build(request_obj)
- request_raw = _PACKET.build({'data': {'value': {
- 'message': request,
- }}})
- logging.debug(
- 'Request sent: %s', binascii.hexlify(request_raw))
+ request_raw = _PACKET.build({"data": {"value": {"message": request,}}})
+ logging.debug("Request sent: %s", binascii.hexlify(request_raw))
self.scsi_.write10(lba, 1, request_raw)
response_raw = self.scsi_.read10(lba, 1)
logging.debug(
- 'Response received: %s', binascii.hexlify(response_raw.datain))
+ "Response received: %s", binascii.hexlify(response_raw.datain)
+ )
response_pkt = _PACKET.parse(response_raw.datain).data
- logging.debug('Response packet: %r', response_pkt)
+ logging.debug("Response packet: %r", response_pkt)
response = response_format.parse(response_pkt.value.message)
- logging.debug('Response parsed: %r', response)
+ logging.debug("Response parsed: %r", response)
return response
except construct.ConstructError as e:
@@ -177,60 +170,58 @@ class Device(driver_base.GlucometerDriver):
def _query_string(self, selector):
response = self._send_request(
- 3, _QUERY_REQUEST, {'selector': selector}, _QUERY_RESPONSE)
+ 3, _QUERY_REQUEST, {"selector": selector}, _QUERY_RESPONSE
+ )
return response.value
def get_meter_info(self):
return common.MeterInfo(
- 'OneTouch %s glucometer' % self._query_string('model'),
+ "OneTouch %s glucometer" % self._query_string("model"),
serial_number=self.get_serial_number(),
- version_info=(
- 'Software version: ' + self.get_version(),),
- native_unit=self.get_glucose_unit())
+ version_info=("Software version: " + self.get_version(),),
+ native_unit=self.get_glucose_unit(),
+ )
def get_serial_number(self):
- return self._query_string('serial')
+ return self._query_string("serial")
def get_version(self):
- return self._query_string('software')
+ return self._query_string("software")
def get_datetime(self):
- response = self._send_request(
- 3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
+ response = self._send_request(3, _READ_RTC_REQUEST, None, _READ_RTC_RESPONSE)
return response.timestamp
def _set_device_datetime(self, date):
- self._send_request(
- 3, _WRITE_RTC_REQUEST, {'timestamp': date},
- _COMMAND_SUCCESS)
+ self._send_request(3, _WRITE_RTC_REQUEST, {"timestamp": date}, _COMMAND_SUCCESS)
# The device does not return the new datetime, so confirm by calling
# READ RTC again.
return self.get_datetime()
def zero_log(self):
- self._send_request(
- 3, _MEMORY_ERASE_REQUEST, None,
- _COMMAND_SUCCESS)
+ self._send_request(3, _MEMORY_ERASE_REQUEST, None, _COMMAND_SUCCESS)
def get_glucose_unit(self):
response = self._send_request(
- 4, _READ_PARAMETER_REQUEST, {'selector': 'unit'},
- _READ_UNIT_RESPONSE)
+ 4, _READ_PARAMETER_REQUEST, {"selector": "unit"}, _READ_UNIT_RESPONSE
+ )
return response.unit
def _get_reading_count(self):
response = self._send_request(
- 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE)
+ 3, _READ_RECORD_COUNT_REQUEST, None, _READ_RECORD_COUNT_RESPONSE
+ )
return response.count
def _get_reading(self, record_id):
response = self._send_request(
- 3, _READ_RECORD_REQUEST, {'record_id': record_id},
- _READ_RECORD_RESPONSE)
+ 3, _READ_RECORD_REQUEST, {"record_id": record_id}, _READ_RECORD_RESPONSE
+ )
return common.GlucoseReading(
- response.timestamp, float(response.value), meal=response.meal)
+ response.timestamp, float(response.value), meal=response.meal
+ )
def get_readings(self):
record_count = self._get_reading_count()