ある案件でRaspberry Pi Pico 2 Wを使うことになりました.前々から気になっていたのですが,なかなか手を出すまでには至らずでしたが,ある案件でいよいよ使うことになりました.開発環境はCでもPythonでもよかったのですが,勉強を兼ねてPythonを使うことにしました.通常のPythonとは異なり,MicroPythonで開発ということの様です.
Raspberry Pi PicoをBOOTモードで立ち上げるには,基板に備わるプッシュスイッチを押しながらパソコンとUSBケーブルでつなぐとよいようです.そうするとパソコン側からはRaspberry Pi Picoが外付けデバイスのように見えるので,そこへこちらのサイトから入手可能なuf2という拡張子のファイルをダウンロードし,そのファイルを外付けデバイスのように見えているRaspberry Pi Picoにコピーするだけでよいようです.あとはいったんUSBケーブルを取り外し,プッシュスイッチを押さずにUSBケーブルを接続するとPythonが動作させられる状況になります.
今回の目標は,Raspberry Pi Pico でMQTTをサブスクライブし,そのデータ内にある情報をもとにフルカラーLEDを制御するというものです.フルカラーLEDにはR,G,Bそれぞれの電圧を変更できるような可変レギュレータがあり,その可変レギュレータの電圧を決定するのに用いる抵抗として,ディジタルポテンショメータを使うというものです.このディジタルポテンショメータはI2Cで抵抗値を変えられることから,Raspberry Pi Picoを使ってディジタルポテンショメータをI2Cで制御することで,結果としてフルカラーLEDを制御することを実現します.
まずはMQTTのブローカです.今回,ブローカとしてRaspberry Pi 5を用いるため,こちらのサイトを参考にしました.
つぎにRaspberry Pi Pico 2 WにMQTTクライアントを作成します.まずはMQTTを使えなければならないため,こちらのサイトを参考にMQTTのライブラリを設定しました.行ったこととしては,Raspberry Pi Pico にmqttというフォルダを作成し,その中にsimple.pyとrobust.pyという2つのPythonプログラムを入れることです.実際,robust.pyの方は使わなくても動作しているようでした.そして,WiFiのアクセスポイントへの接続,MQTT Clientとしてブローカに接続,そしてサブスクライブする,というようなプログラムを書きました.下のプログラムはクラス化したものです.このクラスを使う時,サブスクライブした時点で呼び出される関数をコールバック関数として設定しておきます.あとはサブスクライブしたデータに基づき,I2Cで接続されたディジタルポテンショメータを制御すればよいはずです.
import network
from time import sleep
from umqtt.simple import MQTTClient
class Subscriber:
def __init__(self, ssid, ssid_password, client_id, server, port, mqtt_user, mqtt_password, keepalive, ssl, ssl_params, callback):
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# Connect to the network
wlan.connect(ssid, ssid_password)
# Wait for Wi-Fi connection
connection_timeout = 10
while connection_timeout > 0:
if wlan.status() >= 3:
break
connection_timeout -= 1
print('Waiting for Wi-Fi connection...')
sleep(1)
# Check if connection is successful
if wlan.status() != 3:
self.is_error = True
return
else:
print('Connection successful!')
network_info = wlan.ifconfig()
print('IP address:', network_info[0])
self.is_error = False
print(mqtt_user)
print(mqtt_password)
try:
self.client = MQTTClient(client_id,
server,
port,
mqtt_user,
mqtt_password,
keepalive,
ssl,
ssl_params)
self.client.connect()
self.client.set_callback(callback)
except Exception as e:
print('Error connecting to MQTT:', e)
raise # Re-raise the exception to see the full traceback
def subscribe(self, topics):
while True:
self.client.subscribe(topics, 1);
I2Cについてはまた今度.
