ラズパイPicoWペアリング不要簡単Bluetooth(BLE)通信の使い方

Blurtooth Low Energy(BLE)アイキャッチ

「Raspberry Pi PicoW」でBluetooth Low Energy(BLE)を使用して、ペアリング不要でデータを送受信する方法をPython(MicroPython)のサンプルプログラムを使用して詳しく紹介します。

以前、Bluetoothクラシックを使用して、ペアリング後にCOMポートのようにデータを送受信する方法を紹介しましたが、今回はBluetooth Low Energy(BLE)を使用してペアリング不要(オープン接続)なデータの送受信を行います。
動作確認には自作のpythonアプリを使用します。受信データのcsvファイル保存機能付きで便利です。このアプリのコードも最後に公開してますので動作確認用に使用してください。
パソコンは「Windows11」、開発環境は「Tonny」を使用してPython(MicroPython)で行います。
今回は「PicoW」を使用しましたが「Pico2W」でも使い方は同じです。
Raspberry Pi Pico W
共立電子産業株式会社 KYOHRITSU ELECTRONIC INDUSTRY CO.,LTD.

「Raspberry Pi Pico」の基本的な使い方やBluetoothクラシック(Arduino:C言語)の使い方は以下のリンクで詳しく紹介しています。

ラズパイPico/PicoWの使い方を3つの開発環境Python、ArduinoIDE、PlatformIOで紹介
Raspberry Pi Pico/PicoWの使い方を端子配列からPython(MicroPython)とC言語の開発環境、Lチカ方法まで紹介。PythonはTonny、C言語はArduinoIDEとPlatformIOの3種類で詳しく紹介します。
ラズパイPicoW/Pico2W Bluetooth通信の使い方
Raspberry Pi PicoWでBluetooth通信を使用して、遠隔操作やデータ取得する方法をサンプルプログラムを使用して詳しく紹介します。
スポンサーリンク

1.Bluetooth Low Energy(BLE)とは

Bluetooth Low Energy(BLE)とは、従来のBluetoothクラシックとは異なり、Bluetooth 4.0以降で導入された省電力通信規格で、主にセンサーデバイスやウェアラブル機器など、バッテリー駆動で長時間稼働する機器向けに設計されたものです。

・Bluetoothクラシックとの違い

Bluetooth Low Energy(BLE)と、従来のBluetoothクラシックの主な違いは以下表のようになります。

項目Bluetooth Low Energy (BLE)Bluetoothクラシック
主な用途センサー通信、IoT、ウェアラブル音声・データ転送(ヘッドホン等)
消費電力非常に低い高い
通信速度最大1 Mbps最大3 Mbps
ペアリングの必要性必須ではない(任意)必須
接続時間数ミリ秒〜数秒数秒〜数十秒
同時接続数多い(理論上数十台)少ない(通常7台まで)
プロファイルGATT(汎用属性プロファイル)A2DP, HFPなど多数
Bluetooth Low Energy(BLE)Bluetoothクラシックと比べて、通信速度は遅く大量のデータ転送には不向きですが、低消費電力で同時に多数のデバイスと容易に接続でき、今回ここで紹介する方法のようにペアリングも必須ではないため、低容量のデータ送受信やデバイスの温度データ取得等IoT機器には最適な無線通信方法です。

・接続方法(GATTベース)

接続方法については、ライブラリを使用すれば勝手に接続してくれるので、問題なく接続できていれば気にしなくても良いのですが、エラーが発生した場合は知っておいた方がその後の対応が楽になるのでここで紹介しておきます。

少々複雑なので、初めての方はとりあえず下の方のサンプルプログラムで動作確認してコードを触りながら確認することをおすすめします。

接続手順は「GATT(Generic Attribute Profile)」という仕組みを使って以下①〜⑤のような流れで行われます。

通信対象の親機と子機のような表現はBluetooth Low Energy(BLE)では以下のように表現されます。
・親機(中央機器):セントラル(Central)
・子機(周辺機器):ペリフェラル(Peripheral)

①アドバタイズ(Advertise:広告)
 周辺機器(ペリフェラル)が自分の存在を周囲に知らせる。
 知らせる情報(アドバタイズパケット)には、「デバイス名」や「サービスUUID」などがある。

②スキャン(Scan)
 中央機器(セントラル)が周囲のBLEデバイス(ペリフェラル)をスキャンして検出。

③接続(Connect)
 「セントラル」が「ペリフェラル」に接続要求を送信。
 接続が確立すると、「GATTセッション」が開始される。

④サービス探索(Service Discovery)
 「ペリフェラル」が提供する「サービスUUID」と「キャラクタリスティックUUID」を「セントラル」が取得。

⑤通信実行(Read/Write/Notify)
 「セントラル」が「ペリフェラル」の「キャラクタリスティックUUID」に対してデータを読み取ったり、書き込んだり、通知(Notify)を受け取ったりする。

  • Read: 「セントラル」が要求して、「ペリフェラル」からデータを受け取る(一方向)。
  • Write: 「セントラル」が「ペリフェラル」にデータを送信する(一方向)。
  • Notify(通知): 「ペリフェラル」のデータが変更された際に、「ペリフェラル」が「セントラル」に自動的に通知する(一方向)。
    ※「ペリフェラル」から「セントラル」に対して一方的にデータを送れるため、生存確認や近くに存在しているのを確認するビーコン的な使い方ができる
