uC chip interface arduino  0.9.0
An interface for async and neuromorphic IC testing
Loading...
Searching...
No Matches
uC.py
Go to the documentation of this file.
2# This file is part of the Firmware project to interface with small Async or Neuromorphic chips
3# Copyright (C) 2023-2024 Ole Richter - University of Groningen
4# Copyright (C) 2024 Vincent Jassies - University of Groningen
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19
20import logging
21import threading
22import serial
23import serial.tools.list_ports
24from .packet import *
25from .header import *
26from time import sleep
27from .interface_pin import Interface_PIN
28from .interface_i2c import Interface_I2C
29from .interface_spi import Interface_SPI
30from .interface_async import Interface_Async
31from queue import Queue
32
class FIRMWARE_VERSION(enum.IntEnum):
    """Version of the uC firmware that this API is compatible with.

    The major and minor version need to match; the patch version of the API
    can be lower than that of the (newer) firmware.
    """
    FIRMWARE_VERSION_MAJOR = 0
    FIRMWARE_VERSION_MINOR = 9
    FIRMWARE_VERSION_PATCH = 2
40
class uC_api:
    """
    The class uC_api exposes the full interface to the uC as an object;
    all the interfaces like i2c, spi, pin and async can be found as exposed variables.

    Interfaces need to be activated before use, and the activation is acknowledged by the uC.
    You might need to time.sleep(0.01) after the activation for the python object to register the state change.

    If you want to record data you need to call start_experiment() to be able to see the data sent back.
    The maximum time of each experiment is 72min, please call stop_experiment() before that to start again.

    After you are done call close_connection() to sever the serial connection to the uC;
    the recorded data in the python object remains and can be processed afterwards.
    """

    def __init__(self, serial_port_path, api_level=2):
        """__init__ creates the uC interface object and starts the background thread that connects to the uC on the given port

        :param serial_port_path: the path of your system to the serial port, eg. on linux it might be /dev/ttyAMC0 or higher, on mac /dev/tty.usbmodem<XXXXX> on windows <COM port>
        :type serial_port_path: string
        :param api_level: level 1 is that the api only establishes the connection to the uC and the "infinite" write and read buffers, you need to construct the instruction packages yourself,
            level 2 it wraps the full representation of the uC interfaces in objects that are made available as variables on this object, defaults to 2
        :type api_level: int, optional
        """
        self.__experiment_state = []
        self.__experiment_state_timestamp = []
        self.__read_buffer = Queue()
        self.__write_buffer_timed = Queue()
        self.__write_buffer = Queue()
        self.__last_timed_packet = 0
        # Last number of free instruction slots known for the uC; -1 means "not reported yet".
        # Written by the communication thread, read by __str__.
        self.__free_input_queue_spots_on_uc = -1
        self.__api_level = api_level
        self.__name = "MCU_" + str(serial_port_path)

        self.__connection = None
        self.__serial_port_path = serial_port_path

        # always present so that __str__ works for every API level
        self.errors = []

        if self.__api_level == 2:
            # create all the interface objects
            self.spi = [Interface_SPI(self, spi_id) for spi_id in range(3)]
            self.i2c = [Interface_I2C(self, i2c_id) for i2c_id in range(3)]
            self.pin = [Interface_PIN(self, pin_id) for pin_id in range(55)]
            self.async_to_chip = [Interface_Async(self, async_id, "TO_CHIP") for async_id in range(8)]
            self.async_from_chip = [Interface_Async(self, async_id, "FROM_CHIP") for async_id in range(8)]

        # setup and start the thread function that handles the communication.
        # The connection attribute must not be touched from this thread after the
        # start: the worker owns it and stores the opened serial port there.
        self.__communication_thread = threading.Thread(target=self.__thread_function)
        self.__communication_thread.start()

    def update_state(self):
        """update_state This method processes all available messages from the uC and updates the internal representation,
        in detail it distributes the recorded packets to the corresponding interfaces for processing.

        level 2 only
        """
        if self.__api_level != 2:
            # the interface objects only exist in level 2; leave the raw packets for read_packet()
            logging.error("update_state is only availible in API level 2")
            return
        # process all available messages from the uC one by one
        while not self.__read_buffer.empty():
            packet_to_process = self.__read_buffer.get()
            try:
                # if no interface is responsible for the packet, we add it to the error list
                if not self.__dispatch_packet(packet_to_process):
                    self.errors.append(str(packet_to_process))
            finally:
                # exactly one task_done() per get(), whatever path the packet took
                self.__read_buffer.task_done()

    def __dispatch_packet(self, packet_to_process):
        """__dispatch_packet hands one received packet to the object responsible for it

        :param packet_to_process: the packet taken from the read buffer
        :type packet_to_process: Packet, or any subclass
        :return: True if the packet was consumed, False if nothing claimed its header
        :rtype: boolean
        """
        # error packets are sorted by the header of the instruction that caused them
        if isinstance(packet_to_process, ErrorPacket):
            header_for_sorting = packet_to_process.original_header()
        else:
            header_for_sorting = packet_to_process.header()

        # high level experiment control packet
        if header_for_sorting == Data32bitHeader.IN_SET_TIME:
            self.__experiment_state.append(packet_to_process.value())
            self.__experiment_state_timestamp.append(packet_to_process.time())
            return True

        # iterate through all interfaces and check if they signal they are responsible for that header
        for interface in self.async_to_chip + self.async_from_chip + self.spi + self.i2c:
            if header_for_sorting in interface.header():
                interface.process_packet(packet_to_process)
                return True

        # pins all use the same headers, so we assign the packet to the pin object with the same id
        pin_headers = self.pin[0].header()
        if header_for_sorting == pin_headers[0]:
            self.pin[packet_to_process.value()].process_packet(packet_to_process)
            return True
        if header_for_sorting in pin_headers:
            self.pin[packet_to_process.pin_id()].process_packet(packet_to_process)
            return True
        return False

    def __str__(self):
        """__str__ returns a human readable summary of the experiment and communication state"""
        if self.__api_level == 2:
            self.update_state()
        return "UC" + \
            "\nExperiment state: " + str(self.__experiment_state) + \
            "\nExperiment state timestamp: " + str(self.__experiment_state_timestamp) + \
            "\nlast timed packet: " + str(self.__last_timed_packet) + \
            "\nfree input queue spots on uC: " + str(self.__free_input_queue_spots_on_uc) + \
            "\napilevel: " + str(self.__api_level) + \
            "\nERRORS: " + str(self.errors) + "\n"

    def start_experiment(self):
        """start_experiment This will reset the uC clock, enable that data is collected and that timed instructions are executed by the uC
        latest after 72min stop_experiment has to be called, after which a new experiment can be programmed and the function can be called again.
        """
        # Print status
        print(f"Starting experiment on {self.__name}")

        packet_to_send = Data32bitPacket(header=Data32bitHeader.IN_SET_TIME, value=1)
        self.send_packet(packet_to_send)

    def stop_experiment(self, time = 0):
        """stop_experiment this will stop recording and flush all not yet executed timed instructions

        :param time: the time in us after start_experiment when this function should be called, defaults to 0 (execute instantly)
        :type time: int, optional
        """
        # Print status
        print(f"Stopping experiment on {self.__name}")

        packet_to_send = Data32bitPacket(header=Data32bitHeader.IN_SET_TIME, value=0, time=time)
        self.send_packet(packet_to_send)

    def experiment_state(self):
        """experiment_state returns the state history

        1 or larger: experiment is running
        0 experiment stopped
        -1 uC reset

        :return: 2 lists: first the state second the uC timesteps in us of the state change
        :rtype: ([int],[int])
        """
        # NOTE(review): this return statement was missing from the extracted listing and is
        # reconstructed from the documented return value - confirm against the repository.
        return self.__experiment_state, self.__experiment_state_timestamp

    def send_packet(self, packet_to_send):
        """send_packet sends a packet to the uC via the "infinite" buffer,
        needs a packet object see packet.py

        :param packet_to_send: the packet to be sent
        :type packet_to_send: Packet, or any subclass
        """
        # reset the time reference for the timed instructions
        if packet_to_send.header() == Data32bitHeader.IN_SET_TIME:
            self.__last_timed_packet = packet_to_send.value()
        # put the packet in the buffer depending if it is instant or timed
        if packet_to_send.time() == 0:
            self.__write_buffer.put(packet_to_send)
            return
        self.__write_buffer_timed.put(packet_to_send)
        # check if the timed instructions are sorted in time
        if packet_to_send.time() < self.__last_timed_packet:
            logging.warning("the instructions are not sorted in time - execution order will be inconsistent")
        else:
            # remember the newest queued time, otherwise the check above has nothing to compare against
            self.__last_timed_packet = packet_to_send.time()

    def read_packet(self):
        """read_packet returns one packet from the uC via the "infinite" buffer

        if there is no packet available it blocks and waits until a packet becomes available

        level 1 only, in any other level an error is logged and None is returned

        :return: one packet from the uC
        :rtype: Packet, or any subclass
        """
        if self.__api_level != 1:
            logging.error("reading raw packets is only availible in API level 1")
            return None
        read_packet = self.__read_buffer.get()
        self.__read_buffer.task_done()
        return read_packet

    def has_packet(self):
        """has_packet checks if a packet is available for reading from the buffer

        level 1 only, in any other level an error is logged and False is returned

        :return: true if packet is available, false if not
        :rtype: boolean
        """
        if self.__api_level != 1:
            logging.error("reading raw packets is only availible in API level 1")
            return False
        return not self.__read_buffer.empty()

    def close_connection(self):
        """close_connection closes the serial connection to the uC and blocks until this is done
        resets the uC too
        """
        # place close connection packet in the write buffer, so the worker thread closes the connection and stops itself
        self.__write_buffer.put(Data32bitPacket(Data32bitHeader.UC_CLOSE_CONNECTION))
        # add reset to the experiment state history
        self.__experiment_state.append(-1)
        self.__experiment_state_timestamp.append(-1)
        # wait for the worker thread to close the connection
        self.__communication_thread.join()

    def reset(self):
        """reset uC and hope the serial connection survives
        """
        # place reset packet in the write buffer, so the worker thread sends it
        self.__write_buffer.put(Data32bitPacket(Data32bitHeader.IN_RESET))
        # add reset to the experiment state history
        self.__experiment_state.append(-1)
        self.__experiment_state_timestamp.append(-1)

    def __check_first_connection(self, connection):
        """__check_first_connection checks if the uC is responding and logs the firmware version
        if the firmware version does not match the API version it will log a warning

        :param connection: the opened serial connection to the uC
        :return: True if the uC answered the alignment request with its version packet, False otherwise
        :rtype: boolean
        """
        logging.info("send: opening connection - aligning commuication")
        # write the alignment sequence to the uC to align the communication
        connection.write(ALIGN_BYTEARRAY)
        # give the uC about 1 second to answer (4 tries, 0.25 s apart)
        for _ in range(4):
            # check if the uC has sent a packet
            if connection.in_waiting >= 9:
                # do reading alignment: drop leading alignment bytes
                byte_packet = connection.read(size = 9).lstrip(b'\xff')
                sleep(0.05)
                missing = 9 - len(byte_packet)
                if not byte_packet:
                    # no packet (was only alignment bytes), try again
                    byte_packet = None
                elif missing > 0:
                    if connection.in_waiting < missing:
                        # something went wrong, try again
                        logging.error("partial packet received, but not enough bytes send by uC, trying to recover by realigning")
                        connection.write(ALIGN_BYTEARRAY)
                        byte_packet = None
                    else:
                        # complete the partial packet; concatenate, because
                        # bytearray.extend() works in place and returns None
                        byte_packet = byte_packet + connection.read(size = missing)

                if byte_packet is not None:
                    try:
                        # convert the byte packet to a packet object
                        read_packet = Packet.from_bytearray(byte_packet)
                    except Exception:
                        # a malformed answer must not kill the communication thread
                        logging.error("packet is malformed, maybe misaligned, trying to recover by realigning")
                        connection.write(ALIGN_BYTEARRAY)
                        read_packet = None
                    if read_packet is not None:
                        # check if the packet is the expected success packet
                        if read_packet.header() == ErrorHeader.OUT_ALIGN_SUCCESS_VERSION:
                            firmware_version = str(read_packet.original_header())+"."+str(read_packet.original_sub_header())+"."+str(read_packet.value())
                            logging.info("uC is ready - firmware version: "+firmware_version)
                            # check if the firmware version matches the API version
                            if read_packet.original_header() != FIRMWARE_VERSION.FIRMWARE_VERSION_MAJOR or read_packet.original_sub_header() != FIRMWARE_VERSION.FIRMWARE_VERSION_MINOR or read_packet.value() < FIRMWARE_VERSION.FIRMWARE_VERSION_PATCH:
                                logging.warning("uC firmware version does not match the API version: \nfirmware version: "+firmware_version+" \nAPI version: "+str(int(FIRMWARE_VERSION.FIRMWARE_VERSION_MAJOR))+"."+str(int(FIRMWARE_VERSION.FIRMWARE_VERSION_MINOR))+"."+str(int(FIRMWARE_VERSION.FIRMWARE_VERSION_PATCH)))
                            # connection is established
                            return True
                        logging.warning("unknown packet received, while connecting to uC for the first time: "+str(read_packet))
            # NOTE(review): the listing lost its indentation; the wait is applied to every
            # unsuccessful try so that the loop really spans about one second.
            sleep(0.25)
        logging.error("uC is not responding to first connection request")
        return False

    def __open_connection(self):
        """__open_connection opens the serial port and performs the first alignment with the uC

        The opened port is stored in the connection attribute of this object.

        :return: True if the port is open and the uC answered, False otherwise
        :rtype: boolean
        """
        max_attempts = 5
        port_error = False

        # check how the requested port is listed by the system
        # NOTE(review): only ports whose description contains "USB Serial Device" are
        # accepted; descriptions are platform dependent - confirm this is intended.
        for port in serial.tools.list_ports.comports():
            if self.__serial_port_path == port.device:
                if "USB Serial Device" in port.description:
                    logging.info(f"{self.__serial_port_path} is listed.")
                else:
                    logging.info(f"{self.__serial_port_path} is busy.")
                    port_error = True

        logging.info(f"Opening serial {self.__serial_port_path}, API: {self.__api_level}")
        attempts_made = 0
        if not port_error:
            for attempt in range(max_attempts + 1):
                attempts_made = attempt + 1
                try:
                    # its USB so the speed setting gets ignored and it runs at max speed
                    self.__connection = serial.Serial(self.__serial_port_path, 115200, timeout= None, write_timeout=0)
                except serial.SerialException as error:
                    # wrong port or missing permission: report it instead of killing the thread
                    logging.warning(f"Try[{attempt + 1}] Port: {self.__serial_port_path}, could not be opened: {error}")
                    sleep(0.25)
                    continue
                # init communication by forcing the uC to align
                if self.__check_first_connection(self.__connection):
                    return True
                logging.warning(f"Try[{attempt + 1}] Port: {self.__connection.port}, Was not the first connection, closing and retrying")
                self.__connection.close()

        # only reached when no attempt succeeded
        logging.error(f"ERROR: Tried {attempts_made} times but failed to connect to {self.__serial_port_path}")
        return False

    def __thread_function(self):
        """__thread_function internal function managing the actual async communication with the uC in the background

        It connects to the uC, then loops forever: instant packets are written first, timed packets only
        while the uC reported free instruction slots, and up to 20 received packets are read per loop run.
        The loop ends when a UC_CLOSE_CONNECTION packet is found in the instant write buffer.
        """
        # only start the communication if connected
        if not self.__open_connection():
            return
        logging.info(f"Connected to {self.__serial_port_path}!")
        connection = self.__connection

        idle_write_pc = False
        idle_write_uc = 0
        idle_read = False
        last_sent_time = 0
        last_free_spots = 0
        packet_send = 0
        exec_running = 0
        request_free_input_queue_spots = False

        # start communication
        while True:
            # check if there is something to send
            if not self.__write_buffer_timed.empty() or not self.__write_buffer.empty():
                # set loop slowdown condition flag to false
                idle_write_pc = False
                # first write the instant packets
                if not self.__write_buffer.empty():
                    data_packet = self.__write_buffer.get()
                    # check and close the connection if requested by API
                    if data_packet.header() == Data32bitHeader.UC_CLOSE_CONNECTION:
                        connection.write(Data32bitPacket(Data32bitHeader.IN_RESET).to_bytearray())
                        connection.close()
                        self.__write_buffer.task_done()
                        return
                    # else send the packet
                    connection.write(data_packet.to_bytearray())
                    logging.debug("send instant: "+str(data_packet))
                    self.__write_buffer.task_done()
                # then write the timed packets, if there is space in the uC input queue
                elif self.__free_input_queue_spots_on_uc > 0:
                    # send the packet and decrease the free input queue spots reference in the API
                    data_packet = self.__write_buffer_timed.get()
                    self.__free_input_queue_spots_on_uc -= 1
                    connection.write(data_packet.to_bytearray())
                    last_sent_time = data_packet.time()
                    packet_send += 1
                    logging.debug("send timed: "+str(data_packet))
                    self.__write_buffer_timed.task_done()
                else:
                    # request the free input queue spots from the uC,
                    # first request is sent instantly, then every 200th loop run through
                    # to not overload the uC with requests, uC will also report the free input
                    # queue spots when it frees up space and the queue was full before
                    idle_write_uc += 1
                    # NOTE(review): indentation was lost in the listing; the request is assumed
                    # to be sent only while no earlier request is still unanswered.
                    if idle_write_uc % 200 == 1 and not request_free_input_queue_spots:
                        request_free_input_queue_spots = True
                        packet_to_send = Data32bitPacket(Data32bitHeader.IN_FREE_INSTRUCTION_SPOTS)
                        connection.write(packet_to_send.to_bytearray())
                        logging.debug("send request: "+str(packet_to_send))
            else:
                # set write loop slowdown condition flag
                idle_write_pc = True

            # check if there is a packet to read from the serial connection,
            # read 20 packets under the assumption that the pc is faster and
            # the uC will send more packets
            # stops if there is nothing to read
            for _ in range(20):
                if connection.in_waiting < 9:
                    # set read loop slowdown condition flag, as there is nothing to read
                    idle_read = True
                    break
                idle_read = False
                # remove alignment bytes
                byte_packet = connection.read(size = 9).lstrip(b'\xff')
                # check if the packet is complete
                if len(byte_packet) < 9:
                    if len(byte_packet) == 0:
                        logging.debug("alignment sucesss - no incoming alignment error")
                        # no packet, jump to next iteration
                        continue
                    # initiate alignment for incomplete packets
                    logging.warning("outgoing uC alignment needed, shifted by "+str(len(byte_packet))+ " bytes")
                    missing = 9 - len(byte_packet)
                    # partial packet not recoverable
                    if connection.in_waiting < missing:
                        logging.error("partial packet received, but not enough bytes send by uC, trying to recover by realigning")
                        connection.write(ALIGN_BYTEARRAY)
                        continue
                    # complete the partial packet; concatenate, because
                    # bytearray.extend() works in place and returns None
                    byte_packet = byte_packet + connection.read(size = missing)
                # is now aligned
                try:
                    # convert the byte packet to a packet object
                    read_packet = Packet.from_bytearray(byte_packet)
                except Exception:
                    # packet was malformed, force alignment sequence
                    logging.error("packet is malformed, maybe misaligned, trying to recover by realigning")
                    connection.write(ALIGN_BYTEARRAY)
                    continue
                # packet is complete and valid
                logging.debug("read: "+str(read_packet))
                # catch the special case of the uC reporting free input queue spots
                if read_packet.header() is Data32bitHeader.OUT_FREE_INSTRUCTION_SPOTS:
                    if read_packet.time() > last_sent_time and exec_running > 0:
                        logging.warning("Timing exec squewed, increase buffer size in firmware, last sent time: "+\
                            str(last_sent_time)+" uC time: "+str(read_packet.time())+\
                            "\npackets send: "+str(packet_send)+" for free spots: "+str(last_free_spots))
                    else:
                        logging.debug("uC reports "+str(read_packet.value())+" free input queue spots at time "+\
                            str(read_packet.time())+"\nwaiting on PC: "+str(self.__write_buffer_timed.qsize())+\
                            " with time starting from: "+str(last_sent_time)+\
                            "\npackets send: "+str(packet_send)+" for free spots: "+str(last_free_spots))
                    last_free_spots = read_packet.value()
                    packet_send = 0
                    # save the free input queue spots in the API
                    self.__free_input_queue_spots_on_uc = read_packet.value()
                    # a full queue starts the request counter at 2, so the next request waits for the 200 loop interval
                    idle_write_uc = 2 if self.__free_input_queue_spots_on_uc == 0 else 0
                    request_free_input_queue_spots = False
                # catch the special case of the uC reporting a malformed packet from the API
                elif read_packet.header() is ErrorHeader.OUT_ERROR_UNKNOWN_INSTRUCTION or read_packet.header() is ErrorHeader.OUT_ERROR_UNKNOWN_CONFIGURATION:
                    logging.error("uC is reporting that it cant understand a send packet, either API and firmware are a different version or communication is not aligned, trying to recover by realigning")
                    connection.write(ALIGN_BYTEARRAY)
                # keep track of the experiment state, so we know when to issue a warning for execution time skew
                elif read_packet.header() == Data32bitHeader.IN_SET_TIME:
                    exec_running = read_packet.value()
                    logging.info("Experiment state changed to: "+str(exec_running))
                    self.__read_buffer.put(read_packet)
                # normal packet, send to the read buffer for further processing by the main thread
                else:
                    self.__read_buffer.put(read_packet)

            # slow down the loop if there is nothing to do
            if idle_read and (idle_write_pc or idle_write_uc > 0):
                sleep(0.000003)
                idle_read = False
                idle_write_pc = False
