「Raspberry Pi PicoW」でBluetooth Low Energy(BLE)を使用して、ペアリング不要でデータを送受信する方法をPython(MicroPython)のサンプルプログラムを使用して詳しく紹介します。
動作確認には自作のpythonアプリを使用します。受信データのcsvファイル保存機能付きで便利です。このアプリのコードも最後に公開してますので動作確認用に使用してください。
パソコンは「Windows11」、開発環境は「Tonny」を使用してPython(MicroPython)で行います。
「Raspberry Pi Pico」の基本的な使い方やBluetoothクラシック(Arduino:C言語)の使い方は以下のリンクで詳しく紹介しています。


1.Bluetooth Low Energy(BLE)とは
・Bluetoothクラシックとの違い
・接続方法(GATTベース)
・サービスUUIDとキャラクタリスティックUUID
2.動作確認の準備
・開発環境Tonnyのインストール
・ライブラリの準備
3.Bluetooth Low Energy(BLE)通信の使い方
・ライブラリの使い方
・動作確認方法
4.動作確認1:PicoW基板内蔵温度データ取得
・動作紹介1
・サンプルプログラム1
・サンプルプログラム1の詳細
5.動作確認2:UART通信のようにデータ送受信
・動作紹介2
・サンプルプログラム2
・サンプルプログラム2の詳細
6.自作Pythonアプリの紹介
・コピペ用プログラム
・実行方法
7.まとめ
1.Bluetooth Low Energy(BLE)とは
Bluetooth Low Energy(BLE)とは、従来のBluetoothクラシックとは異なり、Bluetooth 4.0以降で導入された省電力通信規格で、主にセンサーデバイスやウェアラブル機器など、バッテリー駆動で長時間稼働する機器向けに設計されたものです。
・Bluetoothクラシックとの違い
Bluetooth Low Energy(BLE)と、従来のBluetoothクラシックの主な違いは以下表のようになります。
項目 | Bluetooth Low Energy (BLE) | Bluetoothクラシック |
---|---|---|
主な用途 | センサー通信、IoT、ウェアラブル | 音声・データ転送(ヘッドホン等) |
消費電力 | 非常に低い | 高い |
通信速度 | 最大1 Mbps | 最大3 Mbps |
ペアリングの必要性 | 必須ではない(任意) | 必須 |
接続時間 | 数ミリ秒〜数秒 | 数秒〜数十秒 |
同時接続数 | 多い(理論上数十台) | 少ない(通常7台まで) |
プロファイル | GATT(汎用属性プロファイル) | A2DP, HFPなど多数 |
・接続方法(GATTベース)
接続方法については、ライブラリを使用すれば勝手に接続してくれるので、問題なく接続できていれば気にしなくても良いのですが、エラーが発生した場合は知っておいた方がその後の対応が楽になるのでここで紹介しておきます。
接続手順は「GATT(Generic Attribute Profile)」という仕組みを使って以下①〜⑤のような流れで行われます。
・親機(中央機器):セントラル(Central)
・子機(周辺機器):ペリフェラル(Peripheral)
①アドバタイズ(Advertise:広告)
周辺機器(ペリフェラル)が自分の存在を周囲に知らせる。
知らせる情報(アドバタイズパケット)には、「デバイス名」や「サービスUUID」などがある。
②スキャン(Scan)
中央機器(セントラル)が周囲のBLEデバイス(ペリフェラル)をスキャンして検出。
③接続(Connect)
「セントラル」が「ペリフェラル」に接続要求を送信。
接続が確立すると、「GATTセッション」が開始される。
④サービス探索(Service Discovery)
「ペリフェラル」が提供する「サービスUUID」と「キャラクタリスティックUUID」を「セントラル」が取得。
⑤通信実行(Read/Write/Notify)
「セントラル」が「ペリフェラル」の「キャラクタリスティックUUID」に対してデータを読み取ったり、書き込んだり、通知(Notify)を受け取ったりする。
- Read: 「セントラル」が要求して、「ペリフェラル」からデータを受け取る(一方向)。
- Write: 「セントラル」が「ペリフェラル」にデータを送信する(一方向)。
- Notify(通知): 「ペリフェラル」のデータが変更された際に、「ペリフェラル」が「セントラル」に自動的に通知する(一方向)。
※「ペリフェラル」から「セントラル」に対して一方的にデータを送れるため、生存確認や近くに存在しているのを確認するビーコン的な使い方ができる
・サービスUUIDとキャラクタリスティックUUID
Bluetooth Low Energy(BLE)の「サービスUUID」と「キャラクタリスティックUUID」とは、デバイスが提供する機能を識別するために使われる重要な「識別子」です。
それぞれの役割や種類について詳しく紹介します。
サービスUUIDの種類
サービスUUIDは、デバイスが提供する「機能のまとまり」を表し、Bluetooth SIG(Special Interest Group)で定義されており、以下表のようにサービスごとに16ビットのUUIDで表されます。
サービス名 | UUID(16ビット) | 説明 |
---|---|---|
Heart Rate | 0x180D | 心拍数モニタリング |
Battery Service | 0x180F | バッテリー残量の取得 |
Device Information | 0x180A | デバイスの製造情報など |
Health Thermometer | 0x1809 | 体温測定 |
Blood Pressure | 0x1810 | 血圧測定 |
登録されているUUIDについては以下Bluetooth SIGのサイト内PDFファイルで確認できます。

