shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow update - Qiita (2024)

お題は不問!Qiita Engineer Festa 2024で記事投稿!Qiita Engineer Festa20242024年7月17日まで開催中!
@Saito5656(5656 Saito)
  • RaspberryPi
  • Shadow
  • mqtt
  • paho
  • awsIoT

Last updated at Posted at 2024-06-27

この記事について

shadowとwillとretainを活用してAWS IoT CoreとPubSubする③pahoでpubsubで行ったmessageのpubsubにshadowのupdate/getを追加します。

今回はラズパイに接続された仮想device_0の正常稼働とdevice停止をAWS IoTのdevice shadowに反映させます。


shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow update - Qiita (2)

shadowのtopic

Device shadowはAWS IoTで用意されているサービスの一種で、一般的なMQTTメソッドではありません。加えてAWS IoT Coreでshadowの各種イベントが発生する際に使われる11種類のtopicがシステム予約で決められています。

今回は、この中の/update/documentsと/update/deltaを使ってshdowとshadow deltaのupdateをキャッチするcustom callbackを作成します。

名前アクションMQTT トピック
/get発行$aws/things/MQTTServer1/shadow/name/device_0/get
/get/acceptedサブスクライブ$aws/things/MQTTServer1/shadow/name/device_0/get/accepted
/get/rejectedサブスクライブ$aws/things/MQTTServer1/shadow/name/device_0/get/rejected
/update発行$aws/things/MQTTServer1/shadow/name/device_0/update
/update/deltaサブスクライブ$aws/things/MQTTServer1/shadow/name/device_0/update/delta
/update/acceptedサブスクライブ$aws/things/MQTTServer1/shadow/name/device_0/update/accepted
/update/documentsサブスクライブ$aws/things/MQTTServer1/shadow/name/device_0/update/documents
/update/rejectedサブスクライブ$aws/things/MQTTServer1/shadow/name/device_0/update/rejected
/delete発行$aws/things/MQTTServer1/shadow/name/device_0/delete
/delete/acceptedサブスクライブ$aws/things/MQTTServer1/shadow/name/device_0/accepted
/delete/rejectedサブスクライブ$aws/things/MQTTServer1/shadow/name/device_0/rejected

callback

custom callback

一般的なMQTTメソッドではない為callbackも用意されていません。そこでpaho-mqttのclient.message_callback_add関数を使いcustom callbackを作成します。

message_callback_add(sub : str , callback : Callable)
特定のトピックのメッセージ コールバックを登録します。 'sub' に一致するメッセージは 'callback' に渡されます。一致しないメッセージはデフォルトのon_messageコールバックに渡されます。

複数のトピック固有のコールバックを定義するには、異なる「sub」を使用して複数回呼び出します。

トピック固有のコールバックはmessage_callback_remove()で削除できます。

shdow updateのsubscribe用callback

/update/documents

AWS IoT は、シャドウの更新が正常に実行されるたびに、このトピックに状態ドキュメントを公開します。

shdow updateをキャッチするには/update/documentsが利用できます。

今回のホスト名でdevice_0について/update/documentsをsubする場合topicは次のようになります。

$aws/things/MQTTServer1/shadow/name/device_0/update/documents 

/update/documentsで呼び出すcallbackの中身としてon_shadow_updateメソッドを準備します。

機能はon_messageと同じですが、shadowがupdateされた時だけpayloadをdecodeして出力させます。

def on_shadow_update(client, userdata, msg): payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False) print("Shadow Update detected: " + payload_decoded)

他のcallbackと同様に、接続時にon_shadow_update()を設定しておくことでupdate時にcallbackの出力が得られます。

shdow delta updateのsubscribe用callback

/update/delta

shdow delta updateをキャッチするには/update/deltaが利用できます。

AWS IoTは、デバイスのシャドウの変更を受け入れると、このトピックに応答状態ドキュメントを公開します。また、リクエスト状態ドキュメントには、desiredとreportedの値とさまざまな状態が含まれます。

とありますが、responseを見るとdesiredのみが返されます。当然ですがdeltaが発生(更新)していないときはこのresponseは返されません。

今回のホスト名でdevice_0について/update/deltaをsubする場合topicは次のようになります。

$aws/things/MQTTServer1/shadow/name/device_0/update/delta 

/update/deltaで呼び出すcallbackの中身としてon_shadowdelta_updateメソッドを準備します。

機能はon_messageと同じですが、shadowのdeltaがupdateされた時だけpayloadをdecodeして出力させます。

def on_shadowdelta_update(client, userdata, msg): payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False) print("Shadow Delta Update detected: " + payload_decoded)

他のcallbackと同様に、接続時にon_shadowdelta_update()を設定しておくことでupdate時にcallbackの出力が得られます。

