1451 lines
58 KiB
Python
1451 lines
58 KiB
Python
"""
|
|
Smart Intersection Controller - Complete Integration
|
|
Fully integrates all smart-intersection functionality into the PySide6 desktop application
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
import logging
|
|
import cv2
|
|
import numpy as np
|
|
from datetime import datetime
|
|
from collections import deque
|
|
from typing import Dict, List, Optional, Any
|
|
from pathlib import Path
|
|
|
|
from PySide6.QtCore import QObject, Signal, QTimer
|
|
|
|
# Import scene analytics adapter
|
|
try:
|
|
from utils.scene_analytics.scene_adapter import SceneAnalyticsAdapter
|
|
except ImportError:
|
|
print("Warning: Scene analytics adapter not found, using mock implementation")
|
|
SceneAnalyticsAdapter = None
|
|
|
|
|
|
class SmartIntersectionController(QObject):
    """
    Complete Smart Intersection Controller for desktop application.
    Integrates multi-camera fusion, scene analytics, and ROI-based event detection.
    """

    # Signals
    scene_analytics_ready = Signal(dict)  # Emits processed scene analytics
    multi_camera_data_ready = Signal(dict)  # Emits multi-camera fusion data
    roi_event_detected = Signal(dict)  # Emits ROI-based events
    performance_stats_ready = Signal(dict)  # Emits performance statistics

    # Bug fix: the signals below are emitted by methods of this class
    # (_process_intersection_roi_events, _emit_intersection_signals, ...) but
    # were never declared, which raised AttributeError at emit time.
    pedestrian_safety_alert = Signal(dict)  # Pedestrian/VRU safety alerts
    roi_violation_detected = Signal(dict)  # ROI violation events (e.g. pedestrian in lane)
    multi_camera_frame_ready = Signal(dict)  # Combined per-camera + intersection analytics
    intersection_analytics_ready = Signal(dict)  # Intersection-wide analytics snapshot
    traffic_flow_updated = Signal(dict)  # Traffic flow analysis updates
    def __init__(self, parent=None):
        """Initialize the controller: config, scene adapters, ROI setup, stats timer.

        Args:
            parent: Optional Qt parent passed to QObject.
        """
        super().__init__(parent)

        # Configuration
        self.config = self._load_config()
        self.enabled = False  # master switch; toggled via set_enabled()
        self.multi_camera_mode = False  # toggled via set_multi_camera_mode()
        self.scene_analytics_enabled = True  # toggled via set_scene_analytics()

        # Scene analytics adapters for different cameras
        self.scene_adapters = {}  # maps camera position -> SceneAnalyticsAdapter
        self.camera_positions = ['north', 'east', 'south', 'west']

        # Initialize scene adapters
        self._init_scene_adapters()

        # ROI configuration
        self.roi_config = self._load_roi_config()

        # Analytics data storage (reset_analytics() rebuilds this same shape)
        self.analytics_data = {
            'total_objects': 0,
            'active_tracks': 0,
            'roi_events': 0,
            'crosswalk_events': 0,
            'lane_events': 0,
            'safety_events': 0,
            'camera_stats': {pos: 0 for pos in self.camera_positions},
            'performance': {
                'fps': 0.0,
                'processing_time_ms': 0.0,
                'gpu_usage': 0.0,
                'memory_usage': 0.0
            }
        }

        # Performance monitoring
        # NOTE(review): the timer starts here even though self.enabled is
        # False; set_enabled() later stops/restarts it.
        self.performance_timer = QTimer()
        self.performance_timer.timeout.connect(self._update_performance_stats)
        self.performance_timer.start(1000)  # Update every second

        # Frame processing stats (used for FPS calculation and emit throttling)
        self.frame_count = 0
        self.last_fps_update = time.time()
        self.fps_counter = 0

        print("🚦 Smart Intersection Controller initialized with complete integration")
    def _load_config(self) -> Dict:
        """Load smart intersection configuration.

        Reads config/smart-intersection/desktop-config.json relative to the
        package root. On any failure (missing file, unreadable JSON), falls
        back to the built-in defaults below.

        Returns:
            Dict: configuration rooted at the 'desktop_app_config' key.
        """
        try:
            config_path = Path(__file__).parent.parent / "config" / "smart-intersection" / "desktop-config.json"
            if config_path.exists():
                with open(config_path, 'r') as f:
                    return json.load(f)
        except Exception as e:
            # Best-effort load: report and fall through to defaults
            print(f"Error loading smart intersection config: {e}")

        # Return comprehensive default config based on smart-intersection analysis
        return {
            "desktop_app_config": {
                "scene_analytics": {
                    "enable_multi_camera": True,
                    "enable_roi_analytics": True,
                    "enable_vlm_integration": True,
                    "enable_intersection_fusion": True
                },
                "camera_settings": {
                    "max_cameras": 4,
                    "default_fps": 30,
                    "positions": ["north", "east", "south", "west"],
                    "calibration": {
                        "auto_calibrate": True,
                        "intersection_center": {"x": 960, "y": 540}
                    }
                },
                "analytics_settings": {
                    "object_tracking": True,
                    "speed_estimation": True,
                    "direction_analysis": True,
                    "safety_monitoring": True,
                    "crosswalk_detection": True,
                    "lane_violation_detection": True,
                    "intersection_clearing_time": True
                },
                "detection_classes": {
                    "person": {"enabled": True, "priority": "high"},
                    "bicycle": {"enabled": True, "priority": "high"},
                    "car": {"enabled": True, "priority": "medium"},
                    "truck": {"enabled": True, "priority": "medium"},
                    "bus": {"enabled": True, "priority": "medium"},
                    "motorcycle": {"enabled": True, "priority": "high"}
                },
                "performance": {
                    "gpu_acceleration": True,
                    "batch_processing": False,
                    "real_time_priority": True
                }
            }
        }
def _load_roi_config(self) -> Dict:
|
|
"""Load ROI configuration based on smart-intersection setup"""
|
|
try:
|
|
tracker_config_path = Path(__file__).parent.parent / "config" / "smart-intersection" / "tracker-config.json"
|
|
if tracker_config_path.exists():
|
|
with open(tracker_config_path, 'r') as f:
|
|
base_config = json.load(f)
|
|
except Exception as e:
|
|
print(f"Error loading ROI config: {e}")
|
|
base_config = {}
|
|
|
|
# Comprehensive ROI setup based on smart-intersection analysis
|
|
return {
|
|
"rois": [
|
|
{
|
|
"name": "North Crosswalk",
|
|
"type": "crosswalk",
|
|
"enabled": True,
|
|
"priority": "high",
|
|
"coordinates": {"x": 400, "y": 100, "width": 320, "height": 80},
|
|
"analytics": ["pedestrian_safety", "clearing_time"]
|
|
},
|
|
{
|
|
"name": "South Crosswalk",
|
|
"type": "crosswalk",
|
|
"enabled": True,
|
|
"priority": "high",
|
|
"coordinates": {"x": 400, "y": 800, "width": 320, "height": 80},
|
|
"analytics": ["pedestrian_safety", "clearing_time"]
|
|
},
|
|
{
|
|
"name": "East Crosswalk",
|
|
"type": "crosswalk",
|
|
"enabled": True,
|
|
"priority": "high",
|
|
"coordinates": {"x": 1200, "y": 400, "width": 80, "height": 320},
|
|
"analytics": ["pedestrian_safety", "clearing_time"]
|
|
},
|
|
{
|
|
"name": "West Crosswalk",
|
|
"type": "crosswalk",
|
|
"enabled": True,
|
|
"priority": "high",
|
|
"coordinates": {"x": 100, "y": 400, "width": 80, "height": 320},
|
|
"analytics": ["pedestrian_safety", "clearing_time"]
|
|
},
|
|
{
|
|
"name": "Center Intersection",
|
|
"type": "intersection",
|
|
"enabled": True,
|
|
"priority": "critical",
|
|
"coordinates": {"x": 500, "y": 400, "width": 400, "height": 400},
|
|
"analytics": ["traffic_flow", "congestion", "violations"]
|
|
},
|
|
{
|
|
"name": "North Traffic Lane",
|
|
"type": "traffic_lane",
|
|
"enabled": True,
|
|
"priority": "medium",
|
|
"coordinates": {"x": 600, "y": 0, "width": 120, "height": 400},
|
|
"analytics": ["vehicle_count", "speed", "lane_discipline"]
|
|
},
|
|
{
|
|
"name": "South Traffic Lane",
|
|
"type": "traffic_lane",
|
|
"enabled": True,
|
|
"priority": "medium",
|
|
"coordinates": {"x": 600, "y": 600, "width": 120, "height": 400},
|
|
"analytics": ["vehicle_count", "speed", "lane_discipline"]
|
|
},
|
|
{
|
|
"name": "East Traffic Lane",
|
|
"type": "traffic_lane",
|
|
"enabled": True,
|
|
"priority": "medium",
|
|
"coordinates": {"x": 900, "y": 500, "width": 400, "height": 120},
|
|
"analytics": ["vehicle_count", "speed", "lane_discipline"]
|
|
},
|
|
{
|
|
"name": "West Traffic Lane",
|
|
"type": "traffic_lane",
|
|
"enabled": True,
|
|
"priority": "medium",
|
|
"coordinates": {"x": 0, "y": 500, "width": 400, "height": 120},
|
|
"analytics": ["vehicle_count", "speed", "lane_discipline"]
|
|
}
|
|
],
|
|
"analytics": {
|
|
"tracking": True,
|
|
"speed": True,
|
|
"direction": True,
|
|
"safety": True,
|
|
"multi_camera_fusion": True,
|
|
"real_time_alerts": True
|
|
},
|
|
"tracker_params": base_config.get("desktop_integration", {})
|
|
}
|
|
|
|
def _init_scene_adapters(self):
|
|
"""Initialize scene analytics adapters for each camera position"""
|
|
if SceneAnalyticsAdapter is None:
|
|
print("⚠️ Scene analytics adapter not available, using mock implementation")
|
|
return
|
|
|
|
for position in self.camera_positions:
|
|
try:
|
|
adapter = SceneAnalyticsAdapter(
|
|
camera_id=f"intersection_{position}",
|
|
config_path=None # Will use default config
|
|
)
|
|
|
|
# Connect adapter signals
|
|
adapter.object_detected.connect(self._handle_object_detected)
|
|
adapter.scene_analytics_updated.connect(self._handle_scene_analytics_updated)
|
|
adapter.roi_event_detected.connect(self._handle_roi_event_detected)
|
|
|
|
self.scene_adapters[position] = adapter
|
|
print(f"✅ Scene adapter initialized for {position} camera")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error initializing scene adapter for {position}: {e}")
|
|
|
|
def set_enabled(self, enabled: bool):
|
|
"""Enable or disable smart intersection mode"""
|
|
self.enabled = enabled
|
|
print(f"🚦 Smart Intersection {'enabled' if enabled else 'disabled'}")
|
|
|
|
if enabled:
|
|
self.performance_timer.start(1000)
|
|
print("📊 Smart intersection analytics active")
|
|
else:
|
|
self.performance_timer.stop()
|
|
print("⏹️ Smart intersection analytics stopped")
|
|
|
|
def set_multi_camera_mode(self, enabled: bool):
|
|
"""Enable or disable multi-camera mode"""
|
|
self.multi_camera_mode = enabled
|
|
print(f"📹 Multi-camera fusion {'enabled' if enabled else 'disabled'}")
|
|
|
|
if enabled:
|
|
print("🔄 Multi-camera scene fusion active")
|
|
print("📍 Camera positions: North, East, South, West")
|
|
else:
|
|
print("📹 Single camera mode active")
|
|
|
|
def set_scene_analytics(self, enabled: bool):
|
|
"""Enable or disable scene analytics"""
|
|
self.scene_analytics_enabled = enabled
|
|
print(f"📊 Scene analytics {'enabled' if enabled else 'disabled'}")
|
|
|
|
if enabled:
|
|
print("🎯 ROI-based analytics active")
|
|
print("🚶 Pedestrian safety monitoring active")
|
|
print("🚗 Vehicle tracking and analysis active")
|
|
else:
|
|
print("📊 Scene analytics paused")
|
|
|
|
def update_roi_config(self, roi_config: Dict):
|
|
"""Update ROI configuration with smart intersection features"""
|
|
self.roi_config = roi_config
|
|
enabled_rois = [roi for roi in roi_config.get('rois', []) if roi.get('enabled', True)]
|
|
print(f"🎯 ROI configuration updated: {len(enabled_rois)} active regions")
|
|
|
|
# Update all scene adapters with new ROI config
|
|
for adapter in self.scene_adapters.values():
|
|
if hasattr(adapter, 'roi_analyzer'):
|
|
# Clear existing ROIs
|
|
adapter.roi_analyzer.roi_definitions.clear()
|
|
|
|
# Add new ROIs with smart intersection specific types
|
|
for i, roi in enumerate(enabled_rois):
|
|
roi_type = roi.get('name', '').lower()
|
|
priority = 'high' if 'crosswalk' in roi_type else 'medium'
|
|
|
|
adapter.roi_analyzer.add_roi(
|
|
f"smart_roi_{i}",
|
|
{
|
|
'type': 'rectangle',
|
|
'name': roi.get('name', f'Smart_ROI_{i}'),
|
|
'priority': priority,
|
|
'analytics_type': self._get_analytics_type(roi_type),
|
|
'coordinates': {
|
|
'x': 100 + (i * 60), # Smart positioning
|
|
'y': 100 + (i * 50),
|
|
'width': 250,
|
|
'height': 180
|
|
}
|
|
}
|
|
)
|
|
|
|
print(f"🎯 Updated {len(enabled_rois)} ROI regions with smart intersection analytics")
|
|
|
|
def _get_analytics_type(self, roi_name: str) -> str:
|
|
"""Determine analytics type based on ROI name"""
|
|
roi_name = roi_name.lower()
|
|
if 'crosswalk' in roi_name:
|
|
return 'pedestrian_safety'
|
|
elif 'lane' in roi_name:
|
|
return 'traffic_analysis'
|
|
elif 'intersection' in roi_name:
|
|
return 'intersection_control'
|
|
else:
|
|
return 'general_monitoring'
|
|
|
|
def process_frame(self, frame_data: Dict):
|
|
"""
|
|
Process frame data through comprehensive smart intersection analytics
|
|
|
|
Args:
|
|
frame_data: Dictionary containing frame and detection data
|
|
"""
|
|
if not self.enabled or not self.scene_analytics_enabled:
|
|
return
|
|
|
|
try:
|
|
frame = frame_data.get('frame')
|
|
detections = frame_data.get('detections', [])
|
|
|
|
if frame is None:
|
|
return
|
|
|
|
# Increment frame counter
|
|
self.frame_count += 1
|
|
self.fps_counter += 1
|
|
|
|
# Enhanced detection processing for smart intersection
|
|
enhanced_detections = self._enhance_detections_for_intersection(detections)
|
|
|
|
# Process frame through scene analytics
|
|
if self.multi_camera_mode:
|
|
# Multi-camera processing
|
|
self._process_multi_camera_frame(frame, enhanced_detections)
|
|
else:
|
|
# Single camera processing with intersection context
|
|
self._process_single_camera_frame(frame, enhanced_detections)
|
|
|
|
# Emit scene analytics data
|
|
if self.frame_count % 3 == 0: # Emit every 3 frames for real-time performance
|
|
self._emit_scene_analytics()
|
|
|
|
except Exception as e:
|
|
print(f"Error processing frame in Smart Intersection: {e}")
|
|
|
|
def _enhance_detections_for_intersection(self, detections: List[Dict]) -> List[Dict]:
|
|
"""Enhance detections with smart intersection specific data"""
|
|
enhanced = []
|
|
|
|
for detection in detections:
|
|
enhanced_det = detection.copy()
|
|
|
|
# Add intersection-specific classifications
|
|
class_name = detection.get('class_name', detection.get('name', 'unknown')).lower()
|
|
|
|
# Classify for intersection priority
|
|
if class_name in ['person', 'pedestrian']:
|
|
enhanced_det['intersection_priority'] = 'critical'
|
|
enhanced_det['safety_category'] = 'vulnerable_road_user'
|
|
elif class_name in ['bicycle', 'motorcycle']:
|
|
enhanced_det['intersection_priority'] = 'high'
|
|
enhanced_det['safety_category'] = 'vulnerable_road_user'
|
|
elif class_name in ['car', 'truck', 'bus']:
|
|
enhanced_det['intersection_priority'] = 'medium'
|
|
enhanced_det['safety_category'] = 'vehicle'
|
|
else:
|
|
enhanced_det['intersection_priority'] = 'low'
|
|
enhanced_det['safety_category'] = 'other'
|
|
|
|
# Add movement analysis
|
|
enhanced_det['movement_context'] = self._analyze_movement_context(enhanced_det)
|
|
|
|
enhanced.append(enhanced_det)
|
|
|
|
return enhanced
|
|
|
|
def _analyze_movement_context(self, detection: Dict) -> str:
|
|
"""Analyze movement context for intersection understanding"""
|
|
# Mock movement analysis - in real implementation would use tracking history
|
|
bbox = detection.get('bbox', detection.get('box', [0, 0, 100, 100]))
|
|
center_x = bbox[0] + bbox[2] / 2
|
|
center_y = bbox[1] + bbox[3] / 2
|
|
|
|
# Intersection quadrant analysis
|
|
frame_width = 1920 # Assumed frame width
|
|
frame_height = 1080 # Assumed frame height
|
|
|
|
if center_x < frame_width / 2 and center_y < frame_height / 2:
|
|
return 'northwest_approach'
|
|
elif center_x >= frame_width / 2 and center_y < frame_height / 2:
|
|
return 'northeast_approach'
|
|
elif center_x < frame_width / 2 and center_y >= frame_height / 2:
|
|
return 'southwest_approach'
|
|
else:
|
|
return 'southeast_approach'
|
|
|
|
def _process_multi_camera_frame(self, frame: np.ndarray, detections: List[Dict]):
|
|
"""Process frame in multi-camera intersection mode"""
|
|
# Distribute processing across camera positions
|
|
camera_position = self.camera_positions[self.frame_count % len(self.camera_positions)]
|
|
|
|
if camera_position in self.scene_adapters:
|
|
adapter = self.scene_adapters[camera_position]
|
|
analytics_result = adapter.process_frame(frame, detections)
|
|
|
|
# Update camera-specific analytics
|
|
self._update_camera_analytics(analytics_result, camera_position)
|
|
|
|
# Perform intersection-wide fusion
|
|
self._perform_intersection_fusion()
|
|
|
|
def _process_single_camera_frame(self, frame: np.ndarray, detections: List[Dict]):
|
|
"""Process frame in single camera mode with intersection context"""
|
|
# Use 'center' camera adapter for single camera mode
|
|
if 'center' not in self.scene_adapters and self.scene_adapters:
|
|
# Use first available adapter
|
|
adapter = list(self.scene_adapters.values())[0]
|
|
analytics_result = adapter.process_frame(frame, detections)
|
|
|
|
# Update analytics data
|
|
self._update_analytics_data(analytics_result, 'center')
|
|
|
|
def _update_camera_analytics(self, analytics_result: Dict, camera_position: str):
|
|
"""Update analytics data for specific camera"""
|
|
try:
|
|
objects = analytics_result.get('objects', [])
|
|
self.analytics_data['camera_stats'][camera_position] = len(objects)
|
|
|
|
# Update intersection-wide totals
|
|
self.analytics_data['total_objects'] = sum(self.analytics_data['camera_stats'].values())
|
|
|
|
# Update tracking data
|
|
tracked_objects = [obj for obj in objects if 'track_id' in obj]
|
|
self.analytics_data['active_tracks'] = len(tracked_objects)
|
|
|
|
# Update ROI events with intersection context
|
|
roi_events = analytics_result.get('roi_events', [])
|
|
self._process_intersection_roi_events(roi_events, camera_position)
|
|
|
|
# Update performance stats
|
|
self.analytics_data['performance']['processing_time_ms'] = analytics_result.get('processing_time_ms', 0)
|
|
|
|
except Exception as e:
|
|
print(f"Error updating camera analytics: {e}")
|
|
|
|
def _update_analytics_data(self, analytics_result: Dict, camera_position: str):
|
|
"""Update analytics data (fallback for single camera)"""
|
|
self._update_camera_analytics(analytics_result, camera_position)
|
|
|
|
def _process_intersection_roi_events(self, roi_events: List[Dict], camera_position: str):
|
|
"""Process ROI events with intersection-specific logic"""
|
|
self.analytics_data['roi_events'] = len(roi_events)
|
|
|
|
# Count intersection-specific event types
|
|
crosswalk_events = 0
|
|
lane_events = 0
|
|
safety_events = 0
|
|
|
|
for event in roi_events:
|
|
roi_id = event.get('roi_id', '').lower()
|
|
event_type = event.get('type', '')
|
|
|
|
# Enhanced event classification
|
|
if 'crosswalk' in roi_id:
|
|
crosswalk_events += 1
|
|
# Check for pedestrian safety
|
|
if event.get('object_category') == 'person':
|
|
safety_events += 1
|
|
# Emit pedestrian safety alert
|
|
self.pedestrian_safety_alert.emit({
|
|
'timestamp': time.time(),
|
|
'camera': camera_position,
|
|
'roi': roi_id,
|
|
'alert_type': 'pedestrian_in_crosswalk',
|
|
'object_data': event
|
|
})
|
|
elif 'lane' in roi_id:
|
|
lane_events += 1
|
|
# Check for lane violations
|
|
if event.get('object_category') == 'person':
|
|
safety_events += 1
|
|
# Emit lane violation alert
|
|
self.roi_violation_detected.emit({
|
|
'timestamp': time.time(),
|
|
'camera': camera_position,
|
|
'violation_type': 'pedestrian_in_traffic_lane',
|
|
'roi': roi_id,
|
|
'object_data': event
|
|
})
|
|
elif 'intersection' in roi_id:
|
|
# Intersection center events
|
|
if event.get('object_category') in ['person', 'bicycle']:
|
|
safety_events += 1
|
|
|
|
self.analytics_data['crosswalk_events'] = crosswalk_events
|
|
self.analytics_data['lane_events'] = lane_events
|
|
self.analytics_data['safety_events'] = safety_events
|
|
|
|
def _perform_intersection_fusion(self):
|
|
"""Perform intersection-wide data fusion across cameras"""
|
|
if not self.multi_camera_mode:
|
|
return
|
|
|
|
# Aggregate data from all cameras
|
|
fusion_data = {
|
|
'timestamp': time.time(),
|
|
'cameras_active': len([pos for pos, count in self.analytics_data['camera_stats'].items() if count > 0]),
|
|
'total_objects': self.analytics_data['total_objects'],
|
|
'intersection_flow': self._calculate_intersection_flow(),
|
|
'congestion_level': self._calculate_congestion_level(),
|
|
'safety_score': self._calculate_safety_score()
|
|
}
|
|
|
|
# Emit multi-camera fusion data
|
|
self.multi_camera_data_ready.emit(fusion_data)
|
|
|
|
def _calculate_intersection_flow(self) -> str:
|
|
"""Calculate intersection traffic flow status"""
|
|
total_objects = self.analytics_data['total_objects']
|
|
|
|
if total_objects == 0:
|
|
return 'clear'
|
|
elif total_objects <= 5:
|
|
return 'light'
|
|
elif total_objects <= 15:
|
|
return 'moderate'
|
|
elif total_objects <= 25:
|
|
return 'heavy'
|
|
else:
|
|
return 'congested'
|
|
|
|
def _calculate_congestion_level(self) -> float:
|
|
"""Calculate congestion level (0.0 to 1.0)"""
|
|
total_objects = self.analytics_data['total_objects']
|
|
max_capacity = 30 # Assumed max intersection capacity
|
|
|
|
return min(total_objects / max_capacity, 1.0)
|
|
|
|
def _calculate_safety_score(self) -> float:
|
|
"""Calculate safety score based on events (0.0 to 1.0, higher is safer)"""
|
|
safety_events = self.analytics_data['safety_events']
|
|
total_objects = max(self.analytics_data['total_objects'], 1)
|
|
|
|
# Inverse relationship: more safety events = lower safety score
|
|
safety_ratio = safety_events / total_objects
|
|
return max(1.0 - safety_ratio, 0.0)
|
|
|
|
def _emit_scene_analytics(self):
|
|
"""Emit comprehensive scene analytics data"""
|
|
analytics_copy = self.analytics_data.copy()
|
|
analytics_copy['timestamp'] = time.time()
|
|
analytics_copy['intersection_mode'] = self.multi_camera_mode
|
|
analytics_copy['fusion_active'] = self.multi_camera_mode and len(self.scene_adapters) > 1
|
|
|
|
# Add smart intersection specific metrics
|
|
analytics_copy['intersection_metrics'] = {
|
|
'flow_status': self._calculate_intersection_flow(),
|
|
'congestion_level': self._calculate_congestion_level(),
|
|
'safety_score': self._calculate_safety_score(),
|
|
'cameras_active': len([pos for pos, count in analytics_copy['camera_stats'].items() if count > 0])
|
|
}
|
|
|
|
self.scene_analytics_ready.emit(analytics_copy)
|
|
|
|
def _update_performance_stats(self):
|
|
"""Update comprehensive performance statistics"""
|
|
try:
|
|
current_time = time.time()
|
|
|
|
# Calculate FPS
|
|
time_diff = current_time - self.last_fps_update
|
|
if time_diff >= 1.0:
|
|
fps = self.fps_counter / time_diff
|
|
self.analytics_data['performance']['fps'] = fps
|
|
self.fps_counter = 0
|
|
self.last_fps_update = current_time
|
|
|
|
# Enhanced performance monitoring
|
|
try:
|
|
import psutil
|
|
|
|
# CPU usage as proxy for GPU usage
|
|
cpu_percent = psutil.cpu_percent(interval=None)
|
|
self.analytics_data['performance']['gpu_usage'] = min(cpu_percent * 1.2, 100.0)
|
|
|
|
# Memory usage
|
|
memory = psutil.virtual_memory()
|
|
self.analytics_data['performance']['memory_usage'] = memory.used / (1024 * 1024) # MB
|
|
|
|
except ImportError:
|
|
# Fallback if psutil not available
|
|
self.analytics_data['performance']['gpu_usage'] = 0.0
|
|
self.analytics_data['performance']['memory_usage'] = 0.0
|
|
|
|
# Emit performance stats
|
|
perf_stats = self.analytics_data['performance'].copy()
|
|
perf_stats['intersection_active'] = self.enabled
|
|
perf_stats['multi_camera_active'] = self.multi_camera_mode
|
|
perf_stats['adapters_count'] = len(self.scene_adapters)
|
|
|
|
self.performance_stats_ready.emit(perf_stats)
|
|
|
|
except Exception as e:
|
|
print(f"Error updating performance stats: {e}")
|
|
|
|
def _handle_object_detected(self, object_data: Dict):
|
|
"""Handle object detection events from scene adapters"""
|
|
# Enhanced object handling for intersection context
|
|
if object_data.get('safety_category') == 'vulnerable_road_user':
|
|
# Prioritize vulnerable road users
|
|
print(f"🚶 VRU detected: {object_data.get('category', 'unknown')}")
|
|
|
|
def _handle_scene_analytics_updated(self, analytics_data: Dict):
|
|
"""Handle scene analytics updates from adapters"""
|
|
# Scene analytics data is processed in the main processing loop
|
|
pass
|
|
|
|
def _handle_roi_event_detected(self, event_data: Dict):
|
|
"""Handle ROI event detection with smart intersection logic"""
|
|
event_type = event_data.get('type', '')
|
|
roi_id = event_data.get('roi_id', '')
|
|
object_category = event_data.get('object_category', '')
|
|
|
|
print(f"🎯 Smart Intersection ROI Event: {event_type} in {roi_id} ({object_category})")
|
|
|
|
# Enhanced event processing
|
|
enhanced_event = event_data.copy()
|
|
enhanced_event['intersection_context'] = True
|
|
enhanced_event['timestamp'] = time.time()
|
|
|
|
self.roi_event_detected.emit(enhanced_event)
|
|
|
|
def get_current_analytics(self) -> Dict:
|
|
"""Get current comprehensive analytics data"""
|
|
analytics = self.analytics_data.copy()
|
|
analytics['intersection_metrics'] = {
|
|
'flow_status': self._calculate_intersection_flow(),
|
|
'congestion_level': self._calculate_congestion_level(),
|
|
'safety_score': self._calculate_safety_score()
|
|
}
|
|
return analytics
|
|
|
|
def get_performance_stats(self) -> Dict:
|
|
"""Get current performance statistics"""
|
|
if not self.scene_adapters:
|
|
return {}
|
|
|
|
stats = {}
|
|
for adapter in self.scene_adapters.values():
|
|
if hasattr(adapter, 'get_performance_stats'):
|
|
adapter_stats = adapter.get_performance_stats()
|
|
for key, value in adapter_stats.items():
|
|
if key not in stats:
|
|
stats[key] = []
|
|
stats[key].append(value)
|
|
|
|
# Average the stats across adapters
|
|
averaged_stats = {}
|
|
for key, values in stats.items():
|
|
if values:
|
|
averaged_stats[key] = sum(values) / len(values)
|
|
|
|
return averaged_stats
|
|
|
|
def reset_analytics(self):
|
|
"""Reset all analytics data"""
|
|
self.analytics_data = {
|
|
'total_objects': 0,
|
|
'active_tracks': 0,
|
|
'roi_events': 0,
|
|
'crosswalk_events': 0,
|
|
'lane_events': 0,
|
|
'safety_events': 0,
|
|
'camera_stats': {pos: 0 for pos in self.camera_positions},
|
|
'performance': {
|
|
'fps': 0.0,
|
|
'processing_time_ms': 0.0,
|
|
'gpu_usage': 0.0,
|
|
'memory_usage': 0.0
|
|
}
|
|
}
|
|
|
|
print("🔄 Smart Intersection analytics reset")
|
|
|
|
def get_intersection_status(self) -> Dict:
|
|
"""Get comprehensive intersection status"""
|
|
return {
|
|
'enabled': self.enabled,
|
|
'multi_camera_mode': self.multi_camera_mode,
|
|
'scene_analytics_enabled': self.scene_analytics_enabled,
|
|
'cameras_initialized': len(self.scene_adapters),
|
|
'roi_regions_active': len([roi for roi in self.roi_config.get('rois', []) if roi.get('enabled', True)]),
|
|
'current_analytics': self.get_current_analytics(),
|
|
'performance': self.get_performance_stats()
|
|
}
|
|
self._initialize_camera_adapters()
|
|
|
|
# Intersection-wide components
|
|
self.intersection_tracker = IntersectionTracker(self.config)
|
|
self.traffic_flow_analyzer = TrafficFlowAnalyzer(self.config)
|
|
self.pedestrian_safety_monitor = PedestrianSafetyMonitor(self.config)
|
|
self.roi_manager = ROIManager(self.config)
|
|
|
|
# Performance monitoring
|
|
self.performance_monitor = PerformanceMonitor()
|
|
|
|
# Data synchronization
|
|
self.frame_sync_buffer = {}
|
|
self.sync_window_ms = 100 # Synchronization window
|
|
|
|
# Analytics state
|
|
self.is_running = False
|
|
self.frame_count = 0
|
|
|
|
def _setup_logger(self) -> logging.Logger:
|
|
"""Setup logging for smart intersection controller"""
|
|
logger = logging.getLogger('SmartIntersectionController')
|
|
logger.setLevel(logging.INFO)
|
|
|
|
if not logger.handlers:
|
|
handler = logging.StreamHandler()
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
handler.setFormatter(formatter)
|
|
logger.addHandler(handler)
|
|
|
|
return logger
|
|
|
|
    def _load_intersection_config(self, config_path: Optional[str]) -> Dict:
        """Load smart intersection configuration.

        Builds a built-in default config, then overlays the JSON file at
        *config_path* (if given and present) on top of it.

        Args:
            config_path: Optional path to a user JSON config file.

        Returns:
            Dict: merged configuration.

        NOTE(review): the merge at the bottom is a shallow dict.update — a
        user config containing an 'intersection' key replaces the entire
        default subtree rather than deep-merging; confirm this is intended.
        """
        base_config = {
            "intersection": {
                "id": "desktop_intersection_001",
                "name": "Desktop Smart Intersection",
                "cameras": {
                    "north": {"id": "camera1", "angle": 0, "enabled": True},
                    "east": {"id": "camera2", "angle": 90, "enabled": True},
                    "south": {"id": "camera3", "angle": 180, "enabled": True},
                    "west": {"id": "camera4", "angle": 270, "enabled": True}
                },
                "detection": {
                    "model_path": "models/intersection/openvino.xml",
                    "confidence_threshold": 0.5,
                    "classes": ["vehicle", "pedestrian"],
                    "inference_device": "GPU"
                },
                "tracking": {
                    "max_unreliable_frames": 10,
                    "non_measurement_frames_dynamic": 8,
                    "non_measurement_frames_static": 16,
                    "baseline_frame_rate": 30
                },
                "analytics": {
                    "enable_traffic_flow": True,
                    "enable_pedestrian_safety": True,
                    "enable_roi_analytics": True,
                    "enable_violation_detection": True
                },
                "roi_definitions": {
                    "crosswalks": [
                        {
                            "id": "crosswalk_north_south",
                            "type": "rectangle",
                            "coordinates": {"x": 300, "y": 200, "width": 200, "height": 100},
                            "monitoring": ["pedestrian_safety", "traffic_conflicts"]
                        },
                        {
                            "id": "crosswalk_east_west",
                            "type": "rectangle",
                            "coordinates": {"x": 200, "y": 300, "width": 100, "height": 200},
                            "monitoring": ["pedestrian_safety", "traffic_conflicts"]
                        }
                    ],
                    "traffic_lanes": [
                        {
                            "id": "lane_north_incoming",
                            "direction": "incoming",
                            "coordinates": {"x": 350, "y": 0, "width": 100, "height": 200}
                        },
                        {
                            "id": "lane_north_outgoing",
                            "direction": "outgoing",
                            "coordinates": {"x": 450, "y": 0, "width": 100, "height": 200}
                        }
                    ],
                    "intersection_center": {
                        "id": "intersection_core",
                        "coordinates": {"x": 300, "y": 250, "width": 200, "height": 150},
                        "monitoring": ["traffic_conflicts", "congestion"]
                    }
                }
            }
        }

        if config_path and Path(config_path).exists():
            with open(config_path, 'r') as f:
                user_config = json.load(f)
                # Merge user config with base config
                base_config.update(user_config)

        return base_config
    def _initialize_camera_adapters(self):
        """Initialize scene analytics adapters for each camera.

        NOTE(review): reads self.cameras and self.logger, which are not set by
        the __init__ visible in this file — this method appears to belong to a
        second, partially-merged implementation; confirm its attributes exist
        before calling.
        """
        for position, camera_info in self.cameras.items():
            camera_id = camera_info['id']

            # Create scene adapter for this camera
            adapter = SceneAnalyticsAdapter(
                camera_id=camera_id,
                config_path=None  # Use default config
            )

            # Connect signals. `pos=position` binds the loop variable as a
            # default argument, avoiding the late-binding closure pitfall.
            adapter.object_detected.connect(
                lambda obj, pos=position: self._on_camera_object_detected(obj, pos)
            )
            adapter.scene_analytics_updated.connect(
                lambda data, pos=position: self._on_camera_analytics_updated(data, pos)
            )
            adapter.roi_event_detected.connect(
                lambda event, pos=position: self._on_camera_roi_event(event, pos)
            )

            camera_info['adapter'] = adapter
            self.logger.info(f"Initialized adapter for {position} camera ({camera_id})")
def start_intersection_monitoring(self):
|
|
"""Start smart intersection monitoring"""
|
|
self.is_running = True
|
|
self.frame_count = 0
|
|
self.logger.info("Started smart intersection monitoring")
|
|
|
|
def stop_intersection_monitoring(self):
|
|
"""Stop smart intersection monitoring"""
|
|
self.is_running = False
|
|
self.logger.info("Stopped smart intersection monitoring")
|
|
|
|
def process_multi_camera_frame(self, frames_data: Dict[str, Dict]):
|
|
"""
|
|
Process frames from multiple cameras simultaneously
|
|
|
|
Args:
|
|
frames_data: Dict with camera position as key and frame data as value
|
|
{
|
|
'north': {'frame': np_array, 'detections': [...], 'timestamp': float},
|
|
'east': {'frame': np_array, 'detections': [...], 'timestamp': float},
|
|
...
|
|
}
|
|
"""
|
|
if not self.is_running:
|
|
return
|
|
|
|
start_time = time.time()
|
|
self.frame_count += 1
|
|
|
|
# Process each camera's frame
|
|
camera_analytics = {}
|
|
all_objects = []
|
|
|
|
for position, frame_data in frames_data.items():
|
|
if position not in self.cameras:
|
|
continue
|
|
|
|
camera_info = self.cameras[position]
|
|
adapter = camera_info['adapter']
|
|
|
|
if adapter and 'frame' in frame_data and 'detections' in frame_data:
|
|
# Process frame through scene adapter
|
|
analytics = adapter.process_frame(
|
|
frame_data['frame'],
|
|
frame_data['detections']
|
|
)
|
|
|
|
camera_analytics[position] = analytics
|
|
|
|
# Collect objects for intersection-wide tracking
|
|
for obj in analytics['objects']:
|
|
obj['source_camera'] = position
|
|
obj['camera_id'] = camera_info['id']
|
|
all_objects.append(obj)
|
|
|
|
# Perform intersection-wide analytics
|
|
intersection_analytics = self._perform_intersection_analytics(
|
|
all_objects, camera_analytics
|
|
)
|
|
|
|
# Update performance metrics
|
|
processing_time = (time.time() - start_time) * 1000
|
|
self.performance_monitor.update(processing_time, len(all_objects))
|
|
|
|
# Emit signals
|
|
self._emit_intersection_signals(intersection_analytics, camera_analytics)
|
|
|
|
def _perform_intersection_analytics(self, all_objects: List[Dict], camera_analytics: Dict) -> Dict:
|
|
"""Perform intersection-wide analytics on all detected objects"""
|
|
|
|
# Update intersection-wide tracking
|
|
intersection_tracks = self.intersection_tracker.update_tracks(all_objects)
|
|
|
|
# Analyze traffic flow
|
|
traffic_flow = self.traffic_flow_analyzer.analyze_flow(intersection_tracks)
|
|
|
|
# Monitor pedestrian safety
|
|
safety_alerts = self.pedestrian_safety_monitor.check_safety(intersection_tracks)
|
|
|
|
# Process ROI events
|
|
roi_events = self.roi_manager.process_objects(intersection_tracks)
|
|
|
|
# Compile intersection analytics
|
|
intersection_analytics = {
|
|
'timestamp': datetime.now().isoformat(),
|
|
'frame_number': self.frame_count,
|
|
'total_objects': len(all_objects),
|
|
'intersection_tracks': intersection_tracks,
|
|
'traffic_flow': traffic_flow,
|
|
'safety_alerts': safety_alerts,
|
|
'roi_events': roi_events,
|
|
'camera_count': len(camera_analytics),
|
|
'performance': self.performance_monitor.get_stats()
|
|
}
|
|
|
|
return intersection_analytics
|
|
|
|
def _emit_intersection_signals(self, intersection_analytics: Dict, camera_analytics: Dict):
|
|
"""Emit Qt signals for desktop integration"""
|
|
try:
|
|
# Multi-camera frame data
|
|
self.multi_camera_frame_ready.emit({
|
|
'intersection': intersection_analytics,
|
|
'cameras': camera_analytics
|
|
})
|
|
|
|
# Intersection-wide analytics
|
|
self.intersection_analytics_ready.emit(intersection_analytics)
|
|
|
|
# Traffic flow updates
|
|
if intersection_analytics['traffic_flow']:
|
|
self.traffic_flow_updated.emit(intersection_analytics['traffic_flow'])
|
|
|
|
# Safety alerts
|
|
for alert in intersection_analytics['safety_alerts']:
|
|
self.pedestrian_safety_alert.emit(alert)
|
|
|
|
# ROI violations
|
|
for event in intersection_analytics['roi_events']:
|
|
if event.get('is_violation', False):
|
|
self.roi_violation_detected.emit(event)
|
|
|
|
# Performance metrics
|
|
self.performance_metrics_ready.emit(intersection_analytics['performance'])
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error emitting intersection signals: {e}")
|
|
|
|
def _on_camera_object_detected(self, obj: Dict, camera_position: str):
|
|
"""Handle object detection from individual camera"""
|
|
obj['source_camera'] = camera_position
|
|
obj['detection_timestamp'] = time.time()
|
|
|
|
def _on_camera_analytics_updated(self, data: Dict, camera_position: str):
|
|
"""Handle analytics update from individual camera"""
|
|
data['source_camera'] = camera_position
|
|
|
|
def _on_camera_roi_event(self, event: Dict, camera_position: str):
|
|
"""Handle ROI event from individual camera"""
|
|
event['source_camera'] = camera_position
|
|
|
|
def get_intersection_config(self) -> Dict:
|
|
"""Get current intersection configuration"""
|
|
return self.config
|
|
|
|
def update_intersection_config(self, new_config: Dict):
|
|
"""Update intersection configuration"""
|
|
self.config.update(new_config)
|
|
|
|
# Reinitialize components with new config
|
|
self.intersection_tracker.update_config(self.config)
|
|
self.traffic_flow_analyzer.update_config(self.config)
|
|
self.pedestrian_safety_monitor.update_config(self.config)
|
|
self.roi_manager.update_config(self.config)
|
|
|
|
self.logger.info("Updated intersection configuration")
|
|
|
|
|
|
class IntersectionTracker:
    """Multi-camera object tracking for intersection.

    Fuses detections from all cameras into a single set of tracks keyed by an
    intersection-wide id. Association is greedy nearest-neighbour within the
    same category (100 px gate); tracks unseen for 2 seconds are dropped.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.tracks = {}  # track id -> track record dict
        self.next_track_id = 1
        self.max_track_age = 30  # frames

    def update_tracks(self, objects: List[Dict]) -> List[Dict]:
        """Update intersection-wide object tracks and return all live tracks."""
        # Associate objects with existing tracks (creating new ones as needed)
        self._associate_objects_to_tracks(objects)

        # Update track ages and remove stale tracks
        self._cleanup_stale_tracks()

        return list(self.tracks.values())

    def _associate_objects_to_tracks(self, objects: List[Dict]) -> List[Dict]:
        """Associate detected objects with existing tracks.

        Returns the tracks touched this frame (updated or newly created).
        """
        matched = []

        for obj in objects:
            best_track_id = self._find_best_matching_track(obj)

            if best_track_id is not None:
                # Update existing track with the latest detection.
                track = self.tracks[best_track_id]
                track['last_detection'] = obj
                track['last_update'] = time.time()
                track['age'] = 0
                # BUGFIX: extend the trajectory and camera history. Previously
                # neither was ever appended after track creation, so the
                # trajectory stayed at length 1 and downstream speed
                # estimation (which needs len(trajectory) > 1) never ran.
                track['trajectory'].append(obj.get('center_of_mass', {}))
                camera = obj.get('source_camera', 'unknown')
                if track['cameras_seen'][-1] != camera:
                    track['cameras_seen'].append(camera)
                matched.append(track)
            else:
                # No suitable track within the gate: start a new one.
                new_track = self._create_new_track(obj)
                self.tracks[new_track['id']] = new_track
                matched.append(new_track)

        return matched

    def _find_best_matching_track(self, obj: Dict) -> Optional[int]:
        """Return the id of the nearest same-category track within 100 px,
        or None when no track qualifies."""
        best_match = None
        best_distance = float('inf')
        threshold = 100  # pixels

        obj_center = obj.get('center_of_mass', {})

        for track_id, track in self.tracks.items():
            # Only match within the same object category.
            if track['category'] != obj['category']:
                continue

            track_center = track['last_detection'].get('center_of_mass', {})
            distance = self._calculate_distance(obj_center, track_center)

            if distance < threshold and distance < best_distance:
                best_distance = distance
                best_match = track_id

        return best_match

    def _calculate_distance(self, center1: Dict, center2: Dict) -> float:
        """Euclidean pixel distance; infinity when either center is missing."""
        if not center1 or not center2:
            return float('inf')

        x1, y1 = center1.get('x', 0), center1.get('y', 0)
        x2, y2 = center2.get('x', 0), center2.get('y', 0)

        return np.sqrt((x1 - x2)**2 + (y1 - y2)**2)

    def _create_new_track(self, obj: Dict) -> Dict:
        """Create and return a new track record seeded from *obj*."""
        track_id = self.next_track_id
        self.next_track_id += 1

        return {
            'id': track_id,
            'category': obj['category'],
            'first_detection': obj,
            'last_detection': obj,
            'created_time': time.time(),
            'last_update': time.time(),
            'age': 0,
            'trajectory': [obj.get('center_of_mass', {})],
            'cameras_seen': [obj.get('source_camera', 'unknown')]
        }

    def _cleanup_stale_tracks(self):
        """Drop tracks not updated within 2 s; age the survivors by one frame."""
        current_time = time.time()
        stale_tracks = []

        for track_id, track in self.tracks.items():
            if current_time - track['last_update'] > 2.0:  # 2 seconds
                stale_tracks.append(track_id)
            else:
                track['age'] += 1

        for track_id in stale_tracks:
            del self.tracks[track_id]

    def update_config(self, config: Dict):
        """Update tracker configuration"""
        self.config = config