キャラクタリスティックUUID
キャラクタリスティックUUIDは、サービス内の「個々のデータ項目」を表します。
サービスUUIDと同様にBluetooth SIGで定義されており、以下表のように16ビットのUUIDで表されます。
キャラクタリスティック名 | UUID(16ビット) | 説明 |
---|---|---|
Battery Level | 0x2A19 | バッテリー残量(%) |
Heart Rate Measurement | 0x2A37 | 心拍数データ |
Manufacturer Name String | 0x2A29 | 製造者名 |
Temperature Measurement | 0x2A1C | 温度データ |
サービスUUIDとキャラクタリスティックUUIDの違い
サービスUUIDとキャラクタリスティックUUIDの違いは以下表のようになります。
項目 | サービスUUID | キャラクタリスティックUUID |
---|---|---|
役割 | 機能のまとまり(例:心拍数サービス) | 個々のデータ項目(例:心拍数値) |
階層 | 上位 | 下位(サービスの中に含まれる) |
カスタムUUID
他にも開発者が独自に定義するカスタムUUIDもあります。
今回ここで紹介する「UART通信」のようなデータの送受信(TX/RX)機能を実現するためには、多くのデバイスメーカーが独自のカスタムUUIDを設定しています。
特に広く使われているのが「Nordic Semiconductor」が提供する「NUS(Nordic UART Service)」で、そのUUIDは以下のような128ビットで設定されています。
・サービスUUID:
6E400001-B5A3-F393-E0A9-E50E24DCCA9E
・受信用キャラクタリスティックUUID:
6E400002-B5A3-F393-E0A9-E50E24DCCA9E
・送信用キャラクタリスティックUUID:
6E400003-B5A3-F393-E0A9-E50E24DCCA9E
2.動作確認の準備
・開発環境Tonnyのインストール
開発環境thonnyは以下のリンクからダウンロードできます。
Thonnyのインストール方法や使い方は以下のリンクで詳しく紹介しています。

・ライブラリの準備
今回使用するライブラリは「MicroPython」の「aioble」です。
初期状態で使用できるため、個別にインストールする必要はありません。
Bluetooth通信を行うには「bluetooth」モジュールを使用しますが、ライブラリとしては以下の公式サイトのように「aioble」を使用することを推奨しています。
3.Bluetooth Low Energy(BLE)通信の使い方
・ライブラリの使い方
今回使用する「MicroPython」のBluetooth Low Energy(BLE)用ライブラリ「aioble」を使用するには「bluetooth」モジュールと一緒に以下のようにインポートして使用します。
import aioble # MicroPythonのBLEライブラリ
import bluetooth # MicroPythonのBLEモジュール
import uasyncio as asyncio # 非同期処理ライブラリ
今回使用する「aioble」ライブラリの基本的な使用方法についてはUUIDの設定等、初期設定が複雑なため、詳細は「サンプルプログラムの詳細」の内容を確認してください。
以下、大まかな流れを紹介します。
# UUIDの設定やキャラクタリスティックの設定、各サービスの開始
# --- 省略(サンプルプログラム参照)---
# アドバタイズド開始
async with await aioble.advertise( # BLEアドバタイズを開始
250_000, # アドバタイズ間隔(マイクロ秒)
name="RPi-Pico", # アドバタイズ名(BLE接続デバイス名)
services=[UART_SERVICE_UUID], # サービスUUIDを指定
) as connection: # 接続待機
# データ送信(接続先の「connection」に対して「data」を送信)
await tx_char.notify(connection, data.encode()) # データをエンコードして送信
# データ受信(接続先の「connection」から「tx_char」に書き込まれたデータを「value」に格納)
connection, value = await rx_char.written() # 受信処理(受信データが来るまでここで待機する)
・動作確認方法
動作についてはスマホアプリの「BLE Terminal」等でも確認できますが、ここでは下画像のような自作のPythonアプリを使用します。

