Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import paho.mqtt.client as mqtt
import json
import base64
import socket
import signal
import threading
import datetime
import argparse
# The broker hostname and port are supplied on the command line
# (-h/--hostname and -p/--port); the values below are kept only as examples.
#MQTTBROKER_HOSTNAME = "86.50.17.54"
#MQTTBROKER_HOSTNAME = "localhost"
#MQTTBROKER_PORT = 8883
# Topic the client subscribes to at the broker; messages on any other topic
# are ignored by the message handler.
MQTT_RTCM3_TOPIC = "rtcm3"
# Client identifier passed to the broker when connecting.
MQTT_RTCM3_CLIENT = "rtcm3client"
# Preamble byte (0xD3). Not referenced anywhere in the visible code.
RTCM3_PREAMBLE = b'\xd3'
class MQTTSubscriber:
    """Subscribe to the RTCM3 topic of an MQTT broker and log what arrives.

    Each message on ``MQTT_RTCM3_TOPIC`` is parsed as a JSON object with an
    ``rtcm3`` field (base64 text) and a ``timestamp`` field.  The decoded
    bytes are appended to ``<scenario>-rtrcv.rtcm3`` and one
    ``<message timestamp>,<local receive time>`` line is appended to
    ``<scenario>-timestamps.txt``.
    """

    mqtt_client = None
    event_semaphore = None
    rtcm3_op_stream = None
    timelog_stream = None

    def __init__(self, scenario_name, broker_hostname, broker_port):
        """Open the log files, connect to the broker and install signal handlers.

        Args:
            scenario_name: Prefix used for the two output file names.
            broker_hostname: Host name or address of the MQTT broker.
            broker_port: TCP port of the MQTT broker.

        Raises:
            OSError: If a log file cannot be opened or the broker cannot be
                reached.  Any file opened so far is closed before re-raising.
        """
        # Released by handle_exit() to let main() return.
        self.event_semaphore = threading.Semaphore(0)

        self.rtcm3_op_stream = open(scenario_name + "-rtrcv.rtcm3", "wb")
        try:
            self.timelog_stream = open(scenario_name + "-timestamps.txt", "w")

            # NOTE(review): paho-mqtt >= 2.0 appears to require a
            # callback_api_version argument here - confirm against the
            # installed version.
            self.mqtt_client = mqtt.Client(client_id=MQTT_RTCM3_CLIENT,
                                           clean_session=True,
                                           userdata=None,
                                           transport='tcp')
            self.mqtt_client.on_connect = self._mqtt_connect
            self.mqtt_client.on_disconnect = self._mqtt_disconnect
            self.mqtt_client.on_message = self.mqtt_message_recv

            # The subscription itself is made in _mqtt_connect so that it is
            # re-established after every (re)connection.
            self.mqtt_client.connect(broker_hostname, broker_port)
        except BaseException:
            # Do not leak the already opened files when set-up fails.
            self._close_streams()
            raise
        print("Connected to MQTT Broker")

        # Installed last so the handler never runs on a half-built object.
        signal.signal(signal.SIGINT, self.handle_exit)
        signal.signal(signal.SIGTERM, self.handle_exit)

    def _close_streams(self):
        """Close whichever of the two log files has been opened."""
        for stream in (self.rtcm3_op_stream, self.timelog_stream):
            if stream is not None:
                stream.close()

    def _mqtt_connect(self, client, obj, flags, rc):
        """Subscribe to the RTCM3 topic once the broker accepts the connection.

        With ``clean_session=True`` the broker forgets subscriptions when the
        connection drops, so subscribing here (rather than once at start-up)
        keeps the log running across reconnects.
        """
        if rc != 0:
            print("MQTT connection refused, rc=" + str(rc))
            return
        client.subscribe(topic=MQTT_RTCM3_TOPIC)

    def _mqtt_disconnect(self, client, userdata, rc=0):
        """Stop the network loop after a requested disconnect.

        A non-zero ``rc`` means the connection was lost unexpectedly; the loop
        is then left running so the client can reconnect instead of leaving
        the program waiting forever without receiving anything.
        """
        if rc == 0:
            client.loop_stop()
        else:
            print("Unexpected MQTT disconnect, rc=" + str(rc))

    def mqtt_message_recv(self, client, obj, msg):
        """Append one received RTCM3 message and its timestamps to the logs.

        Messages on other topics are ignored.  Any error while decoding or
        writing is printed and the message is dropped, so a malformed payload
        never terminates the network thread.
        """
        if msg.topic != MQTT_RTCM3_TOPIC:
            return
        try:
            payload = json.loads(msg.payload)
            rtcm3_data = base64.b64decode(payload['rtcm3'])
            self.rtcm3_op_stream.write(rtcm3_data)
            # One line per message: timestamp from the payload, then the
            # local receive time as seconds since the epoch.
            received_at = datetime.datetime.now().timestamp()
            self.timelog_stream.write(
                "{},{}\n".format(payload['timestamp'], received_at))
        except Exception as e:
            print(e)

    def handle_exit(self, signum, frame):
        """Signal handler: disconnect, close the log files and wake main().

        The network loop is stopped (and its thread joined) *before* the files
        are closed, so a message that is still being processed cannot write to
        a closed file.  The semaphore is always released, even if the
        disconnect fails, so main() cannot hang.
        """
        try:
            client = self.mqtt_client
            if client is not None:
                client.disconnect()
                client.loop_stop()
            self._close_streams()
        finally:
            # Specify that the mqtt client has terminated.
            self.event_semaphore.release()

    def main(self):
        """Start the MQTT network loop and block until handle_exit() runs."""
        self.mqtt_client.loop_start()
        print ("Saving the logs")
        # Now wait for the mqtt client to safely terminate.
        self.event_semaphore.acquire(blocking=True)
        print ("Quitting")
        return
if __name__ == "__main__":
    # Command-line entry point: all three options are needed to start the
    # subscriber; if any is missing the usage text is printed instead.
    # add_help=False because -h is used for the broker hostname.
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('-s', '--scenario',
                        help="Scenario in which the client is running. This is used as a prefix for the file",
                        type=str)
    parser.add_argument('-h', '--hostname',
                        help="hostname of the MQTT broker",
                        type=str)
    parser.add_argument('-p', '--port',
                        help="port number of the MQTT broker",
                        type=int)
    args = parser.parse_args()
    if None in [args.scenario, args.hostname, args.port]:
        parser.print_help()
    else:
        s = MQTTSubscriber(args.scenario, args.hostname, args.port)
        s.main()