"""Record RTCM3 data published on an MQTT topic.

Each message on the ``rtcm3`` topic is expected to be a JSON object with a
base64-encoded ``rtcm3`` field and a ``timestamp`` field.  The decoded bytes
are appended to ``<scenario>-rtrcv.rtcm3`` and a
``<sender timestamp>,<local receive time>`` row is appended to
``<scenario>-timestamps.txt``.
"""

import argparse
import base64
import datetime
import json
import signal
import socket
import threading

import paho.mqtt.client as mqtt

# Provide the hostname and port number here of the broker.
#MQTTBROKER_HOSTNAME = "86.50.17.54"
#MQTTBROKER_HOSTNAME = "localhost"
#MQTTBROKER_PORT = 8883

# The topic used to subscribe at the broker.
MQTT_RTCM3_TOPIC = "rtcm3"

# The name of the client
MQTT_RTCM3_CLIENT = "rtcm3client"

# Preamble used.
RTCM3_PREAMBLE = b'\xd3'


class MQTTSubscriber:
    """MQTT client that writes received RTCM3 payloads and timestamps to files."""

    # Defaults so cleanup code can tell which resources were actually created.
    mqtt_client = None
    event_semaphore = None
    rtcm3_op_stream = None
    timelog_stream = None

    def __init__(self, scenario_name, broker_hostname, broker_port):
        """Open the output files and connect an MQTT client to the broker.

        Args:
            scenario_name: Prefix used for the two output file names.
            broker_hostname: Host name or address of the MQTT broker.
            broker_port: TCP port of the MQTT broker.

        Raises:
            Whatever ``open``, ``connect`` or ``signal.signal`` raise; any
            output file opened before the failure is closed first.
        """
        # Semaphore released by the signal handler to tell main() to shut down.
        self.event_semaphore = threading.Semaphore(0)

        try:
            # Create the files that receive the RTCM3 data and the time log.
            self.rtcm3_op_stream = open(scenario_name + "-rtrcv.rtcm3", "wb")
            self.timelog_stream = open(scenario_name + "-timestamps.txt", "w")

            # Create the client and provide the handlers for the events.
            self.mqtt_client = mqtt.Client(
                client_id=MQTT_RTCM3_CLIENT,
                clean_session=True,
                userdata=None,
                transport='tcp',
            )
            self.mqtt_client.on_connect = self._mqtt_connect
            self.mqtt_client.on_disconnect = self._mqtt_disconnect
            self.mqtt_client.on_message = self.mqtt_message_recv

            # Connect the client to the broker.  The subscription itself is
            # made in the on_connect handler so it is renewed on reconnect.
            self.mqtt_client.connect(broker_hostname, broker_port)
            print("Connected to MQTT Broker")

            # Specify the signal handlers.
            signal.signal(signal.SIGINT, self.handle_exit)
            signal.signal(signal.SIGTERM, self.handle_exit)
        except Exception:
            # Do not leak file handles when setup fails part-way through.
            self._close_streams()
            raise

    def _close_streams(self):
        """Close whichever output files have been opened."""
        for stream in (self.rtcm3_op_stream, self.timelog_stream):
            if stream is not None:
                stream.close()

    def _mqtt_connect(self, client, obj, flags, rc):
        """Subscribe to the RTCM3 topic once the broker accepts the connection.

        Subscribing here rather than once at start-up matters because the
        client uses ``clean_session=True``: the broker does not keep the
        subscription across connections, so it has to be requested again
        every time a connection is established.
        """
        if rc != 0:
            print("MQTT broker refused the connection, rc=" + str(rc))
            return
        client.subscribe(topic=MQTT_RTCM3_TOPIC)

    def _mqtt_disconnect(self, client, userdata, rc=0):
        """Report an unexpected loss of the broker connection.

        The network loop is deliberately left running: stopping it here would
        leave main() blocked forever with nothing receiving messages.  The
        loop is stopped by main() during shutdown instead.
        """
        if rc != 0:
            print("Lost connection to MQTT broker, rc=" + str(rc))

    def mqtt_message_recv(self, client, obj, msg):
        """Decode one message and append it to the output files.

        Messages on other topics are ignored.  A message that cannot be
        decoded is reported and dropped without touching either file, so the
        RTCM3 file and the timestamp log always describe the same messages.
        """
        if msg.topic != MQTT_RTCM3_TOPIC:
            return

        # Extract every field before writing so that a missing 'timestamp'
        # cannot leave RTCM3 bytes in the file without a matching log row.
        try:
            payload = json.loads(msg.payload)
            rtcm3_data = base64.b64decode(payload['rtcm3'])
            timestamp = payload['timestamp']
        except (ValueError, KeyError, TypeError) as e:
            print(e)
            return

        log_line = str(timestamp) + "," + str(datetime.datetime.now().timestamp()) + "\n"
        try:
            self.rtcm3_op_stream.write(rtcm3_data)
            self.timelog_stream.write(log_line)
        except (OSError, ValueError) as e:
            # ValueError covers a write attempted on an already closed file.
            print(e)

    def handle_exit(self, signum, frame):
        """Signal handler: ask main() to shut the client down.

        Only the semaphore is released here.  Disconnecting and closing the
        files is done by main(), after the network thread has stopped, so a
        message being processed is never written to a closed file.
        """
        self.event_semaphore.release()

    def main(self):
        """Run the MQTT client until a termination signal arrives, then clean up."""
        # Start the mqtt client.
        self.mqtt_client.loop_start()
        print ("Saving the logs")

        try:
            # Now wait for handle_exit() to request termination.
            self.event_semaphore.acquire(blocking=True)
        finally:
            try:
                # Disconnect, then wait for the network thread to finish so
                # no callback is still writing when the files are closed.
                self.mqtt_client.disconnect()
                self.mqtt_client.loop_stop()
            finally:
                self._close_streams()

        print ("Quitting")
        return


if __name__ == "__main__":
    # add_help=False because '-h' is used for the broker hostname.
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('-s', '--scenario', help="Scenario in which the client is running. This is used as a prefix for the file", type=str)
    parser.add_argument('-h', '--hostname', help="hostname of the MQTT broker", type=str)
    parser.add_argument('-p', '--port', help="port number of the MQTT broker", type=int)
    args = parser.parse_args()

    # All three options are mandatory; show the usage if any is missing.
    if None in [args.scenario, args.hostname, args.port]:
        parser.print_help()
    else:
        subscriber = MQTTSubscriber(args.scenario, args.hostname, args.port)
        subscriber.main()