このアプリを使用することで、周辺のBLEデバイスの「スキャン」から、「接続」してデータの「送受信」や「記録」機能で、受信したデータをcsvファイルに保存することもできます。
4.動作確認1:PicoW基板内蔵温度データ取得
サンプルプログラムを使用して実際に動作確認していきます。
動作確認1では「PicoW」基板内蔵の温度センサーの値をBluetooth Low Energy(BLE)通信で受信して確認します。
BLE通信接続中はPicoW基板上のLEDが点灯し、接続が解除されると消灯します。
・動作紹介1
動作確認の様子は以下になります。
開発環境のThonnyを起動したら下画像のように、サンプルプログラムをコピペで貼り付けて[実行]ボタンを押します。

Pythonアプリをパソコンから起動して接続すると、下画像のように1秒ごとにPico基板の内蔵温度センサーの値が送信されて、パソコン側のアプリで確認したり、記録することができます。

パソコン側のPythonアプリでの動作確認の様子は以下のようになります。

Pythonアプリで[スキャン]ボタンを押してしばらくすると、デバイス名「RPi-Pico」が見つかるので、上画像のようにドロップダウンリストからデバイス名に「RPi-Pico」を指定します。

[接続]ボタンを押すと、PicoW基板への接続が開始され上画像のように「接続成功」が表示されると、1秒ごとに温度データを受信して確認することができます。
[切断ボタン]を押すと、接続が解除されて温度データの受信は終了します。

次に[記録]ボタンを押すと上画像のように「記録開始:〜」が表示され、Pythonアプリファイルがあるフォルダにタイムスタンプ付きのcsvファイルが作成されます。(下画像)

「記録」状態で[接続]ボタンを押すと、温度データが表示され、同時にcsvファイルにデータが保存されていきます。
[切断]ボタンを押すと温度データの受信と保存が終了します。

[記録]ボタンを押すと上画像のように、タイムスタンプ付きのcsvファイルが作成されます。
このcsvファイルは空で作成されますが、データを受信するごとに追記して保存されていきます。

csvファイルを開くと上画像のように受信したデータが保存されているのが確認できます。
csv形式で保存しているため、エクセル等で開いてグラフ表示やデータ分析等に使用できます。

「記録」を終了するには上画像のように[停止]ボタンを押します。

