39 lines
1.2 KiB
Python
39 lines
1.2 KiB
Python
"""
|
|
Test script to verify InfluxDB bucket exists
|
|
"""
|
|
|
|
from influxdb_client import InfluxDBClient
|
|
|
|
def check_buckets():
    """Verify that the ``traffic_monitoring`` bucket exists, creating it if missing.

    Connects to InfluxDB, prints the name of every bucket returned by the
    server, then reports whether ``traffic_monitoring`` is among them. When it
    is not, the bucket is created in the configured organization.

    Connection settings default to the values below and can be overridden with
    the ``INFLUXDB_V2_URL``, ``INFLUXDB_V2_TOKEN`` and ``INFLUXDB_V2_ORG``
    environment variables.

    Returns:
        None. Progress and errors are reported via ``print``; any exception
        raised while talking to InfluxDB is caught here and not re-raised.
    """
    import os  # local import keeps this block self-contained

    bucket_name = "traffic_monitoring"

    # InfluxDB setup (environment variables take precedence over the defaults)
    url = os.environ.get("INFLUXDB_V2_URL", "http://localhost:8086")
    # NOTE(review): an API token is committed in source as the fallback value.
    # Prefer supplying INFLUXDB_V2_TOKEN and rotating/removing this literal.
    token = os.environ.get(
        "INFLUXDB_V2_TOKEN",
        "kNFfXEpPQoWrk5Tteowda21Dzv6xD3jY7QHSHHQHb5oYW6VH6mkAgX9ZMjQJkaHHa8FwzmyVFqDG7qqzxN09uQ==",
    )
    org = os.environ.get("INFLUXDB_V2_ORG", "smart-intersection-org")

    try:
        print(f"Connecting to InfluxDB at {url}...")
        # The context manager closes the client on exit, including on error,
        # so the underlying connection is not leaked.
        with InfluxDBClient(url=url, token=token, org=org) as client:
            # List all buckets
            buckets_api = client.buckets_api()
            buckets = buckets_api.find_buckets().buckets

            print(f"Found {len(buckets)} buckets:")
            for bucket in buckets:
                print(f"- {bucket.name}")

            # Check if our bucket exists
            if any(b.name == bucket_name for b in buckets):
                print(f"\n✅ '{bucket_name}' bucket exists!")
            else:
                print(f"\n❌ '{bucket_name}' bucket NOT found!")
                print(f"Creating '{bucket_name}' bucket...")
                buckets_api.create_bucket(bucket_name=bucket_name, org=org)
                print("Bucket created successfully.")

    except Exception as e:
        # Top-level boundary of a diagnostic script: report and return rather
        # than crash with a traceback.
        print(f"❌ Error: {e}")
|
|
|
|
# Run the bucket check only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    check_buckets()
|