using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Text;
using System.Threading;
using System.Windows.Forms;
using ICPDAS.SmartQ;

namespace WCSMixPubSub
{
    /// <summary>
    /// Callback used to hand a received message from the subscriber thread
    /// over to the UI thread.
    /// </summary>
    /// <param name="topic">Topic the message arrived on.</param>
    /// <param name="data">Message payload decoded as text.</param>
    public delegate void ShowRecevieMsgDelegate(string topic, string data);

    /// <summary>
    /// Demo form that connects to a broker (called "QP-500" in the UI messages),
    /// publishes text to a topic, subscribes/unsubscribes to a topic and shows
    /// the most recently received message.
    /// </summary>
    public partial class MainForm : Form
    {
        /*
         * If an application must publish and subscribe at the same time,
         * the program must have two PSPARMS structures:
         * one responsible for publishing, the other for subscribing.
         *
         * NOTE(review): this form declares only ONE PSPARMS and shares it between
         * the UI thread (connect/publish/subscribe) and the subscriber thread
         * (receive) without synchronization. Confirm against the library
         * documentation whether that is safe, or split it into two structures.
         */
        private PSPARMS psparms;
        private Thread subThread;

        // Both flags are written by the UI thread and polled by the subscriber
        // thread, so they are volatile to make every write visible to the loop.
        private volatile bool subscribing = false;
        private volatile bool threadrunning = false;

        // Only touched from the UI thread.
        private bool connecting = false;

        // Size of the topic and payload buffers handed to the receive call.
        private const int ReceiveBufferSize = 1024;
        // Pause between two receive calls while subscribed.
        private const int ReceivePollIntervalMs = 100;
        // Pause between two checks of the subscribing flag while idle.
        private const int IdlePollIntervalMs = 1000;
        // Time given to the subscriber thread to leave its current receive call
        // before the connection is dropped underneath it.
        private const int ReceiveDrainMs = 1000;
        // Upper bound on how long closing the form waits for the subscriber thread.
        private const int ShutdownJoinTimeoutMs = 2000;

        // Initial values for the PSPARMS structure.
        private string clientid = "Client_PC";
        private string pBroker = "192.168.100.58";
        private int port = 1883;
        private string Topic = "MyTopic";
        private ushort keepAliveTime = 32000;
        private int qos = 1;
        private int retain = 0;
        private string lwtTopic = "MyTopic";
        private int lwtQos = 1;
        private int lwtRetain = 0;
        private string lwtData = "Client_PC";
        private string persistDir = "C:\\temp\\wmqtt";

        /// <summary>
        /// Creates the form and its designer-generated controls.
        /// </summary>
        public MainForm()
        {
            InitializeComponent();
        }

        /// <summary>
        /// Fills <c>psparms</c> from the default field values and creates
        /// (but does not start) the background subscriber thread.
        /// </summary>
        private void InitializeCustomization()
        {
            // Connection parameters.
            psparms = new PSPARMS();
            psparms.clientID = clientid + " pub";
            psparms.pBroker = pBroker;
            psparms.port = port;
            psparms.Topic = "";
            psparms.keepAliveTime = keepAliveTime;
            psparms.qos = qos;
            psparms.retain = retain;
            psparms.lwtTopic = lwtTopic;
            psparms.lwtQos = lwtQos;
            psparms.lwtRetain = lwtRetain;
            psparms.lwtData = lwtData + " pub terminated.";
            psparms.persistDir = persistDir;
            psparms.timeout = 10;

            // Subscriber thread; it is started on the first successful subscribe.
            subThread = new Thread(new ThreadStart(SubscribeThreadWorker));
            subThread.Name = "Subscriber";
            // Background, so a worker stuck in a receive call never keeps the process alive.
            subThread.IsBackground = true;
        }

        #region Controls Event Handle

        /// <summary>
        /// Initializes the connection parameters and pre-fills the text boxes
        /// with the default client id, broker address and topic.
        /// </summary>
        private void MainForm_Load(object sender, EventArgs e)
        {
            InitializeCustomization();
            clientIDTxt.Text = clientid;
            QpIpTxt.Text = pBroker;
            subTopicTxt.Text = Topic;
            pubTopicTxt.Text = Topic;
        }