|
|
|
|
|
|
class TrafficFlowAnalyzer:
    """Analyze traffic flow patterns at intersection.

    Keeps a rolling window (last 100 snapshots) of per-frame flow summaries.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.flow_history = deque(maxlen=100)  # rolling window of snapshots

    def analyze_flow(self, tracks: List[Dict]) -> Dict:
        """Build, record and return a flow snapshot for the current tracks."""
        snapshot = {
            'timestamp': time.time(),
            'vehicle_counts': self._count_vehicles_by_direction(tracks),
            'average_speeds': self._calculate_average_speeds(tracks),
            'congestion_level': self._detect_congestion(tracks),
            'total_vehicles': sum(1 for t in tracks if t['category'] == 'vehicle'),
            'total_pedestrians': sum(1 for t in tracks if t['category'] == 'pedestrian'),
        }

        self.flow_history.append(snapshot)
        return snapshot

    def _count_vehicles_by_direction(self, tracks: List[Dict]) -> Dict:
        """Count vehicle tracks per approach direction."""
        counts = dict.fromkeys(('north', 'south', 'east', 'west'), 0)

        for track in tracks:
            if track['category'] != 'vehicle':
                continue
            direction = self._estimate_direction(track)
            if direction in counts:
                counts[direction] += 1

        return counts

    def _estimate_direction(self, track: Dict) -> str:
        """Heuristic direction estimate: the last camera that saw the track."""
        cameras = track.get('cameras_seen', [])
        return cameras[-1] if cameras else 'unknown'

    def _calculate_average_speeds(self, tracks: List[Dict]) -> Dict:
        """Average estimated vehicle speed per direction (0 when no samples)."""
        by_direction = {'north': [], 'south': [], 'east': [], 'west': []}

        for track in tracks:
            # Speed needs at least two trajectory points.
            if track['category'] == 'vehicle' and len(track.get('trajectory', [])) > 1:
                speed = self._estimate_speed(track)
                direction = self._estimate_direction(track)
                if direction in by_direction and speed > 0:
                    by_direction[direction].append(speed)

        return {d: (np.mean(vals) if vals else 0) for d, vals in by_direction.items()}

    def _estimate_speed(self, track: Dict) -> float:
        """Rough pixels-per-second speed from first to last trajectory point.

        Simplified — a real deployment would calibrate pixels to metres.
        """
        trajectory = track.get('trajectory', [])
        if len(trajectory) < 2:
            return 0

        first, last = trajectory[0], trajectory[-1]
        if not first or not last:
            return 0

        dx = last.get('x', 0) - first.get('x', 0)
        dy = last.get('y', 0) - first.get('y', 0)
        distance = np.sqrt(dx**2 + dy**2)

        elapsed = track['last_update'] - track['created_time']
        if elapsed > 0:
            return distance / elapsed  # pixels per second

        return 0

    def _detect_congestion(self, tracks: List[Dict]) -> str:
        """Bucket the vehicle count into a coarse congestion level."""
        n_vehicles = sum(1 for t in tracks if t['category'] == 'vehicle')

        if n_vehicles > 20:
            return 'high'
        if n_vehicles > 10:
            return 'medium'
        if n_vehicles > 5:
            return 'low'
        return 'none'

    def update_config(self, config: Dict):
        """Update analyzer configuration"""
        self.config = config
|
|
|
|
|
|
class PedestrianSafetyMonitor:
    """Monitor pedestrian safety at intersection.

    Flags pedestrian/vehicle proximity conflicts and pedestrians walking
    outside configured crosswalk ROIs.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.safety_alerts = []  # retained for external inspection

    def check_safety(self, tracks: List[Dict]) -> List[Dict]:
        """Return the list of safety alerts for the current set of tracks."""
        now = time.time()
        alerts = []

        pedestrians = [t for t in tracks if t['category'] == 'pedestrian']
        vehicles = [t for t in tracks if t['category'] == 'vehicle']

        for ped in pedestrians:
            conflicts = self._check_vehicle_conflicts(ped, vehicles)
            in_crosswalk = self._check_crosswalk_usage(ped)
            location = ped['last_detection'].get('center_of_mass', {})

            if conflicts:
                # Vehicle too close to a pedestrian — highest severity.
                alerts.append({
                    'type': 'pedestrian_vehicle_conflict',
                    'severity': 'high',
                    'timestamp': now,
                    'pedestrian_id': ped['id'],
                    'conflicting_vehicles': conflicts,
                    'location': location,
                    'in_crosswalk': in_crosswalk,
                })
            elif not in_crosswalk:
                # No conflict, but the pedestrian is outside any crosswalk.
                alerts.append({
                    'type': 'pedestrian_outside_crosswalk',
                    'severity': 'medium',
                    'timestamp': now,
                    'pedestrian_id': ped['id'],
                    'location': location,
                })

        return alerts

    def _check_vehicle_conflicts(self, ped_track: Dict, vehicle_tracks: List[Dict]) -> List[int]:
        """Return ids of vehicles within 50 px of the pedestrian (simplified)."""
        ped_center = ped_track['last_detection'].get('center_of_mass', {})
        return [
            vehicle['id']
            for vehicle in vehicle_tracks
            if self._calculate_distance(
                ped_center, vehicle['last_detection'].get('center_of_mass', {})
            ) < 50  # 50 pixels threshold
        ]

    def _check_crosswalk_usage(self, ped_track: Dict) -> bool:
        """True when the pedestrian's center lies inside any crosswalk ROI."""
        ped_center = ped_track['last_detection'].get('center_of_mass', {})
        crosswalks = self.config.get('intersection', {}).get('roi_definitions', {}).get('crosswalks', [])

        return any(
            self._point_in_rectangle(ped_center, crosswalk.get('coordinates', {}))
            for crosswalk in crosswalks
        )

    def _point_in_rectangle(self, point: Dict, rect: Dict) -> bool:
        """Inclusive point-in-axis-aligned-rectangle test."""
        if not point or not rect:
            return False

        x, y = point.get('x', 0), point.get('y', 0)
        left, top = rect.get('x', 0), rect.get('y', 0)
        width, height = rect.get('width', 0), rect.get('height', 0)

        return left <= x <= left + width and top <= y <= top + height

    def _calculate_distance(self, center1: Dict, center2: Dict) -> float:
        """Euclidean pixel distance; infinity when either center is missing."""
        if not center1 or not center2:
            return float('inf')

        dx = center1.get('x', 0) - center2.get('x', 0)
        dy = center1.get('y', 0) - center2.get('y', 0)

        return np.sqrt(dx**2 + dy**2)

    def update_config(self, config: Dict):
        """Update monitor configuration"""
        self.config = config
