在物联网应用中,会用到MQTT协议进行平台与设备之间的通信。当我们做自动化测试时,用例最后需要对发送的MQTT消息执行结果做确认检查,即需要对返回的MQTT消息中关键字做检查。下面为一个python示例:
def checkSubContent(self,topic,keyword,timeout=60):
"""说明:检查指定订阅消息的内容中是否包含关键字keyword
topic - 指定订阅消息主题
keyword - 关键字,可以是字符或字符串
timeout - 检查超时时间,超过该时间后,默认本次检查内容失败"""
try:
host = self.host
port = self.port
username = self.username
password = self.password
# 连接成功回调函数
sub = str(topic)
print sub
#连接及订阅消息
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe(sub)
# 消息推送回调函数
def on_message(client, userdata, msg):
#下面为关键字检查,如果检查出指定的关键字,则退出;否则跑出错误
if((msg.payload).find(keyword) >= 0):
print("Success:find the keyword>" + keyword)
print(msg.topic+" "+str(msg.payload))
sys.exit(0)
else:
print("Failed:not find the keyword>" + keyword)
raise AssertionError(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.username_pw_set(username, password)
client.connect(host, port=int(port), keepalive=120, bind_address="")
client.on_connect = on_connect
client.on_message = on_message
#下面为loop的控制,本来打算做个定时器,但发现实现起来比较麻烦,通过简单的实验,#可以通过这种方式对loop的时间做控制;经本人后续测试,每次count与timeout的关系##非一成不变的,跟订阅的内容有关,具体为什么后面把结果另成文,同时因为笔者使用的#python2.7,根据官方说明python3.0已经实现了定时器这个功能,所以如果大家用的#python3.0,可以尝试重新写下控制方法
count = int(timeout) + 5
j = 1
while (j < count):
client.loop()
j = j + 1
raise AssertionError("Failed:Timeout>" + str(timeout) + "second")
except SystemExit:
pass