23import serial.tools.list_ports
27from .interface_pin
import Interface_PIN
28from .interface_i2c
import Interface_I2C
29from .interface_spi
import Interface_SPI
30from .interface_async
import Interface_Async
31from queue
import Queue
34 """FIRMWARE_VERSION specifies the version of the uC firmware that this API is compatible with
35 major and minor version need to match, patch version of the API can be lower than the (newer) firmware
37 FIRMWARE_VERSION_MAJOR = 0
38 FIRMWARE_VERSION_MINOR = 9
39 FIRMWARE_VERSION_PATCH = 2
43 the class uC_api exposes the full interface to the uC as an object,
44 all the interface like i2c, spi, pin
and async can be found
as exposed variables
46 Interfaces need to be activated before use,
and the activation
is ackloaged by the uC
47 you might need to time.sleep(0.01) after the activation
for the python object to update register the state change
49 if you want to record data you need to use function
start_experiment() to be abel to see the data send back
50 the maximum time of each experiment
is 72min, please call stop experiment before that to start agian
52 after you are done call close_connection to sever the serial connection to the uC,
53 the recorded data
in the python object remains
and can be processed after
55 def __init__(self, serial_port_path, api_level=2):
56 """__init__ creates the uC interface object and establishes the connection to the uC on the given port
58 :param serial_port_path: the path of your system to the serial port, eg. on linux it might be /dev/ttyAMC0 or higher, on mac /dev/tty.usbmodem<XXXXX> on windows <COM port>
59 :type serial_port_path: string
60 :param api_level: level 1
is that the api only espablishes the connection to the uC
and the
"infinite" write
and read buffers, you need to construct the instruction packages your self,
61 level 2 it wraps the full representation of the uC interfaces
in objects that are made availible
as variables on this object, defaults to 2
62 :type api_level: int, optional
73 self.
__name =
"MCU_" + str(serial_port_path)
84 for pin_id
in range(55):
87 for async_id
in range(8):
90 for async_id
in range(8):
104 """update_state This method processes all availible messages from the uC and updates the internal representaion
105 in detail it distributes the recorded packages to the coresponding interfaces
for processing.
114 if isinstance(packet_to_process, ErrorPacket):
115 header_for_sorting = packet_to_process.original_header()
117 header_for_sorting = packet_to_process.header()
121 if header_for_sorting == Data32bitHeader.IN_SET_TIME:
128 if header_for_sorting
in interface.header():
129 interface.process_packet(packet_to_process)
134 if header_for_sorting == self.
pin[0].header()[0]:
135 self.
pin[packet_to_process.value()].process_packet(packet_to_process)
139 elif header_for_sorting
in self.
pin[0].header():
140 self.
pin[packet_to_process.pin_id()].process_packet(packet_to_process)
146 self.
errors.append(str(packet_to_process))
155 "\nfree input queue spots on uC: " + str(free_input_queue_spots_on_uc ) + \
157 "\nERRORS: "+str(self.
errors) +
"\n"
160 """start_experiment This will reset the uC clock, enable that data is collected and that timed instructions are executed by the uC
161 latest after 72min stop_experiment has to be called, after which a new experiment can be programmed and the function can be called again.
165 print(f
"Starting experiment on {self.__name}")
167 packet_to_send =
Data32bitPacket(header=Data32bitHeader.IN_SET_TIME,value=1)
172 """stop_experiment this will stop recording and flush all not jet excecuted timed instructions
174 :param time: the time in us after start_experiment when this function should be called, defaults to 0 (execute instantly)
175 :type time: int, optional
179 print(f
"Stopping experiment on {self.__name}")
181 packet_to_send =
Data32bitPacket(header=Data32bitHeader.IN_SET_TIME,value=0,time=time)
186 """experiment_state returns the state history
188 1 or larger: experiment
is running
192 :
return: 2 lists: first the state second the uC timesteps
in us of the state change
193 :rtype: ([int],[int])
198 """send_packet send a packet to the uC via the "infinite" buffer
199 needs a package object see package.py
201 :param packet_to_send: the package to be send
202 :type packet_to_send: Packet, or any subclass
205 if packet_to_send.header() == Data32bitHeader.IN_SET_TIME:
208 if packet_to_send.time() == 0:
214 logging.warning(
"the instructions are not sorted in time - execution order will be inconsistent")
217 """read_packet returns one package from the uC via the "infinte" buffer
219 if there
is no packet availible it blocks
and waits until a packet becomes availible
221 :
return: one package
from the uC
222 :rtype: Packet,
or any subclass
229 logging.error(
"reading raw packets is only availible in API level 1")
232 """has_packet checks if a packet is availible for reading from the buffer
234 :return: true
if packet
is avilible, false
if not
240 logging.error(
"reading raw packets is only availible in API level 1")
245 """close_connection closes the serial connection to the uC and blocks until this is done
257 """reset uC and hope the serial connection survives
265 def __check_first_connection(self,connection):
266 """__check_first_connection checks if the uC is responding and prints the firmware version
267 if the firmware version does
not match the API version it will
print a warning
269 connection_state = False
270 logging.info(
"send: opening connection - aligning commuication")
272 connection.write(ALIGN_BYTEARRAY)
276 if connection.in_waiting >= 9:
277 byte_packet = connection.read(size = 9)
279 byte_packet = byte_packet.lstrip(b
'\xff')
282 if len(byte_packet) < 9:
283 if len(byte_packet) == 0:
286 elif connection.in_waiting < 9-len(byte_packet):
288 logging.error(
"partial packet received, but not enough bytes send by uC, trying to recover by realigning")
289 connection.write(ALIGN_BYTEARRAY)
293 byte_packet = bytearray(byte_packet).extend(bytearray(connection.read(size = 9-len(byte_packet))))
295 read_packet = Packet.from_bytearray(byte_packet)
297 if read_packet.header() == ErrorHeader.OUT_ALIGN_SUCCESS_VERSION:
298 logging.info(
"uC is ready - firmware version: "+str(read_packet.original_header())+
"."+str(read_packet.original_sub_header())+
"."+str(read_packet.value()))
300 if read_packet.original_header() != FIRMWARE_VERSION.FIRMWARE_VERSION_MAJOR
or read_packet.original_sub_header() != FIRMWARE_VERSION.FIRMWARE_VERSION_MINOR
or read_packet.value() < FIRMWARE_VERSION.FIRMWARE_VERSION_PATCH:
301 logging.warning(
"uC firmware version does not match the API version: \nfirmware version: "+str(read_packet.original_header())+
"."+str(read_packet.original_sub_header())+
"."+str(read_packet.value())+
" \nAPI version: "+str(int(FIRMWARE_VERSION.FIRMWARE_VERSION_MAJOR))+
"."+str(int(FIRMWARE_VERSION.FIRMWARE_VERSION_MINOR))+
"."+str(int(FIRMWARE_VERSION.FIRMWARE_VERSION_PATCH)))
304 connection_state =
True
307 logging.warning(
"unknown packet received, while connecting to uC for the first time: "+str(read_packet))
310 if connection_state ==
False:
312 logging.error(
"uC is not responding to first connection request")
316 def __thread_function(self):
317 """__thread_function internal function managing the actual async communication with the uC in the background
319 idle_write_pc = False
326 free_input_queue_spots_on_uc = -1
327 request_free_input_queue_spots =
False
336 ports = serial.tools.list_ports.comports()
341 if "USB Serial Device" in port.description:
342 logging.info(f
"{self.__serial_port_path} is listed.")
344 logging.info(f
"{self.__serial_port_path} is busy.")
347 logging.info(f
"Opening serial {self.__serial_port_path}, API: {self.__api_level}")
348 for attempt
in range(max_attempts + 1):
356 logging.warning(f
"Try[{attempt + 1}] Port: {self.__connection.port}, Was not the first connection, closing and retrying")
366 if attempt >= max_attempts:
367 logging.error(f
"ERROR: Tried {attempt + 1} times but failed to connect to {self.__serial_port_path}")
370 if connected
and not port_error:
371 logging.info(f
"Connected to {self.__serial_port_path}!")
377 idle_write_pc =
False
382 if data_packet.header() == Data32bitHeader.UC_CLOSE_CONNECTION:
388 logging.debug(
"send instant: "+str(data_packet))
393 if free_input_queue_spots_on_uc > 0 :
396 free_input_queue_spots_on_uc -= 1
398 last_sent_time = data_packet.time()
400 logging.debug(
"send timed: "+str(data_packet))
408 if idle_write_uc%200 == 1:
409 if request_free_input_queue_spots ==
False:
410 request_free_input_queue_spots =
True
411 packet_to_send =
Data32bitPacket(Data32bitHeader.IN_FREE_INSTRUCTION_SPOTS)
413 logging.debug(
"send request: "+str(packet_to_send))
427 byte_packet = byte_packet.lstrip(b
'\xff')
429 if len(byte_packet) < 9:
430 if len(byte_packet) != 0:
432 logging.warning(
"outgoing uC alignment needed, shifted by "+str(len(byte_packet))+
" bytes")
435 logging.error(
"partial packet received, but not enough bytes send by uC, trying to recover by realigning")
439 byte_packet = bytearray(byte_packet).extend(bytearray(self.
__connection.read(size = 9-len(byte_packet))))
442 logging.debug(
"alignment sucesss - no incoming alignment error")
448 read_packet = Packet.from_bytearray(byte_packet)
451 logging.error(
"packet is malformed, maybe misaligned, trying to recover by realigning")
455 logging.debug(
"read: "+str(read_packet))
457 if read_packet.header()
is Data32bitHeader.OUT_FREE_INSTRUCTION_SPOTS:
459 if read_packet.time() > last_sent_time
and exec_running > 0:
460 logging.warning(
"Timing exec squewed, increase buffer size in firmware, last sent time: "+\
461 str(last_sent_time)+
" uC time: "+str(read_packet.time())+\
462 "\npackets send: "+str(packet_send)+
" for free spots: "+str(last_free_spots))
464 logging.debug(
"uC reports "+str(read_packet.value())+
" free input queue spots at time "+\
466 " with time starting from: "+str(last_sent_time)+\
467 "\npackets send: "+str(packet_send)+
" for free spots: "+str(last_free_spots))
468 last_free_spots = read_packet.value()
471 free_input_queue_spots_on_uc = read_packet.value()
472 if free_input_queue_spots_on_uc == 0:
476 request_free_input_queue_spots =
False
478 elif read_packet.header()
is ErrorHeader.OUT_ERROR_UNKNOWN_INSTRUCTION
or read_packet.header()
is ErrorHeader.OUT_ERROR_UNKNOWN_CONFIGURATION:
479 logging.error(
"uC is reporting that it cant understand a send packet, either API and firmware are a different version or communication is not aligned, trying to recover by realigning")
482 elif read_packet.header() == Data32bitHeader.IN_SET_TIME:
483 exec_running = read_packet.value()
484 logging.info(
"Experiment state changed to: "+str(exec_running))
495 if idle_read
and (idle_write_pc
or idle_write_uc > 0):
498 idle_write_pc =
False
This class is the API for the I2C interfaces of the uC It is used to configure, send and recive data ...
This exposes all the pin functionality for use,.
The interface SPI creates and object though which the interface can be accessed.
The Data32bitPacket is used to send 32bit data instructions to the uC all availible instructions are ...
FIRMWARE_VERSION specifies the version of the uC firmware that this API is compatible with major and ...
the class uC_api exposes the full interface to the uC as an object, all the interface like i2c,...
def close_connection(self)
close_connection closes the serial connection to the uC and blocks until this is done resets the uC t...
def experiment_state(self)
experiment_state returns the state history
def __check_first_connection(self, connection)
def __init__(self, serial_port_path, api_level=2)
init creates the uC interface object and establishes the connection to the uC on the given port
def reset(self)
reset uC and hope the serial connection survives
def start_experiment(self)
start_experiment This will reset the uC clock, enable that data is collected and that timed instructi...
def read_packet(self)
read_packet returns one package from the uC via the "infinte" buffer
def stop_experiment(self, time=0)
stop_experiment this will stop recording and flush all not jet excecuted timed instructions
__experiment_state_timestamp
def __thread_function(self)
def update_state(self)
update_state This method processes all availible messages from the uC and updates the internal repres...
def send_packet(self, packet_to_send)
send_packet send a packet to the uC via the "infinite" buffer needs a package object see package....
def has_packet(self)
has_packet checks if a packet is availible for reading from the buffer