43 lines
1.2 KiB
Python
43 lines
1.2 KiB
Python
"""
|
|
Test script to send traffic light status and device info to InfluxDB
|
|
"""
|
|
|
|
from utils.data_publisher import DataPublisher
|
|
import time
|
|
|
|
def test_traffic_light_and_device_data():
|
|
print("Testing traffic light status and device info publishing...")
|
|
|
|
# Initialize data publisher
|
|
publisher = DataPublisher()
|
|
|
|
# Send device info
|
|
print("Sending device info...")
|
|
publisher.publish_device_info()
|
|
|
|
# Send traffic light status
|
|
print("Sending traffic light status...")
|
|
publisher.publish_traffic_light_status("red", 0.8)
|
|
time.sleep(1)
|
|
publisher.publish_traffic_light_status("yellow", 0.9)
|
|
time.sleep(1)
|
|
publisher.publish_traffic_light_status("green", 0.7)
|
|
time.sleep(1)
|
|
|
|
# Send a test violation
|
|
print("Sending test violation...")
|
|
publisher.publish_violation_event("red_light_violation", "test_vehicle_123", {
|
|
"timestamp": time.time(),
|
|
"confidence": 0.95,
|
|
"location": "main_intersection"
|
|
})
|
|
|
|
# Wait for batch to be published
|
|
print("Waiting for data to be published...")
|
|
time.sleep(6) # Wait for batch publishing
|
|
|
|
print("✅ Test data sent. Check your Grafana dashboard now!")
|
|
|
|
if __name__ == "__main__":
|
|
test_traffic_light_and_device_data()
|