以上の手順で通信が実行されますが、通信は「ペリフェラル」が提供する「サービスUUID」の個々のデータ項目「キャラクタリスティックUUID」に対して行われます。各UUIDについて、以下から詳しく紹介します。

・サービスUUIDとキャラクタリスティックUUID

Bluetooth Low Energy(BLE)の「サービスUUID」と「キャラクタリスティックUUID」とは、デバイスが提供する機能を識別するために使われる重要な「識別子」です。
それぞれの役割や種類について詳しく紹介します。


サービスUUIDの種類

サービスUUIDは、デバイスが提供する「機能のまとまり」を表し、Bluetooth SIG(Special Interest Group)で定義されており、以下表のようにサービスごとに16ビットのUUIDで表されます。

サービス名UUID(16ビット)説明
Heart Rate0x180D心拍数モニタリング
Battery Service0x180Fバッテリー残量の取得
Device Information0x180Aデバイスの製造情報など
Health Thermometer0x1809体温測定
Blood Pressure0x1810血圧測定

登録されているUUIDについては以下Bluetooth SIGのサイト内PDFファイルで確認できます。

割当番号|ブルートゥース® テクノロジーウェブサイト
仕様リクエスト割当番号 企業識別子、メンバー用16ビットUUID、非メンバー用UUIDのリクエストや、Bluetoothの割当番号詳細は...

キャラクタリスティックUUID

キャラクタリスティックUUIDは、サービス内の「個々のデータ項目」を表します。
サービスUUIDと同様にBluetooth SIGで定義されており、以下表のように16ビットのUUIDで表されます。

キャラクタリスティック名UUID(16ビット)説明
Battery Level0x2A19バッテリー残量(%)
Heart Rate Measurement0x2A37心拍数データ
Manufacturer Name String0x2A29製造者名
Temperature Measurement0x2A1C温度データ

サービスUUIDとキャラクタリスティックUUIDの違い

サービスUUIDとキャラクタリスティックUUIDの違いは以下表のようになります。

項目サービスUUIDキャラクタリスティックUUID
役割機能のまとまり(例:心拍数サービス)個々のデータ項目(例:心拍数値)
階層上位下位(サービスの中に含まれる)

カスタムUUID

他にも開発者が独自に定義するカスタムUUIDもあります。

今回ここで紹介する「UART通信」のようなデータの送受信(TX/RX)機能を実現するためには、多くのデバイスメーカーが独自のカスタムUUIDを設定しています。

特に広く使われているのが「Nordic Semiconductor」が提供する「NUS(Nordic UART Service)」で、そのUUIDは以下のような128ビットで設定されています。

BLE経由でテキストやバイナリデータをUART通信のように送受信するためのカスタムUUID
・サービスUUID:
  6E400001-B5A3-F393-E0A9-E50E24DCCA9E
・受信用キャラクタリスティックUUID:
  6E400002-B5A3-F393-E0A9-E50E24DCCA9E
・送信用キャラクタリスティックUUID:
  6E400003-B5A3-F393-E0A9-E50E24DCCA9E
ここで紹介するサンプルプログラムもこのUART通信用の「カスタムUUID」を使用しています。
通信するだけなら「セントラル」と「ペリフェラル」でUUIDを同じにしておけば、オリジナルのUUIDを設定してもいいのですが、用途が特定できるように、定義されたものや一般的なUUIDを設定するようにしましょう。
スポンサーリンク

2.動作確認の準備

・開発環境Tonnyのインストール

開発環境thonnyは以下のリンクからダウンロードできます。

Thonny, Python IDE for beginners

Thonnyのインストール方法や使い方は以下のリンクで詳しく紹介しています。

ラズパイPico2の使い方:MicroPython編
Raspberry Pi Pico 2でPythonを使用した開発環境(Thonny)のインストールから初期設定、サンプルプログラムの動作確認方法まで詳しく紹介
ArduinoIDEや他の開発環境で使用していたものは、最初にPicoを初期化する必要があります。
初期化の方法についても上のリンクページの最後で紹介しています。

・ライブラリの準備

今回使用するライブラリは「MicroPython」の「aioble」です。
初期状態で使用できるため、個別にインストールする必要はありません。

Bluetooth通信を行うには「bluetooth」モジュールを使用しますが、ライブラリとしては以下の公式サイトのように「aioble」を使用することを推奨しています。

bluetooth --- 低レベル Bluetooth — MicroPython latest ドキュメント
bluetooth」は MicroPython に組み込まれている 標準モジュール です。
Bluetooth通信の低レベル機能(アドバタイズ、スキャン、接続など)を提供します。
aioble」は「bluetooth」モジュールを使って構築された ライブラリ です。
より高レベルな機能(非同期通信、GATTサービスの管理など)を提供します。
スポンサーリンク

3.Bluetooth Low Energy(BLE)通信の使い方

・ライブラリの使い方

今回使用する「MicroPython」のBluetooth Low Energy(BLE)用ライブラリ「aioble」を使用するには「bluetooth」モジュールと一緒に以下のようにインポートして使用します。

import aioble     # MicroPythonのBLEライブラリ
import bluetooth  # MicroPythonのBLEモジュール
import uasyncio as asyncio  # 非同期処理ライブラリ
「aioble」ライブラリは非同期で使用する必要があるため「asyncio」ライブラリも同時にインポートする必要があります。

今回使用する「aioble」ライブラリの基本的な使用方法についてはUUIDの設定等、初期設定が複雑なため、詳細は「サンプルプログラムの詳細」の内容を確認してください。

