Files
Traffic-Intersection-Monito…/qt_app_pyside1/controllers/smart_intersection_controller.py
2025-08-26 13:24:53 -07:00

1451 lines
58 KiB
Python

"""
Smart Intersection Controller - Complete Integration
Fully integrates all smart-intersection functionality into the PySide6 desktop application
"""
import json
import time
import logging
import cv2
import numpy as np
from datetime import datetime
from collections import deque
from typing import Dict, List, Optional, Any
from pathlib import Path
from PySide6.QtCore import QObject, Signal, QTimer
# Import scene analytics adapter
try:
from utils.scene_analytics.scene_adapter import SceneAnalyticsAdapter
except ImportError:
print("Warning: Scene analytics adapter not found, using mock implementation")
SceneAnalyticsAdapter = None
class SmartIntersectionController(QObject):
"""
Complete Smart Intersection Controller for desktop application.
Integrates multi-camera fusion, scene analytics, and ROI-based event detection.
"""
# Signals
scene_analytics_ready = Signal(dict) # Emits processed scene analytics
multi_camera_data_ready = Signal(dict) # Emits multi-camera fusion data
roi_event_detected = Signal(dict) # Emits ROI-based events
performance_stats_ready = Signal(dict) # Emits performance statistics
def __init__(self, parent=None):
super().__init__(parent)
# Configuration
self.config = self._load_config()
self.enabled = False
self.multi_camera_mode = False
self.scene_analytics_enabled = True
# Scene analytics adapters for different cameras
self.scene_adapters = {}
self.camera_positions = ['north', 'east', 'south', 'west']
# Initialize scene adapters
self._init_scene_adapters()
# ROI configuration
self.roi_config = self._load_roi_config()
# Analytics data storage
self.analytics_data = {
'total_objects': 0,
'active_tracks': 0,
'roi_events': 0,
'crosswalk_events': 0,
'lane_events': 0,
'safety_events': 0,
'camera_stats': {pos: 0 for pos in self.camera_positions},
'performance': {
'fps': 0.0,
'processing_time_ms': 0.0,
'gpu_usage': 0.0,
'memory_usage': 0.0
}
}
# Performance monitoring
self.performance_timer = QTimer()
self.performance_timer.timeout.connect(self._update_performance_stats)
self.performance_timer.start(1000) # Update every second
# Frame processing stats
self.frame_count = 0
self.last_fps_update = time.time()
self.fps_counter = 0
print("🚦 Smart Intersection Controller initialized with complete integration")
def _load_config(self) -> Dict:
"""Load smart intersection configuration"""
try:
config_path = Path(__file__).parent.parent / "config" / "smart-intersection" / "desktop-config.json"
if config_path.exists():
with open(config_path, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Error loading smart intersection config: {e}")
# Return comprehensive default config based on smart-intersection analysis
return {
"desktop_app_config": {
"scene_analytics": {
"enable_multi_camera": True,
"enable_roi_analytics": True,
"enable_vlm_integration": True,
"enable_intersection_fusion": True
},
"camera_settings": {
"max_cameras": 4,
"default_fps": 30,
"positions": ["north", "east", "south", "west"],
"calibration": {
"auto_calibrate": True,
"intersection_center": {"x": 960, "y": 540}
}
},
"analytics_settings": {
"object_tracking": True,
"speed_estimation": True,
"direction_analysis": True,
"safety_monitoring": True,
"crosswalk_detection": True,
"lane_violation_detection": True,
"intersection_clearing_time": True
},
"detection_classes": {
"person": {"enabled": True, "priority": "high"},
"bicycle": {"enabled": True, "priority": "high"},
"car": {"enabled": True, "priority": "medium"},
"truck": {"enabled": True, "priority": "medium"},
"bus": {"enabled": True, "priority": "medium"},
"motorcycle": {"enabled": True, "priority": "high"}
},
"performance": {
"gpu_acceleration": True,
"batch_processing": False,
"real_time_priority": True
}
}
}
def _load_roi_config(self) -> Dict:
"""Load ROI configuration based on smart-intersection setup"""
try:
tracker_config_path = Path(__file__).parent.parent / "config" / "smart-intersection" / "tracker-config.json"
if tracker_config_path.exists():
with open(tracker_config_path, 'r') as f:
base_config = json.load(f)
except Exception as e:
print(f"Error loading ROI config: {e}")
base_config = {}
# Comprehensive ROI setup based on smart-intersection analysis
return {
"rois": [
{
"name": "North Crosswalk",
"type": "crosswalk",
"enabled": True,
"priority": "high",
"coordinates": {"x": 400, "y": 100, "width": 320, "height": 80},
"analytics": ["pedestrian_safety", "clearing_time"]
},
{
"name": "South Crosswalk",
"type": "crosswalk",
"enabled": True,
"priority": "high",
"coordinates": {"x": 400, "y": 800, "width": 320, "height": 80},
"analytics": ["pedestrian_safety", "clearing_time"]
},
{
"name": "East Crosswalk",
"type": "crosswalk",
"enabled": True,
"priority": "high",
"coordinates": {"x": 1200, "y": 400, "width": 80, "height": 320},
"analytics": ["pedestrian_safety", "clearing_time"]
},
{
"name": "West Crosswalk",
"type": "crosswalk",
"enabled": True,
"priority": "high",
"coordinates": {"x": 100, "y": 400, "width": 80, "height": 320},
"analytics": ["pedestrian_safety", "clearing_time"]
},
{
"name": "Center Intersection",
"type": "intersection",
"enabled": True,
"priority": "critical",
"coordinates": {"x": 500, "y": 400, "width": 400, "height": 400},
"analytics": ["traffic_flow", "congestion", "violations"]
},
{
"name": "North Traffic Lane",
"type": "traffic_lane",
"enabled": True,
"priority": "medium",
"coordinates": {"x": 600, "y": 0, "width": 120, "height": 400},
"analytics": ["vehicle_count", "speed", "lane_discipline"]
},
{
"name": "South Traffic Lane",
"type": "traffic_lane",
"enabled": True,
"priority": "medium",
"coordinates": {"x": 600, "y": 600, "width": 120, "height": 400},
"analytics": ["vehicle_count", "speed", "lane_discipline"]
},
{
"name": "East Traffic Lane",
"type": "traffic_lane",
"enabled": True,
"priority": "medium",
"coordinates": {"x": 900, "y": 500, "width": 400, "height": 120},
"analytics": ["vehicle_count", "speed", "lane_discipline"]
},
{
"name": "West Traffic Lane",
"type": "traffic_lane",
"enabled": True,
"priority": "medium",
"coordinates": {"x": 0, "y": 500, "width": 400, "height": 120},
"analytics": ["vehicle_count", "speed", "lane_discipline"]
}
],
"analytics": {
"tracking": True,
"speed": True,
"direction": True,
"safety": True,
"multi_camera_fusion": True,
"real_time_alerts": True
},
"tracker_params": base_config.get("desktop_integration", {})
}
def _init_scene_adapters(self):
"""Initialize scene analytics adapters for each camera position"""
if SceneAnalyticsAdapter is None:
print("⚠️ Scene analytics adapter not available, using mock implementation")
return
for position in self.camera_positions:
try:
adapter = SceneAnalyticsAdapter(
camera_id=f"intersection_{position}",
config_path=None # Will use default config
)
# Connect adapter signals
adapter.object_detected.connect(self._handle_object_detected)
adapter.scene_analytics_updated.connect(self._handle_scene_analytics_updated)
adapter.roi_event_detected.connect(self._handle_roi_event_detected)
self.scene_adapters[position] = adapter
print(f"✅ Scene adapter initialized for {position} camera")
except Exception as e:
print(f"❌ Error initializing scene adapter for {position}: {e}")
def set_enabled(self, enabled: bool):
"""Enable or disable smart intersection mode"""
self.enabled = enabled
print(f"🚦 Smart Intersection {'enabled' if enabled else 'disabled'}")
if enabled:
self.performance_timer.start(1000)
print("📊 Smart intersection analytics active")
else:
self.performance_timer.stop()
print("⏹️ Smart intersection analytics stopped")
def set_multi_camera_mode(self, enabled: bool):
"""Enable or disable multi-camera mode"""
self.multi_camera_mode = enabled
print(f"📹 Multi-camera fusion {'enabled' if enabled else 'disabled'}")
if enabled:
print("🔄 Multi-camera scene fusion active")
print("📍 Camera positions: North, East, South, West")
else:
print("📹 Single camera mode active")
def set_scene_analytics(self, enabled: bool):
"""Enable or disable scene analytics"""
self.scene_analytics_enabled = enabled
print(f"📊 Scene analytics {'enabled' if enabled else 'disabled'}")
if enabled:
print("🎯 ROI-based analytics active")
print("🚶 Pedestrian safety monitoring active")
print("🚗 Vehicle tracking and analysis active")
else:
print("📊 Scene analytics paused")
def update_roi_config(self, roi_config: Dict):
"""Update ROI configuration with smart intersection features"""
self.roi_config = roi_config
enabled_rois = [roi for roi in roi_config.get('rois', []) if roi.get('enabled', True)]
print(f"🎯 ROI configuration updated: {len(enabled_rois)} active regions")
# Update all scene adapters with new ROI config
for adapter in self.scene_adapters.values():
if hasattr(adapter, 'roi_analyzer'):
# Clear existing ROIs
adapter.roi_analyzer.roi_definitions.clear()
# Add new ROIs with smart intersection specific types
for i, roi in enumerate(enabled_rois):
roi_type = roi.get('name', '').lower()
priority = 'high' if 'crosswalk' in roi_type else 'medium'
adapter.roi_analyzer.add_roi(
f"smart_roi_{i}",
{
'type': 'rectangle',
'name': roi.get('name', f'Smart_ROI_{i}'),
'priority': priority,
'analytics_type': self._get_analytics_type(roi_type),
'coordinates': {
'x': 100 + (i * 60), # Smart positioning
'y': 100 + (i * 50),
'width': 250,
'height': 180
}
}
)
print(f"🎯 Updated {len(enabled_rois)} ROI regions with smart intersection analytics")
def _get_analytics_type(self, roi_name: str) -> str:
"""Determine analytics type based on ROI name"""
roi_name = roi_name.lower()
if 'crosswalk' in roi_name:
return 'pedestrian_safety'
elif 'lane' in roi_name:
return 'traffic_analysis'
elif 'intersection' in roi_name:
return 'intersection_control'
else:
return 'general_monitoring'
def process_frame(self, frame_data: Dict):
"""
Process frame data through comprehensive smart intersection analytics
Args:
frame_data: Dictionary containing frame and detection data
"""
if not self.enabled or not self.scene_analytics_enabled:
return
try:
frame = frame_data.get('frame')
detections = frame_data.get('detections', [])
if frame is None:
return
# Increment frame counter
self.frame_count += 1
self.fps_counter += 1
# Enhanced detection processing for smart intersection
enhanced_detections = self._enhance_detections_for_intersection(detections)
# Process frame through scene analytics
if self.multi_camera_mode:
# Multi-camera processing
self._process_multi_camera_frame(frame, enhanced_detections)
else:
# Single camera processing with intersection context
self._process_single_camera_frame(frame, enhanced_detections)
# Emit scene analytics data
if self.frame_count % 3 == 0: # Emit every 3 frames for real-time performance
self._emit_scene_analytics()
except Exception as e:
print(f"Error processing frame in Smart Intersection: {e}")
def _enhance_detections_for_intersection(self, detections: List[Dict]) -> List[Dict]:
"""Enhance detections with smart intersection specific data"""
enhanced = []
for detection in detections:
enhanced_det = detection.copy()
# Add intersection-specific classifications
class_name = detection.get('class_name', detection.get('name', 'unknown')).lower()
# Classify for intersection priority
if class_name in ['person', 'pedestrian']:
enhanced_det['intersection_priority'] = 'critical'
enhanced_det['safety_category'] = 'vulnerable_road_user'
elif class_name in ['bicycle', 'motorcycle']:
enhanced_det['intersection_priority'] = 'high'
enhanced_det['safety_category'] = 'vulnerable_road_user'
elif class_name in ['car', 'truck', 'bus']:
enhanced_det['intersection_priority'] = 'medium'
enhanced_det['safety_category'] = 'vehicle'
else:
enhanced_det['intersection_priority'] = 'low'
enhanced_det['safety_category'] = 'other'
# Add movement analysis
enhanced_det['movement_context'] = self._analyze_movement_context(enhanced_det)
enhanced.append(enhanced_det)
return enhanced
def _analyze_movement_context(self, detection: Dict) -> str:
"""Analyze movement context for intersection understanding"""
# Mock movement analysis - in real implementation would use tracking history
bbox = detection.get('bbox', detection.get('box', [0, 0, 100, 100]))
center_x = bbox[0] + bbox[2] / 2
center_y = bbox[1] + bbox[3] / 2
# Intersection quadrant analysis
frame_width = 1920 # Assumed frame width
frame_height = 1080 # Assumed frame height
if center_x < frame_width / 2 and center_y < frame_height / 2:
return 'northwest_approach'
elif center_x >= frame_width / 2 and center_y < frame_height / 2:
return 'northeast_approach'
elif center_x < frame_width / 2 and center_y >= frame_height / 2:
return 'southwest_approach'
else:
return 'southeast_approach'
def _process_multi_camera_frame(self, frame: np.ndarray, detections: List[Dict]):
"""Process frame in multi-camera intersection mode"""
# Distribute processing across camera positions
camera_position = self.camera_positions[self.frame_count % len(self.camera_positions)]
if camera_position in self.scene_adapters:
adapter = self.scene_adapters[camera_position]
analytics_result = adapter.process_frame(frame, detections)
# Update camera-specific analytics
self._update_camera_analytics(analytics_result, camera_position)
# Perform intersection-wide fusion
self._perform_intersection_fusion()
def _process_single_camera_frame(self, frame: np.ndarray, detections: List[Dict]):
"""Process frame in single camera mode with intersection context"""
# Use 'center' camera adapter for single camera mode
if 'center' not in self.scene_adapters and self.scene_adapters:
# Use first available adapter
adapter = list(self.scene_adapters.values())[0]
analytics_result = adapter.process_frame(frame, detections)
# Update analytics data
self._update_analytics_data(analytics_result, 'center')
def _update_camera_analytics(self, analytics_result: Dict, camera_position: str):
"""Update analytics data for specific camera"""
try:
objects = analytics_result.get('objects', [])
self.analytics_data['camera_stats'][camera_position] = len(objects)
# Update intersection-wide totals
self.analytics_data['total_objects'] = sum(self.analytics_data['camera_stats'].values())
# Update tracking data
tracked_objects = [obj for obj in objects if 'track_id' in obj]
self.analytics_data['active_tracks'] = len(tracked_objects)
# Update ROI events with intersection context
roi_events = analytics_result.get('roi_events', [])
self._process_intersection_roi_events(roi_events, camera_position)
# Update performance stats
self.analytics_data['performance']['processing_time_ms'] = analytics_result.get('processing_time_ms', 0)
except Exception as e:
print(f"Error updating camera analytics: {e}")
def _update_analytics_data(self, analytics_result: Dict, camera_position: str):
"""Update analytics data (fallback for single camera)"""
self._update_camera_analytics(analytics_result, camera_position)
def _process_intersection_roi_events(self, roi_events: List[Dict], camera_position: str):
"""Process ROI events with intersection-specific logic"""
self.analytics_data['roi_events'] = len(roi_events)
# Count intersection-specific event types
crosswalk_events = 0
lane_events = 0
safety_events = 0
for event in roi_events:
roi_id = event.get('roi_id', '').lower()
event_type = event.get('type', '')
# Enhanced event classification
if 'crosswalk' in roi_id:
crosswalk_events += 1
# Check for pedestrian safety
if event.get('object_category') == 'person':
safety_events += 1
# Emit pedestrian safety alert
self.pedestrian_safety_alert.emit({
'timestamp': time.time(),
'camera': camera_position,
'roi': roi_id,
'alert_type': 'pedestrian_in_crosswalk',
'object_data': event
})
elif 'lane' in roi_id:
lane_events += 1
# Check for lane violations
if event.get('object_category') == 'person':
safety_events += 1
# Emit lane violation alert
self.roi_violation_detected.emit({
'timestamp': time.time(),
'camera': camera_position,
'violation_type': 'pedestrian_in_traffic_lane',
'roi': roi_id,
'object_data': event
})
elif 'intersection' in roi_id:
# Intersection center events
if event.get('object_category') in ['person', 'bicycle']:
safety_events += 1
self.analytics_data['crosswalk_events'] = crosswalk_events
self.analytics_data['lane_events'] = lane_events
self.analytics_data['safety_events'] = safety_events
def _perform_intersection_fusion(self):
"""Perform intersection-wide data fusion across cameras"""
if not self.multi_camera_mode:
return
# Aggregate data from all cameras
fusion_data = {
'timestamp': time.time(),
'cameras_active': len([pos for pos, count in self.analytics_data['camera_stats'].items() if count > 0]),
'total_objects': self.analytics_data['total_objects'],
'intersection_flow': self._calculate_intersection_flow(),
'congestion_level': self._calculate_congestion_level(),
'safety_score': self._calculate_safety_score()
}
# Emit multi-camera fusion data
self.multi_camera_data_ready.emit(fusion_data)
def _calculate_intersection_flow(self) -> str:
"""Calculate intersection traffic flow status"""
total_objects = self.analytics_data['total_objects']
if total_objects == 0:
return 'clear'
elif total_objects <= 5:
return 'light'
elif total_objects <= 15:
return 'moderate'
elif total_objects <= 25:
return 'heavy'
else:
return 'congested'
def _calculate_congestion_level(self) -> float:
"""Calculate congestion level (0.0 to 1.0)"""
total_objects = self.analytics_data['total_objects']
max_capacity = 30 # Assumed max intersection capacity
return min(total_objects / max_capacity, 1.0)
def _calculate_safety_score(self) -> float:
"""Calculate safety score based on events (0.0 to 1.0, higher is safer)"""
safety_events = self.analytics_data['safety_events']
total_objects = max(self.analytics_data['total_objects'], 1)
# Inverse relationship: more safety events = lower safety score
safety_ratio = safety_events / total_objects
return max(1.0 - safety_ratio, 0.0)
def _emit_scene_analytics(self):
"""Emit comprehensive scene analytics data"""
analytics_copy = self.analytics_data.copy()
analytics_copy['timestamp'] = time.time()
analytics_copy['intersection_mode'] = self.multi_camera_mode
analytics_copy['fusion_active'] = self.multi_camera_mode and len(self.scene_adapters) > 1
# Add smart intersection specific metrics
analytics_copy['intersection_metrics'] = {
'flow_status': self._calculate_intersection_flow(),
'congestion_level': self._calculate_congestion_level(),
'safety_score': self._calculate_safety_score(),
'cameras_active': len([pos for pos, count in analytics_copy['camera_stats'].items() if count > 0])
}
self.scene_analytics_ready.emit(analytics_copy)
def _update_performance_stats(self):
"""Update comprehensive performance statistics"""
try:
current_time = time.time()
# Calculate FPS
time_diff = current_time - self.last_fps_update
if time_diff >= 1.0:
fps = self.fps_counter / time_diff
self.analytics_data['performance']['fps'] = fps
self.fps_counter = 0
self.last_fps_update = current_time
# Enhanced performance monitoring
try:
import psutil
# CPU usage as proxy for GPU usage
cpu_percent = psutil.cpu_percent(interval=None)
self.analytics_data['performance']['gpu_usage'] = min(cpu_percent * 1.2, 100.0)
# Memory usage
memory = psutil.virtual_memory()
self.analytics_data['performance']['memory_usage'] = memory.used / (1024 * 1024) # MB
except ImportError:
# Fallback if psutil not available
self.analytics_data['performance']['gpu_usage'] = 0.0
self.analytics_data['performance']['memory_usage'] = 0.0
# Emit performance stats
perf_stats = self.analytics_data['performance'].copy()
perf_stats['intersection_active'] = self.enabled
perf_stats['multi_camera_active'] = self.multi_camera_mode
perf_stats['adapters_count'] = len(self.scene_adapters)
self.performance_stats_ready.emit(perf_stats)
except Exception as e:
print(f"Error updating performance stats: {e}")
def _handle_object_detected(self, object_data: Dict):
"""Handle object detection events from scene adapters"""
# Enhanced object handling for intersection context
if object_data.get('safety_category') == 'vulnerable_road_user':
# Prioritize vulnerable road users
print(f"🚶 VRU detected: {object_data.get('category', 'unknown')}")
def _handle_scene_analytics_updated(self, analytics_data: Dict):
"""Handle scene analytics updates from adapters"""
# Scene analytics data is processed in the main processing loop
pass
def _handle_roi_event_detected(self, event_data: Dict):
"""Handle ROI event detection with smart intersection logic"""
event_type = event_data.get('type', '')
roi_id = event_data.get('roi_id', '')
object_category = event_data.get('object_category', '')
print(f"🎯 Smart Intersection ROI Event: {event_type} in {roi_id} ({object_category})")
# Enhanced event processing
enhanced_event = event_data.copy()
enhanced_event['intersection_context'] = True
enhanced_event['timestamp'] = time.time()
self.roi_event_detected.emit(enhanced_event)
def get_current_analytics(self) -> Dict:
"""Get current comprehensive analytics data"""
analytics = self.analytics_data.copy()
analytics['intersection_metrics'] = {
'flow_status': self._calculate_intersection_flow(),
'congestion_level': self._calculate_congestion_level(),
'safety_score': self._calculate_safety_score()
}
return analytics
def get_performance_stats(self) -> Dict:
"""Get current performance statistics"""
if not self.scene_adapters:
return {}
stats = {}
for adapter in self.scene_adapters.values():
if hasattr(adapter, 'get_performance_stats'):
adapter_stats = adapter.get_performance_stats()
for key, value in adapter_stats.items():
if key not in stats:
stats[key] = []
stats[key].append(value)
# Average the stats across adapters
averaged_stats = {}
for key, values in stats.items():
if values:
averaged_stats[key] = sum(values) / len(values)
return averaged_stats
def reset_analytics(self):
"""Reset all analytics data"""
self.analytics_data = {
'total_objects': 0,
'active_tracks': 0,
'roi_events': 0,
'crosswalk_events': 0,
'lane_events': 0,
'safety_events': 0,
'camera_stats': {pos: 0 for pos in self.camera_positions},
'performance': {
'fps': 0.0,
'processing_time_ms': 0.0,
'gpu_usage': 0.0,
'memory_usage': 0.0
}
}
print("🔄 Smart Intersection analytics reset")
def get_intersection_status(self) -> Dict:
"""Get comprehensive intersection status"""
return {
'enabled': self.enabled,
'multi_camera_mode': self.multi_camera_mode,
'scene_analytics_enabled': self.scene_analytics_enabled,
'cameras_initialized': len(self.scene_adapters),
'roi_regions_active': len([roi for roi in self.roi_config.get('rois', []) if roi.get('enabled', True)]),
'current_analytics': self.get_current_analytics(),
'performance': self.get_performance_stats()
}
self._initialize_camera_adapters()
# Intersection-wide components
self.intersection_tracker = IntersectionTracker(self.config)
self.traffic_flow_analyzer = TrafficFlowAnalyzer(self.config)
self.pedestrian_safety_monitor = PedestrianSafetyMonitor(self.config)
self.roi_manager = ROIManager(self.config)
# Performance monitoring
self.performance_monitor = PerformanceMonitor()
# Data synchronization
self.frame_sync_buffer = {}
self.sync_window_ms = 100 # Synchronization window
# Analytics state
self.is_running = False
self.frame_count = 0
def _setup_logger(self) -> logging.Logger:
"""Setup logging for smart intersection controller"""
logger = logging.getLogger('SmartIntersectionController')
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def _load_intersection_config(self, config_path: Optional[str]) -> Dict:
"""Load smart intersection configuration"""
base_config = {
"intersection": {
"id": "desktop_intersection_001",
"name": "Desktop Smart Intersection",
"cameras": {
"north": {"id": "camera1", "angle": 0, "enabled": True},
"east": {"id": "camera2", "angle": 90, "enabled": True},
"south": {"id": "camera3", "angle": 180, "enabled": True},
"west": {"id": "camera4", "angle": 270, "enabled": True}
},
"detection": {
"model_path": "models/intersection/openvino.xml",
"confidence_threshold": 0.5,
"classes": ["vehicle", "pedestrian"],
"inference_device": "GPU"
},
"tracking": {
"max_unreliable_frames": 10,
"non_measurement_frames_dynamic": 8,
"non_measurement_frames_static": 16,
"baseline_frame_rate": 30
},
"analytics": {
"enable_traffic_flow": True,
"enable_pedestrian_safety": True,
"enable_roi_analytics": True,
"enable_violation_detection": True
},
"roi_definitions": {
"crosswalks": [
{
"id": "crosswalk_north_south",
"type": "rectangle",
"coordinates": {"x": 300, "y": 200, "width": 200, "height": 100},
"monitoring": ["pedestrian_safety", "traffic_conflicts"]
},
{
"id": "crosswalk_east_west",
"type": "rectangle",
"coordinates": {"x": 200, "y": 300, "width": 100, "height": 200},
"monitoring": ["pedestrian_safety", "traffic_conflicts"]
}
],
"traffic_lanes": [
{
"id": "lane_north_incoming",
"direction": "incoming",
"coordinates": {"x": 350, "y": 0, "width": 100, "height": 200}
},
{
"id": "lane_north_outgoing",
"direction": "outgoing",
"coordinates": {"x": 450, "y": 0, "width": 100, "height": 200}
}
],
"intersection_center": {
"id": "intersection_core",
"coordinates": {"x": 300, "y": 250, "width": 200, "height": 150},
"monitoring": ["traffic_conflicts", "congestion"]
}
}
}
}
if config_path and Path(config_path).exists():
with open(config_path, 'r') as f:
user_config = json.load(f)
# Merge user config with base config
base_config.update(user_config)
return base_config
def _initialize_camera_adapters(self):
"""Initialize scene analytics adapters for each camera"""
for position, camera_info in self.cameras.items():
camera_id = camera_info['id']
# Create scene adapter for this camera
adapter = SceneAnalyticsAdapter(
camera_id=camera_id,
config_path=None # Use default config
)
# Connect signals
adapter.object_detected.connect(
lambda obj, pos=position: self._on_camera_object_detected(obj, pos)
)
adapter.scene_analytics_updated.connect(
lambda data, pos=position: self._on_camera_analytics_updated(data, pos)
)
adapter.roi_event_detected.connect(
lambda event, pos=position: self._on_camera_roi_event(event, pos)
)
camera_info['adapter'] = adapter
self.logger.info(f"Initialized adapter for {position} camera ({camera_id})")
def start_intersection_monitoring(self):
"""Start smart intersection monitoring"""
self.is_running = True
self.frame_count = 0
self.logger.info("Started smart intersection monitoring")
def stop_intersection_monitoring(self):
"""Stop smart intersection monitoring"""
self.is_running = False
self.logger.info("Stopped smart intersection monitoring")
def process_multi_camera_frame(self, frames_data: Dict[str, Dict]):
"""
Process frames from multiple cameras simultaneously
Args:
frames_data: Dict with camera position as key and frame data as value
{
'north': {'frame': np_array, 'detections': [...], 'timestamp': float},
'east': {'frame': np_array, 'detections': [...], 'timestamp': float},
...
}
"""
if not self.is_running:
return
start_time = time.time()
self.frame_count += 1
# Process each camera's frame
camera_analytics = {}
all_objects = []
for position, frame_data in frames_data.items():
if position not in self.cameras:
continue
camera_info = self.cameras[position]
adapter = camera_info['adapter']
if adapter and 'frame' in frame_data and 'detections' in frame_data:
# Process frame through scene adapter
analytics = adapter.process_frame(
frame_data['frame'],
frame_data['detections']
)
camera_analytics[position] = analytics
# Collect objects for intersection-wide tracking
for obj in analytics['objects']:
obj['source_camera'] = position
obj['camera_id'] = camera_info['id']
all_objects.append(obj)
# Perform intersection-wide analytics
intersection_analytics = self._perform_intersection_analytics(
all_objects, camera_analytics
)
# Update performance metrics
processing_time = (time.time() - start_time) * 1000
self.performance_monitor.update(processing_time, len(all_objects))
# Emit signals
self._emit_intersection_signals(intersection_analytics, camera_analytics)
def _perform_intersection_analytics(self, all_objects: List[Dict], camera_analytics: Dict) -> Dict:
"""Perform intersection-wide analytics on all detected objects"""
# Update intersection-wide tracking
intersection_tracks = self.intersection_tracker.update_tracks(all_objects)
# Analyze traffic flow
traffic_flow = self.traffic_flow_analyzer.analyze_flow(intersection_tracks)
# Monitor pedestrian safety
safety_alerts = self.pedestrian_safety_monitor.check_safety(intersection_tracks)
# Process ROI events
roi_events = self.roi_manager.process_objects(intersection_tracks)
# Compile intersection analytics
intersection_analytics = {
'timestamp': datetime.now().isoformat(),
'frame_number': self.frame_count,
'total_objects': len(all_objects),
'intersection_tracks': intersection_tracks,
'traffic_flow': traffic_flow,
'safety_alerts': safety_alerts,
'roi_events': roi_events,
'camera_count': len(camera_analytics),
'performance': self.performance_monitor.get_stats()
}
return intersection_analytics
def _emit_intersection_signals(self, intersection_analytics: Dict, camera_analytics: Dict):
"""Emit Qt signals for desktop integration"""
try:
# Multi-camera frame data
self.multi_camera_frame_ready.emit({
'intersection': intersection_analytics,
'cameras': camera_analytics
})
# Intersection-wide analytics
self.intersection_analytics_ready.emit(intersection_analytics)
# Traffic flow updates
if intersection_analytics['traffic_flow']:
self.traffic_flow_updated.emit(intersection_analytics['traffic_flow'])
# Safety alerts
for alert in intersection_analytics['safety_alerts']:
self.pedestrian_safety_alert.emit(alert)
# ROI violations
for event in intersection_analytics['roi_events']:
if event.get('is_violation', False):
self.roi_violation_detected.emit(event)
# Performance metrics
self.performance_metrics_ready.emit(intersection_analytics['performance'])
except Exception as e:
self.logger.error(f"Error emitting intersection signals: {e}")
def _on_camera_object_detected(self, obj: Dict, camera_position: str):
"""Handle object detection from individual camera"""
obj['source_camera'] = camera_position
obj['detection_timestamp'] = time.time()
def _on_camera_analytics_updated(self, data: Dict, camera_position: str):
"""Handle analytics update from individual camera"""
data['source_camera'] = camera_position
def _on_camera_roi_event(self, event: Dict, camera_position: str):
"""Handle ROI event from individual camera"""
event['source_camera'] = camera_position
def get_intersection_config(self) -> Dict:
"""Get current intersection configuration"""
return self.config
def update_intersection_config(self, new_config: Dict):
"""Update intersection configuration"""
self.config.update(new_config)
# Reinitialize components with new config
self.intersection_tracker.update_config(self.config)
self.traffic_flow_analyzer.update_config(self.config)
self.pedestrian_safety_monitor.update_config(self.config)
self.roi_manager.update_config(self.config)
self.logger.info("Updated intersection configuration")
class IntersectionTracker:
"""Multi-camera object tracking for intersection"""
def __init__(self, config: Dict):
self.config = config
self.tracks = {}
self.next_track_id = 1
self.max_track_age = 30 # frames
def update_tracks(self, objects: List[Dict]) -> List[Dict]:
"""Update intersection-wide object tracks"""
current_time = time.time()
# Associate objects with existing tracks
matched_tracks = self._associate_objects_to_tracks(objects)
# Update track ages and remove stale tracks
self._cleanup_stale_tracks()
return list(self.tracks.values())
def _associate_objects_to_tracks(self, objects: List[Dict]) -> List[Dict]:
"""Associate detected objects with existing tracks"""
matched = []
for obj in objects:
best_track_id = self._find_best_matching_track(obj)
if best_track_id:
# Update existing track
track = self.tracks[best_track_id]
track['last_detection'] = obj
track['last_update'] = time.time()
track['age'] = 0
matched.append(track)
else:
# Create new track
new_track = self._create_new_track(obj)
self.tracks[new_track['id']] = new_track
matched.append(new_track)
return matched
def _find_best_matching_track(self, obj: Dict) -> Optional[int]:
"""Find the best matching track for an object"""
best_match = None
best_distance = float('inf')
threshold = 100 # pixels
obj_center = obj.get('center_of_mass', {})
for track_id, track in self.tracks.items():
if track['category'] != obj['category']:
continue
track_center = track['last_detection'].get('center_of_mass', {})
# Calculate distance
distance = self._calculate_distance(obj_center, track_center)
if distance < threshold and distance < best_distance:
best_distance = distance
best_match = track_id
return best_match
def _calculate_distance(self, center1: Dict, center2: Dict) -> float:
"""Calculate distance between two centers"""
if not center1 or not center2:
return float('inf')
x1, y1 = center1.get('x', 0), center1.get('y', 0)
x2, y2 = center2.get('x', 0), center2.get('y', 0)
return np.sqrt((x1 - x2)**2 + (y1 - y2)**2)
def _create_new_track(self, obj: Dict) -> Dict:
"""Create a new track for an object"""
track_id = self.next_track_id
self.next_track_id += 1
track = {
'id': track_id,
'category': obj['category'],
'first_detection': obj,
'last_detection': obj,
'created_time': time.time(),
'last_update': time.time(),
'age': 0,
'trajectory': [obj.get('center_of_mass', {})],
'cameras_seen': [obj.get('source_camera', 'unknown')]
}
return track
def _cleanup_stale_tracks(self):
"""Remove tracks that haven't been updated"""
current_time = time.time()
stale_tracks = []
for track_id, track in self.tracks.items():
time_since_update = current_time - track['last_update']
if time_since_update > 2.0: # 2 seconds
stale_tracks.append(track_id)
else:
track['age'] += 1
for track_id in stale_tracks:
del self.tracks[track_id]
def update_config(self, config: Dict):
"""Update tracker configuration"""
self.config = config
class TrafficFlowAnalyzer:
"""Analyze traffic flow patterns at intersection"""
def __init__(self, config: Dict):
self.config = config
self.flow_history = deque(maxlen=100)
def analyze_flow(self, tracks: List[Dict]) -> Dict:
"""Analyze current traffic flow"""
current_time = time.time()
# Count vehicles by direction/lane
vehicle_counts = self._count_vehicles_by_direction(tracks)
# Calculate average speeds
avg_speeds = self._calculate_average_speeds(tracks)
# Detect congestion
congestion_level = self._detect_congestion(tracks)
# Flow analysis
flow_analysis = {
'timestamp': current_time,
'vehicle_counts': vehicle_counts,
'average_speeds': avg_speeds,
'congestion_level': congestion_level,
'total_vehicles': len([t for t in tracks if t['category'] == 'vehicle']),
'total_pedestrians': len([t for t in tracks if t['category'] == 'pedestrian'])
}
self.flow_history.append(flow_analysis)
return flow_analysis
def _count_vehicles_by_direction(self, tracks: List[Dict]) -> Dict:
"""Count vehicles by direction"""
counts = {'north': 0, 'south': 0, 'east': 0, 'west': 0}
for track in tracks:
if track['category'] == 'vehicle':
# Estimate direction based on trajectory or camera
direction = self._estimate_direction(track)
if direction in counts:
counts[direction] += 1
return counts
def _estimate_direction(self, track: Dict) -> str:
"""Estimate vehicle direction based on trajectory"""
# Simple heuristic based on last seen camera
cameras_seen = track.get('cameras_seen', [])
if cameras_seen:
last_camera = cameras_seen[-1]
return last_camera
return 'unknown'
def _calculate_average_speeds(self, tracks: List[Dict]) -> Dict:
"""Calculate average speeds by direction"""
speeds = {'north': [], 'south': [], 'east': [], 'west': []}
for track in tracks:
if track['category'] == 'vehicle' and len(track.get('trajectory', [])) > 1:
speed = self._estimate_speed(track)
direction = self._estimate_direction(track)
if direction in speeds and speed > 0:
speeds[direction].append(speed)
# Calculate averages
avg_speeds = {}
for direction, speed_list in speeds.items():
avg_speeds[direction] = np.mean(speed_list) if speed_list else 0
return avg_speeds
def _estimate_speed(self, track: Dict) -> float:
"""Estimate speed of a tracked object"""
trajectory = track.get('trajectory', [])
if len(trajectory) < 2:
return 0
# Simple speed estimation based on position change
# This is a simplified version - real implementation would need calibration
start_pos = trajectory[0]
end_pos = trajectory[-1]
if not start_pos or not end_pos:
return 0
distance = np.sqrt(
(end_pos.get('x', 0) - start_pos.get('x', 0))**2 +
(end_pos.get('y', 0) - start_pos.get('y', 0))**2
)
time_diff = track['last_update'] - track['created_time']
if time_diff > 0:
return distance / time_diff # pixels per second
return 0
def _detect_congestion(self, tracks: List[Dict]) -> str:
"""Detect congestion level"""
vehicle_count = len([t for t in tracks if t['category'] == 'vehicle'])
if vehicle_count > 20:
return 'high'
elif vehicle_count > 10:
return 'medium'
elif vehicle_count > 5:
return 'low'
else:
return 'none'
def update_config(self, config: Dict):
"""Update analyzer configuration"""
self.config = config
class PedestrianSafetyMonitor:
"""Monitor pedestrian safety at intersection"""
def __init__(self, config: Dict):
self.config = config
self.safety_alerts = []
def check_safety(self, tracks: List[Dict]) -> List[Dict]:
"""Check for pedestrian safety issues"""
alerts = []
current_time = time.time()
pedestrian_tracks = [t for t in tracks if t['category'] == 'pedestrian']
vehicle_tracks = [t for t in tracks if t['category'] == 'vehicle']
for ped_track in pedestrian_tracks:
# Check for conflicts with vehicles
conflicts = self._check_vehicle_conflicts(ped_track, vehicle_tracks)
# Check if pedestrian is in crosswalk
in_crosswalk = self._check_crosswalk_usage(ped_track)
if conflicts:
alert = {
'type': 'pedestrian_vehicle_conflict',
'severity': 'high',
'timestamp': current_time,
'pedestrian_id': ped_track['id'],
'conflicting_vehicles': conflicts,
'location': ped_track['last_detection'].get('center_of_mass', {}),
'in_crosswalk': in_crosswalk
}
alerts.append(alert)
elif not in_crosswalk:
alert = {
'type': 'pedestrian_outside_crosswalk',
'severity': 'medium',
'timestamp': current_time,
'pedestrian_id': ped_track['id'],
'location': ped_track['last_detection'].get('center_of_mass', {})
}
alerts.append(alert)
return alerts
def _check_vehicle_conflicts(self, ped_track: Dict, vehicle_tracks: List[Dict]) -> List[int]:
"""Check for conflicts between pedestrian and vehicles"""
conflicts = []
ped_center = ped_track['last_detection'].get('center_of_mass', {})
for vehicle in vehicle_tracks:
vehicle_center = vehicle['last_detection'].get('center_of_mass', {})
# Check proximity (simplified)
distance = self._calculate_distance(ped_center, vehicle_center)
if distance < 50: # 50 pixels threshold
conflicts.append(vehicle['id'])
return conflicts
def _check_crosswalk_usage(self, ped_track: Dict) -> bool:
"""Check if pedestrian is using designated crosswalk"""
ped_center = ped_track['last_detection'].get('center_of_mass', {})
# Check against crosswalk ROIs
crosswalks = self.config.get('intersection', {}).get('roi_definitions', {}).get('crosswalks', [])
for crosswalk in crosswalks:
coords = crosswalk.get('coordinates', {})
if self._point_in_rectangle(ped_center, coords):
return True
return False
def _point_in_rectangle(self, point: Dict, rect: Dict) -> bool:
"""Check if point is inside rectangle"""
if not point or not rect:
return False
x, y = point.get('x', 0), point.get('y', 0)
rx, ry = rect.get('x', 0), rect.get('y', 0)
rw, rh = rect.get('width', 0), rect.get('height', 0)
return rx <= x <= rx + rw and ry <= y <= ry + rh
def _calculate_distance(self, center1: Dict, center2: Dict) -> float:
"""Calculate distance between two centers"""
if not center1 or not center2:
return float('inf')
x1, y1 = center1.get('x', 0), center1.get('y', 0)
x2, y2 = center2.get('x', 0), center2.get('y', 0)
return np.sqrt((x1 - x2)**2 + (y1 - y2)**2)
def update_config(self, config: Dict):
"""Update monitor configuration"""
self.config = config
class ROIManager:
"""Manage regions of interest for intersection analytics"""
def __init__(self, config: Dict):
self.config = config
self.roi_definitions = config.get('intersection', {}).get('roi_definitions', {})
def process_objects(self, tracks: List[Dict]) -> List[Dict]:
"""Process objects against all ROI definitions"""
events = []
for track in tracks:
track_center = track['last_detection'].get('center_of_mass', {})
# Check against all ROI types
for roi_type, roi_list in self.roi_definitions.items():
if isinstance(roi_list, list):
for roi in roi_list:
if self._object_in_roi(track_center, roi):
event = {
'type': f'object_in_{roi_type}',
'roi_id': roi.get('id', 'unknown'),
'roi_type': roi_type,
'object_id': track['id'],
'object_category': track['category'],
'timestamp': time.time(),
'location': track_center,
'is_violation': self._check_violation(track, roi, roi_type)
}
events.append(event)
return events
def _object_in_roi(self, obj_center: Dict, roi: Dict) -> bool:
"""Check if object center is within ROI"""
roi_coords = roi.get('coordinates', {})
if roi.get('type') == 'rectangle':
return self._point_in_rectangle(obj_center, roi_coords)
return False
def _point_in_rectangle(self, point: Dict, rect: Dict) -> bool:
"""Check if point is inside rectangle"""
if not point or not rect:
return False
x, y = point.get('x', 0), point.get('y', 0)
rx, ry = rect.get('x', 0), rect.get('y', 0)
rw, rh = rect.get('width', 0), rect.get('height', 0)
return rx <= x <= rx + rw and ry <= y <= ry + rh
def _check_violation(self, track: Dict, roi: Dict, roi_type: str) -> bool:
"""Check if object presence in ROI constitutes a violation"""
# Example violation logic
if roi_type == 'crosswalks':
# Vehicle in crosswalk when pedestrian present could be violation
return track['category'] == 'vehicle'
return False
def update_config(self, config: Dict):
"""Update ROI manager configuration"""
self.config = config
self.roi_definitions = config.get('intersection', {}).get('roi_definitions', {})
class PerformanceMonitor:
"""Monitor performance of intersection analytics"""
def __init__(self):
self.processing_times = deque(maxlen=100)
self.object_counts = deque(maxlen=100)
self.start_time = time.time()
def update(self, processing_time_ms: float, object_count: int):
"""Update performance metrics"""
self.processing_times.append(processing_time_ms)
self.object_counts.append(object_count)
def get_stats(self) -> Dict:
"""Get current performance statistics"""
if not self.processing_times:
return {}
return {
'avg_processing_time_ms': np.mean(self.processing_times),
'max_processing_time_ms': np.max(self.processing_times),
'min_processing_time_ms': np.min(self.processing_times),
'avg_object_count': np.mean(self.object_counts) if self.object_counts else 0,
'total_uptime_seconds': time.time() - self.start_time,
'frames_processed': len(self.processing_times)
}