资讯专栏INFORMATION COLUMN

解决esp8266 Mircopython OTA 远程升级方案

Shonim / 3859人阅读

摘要:删除用户完成删除用户失败加载用户。。。。烧录用户错误烧录用户完毕,正在重启。。。注意升级代码通过传输时代码过长时需要进行分段发送,本代码仅演示未分段代码的升级过程。测试过程中发现发送中文会乱码。

        对于ESP8266的开发,在arduino平台上的开发库非常多,arduino上也可以找到esp8266 OTA的许多解决方案,最近突然对Mircopython好奇起来,想通过Mircopython写一下esp8266运行的程序,然后过程中查找了许多的资料都没有看到Mircopython平台上如何OTA升级esp8266固件,于是自己胡乱做了一个用起来还不错的替代方案,给爱好者们提供一个小小的参考思路,直接进入正题。

        我是使用uPyCraft烧录以及编辑代码的,下载uPyCraft链接: https://pan.baidu.com/s/1HARl7J3fy0J11I_FHG8fCg 提取码: 5u4k 

        第一次插入开发版会自动提示烧录Mircopython固件,选择好设置点击OK,或者通过菜单Tools→BurnFirmware进行烧录。

        

 用uPyCraft工具烧完成后,device文件夹为esp8266寄存器中的根目录,device目录下仅有一个boot.py文件,次文件为esp8266每次启动时必定执行一次的文件,随后上传simple.py以及index0.py文件,simple.py文件为MQTT连接库,可以在从目录uPy_lib/umqtt中直接找到拖拽到device目录。以下代码我也会全部给出。

 boot.py:

# This file is executed on every boot (including wake-boot from deepsleep)#import esp#esp.osdebug(None)import gcimport os#import webrepl#webrepl.start()gc.collect()base = "0"try:  file = open("base.ini","r")  base = file.read()  file.close()except:  file = open("base.ini","w")  file.write("0")  file.close()  if base == "1":  print("加载用户1。。。。")  try:    os.remove("index0.py")    print("删除用户0完成")  except:    print("删除用户0失败")  import index1  index1.connectWiFi()else :  print("加载用户0。。。。")  try:    os.remove("index1.py")    print("删除用户1完成")  except:    print("删除用户1失败")  import index0  index0.connectWiFi()

simple.py:

import usocket as socketimport ustruct as struct#from ubinascii import hexlifyclass MQTTException(Exception):    passclass MQTTClient:  def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,ssl=False, ssl_params={}):    if port == 0:      port = 8883 if ssl else 1883    self.client_id = client_id    self.sock = None    self.addr = socket.getaddrinfo(server, port)[0][-1]    self.ssl = ssl    self.ssl_params = ssl_params    self.pid = 0    self.cb = None    self.user = user    self.pswd = password    self.keepalive = keepalive    self.lw_topic = None    self.lw_msg = None    self.lw_qos = 0    self.lw_retain = False  def _send_str(self, s):    self.sock.write(struct.pack("!H", len(s)))    self.sock.write(s)  def _recv_len(self):    n = 0    sh = 0    while 1:      b = self.sock.read(1)[0]      n |= (b & 0x7f) << sh      if not b & 0x80:        return n      sh += 7  def set_callback(self, f):    self.cb = f  def set_last_will(self, topic, msg, retain=False, qos=0):    assert 0 <= qos <= 2    assert topic    self.lw_topic = topic    self.lw_msg = msg    self.lw_qos = qos    self.lw_retain = retain  def connect(self, clean_session=True):    self.sock = socket.socket()    self.sock.connect(self.addr)    if self.ssl:      import ussl      self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)    msg = bytearray(b"/x10/0/0/x04MQTT/x04/x02/0/0")    msg[1] = 10 + 2 + len(self.client_id)    msg[9] = clean_session << 1    if self.user is not None:      msg[1] += 2 + len(self.user) + 2 + len(self.pswd)      msg[9] |= 0xC0    if self.keepalive:      assert self.keepalive < 65536      msg[10] |= self.keepalive >> 8      msg[11] |= self.keepalive & 0x00FF    if self.lw_topic:      msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)      msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3      msg[9] |= self.lw_retain << 5    self.sock.write(msg)    #print(hex(len(msg)), hexlify(msg, ":"))    self._send_str(self.client_id)    if self.lw_topic:      self._send_str(self.lw_topic)      self._send_str(self.lw_msg)    if self.user is not None:      self._send_str(self.user)      self._send_str(self.pswd)    resp = self.sock.read(4)    assert resp[0] == 0x20 and resp[1] == 0x02    if resp[3] != 0:      raise MQTTException(resp[3])    return resp[2] & 1  def disconnect(self):    self.sock.write(b"/xe0/0")    self.sock.close()  def ping(self):    self.sock.write(b"/xc0/0")  def publish(self, topic, msg, retain=False, qos=0):    pkt = bytearray(b"/x30/0/0/0")    pkt[0] |= qos << 1 | retain    sz = 2 + len(topic) + len(msg)    if qos > 0:      sz += 2    assert sz < 2097152    i = 1    while sz > 0x7f:      pkt[i] = (sz & 0x7f) | 0x80      sz >>= 7      i += 1    pkt[i] = sz    #print(hex(len(pkt)), hexlify(pkt, ":"))    self.sock.write(pkt, i + 1)    self._send_str(topic)    if qos > 0:      self.pid += 1      pid = self.pid      struct.pack_into("!H", pkt, 0, pid)      self.sock.write(pkt, 2)    self.sock.write(msg)    if qos == 1:      while 1:        op = self.wait_msg()        if op == 0x40:          sz = self.sock.read(1)          assert sz == b"/x02"          rcv_pid = self.sock.read(2)          rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]          if pid == rcv_pid:            return    elif qos == 2:      assert 0  def subscribe(self, topic, qos=0):    assert self.cb is not None, "Subscribe callback is not set"    pkt = bytearray(b"/x82/0/0/0")    self.pid += 1    struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)    #print(hex(len(pkt)), hexlify(pkt, ":"))    self.sock.write(pkt)    self._send_str(topic)    self.sock.write(qos.to_bytes(1, "little"))    while 1:      op = self.wait_msg()      if op == 0x90:        resp = self.sock.read(4)        #print(resp)        assert resp[1] == pkt[2] and resp[2] == pkt[3]        if resp[3] == 0x80:          raise MQTTException(resp[3])        return  # Wait for a single incoming MQTT message and process it.  # Subscribed messages are delivered to a callback previously  # set by .set_callback() method. Other (internal) MQTT  # messages processed internally.  def wait_msg(self):    res = self.sock.read(1)    self.sock.setblocking(True)    if res is None:      return None    if res == b"":      raise OSError(-1)    if res == b"/xd0":  # PINGRESP      sz = self.sock.read(1)[0]      assert sz == 0      return None    op = res[0]    if op & 0xf0 != 0x30:      return op    sz = self._recv_len()    topic_len = self.sock.read(2)    topic_len = (topic_len[0] << 8) | topic_len[1]    topic = self.sock.read(topic_len)    sz -= topic_len + 2    if op & 6:      pid = self.sock.read(2)      pid = pid[0] << 8 | pid[1]      sz -= 2    msg = self.sock.read(sz)    self.cb(topic, msg)    if op & 6 == 2:      pkt = bytearray(b"/x40/x02/0/0")      struct.pack_into("!H", pkt, 2, pid)      self.sock.write(pkt)    elif op & 6 == 4:      assert 0  # Checks whether a pending message from server is available.  # If not, returns immediately with None. Otherwise, does  # the same processing as wait_msg.  def check_msg(self):    self.sock.setblocking(False)    return self.wait_msg()

index0.py:

