摘要:刪除用戶完成刪除用戶失敗加載用戶。。。。燒錄用戶錯誤燒錄用戶完畢,正在重啟。。。注意升級代碼通過傳輸時代碼過長時需要進行分段發(fā)送,本代碼僅演示未分段代碼的升級過程。測試過程中發(fā)現(xiàn)發(fā)送中文會亂碼。
? ? ? ? 對于ESP8266的開發(fā),在arduino平臺上的開發(fā)庫非常多,arduino上也可以找到esp8266?OTA的許多解決方案,最近突然對Mircopython好奇起來,想通過Mircopython寫一下esp8266運行的程序,然后過程中查找了許多的資料都沒有看到Mircopython平臺上如何OTA升級esp8266固件,于是自己胡亂做了一個用起來還不錯的替代方案,給愛好者們提供一個小小的參考思路,直接進入正題。
????????我是使用uPyCraft燒錄以及編輯代碼的,下載uPyCraft鏈接: https://pan.baidu.com/s/1HARl7J3fy0J11I_FHG8fCg 提取碼: 5u4k?
? ? ? ? 第一次插入開發(fā)版會自動提示燒錄Mircopython固件,選擇好設(shè)置點擊OK,或者通過菜單Tools→BurnFirmware進行燒錄。
????????
?用uPyCraft工具燒完成后,device文件夾為esp8266寄存器中的根目錄,device目錄下僅有一個boot.py文件,次文件為esp8266每次啟動時必定執(zhí)行一次的文件,隨后上傳simple.py以及index0.py文件,simple.py文件為MQTT連接庫,可以在從目錄uPy_lib/umqtt中直接找到拖拽到device目錄。以下代碼我也會全部給出。
?boot.py:
# This file is executed on every boot (including wake-boot from deepsleep)#import esp#esp.osdebug(None)import gcimport os#import webrepl#webrepl.start()gc.collect()base = "0"try: file = open("base.ini","r") base = file.read() file.close()except: file = open("base.ini","w") file.write("0") file.close() if base == "1": print("加載用戶1。。。。") try: os.remove("index0.py") print("刪除用戶0完成") except: print("刪除用戶0失敗") import index1 index1.connectWiFi()else : print("加載用戶0。。。。") try: os.remove("index1.py") print("刪除用戶1完成") except: print("刪除用戶1失敗") import index0 index0.connectWiFi()
simple.py:
import usocket as socketimport ustruct as struct#from ubinascii import hexlifyclass MQTTException(Exception): passclass MQTTClient: def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,ssl=False, ssl_params={}): if port == 0: port = 8883 if ssl else 1883 self.client_id = client_id self.sock = None self.addr = socket.getaddrinfo(server, port)[0][-1] self.ssl = ssl self.ssl_params = ssl_params self.pid = 0 self.cb = None self.user = user self.pswd = password self.keepalive = keepalive self.lw_topic = None self.lw_msg = None self.lw_qos = 0 self.lw_retain = False def _send_str(self, s): self.sock.write(struct.pack("!H", len(s))) self.sock.write(s) def _recv_len(self): n = 0 sh = 0 while 1: b = self.sock.read(1)[0] n |= (b & 0x7f) << sh if not b & 0x80: return n sh += 7 def set_callback(self, f): self.cb = f def set_last_will(self, topic, msg, retain=False, qos=0): assert 0 <= qos <= 2 assert topic self.lw_topic = topic self.lw_msg = msg self.lw_qos = qos self.lw_retain = retain def connect(self, clean_session=True): self.sock = socket.socket() self.sock.connect(self.addr) if self.ssl: import ussl self.sock = ussl.wrap_socket(self.sock, **self.ssl_params) msg = bytearray(b"/x10/0/0/x04MQTT/x04/x02/0/0") msg[1] = 10 + 2 + len(self.client_id) msg[9] = clean_session << 1 if self.user is not None: msg[1] += 2 + len(self.user) + 2 + len(self.pswd) msg[9] |= 0xC0 if self.keepalive: assert self.keepalive < 65536 msg[10] |= self.keepalive >> 8 msg[11] |= self.keepalive & 0x00FF if self.lw_topic: msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 msg[9] |= self.lw_retain << 5 self.sock.write(msg) #print(hex(len(msg)), hexlify(msg, ":")) self._send_str(self.client_id) if self.lw_topic: self._send_str(self.lw_topic) self._send_str(self.lw_msg) if self.user is not None: self._send_str(self.user) self._send_str(self.pswd) resp = self.sock.read(4) assert resp[0] == 0x20 and resp[1] == 0x02 if resp[3] != 0: raise MQTTException(resp[3]) return resp[2] & 1 def disconnect(self): self.sock.write(b"/xe0/0") self.sock.close() def ping(self): self.sock.write(b"/xc0/0") def publish(self, topic, msg, retain=False, qos=0): pkt = bytearray(b"/x30/0/0/0") pkt[0] |= qos << 1 | retain sz = 2 + len(topic) + len(msg) if qos > 0: sz += 2 assert sz < 2097152 i = 1 while sz > 0x7f: pkt[i] = (sz & 0x7f) | 0x80 sz >>= 7 i += 1 pkt[i] = sz #print(hex(len(pkt)), hexlify(pkt, ":")) self.sock.write(pkt, i + 1) self._send_str(topic) if qos > 0: self.pid += 1 pid = self.pid struct.pack_into("!H", pkt, 0, pid) self.sock.write(pkt, 2) self.sock.write(msg) if qos == 1: while 1: op = self.wait_msg() if op == 0x40: sz = self.sock.read(1) assert sz == b"/x02" rcv_pid = self.sock.read(2) rcv_pid = rcv_pid[0] << 8 | rcv_pid[1] if pid == rcv_pid: return elif qos == 2: assert 0 def subscribe(self, topic, qos=0): assert self.cb is not None, "Subscribe callback is not set" pkt = bytearray(b"/x82/0/0/0") self.pid += 1 struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid) #print(hex(len(pkt)), hexlify(pkt, ":")) self.sock.write(pkt) self._send_str(topic) self.sock.write(qos.to_bytes(1, "little")) while 1: op = self.wait_msg() if op == 0x90: resp = self.sock.read(4) #print(resp) assert resp[1] == pkt[2] and resp[2] == pkt[3] if resp[3] == 0x80: raise MQTTException(resp[3]) return # Wait for a single incoming MQTT message and process it. # Subscribed messages are delivered to a callback previously # set by .set_callback() method. Other (internal) MQTT # messages processed internally. def wait_msg(self): res = self.sock.read(1) self.sock.setblocking(True) if res is None: return None if res == b"": raise OSError(-1) if res == b"/xd0": # PINGRESP sz = self.sock.read(1)[0] assert sz == 0 return None op = res[0] if op & 0xf0 != 0x30: return op sz = self._recv_len() topic_len = self.sock.read(2) topic_len = (topic_len[0] << 8) | topic_len[1] topic = self.sock.read(topic_len) sz -= topic_len + 2 if op & 6: pid = self.sock.read(2) pid = pid[0] << 8 | pid[1] sz -= 2 msg = self.sock.read(sz) self.cb(topic, msg) if op & 6 == 2: pkt = bytearray(b"/x40/x02/0/0") struct.pack_into("!H", pkt, 2, pid) self.sock.write(pkt) elif op & 6 == 4: assert 0 # Checks whether a pending message from server is available. # If not, returns immediately with None. Otherwise, does # the same processing as wait_msg. def check_msg(self): self.sock.setblocking(False) return self.wait_msg()
index0.py:
import osimport time import networkfrom simple import MQTTClientimport machinefrom machine import Pin#wifissid="XXXXX"#WiFi名稱passwd="XXXXXXXXX"#WiFi密碼#mqttclient_id = "mricopython_"mserver = "XXXXX.com"#mqtt服務(wù)器IPport=1883M_subscribe = b"py"#設(shè)備訂閱的主題M_subscribe_ota = M_subscribe + b"u"#設(shè)備訂閱的OTA主題M_topic = M_subscribe + b"r"#設(shè)備推送消息的主題client = Nonewlan = Noneled=Pin(2, Pin.OUT)led2=Pin(5, Pin.OUT, value=1)def connectWiFi(): #連接WiFi wlan = network.WLAN(network.STA_IF) wlan.active(True) if not wlan.isconnected(): print("connecting to network...") wlan.connect(ssid, passwd) while not wlan.isconnected(): pass print("network config:", wlan.ifconfig()) connect_mqtt()#處理OTA版本信息def ota(txt): file = open("base.ini","r") base = file.read() file.close() if base == "1": try: f = open("index0.py","a") f.write(txt) f.close() file = open("base.ini","w") file.write("0") file.close() print("燒錄用戶0完畢,正在重啟。。。") machine.reset() except: print("燒錄用戶0錯誤") pass else: try: f = open("index1.py","a") f.write(txt) f.close() file = open("base.ini","w") file.write("1") file.close() print("燒錄用戶1完畢,正在重啟。。。") machine.reset() except: print("燒錄用戶1錯誤") pass #MQTT信息處理def sub_cb(topic_b, msg_b): #轉(zhuǎn)換格式后打印bytes.decode(),str.decode() topic = bytes.decode(topic_b) msg = bytes.decode(msg_b) print(topic,":",msg) # 判斷MQTT信息作出相應(yīng)動作 #string.spilt(" ")分組 if topic == bytes.decode(M_subscribe): if msg=="1": led.off() if msg=="0": led.on() elif topic == bytes.decode(M_subscribe_ota): ota(msg) #連接MQTTdef connect_mqtt(): try: t = str(time.ticks_us()) print("client:") c = MQTTClient(client_id + t + str(time.ticks_us()),mserver) c.set_callback(sub_cb) c.connect() c.subscribe(M_subscribe)#訂閱信息主題 c.subscribe(M_subscribe_ota)#訂閱OTA升級主題 print("連接MQTT成功,已訂閱" + str(M_subscribe)) c.publish(M_topic,b"連接MQTT成功") #推送信息#主循環(huán),程序在此做死循環(huán) while True: c.check_msg() #接收MQTT信息 #c.publish(M_topic,b"版本1") #推送信息 except: print("重新連接MQTT") connect_mqtt()if __name__ == "__main__": connectWiFi()
思路是boot.py通過讀取系統(tǒng)生成的base.ini里的內(nèi)容判斷需要加載用戶0 “index0.py”或者是用戶1? “index1.py”進行運行,升級代碼通過mqtt傳輸?shù)絜sp8266上,esp8266接受到自定義的OTA升級主題發(fā)送的代碼后,通過判斷base.ini生成新的index0.py或者index1.py,并刪除舊的index.py。
? ? ? ? 注意:升級代碼通過mqtt傳輸時代碼過長時需要進行分段發(fā)送,本代碼僅演示未分段代碼的升級過程。測試過程中發(fā)現(xiàn)MQTT.fx發(fā)送中文會亂碼。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/121628.html
摘要:教程傳送門基于平臺開發(fā)連接巴法云簡介實驗準備硬件軟件實驗步驟點燈實驗發(fā)送溫濕度指令升級總結(jié)關(guān)于巴法云專注于開源,智造,創(chuàng)新,分享。 Arduino教程傳送門????...
摘要:超簡單深度睡眠模式下遠程采集溫濕度信息項目背景相關(guān)技術(shù)深度睡眠模式溫濕度采集數(shù)據(jù)收發(fā)前后端實現(xiàn)后端前端項目背景自己用收納箱做了一個用于存放打印耗材的干燥箱,想用閑置的開發(fā)板和溫濕度傳感器做一個遠程溫濕度監(jiān)測的小項目。 ...
摘要:開發(fā)方式是樂鑫為開發(fā)者提供的物聯(lián)應(yīng)開發(fā)平臺,包括基礎(chǔ)平臺以及上層應(yīng)開發(fā)示例,如智能燈智能開關(guān)等。指令開發(fā)方式作為芯片,指令開發(fā)也是必不可少的。開發(fā)方式即,意為運行在單片機上的。 ...
摘要:查詢附近名稱密碼連接路由器的查看路由器分配給模組的地址例如設(shè)置單連接設(shè)置透傳模式建立的服務(wù)器開始發(fā)送數(shù)據(jù)進入發(fā)送模式發(fā)送數(shù)據(jù)注意退出透傳,直接發(fā)送。取消發(fā)送新行五參考模塊指令入門指南透傳簡單使用模塊指令匯總一、所需硬件材料 1.ESP8266:01s某寶上3、5塊錢 2.杜邦線:某寶幾塊錢一組40P,這里只需要三根,用于連接 樹莓派與繼電器 ?? 3.燒錄器 ...
摘要:大多數(shù)的硬件公司很難提供能夠正常運行的。這個容器在共享。這將使很重要的數(shù)據(jù)能夠非常容易的從輸入到你的容器中。如果你想在容器內(nèi)運行這個項目是我在時做的。希望愛特梅爾公司和德州儀器將來也使用。 隨著Iot新的硬件平臺和開發(fā)板的不斷更新, SDK交付越來越多的轉(zhuǎn)向零碎化以及按需組裝。大多數(shù)的硬件公司很難提供能夠正常運行的Software Development Kits (SDKs)。 Do...
閱讀 1002·2021-11-24 10:30
閱讀 2328·2021-10-08 10:04
閱讀 3969·2021-09-30 09:47
閱讀 1452·2021-09-29 09:45
閱讀 1446·2021-09-24 10:33
閱讀 6275·2021-09-22 15:57
閱讀 2359·2021-09-22 15:50
閱讀 4091·2021-08-30 09:45