Python编写MQTT的检查函数

在物联网应用中,会用到MQTT协议进行平台与设备之间的通信。当我们做自动化测试时,用例最后需要对发送的MQTT消息执行结果做确认检查,即需要对返回的MQTT消息中关键字做检查。下面为一个python示例:

def checkSubContent(self,topic,keyword,timeout=60):

        """说明:检查指定订阅消息的内容中是否包含关键字keyword

         topic - 指定订阅消息主题

         keyword - 关键字,可以是字符或字符串

         timeout - 检查超时时间,超过该时间后,默认本次检查内容失败"""

        try:

            host = self.host

            port = self.port

            username = self.username

            password = self.password

            # 连接成功回调函数

            sub = str(topic)

            print sub 

            #连接及订阅消息

            def on_connect(client, userdata, flags, rc): 

                print("Connected with result code " + str(rc)) 

                client.subscribe(sub) 

            # 消息推送回调函数 

            def on_message(client, userdata, msg):

               #下面为关键字检查,如果检查出指定的关键字,则退出;否则跑出错误

                if((msg.payload).find(keyword) >= 0):

                    print("Success:find the keyword>" + keyword)

                    print(msg.topic+" "+str(msg.payload))

                    sys.exit(0)

                else:

                    print("Failed:not find the keyword>" + keyword)

                    raise AssertionError(msg.topic+" "+str(msg.payload))

            client = mqtt.Client()

            client.username_pw_set(username, password)

            client.connect(host, port=int(port), keepalive=120, bind_address="")

            client.on_connect = on_connect 

            client.on_message = on_message

#下面为loop的控制,本来打算做个定时器,但发现实现起来比较麻烦,通过简单的实验,#可以通过这种方式对loop的时间做控制;经本人后续测试,每次counttimeout的关系##非一成不变的,跟订阅的内容有关,具体为什么后面把结果另成文,同时因为笔者使用的#python2.7,根据官方说明python3.0已经实现了定时器这个功能,所以如果大家用的#python3.0,可以尝试重新写下控制方法

            count = int(timeout) + 5

            j = 1

            while (j < count): 

                client.loop()

                j = j + 1

            raise AssertionError("Failed:Timeout>" + str(timeout) + "second")

        except SystemExit:          

            pass



留言