ラズパイPicoマルチコアで並列動作させる方法:MicroPython編

「Raspberry Pi Pico」の2つのCoreを使って、2つの処理を同時に実行(並列動作:マルチスレッド)させる方法について詳しく紹介します。
一定の間隔でプログラムを実行しながら、別の処理を行いたい場合や、処理を分担して同時に実行したい場合に便利です。

特に複雑な設定はなく、簡単に実現できるためぜひお試しを♪


「ラズパイ Pico」の詳細や基本的な使い方は、以下のリンクで詳しく紹介しています。

ラズパイPicoの使い方 MicroPython&開発環境Thonny、SSLエラーの対処方法も紹介
Raspberry Pi Picoので開発環境Thonnyを使用した「Python(MicroPython)」でのプログラミング方法について初期設定からパッケージ(ライブラリ)の追加、動作確認の方法まで詳しく紹介します。
ラズパイPico基本プログラムPython編(Lチカ,入出力,アナログ,PWM)
Raspberry Pi Picoの基本的なLチカや入出力,アナログ入力(ADC),PWM出力を行うPythonのコピペ用サンプルプログラムをまとめました。
開発環境は「Thonny」を使用します。プログラムを修正して再度実行するときは、一度停止してから実行してください。エラーが発生します。
スポンサーリンク

1.Picoのデュアルコアについて

「ラズパイ Pico」のデュアルコアについて簡単に紹介しておきます。

以下は、Picoのコントローラ「RP2040」のデータシートから「デュアルコア」についてのブロック図を抜粋したものです。

Raspberry Pi Pico Dual CORE

ブロック図では左右に独立した処理装置「Core0」と「Core1」があり「SIO:Single-cycle IO」経由で「GPIO:入出力端子」を共有しています。

Raspberry Pi Pico Dual CORE

「GPIO」からの信号が「SIO」の中で処理され、それぞれの「Core」へ情報を伝え、個別に処理を行なった結果を「SIO」経由で「GPIO」へ返すことで「並列処理」を実現しています。

通常起動しているのは「Core0」です。
「Core1」も起動しますが、すぐにディープスリープ状態となっています。
スポンサーリンク

2.配線図

動作確認に使用した配線図は以下のようになります。
スイッチ1個とLED(赤)1個、LED用の抵抗1個だけです。

Raspberry Pi Pico デュアルコア配線図

配線図のように各部品を接続し「Core0」でスイッチ(SW1)が押された時に外付けLED(赤)を点滅させます。
Core1」では本体LED(緑)を常時点滅させ、スイッチ(SW1)をどのタイミングでONしてもLED(赤)は点滅を開始し、本体LED(緑)は安定して点滅を続けることを確認することで「Core0」と「Core1」が並列動作していることが確認できます。

各LEDの点滅は0.5秒ON、0.5秒OFFで行います。時間カウントは「time.sleep(0.5)」を使用するため、各「Core」ごとに、この時間分の遅延は発生します。

この他にスイッチを押した時のLED(赤)の点滅回数をカウントしています。

点滅回数のカウントは「Core0」で行うため、このカウント数を「Core1」でシリアルモニタに表示することで「Core」間での変数の共有方法についても確認します。

スポンサーリンク

3.サンプルプログラム

サンプルプログラムは以下になります。「Thonny」等の開発画面にコピペで貼り付けて実行してください。
※プログラムのコピーは下の黒塗り部の右上アイコンクリックでもできます。

import _thread  # 複数のタスクを同時に実行するスレッドモジュールを準備
from machine import Pin  # 端子制御用モジュールを準備
import time  # タイマーモジュールを準備

# 入力ピン設定
sw0 = Pin(7, Pin.IN, Pin.PULL_UP)  # スイッチのピン番号を指定してswとして入力設定(プルアップ)

# 本体LEDを出力設定
led = Pin("LED", Pin.OUT)  # 本体LEDを出力設定
led0 = Pin(18, Pin.OUT)  # 外部LEDのピン番号を指定してled0として出力設定

