「Raspberry Pi PicoW」2台でペアリング不要の「Bluetooth Low Energy(BLE)」を使用して、遠隔操作でLチカする方法をPython(MicroPython)のサンプルプログラムを使用して詳しく紹介します。
パソコンは「Windows11」、開発環境は「Thonny」を使用してPython(MicroPython)で行います。
「Bluetooth Low Energy(BLE)」についてや、Picoとパソコン間でデータ送受信する方法は、以下のリンクで詳しく紹介しています。

「Raspberry Pi Pico」の基本的な使い方やBluetoothクラシック(Arduino:C言語)の使い方は以下のリンクで詳しく紹介しています。


1.Bluetooth Low Energy(BLE)とは
2.動作紹介(遠隔Lチカ)
3.準備するもの
・ラズパイPicoW、Pico2W
・ブレッドボード、スイッチ
・開発環境Tonnyのインストール
4.ライブラリ(aioble)について
5.サンプルプログラム
・子機側サンプルプログラム
・親機側サンプルプログラム
6.サンプルプログラムの詳細
・子機側
・親機側
7.まとめ
1.Bluetooth Low Energy(BLE)とは
Bluetooth Low Energy(BLE)とは、従来のBluetoothクラシックとは異なり、Bluetooth 4.0以降で導入された省電力通信規格で、主にセンサーデバイスやウェアラブル機器など、バッテリー駆動で長時間稼働する機器向けに設計されたものです。
パソコンやスマホに使用するキーボードやイヤホン等はBluetooth通信で接続する時に「ペアリング」をして機器を認識させますが、Bluetooth Low Energy(BLE)では「ペアリング」不要のオープンな接続が可能なため、今回は電源ONで自動で接続されて操作できる遠隔Lチカを行います。
「Bluetooth Low Energy(BLE)」通信の大まかな動作については下図のようになります。

「Bluetooth Low Energy(BLE)」の仕組みや用語については、以下のリンクで詳しく紹介しています。

2.動作紹介(遠隔Lチカ)
PicoW同士で「BLE」通信を使用した、遠隔Lチカの動作は下画像のようになります。

実際に動作確認している様子は、下写真のようになります。
両方のPicoWの電源を入れると自動で接続が開始され、接続が完了すると親機(ボタン操作側)側のLEDが点灯します。

親機側に接続したスイッチ「SW1(青)」を押すと、子機側の「LED」が点灯します。

親機側に接続したスイッチ「SW2(赤)」を押すと、子機側の「LED」が消灯します。
子機側との接続が復旧すると、親機側の「LED」は点灯し再び通信可能になります。
3.準備するもの
動作確認のために準備するものは以下になります。
・ラズパイPicoW、Pico2W
「Raspberry Pi PicoW/Pico2W」本体は下写真になります。
2つ必要ですがどちらでも使い方は同じため、両方同じでも、1つづつでも良いので2台準備してください。

ラズパイPicoについては以下のリンクで詳しく紹介しています。

・ブレッドボード、スイッチ
親機側のPicoWにはスイッチを接続しますが、下写真のように「ブレッドボード」と「タクトスイッチ」を使用すると簡単です。

・開発環境Tonnyのインストール
開発環境「Thonny」は以下のリンクからダウンロードできます。
「Thonny」のインストール方法や使い方は以下のリンクで詳しく紹介しています。