def aws_connect(client): . . . client.message_callback_add(f"$aws/things/MQTTServer1/shadow/name/device_0/update/delta", on_shadowdelta_update)

customメソッド

shadow_update()

desiredとreportedの状態をjsonに設定してpubするメソッドです。desired stateを固定してreported stateを変化させて後ほどdelta発生時の動作を確認します。

stateは…最後までこの路線で行きますヨシッ👉

desired_state = "ヨシッ👉"state_ok = "ヨシッ👉"state_ng = "ダメ🙅"def shadow_update(client, state): topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update" print(topic) payload = { "state": { "desired": { "device_no": device_no, "state": desired_state }, "reported": { "device_no": device_no, "state": state_ok #state_ng } } } client.publish(topic, json.dumps(payload), qos=1) print(f"Shadow update sent: {payload}")

shadow_get()

updateされたshadowをgetします。今回は確認用で使います。

def shadow_get(client): shadow_update_topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update/documents" print("Subscribing to shadow update topic:", shadow_update_topic) client.subscribe(shadow_update_topic, qos=1)

shadowdelta_get()

shadowのdeltaがupdateされた場合にgetします。同様に確認用で使います。

def shadowdelta_get(client): shadow_update_topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update/delta" print("Subscribing to shadow delta update topic:", shadow_update_topic) client.subscribe(shadow_update_topic, qos=1)

Demo code 全体

前回のコードに今回作成したcallbackとメソッドを追加すると以下のようになります。

pubsub_shadow_paho.py

#!/usr/bin/python# -*- coding: utf-8 -*-import osimport sysimport jsonimport sslimport timeimport subprocessimport paho.mqtt.client as mqttendpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'hostname = os.uname()[1]port = 8883cert = f'./cert/MQTTServer1-certificate.pem.crt'key = f'./cert/MQTTServer1-private.pem.key'ca = f'./cert/AmazonRootCA1.pem'device_no = 0topic = f'MQTTServer/MQTTServer1'msg = "AWS IoT 接続ヨシッ👉"desired_state = "ヨシッ👉"def on_connect(client, userdata, flags, respons_code): if respons_code != 0: print(f"Restart after 120 secs due to the connection cannot be established: respons_code: {respons_code} flags: {flags}") print('Connected')def on_disconnect(client, userdata, respons_code): if respons_code != 0: print(f"Unexpected disconnection.") else: print(f"Disconnected successfully.")def on_message(client, userdata, msg): print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}") returndef on_shadow_update(client, userdata, msg): payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False) print("Shadow Update detected: " + payload_decoded)def on_shadowdelta_update(client, userdata, msg): payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False) print("Shadow Delta Update detected: " + payload_decoded)def get_ssid(): cmd = 'iwconfig wlan0|grep ESSID' r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode().rstrip() idx = r.find('ESSID:') if r[idx + 7:-1] == "ff/an": print(f"Restart after 120 secs due to the connection cannot be determined: ESSID: {r[idx + 7:-1]}") time.sleep(120) subprocess.call(["sudo", "reboot"])def mqtt_init(clientId): try: client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv311) client.tls_set( ca, certfile=cert, keyfile=key, tls_version=ssl.PROTOCOL_TLSv1_2) client.tls_insecure_set(True) except Exception as e: print(f"Restart after 120 secs due to launches an MQTT client and creates an object instance failed of: {e}") return clientdef mqtt_connect(client): client.on_connect = on_connect client.on_message = on_message client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents", on_shadow_update) client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta", on_shadowdelta_update) client.connect(endpoint, port, keepalive=60) time.sleep(1) subscribe_pub(client) time.sleep(1) shadow_get(client) time.sleep(1) shadowdelta_get(client) time.sleep(1)def mqtt_disconnect(client): client.on_disconnect = on_disconnect client.disconnect() client.loop_stop()def mqtt_publish(client, msg): data = {} _topic = topic + "/" + str(device_no) print("Publishing to topic:", _topic) data['Timestamp'] = int(time.time()) data['hostname'] = os.uname()[1] data['device_no'] = device_no data['msg'] = msg client.publish(_topic, json.dumps(data, default=json_serial), qos=1) print(_topic, data) returndef mqtt_subscribe(client): _topic = topic + "/" + str(device_no) print("Subscribing to topic:", _topic) client.subscribe(_topic, qos=1) returndef json_serial(para): return para.isoformat()def shadow_update(client, state): topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update" print(topic) payload = { "state": { "desired": { "device_no": device_no, "state": desired_state }, "reported": { "device_no": device_no, "state": state } } } client.publish(topic, json.dumps(payload), qos=1) print(f"Shadow update sent: {payload}")def shadow_get(client): shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents" print("Subscribing to shadow update topic:", shadow_update_topic) client.subscribe(shadow_update_topic, qos=1)def shadowdelta_get(client): shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta" print("Subscribing to shadow delta update topic:", shadow_update_topic) client.subscribe(shadow_update_topic, qos=1)if __name__ == '__main__': clientid = f"{hostname}" state_ok = "ヨシッ👉" state_ng = "ダメ🙅" time.sleep(5) try: get_ssid() client = aws_init(clientid) aws_connect(client) client.loop_start() publish_pub(client, msg) shadow_update(client, state_ok) time.sleep(1) aws_disconnect(client) except KeyboardInterrupt: aws_disconnect(client) time.sleep(3) sys.exit()