[削除]ボタンを押すことで、受信テキストエリアの表示内容を全て消去することができます。
・サンプルプログラム1
サンプルプログラムは以下になります。
コピペで貼り付けて実行してください。コピーは下の黒塗り部右上のアイコンクリックでもできます。
# pico_ble_temp.py (micropython)
# BLE接続の基本設定完了後、Pico基板内蔵の温度センサの値を取得して1秒ごとに送信
import aioble # MicroPythonのBLEライブラリ
import bluetooth # MicroPythonのBLEモジュール
import uasyncio as asyncio # 非同期処理ライブラリ
from machine import Pin, ADC # PicoのGPIOとADCモジュールをインポート
UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") # UARTサービスUUID
UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E") # UART送信用キャラクタリスティックUUID
uart_service = aioble.Service(UART_SERVICE_UUID) # UARTサービスを定義
tx_char = aioble.Characteristic(uart_service, UART_TX_CHAR_UUID, notify=True) # UART送信用キャラクタリスティックを定義
aioble.register_services(uart_service) # GATTサービスを登録
led = Pin("LED", Pin.OUT) # LED端子をledとして出力に設定
# Pico内蔵温度センサーから温度を取得
def getTemp():
sensor_temp = ADC(4)
conversion_factor = 3.3 / 65535
reading = sensor_temp.read_u16()
voltage = reading * conversion_factor
temperature = 27 - (voltage - 0.706) / 0.001721
print("Temperature: {:.2f} °C".format(temperature))
return temperature
# BLE Peripheralとして動作するメインループ
async def ble_notify_loop():
while True:
try: # BLE接続を待機
led.value(0) # 接続待機中はLED消灯
async with await aioble.advertise( # BLEアドバタイズを開始
250_000, # アドバタイズ間隔(マイクロ秒)
name="RPi-Pico", # アドバタイズ名(BLE接続デバイス名)
services=[UART_SERVICE_UUID], # サービスUUIDを指定
) as connection: # 接続待機
print("接続中... :", connection.device) # 接続されたデバイス情報を表示
await asyncio.sleep(2) # 接続安定化のため少し待つ
while connection.is_connected(): # 接続が維持されている間ループ
led.value(1) # 接続されたらLED点灯
try: # データを送信(try:~内で実行しないと送信できない)
temp = getTemp() # 温度センサーから温度を取得
data = f"{temp:.2f}℃\n" # 小数点以下2桁にフォーマット
await tx_char.notify(connection, data.encode()) # データをエンコードして送信
except Exception:
pass # エラー(Notify Error: 'NoneType' object isn't iterable)は無視して続行
await asyncio.sleep(1) # 1秒ごとにデータを送信
except Exception as e: # アドバタイズエラー処理
print("Error:", e) # エラーが発生した場合の処理
await asyncio.sleep_ms(1000) # 1秒待機して再試行
asyncio.run(ble_notify_loop()) # PicoのBLE Peripheralとして動作開始
・サンプルプログラム1の詳細
サンプルプログラムの詳細は以下になります。
Bluetooth Low Energy(BLE)通信を行うために、まずは「9,10行目」で以下のように自分の存在を周囲に知らせる「アドバタイズ(広告)」のための「UUID」を設定します。
UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") # UARTサービスUUID
UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E") # UART送信用キャラクタリスティックUUID
次に「12〜15行目」で以下のように、設定した「UUID」を定義して、セントラル(ここではパソコンアプリ)が接続要求を送信してきた時のために「GATTサービス」を登録しておきます。
uart_service = aioble.Service(UART_SERVICE_UUID) # UARTサービスを定義
tx_char = aioble.Characteristic(uart_service, UART_TX_CHAR_UUID, notify=True) # UART送信用キャラクタリスティックを定義
aioble.register_services(uart_service) # GATTサービスを登録
「19〜25行目」の以下「getTemp()」関数は、PicoW基板内蔵の温度センサの値を取得するためのものです。換算式等はメーカが提供するものを使って温度データを取得できるようにしています
# Pico内蔵温度センサーから温度を取得
def getTemp():
以下の「30〜59行目」がBluetooth Low Energy(BLE)通信を行うためのメインループ処理です。
# BLE Peripheralとして動作するメインループ
async def ble_notify_loop():
while True:
try: # BLE接続を待機
.
ここでアドバタイズ(広告)が実行され、接続が開始されたら処理を実行
アドバタイズに失敗しても1秒後に再試行される
.
except Exception as e: # アドバタイズエラー処理
print("Error:", e) # エラーが発生した場合の処理
await asyncio.sleep_ms(1000) # 1秒待機して再試行
asyncio.run(ble_notify_loop()) # PicoのBLE Peripheralとして動作開始
以下の「30〜59行目」が自分の存在を周囲に知らせるための「アドバタイズ(広告)」処理です。
async with await aioble.advertise( # BLEアドバタイズを開始
250_000, # アドバタイズ間隔(マイクロ秒)
name="RPi-Pico", # アドバタイズ名(BLE接続デバイス名)
services=[UART_SERVICE_UUID], # サービスUUIDを指定
) as connection: # 接続待機
この処理の詳細は以下になります。
アドバタイズ間隔, # アドバタイズ(広告)の間隔をマイクロ秒単位で設定
アドバタイズ名, # スキャンした時に表示されるデバイス名になります
サービスUUID, # 広告するサービスを指定、デバイスの用途が認識される
) as connection:
非同期(async) でBLEのアドバタイズ(広告)を開始します。
「with
構文」を使うことで、接続が確立されるまでこの行で待機します。
接続が成功すると、「connection」という名前のオブジェクトが生成されます。
接続が完了すると「44〜54行目」で以下のように、取得した温度データをセントラル(ここではパソコンアプリ)へ送信(通知:notify)しています。
while connection.is_connected(): # 接続が維持されている間ループ
led.value(1) # 接続されたらLED点灯
try: # データを送信(try:~内で実行しないと送信できない)
temp = getTemp() # 温度センサーから温度を取得
data = f"{temp:.2f}℃\n" # 小数点以下2桁にフォーマット
await tx_char.notify(connection, data.encode()) # データをエンコードして送信
except Exception:
pass # エラー(Notify Error: 'NoneType' object isn't iterable)は無視して続行
await asyncio.sleep(1) # 1秒ごとにデータを送信
温度データの「送信(通知:notify)」を行なっている部分の詳細は以下になります。
ここでは、送信先の「connection」オブジェクトの送信用キャラクタリスティック「tx_char」に対してエンコードした「data」を非同期処理で「送信(通知:notify)」しています。
5.動作確認2:UART通信のようにデータ送受信
動作確認2では、「Bluetooth Low Energy(BLE)」通信でパソコン側から送信したテキストデータをPicoW基板で受信して、そのままパソコン側へ送り返す(コールバック)動作について確認します。
BLE通信接続中はPicoW基板上のLEDが点灯し、接続が解除されると消灯します。
・動作紹介2
動作確認1と同様に開発環境のThonnyを起動したら下画像のように、サンプルプログラムをコピペで貼り付けて[実行]ボタンを押します。