4.ライブラリ(aioble)について
今回使用するライブラリは「MicroPython」の「aioble」です。
初期状態で使用できるため、個別にインストールする必要はありません。
Bluetooth通信を行うには「bluetooth」モジュールを使用しますが、ライブラリとしては以下の公式サイトのように「aioble」を使用することを推奨しています。
5.サンプルプログラム
・子機側サンプルプログラム
LEDを点灯させる子機側のサンプルプログラムは以下になります。
コピペで貼り付けて実行してください。コピーは下の黒塗り部右上のアイコンクリックでもできます。
# ble_peripheral_led.py
# BLEで受信したコマンド(文字列)に応じて処理を実行(ここではLEDの点灯/消灯)
from machine import Pin # PicoのGPIOモジュール
import uasyncio as asyncio # 非同期処理ライブラリ
import aioble # MicroPythonのBLEライブラリ
import bluetooth # MicroPythonのBLEモジュール
UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") # UARTサービスUUID
UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E") # UART受信用キャラクタリスティックUUID
UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E") # UART送信用キャラクタリスティックUUID
# サービスとキャラクタリスティックの定義
uart_service = aioble.Service(UART_SERVICE_UUID) # UARTサービスを定義
tx_char = aioble.Characteristic(uart_service, UART_TX_CHAR_UUID, read=True, notify=True) # UART送信用キャラクタリスティックを定義
rx_char = aioble.Characteristic(uart_service, UART_RX_CHAR_UUID, write=True, capture=True) # UART受信用キャラクタリスティックを定義
aioble.register_services(uart_service) # GATTサービスを登録
led = Pin("LED", Pin.OUT) # LED端子をledとして出力に設定
# 受信コマンド処理完了メッセージ送信(通知:Notify)
async def write_result(connection, message):
print("処理結果送信:", message)
try:
if connection and connection.is_connected(): # 接続が有効な場合のみ実行
await tx_char.notify(connection, message.encode('utf-8')) # セントラルへメッセージを送信(通知)
else:
print("通知対象が無効です:", connection) # 接続が無効な場合の処理
except Exception as e: # エラー処理
if "'NoneType' object isn't iterable" in str(e): # このエラーは無視
pass
else:
print("Notifyエラー", e)
# 非同期で受信タスクを実行
async def handle_rx(connection):
# 受信待ちループ
while connection.is_connected(): # 接続が有効な間ループ
try:
connection, value = await rx_char.written() # 受信処理(受信データが来るまでここで待機する)
string = value.decode ('utf-8') # バイト文字を文字列に変換
print("受信コマンド:", string)
if string == "LED_ON": # 受信した文字列が「LED_ON」なら
led.value(1) # LED点灯
await write_result(connection, "ON!") # 受信コマンド処理完了メッセージ送信(通知:Notify)
elif string == "LED_OFF": # 受信した文字列が「LED_OFF」なら
led.value(0) # LED消灯
await write_result(connection, "OFF!") # 受信コマンド処理完了メッセージ送信(通知:Notify)
else:
await write_result(connection, "Unknown command!") # 未登録コマンドの場合のメッセージ送信(書込)
except Exception as e: # エラー処理
print("エラー:", e)
break
# BLE Peripheralとして動作するメインループ
async def ble_uart_echo():
while True:
led.value(0) # 接続待機中はLED消灯
try:
async with await aioble.advertise( # BLEアドバタイズを開始
250_000, # アドバタイズ間隔(マイクロ秒)
name = "RPi-Pico", # アドバタイズ名(BLE接続デバイス名)
services = [UART_SERVICE_UUID], # サービスUUIDを指定
) as connection: # 接続待機
print("接続中:", connection.device) # 接続されたデバイス情報を表示
# 受信処理を別タスクで開始
rx_task = asyncio.create_task(handle_rx(connection))
# 接続が切れるまで待機
while connection.is_connected():
await asyncio.sleep_ms(100) # 100ms待機
# 接続解除(タスクキャンセル)
rx_task.cancel()
print("接続解除されました")
except Exception as e:
print("アドバタイズエラー:", repr(e))
await asyncio.sleep_ms(200) # 200ms待機してから再アドバタイズ
# PicoのBLE Peripheralとして動作開始
asyncio.run(ble_uart_echo())・親機側サンプルプログラム
ボタン操作をする親機側のサンプルプログラムは以下になります。
コピペで貼り付けて実行してください。コピーは下の黒塗り部右上のアイコンクリックでもできます。
# ble_central_button.py
# ボタンを押すとBLEでコマンド(文字列)送信(ここではBTN1を押すと「LED ON」送信、BTN1を押すと「LED OFF」送信)
from machine import Pin # PicoのGPIO(標準入出力)モジュール
import uasyncio as asyncio # 非同期処理ライブラリ
import aioble # MicroPythonのBLEライブラリ
import bluetooth # MicroPythonのBLEモジュール
device_name = "RPi-Pico" # 接続したいデバイス名
# UUIDの定義
UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") # UARTサービスUUID
UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E") # UART受信用キャラクタリスティックUUID
UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E") # UART送信用キャラクタリスティックUUID
# 入力端子設定
BTN1 = Pin(20, Pin.IN, Pin.PULL_UP) # GP20をBTN1として入力端子(プルアップ)に設定
BTN2 = Pin(16, Pin.IN, Pin.PULL_UP) # GP16をBTN2として入力端子(プルアップ)に設定
led = Pin("LED", Pin.OUT) # LED端子をledとして出力に設定
# デバイススキャン実行
async def find_device():
async with aioble.scan(5000, interval_us=30000, window_us=30000, active=True) as scanner:
async for result in scanner: # スキャン結果を順番に処理
print("見つかったデバイス:", result.name(), result.device)
if result.name() == device_name: # デバイス名が見つかったら
return result.device # デバイス情報を返す
return None # 見つからなかったらNoneを返す
# コマンド送信
async def write_command(command, rx_char):
print("送信コマンド:", command)
await rx_char.write(command.encode('utf-8')) # コマンド送信実行
# Notifyを受信して表示
async def handle_notify(tx_char):
while True:
try:
data = await tx_char.notified() # Notifyを待機
print("Notify受信:", data.decode('utf-8')) # 受信データを表示
except Exception as e:
print("Notifyエラー:", e)
break
# デバイスに接続実行(リトライあり)
async def connect_to_device():
while True:
led.value(0) # LED消灯(未接続状態)
device = await find_device() # デバイススキャン実行
if not device: # デバイスが見つからなかったら
print(device, "が見つかりません。再スキャンします...")
await asyncio.sleep(3) # 3s待機
continue # 再スキャンへ
try:
print(device_name, "に接続開始...")
connection = await device.connect() # デバイスに接続
if connection: # 接続に成功したら
print("接続済")
led.value(1) # LED点灯(接続成功)
return connection # 接続成功したら返す
else:
print("接続に失敗しました。再試行します...")
except Exception as e: # 接続エラー処理
print("接続エラー:", e) # エラーメッセージ表示
await asyncio.sleep(3) # 少し待ってから再試行
# メイン処理
async def main():
btn_state1 = False # BTN1ボタン状態格納用
btn_state2 = False # BTN2ボタン状態格納用
while True:
led.value(0) # LED消灯
connection = await connect_to_device() # デバイスに接続(リトライあり)
async with connection: # 接続コンテキスト内で処理
try:
uart_service = await connection.service(UART_SERVICE_UUID) # UART接続のサービスを取得
rx_char = await uart_service.characteristic(UART_RX_CHAR_UUID) # ペリフェラルのRX受信用キャラクタリスティック取得
tx_char = await uart_service.characteristic(UART_TX_CHAR_UUID) # ペリフェラルのTX送信用キャラクタリスティック取得
await tx_char.subscribe() # Notifyを購読
notify_task = asyncio.create_task(handle_notify(tx_char)) # Notify受信タスク開始
except Exception as e:
print("サービスとキャラクタリスティックの取得に失敗", e)
return
# 接続中の処理ループ
while connection.is_connected(): # 接続が維持されている間
if BTN1.value() == 0 and not btn_state1: # BTN1が押されたら
btn_state1 = True # ボタン1の状態をTrueへ
await write_command("LED_ON", rx_char) # コマンド「LED_ON」送信
if BTN1.value() == 1 and btn_state1: # BTN1が離されたら
btn_state1 = False # ボタン1の状態をFalseへ
if BTN2.value() == 0 and not btn_state2: # BTN2が押されたら
btn_state2 = True # ボタン2の状態をTrueへ
await write_command("LED_OFF", rx_char) # コマンド「LED_OFF」送信
if BTN2.value() == 1 and btn_state2: # BTN2が離されたら
btn_state2 = False # ボタン2の状態をFalseへ
await asyncio.sleep_ms(100) # 100ms待機
notify_task.cancel() # 接続が切れたらNotifyタスクをキャンセル
asyncio.run(main()) # メイン処理実行6.サンプルプログラムの詳細
サンプルプログラムの詳細は下図を使用して紹介していきます。
サンプルプログラム内の各動作ごとに、使用している関数(aiobleライブラリ)も記入しています。