|
|
|
|
|
|
class ROIManager:
    """Manage regions of interest for intersection analytics.

    ROI definitions come from config['intersection']['roi_definitions'],
    a mapping of ROI type (e.g. 'crosswalks') to a list of ROI dicts.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.roi_definitions = config.get('intersection', {}).get('roi_definitions', {})

    def process_objects(self, tracks: List[Dict]) -> List[Dict]:
        """Return one event per (track, ROI) pair where the track is inside the ROI."""
        events = []

        for track in tracks:
            center = track['last_detection'].get('center_of_mass', {})

            # Check the track against every ROI of every type.
            for roi_type, roi_list in self.roi_definitions.items():
                if not isinstance(roi_list, list):
                    continue
                for roi in roi_list:
                    if not self._object_in_roi(center, roi):
                        continue
                    events.append({
                        'type': f'object_in_{roi_type}',
                        'roi_id': roi.get('id', 'unknown'),
                        'roi_type': roi_type,
                        'object_id': track['id'],
                        'object_category': track['category'],
                        'timestamp': time.time(),
                        'location': center,
                        'is_violation': self._check_violation(track, roi, roi_type),
                    })

        return events

    def _object_in_roi(self, obj_center: Dict, roi: Dict) -> bool:
        """True when the object center lies inside the ROI (rectangles only)."""
        if roi.get('type') == 'rectangle':
            return self._point_in_rectangle(obj_center, roi.get('coordinates', {}))
        # Non-rectangular ROI shapes are not supported yet.
        return False

    def _point_in_rectangle(self, point: Dict, rect: Dict) -> bool:
        """Inclusive point-in-axis-aligned-rectangle test."""
        if not point or not rect:
            return False

        x, y = point.get('x', 0), point.get('y', 0)
        left, top = rect.get('x', 0), rect.get('y', 0)
        width, height = rect.get('width', 0), rect.get('height', 0)

        return left <= x <= left + width and top <= y <= top + height

    def _check_violation(self, track: Dict, roi: Dict, roi_type: str) -> bool:
        """Decide whether this presence event counts as a violation."""
        # Example rule: any vehicle inside a crosswalk is flagged.
        if roi_type == 'crosswalks':
            return track['category'] == 'vehicle'
        return False

    def update_config(self, config: Dict):
        """Update ROI manager configuration and reload the ROI definitions."""
        self.config = config
        self.roi_definitions = config.get('intersection', {}).get('roi_definitions', {})
|
|
|
|
|
|
class PerformanceMonitor:
    """Monitor performance of intersection analytics.

    Keeps rolling windows (last 100 frames) of per-frame latency and
    object counts, plus total uptime since construction.
    """

    def __init__(self):
        self.processing_times = deque(maxlen=100)  # per-frame latency, ms
        self.object_counts = deque(maxlen=100)     # per-frame object count
        self.start_time = time.time()

    def update(self, processing_time_ms: float, object_count: int):
        """Record one frame's latency (ms) and detected-object count."""
        self.processing_times.append(processing_time_ms)
        self.object_counts.append(object_count)

    def get_stats(self) -> Dict:
        """Return aggregate statistics; empty dict before the first update."""
        times = self.processing_times
        if not times:
            return {}

        counts = self.object_counts
        return {
            'avg_processing_time_ms': np.mean(times),
            'max_processing_time_ms': np.max(times),
            'min_processing_time_ms': np.min(times),
            'avg_object_count': np.mean(counts) if counts else 0,
            'total_uptime_seconds': time.time() - self.start_time,
            'frames_processed': len(times),
        }
|