物联网核心之MQTT移植

2019-07-21 02:06发布

本帖最后由 mzwhhwj 于 2017-9-11 22:57 编辑

   在上一篇文章中,只是讲了MQTT的主要内容,至于怎么移植到STM32上,怎么使用才是最重要的关键。这里使用的平台是RT8711WIFI SOC,使用的LWIPFreeRTOS,移植使用跟STM32+LWIP是没什么区别的。   先在Github上找到Eclipse的开源MQTT客户端程序https://github.com/eclipse/paho.mqtt.embedded-c.git,并把源码下载起来 1.jpg    解压源码,再进入MQTTPacket文件夹,里面有三个文件夹 1.jpg    把src里面的所有文件和samples下的transport.ctransport.h两个文件复制到工程目录下。这里我们主要的移植工作就在transport里面。打开transport.c文件,这个是MQTT连接,发送,接收的接口,源码是LinuxWindows平台,用的标准的Socket接口函数,我们这里的移植工作量很小,因为LWIP也是支持标准的Socket接口函数,只不过里面有些函数接口是LWIP不支持的,主要就是transport_open这个连接函数有区别。把原来的transport_open函数注释掉,重新写一个。[mw_shl_code=c,true]int transport_open(char* addr, int port)
{
        int* sock = &mysock;
        struct hostent *server;
    struct sockaddr_in serv_addr;
        static struct timeval tv;
        int timeout = 1000;
        fd_set readset;
        fd_set writeset;

    *sock = socket(AF_INET, SOCK_STREAM, 0);
    if(*sock < 0)
        DiagPrintf("[ERROR] Create socket failed ");
   
    server = gethostbyname(addr);
    if(server == NULL)
        DiagPrintf("[ERROR] Get host ip failed ");
   
    memset(&serv_addr,0,sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(port);
    memcpy(&serv_addr.sin_addr.s_addr,server->h_addr,server->h_length);
   
    if (connect(*sock,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0){
        DiagPrintf("[ERROR] connect failed ");
        return -1;
        }
        tv.tv_sec = 10;  /* 1 second Timeout */
        tv.tv_usec = 0;  
        setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(timeout));
        return mysock;
}[/mw_shl_code]   到这里其实移植工作就已经完成了,就是这么的简单,剩下就是怎么使用MQTT。直接上代码,这里用FreeRTOS新建一个MQTT的任务。[mw_shl_code=c,true]#define HOST_NAME "m2m.eclipse.org"#define HOST_PORT 1883

void mqtt_thread( void *arg)
{
        MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
        MQTTString receivedTopic;
        int rc = 0;
        char buf[200];
        int buflen = sizeof(buf);
        int mysock = 0;
        MQTTString topicString = MQTTString_initializer;
        int payloadlen_in;
        unsigned char* payload_in;
        unsigned short msgid = 1;
        int subcount;
        int granted_qos =0;
        unsigned char sessionPresent, connack_rc;
        unsigned short submsgid;
        int len = 0;
        int req_qos = 1;
        unsigned char dup;
        int qos;
        unsigned char retained;

        char *host = "m2m.eclipse.org";
        int port = 1883;
        uint8_t  msgtypes = CONNECT;
        uint32_t curtick = xTaskGetTickCount();
        log_info("socket connect to server");
        mysock = transport_open(host,port);
        if(mysock < 0)
                return mysock;
        log_notice("Sending to hostname %s port %d ", host, port);
        data.clientID.cstring = "me";
        data.keepAliveInterval = 50;
        data.cleansession = 1;
        data.username.cstring = "";
        data.password.cstring = "";
        data.MQTTVersion = 4;
        while(1)
        {
                if((xTaskGetTickCount() - curtick) >(data.keepAliveInterval/2*1000))
                {
                        if(msgtypes == 0)
                        {
                                curtick = xTaskGetTickCount();
                                msgtypes = PINGREQ;
                        }
                }

                switch(msgtypes)
                {
                        case CONNECT:        len = MQTTSerialize_connect(buf, buflen, &data);        
                                                        rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                        if (rc == len)
                                                                log_info("send CONNECT Successfully");
                                                        else
                                                                log_debug("send CONNECT failed");               
                                                        log_info("MQTT concet to server!");
                                                        msgtypes = 0;
                                                        break;
                        case CONNACK:         if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
                                                        {
                                                                log_info("Unable to connect, return code %d ", connack_rc);
                                                        }
                                                        else log_info("MQTT is concet OK!");
                                                        msgtypes = SUBSCRIBE;
                                                        break;

                        case SUBSCRIBE:        topicString.cstring = "ledtest";
                                                        len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
                                                        rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                        if (rc == len)
                                                                log_info("send SUBSCRIBE Successfully ");
                                                        else
                                                                log_debug("send SUBSCRIBE failed ");        
                                                        log_info("client subscribe:[%s]",topicString.cstring);
                                                        msgtypes = 0;
                                                        break;
                        case SUBACK:        rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);                                                        
                                                        log_info("granted qos is %d ", granted_qos);                                                               
                                                        msgtypes = 0;
                                                        break;
                        case PUBLISH:        rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,        &payload_in, &payloadlen_in, buf, buflen);
                                                        log_info("message arrived : %s ", payload_in);
                                                        if(strstr(payload_in,"on"))
                                                        {
                                                                log_notice("LED on!!");
                                                        }
                                                        else if(strstr(payload_in,"off"))
                                                        {
                                                                log_notice("LED off!!");
                                                        }
                                                        if(qos == 1)
                                                        {
                                                                log_info("publish qos is 1,send publish ack.");
                                                                memset(buf,0,buflen);
                                                                len = MQTTSerialize_ack(buf,buflen,PUBACK,dup,msgid);   //publish ack                        
                                                                rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                                if (rc == len)
                                                                        log_info("send PUBACK Successfully");
                                                                else
                                                                        log_debug("send PUBACK failed");                                       
                                                        }
                                                        msgtypes = 0;
                                                        break;

                        case PUBACK:        log_info("PUBACK!");
                                                        msgtypes = 0;
                                                        break;
                        case PUBREC:        log_info("PUBREC!");     //just for qos2
                                                        break;
                        case PUBREL:        log_info("PUBREL!");        //just for qos2
                                                        break;
                        case PUBCOMP:        log_info("PUBCOMP!");        //just for qos2
                                                        break;
                        case PINGREQ:        len = MQTTSerialize_pingreq(buf, buflen);
                                                        rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                        if (rc == len)
                                                                log_info("send PINGREQ Successfully ");
                                                        else
                                                                log_debug("send PINGREQ failed ");        
                                                        log_info("time to ping mqtt server to take alive!");
                                                        msgtypes = 0;
                                                        break;
                        case PINGRESP:        log_info("mqtt server Pong");                                                        
                                                        msgtypes = 0;
                                                        break;
                }
                                memset(buf,0,buflen);
                                rc=MQTTPacket_read(buf, buflen, transport_getdata);        
                                if(rc >0)
                                {
                                        msgtypes = rc;
                                        log_info("MQTT is get recv:");
                                }
                                gpio_write(&gpio_led, !gpio_read(&gpio_led));
        }

