#!/usr/bin/env python3 """ InfluxDB Status Checker for Smart Intersection System Checks databases, measurements, query performance, and data freshness """ from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS import time import json from datetime import datetime, timedelta class InfluxDBStatusChecker: def __init__(self, url="http://localhost:8086", token="smart-intersection-super-secret-token", org="smart-intersection-org"): self.url = url self.token = token self.org = org self.client = None self.buckets = [] self.measurements_info = {} def connect(self): """Connect to InfluxDB and verify health""" try: print(f"๐Ÿ”Œ Connecting to InfluxDB at {self.url}...") self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org) # Check health health = self.client.health() print(f"โœ… InfluxDB Health Status: {health.status}") print(f"๐Ÿ“… Server Time: {health.checks[0].time if health.checks else 'Unknown'}") return True except Exception as e: print(f"โŒ Failed to connect to InfluxDB: {e}") return False def get_buckets_info(self): """Get information about all buckets (databases)""" try: buckets_api = self.client.buckets_api() buckets = buckets_api.find_buckets() print(f"\n๐Ÿ“Š INFLUXDB BUCKETS (DATABASES):") print(f"{'='*50}") for bucket in buckets.buckets: self.buckets.append(bucket.name) print(f"๐Ÿ—„๏ธ Bucket: {bucket.name}") print(f" ID: {bucket.id}") print(f" Organization: {bucket.org_id}") print(f" Created: {bucket.created_at}") print(f" Retention: {bucket.retention_rules[0].every_seconds if bucket.retention_rules else 'Forever'} seconds") print() except Exception as e: print(f"โŒ Error getting buckets: {e}") def check_measurements_and_data(self, bucket_name="traffic_monitoring"): """Check measurements and data freshness in specified bucket""" try: query_api = self.client.query_api() print(f"\n๐Ÿ“‹ MEASUREMENTS IN BUCKET '{bucket_name}':") print(f"{'='*50}") # Get all measurements measurements_query = f''' import "influxdata/influxdb/schema" schema.measurements(bucket: "{bucket_name}") ''' result = query_api.query(measurements_query) measurements = [] for table in result: for record in table.records: measurement = record.get_value() if measurement not in measurements: measurements.append(measurement) if not measurements: print("โŒ No measurements found in bucket") print(" This indicates no data has been written yet") return # Check each measurement for measurement in measurements: print(f"\n๐Ÿ“Š Measurement: {measurement}") self.check_measurement_details(bucket_name, measurement) except Exception as e: print(f"โŒ Error checking measurements: {e}") def check_measurement_details(self, bucket_name, measurement): """Get detailed information about a specific measurement""" try: query_api = self.client.query_api() # Get count of records count_query = f''' from(bucket: "{bucket_name}") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "{measurement}") |> count() ''' result = query_api.query(count_query) total_records = 0 for table in result: for record in table.records: total_records += record.get_value() # Get latest record latest_query = f''' from(bucket: "{bucket_name}") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "{measurement}") |> last() ''' result = query_api.query(latest_query) latest_time = "No data" latest_fields = [] for table in result: for record in table.records: latest_time = record.get_time().strftime("%Y-%m-%d %H:%M:%S") field_info = f"{record.get_field()}: {record.get_value()}" if field_info not in latest_fields: latest_fields.append(field_info) # Get field schema fields_query = f''' import "influxdata/influxdb/schema" schema.measurementFieldKeys( bucket: "{bucket_name}", measurement: "{measurement}" ) ''' result = query_api.query(fields_query) fields = [] for table in result: for record in table.records: fields.append(record.get_value()) print(f" ๐Ÿ“ˆ Records (24h): {total_records}") print(f" ๐Ÿ• Latest Write: {latest_time}") print(f" ๐Ÿท๏ธ Fields: {', '.join(fields)}") if latest_fields: print(f" ๐Ÿ“Š Latest Values:") for field in latest_fields[:5]: # Show first 5 fields print(f" {field}") # Calculate data freshness if latest_time != "No data": try: latest_dt = datetime.strptime(latest_time, "%Y-%m-%d %H:%M:%S") freshness = datetime.now() - latest_dt if freshness.total_seconds() < 300: # 5 minutes print(f" โœ… Data Freshness: FRESH ({freshness})") elif freshness.total_seconds() < 3600: # 1 hour print(f" โš ๏ธ Data Freshness: STALE ({freshness})") else: print(f" โŒ Data Freshness: OLD ({freshness})") except: print(f" โ“ Data Freshness: Unknown") except Exception as e: print(f" โŒ Error getting measurement details: {e}") def test_query_performance(self, bucket_name="traffic_monitoring"): """Test query performance with various queries""" print(f"\nโšก QUERY PERFORMANCE TESTS:") print(f"{'='*40}") queries = [ { "name": "Simple Count (1h)", "query": f''' from(bucket: "{bucket_name}") |> range(start: -1h) |> count() ''' }, { "name": "Aggregation (24h)", "query": f''' from(bucket: "{bucket_name}") |> range(start: -24h) |> aggregateWindow(every: 1h, fn: mean) ''' }, { "name": "Latest Values", "query": f''' from(bucket: "{bucket_name}") |> range(start: -1h) |> last() ''' } ] query_api = self.client.query_api() for test in queries: try: start_time = time.time() result = query_api.query(test["query"]) end_time = time.time() response_time = (end_time - start_time) * 1000 # Convert to ms record_count = sum(len(table.records) for table in result) print(f"๐Ÿ” {test['name']}:") print(f" Response Time: {response_time:.2f} ms") print(f" Records Returned: {record_count}") if response_time < 100: print(f" Performance: โœ… EXCELLENT") elif response_time < 500: print(f" Performance: โœ… GOOD") elif response_time < 2000: print(f" Performance: โš ๏ธ ACCEPTABLE") else: print(f" Performance: โŒ SLOW") print() except Exception as e: print(f"โŒ Query failed: {e}") def write_test_data(self, bucket_name="traffic_monitoring"): """Write test data to verify write capability""" print(f"\n๐Ÿงช TESTING WRITE CAPABILITY:") print(f"{'-'*30}") try: write_api = self.client.write_api(write_options=SYNCHRONOUS) # Create test point test_point = Point("test_measurement") \ .tag("test_type", "connectivity") \ .tag("component", "status_checker") \ .field("value", 1) \ .field("timestamp_ms", int(time.time() * 1000)) \ .time(datetime.utcnow(), WritePrecision.NS) start_time = time.time() write_api.write(bucket=bucket_name, org=self.org, record=test_point) write_time = (time.time() - start_time) * 1000 print(f"โœ… Write successful!") print(f"โฑ๏ธ Write Time: {write_time:.2f} ms") # Verify by reading back time.sleep(1) # Wait for data to be indexed query = f''' from(bucket: "{bucket_name}") |> range(start: -1m) |> filter(fn: (r) => r._measurement == "test_measurement") |> last() ''' result = self.client.query_api().query(query) if any(table.records for table in result): print(f"โœ… Data verified in database") else: print(f"โš ๏ธ Data written but not immediately queryable") except Exception as e: print(f"โŒ Write test failed: {e}") def generate_status_report(self): """Generate comprehensive status report""" print(f"\n๐Ÿ“‹ INFLUXDB COMPREHENSIVE STATUS REPORT") print(f"{'='*60}") print(f"Report Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"Server URL: {self.url}") print(f"Organization: {self.org}") if not self.connect(): print("โŒ Cannot generate report - connection failed") return # Get buckets information self.get_buckets_info() # Check main traffic monitoring bucket if "traffic_monitoring" in self.buckets: self.check_measurements_and_data("traffic_monitoring") self.test_query_performance("traffic_monitoring") else: print("โš ๏ธ 'traffic_monitoring' bucket not found") print(" Creating bucket and writing test data...") self.create_default_bucket() # Test write capability self.write_test_data() # Connection cleanup if self.client: self.client.close() def create_default_bucket(self): """Create default bucket if it doesn't exist""" try: buckets_api = self.client.buckets_api() bucket = buckets_api.create_bucket( bucket_name="traffic_monitoring", org=self.org, retention_rules=[] # Infinite retention ) print(f"โœ… Created bucket: {bucket.name}") self.buckets.append("traffic_monitoring") except Exception as e: print(f"โŒ Failed to create bucket: {e}") def main(): """Main function to run InfluxDB status check""" print("๐Ÿšฆ SMART INTERSECTION - INFLUXDB STATUS CHECKER") print("="*60) # Default connection settings - modify if needed url = "http://localhost:8086" token = "smart-intersection-super-secret-token" # Your actual token org = "smart-intersection-org" print(f"๐Ÿ“ก Checking InfluxDB at: {url}") print(f"๐Ÿข Organization: {org}") checker = InfluxDBStatusChecker(url, token, org) checker.generate_status_report() print(f"\n๐Ÿ’ก TROUBLESHOOTING TIPS:") print(f"{'-'*40}") print("If connection fails:") print("1. Check if InfluxDB is running: netstat -an | findstr :8086") print("2. Verify token and organization name") print("3. Check InfluxDB logs for errors") print("4. Ensure firewall allows port 8086") print("5. Try accessing web UI: http://localhost:8086") if __name__ == "__main__": main()