"""
|
|
Smart Intersection Scene Analytics Adapter
|
|
Adapted for desktop PySide6 application with Intel Arc GPU acceleration
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import math
|
|
import time
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from PySide6.QtCore import QObject, Signal, QTimer
|
|
from pathlib import Path
|
|
|
|
try:
|
|
from influxdb_client import InfluxDBClient, Point
|
|
INFLUXDB_AVAILABLE = True
|
|
except ImportError:
|
|
INFLUXDB_AVAILABLE = False
|
|
|
|
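
# InfluxDB support is optional: when the client library is absent, metrics logging is
# simply disabled below. The import above corresponds to the PyPI package
# `influxdb-client` (module name `influxdb_client`), installable with
# `pip install influxdb-client`.
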

class SceneAnalyticsAdapter(QObject):
    """
    Desktop-adapted scene analytics adapter for smart intersection functionality.
    Provides scene-based analytics without external dependencies like MQTT.
    """

    # Signals for desktop integration
    object_detected = Signal(dict)          # Emits object detection data
    scene_analytics_updated = Signal(dict)  # Emits scene analytics
    roi_event_detected = Signal(dict)       # Emits ROI-based events

    def __init__(self, camera_id: str = "desktop_cam", config_path: Optional[str] = None):
        super().__init__()
        self.camera_id = camera_id
        self.config = self._load_config(config_path)
        self.logger = self._setup_logger()

        # Analytics state
        self.frame_count = 0
        self.fps_calculator = FPSCalculator()
        self.object_tracker = ObjectTracker(self.config.get('tracker', {}))
        self.roi_analyzer = ROIAnalyzer()

        # Performance tracking
        self.processing_times = []
        self.last_analytics_update = time.time()

        # InfluxDB setup. Connection settings (including the token) are hard-coded for
        # the local desktop deployment; in other environments they should come from
        # configuration or environment variables rather than source code.
        self.influxdb_enabled = INFLUXDB_AVAILABLE
        if self.influxdb_enabled:
            try:
                self.influxdb_url = "http://localhost:8086"
                self.influxdb_token = "kNFfXEpPQoWrk5Tteowda21Dzv6xD3jY7QHSHHQHb5oYW6VH6mkAgX9ZMjQJkaHHa8FwzmyVFqDG7qqzxN09uQ=="
                self.influxdb_org = "smart-intersection-org"
                self.influxdb_bucket = "traffic_monitoring"
                self.influxdb_client = InfluxDBClient(
                    url=self.influxdb_url, token=self.influxdb_token, org=self.influxdb_org
                )
                self.influxdb_write_api = self.influxdb_client.write_api()
                print("[INFO] InfluxDB connection established successfully")
            except Exception as e:
                self.influxdb_enabled = False
                print(f"[ERROR] Failed to initialize InfluxDB: {e}")
        else:
            print("[WARNING] influxdb_client not installed. InfluxDB logging disabled.")

    def _load_config(self, config_path: Optional[str]) -> Dict:
        """Load configuration for scene analytics"""
        if config_path and Path(config_path).exists():
            with open(config_path, 'r') as f:
                return json.load(f)

        # Default configuration
        return {
            "tracker": {
                "max_unreliable_frames": 10,
                "non_measurement_frames_dynamic": 8,
                "non_measurement_frames_static": 16,
                "baseline_frame_rate": 30
            },
            "analytics": {
                "enable_roi_detection": True,
                "enable_speed_estimation": True,
                "enable_direction_analysis": True
            },
            "performance": {
                "target_fps": 30,
                "max_processing_time_ms": 33
            }
        }

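    # Illustrative example (not part of the original project): a JSON file passed as
    # `config_path` is expected to mirror the default structure above, e.g.
    #
    #   {
    #     "tracker": {"max_unreliable_frames": 10, "baseline_frame_rate": 30},
    #     "analytics": {"enable_roi_detection": true},
    #     "performance": {"target_fps": 30, "max_processing_time_ms": 33}
    #   }
    #
    # Note that a provided file replaces the defaults wholesale; any section it omits
    # falls back to an empty dict at the call sites (e.g. `self.config.get('tracker', {})`).
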
    def _setup_logger(self) -> logging.Logger:
        """Setup logging for scene analytics"""
        logger = logging.getLogger(f'SceneAnalytics_{self.camera_id}')
        logger.setLevel(logging.INFO)
        return logger

    def process_frame(self, frame: np.ndarray, detections: List[Dict]) -> Dict:
        """
        Process a frame with detections and generate scene analytics

        Args:
            frame: Input frame as numpy array
            detections: List of detection dictionaries from YOLO/detection model

        Returns:
            Dictionary containing scene analytics data
        """
        start_time = time.time()
        self.frame_count += 1

        # Update FPS calculation
        current_fps = self.fps_calculator.update()

        # Process detections into scene objects
        scene_objects = self._process_detections(detections, frame.shape)

        # Update object tracking
        tracked_objects = self.object_tracker.update(scene_objects)

        # Perform ROI analysis
        roi_events = self.roi_analyzer.analyze_objects(tracked_objects, frame.shape)

        # Generate analytics data
        analytics_data = {
            'timestamp': datetime.now().isoformat(),
            'camera_id': self.camera_id,
            'frame_number': self.frame_count,
            'fps': current_fps,
            'objects': tracked_objects,
            'roi_events': roi_events,
            'processing_time_ms': (time.time() - start_time) * 1000
        }

        # Emit signals for desktop integration
        self._emit_analytics_signals(analytics_data)

        # Track performance
        self.processing_times.append(analytics_data['processing_time_ms'])
        if len(self.processing_times) > 100:
            self.processing_times.pop(0)

        # Write to InfluxDB
        if self.influxdb_enabled:
            try:
                # Performance metrics
                perf_point = Point("performance") \
                    .tag("camera_id", self.camera_id) \
                    .field("fps", float(analytics_data['fps'])) \
                    .field("processing_time_ms", float(analytics_data['processing_time_ms'])) \
                    .time(datetime.utcnow())
                self.influxdb_write_api.write(bucket=self.influxdb_bucket, org=self.influxdb_org, record=perf_point)

                # Detection events - count vehicles and pedestrians.
                # _map_class_to_category() labels people as 'person', so both labels
                # are accepted when counting pedestrians.
                vehicle_count = 0
                pedestrian_count = 0
                for obj in tracked_objects:
                    if obj.get('category') == 'vehicle':
                        vehicle_count += 1
                    elif obj.get('category') in ('person', 'pedestrian'):
                        pedestrian_count += 1

                detect_point = Point("detection_events") \
                    .tag("camera_id", self.camera_id) \
                    .field("vehicle_count", vehicle_count) \
                    .field("pedestrian_count", pedestrian_count) \
                    .time(datetime.utcnow())
                self.influxdb_write_api.write(bucket=self.influxdb_bucket, org=self.influxdb_org, record=detect_point)

                # Log any violations. ROIAnalyzer events carry their kind under 'type',
                # so fall back to it when 'event_type' is absent.
                for event in roi_events:
                    event_type = event.get('event_type') or event.get('type', '')
                    if 'violation' in event_type:
                        violation_point = Point("violation_events") \
                            .tag("camera_id", self.camera_id) \
                            .tag("violation_type", event_type) \
                            .field("count", 1) \
                            .time(datetime.utcnow())
                        self.influxdb_write_api.write(bucket=self.influxdb_bucket,
                                                      org=self.influxdb_org,
                                                      record=violation_point)

            except Exception as e:
                print(f"[ERROR] Failed to write to InfluxDB: {e}")

        return analytics_data

    def _process_detections(self, detections: List[Dict], frame_shape: Tuple[int, int, int]) -> List[Dict]:
        """Convert YOLO detections to scene objects"""
        scene_objects = []
        height, width = frame_shape[:2]

        for i, detection in enumerate(detections):
            # Extract detection data (adapt based on your detection format)
            if 'bbox' in detection:
                x, y, w, h = detection['bbox']
            elif 'box' in detection:
                x, y, w, h = detection['box']
            else:
                continue

            confidence = detection.get('confidence', detection.get('conf', 0.0))
            class_name = detection.get('class_name', detection.get('name', 'unknown'))
            class_id = detection.get('class_id', detection.get('cls', 0))

            # Create scene object
            scene_obj = {
                'id': f"{self.camera_id}_{self.frame_count}_{i}",
                'category': self._map_class_to_category(class_name),
                'confidence': float(confidence),
                'bounding_box_px': {
                    'x': int(x),
                    'y': int(y),
                    'width': int(w),
                    'height': int(h)
                },
                'center_of_mass': {
                    'x': int(x + w / 2),
                    'y': int(y + h / 2),
                    'width': w / 3,
                    'height': h / 4
                },
                'normalized_bbox': {
                    'x': x / width,
                    'y': y / height,
                    'w': w / width,
                    'h': h / height
                }
            }

            scene_objects.append(scene_obj)

        return scene_objects

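    # Illustrative example of a detection accepted by _process_detections(); exact keys
    # depend on the upstream detector, but either 'bbox' or 'box' (x, y, width, height
    # in pixels) plus a confidence and class name are expected:
    #
    #   {"bbox": [120, 340, 80, 60], "confidence": 0.91, "class_name": "car", "class_id": 2}
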
    def _map_class_to_category(self, class_name: str) -> str:
        """Map detection class names to scene categories"""
        person_classes = ['person', 'pedestrian', 'cyclist']
        vehicle_classes = ['car', 'truck', 'bus', 'motorcycle', 'vehicle']

        class_lower = class_name.lower()

        if any(p in class_lower for p in person_classes):
            return 'person'
        elif any(v in class_lower for v in vehicle_classes):
            return 'vehicle'
        else:
            return 'other'

    def _emit_analytics_signals(self, analytics_data: Dict):
        """Emit Qt signals for desktop integration"""
        try:
            # Emit object detection signal
            for obj in analytics_data['objects']:
                self.object_detected.emit(obj)

            # Emit scene analytics
            self.scene_analytics_updated.emit({
                'timestamp': analytics_data['timestamp'],
                'camera_id': analytics_data['camera_id'],
                'fps': analytics_data['fps'],
                'object_count': len(analytics_data['objects']),
                'processing_time_ms': analytics_data['processing_time_ms']
            })

            # Emit ROI events
            for event in analytics_data['roi_events']:
                self.roi_event_detected.emit(event)

        except Exception as e:
            self.logger.error(f"Error emitting signals: {e}")

    def get_performance_stats(self) -> Dict:
        """Get performance statistics"""
        if not self.processing_times:
            return {}

        return {
            'avg_processing_time_ms': np.mean(self.processing_times),
            'max_processing_time_ms': np.max(self.processing_times),
            'min_processing_time_ms': np.min(self.processing_times),
            'current_fps': self.fps_calculator.get_current_fps(),
            'total_frames_processed': self.frame_count
        }


class FPSCalculator:
    """Calculate FPS with smoothing"""

    def __init__(self, alpha: float = 0.75):
        self.alpha = alpha
        self.fps = 30.0
        self.last_time = time.time()
        self.frame_count = 0

    def update(self) -> float:
        """Update FPS calculation"""
        current_time = time.time()
        self.frame_count += 1

        if self.frame_count > 1:
            frame_time = current_time - self.last_time
            if frame_time > 0:
                instant_fps = 1.0 / frame_time
                self.fps = self.fps * self.alpha + (1 - self.alpha) * instant_fps

        self.last_time = current_time
        return self.fps

    def get_current_fps(self) -> float:
        """Get current FPS"""
        return self.fps


class ObjectTracker:
    """Simple object tracker for desktop application"""

    def __init__(self, config: Dict):
        self.config = config
        self.tracked_objects = {}
        self.next_track_id = 1
        self.max_unreliable_frames = config.get('max_unreliable_frames', 10)

    def update(self, detections: List[Dict]) -> List[Dict]:
        """Update object tracking"""
        # Simple tracking based on position proximity
        tracked = []

        for detection in detections:
            track_id = self._find_or_create_track(detection)
            detection['track_id'] = track_id
            tracked.append(detection)

        # Remove stale tracks
        self._cleanup_stale_tracks()

        return tracked

    def _find_or_create_track(self, detection: Dict) -> int:
        """Find existing track or create new one"""
        center = detection['center_of_mass']
        best_match = None
        best_distance = float('inf')

        # Find closest existing track
        for track_id, track_data in self.tracked_objects.items():
            if track_data['category'] == detection['category']:
                track_center = track_data['last_center']
                distance = math.sqrt(
                    (center['x'] - track_center['x']) ** 2 +
                    (center['y'] - track_center['y']) ** 2
                )

                if distance < best_distance and distance < 100:  # Threshold
                    best_distance = distance
                    best_match = track_id

        if best_match is not None:
            # Update existing track
            self.tracked_objects[best_match]['last_center'] = center
            self.tracked_objects[best_match]['frames_since_detection'] = 0
            return best_match
        else:
            # Create new track
            track_id = self.next_track_id
            self.next_track_id += 1
            self.tracked_objects[track_id] = {
                'category': detection['category'],
                'last_center': center,
                'frames_since_detection': 0,
                'created_frame': time.time()
            }
            return track_id

    def _cleanup_stale_tracks(self):
        """Remove tracks that haven't been updated"""
        stale_tracks = []
        for track_id, track_data in self.tracked_objects.items():
            track_data['frames_since_detection'] += 1
            if track_data['frames_since_detection'] > self.max_unreliable_frames:
                stale_tracks.append(track_id)

        for track_id in stale_tracks:
            del self.tracked_objects[track_id]


class ROIAnalyzer:
    """Analyze objects within regions of interest"""

    def __init__(self):
        self.roi_definitions = {}
        self.roi_events = []

    def add_roi(self, roi_id: str, roi_data: Dict):
        """Add a region of interest definition"""
        self.roi_definitions[roi_id] = roi_data

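    # Illustrative example (not from the original project): _object_in_roi() below only
    # handles rectangular ROIs, so a definition is expected to look like:
    #
    #   analyzer.add_roi("north_crosswalk", {
    #       "type": "rectangle",
    #       "coordinates": {"x": 100, "y": 200, "width": 300, "height": 120},
    #   })
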
    def analyze_objects(self, objects: List[Dict], frame_shape: Tuple[int, int, int]) -> List[Dict]:
        """Analyze objects for ROI events"""
        events = []

        for obj in objects:
            for roi_id, roi_data in self.roi_definitions.items():
                if self._object_in_roi(obj, roi_data, frame_shape):
                    event = {
                        'type': 'object_in_roi',
                        'roi_id': roi_id,
                        'object_id': obj.get('track_id', obj['id']),
                        'object_category': obj['category'],
                        'timestamp': datetime.now().isoformat(),
                        'confidence': obj['confidence']
                    }
                    events.append(event)

        return events

    def _object_in_roi(self, obj: Dict, roi_data: Dict, frame_shape: Tuple[int, int, int]) -> bool:
        """Check if object is within ROI"""
        # Simple rectangular ROI check
        if roi_data.get('type') == 'rectangle':
            obj_center = obj['center_of_mass']
            roi_rect = roi_data.get('coordinates', {})

            return (roi_rect.get('x', 0) <= obj_center['x'] <= roi_rect.get('x', 0) + roi_rect.get('width', 0) and
                    roi_rect.get('y', 0) <= obj_center['y'] <= roi_rect.get('y', 0) + roi_rect.get('height', 0))

        return False
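

# Minimal usage sketch (illustrative only, not part of the original application): feeds
# one synthetic frame and detection through the adapter and prints the emitted analytics.
# A real deployment would call process_frame() from the camera/inference pipeline inside
# the PySide6 GUI; the detection dict and ROI below are made-up demo values.
if __name__ == "__main__":
    import sys
    from PySide6.QtCore import QCoreApplication

    app = QCoreApplication(sys.argv)  # Qt context for QObject/Signal usage

    adapter = SceneAnalyticsAdapter(camera_id="demo_cam")
    adapter.roi_analyzer.add_roi("demo_zone", {
        "type": "rectangle",
        "coordinates": {"x": 0, "y": 0, "width": 640, "height": 480},
    })
    adapter.scene_analytics_updated.connect(lambda data: print("[ANALYTICS]", data))
    adapter.roi_event_detected.connect(lambda event: print("[ROI EVENT]", event))

    frame = np.zeros((480, 640, 3), dtype=np.uint8)  # blank 640x480 BGR frame
    detections = [{"bbox": [100, 200, 80, 60], "confidence": 0.9, "class_name": "car"}]

    result = adapter.process_frame(frame, detections)
    print(f"Processed frame with {len(result['objects'])} tracked objects")
    print("Performance stats:", adapter.get_performance_stats())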