以下、大まかな流れを紹介します。

# UUIDの設定やキャラクタリスティックの設定、各サービスの開始
# --- 省略(サンプルプログラム参照)---

# アドバタイズド開始
async with await aioble.advertise( # BLEアドバタイズを開始
    250_000,                       # アドバタイズ間隔(マイクロ秒)
    name="RPi-Pico",               # アドバタイズ名(BLE接続デバイス名)
    services=[UART_SERVICE_UUID],  # サービスUUIDを指定
) as connection:                   # 接続待機

# データ送信(接続先の「connection」に対して「data」を送信)
await tx_char.notify(connection, data.encode())  # データをエンコードして送信

# データ受信(接続先の「connection」から「tx_char」に書き込まれたデータを「value」に格納)
connection, value = await rx_char.written() # 受信処理(受信データが来るまでここで待機する)
データの送受信の際には適切なエラー処理を行わないと、処理が問題なく完了していても中断してしまうことがありました。
これについても「サンプルプログラムの詳細」の内容を確認してください。

・動作確認方法

動作についてはスマホアプリの「BLE Terminal」等でも確認できますが、ここでは下画像のような自作のPythonアプリを使用します。

BLE Pythonアプリの使い方

このアプリを使用することで、周辺のBLEデバイスの「スキャン」から、「接続」してデータの「送受信」や「記録」機能で、受信したデータをcsvファイルに保存することもできます。

このアプリの実行プログラムや、起動方法についてはこのページの最後の方で紹介しています。

4.動作確認1:PicoW基板内蔵温度データ取得

サンプルプログラムを使用して実際に動作確認していきます。

動作確認1では「PicoW」基板内蔵の温度センサーの値をBluetooth Low Energy(BLE)通信で受信して確認します。
BLE通信接続中はPicoW基板上のLEDが点灯し、接続が解除されると消灯します。

「PicoW」の温度センサの値は1秒ごとに送信するように設定しています。
温度センサの値は基板内部の温度で、基板の発熱の影響を受けるため雰囲気温度とは異なります。あまり実用的ではないため、正確な温度測定を行うには外付けの温度センサを使用しましょう。
Raspberry Pi Pico W
共立電子産業株式会社 KYOHRITSU ELECTRONIC INDUSTRY CO.,LTD.

・動作紹介1

動作確認の様子は以下になります。

開発環境のThonnyを起動したら下画像のように、サンプルプログラムをコピペで貼り付けて[実行]ボタンを押します。

ThonnyでPython実行

Pythonアプリをパソコンから起動して接続すると、下画像のように1秒ごとにPico基板の内蔵温度センサーの値が送信されて、パソコン側のアプリで確認したり、記録することができます。

Thonnyで動作確認

パソコン側のPythonアプリでの動作確認の様子は以下のようになります。

BLE Pythonアプリの使い方

Pythonアプリで[スキャン]ボタンを押してしばらくすると、デバイス名「RPi-Pico」が見つかるので、上画像のようにドロップダウンリストからデバイス名に「RPi-Pico」を指定します。

BLE Pythonアプリの使い方

[接続]ボタンを押すと、PicoW基板への接続が開始され上画像のように「接続成功」が表示されると、1秒ごとに温度データを受信して確認することができます。
[切断ボタン]を押すと、接続が解除されて温度データの受信は終了します。

BLE Pythonアプリの使い方

次に[記録]ボタンを押すと上画像のように「記録開始:〜」が表示され、Pythonアプリファイルがあるフォルダにタイムスタンプ付きのcsvファイルが作成されます。(下画像)

「記録」状態で[接続]ボタンを押すと、温度データが表示され、同時にcsvファイルにデータが保存されていきます。
[切断]ボタンを押すと温度データの受信と保存が終了します。

[記録]ボタンを押すと上画像のように、タイムスタンプ付きのcsvファイルが作成されます。
このcsvファイルは空で作成されますが、データを受信するごとに追記して保存されていきます。

csvファイルを開くと上画像のように受信したデータが保存されているのが確認できます。
csv形式で保存しているため、エクセル等で開いてグラフ表示やデータ分析等に使用できます。

BLE Pythonアプリの使い方

「記録」を終了するには上画像のように[停止]ボタンを押します。

BLE Pythonアプリの使い方

[削除]ボタンを押すことで、受信テキストエリアの表示内容を全て消去することができます。

・サンプルプログラム1

サンプルプログラムは以下になります。
コピペで貼り付けて実行してください。コピーは下の黒塗り部右上のアイコンクリックでもできます。

# pico_ble_temp.py (micropython)
# BLE接続の基本設定完了後、Pico基板内蔵の温度センサの値を取得して1秒ごとに送信

import aioble               # MicroPythonのBLEライブラリ
import bluetooth            # MicroPythonのBLEモジュール
import uasyncio as asyncio  # 非同期処理ライブラリ
from machine import Pin, ADC  # PicoのGPIOとADCモジュールをインポート

UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")  # UARTサービスUUID
UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")  # UART送信用キャラクタリスティックUUID

uart_service = aioble.Service(UART_SERVICE_UUID)  # UARTサービスを定義
tx_char = aioble.Characteristic(uart_service, UART_TX_CHAR_UUID, notify=True)  # UART送信用キャラクタリスティックを定義

aioble.register_services(uart_service)  # GATTサービスを登録

led = Pin("LED", Pin.OUT)  # LED端子をledとして出力に設定