        /// <summary>
        /// Stops the subscriber thread, disconnects if connected and releases
        /// the library resources held by <c>psparms</c>.
        /// </summary>
        private void MainForm_Closing(object sender, CancelEventArgs e)
        {
            // Ask the worker to stop cooperatively instead of aborting it:
            // Thread.Abort is obsolete and cannot interrupt a native call anyway.
            subscribing = false;
            threadrunning = false;

            // IsAlive is false for a thread that was never started, which keeps
            // Join from throwing ThreadStateException in that case.
            if (subThread != null && subThread.IsAlive)
            {
                // Bounded wait: the UI must not hang if the receive call blocks.
                subThread.Join(ShutdownJoinTimeoutMs);
            }

            if (connecting)
            {
                /* Disconnect the protocol */
                MQTTDotNet.ICPDAS_MQIsdpDisconnect(ref psparms);
                connecting = false;
            }

            /* Terminate and release memory */
            MQTTDotNet.ICPDAS_MQIsdpTerminate(ref psparms);
        }

        /// <summary>
        /// Connects to the broker using the client id and address typed into the form.
        /// Does nothing when either field is empty or a connection already exists.
        /// </summary>
        private void ConnBt_Click(object sender, EventArgs e)
        {
            if (string.IsNullOrEmpty(clientIDTxt.Text) || string.IsNullOrEmpty(QpIpTxt.Text))
            {
                return;
            }

            // Leave the parameters of a live connection untouched.
            if (connecting)
            {
                return;
            }

            psparms.clientID = clientIDTxt.Text + " pub";
            psparms.pBroker = QpIpTxt.Text;
            psparms.persistDir = persistDir;

            /* Connect to QP-500 */
            int returncode = MQTTDotNet.ICPDAS_MQIsdpConnect(ref psparms);
            if (returncode != 0)
            {
                MessageBox.Show("Failed to connnect to QP-500 When pubclient connect.\n");
                return;
            }

            MessageBox.Show("Connected to QP-500 Successfully.");
            connecting = true;
        }

        /// <summary>
        /// Pauses receiving and disconnects from the broker. Does nothing when
        /// no connection exists.
        /// </summary>
        private void DisconnBt_Click(object sender, EventArgs e)
        {
            if (!connecting)
            {
                return;
            }

            PauseReceiving();

            /* Disconnect the protocol */
            MQTTDotNet.ICPDAS_MQIsdpDisconnect(ref psparms);
            MessageBox.Show("Disconnect to QP-500.");
            connecting = false;
        }

        /// <summary>
        /// Publishes the text in the data box to the topic in the publish-topic box.
        /// Does nothing when either box is empty.
        /// </summary>
        private void PublishBt_Click(object sender, EventArgs e)
        {
            if (string.IsNullOrEmpty(pubTopicTxt.Text) || string.IsNullOrEmpty(pubDataTxt.Text))
            {
                return;
            }

            if (!EnsureConnected())
            {
                return;
            }

            psparms.Topic = pubTopicTxt.Text;
            string pubdata = pubDataTxt.Text;

            // NOTE(review): Length counts UTF-16 characters, while the receive side
            // decodes UTF-8 bytes; for non-ASCII text the two differ. Confirm which
            // unit the wrapper expects for this argument.
            MQTTDotNet.ICPDAS_MQIsdpPublish(ref psparms, pubdata, pubdata.Length);
        }

        /// <summary>
        /// Subscribes to the topic in the subscribe-topic box and makes sure the
        /// subscriber thread is running and receiving.
        /// </summary>
        private void SubBt_Click(object sender, EventArgs e)
        {
            if (string.IsNullOrEmpty(subTopicTxt.Text))
            {
                return;
            }

            if (!EnsureConnected())
            {
                return;
            }

            psparms.Topic = subTopicTxt.Text;
            MQTTDotNet.ICPDAS_MQIsdpSubscribe(ref psparms);
            MessageBox.Show("Subscribe topic: " + psparms.Topic);

            if (!subscribing)
            {
                EnsureSubscribeThreadStarted();
                subscribing = true;
            }
        }

