summaryrefslogtreecommitdiffstats
path: root/glucometerutils/drivers/sdcodefree.py
diff options
context:
space:
mode:
Diffstat (limited to 'glucometerutils/drivers/sdcodefree.py')
-rw-r--r--glucometerutils/drivers/sdcodefree.py108
1 files changed, 55 insertions, 53 deletions
diff --git a/glucometerutils/drivers/sdcodefree.py b/glucometerutils/drivers/sdcodefree.py
index a6e2ce5..47dd9ca 100644
--- a/glucometerutils/drivers/sdcodefree.py
+++ b/glucometerutils/drivers/sdcodefree.py
@@ -24,46 +24,44 @@ import operator
import construct
-from glucometerutils import common
-from glucometerutils import exceptions
-from glucometerutils.support import serial, driver_base
+from glucometerutils import common, exceptions
+from glucometerutils.support import driver_base, serial
def xor_checksum(msg):
return functools.reduce(operator.xor, msg)
+
class Direction(enum.Enum):
In = 0x20
Out = 0x10
+
_PACKET = construct.Struct(
- 'stx' / construct.Const(0x53, construct.Byte),
- 'direction' / construct.Mapping(
- construct.Byte,
- {e: e.value for e in Direction}),
- 'length' / construct.Rebuild(
- construct.Byte, lambda this: len(this.message) + 2),
- 'message' / construct.Bytes(lambda this: this.length - 2),
- 'checksum' / construct.Checksum(
- construct.Byte, xor_checksum, construct.this.message),
- 'etx' / construct.Const(0xAA, construct.Byte)
+ "stx" / construct.Const(0x53, construct.Byte),
+ "direction" / construct.Mapping(construct.Byte, {e: e.value for e in Direction}),
+ "length" / construct.Rebuild(construct.Byte, lambda this: len(this.message) + 2),
+ "message" / construct.Bytes(lambda this: this.length - 2),
+ "checksum"
+ / construct.Checksum(construct.Byte, xor_checksum, construct.this.message),
+ "etx" / construct.Const(0xAA, construct.Byte),
)
_FIRST_MESSAGE = construct.Struct(
construct.Const(0x30, construct.Byte),
- 'count' / construct.Int16ub,
+ "count" / construct.Int16ub,
construct.Const(0xAA, construct.Byte)[19],
)
-_CHALLENGE_PACKET_FULL = b'\x53\x20\x04\x10\x30\x20\xAA'
-_RESPONSE_MESSAGE = b'\x10\x40'
+_CHALLENGE_PACKET_FULL = b"\x53\x20\x04\x10\x30\x20\xAA"
+_RESPONSE_MESSAGE = b"\x10\x40"
-_DATE_SET_MESSAGE = b'\x10\x10'
+_DATE_SET_MESSAGE = b"\x10\x10"
-_DISCONNECT_MESSAGE = b'\x10\x60'
-_DISCONNECTED_MESSAGE = b'\x10\x70'
+_DISCONNECT_MESSAGE = b"\x10\x60"
+_DISCONNECTED_MESSAGE = b"\x10\x70"
-_FETCH_MESSAGE = b'\x10\x60'
+_FETCH_MESSAGE = b"\x10\x60"
_MEAL_FLAG = {
common.Meal.NONE: 0x00,
@@ -73,67 +71,64 @@ _MEAL_FLAG = {
_READING = construct.Struct(
construct.Byte[2],
- 'year' / construct.Byte,
- 'month' / construct.Byte,
- 'day' / construct.Byte,
- 'hour' / construct.Byte,
- 'minute' / construct.Byte,
- 'value' / construct.Int16ub,
- 'meal' / construct.Mapping(
- construct.Byte, _MEAL_FLAG),
+ "year" / construct.Byte,
+ "month" / construct.Byte,
+ "day" / construct.Byte,
+ "hour" / construct.Byte,
+ "minute" / construct.Byte,
+ "value" / construct.Int16ub,
+ "meal" / construct.Mapping(construct.Byte, _MEAL_FLAG),
construct.Byte[7],
)
class Device(serial.SerialDevice, driver_base.GlucometerDriver):
BAUDRATE = 38400
- DEFAULT_CABLE_ID = '10c4:ea60' # Generic cable.
+ DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable.
TIMEOUT = 300 # We need to wait for data from the device.
def read_message(self):
pkt = _PACKET.parse_stream(self.serial_)
- logging.debug('received packet: %r', pkt)
+ logging.debug("received packet: %r", pkt)
return pkt.message
def wait_and_ready(self):
- challenge = b'\0'
- while challenge == b'\0':
+ challenge = b"\0"
+ while challenge == b"\0":
challenge = self.serial_.read(1)
# The first packet read may have a prefixed zero, it might be a bug
# in the cp210x driver or device, but discard it if found.
- if challenge == b'\0':
- logging.debug('spurious null byte received')
+ if challenge == b"\0":
+ logging.debug("spurious null byte received")
continue
- if challenge != b'\x53':
+ if challenge != b"\x53":
raise exceptions.ConnectionFailed(
- message='Unexpected starting bytes %r' % challenge)
+ message="Unexpected starting bytes %r" % challenge
+ )
challenge += self.serial_.read(6)
if challenge != _CHALLENGE_PACKET_FULL:
raise exceptions.ConnectionFailed(
- message='Unexpected challenge %r' % challenge)
+ message="Unexpected challenge %r" % challenge
+ )
- logging.debug(
- 'challenge packet received: %s', binascii.hexlify(challenge))
+ logging.debug("challenge packet received: %s", binascii.hexlify(challenge))
self.send_message(_RESPONSE_MESSAGE)
# The first packet only contains the counter of how many readings are
# available.
first_message = _FIRST_MESSAGE.parse(self.read_message())
- logging.debug('received first message: %r', first_message)
+ logging.debug("received first message: %r", first_message)
return first_message.count
def send_message(self, message):
- pkt = _PACKET.build({
- 'message': message,
- 'direction': Direction.Out
- })
- logging.debug('sending packet: %s', binascii.hexlify(pkt))
+ pkt = _PACKET.build({"message": message, "direction": Direction.Out})
+ logging.debug("sending packet: %s", binascii.hexlify(pkt))
self.serial_.write(pkt)
def connect(self): # pylint: disable=no-self-use
@@ -146,7 +141,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
raise exceptions.InvalidResponse(response=response)
def get_meter_info(self): # pylint: disable=no-self-use
- return common.MeterInfo('SD CodeFree glucometer')
+ return common.MeterInfo("SD CodeFree glucometer")
def get_version(self): # pylint: disable=no-self-use
raise NotImplementedError
@@ -162,7 +157,7 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
raise NotImplementedError
def _set_device_datetime(self, date):
- setdatecmd = date.strftime('ADATE%Y%m%d%H%M').encode('ascii')
+ setdatecmd = date.strftime("ADATE%Y%m%d%H%M").encode("ascii")
# Ignore the readings count.
self.wait_and_ready()
@@ -173,8 +168,9 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
raise exceptions.InvalidResponse(response=response)
# The date we return should only include up to minute, unfortunately.
- return datetime.datetime(date.year, date.month, date.day,
- date.hour, date.minute)
+ return datetime.datetime(
+ date.year, date.month, date.day, date.hour, date.minute
+ )
def zero_log(self):
raise NotImplementedError
@@ -187,10 +183,16 @@ class Device(serial.SerialDevice, driver_base.GlucometerDriver):
message = self.read_message()
reading = _READING.parse(message)
- logging.debug('received reading: %r', reading)
+ logging.debug("received reading: %r", reading)
yield common.GlucoseReading(
datetime.datetime(
- 2000 + reading.year, reading.month,
- reading.day, reading.hour, reading.minute),
- reading.value, meal=reading.meal)
+ 2000 + reading.year,
+ reading.month,
+ reading.day,
+ reading.hour,
+ reading.minute,
+ ),
+ reading.value,
+ meal=reading.meal,
+ )