# Pico内蔵温度センサーから温度を取得
def getTemp():
    sensor_temp = ADC(4)
    conversion_factor = 3.3 / 65535

    reading = sensor_temp.read_u16()
    voltage = reading * conversion_factor
    temperature = 27 - (voltage - 0.706) / 0.001721
    print("Temperature: {:.2f} °C".format(temperature))
    return temperature

# BLE Peripheralとして動作するメインループ
async def ble_notify_loop():
    while True:
        try:  # BLE接続を待機
            led.value(0)  # 接続待機中はLED消灯
            async with await aioble.advertise(  # BLEアドバタイズを開始
                250_000,  # アドバタイズ間隔(マイクロ秒)
                name="RPi-Pico",  # アドバタイズ名(BLE接続デバイス名)
                services=[UART_SERVICE_UUID],  # サービスUUIDを指定
            ) as connection:  # 接続待機
                
                print("接続中... :", connection.device)  # 接続されたデバイス情報を表示
                await asyncio.sleep(2)  # 接続安定化のため少し待つ
                
                while connection.is_connected():  # 接続が維持されている間ループ
                    led.value(1)  # 接続されたらLED点灯

                    try:  # データを送信(try:~内で実行しないと送信できない)
                        temp = getTemp()  # 温度センサーから温度を取得
                        data = f"{temp:.2f}℃\n"  # 小数点以下2桁にフォーマット
                        await tx_char.notify(connection, data.encode())  # データをエンコードして送信
                    except Exception:
                        pass  # エラー(Notify Error: 'NoneType' object isn't iterable)は無視して続行
                    
                    await asyncio.sleep(1)  # 1秒ごとにデータを送信
        except Exception as e:  # アドバタイズエラー処理
            print("Error:", e)  # エラーが発生した場合の処理
        await asyncio.sleep_ms(1000)  # 1秒待機して再試行

asyncio.run(ble_notify_loop())  # PicoのBLE Peripheralとして動作開始

・サンプルプログラム1の詳細

サンプルプログラムの詳細は以下になります。

Bluetooth Low Energy(BLE)通信を行うために、まずは「9,10行目」で以下のように自分の存在を周囲に知らせる「アドバタイズ(広告)」のための「UUID」を設定します。

UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")  # UARTサービスUUID
UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")  # UART送信用キャラクタリスティックUUID

次に「12〜15行目」で以下のように、設定した「UUID」を定義して、セントラル(ここではパソコンアプリ)が接続要求を送信してきた時のために「GATTサービス」を登録しておきます。

uart_service = aioble.Service(UART_SERVICE_UUID)  # UARTサービスを定義
tx_char = aioble.Characteristic(uart_service, UART_TX_CHAR_UUID, notify=True)  # UART送信用キャラクタリスティックを定義

aioble.register_services(uart_service)  # GATTサービスを登録

19〜25行目」の以下「getTemp()」関数は、PicoW基板内蔵の温度センサの値を取得するためのものです。換算式等はメーカが提供するものを使って温度データを取得できるようにしています

# Pico内蔵温度センサーから温度を取得
def getTemp():

以下の「30〜59行目」がBluetooth Low Energy(BLE)通信を行うためのメインループ処理です。

# BLE Peripheralとして動作するメインループ
async def ble_notify_loop():
    while True:
        try:  # BLE接続を待機
            .
            ここでアドバタイズ(広告)が実行され、接続が開始されたら処理を実行
            アドバタイズに失敗しても1秒後に再試行される
            .
        except Exception as e:  # アドバタイズエラー処理
            print("Error:", e)  # エラーが発生した場合の処理
        await asyncio.sleep_ms(1000)  # 1秒待機して再試行

asyncio.run(ble_notify_loop())  # PicoのBLE Peripheralとして動作開始
BLE通信は基本的に非同期で実行されるため非同期関数「ble_notify_loop()」を作成して非同期で実行するようにしています。

以下の「30〜59行目」が自分の存在を周囲に知らせるための「アドバタイズ(広告)」処理です。

async with await aioble.advertise(  # BLEアドバタイズを開始
    250_000,  # アドバタイズ間隔(マイクロ秒)
    name="RPi-Pico",  # アドバタイズ名(BLE接続デバイス名)
    services=[UART_SERVICE_UUID],  # サービスUUIDを指定
) as connection:  # 接続待機

この処理の詳細は以下になります。

async with awaitaioble.advertise(
  アドバタイズ間隔,  # アドバタイズ(広告)の間隔をマイクロ秒単位で設定
  アドバタイズ名,   # スキャンした時に表示されるデバイス名になります
  サービスUUID,   # 広告するサービスを指定、デバイスの用途が認識される
) as connection:

非同期(async) でBLEのアドバタイズ(広告)を開始します。
with構文」を使うことで、接続が確立されるまでこの行で待機します。
接続が成功すると、「connection」という名前のオブジェクトが生成されます。

接続が完了すると「44〜54行目」で以下のように、取得した温度データをセントラル(ここではパソコンアプリ)へ送信(通知:notify)しています。

while connection.is_connected():  # 接続が維持されている間ループ
    led.value(1)  # 接続されたらLED点灯

    try:  # データを送信(try:~内で実行しないと送信できない)
        temp = getTemp()  # 温度センサーから温度を取得
        data = f"{temp:.2f}℃\n"  # 小数点以下2桁にフォーマット
        await tx_char.notify(connection, data.encode())  # データをエンコードして送信
    except Exception:
        pass  # エラー(Notify Error: 'NoneType' object isn't iterable)は無視して続行
                    
    await asyncio.sleep(1)  # 1秒ごとにデータを送信