# 変数宣言(スレッドで共有する変数は配列で指定)
cnt = [0]  # スレッドで共有する変数

# Core1 並列処理(本体LED点滅&カウント数表示) ---------------------------------------------------------------
def core1(cnt):
    while True:  # ずっと繰り返し
        led.value(1)     # 本体LEDを点灯
        time.sleep(0.5)  # 待ち時間
        led.value(0)     # 本体LEDを消灯
        time.sleep(0.5)  # 待ち時間
        print("cnt:{:d}".format(cnt[0]))  # 表示
        
# Core0 メイン処理(スイッチONで外部LED点滅&カウント処理) ---------------------------------------------------------------
def core0(cnt):
    while True:  # ずっと繰り返し
        if sw0.value() == 0:  # スイッチが押されていたら
            led0.value(0)       # 外部LEDを点灯
            time.sleep(0.5)     # 待ち時間
            led0.value(1)       # 外部LEDを消灯
            time.sleep(0.5)     # 待ち時間
            cnt[0] += 1         # カウント+1
        else:                 # スイッチが押されていなければ
            led0.value(1)       # LEDを消灯

# 新しいスレッドでcore1関数を実行 ※引数はタプルで指定(要素1つはカンマ付)
_thread.start_new_thread(core1, (cnt,))
# メインスレッドでcore0関数を実行
core0(cnt)
PCBGOGOバナー600_1
PCBGOGOバナー600_2

4.動作確認

実際にブレッドボードを使用して、以下のように各部品を接続して動作確認を行いました。

Raspberry Pi Pico Dual CORE

プログラムが実行されると本体LED(緑)が点滅を繰り返します。

Raspberry Pi Pico デュアルコア動作確認

スイッチを押すと外付けLED(赤)が点滅を開始します。

スイッチをどのタイミングでONしてもLED(赤)は点滅を開始し、本体LED(緑)は常に一定間隔で点滅し「Core0」と「Core1」が並列動作していることが確認できます。

次に、開発環境「Thonny」の「シェル」に、スイッチを押した時のLED(赤)の点滅回数のカウント数を確認します。

以下のように、スイッチを押していない時にはカウント数は変化せず、スイッチを押した時にカウント数が増えていくことが確認できます。

Raspberry Pi Pico デュアルコア変数の受け渡し

ブレッドボードは1列の穴数が6個あるサンハヤトの「SAD-101」がおすすめです。
安価なものはありますが、ほとんどが1列の穴数が5個で自由度がなく、ボードが歪んでいたり、抜き差しが硬かったり緩かったりするものが多いように思います。

5.MicroPythonのマルチスレッドコマンド詳細

「ラズパイ Pico」で並列動作をさせるためには「MicroPython」の「_thread」モジュールをインポートして使用します。

「_thread」モジュールを使用することで、Pythonで複数のタスクを同時に実行する「マルチスレッド(並列動作)」プログラミングを実現することができます。

以下から「_thread」モジュールの基本的な使用方法について紹介します。

・マルチスレッド動作の開始

マルチスレッド動作を実現するには以下の手順で行います。

まず「_thread」モジュールをインポートし「Core0」で実行する「メイン処理」と「Core1」で実行する「並列処理」をそれぞれループ関数として準備します。

「Core0」の「メイン処理」は通常の関数呼び出しと同様に指定して実行します。
「Core1」の「並列処理」は「_thread.start_new_thread()」関数を使用して以下のように実行します。

_thread.start_new_thread(関数名, ())  # 引数を指定しない場合
_thread.start_new_thread(関数名, (配列, ))  # 引数を指定する場合(タプルで指定)
_thread.start_new_thread(関数名, (), {kwargs})  # キーワード引数(kwargs)でも指定可
  • 第一引数:新しいスレッドで実行する関数名を指定。
  • 第二引数:関数に渡す引数をタプルで指定。
         (スレッドで共有するデータは配列で渡す必要があります。)
  • 第三引数:第三引数で「キーワード引数(kwargs)」として「辞書」データも指定可能。

