/* Project: MQtt Client demo program with X-Server(connect to one micro broker) Compiler: BC++ 3.1 Compile mode: large Project: MQttX_2X.C v7000.c Vmodbus.C ..\LIB\7186EL.Lib ..\LIB\TCPIPL.Lib ..\LIB\VCOM_NNNN.Lib, with NNNN being the lib file's version. ..\lib\MQtt_X.lib" Details: This demo shows how to use MQtt_X library in 7186. Step 1: Initiate the controller. Step 2-1: Initiate MQtt client 1. Step 2-2: Initiate MQtt client 2. Step 3-1: Connect MQtt client to miro broker 1(IP:192.168.1.91). Step 3-2: Connect MQtt client to miro broker 2(IP:192.168.1.94). Step 4-1: Subscribe Topic to micro broker 1. Step 4-2: Subscribe Topic to micro broker 2. Step 5: It is a loop function which receive data published from other MQtt clients and publish its data to other MQtt clients via micro broker 1 and micro broker 2. [Dec 15, 2009] by Wilson */ #include #include "..\lib\7186e.h" #include "..\lib\Tcpip32.h" #include "..\lib\vxcomm.h" #include "..\lib\MQtt_X.h" int i=0, iRet=0, iLength=0, iFlag=0; unsigned long lStart_TimeTick=0; //To record when the 7186E/8000E do //Ethernet communication. PUBPARMS pubParms_1; SUBPARMS subParms_1; PUBPARMS pubParms_2; SUBPARMS subParms_2; MQISDPTI *pSendTaskParms; MQISDPTI *pRcvTaskParms; MQISDPTI *pApiTaskParms; char SDataBuf[1024]; long dataLength = 0; long topicLength = 0; char STopic[100]; char SData[100]; char *pBuffer=NULL; char PDataBuf[32]; int statusCode = 0; unsigned int iCntState_1 =0; unsigned int iPubState_1 =0; unsigned int iCntState_2 = 0; unsigned int iPubState_2 = 0; void UserCount(void) { // user's timer trigger function // // In this function cannot use any function that will use the hardware signal "clock", // Such as: // 1. ClockHigh(),ClockLow(), ClockHighLow(), // 2. Any EEPROM functions. // 3. Any 5DigitLed functions. // 4. Any NVRAM function. // 5. Any RTC function.(GetTime(),SetTime(),GetDate(),SetDate()) } void UserInit(void) { /* In this function, user CAN: 1. initialize user's program. 2. set time interval for calling UserCount(). 3. set initial value of I/O or variables for UserLoopFun(). 4. set initial value of I/O or variables for another functions. 5. change the default TCP PORT 10000/9999/502 to others value. [after vcom3004.lib] Syntax: Port10000=newport_10000; for calling UserCmd (user.c) Port9999=newport_9999; for calling VcomCmd7000 (v7000.c) PortUser=newport_User; for calling VcomCmdUser (user.c) [after vcom3005.lib] Default port value: Port10000=10000; Port9999=9999; Port502=502; PortUser=0; If the port value is 0, Xserver will not listen that port. That means the port will be disable. Please refer to demo9 & demo11 for example code */ //Step1. Initiate the controller. InitLib(); SetBaudrate(1, 115200L); //Step2-1. Initiate MQtt client 1. //======= Initiate MQtt client ======= iRet = MQtt_Init(0); if(iRet!=0) { // Initial MQtt_X library error. printCom1("Initial MQtt_X library error.\n\r"); } else { // Initial MQtt_X library ok. printCom1("Initial MQtt_X library OK.\n\r"); } //Step2-2. Initiate MQtt client 2. iRet = MQtt_Init(1); if(iRet!=0) { // Initial MQtt_X library error. printCom1("Initial MQtt_X library error.\n\r"); } else { // Initial MQtt_X library ok. printCom1("Initial MQtt_X library OK.\n\r"); } //=======End Initiate MQtt client ======= } void UserLoopFun(void) { if(!iFlag) { //Step3-1. Connect MQtt client to miro broker 1(IP:192.168.1.91). pubParms_1.port = 1883; subParms_1.port = 1888; sprintf(pubParms_1.pBroker,"%s","192.168.1.91"); sprintf(subParms_1.pBroker,"%s","192.168.1.91"); pubParms_1.qos = 1; subParms_1.qos = 1; pubParms_1.retain = 0; pubParms_1.debug = 0; subParms_1.debug = 0; //Last will settings sprintf(pubParms_1.lwtTopic,"%s","uPAC1/system/alive"); sprintf(subParms_1.lwtTopic,"%s","uPAC1/system/alive"); pubParms_1.lwtQos = 1; subParms_1.lwtQos = 1; pubParms_1.lwtRetain = 0; subParms_1.lwtRetain = 0; sprintf(pubParms_1.lwtData,"%s","help"); sprintf(subParms_1.lwtData,"%s","help"); pubParms_1.dataArg = 0; subParms_1.dataArg = 0; pubParms_1.lastSentMsg = MQISDP_INV_MSG_HANDLE; subParms_1.lastSentMsg = MQISDP_INV_MSG_HANDLE; subParms_1.timeout = 10; sprintf(pubParms_1.clientId,"%s","uPAC1"); iRet = MQtt_MQIsdpConnect(0, &pubParms_1, pApiTaskParms, 2, 5, 1); if(iRet!=0) { // MQtt_MQIsdpConnect error. printCom1("MQtt_MQIsdpConnect 1 error.\n\r"); } else { // MQtt_MQIsdpConnect OK. printCom1("MQtt_MQIsdpConnect 1 OK.\n\r"); } //Step3-2. Connect MQtt client to miro broker 2(IP:192.168.1.94). pubParms_2.port = 1883; subParms_2.port = 1888; sprintf(pubParms_2.pBroker,"%s","192.168.1.94"); sprintf(subParms_2.pBroker,"%s","192.168.1.94"); pubParms_2.qos = 1; subParms_2.qos = 1; pubParms_2.retain = 0; pubParms_2.debug = 0; subParms_2.debug = 0; //Last will settings sprintf(pubParms_2.lwtTopic,"%s","uPAC1/system/alive"); sprintf(subParms_2.lwtTopic,"%s","uPAC1/system/alive"); pubParms_2.lwtQos = 1; subParms_2.lwtQos = 1; pubParms_2.lwtRetain = 0; subParms_2.lwtRetain = 0; sprintf(pubParms_2.lwtData,"%s","help"); sprintf(subParms_2.lwtData,"%s","help"); pubParms_2.dataArg = 0; subParms_2.dataArg = 0; pubParms_2.lastSentMsg = MQISDP_INV_MSG_HANDLE; subParms_2.lastSentMsg = MQISDP_INV_MSG_HANDLE; subParms_2.timeout = 10; sprintf(pubParms_2.clientId,"%s","uPAC1"); iRet = MQtt_MQIsdpConnect(1, &pubParms_2, pApiTaskParms, 2, 5, 1); if(iRet!=0) { // MQtt_MQIsdpConnect error. printCom1("MQtt_MQIsdpConnect 2 error.\n\r"); } else { // MQtt_MQIsdpConnect OK. printCom1("MQtt_MQIsdpConnect 2 OK.\n\r"); } //Step4-1. Subscribe Topic to micro broker 1. subParms_1.hConn = pubParms_1.hConn; subParms_1.qos = 1; subParms_1.timeout = 50; //Sub topic 1 sprintf(subParms_1.topic,"%s","uPAC101/XBoard/DO/ch0"); iRet = MQtt_MQIsdpSubscribe(0, &subParms_1 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 1 error. printCom1("MQtt_MQIsdpSubscribe 1 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 1 OK. printCom1("MQtt_MQIsdpSubscribe 1 OK.\n\r"); } //Sub topic 2 sprintf(subParms_1.topic,"%s","uPAC101/XBoard/DO/ch1"); iRet = MQtt_MQIsdpSubscribe(0, &subParms_1 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 2 error. printCom1("MQtt_MQIsdpSubscribe 2 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 2 OK. printCom1("MQtt_MQIsdpSubscribe 2 OK.\n\r"); } //Sub topic 3 sprintf(subParms_1.topic,"%s","uPAC101/XBoard/AO/ch0"); iRet = MQtt_MQIsdpSubscribe(0, &subParms_1 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 3 error. printCom1("MQtt_MQIsdpSubscribe 3 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 3 OK. printCom1("MQtt_MQIsdpSubscribe 3 OK.\n\r"); } //Sub topic 4 sprintf(subParms_1.topic,"%s","uPAC101/XBoard/AO/ch1"); iRet = MQtt_MQIsdpSubscribe(0, &subParms_1 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 4 error. printCom1("MQtt_MQIsdpSubscribe 4 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 4 OK. printCom1("MQtt_MQIsdpSubscribe 4 OK.\n\r"); } //Step4-2. Subscribe Topic to micro broker 2. subParms_2.hConn = pubParms_2.hConn; subParms_2.qos = 1; subParms_2.timeout = 50; //Sub topic 1 sprintf(subParms_2.topic,"%s","uPAC101/XBoard/DO/ch0"); iRet = MQtt_MQIsdpSubscribe(1, &subParms_2 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 1 error. printCom1("MQtt_MQIsdpSubscribe 1 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 1 OK. printCom1("MQtt_MQIsdpSubscribe 1 OK.\n\r"); } //Sub topic 2 sprintf(subParms_2.topic,"%s","uPAC101/XBoard/DO/ch1"); iRet = MQtt_MQIsdpSubscribe(1, &subParms_2 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 2 error. printCom1("MQtt_MQIsdpSubscribe 2 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 2 OK. printCom1("MQtt_MQIsdpSubscribe 2 OK.\n\r"); } //Sub topic 3 sprintf(subParms_2.topic,"%s","uPAC101/XBoard/AO/ch0"); iRet = MQtt_MQIsdpSubscribe(1, &subParms_2 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 3 error. printCom1("MQtt_MQIsdpSubscribe 3 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 3 OK. printCom1("MQtt_MQIsdpSubscribe 3 OK.\n\r"); } //Sub topic 4 sprintf(subParms_2.topic,"%s","uPAC101/XBoard/AO/ch1"); iRet = MQtt_MQIsdpSubscribe(1, &subParms_2 ); if(iRet!=0) { // MQtt_MQIsdpSubscribe 4 error. printCom1("MQtt_MQIsdpSubscribe 4 error.\n\r"); } else { // MQtt_MQIsdpSubscribe 4 OK. printCom1("MQtt_MQIsdpSubscribe 4 OK.\n\r"); } iFlag=1; } if((GetTimeTicks()-lStart_TimeTick)>50) { lStart_TimeTick=GetTimeTicks(); //Step5.1 Recieve Publish from micro broker 1 iRet = MQtt_MQIsdpRcvPub(0, &subParms_1, SDataBuf, &topicLength , &dataLength); switch ( iRet ) { case MQISDP_OK: strncpy(STopic,SDataBuf,topicLength); STopic[topicLength] = '\0'; strcpy(SData,SDataBuf+topicLength); printCom1("1 Topic:%s Data:%s\n\r",STopic,SData); break; case MQISDP_NO_PUBS_AVAILABLE: // Timed out // printCom1("MQISDP_NO_PUBS_AVAILABLE\n\r"); break; default: // Some other problem printCom1("1 Some other problem\n\r"); break; } //Step5.2 Publish to micro broker 1 pubParms_1.qos = 1; pubParms_1.retain = 0 ? 1: 0; iPubState_1 = iPubState_1 % 4; switch(iPubState_1) { case 0: //DI = 0 sprintf(PDataBuf,"%s","0"); sprintf(pubParms_1.topic,"%s","uPAC1/XBoard/DI/ch0"); break; case 1: //DI = 1 sprintf(PDataBuf,"%s","1"); sprintf(pubParms_1.topic,"%s","uPAC1/XBoard/DI/ch0"); break; case 2: //AI = 3.5 sprintf(PDataBuf,"%s","3.5"); sprintf(pubParms_1.topic,"%s","uPAC1/XBoard/AI/ch0"); break; case 3: //AI = -3.5 sprintf(PDataBuf,"%s","-3.5"); sprintf(pubParms_1.topic,"%s","uPAC1/XBoard/AI/ch0"); break; } pBuffer = PDataBuf; iRet = MQtt_MQIsdpPublish(0, &pubParms_1, pBuffer, strlen(pBuffer)); if(iRet!=0) { // MQtt_MQIsdpPublish error. printCom1("1 MQtt_MQIsdpPublish %d error.\n\r",iPubState_1); } else { // MQtt_MQIsdpPublish OK. // printCom1("MQtt_MQIsdpPublish %d OK.\n\r",iPubState); } iPubState_1++; if(iPubState_1 >= 10000) iPubState_1 = 0; statusCode = 0; if ( pubParms_1.lastSentMsg != MQISDP_INV_MSG_HANDLE) { while ( statusCode != MQISDP_DELIVERED && (iCntState_1 < 50)) { iCntState_1++; statusCode = MQtt_MQIsdp_getMsgStatus(0, pubParms_1.hConn, pubParms_1.lastSentMsg ); if(statusCode == MQISDP_DELIVERED) { printCom1("1 statusCode = MQISDP_DELIVERED\n\r"); } else { printCom1("1 statusCode = %d(NOT MQISDP_DELIVERED)\n\r",statusCode); } } } iCntState_1 = 0; //Step5.3 Recieve Publish from micro broker 2 iRet = MQtt_MQIsdpRcvPub(1, &subParms_2, SDataBuf, &topicLength , &dataLength); switch ( iRet ) { case MQISDP_OK: strncpy(STopic,SDataBuf,topicLength); STopic[topicLength] = '\0'; strcpy(SData,SDataBuf+topicLength); printCom1("2 Topic:%s Data:%s\n\r",STopic,SData); break; case MQISDP_NO_PUBS_AVAILABLE: // Timed out // printCom1("MQISDP_NO_PUBS_AVAILABLE\n\r"); break; default: // Some other problem printCom1("Some other problem\n\r"); break; } //Step5.4 Publish to micro broker 2 pubParms_2.qos = 1; pubParms_2.retain = 0 ? 1: 0; iPubState_2 = iPubState_2 % 4; switch(iPubState_2) { case 0: //DI = 0 sprintf(PDataBuf,"%s","0"); sprintf(pubParms_2.topic,"%s","uPAC1/XBoard/DI/ch0"); break; case 1: //DI = 1 sprintf(PDataBuf,"%s","1"); sprintf(pubParms_2.topic,"%s","uPAC1/XBoard/DI/ch0"); break; case 2: //AI = 3.5 sprintf(PDataBuf,"%s","3.5"); sprintf(pubParms_2.topic,"%s","uPAC1/XBoard/AI/ch0"); break; case 3: //AI = -3.5 sprintf(PDataBuf,"%s","-3.5"); sprintf(pubParms_2.topic,"%s","uPAC1/XBoard/AI/ch0"); break; } pBuffer = PDataBuf; iRet = MQtt_MQIsdpPublish(1, &pubParms_2, pBuffer, strlen(pBuffer)); if(iRet!=0) { // MQtt_MQIsdpPublish error. printCom1("2 MQtt_MQIsdpPublish %d error.\n\r",iPubState_2); } else { // MQtt_MQIsdpPublish OK. // printCom1("MQtt_MQIsdpPublish %d OK.\n\r",iPubState); } iPubState_2++; if(iPubState_2 >= 10000) iPubState_2 = 0; statusCode = 0; if ( pubParms_2.lastSentMsg != MQISDP_INV_MSG_HANDLE) { while ( statusCode != MQISDP_DELIVERED && (iCntState_2 < 50)) { iCntState_2++; statusCode = MQtt_MQIsdp_getMsgStatus(1, pubParms_2.hConn, pubParms_2.lastSentMsg ); if(statusCode == MQISDP_DELIVERED) { printCom1("2 statusCode = MQISDP_DELIVERED\n\r"); } else { printCom1("2 statusCode = %d(NOT MQISDP_DELIVERED)\n\r",statusCode); } } } iCntState_2 = 0; } } int UserCmd(unsigned char *Cmd,unsigned char *Response) { sprintf(Response,"%s",Cmd); return 1; // return ok } int VcomUserBinaryCmd(TCPREADDATA *p) { /* VXCOMM.EXE 2.6.12(09/04/2001) or later will support this function. TCP PORT 10000, command 23 will call this function. user can get the following message: p->ReadUartChar : the buffer store the command data(include "23") p->Length : the command data length(include the two byte "23") p->Socket : the socket number that receive the command, that is when the user function want return message to the client, just use the socket to send data. use: VcomSendSocket(p->Socket,pdata,datalength); */ VcomSendSocket(p->Socket,"User-defined command(23)",24); // return 24 bytes. return 1; /* any value will be accept */ } void PortUserStart(int skt) { /* XS8_3200.Lib Version 3.2.00 (20,Apr,2004) or later version supports this function. When a TCP/IP client connects to the 7186E/8000E TCP port 10000, the Xserver calls the function once. You can use function VcomSendSocket to send a message to the client when a connection is established. For example: VcomSendSocket(skt,"Connection is established.",26); //return 26 bytes. skt: socket number assigned to the TCP/IP client. */ skt=skt; //do nothing } int VcomCmdUser(TCPREADDATA *p) { /* VCOM3005 (Feb,22,2002) or later will call this function for PortUser. When packets received by TCP PORT PortUser(user defined) of 7186E/8000E, Xserver will call this function. user can get the following message: p->ReadUartChar : the buffer store the command data. Maximum length of p->ReadUartChar is 32767 bytes. p->Length : the command data length. p->Socket : the socket number that receive the command, that is when the user function wants return message to the client, just use the socket to send data. usage: VcomSendSocket(p->Socket,pdata,datalength); */ /* here just send back the command to the client. */ VcomSendSocket(p->Socket,p->ReadUartChar,p->Length); return 1; /* any value will be accept */ } void Port502Start(int skt) { /* XS8_3200.Lib Version 3.2.00 (20,Apr,2004) or later version supports this function. When a TCP/IP client connects to the 7188E/8000E TCP port 502, the Xserver calls the function once. You can use function VcomSendSocket to send a message to the client when a connection is established. For example: VcomSendSocket(skt,"Connection is established.",26); //return 26 bytes. skt: socket number assigned to the TCP/IP client. */ skt=skt; /*do nothing*/ }