499
500
501
This class is the API for the I2C interfaces of the uC. It is used to configure, send and receive data ...
This exposes all the pin functionality for use.
The interface SPI creates an object through which the interface can be accessed.
The Data32bitPacket is used to send 32bit data instructions to the uC; all available instructions are ...
Definition: packet.py:125
FIRMWARE_VERSION specifies the version of the uC firmware that this API is compatible with major and ...
Definition: uC.py:33
the class uC_api exposes the full interface to the uC as an object, all the interface like i2c,...
Definition: uC.py:41
def close_connection(self)
close_connection closes the serial connection to the uC and blocks until this is done resets the uC t...
Definition: uC.py:244
def experiment_state(self)
experiment_state returns the state history
Definition: uC.py:185
def __str__(self)
Definition: uC.py:149
def __check_first_connection(self, connection)
Definition: uC.py:265
def __init__(self, serial_port_path, api_level=2)
init creates the uC interface object and establishes the connection to the uC on the given port
Definition: uC.py:55
def reset(self)
reset uC and hope the serial connection survives
Definition: uC.py:256
__write_buffer
Definition: uC.py:70
def start_experiment(self)
start_experiment This will reset the uC clock, enable that data is collected and that timed instructi...
Definition: uC.py:159
__communication_thread
Definition: uC.py:94
def read_packet(self)
read_packet returns one packet from the uC via the "infinite" buffer
Definition: uC.py:216
__serial_port_path
Definition: uC.py:76
__last_timed_packet
Definition: uC.py:71
__write_buffer_timed
Definition: uC.py:69
def stop_experiment(self, time=0)
stop_experiment this will stop recording and flush all not yet executed timed instructions
Definition: uC.py:171
__experiment_state_timestamp
Definition: uC.py:67
__experiment_state
Definition: uC.py:66
def __thread_function(self)
Definition: uC.py:316
def update_state(self)
update_state This method processes all available messages from the uC and updates the internal repres...
Definition: uC.py:103
def send_packet(self, packet_to_send)
send_packet send a packet to the uC via the "infinite" buffer needs a package object see package....
Definition: uC.py:197
def has_packet(self)
has_packet checks if a packet is available for reading from the buffer
Definition: uC.py:231
async_from_chip
Definition: uC.py:89