"""Send test data to verify data flow after schema fix."""

import os
import time
from datetime import datetime, timezone

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# Tags shared by every point this script writes.
DEVICE_ID = "smart_intersection_001"
CAMERA_ID = "camera_001"


def _traffic_light_point(color, color_numeric, confidence, timestamp):
    """Build one ``traffic_light_status`` point for the test device/camera."""
    return (
        Point("traffic_light_status")
        .tag("device_id", DEVICE_ID)
        .tag("camera_id", CAMERA_ID)
        .tag("color", color)
        .field("color_numeric", color_numeric)
        .field("confidence", confidence)
        .time(timestamp)
    )


def send_test_data():
    """Write a small set of sample points to InfluxDB.

    Sends one ``device_info`` point, one RED ``traffic_light_status`` point
    and one ``violation_events`` point in a single write, waits two seconds,
    then sends one GREEN ``traffic_light_status`` point.

    Connection settings default to the values below and can be overridden
    with the ``INFLUXDB_V2_URL``, ``INFLUXDB_V2_TOKEN``, ``INFLUXDB_V2_ORG``
    and ``INFLUXDB_V2_BUCKET`` environment variables.

    Any exception is caught and reported on stdout; the function always
    returns ``None``.
    """
    # InfluxDB configuration
    url = os.environ.get("INFLUXDB_V2_URL", "http://localhost:8086")
    # NOTE(review): a credential is committed in source. It is kept only as a
    # fallback so the script keeps working unchanged; prefer supplying
    # INFLUXDB_V2_TOKEN and rotating this token.
    token = os.environ.get(
        "INFLUXDB_V2_TOKEN",
        "kNFfXEpPQoWrk5Tteowda21Dzv6xD3jY7QHSHHQHb5oYW6VH6mkAgX9ZMjQJkaHHa8FwzmyVFqDG7qqzxN09uQ==",
    )
    org = os.environ.get("INFLUXDB_V2_ORG", "smart-intersection-org")
    bucket = os.environ.get("INFLUXDB_V2_BUCKET", "traffic_monitoring")

    client = None
    try:
        print("Connecting to InfluxDB...")
        client = InfluxDBClient(url=url, token=token, org=org)
        # Synchronous writes: the default write API batches in the background
        # and does not raise on failure, which would let the success messages
        # below print even though nothing was stored.
        write_api = client.write_api(write_options=SYNCHRONOUS)

        print("Sending fresh test data...")

        # Current timestamp (timezone-aware UTC; datetime.utcnow() is
        # deprecated and returns a naive value).
        now = datetime.now(timezone.utc)

        # Device info
        device_point = (
            Point("device_info")
            .tag("device_id", DEVICE_ID)
            .tag("camera_id", CAMERA_ID)
            .field("status", "online")
            .field("location", "Main Intersection")
            .field("version", "1.0.0")
            .field("uptime_seconds", 7200.0)
            .time(now)
        )

        # Traffic light status - RED
        traffic_red_point = _traffic_light_point("red", 1, 0.92, now)

        # Red light violation
        violation_point = (
            Point("violation_events")
            .tag("device_id", DEVICE_ID)
            .tag("camera_id", CAMERA_ID)
            .tag("violation_type", "red_light_violation")
            .field("vehicle_id", "vehicle_123")
            .field("count", 1)
            .field("confidence", 0.88)
            .time(now)
        )

        # Write points
        points = [device_point, traffic_red_point, violation_point]
        write_api.write(bucket=bucket, org=org, record=points)

        print("✅ Test data sent successfully!")
        print("✅ Device info: online status")
        print("✅ Traffic light: RED detected")
        print("✅ Violation: red light violation recorded")
        print("\nRefresh your Grafana dashboard to see the data!")

        # Send one more data point with a different state, slightly later so
        # it gets a distinct timestamp.
        time.sleep(2)

        # Traffic light GREEN
        traffic_green_point = _traffic_light_point(
            "green", 3, 0.95, datetime.now(timezone.utc)
        )
        write_api.write(bucket=bucket, org=org, record=traffic_green_point)
        print("✅ Additional GREEN light status sent")

    except Exception as e:
        # Top-level boundary of a diagnostic script: report and return.
        print(f"❌ Error sending test data: {e}")

    finally:
        # client stays None if the constructor itself raised.
        if client is not None:
            client.close()


if __name__ == "__main__":
    send_test_data()