        /// <summary>
        /// Unsubscribes from the topic in the subscribe-topic box. The receive
        /// loop keeps running; only disconnecting or closing the form pauses it.
        /// </summary>
        private void UnSubBt_Click(object sender, EventArgs e)
        {
            if (string.IsNullOrEmpty(subTopicTxt.Text))
            {
                return;
            }

            if (!EnsureConnected())
            {
                return;
            }

            psparms.Topic = subTopicTxt.Text;
            MQTTDotNet.ICPDAS_MQIsdpUnsubscribe(ref psparms);
            MessageBox.Show("Unsubscribe topic: " + psparms.Topic);
        }

        #endregion

        /// <summary>
        /// Reports whether a connection exists, telling the user when it does not.
        /// </summary>
        /// <returns><c>true</c> when connected; otherwise <c>false</c>.</returns>
        private bool EnsureConnected()
        {
            if (connecting)
            {
                return true;
            }

            MessageBox.Show("Not connected to QP-500.");
            return false;
        }

        /// <summary>
        /// Clears the subscribing flag and, if the receive loop was active, waits
        /// briefly so the subscriber thread can leave its current receive call.
        /// </summary>
        private void PauseReceiving()
        {
            if (!subscribing)
            {
                return;
            }

            subscribing = false;
            Thread.Sleep(ReceiveDrainMs);
        }

        /// <summary>
        /// Starts the subscriber thread if it has never been started.
        /// </summary>
        private void EnsureSubscribeThreadStarted()
        {
            if (subThread == null)
            {
                return;
            }

            // A Thread object can be started only once; Unstarted is the one state
            // in which Start() is legal.
            if ((subThread.ThreadState & System.Threading.ThreadState.Unstarted) == 0)
            {
                return;
            }

            // Raise the flag here, on the UI thread, before the worker runs, so a
            // later stop request cannot be overwritten by the worker starting up.
            threadrunning = true;
            subThread.Start();
        }

        /// <summary>
        /// Body of the subscriber thread: while subscribed, polls the library for
        /// a message and forwards it to the UI; otherwise idles.
        /// Runs until <c>threadrunning</c> is cleared.
        /// </summary>
        private void SubscribeThreadWorker()
        {
            Encoding encoding = Encoding.UTF8;

            while (threadrunning)
            {
                if (!subscribing)
                {
                    Thread.Sleep(IdlePollIntervalMs);
                    continue;
                }

                // Fresh zeroed buffers for every call, so a first byte of 0 means
                // that no topic was written.
                byte[] topicBytes = new byte[ReceiveBufferSize];
                byte[] dataBytes = new byte[ReceiveBufferSize];
                MQTTDotNet.ICPDAS_MQIsdpReceive(ref psparms, topicBytes, dataBytes);

                if (topicBytes[0] != 0)
                {
                    // The buffers are fixed-size; strip the unused zero padding.
                    string rcvData = encoding.GetString(dataBytes, 0, dataBytes.Length).TrimEnd('\0');
                    string rcvTopic = encoding.GetString(topicBytes, 0, topicBytes.Length).TrimEnd('\0');
                    PostReceivedMessage(rcvTopic, rcvData);
                }

                Thread.Sleep(ReceivePollIntervalMs);
            }
        }

        /// <summary>
        /// Queues a received message for display on the UI thread; silently drops
        /// it when the form is shutting down.
        /// </summary>
        /// <param name="topic">Topic of the received message.</param>
        /// <param name="data">Payload of the received message.</param>
        private void PostReceivedMessage(string topic, string data)
        {
            // BeginInvoke needs a live window handle.
            if (!threadrunning || IsDisposed || !IsHandleCreated)
            {
                return;
            }

            try
            {
                this.BeginInvoke(new ShowRecevieMsgDelegate(ShowReceiveMsg), topic, data);
            }
            catch (ObjectDisposedException)
            {
                // The form was disposed between the check above and the call;
                // there is nothing left to display the message on.
            }
            catch (InvalidOperationException)
            {
                // The window handle was destroyed between the check and the call.
            }
        }

        /// <summary>
        /// Shows the received topic and payload in the form. Must run on the UI thread.
        /// </summary>
        /// <param name="topic">Topic of the received message.</param>
        /// <param name="data">Payload of the received message.</param>
        private void ShowReceiveMsg(string topic, string data)
        {
            rcvTopicTxt.Text = topic;
            rcvDataTxt.Text = data;
            this.Update();
        }
    }
}