作者:柯大
為了方便樹莓派(Raspbery Pi)、工業型嵌入式電腦、嵌入式物聯網開發板及各型個人電腦或工業電腦(IPC),可以直接利用USB連接LPWAN通訊模組,柯大創客屋特別為社群好友分享了一款以MTK MT2625設計的USB Dongle 可以讓您快速利用樹莓派Python 語言或x86 Base 的IPC以 .NET架構連接USB NB-IoT存取雲端資訊。
這款NB-IoT USB Dongle 採用Quectel BC26/BC66模組 (內建MTK MT2625 NB-IoT晶片),外觀如下,可直接與樹莓派、電腦連接或透過USB OTG 線與手機連接。

(圖片來源:柯清長)
BC26/BC66內建國產MTK MT2625 NB-IoT晶片,完整功能規格如下:
1.支援全球NB-IoT頻段:
完整支援B1/B2/B3/B4/B5/B8/B12/B13/B17/B18/B19/B20/B25/B26/B28/B66等頻段。
2.通訊傳輸速率:
• Single-Tone : 下行: 25.5kbps ;上行: 16.7kbps
• Multi-Tone : 下行: 25.5kbps ;上行: 62.5kbps
3.支援訊協定:
UDP/TCP/CoAP/LwM2M/MQTT/*PPP/*SNTP/*TLS/*DTLS/*HTTP/*HTTPS/*FTP。
(註*: 陸續推出支援協定)
4.支援AT 指令集:3GPP Rel. 13 /14以及 Quectel 增强型 AT 命令。
5.支援序列埠(UART)數:
3組(信訊電位為1.8V),序列埠傳輸速率均支援 4800bps、9600bps、 115200bps、 230400bps、460800bps 和 921600bps(預設為115200bps)。
6.BC26/BC66可支援內置eSIM卡:可搭配設計eSIM晶片來取代傳統SIM卡。
7.供電特性:低供電電壓範圍 (2.1V~3.63V),適合鋰錳、鋰亞電池直接供電。
8.工作模式:
- Active:模組處於喚醒狀態;所有功能正常可用,可以進行資料發送和接收,模組 在此模式下可切換到 Idle 模式或 PSM 模式。
- Idle :模組處於輕休眠狀態,網路處於 DRX/eDRX 狀態,可接收尋呼消息,模 塊在此模式下可切換至 Active 或 PSM 模式。
- PSM:模組處於深睡眠狀態,內部只有 RTC 工作,網路處於非連接狀態,模組 此模式下可切換至 Active 模式。
9.支援Firmware軟體更新方式:UART、DFOTA。
10.BC26/BC66(MTK MT2625) USB Dongle方便快速開發創新應用:
- 開發者可利用USB Dongle直接與工業電腦、一般PC、具USB插頭的嵌入式單板電腦(如Raspbbery Pi)、各種手機(利用USB OTG接頭連接)。
- 可以以OpenCPU SDK開發應用,省下外接MCU之成本及所增加之電力,並獨立運作。
- 支援MS .Net 平台開發、VB、VC、JAVA、C++、Python…。
- 功能界定:Maker、新創團隊快速開發NB-IoT以USB為應用產品快速搭配原產品軟體(APP)出貨,如:智慧工業4.0、智慧醫療(IoT血糖計、血壓計)。

(圖片來源:柯清長)
為了讓大家快速入門NB-IoT,在這邊示範以樹莓派 + NB-IoT USB Dongle 以Python 開發MQTT 應用,將資料以MQTT Publish Data 到MQTTBox subscribe。
MQTT Broker: broker.hivemq.com ,Port :1883
Publish LED Topic:”NB/BC26/USER99/LED”
Publish TEMP Topic:”NB/BC26/USER99/TEMP”
Publish HUMI Topic:”NB/BC26/USER99/HUMI”
BC26/BC66 MQTT AT command 使用及測試:
AT+QMTOPEN=0,”broker.hivemq.com”,1883
AT+QMTCONN=0,”BC26-Client1”
AT+QMTPUB=0,0,0,0,”NB/BC26/USER99/LED””,”1″
AT+QMTPUB=0,0,0,0,”NB/BC26/USER99/TEMP”,”23.10″
AT+QMTPUB=0,0,0,0,”NB/BC26/USER99/HUMI”,”61.10″
AT+QMTCLOSE=0

(圖片來源:柯清長)

(圖片來源:柯清長)

(圖片來源:柯清長)

(圖片來源:柯清長)
範例程式原始碼:
import time import serial import sys import serial.tools.list_ports # global variables TIMEOUT = 3 # seconds #################################### # Main program ser = serial.Serial() port_list = list(serial.tools.list_ports.comports()) print(port_list) if len(port_list) == 0: print("NB-IoT Dongle 未安裝!") else: for i in range(0,len(port_list)): print(port_list[i]) nb= NBIoT() #define NBIoT Class #Loop try: nb.sendATComm("ATE1","OK\r\n") nb. getATI() nb.getIMEI() time.sleep(0.5) nb.getFirmwareInfo() time.sleep(0.5) nb.getHardwareInfo() time.sleep(0.5) nb.getIPAddress() time.sleep(0.5) nb.getSignalQuality() time.sleep(0.5) nb.closeallMQTT() nb.openMQTT("broker.hivemq.com",1883) time.sleep(0.5) nb.connectMQTT("MQTTclient1") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/LED","1") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/TEMP","21.20") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/HUMI","52.20") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/LED","0") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/TEMP","22.30") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/HUMI","53.40") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/LED","1") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/TEMP","23.40") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/HUMI","54.50") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/LED","0") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/TEMP","24.50") time.sleep(0.5) nb.publishMQTT("NB/BC26/USER99/HUMI","55.60") time.sleep(0.5) nb.closeMQTT() except KeyboardInterrupt: print('AT+QMTCLOSE=0') nb.closeallMQTT() # 關閉MQTT 連線 sleep(3) ser.close() # 清除序列通訊物件 print('結束 MQTT 測試!') # function for printing debug message def debug_print(message): print(message) # function for getting time as miliseconds def millis(): return int(time.time()) # function for delay as miliseconds def delay(ms): time.sleep(float(ms/1000.0)) #################################### ### NB IoT Shield Class ################ #################################### class NBIoT: board = "" # Shield name ip_address = "" # ip address domain_name = "" # domain name port_number = "" # port number timeout = TIMEOUT # default timeout for function and methods on this library. compose = "" response = "" # Default Initializer def __init__(self, serial_port=“/dev/ttyAMA0”, serial_baudrate=115200, board="BC-26"): self.board = board ser.port = serial_port ser.baudrate = serial_baudrate ser.parity=serial.PARITY_NONE ser.stopbits=serial.STOPBITS_ONE ser.bytesize=serial.EIGHTBITS debug_print(self.board + " Class initialized!") # Function for clearing compose variable def clear_compose(self): self.compose = "" # Function for getting modem response def getResponse(self, desired_response): if (ser.isOpen() == False): ser.open() while 1: self.response ="" while(ser.inWaiting()): self.response += ser.read(ser.inWaiting()).decode('utf-8', errors='ignore') if(self.response.find(desired_response) != -1): debug_print(self.response) break # Function for sending at comamand to module def sendATCommOnce(self, command): if (ser.isOpen() == False): ser.open() self.compose = "" self.compose = str(command) + "\r" ser.reset_input_buffer() ser.write(self.compose.encode()) debug_print(self.compose) # Function for sending at command to BC26_AT. def sendATComm(self, command, desired_response, timeout = None): if timeout is None: timeout = self.timeout self.sendATCommOnce(command) f_debug = False timer = millis() while 1: if( millis() - timer > timeout): self.sendATCommOnce(command) timer = millis() f_debug = False self.response ="" while(ser.inWaiting()): try: self.response += ser.read(ser.inWaiting()).decode('utf-8', errors='ignore') delay(100) except Exception as e: debug_print(e.Message) # debug_print(self.response) if(self.response.find(desired_response) != -1): debug_print(self.response) break # Function for saving conf. and reset BC26_AT module def resetModule(self): self.saveConfigurations() delay(200) self.sendATComm("AT+QRST=1","") # Function for save configurations tShield be done in current session. def saveConfigurations(self): self.sendATComm("AT&W","OK\r\n") # Function for getting ATI def getATI(self): return self.sendATComm("ATI","OK\r\n") # Function for getting IMEI number def getIMEI(self): return self.sendATComm("AT+CGSN=1","OK\r\n") # Function for getting firmware info def getFirmwareInfo(self): return self.sendATComm("AT+CGMR","OK\r\n") # Function for getting hardware info def getHardwareInfo(self): return self.sendATComm("AT+CGMM","OK\r\n") # Function for getting self.ip_address def getIPAddress(self): return self.sendATComm("AT+CGPADDR","OK\r\n") #****************************************************************************************** #*** Network Service Functions ************************************************************ #****************************************************************************************** # Function for getting signal quality def getSignalQuality(self): return self.sendATComm("AT+CSQ","OK\r\n") #Function for Query NBIoT Band #def getBand(self): #return self.sendATComm("AT+QBAND?","OK\r\n") #************************************************************************************* #***MQTT Protocols Functions ****BC-26/BC-66****************************** # Function for create MQTT broker server connection #AT+QMTOPEN=0,"broker.hivemq.com",1883 => +QMTOPEN: 0,0 # ************************************************************************************* def openMQTT(self,mqttserver="broker.hivemq.com",port_number=1883): self.compose = "AT+QMTOPEN=0,\"" self.compose += str(mqttserver) self.compose += "\"," self.compose += str(port_number) self.sendATComm(self.compose,"+QMTOPEN: 0,0") self.clear_compose() # Function fo connect MQTT broker server connection #AT+QMTCONN=0,“LASSclient” => +QMTCONN: 0,0,0 # ************************************************************************************* def connectMQTT(self,clientName): self.compose = "AT+QMTCONN=0,\"" self.compose += str(clientName) self.compose += "\"" self.sendATComm(self.compose,"+QMTCONN: 0,0,0") self.clear_compose() # Function fo Punlish MQTT Topics payload data #AT+QMTPUB=0,0,0,0,"NB/BC26/USER99/TEMP","23.10" => +QMTPUB: 0,0,0 def publishMQTT(self,topic,payload): self.compose = "AT+QMTPUB=0,0,0,0,\"" self.compose += str(topic) self.compose += "\",\"" self.compose += str(payload) self.compose += "\"" self.sendATComm(self.compose,"+QMTPUB: 0,0,0") self.clear_compose() # Function for closing server connection def closeMQTT(self): self.sendATComm("AT+QMTCLOSE=0","+QMTCLOSE: 0,0") def closeallMQTT(self): self.sendATComm("AT+QMTCLOSE=0","")
MQTTBox 接收結果如下圖:

(圖片來源:柯清長)
(責任編輯:王姵文)
- 【開箱評測】MTDuino IOT開發板: MQTT功能實作介紹 - 2023/06/26
- 【開箱評測】MTDuino IOT開發板:NB-IoT及GPS功能介紹 - 2023/05/07
- 用NB-IoT USB Dongle實現MQTT,加快IoT應用開發 - 2019/12/18
訂閱MakerPRO知識充電報
與40000位開發者一同掌握科技創新的技術資訊!