実際にマルチスレッド動作を行うための方法は、上で使用したサンプルプログラムから、マルチスレッド動作をするためのコードだけを抜粋して以下のようになります。

import _thread  # 複数のタスクを同時に実行するスレッドモジュールを準備

# Core1 並列処理
def core1():
    while True:  # ずっと繰り返し
        # Core1の処理を書く
        
# Core0 メイン処理
def core0():
    while True:  # ずっと繰り返し
        # Core0の処理を書く

# 新しいスレッドでcore1関数を実行 ※引数はタプルで指定(要素1つはカンマ付)
_thread.start_new_thread(core1, ())
# メインスレッドでcore0関数を実行
core0()
このように指定することで「core0」関数と「core1」関数が並列動作し、各「Core」で別々のプログラムを実行するマルチスレッドを実現することができます。

・スレッドの停止

実行中のスレッドを停止するには以下を実行します。

_thread.exit()  # 起動しているスレッドを停止
スレッドを再開するには、起動しているスレッド側から「_thread.start_new_thread()」を実行する必要があります。

実際にサンプルプログラムで動作確認してみましょう。動作は以下になります。

実行すると「Core1」で本体LEDの点滅プログラムが実行されます。
スイッチ0(GP7)を押すと「Core1」のスレッドが停止し、本体LEDが消灯します。
スイッチ1(GP8)を押すと「Core1」のスレッドが再起動し、本体LEDが点滅します。
スイッチ0は長めに押してください。
再起動の動作確認をするためには、もう1つのスイッチを「GP8」に接続する必要があります。

サンプルプログラムは以下になります。「Thonny」等の開発画面にコピペで貼り付けて実行してください。
※プログラムのコピーは下の黒塗り部の右上アイコンクリックでもできます。

import _thread  # 複数のタスクを同時に実行するスレッドモジュールを準備
from machine import Pin  # 端子制御用モジュールを準備
import time  # タイマーモジュールを準備

# 入力ピン設定
sw0 = Pin(7, Pin.IN, Pin.PULL_UP)  # スレッド停止スイッチ
sw1 = Pin(8, Pin.IN, Pin.PULL_UP)  # スレッド再開スイッチ

# 本体LEDを出力設定
led = Pin("LED", Pin.OUT)  # 本体LEDを出力設定

#  変数宣言
state = False

# Core1 並列処理(本体LED点滅) ---------------------------------------------------------------
def core1():
    while True:  # ずっと繰り返し
        led.value(1)     # LEDを点灯
        time.sleep(0.3)  # 待ち時間
        led.value(0)     # LEDを消灯
        time.sleep(0.3)  # 待ち時間
        if sw0.value() == 0:  # スイッチ0が押されていたら
            print("Core1 Stop!")
            _thread.exit()  # Core1スレッド停止
        
# Core0 メイン処理(スレッドの再開) ---------------------------------------------------------------
def core0():
    while True:  # ずっと繰り返し
        if sw1.value() == 0 and state == False:  # スイッチ1が押されていたら
            state = True                         # スイッチ1 ON状態保持
            print("Core1 Restart!")
            _thread.start_new_thread(core1, ())  # Core1スレッド再起動
        if sw1.value() == True:                  # スイッチ1が押されてなければ
            state = False                        # スイッチ1 ON状態解除
            
# 新しいスレッドでcore1関数を実行 
_thread.start_new_thread(core1, ())
# メインスレッドでcore0関数を実行
core0()

実行してスイッチを操作すると「Thonny」の「シェル」に以下のように表示されます。

Raspberry Pi Pico マルチスレッド スレッドの停止
「スイッチ0」を押すと「Core1 Stop!」が表示され、本体LEDが消灯するため「Core1」が停止したことが確認できます。
「スイッチ1」を押すと「Core1 Restart!」が表示され、本体LEDが点滅を再開するため「Core1」が再起動したことが確認できます。
「Core1」は停止と再起動が可能ですが、「Core0」は停止はできても再起動はできませんでした。