温度データの「送信(通知:notify)」を行なっている部分の詳細は以下になります。

await tx_char.notify(connection, data.encode())# データをエンコードして送信

ここでは、送信先の「connection」オブジェクトの送信用キャラクタリスティック「tx_char」に対してエンコードした「data」を非同期処理で「送信(通知:notify)」しています。

notify」を実行すると「エラー(Notify Error: ‘NoneType’ object isn’t iterable)」が発生します。これによって処理が中断してしまうため「try:〜except」内で実行してエラーは無視(pass)することで問題なく通知(notify)は完了できました。
エラー対策も少し探ってみたのですが、うまくいかなかったので・・・あまり深追いせずに今回はpassしています。
今回「copilot」にベースを書いてもらいましたがこの解決はできず、無視するということも教えてくれませんでしたので「解決できないので無視することにします」としたら、あっさり「問題ないと思います」との回答でした^^

5.動作確認2:UART通信のようにデータ送受信

動作確認2では、「Bluetooth Low Energy(BLE)」通信でパソコン側から送信したテキストデータをPicoW基板で受信して、そのままパソコン側へ送り返す(コールバック)動作について確認します。
BLE通信接続中はPicoW基板上のLEDが点灯し、接続が解除されると消灯します。

ここでは「Bluetooth Low Energy(BLE)」通信を使用した、データの「送信方法」と「受信方法」両方の確認ができます。
これができると、受信したコマンドごとにデバイスの動作を切り替えたり、デバイスの状態を送信して知らせることができるため、遠隔操作やデータ監視デバイスの設定変更等に応用することができます。
Raspberry Pi Pico W
共立電子産業株式会社 KYOHRITSU ELECTRONIC INDUSTRY CO.,LTD.

・動作紹介2

動作確認1と同様に開発環境のThonnyを起動したら下画像のように、サンプルプログラムをコピペで貼り付けて[実行]ボタンを押します。

ThonnyでPython実行

Pythonアプリをパソコンから起動して文字列を送信すると、下画像のようにPicoW基板で受信され、受信したデータがそのまま送り返されるコールバック動作が確認できます。

Thonnyで動作確認

パソコン側のPythonアプリでの動作確認の様子は以下のようになります。
スキャンから接続までの動作は動作確認1の時と同じです。

BLE Pythonアプリの使い方

「送信テキストエリア」に文字列(半角英数)を入力して[送信]ボタンをクリックすると、PicoW基板へデータが送信され、コールバックでそのままの文字列が返されて「受信テキストエリア」で確認できます。

BLE Pythonアプリの使い方

「送信テキストエリア」の文字列は[Enter]キーを押すことでも送信できます。
※測定器のデータ受信等で同じコマンドを連続して送信できるように、送信後のテキストはクリアされないようにしています。
アプリ側の修正で消去するようにもできます。

・サンプルプログラム2

サンプルプログラムは以下になります。
コピペで貼り付けて実行してください。コピーは下の黒塗り部右上のアイコンクリックでもできます。

# pico_ble_uart_echo.py
# BLEで受信したデータをそのまま返す(コールバック)

import aioble               # MicroPythonのBLEライブラリ
import bluetooth            # MicroPythonのBLEモジュール
import uasyncio as asyncio  # 非同期処理ライブラリ
from machine import Pin  # PicoのGPIOモジュールをインポート

UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")  # UARTサービスUUID
UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")  # UART受信用キャラクタリスティックUUID
UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")  # UART送信用キャラクタリスティックUUID

# サービスとキャラクタリスティックの定義
uart_service = aioble.Service(UART_SERVICE_UUID)  # UARTサービスを定義
tx_char = aioble.Characteristic(uart_service, UART_TX_CHAR_UUID, read=True, notify=True)  # UART送信用キャラクタリスティックを定義
rx_char = aioble.Characteristic(uart_service, UART_RX_CHAR_UUID, write=True, capture=True)  # UART受信用キャラクタリスティックを定義

aioble.register_services(uart_service)  # GATTサービスを登録

led = Pin("LED", Pin.OUT)  # LED端子をledとして出力に設定

# 非同期で受信タスクを実行
async def handle_rx(connection):
    # 受信待ちループ
    while connection.is_connected():
        try:
            connection, value = await rx_char.written() # 受信処理(受信データが来るまでここで待機する)
            print("受信:", value)  # 受信したデータを表示

            # 受信したデータをそのまま通知(送信)
            try:
                if connection and connection.is_connected():  # 接続が有効な場合のみ送信
                    print("送信:", value) # 先に出力しておく(この後エラーになるため)
                    await tx_char.notify(connection, value) # 通知(送信)は成功するがエラーになる
                else:
                    print("通知対象が無効です:", connection)  # 接続が無効な場合の処理
            except Exception:
                pass  # エラー(Notify Error: 'NoneType' object isn't iterable)は無視して続行
        except Exception as e:
            print("受信/送信エラー:", e) 
            break