「UUIDの定義」や「GATTサービスの登録」については、前回と同じで以下のリンクで詳しく紹介していますのでそちらを参照してください。

以下からは「遠隔Lチカ」操作をするための子機へのコマンド(文字列)送信や、通信完了結果を親機へ通知(Notify)する方法を詳しく紹介します。
・子機側
子機側では、まず親機から認識されるようにサンプルプログラムの「63〜68行目」の中で以下のように「サービスUUID等」と一緒に「アドバタイズ名」を設定して「アドバタイズ(広告)」を開始します。
async with await aioble.advertise( # BLEアドバタイズを開始
250_000, # アドバタイズ間隔(マイクロ秒)
name = "RPi-Pico", # アドバタイズ名(BLE接続デバイス名)
services = [UART_SERVICE_UUID], # サービスUUIDを指定
) as connection: # 接続待機
print("接続中:", connection.device) # 接続されたデバイス情報を表示「アドバタイズ」が開始されたらサンプルプログラム「71行目」で以下のように非同期関数「handle_rx()」を実行します。
# 受信処理を別タスクで開始
rx_task = asyncio.create_task(handle_rx(connection))データを受信するには「handle_rx()」内のサンプルプログラム「41行目」で以下のように非同期関数「rx_char.written()」を実行することで、親機からのコマンド(文字列)受信(書き込み)を待ちます。
connection, value = await rx_char.written() # 受信処理(受信データが来るまでここで待機する)親機からコマンド(文字列)が送信されてきたら、非同期関数「handle_rx()」内の受信コマンドごとの処理が実行されます。
ここでは「LED_ON」を受信したらLEDを点灯、「LED_OFF」を受信したらLEDを消灯させています。
処理が完了したら、親機へ処理が完了したことを伝えるために、サンプルプログラムの「47、50、52行目」のように「受信完了メッセージ」を引数として「23〜29行目」の以下非同期関数「write_result()」を呼び出します。
「write_result()」では「受信メッセージ(message)」を以下「6行目」の「tx_char.notify()」で親機へ送信(Notify:通知)します。
# 受信コマンド処理完了メッセージ送信(通知:Notify)
async def write_result(connection, message):
print("処理結果送信:", message)
try:
if connection and connection.is_connected(): # 接続が有効な場合のみ実行
await tx_char.notify(connection, message.encode('utf-8')) # セントラルへメッセージを送信(通知)
else:
print("通知対象が無効です:", connection) # 接続が無効な場合の処理「tx_char.notify()」を実行すると「‘NoneType’ object isn’t iterable」のエラーが発生します。
動作には影響ないため、以下のようにエラー処理で無視するようにしています。
except Exception as e: # エラー処理
if "'NoneType' object isn't iterable" in str(e): # このエラーは無視
pass
else:
print("Notifyエラー", e)・親機側
親機側では、まずサンプルプログラムのメインループ内「78行目」で以下のように非同期関数「connect_to_device()」を実行して接続情報を取得します。
connection = await connect_to_device() # デバイスに接続(リトライあり)「connect_to_device()」ではサンプルプログラム「22〜29行目」のように以下の「find_device()」関数を実行して、子機側で設定した「アドバタイズ名:RPi-Pico」を探して接続を行い、接続完了でデバイスの情報を取得して返します。(接続先が見つからない場合はリトライで再接続を繰り返します。)
# デバイススキャン実行
async def find_device():
async with aioble.scan(5000, interval_us=30000, window_us=30000, active=True) as scanner:
async for result in scanner: # スキャン結果を順番に処理
print("見つかったデバイス:", result.name(), result.device)
if result.name() == device_name: # デバイス名が見つかったら
return result.device # デバイス情報を返す
return None # 見つからなかったらNoneを返す接続が完了したら、サンプルプログラム「82〜84行目」で以下のように、子機側に設定されている「サービスUUID」を取得し、データの送受信先の「キャラクタリスティック」を取得します。
uart_service = await connection.service(UART_SERVICE_UUID) # UART接続のサービスを取得
rx_char = await uart_service.characteristic(UART_RX_CHAR_UUID) # ペリフェラルのRX受信用キャラクタリスティック取得
tx_char = await uart_service.characteristic(UART_TX_CHAR_UUID) # ペリフェラルのTX送信用キャラクタリスティック取得・rx_char:このキャラクタリスティックに対して親機が書き込むことで、子機にコマンドを送信できる。
・tx_char:子機はこのキャラクタリスティックを使って 親機へ通知(Notify) を送信できる(親機側で通知を許可しておく必要がある)。
親機が子機からの「通知によるデータ受信(Notify)」を行うには、サンプルプログラム「86行目」で以下のように、通知(Notify)を許可(購読)する必要があります。
await tx_char.subscribe() # Notifyを購読次に以下のように、非同期関数「handle_notify()」で通知(Notify)を受信する非同期タスクを起動します。
notify_task = asyncio.create_task(handle_notify(tx_char)) # Notify受信タスク開始「handle_notify()」では、サンプルプログラム「40行目」で以下のように「tx_char.notified()」を実行して通知を待ち受けることで、子機側からの「通知によるデータ受信(Notify)」ができるようになります。
data = await tx_char.notified() # Notifyを待機データの送信はサンプルプログラム「93〜107行目」の中で処理に応じて「rx_char」に対して送信データ(文字列)を指定して以下のように「write_command()」を実行するだけです。
await write_command("LED_ON", rx_char) # コマンド「LED_ON」送信7.まとめ
「Raspberry Pi PicoW」同士でペアリング不要の「Bluetooth Low Energy(BLE)」通信を使用して、遠隔操作でLチカする方法を詳しく紹介しました。
「Bluetooth Low Energy(BLE)」とは、Bluetooth 4.0以降で導入された省電力通信規格で、主にセンサーデバイスやウェアラブル機器など、バッテリー駆動で長時間稼働する機器向けに設計されたものです。
Bluetooth通信では一般的に「ペアリング」をして機器を認識させますが「Bluetooth Low Energy(BLE)」では「ペアリング」不要のオープンな接続が可能です。
通信速度は比較的遅く大量のデータ転送には不向きですが、低消費電力で同時に多数のデバイスと容易に接続でき「ペアリング」も必須ではないため、低容量のデータ送受信やデバイスの温度データ取得等IoT機器には最適な無線通信方法です。
パソコン等の親機側は「アドバタイズ名」で子機(周辺機器)を探し、見つかったら「アドバタイズ」された「サービスUUID」を取得します。
「サービスUUID」を取得したら用途別の「キャラクタリスティック」を取得し、これらに対してデータを送信したり読み込んだり、親機側が受信(通知:Notify)を許可した「キャラクタリスティック」からのデータを受信(通知:Notify)することができます。
通信方式は少々複雑ですが、ここで紹介した「遠隔Lチカ」動作で、手順を確認しながら動作確認していけば理解の近道になると思うので、まずは実際に動作確認してみましょう。






























コメント