- RaspberryPi
- Shadow
- mqtt
- paho
- awsIoT
Last updated at Posted at 2024-06-27
この記事について
shadowとwillとretainを活用してAWS IoT CoreとPubSubする③pahoでpubsubで行ったmessageのpubsubにshadowのupdate/getを追加します。
今回はラズパイに接続された仮想device_0の正常稼働とdevice停止をAWS IoTのdevice shadowに反映させます。
shadowのtopic
Device shadowはAWS IoTで用意されているサービスの一種で、一般的なMQTTメソッドではありません。加えてAWS IoT Coreでshadowの各種イベントが発生する際に使われる11種類のtopicがシステム予約で決められています。
今回は、この中の/update/documentsと/update/deltaを使ってshdowとshadow deltaのupdateをキャッチするcustom callbackを作成します。
名前 | アクション | MQTT トピック |
---|---|---|
/get | 発行 | $aws/things/MQTTServer1/shadow/name/device_0/get |
/get/accepted | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/get/accepted |
/get/rejected | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/get/rejected |
/update | 発行 | $aws/things/MQTTServer1/shadow/name/device_0/update |
/update/delta | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/update/delta |
/update/accepted | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/update/accepted |
/update/documents | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/update/documents |
/update/rejected | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/update/rejected |
/delete | 発行 | $aws/things/MQTTServer1/shadow/name/device_0/delete |
/delete/accepted | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/accepted |
/delete/rejected | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/rejected |
callback
custom callback
一般的なMQTTメソッドではない為callbackも用意されていません。そこでpaho-mqttのclient.message_callback_add関数を使いcustom callbackを作成します。
message_callback_add(sub : str , callback : Callable)
特定のトピックのメッセージ コールバックを登録します。 'sub' に一致するメッセージは 'callback' に渡されます。一致しないメッセージはデフォルトのon_messageコールバックに渡されます。複数のトピック固有のコールバックを定義するには、異なる「sub」を使用して複数回呼び出します。
トピック固有のコールバックはmessage_callback_remove()で削除できます。
shdow updateのsubscribe用callback
AWS IoT は、シャドウの更新が正常に実行されるたびに、このトピックに状態ドキュメントを公開します。
shdow updateをキャッチするには/update/documentsが利用できます。
今回のホスト名でdevice_0について/update/documentsをsubする場合topicは次のようになります。
$aws/things/MQTTServer1/shadow/name/device_0/update/documents
/update/documentsで呼び出すcallbackの中身としてon_shadow_updateメソッドを準備します。
機能はon_messageと同じですが、shadowがupdateされた時だけpayloadをdecodeして出力させます。
def on_shadow_update(client, userdata, msg): payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False) print("Shadow Update detected: " + payload_decoded)
他のcallbackと同様に、接続時にon_shadow_update()を設定しておくことでupdate時にcallbackの出力が得られます。
def aws_connect(client): . . client.message_callback_add(f"$aws/things/MQTTServer1/shadow/name/device_0/update/documents", on_shadow_update)
shdow delta updateのsubscribe用callback
shdow delta updateをキャッチするには/update/deltaが利用できます。
AWS IoTは、デバイスのシャドウの変更を受け入れると、このトピックに応答状態ドキュメントを公開します。また、リクエスト状態ドキュメントには、desiredとreportedの値とさまざまな状態が含まれます。
とありますが、responseを見るとdesiredのみが返されます。当然ですがdeltaが発生(更新)していないときはこのresponseは返されません。
今回のホスト名でdevice_0について/update/deltaをsubする場合topicは次のようになります。
$aws/things/MQTTServer1/shadow/name/device_0/update/delta
/update/deltaで呼び出すcallbackの中身としてon_shadowdelta_updateメソッドを準備します。
機能はon_messageと同じですが、shadowのdeltaがupdateされた時だけpayloadをdecodeして出力させます。
def on_shadowdelta_update(client, userdata, msg): payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False) print("Shadow Delta Update detected: " + payload_decoded)
他のcallbackと同様に、接続時にon_shadowdelta_update()を設定しておくことでupdate時にcallbackの出力が得られます。
def aws_connect(client): . . . client.message_callback_add(f"$aws/things/MQTTServer1/shadow/name/device_0/update/delta", on_shadowdelta_update)
customメソッド
shadow_update()
desiredとreportedの状態をjsonに設定してpubするメソッドです。desired stateを固定してreported stateを変化させて後ほどdelta発生時の動作を確認します。
stateは…最後までこの路線で行きますヨシッ👉
desired_state = "ヨシッ👉"state_ok = "ヨシッ👉"state_ng = "ダメ🙅"def shadow_update(client, state): topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update" print(topic) payload = { "state": { "desired": { "device_no": device_no, "state": desired_state }, "reported": { "device_no": device_no, "state": state_ok #state_ng } } } client.publish(topic, json.dumps(payload), qos=1) print(f"Shadow update sent: {payload}")
shadow_get()
updateされたshadowをgetします。今回は確認用で使います。
def shadow_get(client): shadow_update_topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update/documents" print("Subscribing to shadow update topic:", shadow_update_topic) client.subscribe(shadow_update_topic, qos=1)
shadowdelta_get()
shadowのdeltaがupdateされた場合にgetします。同様に確認用で使います。
def shadowdelta_get(client): shadow_update_topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update/delta" print("Subscribing to shadow delta update topic:", shadow_update_topic) client.subscribe(shadow_update_topic, qos=1)
Demo code 全体
前回のコードに今回作成したcallbackとメソッドを追加すると以下のようになります。
pubsub_shadow_paho.py
#!/usr/bin/python# -*- coding: utf-8 -*-import osimport sysimport jsonimport sslimport timeimport subprocessimport paho.mqtt.client as mqttendpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'hostname = os.uname()[1]port = 8883cert = f'./cert/MQTTServer1-certificate.pem.crt'key = f'./cert/MQTTServer1-private.pem.key'ca = f'./cert/AmazonRootCA1.pem'device_no = 0topic = f'MQTTServer/MQTTServer1'msg = "AWS IoT 接続ヨシッ👉"desired_state = "ヨシッ👉"def on_connect(client, userdata, flags, respons_code): if respons_code != 0: print(f"Restart after 120 secs due to the connection cannot be established: respons_code: {respons_code} flags: {flags}") print('Connected')def on_disconnect(client, userdata, respons_code): if respons_code != 0: print(f"Unexpected disconnection.") else: print(f"Disconnected successfully.")def on_message(client, userdata, msg): print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}") returndef on_shadow_update(client, userdata, msg): payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False) print("Shadow Update detected: " + payload_decoded)def on_shadowdelta_update(client, userdata, msg): payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False) print("Shadow Delta Update detected: " + payload_decoded)def get_ssid(): cmd = 'iwconfig wlan0|grep ESSID' r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode().rstrip() idx = r.find('ESSID:') if r[idx + 7:-1] == "ff/an": print(f"Restart after 120 secs due to the connection cannot be determined: ESSID: {r[idx + 7:-1]}") time.sleep(120) subprocess.call(["sudo", "reboot"])def mqtt_init(clientId): try: client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv311) client.tls_set( ca, certfile=cert, keyfile=key, tls_version=ssl.PROTOCOL_TLSv1_2) client.tls_insecure_set(True) except Exception as e: print(f"Restart after 120 secs due to launches an MQTT client and creates an object instance failed of: {e}") return clientdef mqtt_connect(client): client.on_connect = on_connect client.on_message = on_message client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents", on_shadow_update) client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta", on_shadowdelta_update) client.connect(endpoint, port, keepalive=60) time.sleep(1) subscribe_pub(client) time.sleep(1) shadow_get(client) time.sleep(1) shadowdelta_get(client) time.sleep(1)def mqtt_disconnect(client): client.on_disconnect = on_disconnect client.disconnect() client.loop_stop()def mqtt_publish(client, msg): data = {} _topic = topic + "/" + str(device_no) print("Publishing to topic:", _topic) data['Timestamp'] = int(time.time()) data['hostname'] = os.uname()[1] data['device_no'] = device_no data['msg'] = msg client.publish(_topic, json.dumps(data, default=json_serial), qos=1) print(_topic, data) returndef mqtt_subscribe(client): _topic = topic + "/" + str(device_no) print("Subscribing to topic:", _topic) client.subscribe(_topic, qos=1) returndef json_serial(para): return para.isoformat()def shadow_update(client, state): topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update" print(topic) payload = { "state": { "desired": { "device_no": device_no, "state": desired_state }, "reported": { "device_no": device_no, "state": state } } } client.publish(topic, json.dumps(payload), qos=1) print(f"Shadow update sent: {payload}")def shadow_get(client): shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents" print("Subscribing to shadow update topic:", shadow_update_topic) client.subscribe(shadow_update_topic, qos=1)def shadowdelta_get(client): shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta" print("Subscribing to shadow delta update topic:", shadow_update_topic) client.subscribe(shadow_update_topic, qos=1)if __name__ == '__main__': clientid = f"{hostname}" state_ok = "ヨシッ👉" state_ng = "ダメ🙅" time.sleep(5) try: get_ssid() client = aws_init(clientid) aws_connect(client) client.loop_start() publish_pub(client, msg) shadow_update(client, state_ok) time.sleep(1) aws_disconnect(client) except KeyboardInterrupt: aws_disconnect(client) time.sleep(3) sys.exit()
実行結果
OK state
state_okをreported stateとしてupdateします。
state_ok = "ヨシッ👉"
下図の状態です。
topicは正しく設定されています。
Subscribing to topic: MQTTServer/MQTTServer1/0Subscribing to shadow update topic: $aws/things/MQTTServer1/shadow/name/device_0/update/documentsSubscribing to shadow delta update topic: $aws/things/MQTTServer1/shadow/name/device_0/update/deltaPublishing to topic: MQTTServer/MQTTServer1/0Connected
正常にpubsubを実行してます。
Message pblished : {'Timestamp': 1719502382, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}Received message: {"Timestamp": 1719502382, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}
正常にshadowのupdateを実行してます。
$aws/things/MQTTServer1/shadow/name/device_0/updateShadow update sent: {'state': {'desired': {'device_no': 0, 'state': 'ヨシッ👉'}, 'reported': {'device_no': 0, 'state': 'ヨシッ👉'}}}
shadowのupdateをgetできました。
Shadow Update detected: {"previous": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ダメ🙅"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719502318}, "state": {"timestamp": 1719502318}}, "reported": {"device_no": {"timestamp": 1719502318}, "state": {"timestamp": 1719502318}}}, "version": 56}, "current": {"state": {"desired": {"device_no": 0, "state": " ヨシッ👉"}, "reported": {"device_no": 0, "state": "ヨシッ👉"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719502382}, "state": {"timestamp": 1719502382}}, "reported": {"device_no": {"timestamp": 1719502382}, "state": {"timestamp": 1719502382}}}, "version": 57}, "timestamp": 1719502382}
shadow deltaは出力されていません。
AWS IoTのテストクライアント上でも確認できました。
NG state
state_ngをreported stateとしてupdateします。
state_ng = "ダメ🙅"
下図の状態です。
reportedを「ダメ」でupdateしています。
$aws/things/MQTTServer1/shadow/name/device_0/updateShadow update sent: {'state': {'desired': {'device_no': 0, 'state': 'ヨシッ👉'}, 'reported': {'device_no': 0, 'state': 'ダメ🙅'}}}
shadowのupdateでも確認できます。
Shadow Update detected: {"previous": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ヨシッ👉"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719507396}, "state": {"timestamp": 1719507396}}, "reported": {"device_no": {"timestamp": 1719507396}, "state": {"timestamp": 1719507396}}}, "version": 58}, "current": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ダメ🙅"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719508744}, "state": {"timestamp": 1719508744}}, "reported": {"device_no": {"timestamp": 1719508744}, "state": {"timestamp": 1719508744}}}, "version": 59}, "timestamp": 1719508744}
今度はshadow deltaが確認できます。
Shadow Delta Update detected: {"version": 59, "timestamp": 1719508744, "state": {"state": "ヨシッ👉"}, "metadata": {"state": {"timestamp": 1719508744}}}
AWS IoTのテストクライアント上でも確認できました。
このdeltaをトリガーとして自動実行ワークロードに組み込むことが可能です。
次回
shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑤pahoでretainとwill
Go to list of users who liked
comment0
Go to list of comments
Register as a new user and use Qiita more conveniently
- You get articles that match your needs
- You can efficiently read back useful information
- You can use dark theme
What you can do with signing up
Sign upLogin