""" Test script to verify InfluxDB bucket exists """ from influxdb_client import InfluxDBClient def check_buckets(): # InfluxDB setup url = "http://localhost:8086" token = "kNFfXEpPQoWrk5Tteowda21Dzv6xD3jY7QHSHHQHb5oYW6VH6mkAgX9ZMjQJkaHHa8FwzmyVFqDG7qqzxN09uQ==" org = "smart-intersection-org" try: print(f"Connecting to InfluxDB at {url}...") client = InfluxDBClient(url=url, token=token, org=org) # List all buckets buckets_api = client.buckets_api() buckets = buckets_api.find_buckets().buckets print(f"Found {len(buckets)} buckets:") for bucket in buckets: print(f"- {bucket.name}") # Check if our bucket exists if any(b.name == "traffic_monitoring" for b in buckets): print("\nāœ… 'traffic_monitoring' bucket exists!") else: print("\nāŒ 'traffic_monitoring' bucket NOT found!") print("Creating 'traffic_monitoring' bucket...") buckets_api.create_bucket(bucket_name="traffic_monitoring", org=org) print("Bucket created successfully.") except Exception as e: print(f"āŒ Error: {e}") if __name__ == "__main__": check_buckets()