Pythonアプリをパソコンから起動して文字列を送信すると、下画像のようにPicoW基板で受信され、受信したデータがそのまま送り返されるコールバック動作が確認できます。

パソコン側のPythonアプリでの動作確認の様子は以下のようになります。
スキャンから接続までの動作は動作確認1の時と同じです。

「送信テキストエリア」に文字列(半角英数)を入力して[送信]ボタンをクリックすると、PicoW基板へデータが送信され、コールバックでそのままの文字列が返されて「受信テキストエリア」で確認できます。

「送信テキストエリア」の文字列は[Enter]キーを押すことでも送信できます。
※測定器のデータ受信等で同じコマンドを連続して送信できるように、送信後のテキストはクリアされないようにしています。
アプリ側の修正で消去するようにもできます。
・サンプルプログラム2
サンプルプログラムは以下になります。
コピペで貼り付けて実行してください。コピーは下の黒塗り部右上のアイコンクリックでもできます。
# pico_ble_uart_echo.py
# BLEで受信したデータをそのまま返す(コールバック)
import aioble # MicroPythonのBLEライブラリ
import bluetooth # MicroPythonのBLEモジュール
import uasyncio as asyncio # 非同期処理ライブラリ
from machine import Pin # PicoのGPIOモジュールをインポート
UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E") # UARTサービスUUID
UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E") # UART受信用キャラクタリスティックUUID
UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E") # UART送信用キャラクタリスティックUUID
# サービスとキャラクタリスティックの定義
uart_service = aioble.Service(UART_SERVICE_UUID) # UARTサービスを定義
tx_char = aioble.Characteristic(uart_service, UART_TX_CHAR_UUID, read=True, notify=True) # UART送信用キャラクタリスティックを定義
rx_char = aioble.Characteristic(uart_service, UART_RX_CHAR_UUID, write=True, capture=True) # UART受信用キャラクタリスティックを定義
aioble.register_services(uart_service) # GATTサービスを登録
led = Pin("LED", Pin.OUT) # LED端子をledとして出力に設定
# 非同期で受信タスクを実行
async def handle_rx(connection):
# 受信待ちループ
while connection.is_connected():
try:
connection, value = await rx_char.written() # 受信処理(受信データが来るまでここで待機する)
print("受信:", value) # 受信したデータを表示
# 受信したデータをそのまま通知(送信)
try:
if connection and connection.is_connected(): # 接続が有効な場合のみ送信
print("送信:", value) # 先に出力しておく(この後エラーになるため)
await tx_char.notify(connection, value) # 通知(送信)は成功するがエラーになる
else:
print("通知対象が無効です:", connection) # 接続が無効な場合の処理
except Exception:
pass # エラー(Notify Error: 'NoneType' object isn't iterable)は無視して続行
except Exception as e:
print("受信/送信エラー:", e)
break
# BLE Peripheralとして動作するメインループ
async def ble_uart_echo():
while True:
led.value(0) # 接続待機中はLED消灯
try:
async with await aioble.advertise( # BLEアドバタイズを開始
250_000, # アドバタイズ間隔(マイクロ秒)
name="RPi-Pico", # アドバタイズ名(BLE接続デバイス名)
services=[UART_SERVICE_UUID], # サービスUUIDを指定
) as connection: # 接続待機
print("接続中:", connection.device) # 接続されたデバイス情報を表示
led.value(1)
# 受信処理を別タスクで開始
rx_task = asyncio.create_task(handle_rx(connection))
# 接続が切れるまで待機
while connection.is_connected():
await asyncio.sleep_ms(100)
# 接続解除 → タスクキャンセル
rx_task.cancel()
led.value(0) # LED消灯
print("接続解除されました")
except Exception as e:
print("アドバタイズエラー:", repr(e))
await asyncio.sleep_ms(200)
# PicoのBLE Peripheralとして動作開始
asyncio.run(ble_uart_echo())
・サンプルプログラム2の詳細
サンプルプログラムの詳細は以下になります。
「UUID」の設定から「GATTサービス」の登録までは動作確認1と同じです。
データの受信は「22〜41行目」で以下のように非同期関数で実行しています。
メインループ内で実行してもいいのですが、省電力な「Bluetooth Low Energy(BLE)」通信の特性なのか、受信処理を実行すると実行した時点で低消費電力状態でその場で待機するのでLEDの点灯消灯の制御等他の処理との連携がうまくいかなかったため、非同期関数で実行しています。
# 非同期で受信タスクを実行
async def handle_rx(connection):
# 受信待ちループ
while connection.is_connected():
try:
connection, value = await rx_char.written() # 受信処理(受信データが来るまでここで待機する)
print("受信:", value) # 受信したデータを表示
# 受信したデータをそのまま通知(送信)
try:
if connection and connection.is_connected(): # 接続が有効な場合のみ送信
print("送信:", value) # 先に出力しておく(この後エラーになるため)
await tx_char.notify(connection, value) # 通知(送信)は成功するがエラーになる
else:
print("通知対象が無効です:", connection) # 接続が無効な場合の処理
except Exception:
pass # エラー(Notify Error: 'NoneType' object isn't iterable)は無視して続行
except Exception as e:
print("受信/送信エラー:", e)
break
データ受信部は「27行目」で詳細は以下になります。
「await rx_char.written()」が実行されると、設定した受信用キャラクタリスティック「rx_char」 に書き込みされるまで待機します。
「connection」は受信したデータの送信元(セントラル)を表します。
これにより、どのクライアントに対して応答すべきかが分かるため、複数接続にも対応可能です。
「value」には書き込まれた(セントラルから受信した)データ(bytes型)が格納されます。
6.自作Pythonアプリの紹介
最後に、動作確認に使用した自作のPythonアプリについて紹介しておきます。
Pythonの「tkinter」で操作画面を作成して「bleak」モジュールでBLE通信を制御しています。
以下にコピペ用サンプルプログラムを準備しました。実行方法についても紹介しておきます。
・コピペ用プログラム
サンプルプログラムは以下になります。メモ帳等にコピペで貼り付けて拡張子を「.py」に変更して実行してください。コピーは下の黒塗り部右上のアイコンクリックでもできます。
import asyncio
import csv
import datetime
import threading
import tkinter as tk
from tkinter import ttk, scrolledtext
from bleak import BleakClient, BleakScanner
# UUID定義
UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"
class BLEUARTApp:
def __init__(self, root):
self.client = None
self.csv_file = None
self.recording = False
self.loop = asyncio.new_event_loop()
threading.Thread(target=self.loop.run_forever, daemon=True).start()
self.setup_gui(root)
root.protocol("WM_DELETE_WINDOW", self.on_close)
def setup_gui(self, root):
root.title("BLE UART Communication")
root.geometry("400x350")
style = ttk.Style()
style.configure("Right.TCombobox", justify='right') # テキストを右寄せ
# デバイス名選択用コンボボックス(setup_guiの中)
tk.Label(root, text="デバイス名:", anchor="e").grid(row=0, column=0, sticky="e")
self.device_name_var = tk.StringVar()
self.device_combo = ttk.Combobox(root, textvariable=self.device_name_var)
self.device_combo.grid(row=0, column=1, columnspan=2, sticky="ew")
# 送信テキストの改行オプション
self.send_append_lf = tk.BooleanVar(value=True) # 初期状態でLFがオン
self.send_append_cr = tk.BooleanVar(value=False)
line_control_frame = tk.Frame(root)
line_control_frame.grid(row=0, column=3, sticky="n", padx=5)
tk.Checkbutton(line_control_frame, text="LF", variable=self.send_append_lf).pack(anchor="w")
tk.Checkbutton(line_control_frame, text="CR", variable=self.send_append_cr).pack(anchor="w")
# 送信テキスト
self.send_entry = tk.Entry(root)
self.send_entry.grid(row=1, column=0, columnspan=4, sticky="ew")
self.send_entry.bind("<Return>", lambda e: self.send_data())
# 受信テキストエリア + スクロール
self.recv_text = scrolledtext.ScrolledText(root)
self.recv_text.grid(row=2, column=0, columnspan=4, sticky="nsew")
# 操作ボタン
self.scan_btn = tk.Button(root, text="スキャン", width=10, command=self.scan_devices)
self.scan_btn.grid(row=3, column=0)
self.connect_btn = tk.Button(root, text="接続", width=10, command=self.connect_device)
self.connect_btn.grid(row=3, column=1)
self.disconnect_btn = tk.Button(root, text="切断", width=10, command=self.disconnect_device)
self.disconnect_btn.grid(row=3, column=2)
self.send_btn = tk.Button(root, text="送信", width=10, command=self.send_data)
self.send_btn.grid(row=3, column=3)
# 記録制御ボタン
self.record_btn = tk.Button(root, text="記録", width=10, command=self.start_recording)
self.record_btn.grid(row=4, column=0)
self.stop_btn = tk.Button(root, text="停止", width=10, command=self.stop_recording)
self.stop_btn.grid(row=4, column=1)
self.clear_btn = tk.Button(root, text="削除", width=10, command=lambda: self.recv_text.delete(1.0, tk.END))
self.clear_btn.grid(row=4, column=2)
# レイアウト調整用の空白行
tk.Frame(root, height=10).grid(row=5, column=0, columnspan=4)
root.grid_columnconfigure(0, weight=1)
root.grid_columnconfigure(1, weight=1)
root.grid_columnconfigure(2, weight=1)
root.grid_columnconfigure(3, weight=1)
root.grid_rowconfigure(2, weight=1)
def log_to_gui(self, message):
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
formatted = f"[{timestamp}] {message}"
self.recv_text.insert(tk.END, formatted + "\n")
self.recv_text.yview(tk.END)
print(message) # コンソールへも出力
async def handle_notifications(self, sender, data):
try:
text = data.decode()
lines = text.split("\n")
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
for line in lines:
if not line.strip():
continue
formatted_line = f"[{timestamp}] {line.strip()}"
self.recv_text.insert(tk.END, formatted_line + "\n")
self.recv_text.yview(tk.END)
if self.recording and self.csv_filename:
try:
with open(self.csv_filename, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
fields = [timestamp] + [float(x) if x.replace(".", "", 1).isdigit() else x for x in line.split(",")]
writer.writerow(fields)
except Exception as e:
self.log_to_gui(f"記録中のエラー: {e}")
except Exception as e:
self.log_to_gui(f"受信処理エラー: {e}")
def scan_devices(self):
asyncio.run_coroutine_threadsafe(self.async_scan(), self.loop)
async def async_scan(self):
try:
self.log_to_gui("BLEデバイスのスキャン開始")
devices = await BleakScanner.discover(timeout=5.0)
names = sorted({d.name for d in devices if d.name}) # 重複除去 + ソート
if names:
self.device_combo['values'] = names
self.device_combo.set(names[0]) # 最初の項目を初期選択
self.log_to_gui(f"見つかったデバイス一覧: {names}")
else:
self.device_combo.set("")
self.device_combo['values'] = []
self.log_to_gui(f"名前付きデバイスが見つかりません")
except Exception as e:
self.log_to_gui(f"スキャンエラー: {e}")
async def connect(self, device_name):
try:
if self.client:
await self.client.disconnect()
self.log_to_gui(f" {device_name}へ接続中...")
devices = await BleakScanner.discover()
target = next((d for d in devices if d.name == device_name), None)
if target is None:
self.log_to_gui(f"デバイスが見つかりません")
return
self.client = BleakClient(target)
await self.client.connect()
await self.client.start_notify(UART_TX_CHAR_UUID, self.handle_notifications)
self.log_to_gui(f"接続成功")
except Exception as e:
self.log_to_gui(f"接続エラー: {e}")
def connect_device(self):
device_name = self.device_name_var.get()
if not device_name:
return
asyncio.run_coroutine_threadsafe(self.connect(device_name), self.loop)
def send_data(self):
data = self.send_entry.get()
# 末尾追加処理
suffix = ""
if self.send_append_cr.get():
suffix += "\r"
if self.send_append_lf.get():
suffix += "\n"
data += suffix
if not self.client or not self.client.is_connected:
return
asyncio.run_coroutine_threadsafe(
self.client.write_gatt_char(UART_RX_CHAR_UUID, data.encode()),
self.loop
)
# self.send_entry.delete(0, tk.END) # 送信後に入力欄をクリアしないようにコメントアウト
def start_recording(self):
timestamp = datetime.datetime.now().strftime("%y%m%d_%H%M%S")
self.csv_filename = f"{timestamp}.csv"
self.recording = True
# 空のCSVファイルを作成
try:
with open(self.csv_filename, "a", newline="", encoding="utf-8") as f:
pass # 何も書き込まずに即閉じる(ヘッダーを追加する場合は無効に)
# writer = csv.writer(f) # ヘッダー行を追加する場合は以下を有効に
# writer.writerow(["timestamp", "受信データ"]) # ヘッダーの内容
except Exception as e:
self.log_to_gui(f"記録ファイル作成エラー: {e}")
return
self.log_to_gui(f"記録開始: {self.csv_filename}")
def stop_recording(self):
self.recording = False
self.csv_filename = None
self.log_to_gui("記録停止")
def disconnect_device(self):
if self.client and self.client.is_connected:
asyncio.run_coroutine_threadsafe(self.client.disconnect(), self.loop)
self.log_to_gui("接続を解除しました")
self.client = None
else:
self.log_to_gui("現在接続されていません")
def on_close(self):
asyncio.run_coroutine_threadsafe(self.cleanup(), self.loop)
self.loop.call_soon_threadsafe(self.loop.stop)
root.quit() # GUIイベントループを終了
async def cleanup(self):
if self.client:
try:
await self.client.disconnect()
print("接続解除")
except Exception as e:
print("切断時エラー:", e)
self.stop_recording()
root.destroy()
# アプリ起動
root = tk.Tk()
app = BLEUARTApp(root)
root.mainloop()
・実行方法
サンプルプログラムのPythonアプリの実行方法について紹介します。
このアプリを実行するにはPythonの実行環境が必要です。
Pythonのインストール方法は以下のリンクで詳しく紹介しています。

専用の作業フォルダを作った方がわかりやすいので、まずは好きな場所に好きな名前で作業フォルダを作成します。
次にメモ帳アプリを開きます。開いたら上のサンプルプログラムをコピペで貼り付けます。
拡張子を「.txt」から「.py」に変更して下画像のように作業フォルダ「python」に保存します。

pythonファイルが保存できたら、下画像のようにWindows画面下の検索窓に「cmd」と入力して「コマンドプロンプト」の[開く]をクリックしてコマンドプロンプトを起動します。

起動したら、まずPythonのBLE通信制御用の「bleak」ライブラリを下のコマンドを入力してインストールします。
pip install bleak
実際に入力している画像は以下になります。

以下のような画面が表示された場合は、インストールに使用する「pip」のアップグレードが必要です。
黄緑色で表示されているコマンドをコピペで貼り付けて実行してください。

アップグレードが完了したら再度「bleak」のインストールコマンドを入力してインストールします。
「bleak」のインストールが完了したら、以下のコマンド(cd 作業フォルダのパス)で、pythonアプリファイルを保存した作業フォルダへ移動します。(作業環境によってパスは異なります。)
cd Documents\python
実際に入力した画像は以下になります。

最後に以下のコマンドを入力して、pythonアプリを起動します。
python BLE_tkinter.py
実際に入力した画像は以下になります。

実行されると以下のようにアプリ画面が表示されます。

このアプリでBLE通信の基本的なデータの送受は動作確認できるので、サンプルプログラムの動作確認用に使用してください。
7.まとめ
「Raspberry Pi PicoW」でBluetooth Low Energy(BLE)通信を使用した、データの送受信方法をサンプルプログラムを使用して詳しく紹介しました。
Bluetooth Low Energy(BLE)とは、従来のBluetoothクラシックとは異なり、Bluetooth 4.0以降で導入された省電力通信規格で、クラシックと比較すると通信速度は遅く大量のデータ転送には不向きですが、低消費電力で同時に多数のデバイスと容易に接続でき、ペアリングも必須ではないため、低容量のデータ送受信やデバイスの温度データ取得等、センサーデバイスやウェアラブル機器など、バッテリー駆動で長時間稼働する機器には最適です。
Bluetoothクラシックのように「仮想COMポート」としてシリアル通信のように使用することはできませんが、今回紹介したようにBLE通信用のアプリを使用すれば、ペアリング不要で簡単にデータの送受信を行うことができます。
Bluetooth Low Energy(BLE)でもペアリングを行った通信は可能ですが、今回のようなデバイスの温度データを取得するだけや、近くにデバイスが存在しているかどうかを確認するビーコン的な用途にはBluetooth Low Energy(BLE)が最適です。
今回は触れていませんがペアリング認証を使用した通信方法も今後紹介していければと思います。

コメント