「Raspberry Pi Pico」の2つのCoreを使って、2つの処理を同時に実行(並列動作:マルチスレッド)させる方法について詳しく紹介します。
一定の間隔でプログラムを実行しながら、別の処理を行いたい場合や、処理を分担して同時に実行したい場合に便利です。
特に複雑な設定はなく、簡単に実現できるためぜひお試しを♪
「ラズパイ Pico」の詳細や基本的な使い方は、以下のリンクで詳しく紹介しています。
1.Picoのデュアルコアについて
2.配線図
3.サンプルプログラム
4.動作確認
5.MicroPythonのマルチスレッドコマンド詳細
・マルチスレッド動作の開始
・スレッドの停止
・スレッドからの同時アクセス防止(ロック)
・複数の変数を共有する方法
・キーワード引数(kwargs)の指定方法
・その他
6.まとめ
1.Picoのデュアルコアについて
「ラズパイ Pico」のデュアルコアについて簡単に紹介しておきます。
以下は、Picoのコントローラ「RP2040」のデータシートから「デュアルコア」についてのブロック図を抜粋したものです。
ブロック図では左右に独立した処理装置「Core0」と「Core1」があり「SIO:Single-cycle IO」経由で「GPIO:入出力端子」を共有しています。
「GPIO」からの信号が「SIO」の中で処理され、それぞれの「Core」へ情報を伝え、個別に処理を行なった結果を「SIO」経由で「GPIO」へ返すことで「並列処理」を実現しています。
2.配線図
動作確認に使用した配線図は以下のようになります。
スイッチ1個とLED(赤)1個、LED用の抵抗1個だけです。
配線図のように各部品を接続し「Core0」でスイッチ(SW1)が押された時に外付けLED(赤)を点滅させます。
「Core1」では本体LED(緑)を常時点滅させ、スイッチ(SW1)をどのタイミングでONしてもLED(赤)は点滅を開始し、本体LED(緑)は安定して点滅を続けることを確認することで「Core0」と「Core1」が並列動作していることが確認できます。
この他にスイッチを押した時のLED(赤)の点滅回数をカウントしています。
点滅回数のカウントは「Core0」で行うため、このカウント数を「Core1」でシリアルモニタに表示することで「Core」間での変数の共有方法についても確認します。
3.サンプルプログラム
サンプルプログラムは以下になります。「Thonny」等の開発画面にコピペで貼り付けて実行してください。
※プログラムのコピーは下の黒塗り部の右上アイコンクリックでもできます。
import _thread # 複数のタスクを同時に実行するスレッドモジュールを準備
from machine import Pin # 端子制御用モジュールを準備
import time # タイマーモジュールを準備
# 入力ピン設定
sw0 = Pin(7, Pin.IN, Pin.PULL_UP) # スイッチのピン番号を指定してswとして入力設定(プルアップ)
# 本体LEDを出力設定
led = Pin("LED", Pin.OUT) # 本体LEDを出力設定
led0 = Pin(18, Pin.OUT) # 外部LEDのピン番号を指定してled0として出力設定
# 変数宣言(スレッドで共有する変数は配列で指定)
cnt = [0] # スレッドで共有する変数
# Core1 並列処理(本体LED点滅&カウント数表示) ---------------------------------------------------------------
def core1(cnt):
while True: # ずっと繰り返し
led.value(1) # 本体LEDを点灯
time.sleep(0.5) # 待ち時間
led.value(0) # 本体LEDを消灯
time.sleep(0.5) # 待ち時間
print("cnt:{:d}".format(cnt[0])) # 表示
# Core0 メイン処理(スイッチONで外部LED点滅&カウント処理) ---------------------------------------------------------------
def core0(cnt):
while True: # ずっと繰り返し
if sw0.value() == 0: # スイッチが押されていたら
led0.value(0) # 外部LEDを点灯
time.sleep(0.5) # 待ち時間
led0.value(1) # 外部LEDを消灯
time.sleep(0.5) # 待ち時間
cnt[0] += 1 # カウント+1
else: # スイッチが押されていなければ
led0.value(1) # LEDを消灯
# 新しいスレッドでcore1関数を実行 ※引数はタプルで指定(要素1つはカンマ付)
_thread.start_new_thread(core1, (cnt,))
# メインスレッドでcore0関数を実行
core0(cnt)
4.動作確認
実際にブレッドボードを使用して、以下のように各部品を接続して動作確認を行いました。
プログラムが実行されると本体LED(緑)が点滅を繰り返します。
スイッチを押すと外付けLED(赤)が点滅を開始します。
スイッチをどのタイミングでONしてもLED(赤)は点滅を開始し、本体LED(緑)は常に一定間隔で点滅し「Core0」と「Core1」が並列動作していることが確認できます。
次に、開発環境「Thonny」の「シェル」に、スイッチを押した時のLED(赤)の点滅回数のカウント数を確認します。
以下のように、スイッチを押していない時にはカウント数は変化せず、スイッチを押した時にカウント数が増えていくことが確認できます。
ブレッドボードは1列の穴数が6個あるサンハヤトの「SAD-101」がおすすめです。
安価なものはありますが、ほとんどが1列の穴数が5個で自由度がなく、ボードが歪んでいたり、抜き差しが硬かったり緩かったりするものが多いように思います。
5.MicroPythonのマルチスレッドコマンド詳細
「ラズパイ Pico」で並列動作をさせるためには「MicroPython」の「_thread」モジュールをインポートして使用します。
「_thread」モジュールを使用することで、Pythonで複数のタスクを同時に実行する「マルチスレッド(並列動作)」プログラミングを実現することができます。
以下から「_thread」モジュールの基本的な使用方法について紹介します。
・マルチスレッド動作の開始
マルチスレッド動作を実現するには以下の手順で行います。
まず「_thread」モジュールをインポートし「Core0」で実行する「メイン処理」と「Core1」で実行する「並列処理」をそれぞれループ関数として準備します。
「Core0」の「メイン処理」は通常の関数呼び出しと同様に指定して実行します。
「Core1」の「並列処理」は「_thread.start_new_thread()」関数を使用して以下のように実行します。
_thread.start_new_thread(関数名, (配列, )) # 引数を指定する場合(タプルで指定)
_thread.start_new_thread(関数名, (), {kwargs}) # キーワード引数(kwargs)でも指定可
- 第一引数:新しいスレッドで実行する関数名を指定。
- 第二引数:関数に渡す引数をタプルで指定。
(スレッドで共有するデータは配列で渡す必要があります。) - 第三引数:第三引数で「キーワード引数(kwargs)」として「辞書」データも指定可能。
実際にマルチスレッド動作を行うための方法は、上で使用したサンプルプログラムから、マルチスレッド動作をするためのコードだけを抜粋して以下のようになります。
import _thread # 複数のタスクを同時に実行するスレッドモジュールを準備
# Core1 並列処理
def core1():
while True: # ずっと繰り返し
# Core1の処理を書く
# Core0 メイン処理
def core0():
while True: # ずっと繰り返し
# Core0の処理を書く
# 新しいスレッドでcore1関数を実行 ※引数はタプルで指定(要素1つはカンマ付)
_thread.start_new_thread(core1, ())
# メインスレッドでcore0関数を実行
core0()
・スレッドの停止
実行中のスレッドを停止するには以下を実行します。
実際にサンプルプログラムで動作確認してみましょう。動作は以下になります。
スイッチ0(GP7)を押すと「Core1」のスレッドが停止し、本体LEDが消灯します。
スイッチ1(GP8)を押すと「Core1」のスレッドが再起動し、本体LEDが点滅します。
サンプルプログラムは以下になります。「Thonny」等の開発画面にコピペで貼り付けて実行してください。
※プログラムのコピーは下の黒塗り部の右上アイコンクリックでもできます。
import _thread # 複数のタスクを同時に実行するスレッドモジュールを準備
from machine import Pin # 端子制御用モジュールを準備
import time # タイマーモジュールを準備
# 入力ピン設定
sw0 = Pin(7, Pin.IN, Pin.PULL_UP) # スレッド停止スイッチ
sw1 = Pin(8, Pin.IN, Pin.PULL_UP) # スレッド再開スイッチ
# 本体LEDを出力設定
led = Pin("LED", Pin.OUT) # 本体LEDを出力設定
# 変数宣言
state = False
# Core1 並列処理(本体LED点滅) ---------------------------------------------------------------
def core1():
while True: # ずっと繰り返し
led.value(1) # LEDを点灯
time.sleep(0.3) # 待ち時間
led.value(0) # LEDを消灯
time.sleep(0.3) # 待ち時間
if sw0.value() == 0: # スイッチ0が押されていたら
print("Core1 Stop!")
_thread.exit() # Core1スレッド停止
# Core0 メイン処理(スレッドの再開) ---------------------------------------------------------------
def core0():
while True: # ずっと繰り返し
if sw1.value() == 0 and state == False: # スイッチ1が押されていたら
state = True # スイッチ1 ON状態保持
print("Core1 Restart!")
_thread.start_new_thread(core1, ()) # Core1スレッド再起動
if sw1.value() == True: # スイッチ1が押されてなければ
state = False # スイッチ1 ON状態解除
# 新しいスレッドでcore1関数を実行
_thread.start_new_thread(core1, ())
# メインスレッドでcore0関数を実行
core0()
実行してスイッチを操作すると「Thonny」の「シェル」に以下のように表示されます。
・スレッドからの同時アクセス防止(ロック)
各スレッドで変数を共有する場合、1つの変数に同時にアクセスが行われると想定通りの動作を行うことができません。
このため、一方のスレッドで変数を操作している間は、もう一方のスレッドからアクセスされないようにする「ロック」処理が必要です。
「ロック」を行うには以下のように指定します。
with lock: # ロックを取得
# —–ロック中に実行する処理を書く—–
- _thread.allocate_lock() 関数は、新しいロックオブジェクトを作成します。
ロックオブジェクトは、複数のスレッドが共有リソースに同時にアクセスするのを防ぐために使用されます。 - with lock: ここで指定したブロック内でのみロックが取得され、ブロックを抜けると自動的にロックが解放されます。
これにより、共有リソースへの同時アクセスを防止することができます。
実際にサンプルプログラムで動作確認してみましょう。動作は以下になります。
「Core1」でカウント +1、「Core0」でカウント -1 をそれぞれ1万回行い、最終的に ±0 になるかを確認します。
サンプルプログラムは以下になります。「Thonny」等の開発画面にコピペで貼り付けて実行してください。
※プログラムのコピーは下の黒塗り部の右上アイコンクリックでもできます。
import _thread # 複数のタスクを同時に実行するスレッドモジュールを準備
# ロックオブジェクトの作成
lock = _thread.allocate_lock()
count = [0, 0, 0] # 0番目の要素は共有変数
# Core1 並列処理(1万回加算)
def core1(count):
for _ in range(10000):
count[1] += 1 # 加算カウント確認用
with lock: # ロックを取得
count[0] += 1 # 共有変数 +1
# Core0 メイン処理(1万回減算)
def core0(count):
for _ in range(10000):
count[2] -= 1 # 減算カウント確認用
with lock: # ロックを取得
count[0] -= 1 # 共有変数 -1
# 新しいスレッドでcore1関数を実行 ※引数はタプルで指定(要素1つはカンマ付)
_thread.start_new_thread(core1, (count, ))
# メインスレッドでcore0関数を実行
core0(count)
#結果を表示
print(count[0]) # 共有変数 0を出力
print(count[1]) # 加算カウント 10000を出力
print(count[2]) # 減算カウント-10000を出力
実行すると「Thonny」の「シェル」に以下のように表示されます。
・複数の変数を共有する方法
各スレッドで複数の変数を共有する方法を紹介します。
「_thread.start_new_thread()」関数の引数はタプルで指定します。
この時のタプルを複数の要素で「(elem1, elem2, elem3, ・・・)」のように指定しますが、スレッド間で共有するデータは配列として扱う必要があります。
この時注意が必要なのが数値や文字列等「型」の違うデータを扱うとき「Python」では同じ配列内で複数の型を扱えるので問題ないのですが、他の多くの言語では配列では異なる型を扱うことができません。
このため以下のように数値の配列と、文字列の配列をそれぞれ宣言して、タプルの形で渡すのが良いと思います。
str_list = [“Hello!“]
_thread.start_new_thread(関数名, (number_list, str_list)) # 複数の引数をタプルで指定
実際にサンプルプログラムで動作確認してみましょう。動作は以下になります。
「Core0」でカウントしながらスイッチの状態を格納している文字列配列の内容を変更し、「Core1」で表示させます。
サンプルプログラムは以下になります。「Thonny」等の開発画面にコピペで貼り付けて実行してください。
※プログラムのコピーは下の黒塗り部の右上アイコンクリックでもできます。
import _thread # 複数のタスクを同時に実行するスレッドモジュールを準備
from machine import Pin # 端子制御用モジュールを準備
import time # タイマーモジュールを準備
# 入力ピン設定
sw0 = Pin(7, Pin.IN, Pin.PULL_UP) # スイッチのピン番号を指定してswとして入力設定(プルアップ)
# 本体LEDを出力設定
led = Pin("LED", Pin.OUT) # 本体LEDを出力設定
led0 = Pin(18, Pin.OUT) # 外部LEDのピン番号を指定してled0として出力設定
# 変数宣言(スレッドで共有する変数は配列で指定)
state = False # ボタン状態保持用
cnt = [0] # 点滅回数カウント用(スレッドで共有する数値配列)
status = ["Stop"] # スイッチステータス表示用(スレッドで共有する文字列配列)
# Core1 並列処理(本体LED点滅&カウント数、スイッチステータス表示) ---------------------------
def core1(cnt, status):
while True: # ずっと繰り返し
led.value(1) # 本体LEDを点灯
time.sleep(0.5) # 待ち時間
led.value(0) # 本体LEDを消灯
time.sleep(0.5) # 待ち時間
print("cnt:{:d} {:s}".format(cnt[0], status[0])) # 表示
# Core0 メイン処理(スイッチONで外部LED点滅&カウント処理&スイッチステータス変更) --------------
def core0(cnt, status):
while True: # ずっと繰り返し
if sw0.value() == 0: # スイッチが押されていたら
status[0] = "Start" # スイッチステータス変更
led0.value(0) # 外部LEDを点灯
time.sleep(0.5) # 待ち時間
led0.value(1) # 外部LEDを消灯
time.sleep(0.5) # 待ち時間
cnt[0] += 1 # カウント+1
else: # スイッチが押されてい なければ
status[0] = "Stop" # スイッチステータス変更
led0.value(1) # LEDを消灯
# 新しいスレッドでcore1関数を実行 ※引数はタプルで指定。
_thread.start_new_thread(core1, (cnt, status))
# メインスレッドでcore0関数を実行
core0(cnt, status)
実行してスイッチを操作すると「Thonny」の「シェル」に以下のように表示されます。
・キーワード引数(kwargs)の指定方法
スレッドを開始する「_thread.start_new_thread()」関数では「キーワード引数(kwargs)」を指定することもできます。
def core1(**kwargs):
# キーワード引数を利用した処理を書く
_thread.start_new_thread(core1, (), keyword_args) # キーワード引数(kwargs)を指定
いい使い道が思いつかないので・・・とりあえず以下のようにして、単純に本体LEDの状態「ON/OFF」を表示させています。
import _thread # 複数のタスクを同時に実行するスレッドモジュールを準備
from machine import Pin # 端子制御用モジュールを準備
import time # タイマーモジュールを準備
# 入力ピン設定
sw0 = Pin(7, Pin.IN, Pin.PULL_UP) # スイッチのピン番号を指定してswとして入力設定(プルアップ)
# 本体LEDを出力設定
led = Pin("LED", Pin.OUT) # 本体LEDを出力設定
led0 = Pin(18, Pin.OUT) # 外部LEDのピン番号を指定してled0として出力設定
keyword_args = {"status1": "ON", "status2": "OFF"} # キーワード変数
# Core1 並列処理(本体LED点滅&ON/OFF表示) --------------------------------------------
def core1(**kwargs):
while True: # ずっと繰り返し
led.value(1) # 本体LEDを点灯
time.sleep(0.5) # 待ち時間
print(kwargs["status1"]) # ON表示
led.value(0) # 本体LEDを消灯
time.sleep(0.5) # 待ち時間
print(kwargs["status2"]) # OFF表示
# Core0 メイン処理(スイッチONで外部LED点滅) -------------------------------------------
def core0():
while True: # ずっと繰り返し
if sw0.value() == 0: # スイッチが押されていたら
led0.value(0) # 外部LEDを点灯
time.sleep(0.5) # 待ち時間
led0.value(1) # 外部LEDを消灯
time.sleep(0.5) # 待ち時間
else: # スイッチが押されていなければ
led0.value(1) # LEDを消灯
# 新しいスレッドでcore1関数を実行 ※第三引数でキーワード引数を指定
_thread.start_new_thread(core1, (), keyword_args)
# メインスレッドでcore0関数を実行
core0()
実行すると「Thonny」の「シェル」に以下のように表示され続けます。
・その他
その他にも、実行中のスレッドの番号(識別子)を確認するための「_thread.get_ident()」等があります。
「_threadモジュール」の詳細については以下の「Pythonドキュメント」のサイトでご確認ください。
6.まとめ
「Raspberry Pi Pico」でマルチコアを使用して並列動作(マルチスレッド)させる方法について詳しく紹介しました。
「並列処理」は、入出力端子の信号を共有した2つの「Core0/Core1」で別々の処理を行うことで実現しています。
通常は「Core0」で処理を行っていますが「_threadモジュール」を使用することで、新しいスレッドとして「Core1」を起動し別の処理を実行させることができます。
「Core」間で複数の変数を共有することもできるため、処理を分担して同時に実行したい場合に便利ですが、一方の「Core」で変数を操作している間は、もう一方の「Core」からアクセスされないように「ロック」する処理が必要です。
一定の間隔でプログラムを実行しながら、別の処理を行いたい場合や、処理を分担して同時に実行したい時にはマルチコアを使用してみましょう。
以下で製作している「ラズパイ Picoシーケンサ」も、複雑な制御プログラムを「ラダー動作」で簡単にプログラムを作成するために、一定の処理を実行しながら、別の処理を実行できる「マルチコア」での処理が必要となります。
まだ試作段階ですがマルチコアを有効活用した動作について、また詳しく紹介していきたいと思います。
コメント