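# NOTE: the text "Files / 385 lines / 15 KiB / Plaintext" that preceded this
# module was file-viewer residue captured together with the source; it is not
# part of the program.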

from PySide6.QtCore import QObject, Signal, QThread, Qt, QMutex, QWaitCondition, QTimer
from PySide6.QtGui import QImage, QPixmap
import cv2
import time
import numpy as np
from collections import deque
from typing import Dict, List, Optional
import os
import sys
# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import utilities
from utils.annotation_utils import (
draw_detections,
draw_violations,
draw_performance_metrics,
resize_frame_for_display,
convert_cv_to_qimage,
convert_cv_to_pixmap
)
class VideoController(QObject):
    """Read frames from a video source, run detection on them and publish results.

    Threading model:
        * ``_run`` (capture + detection loop) executes in ``self.thread``. It is
          connected to the thread's ``started`` signal with a direct connection,
          so it is invoked in the worker thread itself.
        * The controller object is deliberately NOT moved to the worker thread.
          ``render_timer`` and ``_process_frame`` therefore run in the thread
          that created the controller (expected to be the GUI thread, which Qt
          requires for ``QPixmap``). Moving the controller into the worker
          would queue the timer callbacks behind the blocking ``_run`` loop and
          they would never be delivered while capture is active.
        * ``current_frame``, ``current_detections``, ``current_violations`` and
          ``performance_metrics`` are shared between both threads and are
          accessed under ``self.mutex``.

    Signals:
        frame_ready: (QPixmap, detections, violations, metrics dict), emitted by
            ``_process_frame`` for display.
        raw_frame_ready: (BGR frame, detections, violations, fps), emitted by
            ``_run`` for every processed frame.
    """

    frame_ready = Signal(object, object, object, dict)  # QPixmap, detections, violations, metrics
    raw_frame_ready = Signal(np.ndarray, list, list, float)  # frame, detections, violations, fps

    def __init__(self, model_manager=None):
        """
        Initialize video controller.

        Args:
            model_manager: Model manager instance for detection and violation.
                ``_run`` calls its ``detect(frame)`` and
                ``update_tracking(detections, frame)`` methods when it is set.
        """
        super().__init__()
        self.model_manager = model_manager
        self.source = 0  # Default camera source
        self._running = False
        self.frame_count = 0
        self.start_time = 0
        self.source_fps = 0
        self.actual_fps = 0
        # Filled in by _get_source_properties(); initialised here so _run() can
        # log them even when start() is called before a successful set_source().
        self.frame_width = 0
        self.frame_height = 0
        self.processing_times = deque(maxlen=30)
        self.cap = None  # VideoCapture object

        # Worker thread for the capture loop.
        # NOTE: this instance attribute shadows QObject.thread() on this object;
        # the name is kept because external code may already use it.
        self.thread = QThread()
        # Direct connection: `started` is emitted from inside the new thread,
        # so _run executes there while this object keeps its original affinity.
        self.thread.started.connect(self._run, Qt.DirectConnection)

        # Synchronisation for the shared frame buffer / metrics
        self.mutex = QMutex()
        self.condition = QWaitCondition()
        self.performance_metrics = {
            'FPS': 0.0,
            'Detection (ms)': 0.0,
            'Violation (ms)': 0.0,
            'Total (ms)': 0.0
        }

        # Render timer: periodically turns the latest frame into a pixmap
        self.render_timer = QTimer()
        self.render_timer.timeout.connect(self._process_frame)

        # Frame buffer (guarded by self.mutex)
        self.current_frame = None
        self.current_detections = []
        self.current_violations = []

        # Debug counter
        self.debug_counter = 0

    def set_source(self, source):
        """Set video source (file path, camera index, or URL).

        Processing is stopped while the source is switched and restarted
        afterwards if it was running. ``None``, empty strings and unsupported
        types fall back to camera 0.
        """
        print(f"DEBUG: VideoController.set_source called with: {source} (type: {type(source)})")
        was_running = self._running
        if was_running:
            self.stop()

        if source is None:
            print("WARNING: Received None source, defaulting to camera 0")
            self.source = 0
        elif isinstance(source, str) and source.strip():
            if os.path.exists(source):
                # Existing path on disk -> video file
                self.source = source
                print(f"DEBUG: VideoController source set to file: {self.source}")
            else:
                try:
                    if source.isdigit():
                        # Digit string -> integer camera index
                        self.source = int(source)
                        print(f"DEBUG: VideoController source set to camera index: {self.source}")
                    else:
                        # Anything else is handed to OpenCV as URL / device string
                        self.source = source
                        print(f"DEBUG: VideoController source set to URL/device: {self.source}")
                except ValueError:
                    # str.isdigit() accepts characters (e.g. superscript digits)
                    # that int() rejects.
                    print(f"WARNING: Could not interpret source: {source}, defaulting to camera 0")
                    self.source = 0
        elif isinstance(source, int):
            # Camera index
            self.source = source
            print(f"DEBUG: VideoController source set to camera index: {self.source}")
        else:
            print(f"WARNING: Unrecognized source type: {type(source)}, defaulting to camera 0")
            self.source = 0

        # Get properties of the source (fps, dimensions, etc)
        self._get_source_properties()

        if was_running:
            self.start()

    def _get_source_properties(self):
        """Probe the source once to read FPS, frame size and frame count.

        On failure the previously stored values are left untouched. Note that
        ``frame_count`` receives the source's reported frame count here and is
        reset to 0 by ``start()``, where it becomes the processed-frame counter.
        """
        cap = None
        try:
            cap = cv2.VideoCapture(self.source)
            if not cap.isOpened():
                print("Failed to open video source")
                return
            fps = cap.get(cv2.CAP_PROP_FPS)
            self.source_fps = fps if fps > 0 else 30.0  # Default if undetectable
            self.frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            self.frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            self.frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            print(f"Video source: {self.frame_width}x{self.frame_height}, {self.source_fps} FPS")
        except Exception as e:
            print(f"Error getting source properties: {e}")
        finally:
            # Always give the device back, even if a property query raised.
            if cap is not None:
                cap.release()

    def start(self):
        """Start video processing (no-op if it is already running)."""
        if self._running:
            return

        # A previous run may have ended on its own (end of file, read error)
        # and still be winding down; it must be finished before QThread.start()
        # can launch _run again.
        if self.thread.isRunning():
            self.thread.quit()
            if not self.thread.wait(3000):
                print("WARNING: Previous video thread is still running; start aborted")
                return

        self._running = True
        self.start_time = time.time()
        self.frame_count = 0
        self.debug_counter = 0
        print("DEBUG: Starting video processing thread")
        self.thread.start()
        # Start the render timer with a faster interval (16ms = ~60fps)
        self.render_timer.start(16)
        print("DEBUG: Render timer started")

    def stop(self):
        """Stop video processing.

        Timer and thread shutdown always run, so a worker that ended on its own
        is cleaned up as well. The capture is released and the frame buffer
        cleared only when processing was actually running.
        """
        was_running = self._running
        if was_running:
            print("DEBUG: Stopping video processing")
        self._running = False
        self.render_timer.stop()

        if self.thread.isRunning():
            self.thread.quit()
            if not self.thread.wait(3000):  # Wait 3 seconds max
                self.thread.terminate()
                print("WARNING: Thread termination forced")

        if not was_running:
            return

        # Close the capture if it exists
        if self.cap is not None and self.cap.isOpened():
            self.cap.release()
        self.cap = None

        # Clear the current frame
        self.mutex.lock()
        try:
            self.current_frame = None
        finally:
            self.mutex.unlock()
        print("DEBUG: Video processing stopped")

    def capture_snapshot(self) -> Optional[np.ndarray]:
        """Return a copy of the most recent frame, or None if there is none."""
        self.mutex.lock()
        try:
            if self.current_frame is None:
                return None
            return self.current_frame.copy()
        finally:
            self.mutex.unlock()

    def _open_capture(self):
        """Open ``self.source`` and return the capture, or None on failure.

        Failures are reported on stdout; a capture that could not be opened is
        released before returning.
        """
        source = self.source

        if isinstance(source, str) and os.path.exists(source):
            # It's a valid file path
            print(f"DEBUG: Opening video file: {source}")
            cap = cv2.VideoCapture(source)
            if not cap.isOpened():
                print(f"ERROR: Could not open video file: {source}")
                cap.release()
                return None
            return cap

        if isinstance(source, int) or (isinstance(source, str) and source.isdigit()):
            # It's a camera index
            camera_idx = int(source)
            print(f"DEBUG: Opening camera: {camera_idx}")
            cap = cv2.VideoCapture(camera_idx)
            # Try a few times to open camera (sometimes takes a moment);
            # give up early if stop() was requested in the meantime.
            retry_count = 0
            while self._running and not cap.isOpened() and retry_count < 3:
                print(f"Camera not ready, retrying ({retry_count+1}/3)...")
                time.sleep(1)
                cap.release()
                cap = cv2.VideoCapture(camera_idx)
                retry_count += 1
            if not cap.isOpened():
                print(f"ERROR: Could not open camera {camera_idx} after {retry_count} attempts")
                cap.release()
                return None
            return cap

        # Try as a string source (URL or device path)
        print(f"DEBUG: Opening source as string: {source}")
        cap = cv2.VideoCapture(str(source))
        if not cap.isOpened():
            print(f"ERROR: Could not open source: {source}")
            cap.release()
            return None
        return cap

    def _run(self):
        """Main processing loop (runs in the worker thread).

        Opens the source, then for each frame: runs detection and tracking via
        the model manager, updates timing metrics, stores the frame for the
        render timer and emits ``raw_frame_ready``. On exit (stop requested,
        end of stream, or error) the capture is released and the thread is
        asked to quit so that ``start()`` can launch a new run.
        """
        try:
            print(f"DEBUG: Opening video source: {self.source} (type: {type(self.source)})")
            self.cap = self._open_capture()
            if self.cap is None:
                return

            # Configure frame timing based on source FPS
            frame_time = 1.0 / self.source_fps if self.source_fps > 0 else 0.033
            # Only local files need throttling to their native rate; cameras and
            # network streams deliver frames at their own pace.
            pace_playback = (
                isinstance(self.source, str)
                and os.path.exists(self.source)
                and self.source_fps > 0
            )
            prev_time = time.time()

            print(f"SUCCESS: Video source opened: {self.source}")
            print(f"Source info - FPS: {self.source_fps}, Size: {self.frame_width}x{self.frame_height}")

            while self._running and self.cap.isOpened():
                ret, frame = self.cap.read()
                if not ret:
                    print("End of video or read error")
                    break

                process_start = time.time()

                # Process detections
                detection_start = time.time()
                detections = []
                if self.model_manager:
                    detections = self.model_manager.detect(frame)
                detection_time = (time.time() - detection_start) * 1000

                # Violation detection is disabled; `violations` stays empty.
                # To re-enable, call model_manager.detect_violations(
                #     detections, frame, time.time()) here.
                violation_start = time.time()
                violations = []
                violation_time = (time.time() - violation_start) * 1000

                # Update tracking if available
                if self.model_manager:
                    detections = self.model_manager.update_tracking(detections, frame)

                # Calculate timing metrics
                process_time = (time.time() - process_start) * 1000
                self.processing_times.append(process_time)

                # Update FPS: actual_fps is the average since start(),
                # fps_smoothed is the rate derived from the last frame interval.
                now = time.time()
                self.frame_count += 1
                elapsed = now - self.start_time
                if elapsed > 0:
                    self.actual_fps = self.frame_count / elapsed
                fps_smoothed = 1.0 / (now - prev_time) if now > prev_time else 0
                prev_time = now

                metrics = {
                    'FPS': f"{fps_smoothed:.1f}",
                    'Detection (ms)': f"{detection_time:.1f}",
                    'Violation (ms)': f"{violation_time:.1f}",
                    'Total (ms)': f"{process_time:.1f}"
                }

                # Publish frame data and metrics together (thread-safe)
                self.mutex.lock()
                try:
                    self.performance_metrics = metrics
                    self.current_frame = frame.copy()
                    self.current_detections = detections
                    self.current_violations = violations
                finally:
                    self.mutex.unlock()

                # Signal for raw data subscribers
                self.raw_frame_ready.emit(frame.copy(), detections, violations, fps_smoothed)

                # Control processing rate for file sources
                if pace_playback:
                    frame_duration = time.time() - process_start
                    if frame_duration < frame_time:
                        time.sleep(frame_time - frame_duration)
        except Exception as e:
            print(f"Video processing error: {e}")
            import traceback
            traceback.print_exc()
        finally:
            self._running = False
            if self.cap is not None and self.cap.isOpened():
                self.cap.release()
            self.cap = None
            # Let the QThread finish once this method returns; otherwise it
            # would stay alive and a later start() could not relaunch _run.
            self.thread.quit()

    def _process_frame(self):
        """Process current frame for UI rendering (called by timer).

        Annotates the latest frame with detections and metrics, converts it to
        a QPixmap and emits ``frame_ready``. Does nothing when no frame is
        available; stops the render timer once processing has ended.
        """
        if not self._running:
            # Capture ended on its own or failed to start: nothing left to render.
            self.render_timer.stop()
            return

        self.debug_counter += 1
        if self.debug_counter % 30 == 0:  # Print every ~30 frames
            print(f"DEBUG: Frame processing iteration: {self.debug_counter}")

        # Take a consistent snapshot of the shared state
        self.mutex.lock()
        try:
            frame = self.current_frame.copy() if self.current_frame is not None else None
            detections = self.current_detections.copy() if self.current_detections else []
            violations = self.current_violations.copy() if self.current_violations else []
            metrics = self.performance_metrics.copy()
        finally:
            self.mutex.unlock()

        if frame is None:
            print("DEBUG: _process_frame skipped - no frame available")
            return

        try:
            # Annotate frame (`frame` is already a private copy)
            annotated_frame = frame.copy()
            if detections:
                annotated_frame = draw_detections(annotated_frame, detections, True, True)
            annotated_frame = draw_performance_metrics(annotated_frame, metrics)

            # Resize for display
            display_frame = resize_frame_for_display(annotated_frame)

            # OpenCV delivers BGR; QImage.Format_RGB888 expects RGB
            rgb_image = cv2.cvtColor(display_frame, cv2.COLOR_BGR2RGB)
            h, w, ch = rgb_image.shape
            bytes_per_line = ch * w
            # .copy() detaches the QImage from the numpy buffer, which would
            # otherwise have to outlive the image.
            q_image = QImage(rgb_image.data, w, h, bytes_per_line, QImage.Format_RGB888).copy()
            pixmap = QPixmap.fromImage(q_image)

            if not pixmap.isNull():
                print(f"DEBUG: Emitting pixmap: {pixmap.width()}x{pixmap.height()}")
                self.frame_ready.emit(pixmap, detections, violations, metrics)
            else:
                print("ERROR: Created QPixmap is null")
        except Exception as e:
            print(f"ERROR in _process_frame: {e}")
            import traceback
            traceback.print_exc()