・スレッドからの同時アクセス防止(ロック)

各スレッドで変数を共有する場合、1つの変数に同時にアクセスが行われると想定通りの動作を行うことができません。

このため、一方のスレッドで変数を操作している間は、もう一方のスレッドからアクセスされないようにする「ロック」処理が必要です。

「ロック」を行うには以下のように指定します。

lock = _thread.allocate_lock()    # ロックオブジェクトの作成
with lock:   # ロックを取得
    # —–ロック中に実行する処理を書く—–
  • _thread.allocate_lock() 関数は、新しいロックオブジェクトを作成します。 
    ロックオブジェクトは、複数のスレッドが共有リソースに同時にアクセスするのを防ぐために使用されます。
  • with lock: ここで指定したブロック内でのみロックが取得され、ブロックを抜けると自動的にロックが解放されます。
    これにより、共有リソースへの同時アクセスを防止することができます。

実際にサンプルプログラムで動作確認してみましょう。動作は以下になります。

各スレッドで「count」変数を共有し操作します。
「Core1」でカウント +1、「Core0」でカウント -1 をそれぞれ1万回行い、最終的に ±0 になるかを確認します。

サンプルプログラムは以下になります。「Thonny」等の開発画面にコピペで貼り付けて実行してください。
※プログラムのコピーは下の黒塗り部の右上アイコンクリックでもできます。

import _thread  # 複数のタスクを同時に実行するスレッドモジュールを準備

# ロックオブジェクトの作成
lock = _thread.allocate_lock()

count = [0, 0, 0]  # 0番目の要素は共有変数

# Core1 並列処理(1万回加算)
def core1(count):
    for _ in range(10000):
        count[1] += 1  # 加算カウント確認用
        with lock:  # ロックを取得
            count[0] += 1  # 共有変数 +1
        
# Core0 メイン処理(1万回減算)
def core0(count):
    for _ in range(10000):
        count[2] -= 1  # 減算カウント確認用
        with lock:  # ロックを取得
            count[0] -= 1  # 共有変数 -1

# 新しいスレッドでcore1関数を実行 ※引数はタプルで指定(要素1つはカンマ付)
_thread.start_new_thread(core1, (count, ))
# メインスレッドでcore0関数を実行
core0(count)

#結果を表示
print(count[0])  # 共有変数 0を出力
print(count[1])  # 加算カウント 10000を出力
print(count[2])  # 減算カウント-10000を出力

実行すると「Thonny」の「シェル」に以下のように表示されます。

Raspberry Pi Pico マルチスレッド ロック処理
「Core1」のカウント数は「10000」、「Core0」のカウント数は「-10000」、共有変数のカウント数は「0」となり、正常に処理が行われたことが確認できます。
※サンプルプログラムの「12,19行目」の「with lock:」を削除すると、出力結果は「0」にならず、実行するたびに違う数値が表示されることが確認できます。

・複数の変数を共有する方法

各スレッドで複数の変数を共有する方法を紹介します。

「_thread.start_new_thread()」関数の引数はタプルで指定します。
この時のタプルを複数の要素で「(elem1, elem2, elem3, ・・・)」のように指定しますが、スレッド間で共有するデータは配列として扱う必要があります。

この時注意が必要なのが数値や文字列等「型」の違うデータを扱うとき「Python」では同じ配列内で複数の型を扱えるので問題ないのですが、他の多くの言語では配列では異なる型を扱うことができません。

このため以下のように数値の配列と、文字列の配列をそれぞれ宣言して、タプルの形で渡すのが良いと思います。

number_list = [1]
str_list
= [“Hello!“]
_thread
.start_new_thread(関数名, (number_list, str_list))
  # 複数の引数をタプルで指定

実際にサンプルプログラムで動作確認してみましょう。動作は以下になります。

最初のサンプルプログラムに、スイッチのON/OFF状態を「Start/Stop」で表示するように変更したものです。
「Core0」でカウントしながらスイッチの状態を格納している文字列配列の内容を変更し、「Core1」で表示させます。

