from machine import UART, Pin import time import _thread class APScaned: #模块基本属性 def __init__(self, ssid, mac, ch, rssi, security): self.ssid=ssid self.mac=mac self.ch=ch self.rssi=rssi self.security=security def hasKey(self): #检查是否有安全密钥 if self.security==None or self.security.find("NONE")>=0: return False else: return True def toString(self): #变成字符串,便于后续输出 s= f"{self.ch},{self.ssid},{self.mac},{self.rssi},{self.hasKey()}" return s class HFWifi: def __init__(self): #super().__init__() self.uart= UART(1, baudrate=115200, bits=8, parity=None, stop=1, tx=Pin(24), rx=Pin(25), rxbuf=1400, timeout=300) self.isCmd= False self.cmdRpl= "" self.listener=None self.lock= _thread.allocate_lock() self.lock.acquire() _thread.start_new_thread(self._uart_read_thread,()) self.doReset() ''' 不要在listener回调函数中调用AT+CMD发送,否则会阻塞串口接收线程 args: listener: function(event: str, data: bytes) ''' def setListener(self, listener): #设置事件监听器 self.listener= listener def _didRecvEvent(self, event, data): #用于触发事件监听器 if self.listener!=None: self.listener(event, data) def _checkCmd(self, rd: bytes): #用于检查命令回复 remain=None #print(rd) s= rd.decode('utf-8') if s!=None and len(s)>0: #print("checkCmd:"+s) idx= s.find("+ok") if idx>=0 and self.cmdRpl=="": self.cmdRpl= s[idx:] if idx>0: remain=rd[0:idx] elif idx==-1: self.cmdRpl=s elif self.cmdRpl!="": self.cmdRpl+=s else: return s idx= self.cmdRpl.rfind("\r\n\r") if idx>=0: self.isCmd=False self.cmdRpl= self.cmdRpl[0:idx] remain=rd[idx+4:] self.lock.release() return remain def _checkEvent(self, rd: bytes): #根据接收到的数据判断是否发生了特定事件 s= rd.decode('utf-8') if s!=None: print("RECV:"+s) idx= s.find("+EVENT=") if s.find("HF-LPT270")>=0: self._didRecvEvent("BOOTUP HF-LPT270", None) return None elif idx>=0: idx2= s.find("\r\n", idx) if idx2>idx: btmp=rd[0:idx]+rd[idx2+2:] rd=btmp sevent=s[idx:idx2] print("sevent:"+sevent) if sevent.find("SOCKA_ON")>=0: self._didRecvEvent("SOCKA_ON", None) elif sevent.find("SOCKA_OFF")>=0: self._didRecvEvent("SOCKA_OFF", None) elif sevent.find("SOCKB_ON")>=0: self._didRecvEvent("SOCKB_ON", None) elif sevent.find("SOCKB_OFF")>=0: self._didRecvEvent("SOCKB_OFF", None) elif sevent.find("CON_ON")>=0: self._didRecvEvent("CON_ON", None) elif sevent.find("CON_OFF")>=0: self._didRecvEvent("CON_OFF", None) elif sevent.find("DHCP_OK")>=0: self._didRecvEvent("DHCP_OK", None) return rd def _uart_read_thread(self): #串口读取线程方法,用于循环读取串口数据并处理 while True: rxData= self.uart.read(1400) if rxData==None: continue if self.isCmd==True: if len(rxData)>0: rxData=self._checkCmd(rxData) while rxData!=None and len(rxData)>0: btmp= self._checkEvent(rxData) if btmp!=None and len(btmp)==len(rxData): break else: rxData=btmp if rxData!=None and len(rxData)>0: self._didRecvEvent("RECV", rxData) def _tryInt(self, s): #将字符串转换为整数 try: d=int(s) return d except ValueError: return None def _sendCmd(self, cmd, wait=True, timeout=3): #发送命令并等待回复 self.cmdRpl= "" self.isCmd= wait self.uart.write(cmd) if wait: self.lock.acquire(timeout) if self.isCmd==True: self.isCmd=False return "timeout" else: s= self.cmdRpl return s else: return "无响应" def doReset(self): #重置 Pin(23, Pin.OUT).value(1) time.sleep(1.5) def doSend(self, txData): #将数据发送至串口 self.uart.write(txData) def doConnectWifi(self, ssid, key): #连接 WiFi cmd= "HFAT+WSSSID="+ssid+"\n" bcmd= bytes(cmd, "utf-8") print(f"CMD:{bcmd}") s= self.AT_CMD(bcmd) print(f"RPL:{s}") cmd= "HFAT+WSKEY="+key+"\n" bcmd= bytes(cmd, "utf-8") print(f"CMD:{bcmd}") s= self.AT_CMD(bcmd) print(f"RPL:{s}") s= self.AT_CMD(b"HFAT+Z\n") print(f"RPL:{s}") def doTcpConnect(self, saddr, sport, wait=False): #TCP 连接方法 cmd="HFAT+NETP=TCP,CLIENT,"+str(sport)+","+saddr+"\n" bcmd=bytes(cmd, "utf-8") print(f"CMD:{bcmd}") s= self.AT_CMD(bcmd, wait=wait) print(f"RPL:{s}") def doUdpStart(self, raddr, rport, lport): pass def doTcpServerStart(self, lport): pass def doTcpReset(self): #串口重置回AP初始态/初始化 cmd= "HFAT+WMODE=AP\n" bcmd= bytes(cmd, "utf-8") print(f"CMD:{bcmd}") s= self.AT_CMD(bcmd) print(f"RPL:{s}") cmd="HFAT+NETP=TCP,Server,8899,10.10.100.254\n" bcmd=bytes(cmd,"utf-8") print(f"CMD:{bcmd}") s= self.AT_CMD(bcmd) print(f"RPL:{s}") def doMQTT(self, port, server,user,password): cmd="HFAT+NETP=MQTT,"+port+","+server+"\n" bcmd= bytes(cmd, "utf-8") print(f"CMD:{cmd}") s= self.AT_CMD(bcmd) print(f"RPL:{s}") print("CMD:HFAT+MQTOPIC=%MAC/up,%MAC/down") s= self.AT_CMD(b"HFAT+MQTOPIC=%MAC/up,%MAC/down\n") print(f"RPL:{s}") cmd="HFAT+MQLOGIN="+user+","+password+"\n" bcmd= bytes(cmd, "utf-8") print(f"CMD:{cmd}") s= self.AT_CMD(bcmd) print(f"RPL:{s}") print("CMD:HFAT+MQID=%MAC") s= self.AT_CMD(b"HFAT+MQID=%MAC\n") print(f"RPL:{s}") s= self.AT_CMD(b"HFAT+Z\n") print(f"RPL:{s}") ''' head: 数据类型Disc, 默认为None时,函数内自动补上默认的head def doHttpGet(self, url, head=None, content): pass def doHttpPost(self, url, head=None, content): pass wait: if wait=False, 函数会不等回复,马上返回。而且AT+CMD的回复会当作RECV数据在listener中收到 ''' def AT_CMD(self, bcmd, wait=True, timeout=2): #执行AT命令 s= self._sendCmd(bcmd, wait=wait, timeout=timeout) #print(f"RPL:{s}") return s def AT_WSCAN(self): #执行WiFi 扫描操作 s= self._sendCmd(b"HFAT+WSCAN\n", timeout=8) if s==None or s=="": return None #print(s) lines= s.split("\n") print(f"{len(lines)} lines") apScanedList= [] for line in lines: items= line.split(",") if items!=None and len(items)==5: ch= self._tryInt(items[0]) if ch!=None: ssid=items[1] mac=items[2] security=items[3] rssi= int(items[4]) apScanedList.append(APScaned(ssid,mac,ch,rssi,security)) return apScanedList