try:
host = self.host
port = self.port
username = self.username
password = self.password
# 连接成功回调函数
sub = str(topic)
print sub
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe(sub)
# 消息推送回调函数
def on_message(client, userdata, msg):
if((msg.payload).find(keyword) >= 0):
print("Success:find the keyword>" + keyword)
print(msg.topic+" "+str(msg.payload))
sys.exit(0)
else:
print("Failed:not find the keyword>" + keyword)
raise AssertionError(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.username_pw_set(username, password)
client.connect(host, port=int(port), keepalive=120, bind_address="")
client.on_connect = on_connect
client.on_message = on_message
count = int(timeout) + 5
j = 1
while (j < count):
client.loop()
j = j + 1
raise AssertionError("Failed:Timeout>" + str(timeout) + "second")
except SystemExit:
pass
现在你觉得编写一个MQTT的关键字还难嘛?So easy,怎么样,当关键字都写好后,就是根据关键字来组装你的自动化测试用例库了,现在开始你的MQTT自动化之旅吧。