Files
Traffic-Intersection-Monito…/qt_app_pyside1/test_influxdb_connection.py
2025-08-26 13:24:53 -07:00

89 lines
3.1 KiB
Python

"""
Test script to verify InfluxDB connection and write a test point
"""
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
import time
def test_influxdb_connection():
# InfluxDB setup
url = "http://localhost:8086"
token = "kNFfXEpPQoWrk5Tteowda21Dzv6xD3jY7QHSHHQHb5oYW6VH6mkAgX9ZMjQJkaHHa8FwzmyVFqDG7qqzxN09uQ=="
org = "smart-intersection-org"
bucket = "traffic_monitoring"
try:
print(f"Connecting to InfluxDB at {url}...")
client = InfluxDBClient(url=url, token=token, org=org)
# Check if the connection is successful
health = client.health()
print(f"InfluxDB health: {health.status}")
# Write test point
write_api = client.write_api()
print("Writing test data points to InfluxDB...")
# Create 10 test points with timestamps spaced a few seconds apart
for i in range(10):
# Performance metrics
point = Point("performance") \
.tag("camera_id", "test_camera") \
.field("fps", 25.0 + i) \
.field("processing_time_ms", 40.0 - i) \
.field("gpu_usage", 30.0 + i*2) \
.time(datetime.utcnow())
write_api.write(bucket=bucket, org=org, record=point)
# Detection events
detect_point = Point("detection_events") \
.tag("camera_id", "test_camera") \
.field("vehicle_count", 5 + i) \
.field("pedestrian_count", 2 + (i % 3)) \
.time(datetime.utcnow())
write_api.write(bucket=bucket, org=org, record=detect_point)
# Violation events (every 3rd point)
if i % 3 == 0:
violation_point = Point("violation_events") \
.tag("camera_id", "test_camera") \
.tag("violation_type", "red_light") \
.field("count", 1) \
.time(datetime.utcnow())
write_api.write(bucket=bucket, org=org, record=violation_point)
print(f"Written data point {i+1}/10")
time.sleep(1) # Space out the points
print("✓ Test data written successfully!")
print("Please check your Grafana dashboard now. Data should be visible.")
# Try to read data to confirm it was written
query_api = client.query_api()
query = f'''
from(bucket: "{bucket}")
|> range(start: -2m)
|> filter(fn: (r) => r._measurement == "performance")
'''
result = query_api.query(query)
print("\nReading back data:")
if len(result) == 0:
print("No data found in the database!")
else:
for table in result:
for record in table.records:
print(f"- {record.get_field()}: {record.get_value()}")
except Exception as e:
print(f"❌ Error: {e}")
if __name__ == "__main__":
test_influxdb_connection()