# BLE Peripheralとして動作するメインループ
async def ble_uart_echo():
    while True:
        led.value(0)  # 接続待機中はLED消灯
        try:
            async with await aioble.advertise(  # BLEアドバタイズを開始
                250_000,  # アドバタイズ間隔(マイクロ秒)
                name="RPi-Pico",  # アドバタイズ名(BLE接続デバイス名)
                services=[UART_SERVICE_UUID],  # サービスUUIDを指定
            ) as connection: # 接続待機
                print("接続中:", connection.device)  # 接続されたデバイス情報を表示
                led.value(1)

                # 受信処理を別タスクで開始
                rx_task = asyncio.create_task(handle_rx(connection))

                # 接続が切れるまで待機
                while connection.is_connected():
                    await asyncio.sleep_ms(100)

                # 接続解除 → タスクキャンセル
                rx_task.cancel()
                led.value(0)  # LED消灯
                print("接続解除されました")
        except Exception as e:
            print("アドバタイズエラー:", repr(e))
        await asyncio.sleep_ms(200)
# PicoのBLE Peripheralとして動作開始
asyncio.run(ble_uart_echo())    

・サンプルプログラム2の詳細

サンプルプログラムの詳細は以下になります。
「UUID」の設定から「GATTサービス」の登録までは動作確認1と同じです。

データの送信(通知:notify)方法も動作確認1と同じで、動作確認2ではデータ受信の方法について詳しく紹介します。

データの受信は「22〜41行目」で以下のように非同期関数で実行しています。
メインループ内で実行してもいいのですが、省電力な「Bluetooth Low Energy(BLE)」通信の特性なのか、受信処理を実行すると実行した時点で低消費電力状態でその場で待機するのでLEDの点灯消灯の制御等他の処理との連携がうまくいかなかったため、非同期関数で実行しています。

# 非同期で受信タスクを実行
async def handle_rx(connection):
    # 受信待ちループ
    while connection.is_connected():
        try:
            connection, value = await rx_char.written() # 受信処理(受信データが来るまでここで待機する)
            print("受信:", value)  # 受信したデータを表示

            # 受信したデータをそのまま通知(送信)
            try:
                if connection and connection.is_connected():  # 接続が有効な場合のみ送信
                    print("送信:", value) # 先に出力しておく(この後エラーになるため)
                    await tx_char.notify(connection, value) # 通知(送信)は成功するがエラーになる
                else:
                    print("通知対象が無効です:", connection)  # 接続が無効な場合の処理
            except Exception:
                pass  # エラー(Notify Error: 'NoneType' object isn't iterable)は無視して続行
        except Exception as e:
            print("受信/送信エラー:", e) 
            break

データ受信部は「27行目」で詳細は以下になります。

connection, value = await rx_char.written() # 受信処理

await rx_char.written()」が実行されると、設定した受信用キャラクタリスティック「rx_char」 に書き込みされるまで待機します。

connection」は受信したデータの送信元(セントラル)を表します。
これにより、どのクライアントに対して応答すべきかが分かるため、複数接続にも対応可能です。

value」には書き込まれた(セントラルから受信した)データ(bytes型)が格納されます。

その後のデータ送信(通知:notify)方法については動作確認1と同じです。

6.自作Pythonアプリの紹介

最後に、動作確認に使用した自作のPythonアプリについて紹介しておきます。
Pythonの「tkinter」で操作画面を作成して「bleak」モジュールでBLE通信を制御しています。

以下にコピペ用サンプルプログラムを準備しました。実行方法についても紹介しておきます。

このアプリは個人の動作確認用に作成したものです。基本的な機能の動作確認に使用できますが、思わぬエラーが発生するかもしれません。
ご使用前にサンプルプログラムをご確認いただき、自己責任にて使用してください。
また、再配布はご遠慮ください。個人で使用する分には改変については自由に行なってください。

・コピペ用プログラム

サンプルプログラムは以下になります。メモ帳等にコピペで貼り付けて拡張子を「.py」に変更して実行してください。コピーは下の黒塗り部右上のアイコンクリックでもできます。

import asyncio
import csv
import datetime
import threading
import tkinter as tk
from tkinter import ttk, scrolledtext
from bleak import BleakClient, BleakScanner

# UUID定義
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"

