0

the original code which only support python 2 is here

link to thinkgear.py

I'm trying to edit it to support python 3. the edited code is here:

import sys import serial from io import BytesIO import struct from collections import namedtuple import logging import logging.handlers import sys import time import datetime global delta delta = [] _log = logging.getLogger(__name__) _bytelog = logging.getLogger(__name__+'.bytes') _bytelog.propagate = False fh = logging.FileHandler('spam.log') fh.setLevel(logging.DEBUG) _log.addHandler(fh) class ThinkGearProtocol(object): def __init__(self, port): self.serial = serial.Serial(port, 57600) self.preread = BytesIO() self.io = self.serial @staticmethod def _chksum(packet): return ~sum(c for c in packet ) & 0xff def _read(self, n): buf = self.io.read(n) if len(buf) < n: _log.debug('incomplete read, short %s bytes', n - len(buf)) if self.io == self.preread: _log.debug('end of preread buffer') # self.preread.reset() # self.preread.truncate() # two line comment out self.io = self.serial buf += self.io.read(n-len(buf)) if len(buf) < n: _log.debug('incomplete read, short %s bytes', n - len(buf)) for o in range(0, len(buf), 16): _bytelog.debug('%04X '+' '.join(('%02X',)*len(buf[o:o+16])), o, *(c for c in buf[o:o+16])) return buf def _deread(self, buf): _log.debug('putting back %s bytes', len(buf)) pos = self.preread.tell() self.preread.seek(0, 2) self.preread.write(buf) self.preread.seek(pos) self.io = self.preread def get_packets(self): last_two = () while True: last_two = last_two[-1:]+(self._read(1),) # _log.debug('last_two: %r', last_two) if last_two == (b'\xAA',b'\xAA'): plen = self._read(1) if plen >= b'\xAA': # Bogosity _log.debug('discarding %r while syncing', last_two[0]) last_two = last_two[-1:]+(plen,) else: last_two = () packet = self._read(int.from_bytes((plen), byteorder='big')) # _log.debug(plen) checksum = self._read(1) if ord(checksum) == self._chksum(packet): yield self._decode(packet) else: _log.debug('bad checksum') self._deread(packet+checksum) elif len(last_two) == 2: _log.debug('discarding %r while syncing', last_two[0]) def _decode(self, packet): decoded = [] while packet: extended_code_level = 0 while len(packet) and packet[0] == '\x55': extended_code_level += 1 packet = packet[1:] if len(packet) < 2: _log.debug('ran out of packet: %r', '\x55'*extended_code_level+packet) break code = packet[0] if code < 0x80: value = packet[1] packet = packet[2:] else: vlen = packet[1] if len(packet) < 2+vlen: _log.debug('ran out of packet: %r', '\x55'*extended_code_level+chr(code)+chr(vlen)+packet) break value = packet[2:2+vlen] packet = packet[2+vlen:] # _log.debug('extended_code_level is '+str(extended_code_level)) # _log.debug('code is '+str(code)) # _log.debug('data_types is '+str(data_types)) # _log.debug(not extended_code_level and code in data_types) # _log.debug(not bool(extended_code_level and code in data_types)) # _log.debug((extended_code_level,code) in data_types) if not bool(extended_code_level and code in data_types): data = data_types[code](extended_code_level, code, value) # _log.debug('extended_code_level is '+str(extended_code_level)) # _log.debug('code is '+str(code)) # _log.debug('value is '+str(value)) # _log.debug('data_types is '+str(data_types)) elif (extended_code_level,code) in data_types: data = data_types[(extended_code_level,code)](extended_code_level, code, value) else: data = ThinkGearUnknownData(extended_code_level, code, value) decoded.append(data) return decoded data_types = {} class ThinkGearMetaClass(type): def __new__(mcls, name, bases, data): cls = super(ThinkGearMetaClass, mcls).__new__(mcls, name, bases, data) code = getattr(cls, 'code', None) if code: data_types[code] = cls extended_code_level = getattr(cls, 'extended_code_level', None) if extended_code_level: data_types[(extended_code_level,code)] = cls return cls class ThinkGearData(object, metaclass=ThinkGearMetaClass): def __init__(self, extended_code_level, code, value): self.extended_code_level = extended_code_level self.code = code # self.value = self._decode(value) self.value = value # _log.debug('123') if self._log: _log.log(self._log, '%s', self) @staticmethod def _decode(v): return v def __str__(self): return self._strfmt % vars(self) # __metaclass__ = ThinkGearMetaClass _log = logging.DEBUG class ThinkGearUnknownData(ThinkGearData): '''???''' _strfmt = 'Unknown: code=%(code)02X extended_code_level=%(extended_code_level)s %(value)r' class ThinkGearPoorSignalData(ThinkGearData): '''POOR_SIGNAL Quality (0-255)''' code = 0x02 _strfmt = 'POOR SIGNAL: %(value)s' _decode = staticmethod(ord) class ThinkGearAttentionData(ThinkGearData): '''ATTENTION eSense (0 to 100)''' code = 0x04 _strfmt = 'ATTENTION eSense: %(value)s' _decode = staticmethod(ord) class ThinkGearMeditationData(ThinkGearData): '''MEDITATION eSense (0 to 100)''' code = 0x05 _strfmt = 'MEDITATION eSense: %(value)s' _decode = staticmethod(ord) class ThinkGearRawWaveData(ThinkGearData): '''RAW Wave Value (-32768 to 32767)''' code = 0x80 _strfmt = 'Raw Wave: %(value)s' _decode = staticmethod(lambda v: struct.unpack('>h', v)[0]) # There are lots of these, don't log them by default _log = False EEGPowerData = namedtuple('EEGPowerData', 'delta theta lowalpha highalpha lowbeta highbeta lowgamma midgamma') delta_value = namedtuple('EEGPowerData', 'delta') class ThinkGearEEGPowerData(ThinkGearData): '''Eight EEG band power values (0 to 16777215). delta, theta, low-alpha high-alpha, low-beta, high-beta, low-gamma, and mid-gamma EEG band power values. ''' code = 0x83 _strfmt = 'ASIC EEG Power: %(value)r' _decode = staticmethod(lambda v: EEGPowerData(*struct.unpack('>8L', ''.join( '\x00'+v[o:o+3] for o in range(0, 24, 3))))) #print(EEGPowerData.delta) def main(): global packet_log packet_log = [] logging.basicConfig(level=logging.DEBUG) for pkt in ThinkGearProtocol('COM3').get_packets(): packet_log.append(pkt) if __name__ == '__main__': main() 

