using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Text; using System.Windows.Forms; using System.Threading; using ICPDAS.SmartQ; namespace WCSMixPubSub { public delegate void ShowRecevieMsgDelegate(string topic,string data); public partial class MainForm : Form { /* * If application must publish or subscribe at the same time * the programe must has two PSPARMS structures , * one responses publish works, another responses subscribe works. */ private PSPARMS pubpsparms; private PSPARMS subpsparms; private Thread subThread; private bool subscribing = false; private bool threadrunning = false; private bool connecting = false; //private PSPARMS psp for initial private string clientid = "QP-8x1(2) Client"; private string pBroker = "192.168.100.58"; private int port = 1883; private string Topic = "MyTopic"; private ushort keepAliveTime = 32000; private int qos = 1; private int retain = 0; private string lwtTopic = "MyTopic"; private int lwtQos = 1; private int lwtRetain = 0; private string lwtData = "QP-8x1(2) Client "; public MainForm() { InitializeComponent(); } private void InitializeCustomization() { //pub structure initialization pubpsparms = new PSPARMS(); pubpsparms.clientID = clientid + " pub"; pubpsparms.pBroker = pBroker; pubpsparms.port = port; pubpsparms.Topic = ""; pubpsparms.keepAliveTime = keepAliveTime; pubpsparms.qos = qos; pubpsparms.retain = retain; pubpsparms.lwtTopic = lwtTopic; pubpsparms.lwtQos = lwtQos; pubpsparms.lwtRetain = lwtRetain; pubpsparms.lwtData = lwtData + " pub terminated."; //sub structure initialization subpsparms = new PSPARMS(); subpsparms.clientID = clientid + " sub"; ; subpsparms.pBroker = pBroker; subpsparms.port = port; subpsparms.Topic = ""; subpsparms.keepAliveTime = keepAliveTime; subpsparms.qos = qos; subpsparms.retain = retain; subpsparms.lwtTopic = lwtTopic; subpsparms.lwtQos = lwtQos; subpsparms.lwtRetain = lwtRetain; subpsparms.lwtData = lwtData + " sub terminated."; subpsparms.timeout = 10; //subscribe thread initialization subThread = new Thread(new ThreadStart(SubscribeThreadWorker)); subThread.Name = "Subscriber"; subThread.IsBackground = true; //set UI PublishBt.Enabled = true; DisconnBt.Enabled = false; PublishBt.Enabled = false; SubBt.Enabled = false; UnSubBt.Enabled = false; } #region Controls Event Handle private void MainForm_Load(object sender, EventArgs e) { InitializeCustomization(); clientIDTxt.Text = clientid; QpIpTxt.Text = pBroker; subTopicTxt.Text = Topic; pubTopicTxt.Text = Topic; } private void MainForm_Closing(object sender, CancelEventArgs e) { //stop receiving if (subscribing) { subscribing = false; } Thread.Sleep(1000); if (connecting) { /* Disconnect the protocol*/ MQTTDotNet.ICPDAS_MQIsdpDisconnect(ref pubpsparms); MQTTDotNet.ICPDAS_MQIsdpDisconnect(ref subpsparms); } /*Close Subscribe Thread*/ threadrunning = false; subThread.Abort(); /* Terminate and Release memory */ MQTTDotNet.ICPDAS_MQIsdpTerminate(ref pubpsparms); MQTTDotNet.ICPDAS_MQIsdpTerminate(ref subpsparms); } private void ConnBt_Click(object sender, EventArgs e) { if (!string.IsNullOrEmpty(clientIDTxt.Text) && !string.IsNullOrEmpty(QpIpTxt.Text)) { pubpsparms.clientID = clientIDTxt.Text + " pub"; pubpsparms.pBroker = QpIpTxt.Text; subpsparms.clientID = clientIDTxt.Text + " sub"; subpsparms.pBroker = QpIpTxt.Text; if (!connecting) { /* Connect to QP-500 */ if ((MQTTDotNet.ICPDAS_MQIsdpConnect(ref pubpsparms) != 0)) { MessageBox.Show("Failed to connnect to QP-500 When pubclient connect.\n"); return; } if (MQTTDotNet.ICPDAS_MQIsdpConnect(ref subpsparms) != 0) { MQTTDotNet.ICPDAS_MQIsdpDisconnect(ref pubpsparms); MessageBox.Show("Failed to connnect to QP-500 When subclient connect.\n"); return; } MessageBox.Show("Connected to QP-500 Successfully."); connecting = true; ConnBt.Enabled = false; DisconnBt.Enabled = true; PublishBt.Enabled = true; SubBt.Enabled = true; UnSubBt.Enabled = true; } } } private void DisconnBt_Click(object sender, EventArgs e) { if (connecting) { /*Suspend subscribe thread*/ if (subscribing) { subscribing = false; } Thread.Sleep(1000); /* Disconnect the protocol*/ MQTTDotNet.ICPDAS_MQIsdpDisconnect(ref pubpsparms); MQTTDotNet.ICPDAS_MQIsdpDisconnect(ref subpsparms); MessageBox.Show("Disconnect to QP-500."); connecting = false; } DisconnBt.Enabled = false; ConnBt.Enabled = true; PublishBt.Enabled = false; SubBt.Enabled = false; UnSubBt.Enabled = false; rcvDataTxt.Text = string.Empty; rcvTopicTxt.Text = string.Empty; } private void PublishBt_Click(object sender, EventArgs e) { if (!string.IsNullOrEmpty(pubTopicTxt.Text) && !string.IsNullOrEmpty(pubDataTxt.Text)) { pubpsparms.Topic = pubTopicTxt.Text; string pubdata = pubDataTxt.Text; MQTTDotNet.ICPDAS_MQIsdpPublish(ref pubpsparms, pubdata, pubdata.Length); } } private void SubBt_Click(object sender, EventArgs e) { if (!string.IsNullOrEmpty(subTopicTxt.Text)) { subpsparms.Topic = subTopicTxt.Text; MQTTDotNet.ICPDAS_MQIsdpSubscribe(ref subpsparms); MessageBox.Show("Subscribe topic: " + subpsparms.Topic); if (!subscribing) { if(subThread!=null && !threadrunning) subThread.Start(); subscribing = true; } } } private void UnSubBt_Click(object sender, EventArgs e) { if (!string.IsNullOrEmpty(subTopicTxt.Text)) { subpsparms.Topic = subTopicTxt.Text; MQTTDotNet.ICPDAS_MQIsdpUnsubscribe(ref subpsparms); MessageBox.Show("Unsubscribe topic: " + subpsparms.Topic); } } #endregion private void SubscribeThreadWorker() { int buffersize = 1024; byte[] topicBytes; byte[] dataBytes; Encoding strencoding = Encoding.UTF8; threadrunning = true; //MessageBox.Show("Subscribe Thread worker start."); while (threadrunning) { if (subscribing) { //this.BeginInvoke(new ShowRecevieMsgDelegate(ShowReceiveMsg), "", "sub thread working"); topicBytes = new byte[buffersize]; dataBytes = new byte[buffersize]; int returncode = MQTTDotNet.ICPDAS_MQIsdpReceive(ref subpsparms, topicBytes, dataBytes); if (topicBytes[0] != strencoding.GetBytes("\0")[0]) { string rcvData = strencoding.GetString(dataBytes, 0, dataBytes.Length).TrimEnd('\0'); string rcvTopic = strencoding.GetString(topicBytes, 0, topicBytes.Length).TrimEnd('\0'); this.BeginInvoke(new ShowRecevieMsgDelegate(ShowReceiveMsg), rcvTopic, rcvData); } Thread.Sleep(100); } else { //this.BeginInvoke(new ShowRecevieMsgDelegate(ShowReceiveMsg), "", "Sub Thread sleeping"); Thread.Sleep(1000); } } //MessageBox.Show("Subscribe Thread worker end."); } private void ShowReceiveMsg(string topic, string data) { rcvTopicTxt.Text = topic; rcvDataTxt.Text = data; this.Update(); } } }