exit:
        transport_close(mysock);
    log_info("mqtt thread exit.");
    vTaskDelete(NULL);
}[/mw_shl_code]
       MQTT服务器仍然是上篇文章用的m2m.eclipse.org,一开始就是调用 前面移植的transport_open(host,port)去连接MQTT服务器,并返回套接字。然后就是前面说的登录、订阅、发布、心跳等操作,这里在While里用状态来实现整个连接。首先就是CONNECT登录,MQTTPacket_connectData data = MQTTPacket_connectData_initializer初始化登录数据结构体,然后再对data进行初始化,data.clientID.cstring = "me";data.keepAliveInterval = 50;data.cleansession = 1;data.username.cstring = "";data.password.cstring = "";data.MQTTVersion = 4;这里表示cilentIDme,心跳时间为50,用户名跟密码都为空,结构初始化后,就是要对数据进去打包,调用MQTTSerialize_connect函数。打包后就是调用transport_sendPacketBuffer发送数据包。数据发送完后是调用MQTTPacket_read去接收服务器返回来的数据,并得到返回的数据包类型,根据数据包类型进行不同的逻辑处理。发送CONNECT后服务器对相应的返回CONNACK数据包表示登录成功。 登录成功后,开始订阅我们想要Topics,这里订阅一个”ledtest”Tipics。数据包的初始化、打包、发送跟登录是相似的,就不详述,直接看代码就可以了,服务器会相应的返回SUBACK    接下来是心跳,通过获取系统的Tick来判断是否要发PINGREQ,如果的话,让状态机状态为PINGREQ后去发送心跳包,相应服务器会返回PINGRESP    最后是PUBLISH,这是MQTT的最主要的通信协议,这里我只实现的客户端接收PUBLISH,其它的使用,其实都可以在源码的MQTTPacket里面sample可以找到例程。PUBLISH实现也很简单,定阅了Topics之后,只要其它客户端向这个Topics推送数据,服务器就会转发到订阅者上。MQTTPacket_read接收到服务器的推送,解析成PUBLISH数据包,然后状态机改变状态到PUBLISH再调用MQTTDeserialize_publish进行解包,得到推送的内容,最后根据推送的内容执行相应的动作,就实现远程控制。     整个代码的效果如下: 2.jpg     可以看到MQTT的登录,订阅,还有在PC上通过MQTT.fx推送信息给ledtestRT8711上收到推送的内容,并执行打开LED的动作。    也许有人会问,如果STM32或者其它单片机是用WIFI模块或者GPRS模块,没有LWIP的怎么办。其实只要理解的MQTT的源码,就不难用GPRS或者WiFi模块去实现。MQTT的源码里都是对协议包进行打包解包,数据传输都是在tranport.c里面,我们完全不用transport,可以自己写通信接口,然后把打包的数据包通过模块发出去,写接收接口,把模块接收到服务器数据调用MQTT解包接口解析就可以了。
友情提示: 此问题已得到解决,问题已经关闭,关闭后问题禁止继续编辑,回答。