サンプルプログラムは以下になります。「Thonny」等の開発画面にコピペで貼り付けて実行してください。
※プログラムのコピーは下の黒塗り部の右上アイコンクリックでもできます。

import _thread  # 複数のタスクを同時に実行するスレッドモジュールを準備
from machine import Pin  # 端子制御用モジュールを準備
import time  # タイマーモジュールを準備

# 入力ピン設定
sw0 = Pin(7, Pin.IN, Pin.PULL_UP)  # スイッチのピン番号を指定してswとして入力設定(プルアップ)

# 本体LEDを出力設定
led = Pin("LED", Pin.OUT)  # 本体LEDを出力設定
led0 = Pin(18, Pin.OUT)  # 外部LEDのピン番号を指定してled0として出力設定

# 変数宣言(スレッドで共有する変数は配列で指定)
state = False  # ボタン状態保持用
cnt = [0]  # 点滅回数カウント用(スレッドで共有する数値配列)
status = ["Stop"] # スイッチステータス表示用(スレッドで共有する文字列配列)

# Core1 並列処理(本体LED点滅&カウント数、スイッチステータス表示) ---------------------------
def core1(cnt, status):
    while True:  # ずっと繰り返し
        led.value(1)     # 本体LEDを点灯
        time.sleep(0.5)  # 待ち時間
        led.value(0)     # 本体LEDを消灯
        time.sleep(0.5)  # 待ち時間
        print("cnt:{:d} {:s}".format(cnt[0], status[0]))  # 表示

# Core0 メイン処理(スイッチONで外部LED点滅&カウント処理&スイッチステータス変更) --------------
def core0(cnt, status):
    while True:  # ずっと繰り返し
        if sw0.value() == 0:  # スイッチが押されていたら
            status[0] = "Start" # スイッチステータス変更
            led0.value(0)       # 外部LEDを点灯
            time.sleep(0.5)     # 待ち時間
            led0.value(1)       # 外部LEDを消灯
            time.sleep(0.5)     # 待ち時間
            cnt[0] += 1         # カウント+1
        else:                 # スイッチが押されてい なければ
            status[0] = "Stop"  # スイッチステータス変更
            led0.value(1)       # LEDを消灯

# 新しいスレッドでcore1関数を実行 ※引数はタプルで指定。
_thread.start_new_thread(core1, (cnt, status))
# メインスレッドでcore0関数を実行
core0(cnt, status)

実行してスイッチを操作すると「Thonny」の「シェル」に以下のように表示されます。

Raspberry Pi Pico マルチスレッド 複数変数の共有
スイッチを押すとカウント数が増え「Stop」が「Start」に変化します。
スイッチを離すとカウントが停止し「Start」が「Stop」に変化します。
カウントやスイッチの状態変更と表示処理は別々の「Core」で実行しているため、複数の異なる型の変数を共有できていることが確認できます。

・キーワード引数(kwargs)の指定方法

スレッドを開始する「_thread.start_new_thread()」関数では「キーワード引数(kwargs)」を指定することもできます。

keyword_args = {“key1“: “value1“, “key2“: “value2“}  # 辞書型でキーワードを設定
def core1(**kwargs):
    # キーワード引数を利用した処理を書く

# 新しいスレッドでcore1関数を実行

_thread.start_new_thread(core1, (), keyword_args)  # キーワード引数(kwargs)を指定

いい使い道が思いつかないので・・・とりあえず以下のようにして、単純に本体LEDの状態「ON/OFF」を表示させています。

import _thread  # 複数のタスクを同時に実行するスレッドモジュールを準備
from machine import Pin  #  端子制御用モジュールを準備
import time  # タイマーモジュールを準備

# 入力ピン設定
sw0 = Pin(7, Pin.IN, Pin.PULL_UP)  # スイッチのピン番号を指定してswとして入力設定(プルアップ)