実行結果

OK state

state_okをreported stateとしてupdateします。

state_ok = "ヨシッ👉"

下図の状態です。

shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow update - Qiita (3)

topicは正しく設定されています。

Subscribing to topic: MQTTServer/MQTTServer1/0Subscribing to shadow update topic: $aws/things/MQTTServer1/shadow/name/device_0/update/documentsSubscribing to shadow delta update topic: $aws/things/MQTTServer1/shadow/name/device_0/update/deltaPublishing to topic: MQTTServer/MQTTServer1/0Connected

正常にpubsubを実行してます。

Message pblished : {'Timestamp': 1719502382, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}Received message: {"Timestamp": 1719502382, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}

正常にshadowのupdateを実行してます。

$aws/things/MQTTServer1/shadow/name/device_0/updateShadow update sent: {'state': {'desired': {'device_no': 0, 'state': 'ヨシッ👉'}, 'reported': {'device_no': 0, 'state': 'ヨシッ👉'}}}

shadowのupdateをgetできました。

Shadow Update detected: {"previous": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ダメ🙅"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719502318}, "state": {"timestamp": 1719502318}}, "reported": {"device_no": {"timestamp": 1719502318}, "state": {"timestamp": 1719502318}}}, "version": 56}, "current": {"state": {"desired": {"device_no": 0, "state": " ヨシッ👉"}, "reported": {"device_no": 0, "state": "ヨシッ👉"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719502382}, "state": {"timestamp": 1719502382}}, "reported": {"device_no": {"timestamp": 1719502382}, "state": {"timestamp": 1719502382}}}, "version": 57}, "timestamp": 1719502382}

shadow deltaは出力されていません。

AWS IoTのテストクライアント上でも確認できました。

shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow update - Qiita (4)

NG state

state_ngをreported stateとしてupdateします。

state_ng = "ダメ🙅"

下図の状態です。

shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow update - Qiita (5)

reportedを「ダメ」でupdateしています。

$aws/things/MQTTServer1/shadow/name/device_0/updateShadow update sent: {'state': {'desired': {'device_no': 0, 'state': 'ヨシッ👉'}, 'reported': {'device_no': 0, 'state': 'ダメ🙅'}}}

shadowのupdateでも確認できます。

Shadow Update detected: {"previous": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ヨシッ👉"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719507396}, "state": {"timestamp": 1719507396}}, "reported": {"device_no": {"timestamp": 1719507396}, "state": {"timestamp": 1719507396}}}, "version": 58}, "current": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ダメ🙅"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719508744}, "state": {"timestamp": 1719508744}}, "reported": {"device_no": {"timestamp": 1719508744}, "state": {"timestamp": 1719508744}}}, "version": 59}, "timestamp": 1719508744}

今度はshadow deltaが確認できます。

Shadow Delta Update detected: {"version": 59, "timestamp": 1719508744, "state": {"state": "ヨシッ👉"}, "metadata": {"state": {"timestamp": 1719508744}}}

AWS IoTのテストクライアント上でも確認できました。

shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow update - Qiita (6)

このdeltaをトリガーとして自動実行ワークロードに組み込むことが可能です。

次回

shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑤pahoでretainとwill

Go to list of users who liked

comment0

Go to list of comments

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme

What you can do with signing up

Sign upLogin

shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow update - Qiita (2024)
Top Articles
Latest Posts
Article information

Author: Ms. Lucile Johns

Last Updated:

Views: 5918

Rating: 4 / 5 (41 voted)

Reviews: 80% of readers found this page helpful

Author information

Name: Ms. Lucile Johns

Birthday: 1999-11-16

Address: Suite 237 56046 Walsh Coves, West Enid, VT 46557

Phone: +59115435987187

Job: Education Supervisor

Hobby: Genealogy, Stone skipping, Skydiving, Nordic skating, Couponing, Coloring, Gardening

Introduction: My name is Ms. Lucile Johns, I am a successful, friendly, friendly, homely, adventurous, handsome, delightful person who loves writing and wants to share my knowledge and understanding with you.