Traffic Monitoring System: End-to-End Pipeline Documentation (Deep Dive)


Table of Contents

  1. Introduction
  2. E2E Pipeline Overview
  3. VIDEO INPUT
  4. FRAME PREPROCESSING
  5. YOLO DETECTION
  6. BYTETRACK TRACKING
  7. TRAFFIC LIGHT DETECTION
  8. CROSSWALK DETECTION
  9. VIOLATION ANALYSIS
  10. UI VISUALIZATION
  11. LOGGING & STORAGE
  12. DEVICE & MODEL SWITCHING
  13. ANALYTICS & PERFORMANCE MONITORING
  14. SYSTEM ANALYSIS & REPORTING
  15. CONFIGURATION & EXTENSIBILITY
  16. ERROR HANDLING & FALLBACKS
  17. PACKAGING & DEPLOYMENT
  18. Developer Notes & Best Practices
  19. Example Data Flows
  20. Glossary
  21. Application Implementation Architecture & Deployment
  22. How to Move from Conda to Containers & Microservices: Step-by-Step Guide

1. Introduction

This document is a comprehensive, code-mapped, and developer-friendly guide to the traffic video analytics system implemented in the khatam project. It covers every stage of the E2E pipeline, from video input to logging and storage, and explains the logic, function definitions, and data flow in detail. The goal is to make the system architecture, data flow, and component responsibilities clear and accessible for developers, maintainers, and reviewers.


2. E2E Pipeline Overview

📹 VIDEO INPUT
    ↓ (CPU)
🔍 FRAME PREPROCESSING
    ↓ (CPU → GPU/NPU)
🤖 YOLO DETECTION
    ↓ (CPU)
🎯 BYTETRACK TRACKING
    ↓ (CPU)
🚦 TRAFFIC LIGHT DETECTION
    ↓ (CPU)
🚶 CROSSWALK DETECTION
    ↓ (CPU)
⚖️ VIOLATION ANALYSIS
    ↓ (CPU)
🖼️ UI VISUALIZATION
    ↓ (CPU)
💾 LOGGING & STORAGE

3. VIDEO INPUT (Deep Dive)

Main Classes and Responsibilities

  • MainWindow / EnhancedMainWindow: Entry point for the UI, connects user actions (open file, start/stop, select camera) to the video controller.
  • VideoController: Handles all video source logic. Maintains state (current source, frame index, FPS, etc.), manages OpenCV capture object, and emits frames via Qt signals.
  • Signal Flow: User action → MainWindow slot → VideoController method → emits frame_ready signal → downstream slots (preprocessing, analytics, UI).

Key Methods

  • __init__: Initializes capture state, sets up signals/slots.
  • start_capture(source): Opens the video source, sets up a timer or thread for frame reading.
  • read_frame(): Reads a frame, handles errors (end of stream, device disconnect), emits frame.
  • stop_capture(): Releases resources, stops timers/threads.

Error Handling

  • If the video source fails (file not found, camera error), emits an error signal to the UI.
  • If end-of-stream is reached, can loop, stop, or prompt the user.
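
A minimal sketch of how read_frame() might implement this behavior (the loop_video flag and error_occurred signal are illustrative names, not confirmed project API):

def read_frame(self):
    ok, frame = self.cap.read()
    if not ok:
        if self.loop_video:  # assumed config flag for looping file sources
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)  # rewind and continue
            return
        self.error_occurred.emit("End of stream or device disconnected")
        self.stop_capture()
        return
    self.frame_ready.emit(frame)  # matches the signal connection shown below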

Example Signal Connection

self.video_controller.frame_ready.connect(self.on_frame_ready)

Example: Handling Multiple Sources

def start_capture(self, source):
    # cv2.VideoCapture accepts both webcam indices and file/RTSP paths
    if isinstance(source, (int, str)):
        self.cap = cv2.VideoCapture(source)
    if self.cap is None or not self.cap.isOpened():
        self.error_occurred.emit(f"Could not open source: {source}")  # signal name illustrative
        return
    self.fps = self.cap.get(cv2.CAP_PROP_FPS) or 30.0  # fall back to 30 FPS

4. FRAME PREPROCESSING (Deep Dive)

Preprocessing Pipeline

  • Resize: Ensures frame matches model input size (e.g., 640x640 for YOLOv11n).
  • Color Conversion: Converts BGR (OpenCV default) to RGB or other formats as required.
  • Normalization: Scales pixel values to [0, 1] or [-1, 1] as needed by the model.
  • Padding/Cropping: Maintains aspect ratio or fits model input shape.
  • Device Transfer: If using GPU/NPU, may convert frame to appropriate memory space (e.g., OpenVINO blob, CUDA tensor).

Example: Preprocessing Function

def preprocess(frame, input_shape):
    # Resize
    frame = cv2.resize(frame, input_shape)
    # Convert color
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # Normalize
    frame = frame.astype(np.float32) / 255.0
    # (Optional) Pad/crop
    # (Optional) Convert to OpenVINO blob
    return frame

Integration with Device Selection

  • If the model is running on GPU/NPU, preprocessing may include conversion to device-specific format.
  • Device selection logic (in ModelManager) determines if preprocessing should prepare data for CPU, GPU, or NPU.
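
For example, a hedged sketch of the HWC-to-NCHW conversion that OpenVINO YOLO models typically require (shapes are assumptions based on the 640x640 input above):

import numpy as np

def to_model_input(frame):
    # preprocess() returns HWC float32 RGB; most OpenVINO YOLO models expect
    # NCHW with a leading batch dimension: (640, 640, 3) -> (1, 3, 640, 640)
    tensor = np.transpose(frame, (2, 0, 1))[np.newaxis, ...]
    return np.ascontiguousarray(tensor, dtype=np.float32)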

Error Handling

  • If frame is None or invalid, preprocessing returns a default or skips the frame.
  • Handles exceptions in color conversion or resizing gracefully.

5. YOLO DETECTION (Deep Dive)

Model Loading and Compilation

  • ModelManager: Responsible for loading YOLOv11 models, compiling with OpenVINO, and managing device selection.
  • OpenVINO Core: Used to read and compile models for CPU, GPU, or NPU.
  • Model Switching: If performance drops, ModelManager can switch to a lighter model or different device.
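
A minimal sketch of this load-and-compile flow with the OpenVINO runtime (the model path is illustrative):

from openvino.runtime import Core

core = Core()
model = core.read_model("models/yolov11n.xml")     # illustrative path
compiled_model = core.compile_model(model, "GPU")  # "CPU", "GPU", or "NPU"
output_layer = compiled_model.output(0)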

Inference Logic

  • Receives preprocessed frame.
  • Runs inference using OpenVINO's compiled_model([input_tensor]).
  • Parses output to extract bounding boxes, class labels, and confidence scores.

Example: Detection Function

def detect_vehicles(self, frame):
    input_tensor = self.preprocess(frame)
    output = self.compiled_model([input_tensor])[self.output_layer]
    detections = self.postprocess(output, frame.shape)
    return detections

Device/Model Switching

  • If FPS < threshold or latency > threshold, triggers switch_device() or switch_model().
  • Switch events are logged and visualized in the UI.

Error Handling

  • If inference fails, logs error and may fallback to CPU or a lighter model.
  • Handles device unavailability and model loading errors.

6. BYTETRACK TRACKING

Code Location

  • qt_app_pyside/controllers/video_controller_new.py
  • qt_app_pyside/bytetrack/

Description

Detected objects are passed to the ByteTrack tracker for multi-object tracking. ByteTrack assigns unique IDs to objects and maintains their trajectories across frames. Tracking is performed on the CPU for efficiency. The tracker handles object association, lost/found logic, and ID management.

Key Functions

  • ByteTrackTracker.update(detections): Updates the tracker with new detections.
  • VideoController._track_objects(): Manages the tracking process.

Data Flow

  1. Detected objects received from the YOLO detection stage.
  2. Objects are passed to the ByteTrack tracker.
  3. Tracker updates object states and IDs.

Example

def update(self, detections):
    # Simplified sketch: real ByteTrack associates high-confidence detections
    # with existing tracks via IoU matching, then recovers lost tracks from
    # the low-confidence detections before creating new IDs
    for detection in detections:
        if detection.confidence > self.confidence_threshold:
            self.tracked_objects.append(detection)

7. TRAFFIC LIGHT DETECTION

Code Location

  • qt_app_pyside/utils/traffic_light_utils.py
  • qt_app_pyside/red_light_violation_pipeline.py

Description

Specialized logic detects the state and position of traffic lights in the frame. May use color thresholding, region-of-interest analysis, or a dedicated model. Results are used for violation analysis (e.g., red light running).

Key Functions

  • detect_traffic_lights(frame): Detects traffic lights in the frame.
  • RedLightViolationPipeline.process_traffic_lights(): Processes and analyzes traffic light data.

Data Flow

  1. Frame with detected objects received from the tracking stage.
  2. Traffic light detection applied to the frame.
  3. Results used for violation analysis.

Example

# Approximate HSV bounds for red (illustrative; red also wraps near hue 180,
# so production code usually combines two ranges)
LOWER_RED = np.array([0, 120, 70])
UPPER_RED = np.array([10, 255, 255])

def detect_traffic_lights(frame):
    # Convert to HSV and threshold for red color
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    mask = cv2.inRange(hsv, LOWER_RED, UPPER_RED)
    return mask
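
Building on that, a sketch of classifying the light state by the dominant thresholded color inside the light's bounding box (the HSV ranges are approximate assumptions, not the project's tuned values):

def classify_light_state(frame, roi):
    x, y, w, h = roi  # bounding box of the traffic light
    hsv = cv2.cvtColor(frame[y:y+h, x:x+w], cv2.COLOR_BGR2HSV)
    ranges = {
        "red":    (np.array([0, 120, 70]),  np.array([10, 255, 255])),
        "yellow": (np.array([20, 120, 70]), np.array([35, 255, 255])),
        "green":  (np.array([40, 120, 70]), np.array([90, 255, 255])),
    }
    # The state is the color with the most pixels passing its threshold
    counts = {color: cv2.countNonZero(cv2.inRange(hsv, lo, hi))
              for color, (lo, hi) in ranges.items()}
    return max(counts, key=counts.get)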

8. CROSSWALK DETECTION

Code Location

  • qt_app_pyside/utils/crosswalk_utils_advanced.py
  • qt_app_pyside/utils/crosswalk_utils2.py

Description

Detects crosswalks using image processing or deep learning. Used to determine pedestrian zones and for violation logic.

Key Functions

  • detect_crosswalks(frame): Detects crosswalks in the frame.

Data Flow

  1. Frame with detected objects received from the tracking stage.
  2. Crosswalk detection applied to the frame.
  3. Results used for violation analysis and UI visualization.

Example

def detect_crosswalks(frame):
    # HoughLinesP expects a binary edge map, so run Canny edge detection first
    edges = cv2.Canny(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), 50, 150)
    # Detect the parallel stripes that form the crosswalk pattern
    lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=100,
                            minLineLength=50, maxLineGap=10)
    return lines

9. VIOLATION ANALYSIS

Code Location

  • qt_app_pyside/red_light_violation_pipeline.py
  • qt_app_pyside/violation_openvino.py

Description

Combines tracking, traffic light, and crosswalk data to detect violations (e.g., red light running, crosswalk violations). Applies rule-based or ML-based logic to determine if a violation occurred. Results are logged and visualized.

Key Functions

  • RedLightViolationPipeline.analyze_violations(): Analyzes potential violations.
  • ViolationAnalyzer.process(): Processes violations for logging and visualization.

Data Flow

  1. Tracked objects and traffic light states received.
  2. Violation analysis applied based on rules or ML models.
  3. Violations are logged and may trigger alerts or actions.

Example

def analyze_violations(self):
    # Simplified: violation flags are set upstream when a tracked vehicle
    # crosses the stop line on red or enters an occupied crosswalk
    for track in self.tracks:
        if track.violation_flag:
            self.violations.append(track)
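
As a concrete illustration of the rule-based logic, a sketch of a red-light check, assuming each track keeps its previous and current center points and the stop line's image y-coordinate is known (all names hypothetical):

def is_red_light_violation(track, light_state, stop_line_y):
    # A violation occurs when a vehicle crosses the stop line on red;
    # assumes the camera faces oncoming traffic, so vehicles move up the image
    if light_state != "red":
        return False
    prev_y, curr_y = track.prev_center[1], track.center[1]
    return prev_y > stop_line_y >= curr_y  # crossed the line this frame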

10. UI VISUALIZATION

Code Location

  • qt_app_pyside/main.py
  • qt_app_pyside/enhanced_main_window.py
  • qt_app_pyside/ui/analytics_tab.py
  • qt_app_pyside/ui/performance_graphs.py

Description

The PySide6 UI displays the video, overlays detections, tracks, and violation markers. Real-time analytics (FPS, latency, counts) are shown in dedicated tabs. Performance graphs update live using signals from the analytics controller. Device/model switches and latency spikes are visualized.

Key Functions

  • MainWindow.display_frame(): Displays the current frame in the UI.
  • AnalyticsTab.update_charts(): Updates analytics charts with new data.
  • PerformanceGraphsWidget.update_metrics(): Updates performance metrics in the UI.

Data Flow

  1. Processed frame with overlays ready from the violation analysis stage.
  2. Frame displayed in the UI with real-time updates for analytics and performance.

Example

def display_frame(self, frame):
    # Assumes `frame` is RGB and C-contiguous (convert upstream if needed)
    height, width, channel = frame.shape
    bytes_per_line = channel * width
    qimg = QImage(frame.data, width, height, bytes_per_line,
                  QImage.Format_RGB888).copy()  # copy so Qt owns the pixel buffer
    self.video_label.setPixmap(QPixmap.fromImage(qimg))
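
Before display, detections and tracks are drawn onto the frame; a minimal sketch (the bbox and track_id attribute names are assumptions):

def draw_overlays(frame, tracks):
    # Draw each tracked object's bounding box and ID in place
    for t in tracks:
        x1, y1, x2, y2 = map(int, t.bbox)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, f"ID {t.track_id}", (x1, y1 - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
    return frame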

11. LOGGING & STORAGE

Code Location

  • qt_app_pyside/annotation_utils.py
  • qt_app_pyside/logging_utils.py
  • qt_app_pyside/analytics_controller.py

Description

All detections, tracks, violations, and analytics are logged to disk (JSON, CSV, or database). System analysis and performance reports are saved for later review. Logging is handled asynchronously to avoid blocking the main pipeline.

Key Functions

  • AnalyticsController.save_report(): Saves the analytics report to disk.
  • LoggingUtils.log_event(): Logs events and metrics to the configured sink.

Data Flow

  1. Detection, tracking, and violation data generated.
  2. Data logged asynchronously to the configured storage (file, database).
  3. Reports and analytics data saved for review and debugging.

Example

def log_event(self, event_data):
    # Append the event data to the log file
    with open(self.log_file, 'a') as f:
        json.dump(event_data, f)
        f.write('\n')
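
Since logging is asynchronous, a minimal queue-backed writer sketch (class and attribute names are illustrative, not the project's actual API):

import json
import queue
import threading

class AsyncLogger:
    def __init__(self, log_file):
        self.log_file = log_file
        self.events = queue.Queue()
        threading.Thread(target=self._writer, daemon=True).start()

    def log_event(self, event_data):
        self.events.put(event_data)  # returns immediately; never blocks the pipeline

    def _writer(self):
        # Drain the queue on a background thread, one JSON object per line
        with open(self.log_file, "a") as f:
            while True:
                json.dump(self.events.get(), f)
                f.write("\n")
                f.flush()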

12. DEVICE & MODEL SWITCHING

Code Location

  • qt_app_pyside/controllers/model_manager.py
  • qt_app_pyside/controllers/analytics_controller.py

Description

The system monitors FPS, latency, and resource usage. If performance drops (e.g., FPS < threshold, high latency), the model or device is switched automatically. Device switch events are logged and visualized in the UI.

Key Functions

  • ModelManager.switch_device(): Switches the device for model inference.
  • AnalyticsController.update_device(): Updates the device configuration based on performance.

Data Flow

  1. Performance metrics monitored in real time.
  2. If metrics exceed thresholds, device or model is switched.
  3. New device/model is used for subsequent inference and processing.

Example

def switch_device(self, new_device):
    self.current_device = new_device
    # Recompile the original (uncompiled) model for the new device; keep that
    # model object around, since a CompiledModel cannot be recompiled
    self.compiled_model = Core().compile_model(self.model, new_device)
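
A sketch of the trigger logic (the threshold values and next_device() helper are assumptions; real values would come from config.json):

FPS_FLOOR = 15.0            # assumed threshold
LATENCY_CEILING_MS = 120.0  # assumed threshold

def maybe_switch(self, fps, latency_ms):
    # Fall over to the next device (or a lighter model) when performance drops
    if fps < FPS_FLOOR or latency_ms > LATENCY_CEILING_MS:
        self.switch_device(self.next_device())  # next_device() is hypothetical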

13. ANALYTICS & PERFORMANCE MONITORING

Code Location

  • qt_app_pyside/controllers/analytics_controller.py
  • qt_app_pyside/ui/performance_graphs.py
  • qt_app_pyside/system_metrics_monitor.py

Description

The analytics controller collects per-frame and aggregated metrics (FPS, latency, counts, spikes). Live system metrics (CPU/RAM) are collected using psutil and included in analytics data. All metrics are emitted via Qt signals to update the UI in real time.

Key Functions

  • AnalyticsController.process_frame_data(): Processes and emits frame-level analytics data.
  • AnalyticsController.get_latency_statistics(): Returns latency statistics for analysis.
  • SystemMetricsMonitor.get_cpu_ram_metrics(): Collects CPU and RAM usage metrics.

Data Flow

  1. Frame processing completes, and analytics data is ready.
  2. Data is emitted via signals to update UI components (charts, labels).
  3. System metrics are collected and displayed in real time.

Example

def process_frame_data(self, frame_data):
    # Calculate instantaneous FPS from the inter-frame interval,
    # guarding against a zero interval on the first frames
    now = time.time()
    elapsed = now - self.last_frame_time
    self.fps = 1.0 / elapsed if elapsed > 0 else 0.0
    self.last_frame_time = now
    # Emit the new metric so UI charts update via Qt signals
    self.fps_changed.emit(self.fps)
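
The psutil-based system metrics mentioned above reduce to a small sampling helper, sketched here:

import psutil

def get_cpu_ram_metrics():
    # Sample system-wide CPU and RAM utilization
    return {
        "cpu_percent": psutil.cpu_percent(interval=None),
        "ram_percent": psutil.virtual_memory().percent,
    }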

14. SYSTEM ANALYSIS & REPORTING

Code Location

  • qt_app_pyside/system_analysis.py

Description

Provides comprehensive system and pipeline analysis, including platform specs, pipeline architecture, tracking performance, latency spikes, model switching, and optimization recommendations. Generates and saves detailed reports for debugging and optimization.

Key Functions

  • TrafficMonitoringAnalyzer.generate_comprehensive_report(): Generates a detailed report of the system's performance and configuration.

Data Flow

  1. System and pipeline data is collected.
  2. Analysis is performed to identify issues and optimizations.
  3. Reports are generated and saved for review.

Example

def generate_comprehensive_report(self):
    # Collect data from all relevant sources
    data = self.collect_data()
    # Analyze the data and generate a report
    report = self.analyze_data(data)
    # Save the report to a file
    with open(self.report_file, 'w') as f:
        f.write(report)

15. CONFIGURATION & EXTENSIBILITY

Code Location

  • qt_app_pyside/config.json
  • qt_app_pyside/requirements.txt
  • qt_app_pyside/build_exe.py

Description

All model, device, and pipeline parameters are configurable via JSON and command-line arguments. The system is designed for easy extension (new models, trackers, analytics).
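
A sketch of how such a configuration might be loaded (the key names are illustrative, not the actual config.json schema):

import json

with open("qt_app_pyside/config.json") as f:
    config = json.load(f)

device = config.get("device", "CPU")                  # e.g. "CPU", "GPU", "NPU"
conf_threshold = config.get("confidence_threshold", 0.5)
video_source = config.get("video_source", 0)          # webcam index or file path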


16. ERROR HANDLING & FALLBACKS

Code Location

  • All major modules

Description

Robust error handling ensures the pipeline continues running even if a component fails. Fallbacks are in place for device switching, model loading, and analytics.
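
A hedged sketch of the device-fallback pattern described here, assuming OpenVINO raises RuntimeError when a device is unavailable:

def load_model_with_fallback(core, model_path, preferred=("NPU", "GPU", "CPU")):
    # Try each device in order of preference; fall back to the next on failure
    model = core.read_model(model_path)
    for device in preferred:
        try:
            return core.compile_model(model, device), device
        except RuntimeError as exc:
            print(f"{device} unavailable ({exc}); trying next device")
    raise RuntimeError("No inference device available")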


17. PACKAGING & DEPLOYMENT

Code Location

  • qt_app_pyside/qt_app.spec
  • qt_app_pyside/build_exe.py
  • qt_app_pyside/requirements.txt

Description

The application is packaged as a single executable using PyInstaller. All dependencies, models, and resources are bundled for easy deployment.
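
The build is typically driven from the spec file; illustrative commands (the one-file variant's data layout is an assumption):

pyinstaller qt_app_pyside/qt_app.spec
# or a quick one-file build:
pyinstaller --onefile --noconsole --add-data "models;models" main.py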


18. Developer Notes & Best Practices

  • Use virtual environments to manage dependencies (venv, conda).
  • Regularly update models and dependencies for best performance and features.
  • Monitor system performance and adjust device/model configurations as needed.
  • Refer to the code comments and function docstrings for detailed logic and usage.

19. Example Data Flows

19.1. From Video File

  1. User selects a video file in the UI.
  2. VideoController opens the file and starts reading frames.
  3. Frames are preprocessed and passed to the YOLO detection model.
  4. Detected objects are tracked, and violations are analyzed.
  5. Results are logged, and analytics are updated in the UI.

19.2. From Webcam

  1. User selects the webcam as the video source.
  2. VideoController initializes the webcam stream.
  3. Frames are captured and processed in real time.
  4. Detected objects and violations are displayed in the UI.
  5. Performance metrics are logged and visualized.

20. Glossary

  • E2E: End-to-End, referring to the complete pipeline from video input to logging and storage.
  • YOLO: You Only Look Once, a real-time object detection system.
  • ByteTrack: A multi-object tracking algorithm.
  • OpenVINO: Open Visual Inference and Neural Network Optimization, a toolkit for optimizing and deploying AI inference.
  • Qt: A free and open-source widget toolkit for creating graphical user interfaces as well as non-GUI programs.

21. Application Implementation Architecture & Deployment

Monolithic Desktop Application

  • The traffic monitoring system is implemented as a monolithic desktop application using Python and PySide6 (Qt for Python).
  • All major components (video input, detection, tracking, analytics, UI, logging) are integrated into a single process and codebase.

Containers

  • No containers are used in the standard deployment. The application is designed to run directly on Windows (and optionally Linux) as a standalone executable.
  • All dependencies (Python runtime, libraries, models) are bundled using PyInstaller, so users do not need Docker or other container runtimes.

Microservices

  • No microservices are used. The architecture is not distributed; all logic runs in a single process.
  • Communication between components is handled via Python function calls and Qt signals/slots, not via network APIs or service calls.

Rationale

  • This design is chosen for ease of deployment, real-time performance, and simplicity for end users (e.g., traffic authorities, researchers).
  • The system can be extended to use microservices or containers for cloud-based or distributed deployments, but the current implementation is optimized for local, real-time desktop use.

Extensibility

  • The codebase is modular, so individual components (e.g., detection, analytics, UI) can be refactored into microservices if needed in the future.
  • For large-scale deployments (e.g., city-wide monitoring), a distributed architecture with containers and microservices could be considered, but is not present in the current version.

Summary Table

Aspect           Implementation
---------------  -----------------------------
Containerized?   No
Microservices?   No (Monolithic)
Platform         Windows Desktop (PyInstaller)
UI Framework     PySide6 (Qt for Python)
Deployment       Single executable

Conclusion

This documentation provides a detailed, code-mapped explanation of the traffic monitoring system's E2E pipeline. Each stage is modular, extensible, and robust, with clear separation of concerns and real-time analytics for performance monitoring and optimization. For further details, refer to the code comments and function docstrings in each module.


22. How to Move from Conda to Containers & Microservices: Step-by-Step Guide

1 Identify and Modularize Services

  • Detection Service: Handles frame input, runs YOLOv11, returns detections (bounding boxes, classes, scores).
  • Tracking Service: Accepts detections, runs ByteTrack/DeepSORT, returns tracked IDs and trajectories.
  • Analytics Service: Processes tracking data, computes counts, violations, and aggregates.
  • UI Service: (Optional) PySide6 desktop UI or a web UI (Flask/FastAPI + React/Vue).

Action:

  • Refactor your codebase so each of these is a separate Python module or folder with a clear entry point (e.g., detector.py, tracker.py, analytics.py).

2 Replace Conda with Docker for Environment Management

  • Write a requirements.txt using pip freeze > requirements.txt inside your Conda environment.
  • Remove any Conda-specific packages from requirements.txt (e.g., conda, conda-package-handling).
  • Create a Dockerfile:
FROM python:3.10-slim
RUN apt-get update && apt-get install -y \
    ffmpeg \
    libgl1 \
    && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Replace main.py with your service's entry point
CMD ["python", "main.py"]
  • Build and run:
docker build -t traffic-monitor .
docker run --rm -it traffic-monitor

3 Add REST APIs for Microservices

  • Use FastAPI or Flask in each service to expose endpoints:
    • /detect for detection
    • /track for tracking
    • /analyze for analytics
  • Example (FastAPI):
from fastapi import FastAPI, File, UploadFile

app = FastAPI()

@app.post("/detect")
async def detect(file: UploadFile = File(...)):
    image_bytes = await file.read()
    detections = run_detection(image_bytes)  # hypothetical wrapper around your detector
    return {"detections": detections}
  • The UI/controller sends HTTP requests to these endpoints using requests or httpx.
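
On the client side, a sketch of posting a JPEG-encoded frame with requests (the port matches the Compose mapping in the next step):

import requests

def request_detections(jpeg_bytes):
    # Send one frame to the detection service and return its detections
    resp = requests.post(
        "http://localhost:8001/detect",
        files={"file": ("frame.jpg", jpeg_bytes, "image/jpeg")},
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json()["detections"]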

4 Orchestrate with Docker Compose

  • Create a docker-compose.yml to run all services together:
version: "3"
services:
  detector:
    build: ./detector
    ports: ["8001:8000"]
  tracker:
    build: ./tracker
    ports: ["8002:8000"]
  analytics:
    build: ./analytics
    ports: ["8003:8000"]
  ui:
    build: ./ui
    ports: ["8501:8501"]
  • Now you can start all services with docker-compose up.

5 (Optional) Scale with Kubernetes

  • For large deployments, write Kubernetes manifests to deploy and scale each service.
  • Use cloud GPU nodes for detection, CPU nodes for analytics/UI.

6 Practical Migration Steps

  • Start by containerizing your current monolithic app (single Dockerfile).
  • Refactor detection, tracking, analytics into separate modules/services.
  • Add REST APIs to each service.
  • Use Docker Compose for local multi-service testing.
  • Gradually move to cloud or edge as needed.

Summary:

  • Containers replace Conda for environment management and make deployment portable.
  • Microservices make your system modular, scalable, and cloud/edge-ready.
  • Start with Docker, then add REST APIs, then orchestrate with Docker Compose/Kubernetes.
  • This approach prepares your project for production, research, and smart city scale.