import osimport time import networkfrom simple import MQTTClientimport machinefrom machine import Pin#wifissid="XXXXX"#WiFi名称passwd="XXXXXXXXX"#WiFi密码#mqttclient_id = "mricopython_"mserver = "XXXXX.com"#mqtt服务器IPport=1883M_subscribe = b"py"#设备订阅的主题M_subscribe_ota = M_subscribe + b"u"#设备订阅的OTA主题M_topic = M_subscribe + b"r"#设备推送消息的主题client = Nonewlan = Noneled=Pin(2, Pin.OUT)led2=Pin(5, Pin.OUT, value=1)def connectWiFi():  #连接WiFi  wlan = network.WLAN(network.STA_IF)  wlan.active(True)  if not wlan.isconnected():    print("connecting to network...")    wlan.connect(ssid, passwd)      while not wlan.isconnected():      pass  print("network config:", wlan.ifconfig())  connect_mqtt()#处理OTA版本信息def ota(txt):  file = open("base.ini","r")  base = file.read()  file.close()  if base == "1":    try:      f = open("index0.py","a")      f.write(txt)      f.close()          file = open("base.ini","w")      file.write("0")      file.close()      print("烧录用户0完毕,正在重启。。。")      machine.reset()     except:      print("烧录用户0错误")      pass  else:    try:          f = open("index1.py","a")      f.write(txt)      f.close()      file = open("base.ini","w")      file.write("1")      file.close()      print("烧录用户1完毕,正在重启。。。")      machine.reset()    except:      print("烧录用户1错误")      pass  #MQTT信息处理def sub_cb(topic_b, msg_b):  #转换格式后打印bytes.decode(),str.decode()  topic = bytes.decode(topic_b)  msg = bytes.decode(msg_b)  print(topic,":",msg)  # 判断MQTT信息作出相应动作  #string.spilt(" ")分组  if topic == bytes.decode(M_subscribe):    if msg=="1":      led.off()    if msg=="0":      led.on()  elif topic == bytes.decode(M_subscribe_ota):    ota(msg)    #连接MQTTdef connect_mqtt():   try:    t = str(time.ticks_us())    print("client:")    c = MQTTClient(client_id + t + str(time.ticks_us()),mserver)    c.set_callback(sub_cb)    c.connect()    c.subscribe(M_subscribe)#订阅信息主题    c.subscribe(M_subscribe_ota)#订阅OTA升级主题    print("连接MQTT成功,已订阅" + str(M_subscribe))    c.publish(M_topic,b"连接MQTT成功") #推送信息#主循环,程序在此做死循环    while True:      c.check_msg() #接收MQTT信息      #c.publish(M_topic,b"版本1") #推送信息  except:    print("重新连接MQTT")    connect_mqtt()if __name__ == "__main__":  connectWiFi()

思路是boot.py通过读取系统生成的base.ini里的内容判断需要加载用户0 “index0.py”或者是用户1  “index1.py”进行运行,升级代码通过mqtt传输到esp8266上,esp8266接受到自定义的OTA升级主题发送的代码后,通过判断base.ini生成新的index0.py或者index1.py,并删除旧的index.py。

        注意:升级代码通过mqtt传输时代码过长时需要进行分段发送,本代码仅演示未分段代码的升级过程。测试过程中发现MQTT.fx发送中文会乱码。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/121628.html

相关文章

  • 【基于Arduino IDE平台开发ESP8266连接巴法云】

    摘要:教程传送门基于平台开发连接巴法云简介实验准备硬件软件实验步骤点灯实验发送温湿度指令升级总结关于巴法云专注于开源,智造,创新,分享。 Arduino教程传送门????...

    不知名网友 评论0 收藏0
  • (超简单)ESP8266深度睡眠模式下远程采集温湿度信息

    摘要:超简单深度睡眠模式下远程采集温湿度信息项目背景相关技术深度睡眠模式温湿度采集数据收发前后端实现后端前端项目背景自己用收纳箱做了一个用于存放打印耗材的干燥箱,想用闲置的开发板和温湿度传感器做一个远程温湿度监测的小项目。 ...

    pkhope 评论0 收藏0
  • 01.ESP8266开发方式知多少

    摘要:开发方式是乐鑫为开发者提供的物联应开发平台,包括基础平台以及上层应开发示例,如智能灯智能开关等。指令开发方式作为芯片,指令开发也是必不可少的。开发方式即,意为运行在单片机上的。 ...

    sushi 评论0 收藏0
  • Arduino uno r3 使用 ESP8266 UART-WiFi 透传模块

    摘要:查询附近名称密码连接路由器的查看路由器分配给模组的地址例如设置单连接设置透传模式建立的服务器开始发送数据进入发送模式发送数据注意退出透传,直接发送。取消发送新行五参考模块指令入门指南透传简单使用模块指令汇总一、所需硬件材料 1.ESP8266:01s某宝上3、5块钱 2.杜邦线:某宝几块钱一组40P,这里只需要三根,用于连接 树莓派与继电器    3.烧录器 ...

    amuqiao 评论0 收藏0
  • 用Docker容器进行IoT开发

    摘要:大多数的硬件公司很难提供能够正常运行的。这个容器在共享。这将使很重要的数据能够非常容易的从输入到你的容器中。如果你想在容器内运行这个项目是我在时做的。希望爱特梅尔公司和德州仪器将来也使用。 随着Iot新的硬件平台和开发板的不断更新, SDK交付越来越多的转向零碎化以及按需组装。大多数的硬件公司很难提供能够正常运行的Software Development Kits (SDKs)。 Do...

    glumes 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<