# Files
# 2025-08-26 13:24:53 -07:00
#
# 219 lines
# 8.2 KiB
# Python

#!/usr/bin/env python3
"""
MQTT Broker Status Checker for Smart Intersection System
Checks topics, message counts, broker health, and connectivity
"""
import paho.mqtt.client as mqtt
import json
import time
from datetime import datetime
import threading
from collections import defaultdict
class MQTTStatusChecker:
    """Collect traffic from an MQTT broker and print a status report.

    The checker subscribes to the smart-intersection topic filters (plus a
    ``$SYS`` filter for broker statistics), keeps the most recent messages
    per topic, and prints a summary once the monitoring window is over.
    """

    # Topic filters subscribed to on every successful connect.
    TOPICS = (
        "smart-intersection/+/detection/+",
        "smart-intersection/+/violation/+",
        "smart-intersection/+/performance/+",
        "smart-intersection/+/status/+",
        "smart-intersection/+/traffic-light/+",
        "smart-intersection/+/crosswalk/+",
        "$SYS/broker/+/+",  # System topics for broker stats
    )
    # Number of most recent messages kept per topic (bounds memory use).
    MAX_MESSAGES_PER_TOPIC = 10
    # Seconds to wait for the broker to acknowledge the connection.
    CONNECT_TIMEOUT = 10

    def __init__(self, broker_host="localhost", broker_port=1883):
        """Create the client and register callbacks; no network I/O happens here.

        Args:
            broker_host: Host name or address of the MQTT broker.
            broker_port: TCP port of the MQTT broker.
        """
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.client = self._new_client()
        self.connected = False
        # Set by on_connect so check_broker_status can wait for the handshake.
        self._connect_event = threading.Event()
        self.topics_data = defaultdict(list)
        self.message_count = defaultdict(int)
        self.last_message_time = defaultdict(str)
        # Setup callbacks
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect

    @staticmethod
    def _new_client():
        """Return a paho client compatible with this class's callback signatures.

        paho-mqtt 2.x exposes ``CallbackAPIVersion``; the callbacks below use
        the version-1 signatures, so that version is requested explicitly
        when the enum exists. On paho-mqtt 1.x the plain constructor is used.
        """
        api_version = getattr(mqtt, "CallbackAPIVersion", None)
        if api_version is None:
            return mqtt.Client()
        return mqtt.Client(api_version.VERSION1)

    def on_connect(self, client, userdata, flags, rc):
        """Handle the broker's connection result and subscribe on success.

        ``rc == 0`` is treated as success; any other value is reported as a
        failure. In both cases the connect event is set so a waiting
        ``check_broker_status`` call can proceed.
        """
        if rc == 0:
            self.connected = True
            print(f"✅ Connected to MQTT broker at {self.broker_host}:{self.broker_port}")
            # Subscribe to all smart intersection topics
            for topic in self.TOPICS:
                client.subscribe(topic)
                print(f"📡 Subscribed to: {topic}")
        else:
            print(f"❌ Failed to connect to MQTT broker. Return code: {rc}")
        self._connect_event.set()

    def on_message(self, client, userdata, msg):
        """Record an incoming message under its topic.

        The payload is stored as parsed JSON when it parses, otherwise as
        text. Only the last ``MAX_MESSAGES_PER_TOPIC`` entries are kept per
        topic; the per-topic counter keeps counting all messages.
        """
        topic = msg.topic
        # errors="replace" keeps non-UTF-8 payloads from raising inside the
        # callback; such payloads are stored as text with replacement chars.
        text = msg.payload.decode("utf-8", errors="replace")
        try:
            payload = json.loads(text)
        except ValueError:  # json.JSONDecodeError is a ValueError
            payload = text

        now = datetime.now()
        history = self.topics_data[topic]
        history.append({
            'payload': payload,
            'timestamp': now.isoformat(),
            'qos': msg.qos,
            'retain': msg.retain,
        })
        # Keep only the newest entries per topic to avoid memory issues.
        if len(history) > self.MAX_MESSAGES_PER_TOPIC:
            del history[:-self.MAX_MESSAGES_PER_TOPIC]

        self.message_count[topic] += 1
        self.last_message_time[topic] = now.strftime("%Y-%m-%d %H:%M:%S")

    def on_disconnect(self, client, userdata, rc):
        """Mark the checker as disconnected and print the return code."""
        self.connected = False
        print(f"❌ Disconnected from MQTT broker. Return code: {rc}")

    def check_broker_status(self, monitoring_time=30):
        """Connect, collect messages for ``monitoring_time`` seconds, then report.

        The report is printed after the network loop is stopped but before
        the client disconnects, so the reported connection status reflects
        the state at the end of the monitoring window. Exceptions are
        caught and printed rather than propagated.
        """
        print("\n🔍 MQTT BROKER STATUS CHECK")
        print(f"{'='*50}")
        print(f"Broker: {self.broker_host}:{self.broker_port}")
        print(f"Monitoring for {monitoring_time} seconds...")
        print(f"{'='*50}")
        try:
            self._connect_event.clear()
            # Connect to broker
            self.client.connect(self.broker_host, self.broker_port, 60)
            self.client.loop_start()
            try:
                self._monitor(monitoring_time)
            finally:
                # Always stop the network thread, even if monitoring fails.
                self.client.loop_stop()
            try:
                # Generate report
                self.generate_status_report()
            finally:
                self.client.disconnect()
        except Exception as e:
            print(f"❌ Error connecting to MQTT broker: {e}")

    def _monitor(self, monitoring_time):
        """Wait for the connection result, then idle until the window ends.

        The connection result arrives through ``on_connect`` on the network
        thread, so ``self.connected`` must not be inspected before the
        connect event is set. Monitoring ends early if the connection drops.
        """
        if not self._connect_event.wait(self.CONNECT_TIMEOUT):
            print(f"❌ Broker did not acknowledge the connection within {self.CONNECT_TIMEOUT} seconds")
            return
        if not self.connected:
            # on_connect already printed the failure return code.
            return

        deadline = time.monotonic() + monitoring_time
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            if not self.connected:
                print("❌ Connection lost, stopping monitoring early")
                break
            time.sleep(min(1.0, remaining))

    def generate_status_report(self):
        """Print connection state, per-topic activity and broker statistics."""
        print("\n📊 MQTT BROKER STATUS REPORT")
        print(f"{'='*60}")
        print(f"Report Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Connection Status: {'✅ Connected' if self.connected else '❌ Disconnected'}")
        print(f"Total Topics with Activity: {len(self.topics_data)}")
        print(f"Total Messages Received: {sum(self.message_count.values())}")
        print("\n📡 ACTIVE TOPICS SUMMARY:")
        print(f"{'-'*60}")
        if not self.topics_data:
            print("❌ No topics with activity detected")
            print(" This could indicate:")
            print(" - No publishers are sending data")
            print(" - Topic names don't match subscription patterns")
            print(" - MQTT clients not connected")
            return

        # Sort topics by message count, busiest first
        sorted_topics = sorted(self.message_count.items(), key=lambda x: x[1], reverse=True)
        for topic, count in sorted_topics:
            print(f"📌 Topic: {topic}")
            print(f" Messages: {count}")
            print(f" Last Activity: {self.last_message_time[topic]}")
            # Show latest message if available
            if self.topics_data[topic]:
                latest = self.topics_data[topic][-1]
                payload_text = str(latest['payload'])
                payload_preview = payload_text[:100]
                if len(payload_text) > 100:
                    payload_preview += "..."
                print(f" Latest Data: {payload_preview}")
                print(f" QoS: {latest['qos']}, Retained: {latest['retain']}")
            print()

        # System statistics if available
        self.show_broker_statistics()

    def show_broker_statistics(self):
        """Print the latest value of every recorded topic that starts with ``$SYS``."""
        print("\n🔧 BROKER SYSTEM STATISTICS:")
        print(f"{'-'*40}")
        sys_topics = {
            topic: data
            for topic, data in self.topics_data.items()
            if topic.startswith('$SYS')
        }
        if not sys_topics:
            print("❌ No system statistics available")
            print(" (Broker may not publish $SYS topics)")
            return
        for topic, messages in sys_topics.items():
            if messages:
                latest_value = messages[-1]['payload']
                print(f"📊 {topic.replace('$SYS/broker/', '')}: {latest_value}")

    def test_publish(self):
        """Publish one test message with a throwaway client and report the outcome.

        Success is printed only when ``publish`` reports a success return
        code; the throwaway client is disconnected in every case. Exceptions
        are caught and printed rather than propagated.
        """
        print("\n🧪 TESTING MQTT PUBLISH CAPABILITY...")
        topic = "smart-intersection/test/status/connectivity"
        try:
            test_client = self._new_client()
            test_client.connect(self.broker_host, self.broker_port, 60)
            try:
                test_data = {
                    "test_type": "connectivity_check",
                    "timestamp": datetime.now().isoformat(),
                    "message": "MQTT broker test from status checker"
                }
                info = test_client.publish(topic, json.dumps(test_data))
            finally:
                test_client.disconnect()
            if info.rc != mqtt.MQTT_ERR_SUCCESS:
                print(f"❌ Failed to publish test message: return code {info.rc}")
                return
            print(f"✅ Successfully published test message to: {topic}")
        except Exception as e:
            print(f"❌ Failed to publish test message: {e}")
def main(broker_host="localhost", broker_port=1883, monitoring_time=30):
    """Run the publish test and the status check, then print troubleshooting tips.

    Args:
        broker_host: Host name or address of the MQTT broker.
        broker_port: TCP port of the MQTT broker.
        monitoring_time: Seconds to collect messages before reporting.
    """
    print("🚦 SMART INTERSECTION - MQTT BROKER STATUS CHECKER")
    print("="*60)
    checker = MQTTStatusChecker(broker_host, broker_port)
    # Test publish capability first
    checker.test_publish()
    # Run status check for the requested duration
    checker.check_broker_status(monitoring_time=monitoring_time)
    print("\n💡 TROUBLESHOOTING TIPS:")
    print(f"{'-'*40}")
    print("If no activity detected:")
    # The tips quote the port actually checked rather than a fixed 1883.
    print(f"1. Check if MQTT broker is running: netstat -an | findstr :{broker_port}")
    print("2. Verify your application is publishing to correct topics")
    print("3. Check MQTT broker logs for errors")
    print(f"4. Ensure firewall allows port {broker_port}")
    print("5. Test with MQTT client tools like mosquitto_pub/sub")


if __name__ == "__main__":
    main()