Wednesday, July 6, 2022

ESP32-C3/MicroPython multithreading exercise: non-blocking user input using _thread

This exercise gets user input by calling input(). But input() is a blocking function: it stops further execution of the program until the user enters something. In this exercise, reading user input with input() and driving the onboard RGB LED run in two separate threads using _thread, so waiting for input does not stall the LED or the main loop.

Please note that _thread is currently highly experimental and its API is not yet fully settled.
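
To make the idea concrete, here is a minimal sketch of running a blocking call in its own thread while the caller keeps working. It is only an illustration under some assumptions: time.sleep() stands in for input() so the sketch can run unattended, and the names blocking_call and demo are made up for this sketch. It uses nothing from _thread beyond start_new_thread().

```python
import _thread
import time


def blocking_call(delay_s, out):
    # Stand-in for input(): does not return until delay_s seconds have passed.
    time.sleep(delay_s)
    out.append("user text")


def demo():
    out = []  # shared list the worker thread appends its result to
    _thread.start_new_thread(blocking_call, (0.2, out))

    ticks = 0
    # The caller is not blocked: it keeps looping while the worker waits.
    while not out:
        ticks += 1
        time.sleep(0.01)
    return ticks, out[0]


if __name__ == "__main__":
    ticks, text = demo()
    print("main loop ran", ticks, "times while waiting; got:", text)
```

The loop counter shows that the caller made progress during the wait. If blocking_call() had been called directly instead, the loop would not have started until it returned.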


mpyC3_thread.py
"""
ESP32-C3/MicroPython exercise:
read user input without blocking, using _thread

MicroPython libraries _thread (multithreading support)
https://docs.micropython.org/en/latest/library/_thread.html

This module is highly experimental and its API is not yet fully
settled and not yet described in documentation.

So, basically, this exercise was written by guesswork and is provided as is.

Tested on Espressif ESP32-C3-DevKitM-1/micropython v1.19.1
"""
import os
import sys
import time
import _thread
import neopixel
from machine import Pin

# On Espressif ESP32-C3-DevKitM-1:
# The onboard RGB LED (WS2812) is connected to GPIO8
np = neopixel.NeoPixel(Pin(8), 1)

rqs_to_show = False
to_show = ""

print()

print("====================================")
print(sys.implementation[0], os.uname()[3],
      "\nrun on", os.uname()[4])
print("====================================")

# Thread to read user input.
# input() blocks its caller until a line is entered,
# so it has to run in its own thread.
def input_thread():
    global rqs_to_show
    global to_show
    while True:
        user_input = input()
        to_show = user_input
        rqs_to_show = True
        print("in input_thread() => ", user_input)

# Thread to change the RGB LED color repeatedly.
def rgb_thread():
    while True:
        np[0] = (10, 0, 0)
        np.write()
        time.sleep(0.5)
        np[0] = (0, 10, 0)
        np.write()
        time.sleep(0.5)
        np[0] = (0, 0, 10)
        np.write()
        time.sleep(0.5)

_thread.start_new_thread(input_thread, ())
_thread.start_new_thread(rgb_thread, ())

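# Main thread: poll the flag set by input_thread().
while True:
    if rqs_to_show:
        rqs_to_show = False
        print("in main thread: rqs_to_show -> ", to_show)
    time.sleep(0.01)  # short pause so the loop does not spin flat out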

Next:
~ applied in a real exercise: ESP32-C3/MicroPython BLE UART Communication