class BLEUARTApp:
    def __init__(self, root):
        self.client = None
        self.csv_file = None
        self.recording = False
        self.loop = asyncio.new_event_loop()
        threading.Thread(target=self.loop.run_forever, daemon=True).start()

        self.setup_gui(root)
        root.protocol("WM_DELETE_WINDOW", self.on_close)

    def setup_gui(self, root):
        root.title("BLE UART Communication")
        root.geometry("400x350")

        style = ttk.Style()
        style.configure("Right.TCombobox", justify='right')  # テキストを右寄せ

        
        # デバイス名選択用コンボボックス(setup_guiの中)
        tk.Label(root, text="デバイス名:", anchor="e").grid(row=0, column=0, sticky="e")
        self.device_name_var = tk.StringVar()
        self.device_combo = ttk.Combobox(root, textvariable=self.device_name_var)
        self.device_combo.grid(row=0, column=1, columnspan=2, sticky="ew")

        # 送信テキストの改行オプション
        self.send_append_lf = tk.BooleanVar(value=True)    # 初期状態でLFがオン
        self.send_append_cr = tk.BooleanVar(value=False)
        line_control_frame = tk.Frame(root)
        line_control_frame.grid(row=0, column=3, sticky="n", padx=5)

        tk.Checkbutton(line_control_frame, text="LF", variable=self.send_append_lf).pack(anchor="w")
        tk.Checkbutton(line_control_frame, text="CR", variable=self.send_append_cr).pack(anchor="w")

        # 送信テキスト
        self.send_entry = tk.Entry(root)
        self.send_entry.grid(row=1, column=0, columnspan=4, sticky="ew")
        self.send_entry.bind("<Return>", lambda e: self.send_data())

        # 受信テキストエリア + スクロール
        self.recv_text = scrolledtext.ScrolledText(root)
        self.recv_text.grid(row=2, column=0, columnspan=4, sticky="nsew")

        # 操作ボタン
        self.scan_btn = tk.Button(root, text="スキャン", width=10, command=self.scan_devices)
        self.scan_btn.grid(row=3, column=0)
        self.connect_btn = tk.Button(root, text="接続", width=10, command=self.connect_device)
        self.connect_btn.grid(row=3, column=1)
        self.disconnect_btn = tk.Button(root, text="切断", width=10, command=self.disconnect_device)
        self.disconnect_btn.grid(row=3, column=2)
        self.send_btn = tk.Button(root, text="送信", width=10, command=self.send_data)
        self.send_btn.grid(row=3, column=3)

        # 記録制御ボタン
        self.record_btn = tk.Button(root, text="記録", width=10, command=self.start_recording)
        self.record_btn.grid(row=4, column=0)
        self.stop_btn = tk.Button(root, text="停止", width=10, command=self.stop_recording)
        self.stop_btn.grid(row=4, column=1)
        self.clear_btn = tk.Button(root, text="削除", width=10, command=lambda: self.recv_text.delete(1.0, tk.END))
        self.clear_btn.grid(row=4, column=2)

        # レイアウト調整用の空白行
        tk.Frame(root, height=10).grid(row=5, column=0, columnspan=4)

        root.grid_columnconfigure(0, weight=1)
        root.grid_columnconfigure(1, weight=1)
        root.grid_columnconfigure(2, weight=1)
        root.grid_columnconfigure(3, weight=1)
        root.grid_rowconfigure(2, weight=1)

    def log_to_gui(self, message):
        timestamp = datetime.datetime.now().strftime("%H:%M:%S")
        formatted = f"[{timestamp}] {message}"
        self.recv_text.insert(tk.END, formatted + "\n")
        self.recv_text.yview(tk.END)
        print(message)  # コンソールへも出力

    async def handle_notifications(self, sender, data):
        try:
            text = data.decode()
            lines = text.split("\n")
            timestamp = datetime.datetime.now().strftime("%H:%M:%S") 

            for line in lines:
                if not line.strip():
                    continue

                formatted_line = f"[{timestamp}] {line.strip()}"
                self.recv_text.insert(tk.END, formatted_line + "\n")
                self.recv_text.yview(tk.END)

                if self.recording and self.csv_filename:
                    try:
                        with open(self.csv_filename, "a", newline="", encoding="utf-8") as f:
                            writer = csv.writer(f)
                            fields = [timestamp] + [float(x) if x.replace(".", "", 1).isdigit() else x for x in line.split(",")]
                            writer.writerow(fields)
                    except Exception as e:
                        self.log_to_gui(f"記録中のエラー: {e}")
        except Exception as e:
            self.log_to_gui(f"受信処理エラー: {e}")

    def scan_devices(self):
        asyncio.run_coroutine_threadsafe(self.async_scan(), self.loop) 

    async def async_scan(self):
        try:
            self.log_to_gui("BLEデバイスのスキャン開始")
            devices = await BleakScanner.discover(timeout=5.0)
            names = sorted({d.name for d in devices if d.name})  # 重複除去 + ソート
            if names:
                self.device_combo['values'] = names
                self.device_combo.set(names[0])  # 最初の項目を初期選択
                self.log_to_gui(f"見つかったデバイス一覧: {names}")
            else:
                self.device_combo.set("")
                self.device_combo['values'] = []
                self.log_to_gui(f"名前付きデバイスが見つかりません")
        except Exception as e:
            self.log_to_gui(f"スキャンエラー: {e}")

    async def connect(self, device_name):
        try:
            if self.client:
                await self.client.disconnect()

            self.log_to_gui(f" {device_name}へ接続中...")
            devices = await BleakScanner.discover()
            target = next((d for d in devices if d.name == device_name), None)
            if target is None:
                self.log_to_gui(f"デバイスが見つかりません")
                return

            self.client = BleakClient(target)
            await self.client.connect()
            await self.client.start_notify(UART_TX_CHAR_UUID, self.handle_notifications)
            self.log_to_gui(f"接続成功")
        except Exception as e:
            self.log_to_gui(f"接続エラー: {e}")

    def connect_device(self):
        device_name = self.device_name_var.get()
        if not device_name:
            return
        asyncio.run_coroutine_threadsafe(self.connect(device_name), self.loop)

    def send_data(self):
        data = self.send_entry.get()

        # 末尾追加処理
        suffix = ""
        if self.send_append_cr.get():
            suffix += "\r"
        if self.send_append_lf.get():
            suffix += "\n"
        data += suffix

        if not self.client or not self.client.is_connected:
            return

        asyncio.run_coroutine_threadsafe(
            self.client.write_gatt_char(UART_RX_CHAR_UUID, data.encode()),
            self.loop
        )
        # self.send_entry.delete(0, tk.END) # 送信後に入力欄をクリアしないようにコメントアウト

    def start_recording(self):
        timestamp = datetime.datetime.now().strftime("%y%m%d_%H%M%S")
        self.csv_filename = f"{timestamp}.csv"
        self.recording = True

        # 空のCSVファイルを作成
        try:
            with open(self.csv_filename, "a", newline="", encoding="utf-8") as f:
                pass  # 何も書き込まずに即閉じる(ヘッダーを追加する場合は無効に)
                # writer = csv.writer(f)  # ヘッダー行を追加する場合は以下を有効に
                # writer.writerow(["timestamp", "受信データ"])  # ヘッダーの内容
        except Exception as e:
            self.log_to_gui(f"記録ファイル作成エラー: {e}")
            return

        self.log_to_gui(f"記録開始: {self.csv_filename}")

    def stop_recording(self):
        self.recording = False
        self.csv_filename = None
        self.log_to_gui("記録停止")

    def disconnect_device(self):
        if self.client and self.client.is_connected:
            asyncio.run_coroutine_threadsafe(self.client.disconnect(), self.loop)
            self.log_to_gui("接続を解除しました")
            self.client = None
        else:
            self.log_to_gui("現在接続されていません")

    def on_close(self):
        asyncio.run_coroutine_threadsafe(self.cleanup(), self.loop)
        self.loop.call_soon_threadsafe(self.loop.stop)
        root.quit()  # GUIイベントループを終了

    async def cleanup(self):
        if self.client:
            try:
                await self.client.disconnect()
                print("接続解除")
            except Exception as e:
                print("切断時エラー:", e)
        self.stop_recording()
        root.destroy()

