/* Project: MQtt Client demo program without X-Server(connect to two micro brokers) Compiler: BC++ 3.1, Turbo C++ 1.01(3.01) (free from http://cc.codegear.com/free/cpp) Compile mode: large Project: MQttX_2B.C ..\..\lib\7186el.lib ..\..\lib\tcp_dm32.lib ..\..\lib\MQtt_X.lib" Details: This demo shows how to use MQtt_X library in 7186(connect to two micro brokers). Step 1: Initiate the controller. Step 2: Initiate the Ethernet adapter. Step 3-1: Initiate MQtt client 1. Step 3-2: Initiate MQtt client 2. Step 4-1: Connect MQtt client to miro broker 1(IP:192.168.1.91). Step 4-2: Connect MQtt client to miro broker 2(IP:192.168.1.94). Step 5-1: Subscribe Topic to micro broker 1. Step 5-2: Subscribe Topic to micro broker 2. Step 6: It is a loop function which receive data published from other MQtt clients and publish its data to other MQtt clients via micro broker 1 and micro broker 2. [Dec 14, 2009] by Wilson */ #include #include "..\lib\7186e.h" #include "..\lib\Tcpip32.h" #include "..\lib\MQtt_X.h" int main(void) { int i=0, iRet=0, iLength=0; unsigned long lStart_TimeTick; PUBPARMS pubParms_1; SUBPARMS subParms_1; PUBPARMS pubParms_2; SUBPARMS subParms_2; MQISDPTI *pSendTaskParms; MQISDPTI *pRcvTaskParms; MQISDPTI *pApiTaskParms; char SDataBuf[1024]; long dataLength = 0; long topicLength = 0; char STopic[100]; char SData[100]; char *pBuffer=NULL; char PDataBuf[32]; int statusCode = 0; unsigned int iCntState_1 =0; unsigned int iPubState_1 =0; unsigned int iCntState_2 = 0; unsigned int iPubState_2 = 0; //Step1. Initiate the controller. InitLib(); InstallCom1(115200, 8, 0, 1); //Step2. Initiate the Ethernet adapter. iRet=Ethernet_Init(); if(iRet==NoError) printCom1("Inint Ethernet ok.\n\r"); else printCom1("Inint Ethernet error.\n\r"); //Step3-1. Initiate MQtt client 1. iRet = MQtt_Init(0); if(iRet!=0) { // Initial MQtt_X library error. printCom1("Initial MQtt_X library error.\n\r"); } else { // Initial MQtt_X library ok. printCom1("Initial MQtt_X library OK.\n\r"); } //Step3-2. Initiate MQtt client 2. iRet = MQtt_Init(1); if(iRet!=0) { // Initial MQtt_X library error. printCom1("Initial MQtt_X library error.\n\r"); } else { // Initial MQtt_X library ok. printCom1("Initial MQtt_X library OK.\n\r"); } //Step4-1. Connect MQtt client to miro broker 1(IP:192.168.1.91). pubParms_1.port = 1883; subParms_1.port = 1888; sprintf(pubParms_1.pBroker,"%s","192.168.1.91"); sprintf(subParms_1.pBroker,"%s","192.168.1.91"); pubParms_1.qos = 1; subParms_1.qos = 1; pubParms_1.retain = 0; pubParms_1.debug = 0; subParms_1.debug = 0; //Last will settings sprintf(pubParms_1.lwtTopic,"%s","uPAC1/system/alive"); sprintf(subParms_1.lwtTopic,"%s","uPAC1/system/alive"); pubParms_1.lwtQos = 1; subParms_1.lwtQos = 1; pubParms_1.lwtRetain = 0; subParms_1.lwtRetain = 0; sprintf(pubParms_1.lwtData,"%s","help"); sprintf(subParms_1.lwtData,"%s","help"); pubParms_1.dataArg = 0; subParms_1.dataArg = 0; pubParms_1.lastSentMsg = MQISDP_INV_MSG_HANDLE; subParms_1.lastSentMsg = MQISDP_INV_MSG_HANDLE; subParms_1.timeout = 10; sprintf(pubParms_1.clientId,"%s","uPAC1"); iRet = MQtt_MQIsdpConnect(0, &pubParms_1, pApiTaskParms, 2, 5, 1); if(iRet!=0) { // MQtt_MQIsdpConnect error. printCom1("MQtt_MQIsdpConnect 1 error.\n\r"); } else { // MQtt_MQIsdpConnect OK. printCom1("MQtt_MQIsdpConnect 1 OK.\n\r"); } //Step4-2. Connect MQtt client to miro broker 2(IP:192.168.1.94). pubParms_2.port = 1883; subParms_2.port = 1888; sprintf(pubParms_2.pBroker,"%s","192.168.1.94"); sprintf(subParms_2.pBroker,"%s","192.168.1.94"); pubParms_2.qos = 1; subParms_2.qos = 1; pubParms_2.retain = 0; pubParms_2.debug = 0; subParms_2.debug = 0; //Last will settings sprintf(pubParms_2.lwtTopic,"%s","uPAC1/system/alive"); sprintf(subParms_2.lwtTopic,"%s","uPAC1/system/alive"); pubParms_2.lwtQos = 1; subParms_2.lwtQos = 1; pubParms_2.lwtRetain = 0; subParms_2.lwtRetain = 0; sprintf(pubParms_2.lwtData,"%s","help"); sprintf(subParms_2.lwtData,"%s","help"); pubParms_2.dataArg = 0; subParms_2.dataArg = 0; pubParms_2.lastSentMsg = MQISDP_INV_MSG_HANDLE; subParms_2.lastSentMsg = MQISDP_INV_MSG_HANDLE; subParms_2.timeout = 10; sprintf(pubParms_2.clientId,"%s","uPAC1"); iRet = MQtt_MQIsdpConnect(1, &pubParms_2, pApiTaskParms, 2, 5, 1); if(iRet!=0) { // MQtt_MQIsdpConnect error. printCom1("MQtt_MQIsdpConnect 2 error.\n\r"); } else { // MQtt_MQIsdpConnect OK. printCom1("MQtt_MQIsdpConnect 2 OK.\n\r"); } //Step5-1. Subscribe Topic to micro broker 1. subParms_1.hConn = pubParms_1.hConn; subParms_1.qos = 1; subParms_1.timeout = 50; //Sub topic 1 sprintf(subParms_1.topic,"%s","uPAC101/XBoard/DO/ch0"); iRet = MQtt_MQIsdpSubscribe(0, &subParms_1 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 1 error. printCom1("MQtt_MQIsdpSubscribe 1 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 1 OK. printCom1("MQtt_MQIsdpSubscribe 1 OK.\n\r"); } //Sub topic 2 sprintf(subParms_1.topic,"%s","uPAC101/XBoard/DO/ch1"); iRet = MQtt_MQIsdpSubscribe(0, &subParms_1 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 2 error. printCom1("MQtt_MQIsdpSubscribe 2 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 2 OK. printCom1("MQtt_MQIsdpSubscribe 2 OK.\n\r"); } //Sub topic 3 sprintf(subParms_1.topic,"%s","uPAC101/XBoard/AO/ch0"); iRet = MQtt_MQIsdpSubscribe(0, &subParms_1 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 3 error. printCom1("MQtt_MQIsdpSubscribe 3 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 3 OK. printCom1("MQtt_MQIsdpSubscribe 3 OK.\n\r"); } //Sub topic 4 sprintf(subParms_1.topic,"%s","uPAC101/XBoard/AO/ch1"); iRet = MQtt_MQIsdpSubscribe(0, &subParms_1 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 4 error. printCom1("MQtt_MQIsdpSubscribe 4 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 4 OK. printCom1("MQtt_MQIsdpSubscribe 4 OK.\n\r"); } //Step5-2. Subscribe Topic to micro broker 2. subParms_2.hConn = pubParms_2.hConn; subParms_2.qos = 1; subParms_2.timeout = 50; //Sub topic 1 sprintf(subParms_2.topic,"%s","uPAC101/XBoard/DO/ch0"); iRet = MQtt_MQIsdpSubscribe(1, &subParms_2 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 1 error. printCom1("MQtt_MQIsdpSubscribe 1 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 1 OK. printCom1("MQtt_MQIsdpSubscribe 1 OK.\n\r"); } //Sub topic 2 sprintf(subParms_2.topic,"%s","uPAC101/XBoard/DO/ch1"); iRet = MQtt_MQIsdpSubscribe(1, &subParms_2 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 2 error. printCom1("MQtt_MQIsdpSubscribe 2 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 2 OK. printCom1("MQtt_MQIsdpSubscribe 2 OK.\n\r"); } //Sub topic 3 sprintf(subParms_2.topic,"%s","uPAC101/XBoard/AO/ch0"); iRet = MQtt_MQIsdpSubscribe(1, &subParms_2 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 3 error. printCom1("MQtt_MQIsdpSubscribe 3 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 3 OK. printCom1("MQtt_MQIsdpSubscribe 3 OK.\n\r"); } //Sub topic 4 sprintf(subParms_2.topic,"%s","uPAC101/XBoard/AO/ch1"); iRet = MQtt_MQIsdpSubscribe(1, &subParms_2 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 4 error. printCom1("MQtt_MQIsdpSubscribe 4 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 4 OK. printCom1("MQtt_MQIsdpSubscribe 4 OK.\n\r"); } //Step6. Begin the loop function for(;;) { if((GetTimeTicks()-lStart_TimeTick)>50) { lStart_TimeTick=GetTimeTicks(); //Step6.1 Recieve Publish from micro broker 1 iRet = MQtt_MQIsdpRcvPub(0, &subParms_1, SDataBuf, &topicLength , &dataLength); switch ( iRet ) { case MQISDP_OK: strncpy(STopic,SDataBuf,topicLength); STopic[topicLength] = '\0'; strcpy(SData,SDataBuf+topicLength); printCom1("1 Topic:%s Data:%s\n\r",STopic,SData); break; case MQISDP_NO_PUBS_AVAILABLE: // Timed out // printCom1("MQISDP_NO_PUBS_AVAILABLE\n\r"); break; default: // Some other problem printCom1("1 Some other problem\n\r"); break; } //Step6.2 Publish to micro broker 1 pubParms_1.qos = 1; pubParms_1.retain = 0 ? 1: 0; iPubState_1 = iPubState_1 % 4; switch(iPubState_1) { case 0: //DI = 0 sprintf(PDataBuf,"%s","0"); sprintf(pubParms_1.topic,"%s","uPAC1/XBoard/DI/ch0"); break; case 1: //DI = 1 sprintf(PDataBuf,"%s","1"); sprintf(pubParms_1.topic,"%s","uPAC1/XBoard/DI/ch0"); break; case 2: //AI = 3.5 sprintf(PDataBuf,"%s","3.5"); sprintf(pubParms_1.topic,"%s","uPAC1/XBoard/AI/ch0"); break; case 3: //AI = -3.5 sprintf(PDataBuf,"%s","-3.5"); sprintf(pubParms_1.topic,"%s","uPAC1/XBoard/AI/ch0"); break; } pBuffer = PDataBuf; iRet = MQtt_MQIsdpPublish(0, &pubParms_1, pBuffer, strlen(pBuffer)); if(iRet!=0) { // MQtt_MQIsdpPublish error. printCom1("1 MQtt_MQIsdpPublish %d error.\n\r",iPubState_1); } else { // MQtt_MQIsdpPublish OK. // printCom1("MQtt_MQIsdpPublish %d OK.\n\r",iPubState); } iPubState_1++; if(iPubState_1 >= 10000) iPubState_1 = 0; statusCode = 0; if ( pubParms_1.lastSentMsg != MQISDP_INV_MSG_HANDLE) { while ( statusCode != MQISDP_DELIVERED && (iCntState_1 < 50)) { iCntState_1++; statusCode = MQtt_MQIsdp_getMsgStatus(0, pubParms_1.hConn, pubParms_1.lastSentMsg ); if(statusCode == MQISDP_DELIVERED) { printCom1("1 statusCode = MQISDP_DELIVERED\n\r"); } else { printCom1("1 statusCode = %d(NOT MQISDP_DELIVERED)\n\r",statusCode); } } } iCntState_1 = 0; //Step6.3 Recieve Publish from micro broker 2 iRet = MQtt_MQIsdpRcvPub(1, &subParms_2, SDataBuf, &topicLength , &dataLength); switch ( iRet ) { case MQISDP_OK: strncpy(STopic,SDataBuf,topicLength); STopic[topicLength] = '\0'; strcpy(SData,SDataBuf+topicLength); printCom1("2 Topic:%s Data:%s\n\r",STopic,SData); break; case MQISDP_NO_PUBS_AVAILABLE: // Timed out // printCom1("MQISDP_NO_PUBS_AVAILABLE\n\r"); break; default: // Some other problem printCom1("Some other problem\n\r"); break; } //Step6.4 Publish to micro broker 2 pubParms_2.qos = 1; pubParms_2.retain = 0 ? 1: 0; iPubState_2 = iPubState_2 % 4; switch(iPubState_2) { case 0: //DI = 0 sprintf(PDataBuf,"%s","0"); sprintf(pubParms_2.topic,"%s","uPAC1/XBoard/DI/ch0"); break; case 1: //DI = 1 sprintf(PDataBuf,"%s","1"); sprintf(pubParms_2.topic,"%s","uPAC1/XBoard/DI/ch0"); break; case 2: //AI = 3.5 sprintf(PDataBuf,"%s","3.5"); sprintf(pubParms_2.topic,"%s","uPAC1/XBoard/AI/ch0"); break; case 3: //AI = -3.5 sprintf(PDataBuf,"%s","-3.5"); sprintf(pubParms_2.topic,"%s","uPAC1/XBoard/AI/ch0"); break; } pBuffer = PDataBuf; iRet = MQtt_MQIsdpPublish(1, &pubParms_2, pBuffer, strlen(pBuffer)); if(iRet!=0) { // MQtt_MQIsdpPublish error. printCom1("2 MQtt_MQIsdpPublish %d error.\n\r",iPubState_2); } else { // MQtt_MQIsdpPublish OK. // printCom1("MQtt_MQIsdpPublish %d OK.\n\r",iPubState); } iPubState_2++; if(iPubState_2 >= 10000) iPubState_2 = 0; statusCode = 0; if ( pubParms_2.lastSentMsg != MQISDP_INV_MSG_HANDLE) { while ( statusCode != MQISDP_DELIVERED && (iCntState_2 < 50)) { iCntState_2++; statusCode = MQtt_MQIsdp_getMsgStatus(1, pubParms_2.hConn, pubParms_2.lastSentMsg ); if(statusCode == MQISDP_DELIVERED) { printCom1("2 statusCode = MQISDP_DELIVERED\n\r"); } else { printCom1("2 statusCode = %d(NOT MQISDP_DELIVERED)\n\r",statusCode); } } } iCntState_2 = 0; } } }