when running in python2, i get the result like this:

DEBUG:__main__:ASIC EEG Power: EEGPowerData(delta=7784, theta=7734, lowalpha=2035, highalpha=1979, lowbeta=2914, highbeta=3996, lowgamma=1944, midgamma=1847 

when running in python3, the result is like this:

DEBUG:__main__:ASIC EEG Power: b'\x00\xa9\xf1\x00t%\x00\rK\x00\x18"\x00\x16%\x00\x1d6\x00OT\x00\x17\x84' 

Anyone know how should i edit this line of code in order to make it work in python 3? Thank you

_decode = staticmethod(lambda v: EEGPowerData(*struct.unpack('>8L', ''.join( '\x00'+v[o:o+3] for o in range(0, 24, 3))))) 
9
  • Okay, it looks complete now, but it's also important that you make it minimal too. (read minimal reproducible example) -- the bug is only in the construction of the data_types dict. ---- anyway, the question is probably a duplicate of metaclass - __metaclass__ in Python 3 - Stack Overflow , if there isn't any other errors.. Commented Feb 1, 2021 at 6:20
  • really appreciate your help! it worked. but now have other errors... Commented Feb 1, 2021 at 6:27
  • DEBUG:__main__:ASIC EEG Power: b'\x00\r|\x00L\xa3\x00F,\x00\x0b\xca\x00\x1a\x80\x00)#\x006\xec\x00\x0cN' Commented Feb 1, 2021 at 6:29
  • What's the actual and expected behavior? (edit the question, including your fixed code.) Commented Feb 1, 2021 at 6:30
  • do you know why the EEG power is not decoded into 8 numbers? Commented Feb 1, 2021 at 6:30

0

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.