# アプリ起動
root = tk.Tk()
app = BLEUARTApp(root)
root.mainloop()

・実行方法

サンプルプログラムのPythonアプリの実行方法について紹介します。

このアプリを実行するにはPythonの実行環境が必要です。
Pythonのインストール方法は以下のリンクで詳しく紹介しています。

pythonのダウンロードからインストール方法の紹介
人気のプログラミング言語 python のインストール方法の紹介です。python はアプリ開発やWebサイト構築、ディープラーニングにも使用されますが、マイコンボードの統合開発環境「Platform IO」のインストールにも必要です。

専用の作業フォルダを作った方がわかりやすいので、まずは好きな場所に好きな名前で作業フォルダを作成します。

ここでは「Documents」フォルダに作成した「python」フォルダを作業フォルダにすることを例に紹介していきます。

次にメモ帳アプリを開きます。開いたら上のサンプルプログラムをコピペで貼り付けます。
拡張子を「.txt」から「.py」に変更して下画像のように作業フォルダ「python」に保存します。

python実行方法

pythonファイルが保存できたら、下画像のようにWindows画面下の検索窓に「cmd」と入力して「コマンドプロンプト」の[開く]をクリックしてコマンドプロンプトを起動します。

コマンドプロンプトの起動方法

起動したら、まずPythonのBLE通信制御用の「bleak」ライブラリを下のコマンドを入力してインストールします。

pip install bleak

実際に入力している画像は以下になります。

pythonモジュールのインストール方法

以下のような画面が表示された場合は、インストールに使用する「pip」のアップグレードが必要です。
黄緑色で表示されているコマンドをコピペで貼り付けて実行してください。

python pipのアップグレード方法

アップグレードが完了したら再度「bleak」のインストールコマンドを入力してインストールします。

「bleak」のインストールが完了したら、以下のコマンド(cd 作業フォルダのパス)で、pythonアプリファイルを保存した作業フォルダへ移動します。(作業環境によってパスは異なります。)

cd Documents\python  

実際に入力した画像は以下になります。

Pythonアプリの起動方法

最後に以下のコマンドを入力して、pythonアプリを起動します。

python BLE_tkinter.py

実際に入力した画像は以下になります。

Pythonアプリの起動方法

実行されると以下のようにアプリ画面が表示されます。

BLE Pythonアプリの使い方

このアプリでBLE通信の基本的なデータの送受は動作確認できるので、サンプルプログラムの動作確認用に使用してください。

7.まとめ

「Raspberry Pi PicoW」でBluetooth Low Energy(BLE)通信を使用した、データの送受信方法をサンプルプログラムを使用して詳しく紹介しました。

Bluetooth Low Energy(BLE)とは、従来のBluetoothクラシックとは異なり、Bluetooth 4.0以降で導入された省電力通信規格で、クラシックと比較すると通信速度は遅く大量のデータ転送には不向きですが、低消費電力で同時に多数のデバイスと容易に接続でき、ペアリングも必須ではないため、低容量のデータ送受信やデバイスの温度データ取得等、センサーデバイスやウェアラブル機器など、バッテリー駆動で長時間稼働する機器には最適です。

Bluetoothクラシックのように「仮想COMポート」としてシリアル通信のように使用することはできませんが、今回紹介したようにBLE通信用のアプリを使用すれば、ペアリング不要で簡単にデータの送受信を行うことができます。

Bluetooth Low Energy(BLE)でもペアリングを行った通信は可能ですが、今回のようなデバイスの温度データを取得するだけや、近くにデバイスが存在しているかどうかを確認するビーコン的な用途にはBluetooth Low Energy(BLE)が最適です。

今回は触れていませんがペアリング認証を使用した通信方法も今後紹介していければと思います。

Raspberry Pi Pico W
共立電子産業株式会社 KYOHRITSU ELECTRONIC INDUSTRY CO.,LTD.
ラズパイPicoW/Pico2W Bluetooth通信の使い方
Raspberry Pi PicoWでBluetooth通信を使用して、遠隔操作やデータ取得する方法をサンプルプログラムを使用して詳しく紹介します。

コメント

タイトルとURLをコピーしました