# 本体LEDを出力設定
led = Pin("LED", Pin.OUT)  # 本体LEDを出力設定
led0 = Pin(18, Pin.OUT)  # 外部LEDのピン番号を指定してled0として出力設定
keyword_args = {"status1": "ON", "status2": "OFF"}  # キーワード変数

# Core1 並列処理(本体LED点滅&ON/OFF表示) --------------------------------------------
def core1(**kwargs):
    while True:  # ずっと繰り返し
        led.value(1)     # 本体LEDを点灯
        time.sleep(0.5)  # 待ち時間
        print(kwargs["status1"])  # ON表示
        led.value(0)     # 本体LEDを消灯
        time.sleep(0.5)  # 待ち時間
        print(kwargs["status2"])  # OFF表示
        
# Core0 メイン処理(スイッチONで外部LED点滅) -------------------------------------------
def core0():
    while True:  # ずっと繰り返し
        if sw0.value() == 0:  # スイッチが押されていたら
            led0.value(0)       # 外部LEDを点灯
            time.sleep(0.5)     # 待ち時間
            led0.value(1)       # 外部LEDを消灯
            time.sleep(0.5)     # 待ち時間
        else:                 # スイッチが押されていなければ
            led0.value(1)       # LEDを消灯

# 新しいスレッドでcore1関数を実行 ※第三引数でキーワード引数を指定
_thread.start_new_thread(core1, (), keyword_args)
# メインスレッドでcore0関数を実行
core0()

実行すると「Thonny」の「シェル」に以下のように表示され続けます。

Raspberry Pi Pico マルチスレッド キーワード変数
LEDの点滅に合わせてキーワード引数で指定した「ON」「OFF」が表示されることが確認できます。

・その他

その他にも、実行中のスレッドの番号(識別子)を確認するための「_thread.get_ident()」等があります。

「_threadモジュール」の詳細については以下の「Pythonドキュメント」のサイトでご確認ください。

_thread --- 低水準の スレッド API
このモジュールはマルチスレッド (別名 軽量プロセス( light-weight processes)または タスク( tasks)) に用いられる低水準プリミティブを提供します --- グローバルデータ空間を共有するマルチスレッドを制御します。同期のための単純なロック (別名 mutexes またはバイナリセマフォ ...

6.まとめ

「Raspberry Pi Pico」でマルチコアを使用して並列動作(マルチスレッド)させる方法について詳しく紹介しました。

「並列処理」は、入出力端子の信号を共有した2つの「Core0/Core1」で別々の処理を行うことで実現しています。

通常は「Core0」で処理を行っていますが「_threadモジュール」を使用することで、新しいスレッドとして「Core1」を起動し別の処理を実行させることができます。

「Core」間で複数の変数を共有することもできるため、処理を分担して同時に実行したい場合に便利ですが、一方の「Core」で変数を操作している間は、もう一方の「Core」からアクセスされないように「ロック」する処理が必要です。

一定の間隔でプログラムを実行しながら、別の処理を行いたい場合や、処理を分担して同時に実行したい時にはマルチコアを使用してみましょう。

以下で製作している「ラズパイ Picoシーケンサ」も、複雑な制御プログラムを「ラダー動作」で簡単にプログラムを作成するために、一定の処理を実行しながら、別の処理を実行できる「マルチコア」での処理が必要となります。
まだ試作段階ですがマルチコアを有効活用した動作について、また詳しく紹介していきたいと思います。

ラズパイPicoでシーケンサを作ろう(動作確認編)基板製造、部品実装
Raspbrry Pi Picoで製作したシーケンサ(PLC)基板の動作確認(入出力、ラダー、I2C/UART通信等)を行います。基板の発注方法も詳しく紹介していきます。
ラズパイPicoシーケンサ、ラダーをPythonで書く方法
シーケンサのラダー図からPythonプログラムに置き換えて動作確認する方法を、Raspberry Pi Picoでサンプルプログラムを使って詳しく紹介します。

コメント

タイトルとURLをコピーしました