Files
Traffic-Intersection-Monito…/detection_openvino_async.py
2025-08-26 13:07:59 -07:00

2879 lines
123 KiB
Python

"""
Enhanced OpenVINO vehicle detector with async inference support
"""
# Import original detector to extend it
from detection_openvino import OpenVINOVehicleDetector as BaseDetector
import numpy as np
import time
from typing import List, Dict, Optional
class OpenVINOVehicleDetector(BaseDetector):
"""
Enhanced OpenVINO vehicle detector with async inference support
"""
def __init__(self, model_path: str = None, device: str = "AUTO",
use_quantized: bool = False, enable_ocr: bool = False,
confidence_threshold: float = 0.4, num_requests: int = 4):
"""
Initialize the detector with async inference support.
Args:
model_path: Path to the model XML file
device: Inference device (CPU, GPU, AUTO)
use_quantized: Whether to use INT8 quantized model
enable_ocr: Whether to enable OCR
confidence_threshold: Detection confidence threshold
num_requests: Number of async inference requests to create
"""
# Initialize the base detector
super().__init__(model_path, device, use_quantized, enable_ocr, confidence_threshold)
# Create multiple inference requests for pipelining
self.num_requests = num_requests
self.infer_requests = [self.compiled_model.create_infer_request() for _ in range(num_requests)]
self.current_request_idx = 0
# Keep track of requests in flight
self.active_requests = {} # frame_id -> (request, frame_shape, start_time)
self.next_frame_id = 0
print(f"✅ Created {num_requests} async inference requests for {device} device")
def detect_async_start(self, frame: np.ndarray) -> int:
"""
Start asynchronous detection on a frame.
Args:
frame: Input frame
Returns:
frame_id: ID to use when retrieving results
"""
# Get next available request
request = self.infer_requests[self.current_request_idx]
self.current_request_idx = (self.current_request_idx + 1) % len(self.infer_requests)
# Preprocess frame
preprocessed_frame = self._preprocess(frame)
# Get frame ID and add to active requests
frame_id = self.next_frame_id
self.next_frame_id += 1
# Record the start time for performance tracking
start_time = time.time()
# Start async inference
request.start_async({0: preprocessed_frame})
# Store request info
self.active_requests[frame_id] = (request, frame.shape[:2], start_time)
return frame_id
def detect_async_get_result(self, frame_id: int, wait: bool = True,
conf_threshold: Optional[float] = None) -> Optional[List[Dict]]:
"""
Get results from an async inference request.
Args:
frame_id: Frame ID returned from detect_async_start
wait: Whether to wait for the request to complete
conf_threshold: Optional confidence threshold override
Returns:
Detections or None if not ready
"""
if frame_id not in self.active_requests:
print(f"⚠️ Frame ID {frame_id} not found in active requests")
return None
request, frame_shape, start_time = self.active_requests[frame_id]
# Check if request is complete
if wait:
request.wait()
elif request.wait(0) != 0: # Not finished yet
return None
# Get output and process
output = request.get_output_tensor().data
# Use provided threshold or default
threshold = conf_threshold if conf_threshold is not None else self.confidence_threshold
# Process results
detections = self._postprocess(output, frame_shape, threshold)
# Update performance stats
inference_time = time.time() - start_time
self._inference_times.append(inference_time)
if len(self._inference_times) > 30:
self._inference_times.pop(0)
self.performance_stats['avg_inference_time'] = np.mean(self._inference_times) * 1000
self.performance_stats['frames_processed'] += 1
self._frame_count += 1
self.performance_stats['total_detections'] += len(detections)
# Clean up
del self.active_requests[frame_id]
return detections
def are_requests_complete(self) -> bool:
"""Check if all inference requests are complete."""
return len(self.active_requests) == 0
def wait_for_all(self) -> None:
"""Wait for all outstanding inference requests to complete."""
for frame_id in list(self.active_requests.keys()):
self.detect_async_get_result(frame_id, wait=True)
def detect_vehicles(self, frame: np.ndarray, conf_threshold: Optional[float] = None) -> List[Dict]:
"""
Detect vehicles in a frame using async API internally.
This maintains compatibility with the existing API but uses async under the hood.
Args:
frame: Input frame
conf_threshold: Optional confidence threshold override
Returns:
List of detections
"""
# Start async detection
frame_id = self.detect_async_start(frame)
# Wait for and get results
return self.detect_async_get_result(frame_id, wait=True, conf_threshold=conf_threshold)
# Detection logic using OpenVINO models (YOLO, etc.)
import os
import sys
import time
import cv2
import numpy as np
from pathlib import Path
from typing import List, Dict, Tuple, Optional
from red_light_violation_pipeline import RedLightViolationPipeline
# --- Install required packages if missing ---
try:
import openvino as ov
except ImportError:
print("Installing openvino...")
os.system('pip install --quiet "openvino>=2024.0.0"')
import openvino as ov
try:
from ultralytics import YOLO
except ImportError:
print("Installing ultralytics...")
os.system('pip install --quiet "ultralytics==8.3.0"')
from ultralytics import YOLO
try:
import nncf
except ImportError:
print("Installing nncf...")
os.system('pip install --quiet "nncf>=2.9.0"')
import nncf
# --- COCO dataset class names ---
COCO_CLASSES = {
0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus',
6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant',
11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat',
16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear',
22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag',
27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard',
32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove',
36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle',
40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl',
46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli',
51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair',
57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet',
62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard',
67: 'cell phone', 68: 'microwave', 69: 'oven', 70: 'toaster', 71: 'sink',
72: 'refrigerator', 73: 'book', 74: 'clock', 75: 'vase', 76: 'scissors',
77: 'teddy bear', 78: 'hair drier', 79: 'toothbrush'
}
# Traffic-related classes we're interested in (using standard COCO indices)
TRAFFIC_CLASS_NAMES = COCO_CLASSES
# --- Model Conversion and Quantization ---
def convert_yolo_to_openvino(model_name: str = "yolo11x", half: bool = True) -> Path:
"""Convert YOLOv11x PyTorch model to OpenVINO IR format."""
pt_path = Path(f"{model_name}.pt")
ov_dir = Path(f"{model_name}_openvino_model")
ov_xml = ov_dir / f"{model_name}.xml"
if not ov_xml.exists():
print(f"Exporting {pt_path} to OpenVINO IR...")
model = YOLO(str(pt_path))
model.export(format="openvino", dynamic=True, half=half)
else:
print(f"OpenVINO IR already exists: {ov_xml}")
return ov_xml
def quantize_openvino_model(ov_xml: Path, model_name: str = "yolo11x") -> Path:
"""Quantize OpenVINO IR model to INT8 using NNCF."""
int8_dir = Path(f"{model_name}_openvino_int8_model")
int8_xml = int8_dir / f"{model_name}.xml"
if int8_xml.exists():
print(f"INT8 model already exists: {int8_xml}")
return int8_xml
print("Quantization requires a calibration dataset. Skipping actual quantization in this demo.")
return ov_xml # Return FP32 if no quantization
# --- OpenVINO Inference Pipeline ---
class OpenVINOYOLODetector:
def __init__(self, model_xml: Path, device: str = "AUTO"):
self.core = ov.Core()
self.device = device
self.model = self.core.read_model(model_xml)
self.input_shape = self.model.inputs[0].shape
self.input_height = self.input_shape[2]
self.input_width = self.input_shape[3]
self.ov_config = {}
if device != "CPU":
self.model.reshape({0: [1, 3, 640, 640]})
if "GPU" in device or ("AUTO" in device and "GPU" in self.core.available_devices):
self.ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}
self.compiled_model = self.core.compile_model(model=self.model, device_name=self.device, config=self.ov_config)
self.output_layer = self.compiled_model.output(0)
def preprocess(self, frame: np.ndarray) -> np.ndarray:
img = cv2.resize(frame, (self.input_width, self.input_height))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.astype(np.float32) / 255.0
img = img.transpose(2, 0, 1)[None]
return img
def infer(self, frame: np.ndarray, conf_threshold: float = 0.25) -> List[Dict]:
input_tensor = self.preprocess(frame)
output = self.compiled_model([input_tensor])[self.output_layer]
return self.postprocess(output, frame.shape, conf_threshold)
def postprocess(self, output: np.ndarray, frame_shape, conf_threshold: float) -> List[Dict]:
# Output: (1, 84, 8400) or (84, 8400) or (8400, 84)
if output.ndim == 3:
output = np.squeeze(output)
if output.shape[0] == 84:
output = output.T # (8400, 84)
boxes = output[:, :4]
scores = output[:, 4:]
class_ids = np.argmax(scores, axis=1)
confidences = np.max(scores, axis=1)
detections = []
h, w = frame_shape[:2]
for i, (box, score, class_id) in enumerate(zip(boxes, confidences, class_ids)):
if score < conf_threshold:
continue
x_c, y_c, bw, bh = box
# If normalized, scale to input size
if all(0.0 <= v <= 1.0 for v in box):
x_c *= self.input_width
y_c *= self.input_height
bw *= self.input_width
bh *= self.input_height
# Scale to original frame size
scale_x = w / self.input_width
scale_y = h / self.input_height
x_c *= scale_x
y_c *= scale_y
bw *= scale_x
bh *= scale_y
x1 = int(round(x_c - bw / 2))
y1 = int(round(y_c - bh / 2))
x2 = int(round(x_c + bw / 2))
y2 = int(round(y_c + bh / 2))
x1 = max(0, min(x1, w - 1))
y1 = max(0, min(y1, h - 1))
x2 = max(0, min(x2, w - 1))
y2 = max(0, min(y2, h - 1))
if x2 <= x1 or y2 <= y1:
continue
# Only keep class 9 as traffic light, rename if found
if class_id == 9:
class_name = "traffic light"
elif class_id < len(TRAFFIC_CLASS_NAMES):
class_name = TRAFFIC_CLASS_NAMES[class_id]
else:
continue # Remove unknown/other classes
detections.append({
'bbox': [x1, y1, x2, y2],
'confidence': float(score),
'class_id': int(class_id),
'class_name': class_name
})
# Apply NMS
if len(detections) > 0:
boxes = np.array([det['bbox'] for det in detections])
scores = np.array([det['confidence'] for det in detections])
indices = cv2.dnn.NMSBoxes(boxes.tolist(), scores.tolist(), conf_threshold, 0.5)
if isinstance(indices, (list, tuple)) and len(indices) > 0:
indices = np.array(indices).flatten()
elif isinstance(indices, np.ndarray) and indices.size > 0:
indices = indices.flatten()
else:
indices = []
detections = [detections[int(i)] for i in indices] if len(indices) > 0 else []
return detections
def draw(self, frame: np.ndarray, detections: List[Dict], box_thickness: int = 2) -> np.ndarray:
# 80+ visually distinct colors for COCO classes (BGR)
COCO_COLORS = [
(255, 56, 56), (255, 157, 151), (255, 112, 31), (255, 178, 29), (207, 210, 49),
(72, 249, 10), (146, 204, 23), (61, 219, 134), (26, 147, 52), (0, 212, 187),
(44, 153, 168), (0, 194, 255), (52, 69, 147), (100, 115, 255), (0, 24, 236),
(132, 56, 255), (82, 0, 133), (203, 56, 255), (255, 149, 200), (255, 55, 199),
(255, 255, 56), (255, 255, 151), (255, 255, 31), (255, 255, 29), (207, 255, 49),
(72, 255, 10), (146, 255, 23), (61, 255, 134), (26, 255, 52), (0, 255, 187),
(44, 255, 168), (0, 255, 255), (52, 255, 147), (100, 255, 255), (0, 255, 236),
(132, 255, 255), (82, 255, 133), (203, 255, 255), (255, 255, 200), (255, 255, 199),
(56, 255, 255), (157, 255, 151), (112, 255, 31), (178, 255, 29), (210, 255, 49),
(249, 255, 10), (204, 255, 23), (219, 255, 134), (147, 255, 52), (212, 255, 187),
(153, 255, 168), (194, 255, 255), (69, 255, 147), (115, 255, 255), (24, 255, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49)
]
for det in detections:
x1, y1, x2, y2 = det['bbox']
label = f"{det['class_name']} {det['confidence']:.2f}"
color = COCO_COLORS[det['class_id'] % len(COCO_COLORS)]
cv2.rectangle(frame, (x1, y1), (x2, y2), color, box_thickness)
cv2.putText(frame, label, (x1, max(y1 - 10, 0)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return frame
# --- Video/Image/Live Inference ---
def run_inference(detector: OpenVINOYOLODetector, source=0, conf_threshold=0.25, flip=False, use_popup=False, video_width=None):
if isinstance(source, str) and not os.path.exists(source):
print(f"Downloading sample video: {source}")
import requests
url = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/video/people.mp4"
r = requests.get(url)
with open(source, 'wb') as f:
f.write(r.content)
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Failed to open video source: {source}")
return
window_name = "YOLOv11x + OpenVINO Detection"
if use_popup:
cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
frame_count = 0
times = []
while True:
ret, frame = cap.read()
if not ret:
break
if flip:
frame = cv2.flip(frame, 1)
if video_width:
scale = video_width / max(frame.shape[:2])
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
start = time.time()
detections = detector.infer(frame, conf_threshold=conf_threshold)
frame = detector.draw(frame, detections)
elapsed = time.time() - start
times.append(elapsed)
if len(times) > 200:
times.pop(0)
fps = 1.0 / np.mean(times) if times else 0
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
if use_popup:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
else:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
def run_inference_async(detector: OpenVINOVehicleDetector, source=0, conf_threshold=0.25, flip=False, use_popup=False, video_width=None, max_pipeline=4):
"""
Run video inference using the async API of OpenVINOVehicleDetector.
"""
if isinstance(source, str) and not os.path.exists(source):
print(f"Downloading sample video: {source}")
import requests
url = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/video/people.mp4"
r = requests.get(url)
with open(source, 'wb') as f:
f.write(r.content)
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Failed to open video source: {source}")
return
window_name = "YOLOv11x + OpenVINO Async Detection"
if use_popup:
cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
frame_count = 0
times = []
pipeline = [] # List of (frame_id, frame, t0)
while True:
# Fill pipeline
while len(pipeline) < max_pipeline:
ret, frame = cap.read()
if not ret:
break
if flip:
frame = cv2.flip(frame, 1)
if video_width:
scale = video_width / max(frame.shape[:2])
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
t0 = time.time()
frame_id = detector.detect_async_start(frame)
pipeline.append((frame_id, frame, t0))
if not pipeline:
break
# Get result for the oldest frame in pipeline
frame_id, frame, t0 = pipeline.pop(0)
detections = detector.detect_async_get_result(frame_id, wait=True, conf_threshold=conf_threshold)
frame = detector.draw(frame, detections)
elapsed = time.time() - t0
times.append(elapsed)
if len(times) > 200:
times.pop(0)
fps = 1.0 / np.mean(times) if times else 0
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
if use_popup:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
else:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
# --- Main Entrypoint ---
if __name__ == "__main__":
# Choose model: yolo11x or yolo11n, etc.
MODEL_NAME = "yolo11x"
DEVICE = "AUTO" # or "CPU", "GPU"
# Step 1: Convert model if needed
ov_xml = convert_yolo_to_openvino(MODEL_NAME)
# Step 2: Quantize (optional, demo skips actual quantization)
ov_xml = quantize_openvino_model(ov_xml, MODEL_NAME)
# Step 3: Create detector
detector = OpenVINOYOLODetector(ov_xml, device=DEVICE)
# Step 4: Run on webcam, video, or image
# Webcam: source=0, Video: source="video.mp4", Image: source="image.jpg"
run_inference(detector, source=0, conf_threshold=0.25, flip=True, use_popup=True, video_width=1280)
# To run on a video file: run_inference(detector, source="people.mp4", conf_threshold=0.25)
# To run on an image: run_inference(detector, source="image.jpg", conf_threshold=0.25)
# To run async or batch, extend the OpenVINOYOLODetector class with async API as needed.
import numpy as np
import cv2
def postprocess_openvino_yolo(output, conf_threshold=0.4, iou_threshold=0.5, input_shape=(640, 640), original_shape=None):
"""
output: OpenVINO raw output tensor (e.g., shape [1, 25200, 85])
conf_threshold: minimum confidence
iou_threshold: for NMS
input_shape: model input size (w, h)
original_shape: original image size (w, h)
"""
# 1. Squeeze batch dimension
output = np.squeeze(output) # [25200, 85]
# 2. Split predictions
boxes = output[:, :4]
obj_conf = output[:, 4]
class_scores = output[:, 5:]
# 3. Get class with highest score
class_ids = np.argmax(class_scores, axis=1)
class_conf = class_scores[np.arange(len(class_scores)), class_ids]
# 4. Multiply objectness confidence with class confidence
scores = obj_conf * class_conf
# 5. Filter by confidence threshold
mask = scores > conf_threshold
boxes = boxes[mask]
scores = scores[mask]
class_ids = class_ids[mask]
if original_shape is not None:
# Rescale boxes from input_shape to original image shape
input_w, input_h = input_shape
orig_w, orig_h = original_shape
scale_x = orig_w / input_w
scale_y = orig_h / input_h
boxes[:, 0] *= scale_x # x1
boxes[:, 1] *= scale_y # y1
boxes[:, 2] *= scale_x # x2
boxes[:, 3] *= scale_y # y2
# 6. Convert boxes to [x, y, w, h] format for OpenCV NMS
boxes_xywh = []
for box in boxes:
x1, y1, x2, y2 = box
boxes_xywh.append([x1, y1, x2 - x1, y2 - y1])
# 7. Apply NMS
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
# 8. Return filtered boxes
result_boxes = []
result_scores = []
result_classes = []
if len(boxes) > 0 and len(scores) > 0:
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
if len(indices) > 0:
indices = np.array(indices).flatten()
for i in indices:
i = int(i)
result_boxes.append(boxes[i])
result_scores.append(scores[i])
result_classes.append(class_ids[i])
return result_boxes, result_scores, result_classes
import os
import time
import numpy as np
import cv2
from pathlib import Path
from typing import List, Dict, Optional
# Only traffic-related classes for detection
TRAFFIC_CLASS_NAMES = [
'person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck',
'traffic light', 'stop sign', 'parking meter'
]
class OpenVINOVehicleDetector:
def __init__(self, model_path: str = None, device: str = "AUTO", use_quantized: bool = False, enable_ocr: bool = False, confidence_threshold: float = 0.4):
import openvino as ov
self.device = device
self.confidence_threshold = confidence_threshold
self.ocr_reader = None
self.class_names = TRAFFIC_CLASS_NAMES
self.performance_stats = {
'fps': 0,
'avg_inference_time': 0,
'frames_processed': 0,
'backend': f"OpenVINO-{device}",
'total_detections': 0,
'detection_rate': 0
}
self._inference_times = []
self._start_time = time.time()
self._frame_count = 0
# Model selection logic
self.model_path = self._find_best_model(model_path, use_quantized)
self.core = ov.Core()
self.model = self.core.read_model(self.model_path)
# Always reshape to static shape before accessing .shape
self.model.reshape({0: [1, 3, 640, 640]})
self.input_shape = self.model.inputs[0].shape
self.input_height = self.input_shape[2]
self.input_width = self.input_shape[3]
self.ov_config = {}
if device != "CPU":
# Already reshaped above, so nothing more needed here
pass
if "GPU" in device or ("AUTO" in device and "GPU" in self.core.available_devices):
self.ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}
self.compiled_model = self.core.compile_model(model=self.model, device_name=self.device, config=self.ov_config)
self.output_layer = self.compiled_model.output(0)
def _find_best_model(self, model_path, use_quantized):
# Priority: quantized IR > IR > .pt
search_paths = [
Path(model_path) if model_path else None,
Path("yolo11x_openvino_int8_model/yolo11x.xml") if use_quantized else None,
Path("yolo11x_openvino_model/yolo11x.xml"),
Path("rcb/yolo11x_openvino_model/yolo11x.xml"),
Path("yolo11x.xml"),
Path("rcb/yolo11x.xml"),
Path("yolo11x.pt"),
Path("rcb/yolo11x.pt")
]
for p in search_paths:
if p and p.exists():
return str(p)
raise FileNotFoundError("No suitable YOLOv11x model found for OpenVINO.")
def detect_vehicles(self, frame: np.ndarray, conf_threshold: float = None) -> List[Dict]:
if conf_threshold is None:
conf_threshold = self.confidence_threshold
start = time.time()
input_tensor = self._preprocess(frame)
output = self.compiled_model([input_tensor])[self.output_layer]
detections = self._postprocess(output, frame.shape, conf_threshold)
elapsed = time.time() - start
self._inference_times.append(elapsed)
self._frame_count += 1
self.performance_stats['frames_processed'] = self._frame_count
self.performance_stats['total_detections'] += len(detections)
if len(self._inference_times) > 100:
self._inference_times.pop(0)
self.performance_stats['avg_inference_time'] = float(np.mean(self._inference_times)) if self._inference_times else 0
total_time = time.time() - self._start_time
self.performance_stats['fps'] = self._frame_count / total_time if total_time > 0 else 0
return detections
def _preprocess(self, frame: np.ndarray) -> np.ndarray:
img = cv2.resize(frame, (self.input_width, self.input_height))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.astype(np.float32) / 255.0
img = img.transpose(2, 0, 1)[None]
return img
def _postprocess(self, output: np.ndarray, frame_shape, conf_threshold: float) -> List[Dict]:
# Output: (1, 84, 8400) or (84, 8400) or (8400, 84)
if output.ndim == 3:
output = np.squeeze(output)
if output.shape[0] == 84:
output = output.T # (8400, 84)
boxes = output[:, :4]
scores = output[:, 4:]
class_ids = np.argmax(scores, axis=1)
confidences = np.max(scores, axis=1)
detections = []
h, w = frame_shape[:2]
for i, (box, score, class_id) in enumerate(zip(boxes, confidences, class_ids)):
if score < conf_threshold:
continue
x_c, y_c, bw, bh = box
# If normalized, scale to input size
if all(0.0 <= v <= 1.0 for v in box):
x_c *= self.input_width
y_c *= self.input_height
bw *= self.input_width
bh *= self.input_height
# Scale to original frame size
scale_x = w / self.input_width
scale_y = h / self.input_height
x_c *= scale_x
y_c *= scale_y
bw *= scale_x
bh *= scale_y
x1 = int(round(x_c - bw / 2))
y1 = int(round(y_c - bh / 2))
x2 = int(round(x_c + bw / 2))
y2 = int(round(y_c + bh / 2))
x1 = max(0, min(x1, w - 1))
y1 = max(0, min(y1, h - 1))
x2 = max(0, min(x2, w - 1))
y2 = max(0, min(y2, h - 1))
if x2 <= x1 or y2 <= y1:
continue
# Only keep class 9 as traffic light, rename if found
if class_id == 9:
class_name = "traffic light"
elif class_id < len(TRAFFIC_CLASS_NAMES):
class_name = TRAFFIC_CLASS_NAMES[class_id]
else:
continue # Remove unknown/other classes
detections.append({
'bbox': [x1, y1, x2, y2],
'confidence': float(score),
'class_id': int(class_id),
'class_name': class_name
})
# Apply NMS
if len(detections) > 0:
boxes = np.array([det['bbox'] for det in detections])
scores = np.array([det['confidence'] for det in detections])
indices = cv2.dnn.NMSBoxes(boxes.tolist(), scores.tolist(), conf_threshold, 0.5)
if isinstance(indices, (list, tuple)) and len(indices) > 0:
indices = np.array(indices).flatten()
elif isinstance(indices, np.ndarray) and indices.size > 0:
indices = indices.flatten()
else:
indices = []
detections = [detections[int(i)] for i in indices] if len(indices) > 0 else []
return detections
def draw(self, frame: np.ndarray, detections: List[Dict], box_thickness: int = 2) -> np.ndarray:
# 80+ visually distinct colors for COCO classes (BGR)
COCO_COLORS = [
(255, 56, 56), (255, 157, 151), (255, 112, 31), (255, 178, 29), (207, 210, 49),
(72, 249, 10), (146, 204, 23), (61, 219, 134), (26, 147, 52), (0, 212, 187),
(44, 153, 168), (0, 194, 255), (52, 69, 147), (100, 115, 255), (0, 24, 236),
(132, 56, 255), (82, 0, 133), (203, 56, 255), (255, 149, 200), (255, 55, 199),
(255, 255, 56), (255, 255, 151), (255, 255, 31), (255, 255, 29), (207, 255, 49),
(72, 255, 10), (146, 255, 23), (61, 255, 134), (26, 255, 52), (0, 255, 187),
(44, 255, 168), (0, 255, 255), (52, 255, 147), (100, 255, 255), (0, 255, 236),
(132, 255, 255), (82, 255, 133), (203, 255, 255), (255, 255, 200), (255, 255, 199),
(56, 255, 255), (157, 255, 151), (112, 255, 31), (178, 255, 29), (210, 255, 49),
(249, 255, 10), (204, 255, 23), (219, 255, 134), (147, 255, 52), (212, 255, 187),
(153, 255, 168), (194, 255, 255), (69, 255, 147), (115, 255, 255), (24, 255, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49)
]
for det in detections:
x1, y1, x2, y2 = det['bbox']
label = f"{det['class_name']} {det['confidence']:.2f}"
color = COCO_COLORS[det['class_id'] % len(COCO_COLORS)]
cv2.rectangle(frame, (x1, y1), (x2, y2), color, box_thickness)
cv2.putText(frame, label, (x1, max(y1 - 10, 0)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return frame
# --- Video/Image/Live Inference ---
def run_inference(detector: OpenVINOYOLODetector, source=0, conf_threshold=0.25, flip=False, use_popup=False, video_width=None):
if isinstance(source, str) and not os.path.exists(source):
print(f"Downloading sample video: {source}")
import requests
url = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/video/people.mp4"
r = requests.get(url)
with open(source, 'wb') as f:
f.write(r.content)
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Failed to open video source: {source}")
return
window_name = "YOLOv11x + OpenVINO Detection"
if use_popup:
cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
frame_count = 0
times = []
while True:
ret, frame = cap.read()
if not ret:
break
if flip:
frame = cv2.flip(frame, 1)
if video_width:
scale = video_width / max(frame.shape[:2])
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
start = time.time()
detections = detector.infer(frame, conf_threshold=conf_threshold)
frame = detector.draw(frame, detections)
elapsed = time.time() - start
times.append(elapsed)
if len(times) > 200:
times.pop(0)
fps = 1.0 / np.mean(times) if times else 0
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
if use_popup:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
else:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
def run_inference_async(detector: OpenVINOVehicleDetector, source=0, conf_threshold=0.25, flip=False, use_popup=False, video_width=None, max_pipeline=4):
"""
Run video inference using the async API of OpenVINOVehicleDetector.
"""
if isinstance(source, str) and not os.path.exists(source):
print(f"Downloading sample video: {source}")
import requests
url = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/video/people.mp4"
r = requests.get(url)
with open(source, 'wb') as f:
f.write(r.content)
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Failed to open video source: {source}")
return
window_name = "YOLOv11x + OpenVINO Async Detection"
if use_popup:
cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
frame_count = 0
times = []
pipeline = [] # List of (frame_id, frame, t0)
while True:
# Fill pipeline
while len(pipeline) < max_pipeline:
ret, frame = cap.read()
if not ret:
break
if flip:
frame = cv2.flip(frame, 1)
if video_width:
scale = video_width / max(frame.shape[:2])
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
t0 = time.time()
frame_id = detector.detect_async_start(frame)
pipeline.append((frame_id, frame, t0))
if not pipeline:
break
# Get result for the oldest frame in pipeline
frame_id, frame, t0 = pipeline.pop(0)
detections = detector.detect_async_get_result(frame_id, wait=True, conf_threshold=conf_threshold)
frame = detector.draw(frame, detections)
elapsed = time.time() - t0
times.append(elapsed)
if len(times) > 200:
times.pop(0)
fps = 1.0 / np.mean(times) if times else 0
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
if use_popup:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
else:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
# --- Main Entrypoint ---
if __name__ == "__main__":
# Choose model: yolo11x or yolo11n, etc.
MODEL_NAME = "yolo11x"
DEVICE = "AUTO" # or "CPU", "GPU"
# Step 1: Convert model if needed
ov_xml = convert_yolo_to_openvino(MODEL_NAME)
# Step 2: Quantize (optional, demo skips actual quantization)
ov_xml = quantize_openvino_model(ov_xml, MODEL_NAME)
# Step 3: Create detector
detector = OpenVINOYOLODetector(ov_xml, device=DEVICE)
# Step 4: Run on webcam, video, or image
# Webcam: source=0, Video: source="video.mp4", Image: source="image.jpg"
run_inference(detector, source=0, conf_threshold=0.25, flip=True, use_popup=True, video_width=1280)
# To run on a video file: run_inference(detector, source="people.mp4", conf_threshold=0.25)
# To run on an image: run_inference(detector, source="image.jpg", conf_threshold=0.25)
# To run async or batch, extend the OpenVINOYOLODetector class with async API as needed.
import numpy as np
import cv2
def postprocess_openvino_yolo(output, conf_threshold=0.4, iou_threshold=0.5, input_shape=(640, 640), original_shape=None):
"""
output: OpenVINO raw output tensor (e.g., shape [1, 25200, 85])
conf_threshold: minimum confidence
iou_threshold: for NMS
input_shape: model input size (w, h)
original_shape: original image size (w, h)
"""
# 1. Squeeze batch dimension
output = np.squeeze(output) # [25200, 85]
# 2. Split predictions
boxes = output[:, :4]
obj_conf = output[:, 4]
class_scores = output[:, 5:]
# 3. Get class with highest score
class_ids = np.argmax(class_scores, axis=1)
class_conf = class_scores[np.arange(len(class_scores)), class_ids]
# 4. Multiply objectness confidence with class confidence
scores = obj_conf * class_conf
# 5. Filter by confidence threshold
mask = scores > conf_threshold
boxes = boxes[mask]
scores = scores[mask]
class_ids = class_ids[mask]
if original_shape is not None:
# Rescale boxes from input_shape to original image shape
input_w, input_h = input_shape
orig_w, orig_h = original_shape
scale_x = orig_w / input_w
scale_y = orig_h / input_h
boxes[:, 0] *= scale_x # x1
boxes[:, 1] *= scale_y # y1
boxes[:, 2] *= scale_x # x2
boxes[:, 3] *= scale_y # y2
# 6. Convert boxes to [x, y, w, h] format for OpenCV NMS
boxes_xywh = []
for box in boxes:
x1, y1, x2, y2 = box
boxes_xywh.append([x1, y1, x2 - x1, y2 - y1])
# 7. Apply NMS
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
# 8. Return filtered boxes
result_boxes = []
result_scores = []
result_classes = []
if len(boxes) > 0 and len(scores) > 0:
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
if len(indices) > 0:
indices = np.array(indices).flatten()
for i in indices:
i = int(i)
result_boxes.append(boxes[i])
result_scores.append(scores[i])
result_classes.append(class_ids[i])
return result_boxes, result_scores, result_classes
import os
import time
import numpy as np
import cv2
from pathlib import Path
from typing import List, Dict, Optional
# Only traffic-related classes for detection
TRAFFIC_CLASS_NAMES = [
'person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck',
'traffic light', 'stop sign', 'parking meter'
]
class OpenVINOVehicleDetector:
def __init__(self, model_path: str = None, device: str = "AUTO", use_quantized: bool = False, enable_ocr: bool = False, confidence_threshold: float = 0.4):
import openvino as ov
self.device = device
self.confidence_threshold = confidence_threshold
self.ocr_reader = None
self.class_names = TRAFFIC_CLASS_NAMES
self.performance_stats = {
'fps': 0,
'avg_inference_time': 0,
'frames_processed': 0,
'backend': f"OpenVINO-{device}",
'total_detections': 0,
'detection_rate': 0
}
self._inference_times = []
self._start_time = time.time()
self._frame_count = 0
# Model selection logic
self.model_path = self._find_best_model(model_path, use_quantized)
self.core = ov.Core()
self.model = self.core.read_model(self.model_path)
# Always reshape to static shape before accessing .shape
self.model.reshape({0: [1, 3, 640, 640]})
self.input_shape = self.model.inputs[0].shape
self.input_height = self.input_shape[2]
self.input_width = self.input_shape[3]
self.ov_config = {}
if device != "CPU":
# Already reshaped above, so nothing more needed here
pass
if "GPU" in device or ("AUTO" in device and "GPU" in self.core.available_devices):
self.ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}
self.compiled_model = self.core.compile_model(model=self.model, device_name=self.device, config=self.ov_config)
self.output_layer = self.compiled_model.output(0)
def _find_best_model(self, model_path, use_quantized):
# Priority: quantized IR > IR > .pt
search_paths = [
Path(model_path) if model_path else None,
Path("yolo11x_openvino_int8_model/yolo11x.xml") if use_quantized else None,
Path("yolo11x_openvino_model/yolo11x.xml"),
Path("rcb/yolo11x_openvino_model/yolo11x.xml"),
Path("yolo11x.xml"),
Path("rcb/yolo11x.xml"),
Path("yolo11x.pt"),
Path("rcb/yolo11x.pt")
]
for p in search_paths:
if p and p.exists():
return str(p)
raise FileNotFoundError("No suitable YOLOv11x model found for OpenVINO.")
def detect_vehicles(self, frame: np.ndarray, conf_threshold: float = None) -> List[Dict]:
if conf_threshold is None:
conf_threshold = self.confidence_threshold
start = time.time()
input_tensor = self._preprocess(frame)
output = self.compiled_model([input_tensor])[self.output_layer]
detections = self._postprocess(output, frame.shape, conf_threshold)
elapsed = time.time() - start
self._inference_times.append(elapsed)
self._frame_count += 1
self.performance_stats['frames_processed'] = self._frame_count
self.performance_stats['total_detections'] += len(detections)
if len(self._inference_times) > 100:
self._inference_times.pop(0)
self.performance_stats['avg_inference_time'] = float(np.mean(self._inference_times)) if self._inference_times else 0
total_time = time.time() - self._start_time
self.performance_stats['fps'] = self._frame_count / total_time if total_time > 0 else 0
return detections
def _preprocess(self, frame: np.ndarray) -> np.ndarray:
img = cv2.resize(frame, (self.input_width, self.input_height))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.astype(np.float32) / 255.0
img = img.transpose(2, 0, 1)[None]
return img
def _postprocess(self, output: np.ndarray, frame_shape, conf_threshold: float) -> List[Dict]:
# Output: (1, 84, 8400) or (84, 8400) or (8400, 84)
if output.ndim == 3:
output = np.squeeze(output)
if output.shape[0] == 84:
output = output.T # (8400, 84)
boxes = output[:, :4]
scores = output[:, 4:]
class_ids = np.argmax(scores, axis=1)
confidences = np.max(scores, axis=1)
detections = []
h, w = frame_shape[:2]
for i, (box, score, class_id) in enumerate(zip(boxes, confidences, class_ids)):
if score < conf_threshold:
continue
x_c, y_c, bw, bh = box
# If normalized, scale to input size
if all(0.0 <= v <= 1.0 for v in box):
x_c *= self.input_width
y_c *= self.input_height
bw *= self.input_width
bh *= self.input_height
# Scale to original frame size
scale_x = w / self.input_width
scale_y = h / self.input_height
x_c *= scale_x
y_c *= scale_y
bw *= scale_x
bh *= scale_y
x1 = int(round(x_c - bw / 2))
y1 = int(round(y_c - bh / 2))
x2 = int(round(x_c + bw / 2))
y2 = int(round(y_c + bh / 2))
x1 = max(0, min(x1, w - 1))
y1 = max(0, min(y1, h - 1))
x2 = max(0, min(x2, w - 1))
y2 = max(0, min(y2, h - 1))
if x2 <= x1 or y2 <= y1:
continue
# Only keep class 9 as traffic light, rename if found
if class_id == 9:
class_name = "traffic light"
elif class_id < len(TRAFFIC_CLASS_NAMES):
class_name = TRAFFIC_CLASS_NAMES[class_id]
else:
continue # Remove unknown/other classes
detections.append({
'bbox': [x1, y1, x2, y2],
'confidence': float(score),
'class_id': int(class_id),
'class_name': class_name
})
# Apply NMS
if len(detections) > 0:
boxes = np.array([det['bbox'] for det in detections])
scores = np.array([det['confidence'] for det in detections])
indices = cv2.dnn.NMSBoxes(boxes.tolist(), scores.tolist(), conf_threshold, 0.5)
if isinstance(indices, (list, tuple)) and len(indices) > 0:
indices = np.array(indices).flatten()
elif isinstance(indices, np.ndarray) and indices.size > 0:
indices = indices.flatten()
else:
indices = []
detections = [detections[int(i)] for i in indices] if len(indices) > 0 else []
return detections
def draw(self, frame: np.ndarray, detections: List[Dict], box_thickness: int = 2) -> np.ndarray:
# 80+ visually distinct colors for COCO classes (BGR)
COCO_COLORS = [
(255, 56, 56), (255, 157, 151), (255, 112, 31), (255, 178, 29), (207, 210, 49),
(72, 249, 10), (146, 204, 23), (61, 219, 134), (26, 147, 52), (0, 212, 187),
(44, 153, 168), (0, 194, 255), (52, 69, 147), (100, 115, 255), (0, 24, 236),
(132, 56, 255), (82, 0, 133), (203, 56, 255), (255, 149, 200), (255, 55, 199),
(255, 255, 56), (255, 255, 151), (255, 255, 31), (255, 255, 29), (207, 255, 49),
(72, 255, 10), (146, 255, 23), (61, 255, 134), (26, 255, 52), (0, 255, 187),
(44, 255, 168), (0, 255, 255), (52, 255, 147), (100, 255, 255), (0, 255, 236),
(132, 255, 255), (82, 255, 133), (203, 255, 255), (255, 255, 200), (255, 255, 199),
(56, 255, 255), (157, 255, 151), (112, 255, 31), (178, 255, 29), (210, 255, 49),
(249, 255, 10), (204, 255, 23), (219, 255, 134), (147, 255, 52), (212, 255, 187),
(153, 255, 168), (194, 255, 255), (69, 255, 147), (115, 255, 255), (24, 255, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49)
]
for det in detections:
x1, y1, x2, y2 = det['bbox']
label = f"{det['class_name']} {det['confidence']:.2f}"
color = COCO_COLORS[det['class_id'] % len(COCO_COLORS)]
cv2.rectangle(frame, (x1, y1), (x2, y2), color, box_thickness)
cv2.putText(frame, label, (x1, max(y1 - 10, 0)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return frame
# --- Video/Image/Live Inference ---
def run_inference(detector: OpenVINOYOLODetector, source=0, conf_threshold=0.25, flip=False, use_popup=False, video_width=None):
if isinstance(source, str) and not os.path.exists(source):
print(f"Downloading sample video: {source}")
import requests
url = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/video/people.mp4"
r = requests.get(url)
with open(source, 'wb') as f:
f.write(r.content)
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Failed to open video source: {source}")
return
window_name = "YOLOv11x + OpenVINO Detection"
if use_popup:
cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
frame_count = 0
times = []
while True:
ret, frame = cap.read()
if not ret:
break
if flip:
frame = cv2.flip(frame, 1)
if video_width:
scale = video_width / max(frame.shape[:2])
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
start = time.time()
detections = detector.infer(frame, conf_threshold=conf_threshold)
frame = detector.draw(frame, detections)
elapsed = time.time() - start
times.append(elapsed)
if len(times) > 200:
times.pop(0)
fps = 1.0 / np.mean(times) if times else 0
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
if use_popup:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
else:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
def run_inference_async(detector: OpenVINOVehicleDetector, source=0, conf_threshold=0.25, flip=False, use_popup=False, video_width=None, max_pipeline=4):
"""
Run video inference using the async API of OpenVINOVehicleDetector.
"""
if isinstance(source, str) and not os.path.exists(source):
print(f"Downloading sample video: {source}")
import requests
url = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/video/people.mp4"
r = requests.get(url)
with open(source, 'wb') as f:
f.write(r.content)
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Failed to open video source: {source}")
return
window_name = "YOLOv11x + OpenVINO Async Detection"
if use_popup:
cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
frame_count = 0
times = []
pipeline = [] # List of (frame_id, frame, t0)
while True:
# Fill pipeline
while len(pipeline) < max_pipeline:
ret, frame = cap.read()
if not ret:
break
if flip:
frame = cv2.flip(frame, 1)
if video_width:
scale = video_width / max(frame.shape[:2])
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
t0 = time.time()
frame_id = detector.detect_async_start(frame)
pipeline.append((frame_id, frame, t0))
if not pipeline:
break
# Get result for the oldest frame in pipeline
frame_id, frame, t0 = pipeline.pop(0)
detections = detector.detect_async_get_result(frame_id, wait=True, conf_threshold=conf_threshold)
frame = detector.draw(frame, detections)
elapsed = time.time() - t0
times.append(elapsed)
if len(times) > 200:
times.pop(0)
fps = 1.0 / np.mean(times) if times else 0
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
if use_popup:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
else:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
# --- Main Entrypoint ---
if __name__ == "__main__":
# Choose model: yolo11x or yolo11n, etc.
MODEL_NAME = "yolo11x"
DEVICE = "AUTO" # or "CPU", "GPU"
# Step 1: Convert model if needed
ov_xml = convert_yolo_to_openvino(MODEL_NAME)
# Step 2: Quantize (optional, demo skips actual quantization)
ov_xml = quantize_openvino_model(ov_xml, MODEL_NAME)
# Step 3: Create detector
detector = OpenVINOYOLODetector(ov_xml, device=DEVICE)
# Step 4: Run on webcam, video, or image
# Webcam: source=0, Video: source="video.mp4", Image: source="image.jpg"
run_inference(detector, source=0, conf_threshold=0.25, flip=True, use_popup=True, video_width=1280)
# To run on a video file: run_inference(detector, source="people.mp4", conf_threshold=0.25)
# To run on an image: run_inference(detector, source="image.jpg", conf_threshold=0.25)
# To run async or batch, extend the OpenVINOYOLODetector class with async API as needed.
import numpy as np
import cv2
def postprocess_openvino_yolo(output, conf_threshold=0.4, iou_threshold=0.5, input_shape=(640, 640), original_shape=None):
"""
output: OpenVINO raw output tensor (e.g., shape [1, 25200, 85])
conf_threshold: minimum confidence
iou_threshold: for NMS
input_shape: model input size (w, h)
original_shape: original image size (w, h)
"""
# 1. Squeeze batch dimension
output = np.squeeze(output) # [25200, 85]
# 2. Split predictions
boxes = output[:, :4]
obj_conf = output[:, 4]
class_scores = output[:, 5:]
# 3. Get class with highest score
class_ids = np.argmax(class_scores, axis=1)
class_conf = class_scores[np.arange(len(class_scores)), class_ids]
# 4. Multiply objectness confidence with class confidence
scores = obj_conf * class_conf
# 5. Filter by confidence threshold
mask = scores > conf_threshold
boxes = boxes[mask]
scores = scores[mask]
class_ids = class_ids[mask]
if original_shape is not None:
# Rescale boxes from input_shape to original image shape
input_w, input_h = input_shape
orig_w, orig_h = original_shape
scale_x = orig_w / input_w
scale_y = orig_h / input_h
boxes[:, 0] *= scale_x # x1
boxes[:, 1] *= scale_y # y1
boxes[:, 2] *= scale_x # x2
boxes[:, 3] *= scale_y # y2
# 6. Convert boxes to [x, y, w, h] format for OpenCV NMS
boxes_xywh = []
for box in boxes:
x1, y1, x2, y2 = box
boxes_xywh.append([x1, y1, x2 - x1, y2 - y1])
# 7. Apply NMS
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
# 8. Return filtered boxes
result_boxes = []
result_scores = []
result_classes = []
if len(boxes) > 0 and len(scores) > 0:
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
if len(indices) > 0:
indices = np.array(indices).flatten()
for i in indices:
i = int(i)
result_boxes.append(boxes[i])
result_scores.append(scores[i])
result_classes.append(class_ids[i])
return result_boxes, result_scores, result_classes
import os
import time
import numpy as np
import cv2
from pathlib import Path
from typing import List, Dict, Optional
# Only traffic-related classes for detection
TRAFFIC_CLASS_NAMES = [
'person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck',
'traffic light', 'stop sign', 'parking meter'
]
class OpenVINOVehicleDetector:
def __init__(self, model_path: str = None, device: str = "AUTO", use_quantized: bool = False, enable_ocr: bool = False, confidence_threshold: float = 0.4):
import openvino as ov
self.device = device
self.confidence_threshold = confidence_threshold
self.ocr_reader = None
self.class_names = TRAFFIC_CLASS_NAMES
self.performance_stats = {
'fps': 0,
'avg_inference_time': 0,
'frames_processed': 0,
'backend': f"OpenVINO-{device}",
'total_detections': 0,
'detection_rate': 0
}
self._inference_times = []
self._start_time = time.time()
self._frame_count = 0
# Model selection logic
self.model_path = self._find_best_model(model_path, use_quantized)
self.core = ov.Core()
self.model = self.core.read_model(self.model_path)
# Always reshape to static shape before accessing .shape
self.model.reshape({0: [1, 3, 640, 640]})
self.input_shape = self.model.inputs[0].shape
self.input_height = self.input_shape[2]
self.input_width = self.input_shape[3]
self.ov_config = {}
if device != "CPU":
# Already reshaped above, so nothing more needed here
pass
if "GPU" in device or ("AUTO" in device and "GPU" in self.core.available_devices):
self.ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}
self.compiled_model = self.core.compile_model(model=self.model, device_name=self.device, config=self.ov_config)
self.output_layer = self.compiled_model.output(0)
def _find_best_model(self, model_path, use_quantized):
# Priority: quantized IR > IR > .pt
search_paths = [
Path(model_path) if model_path else None,
Path("yolo11x_openvino_int8_model/yolo11x.xml") if use_quantized else None,
Path("yolo11x_openvino_model/yolo11x.xml"),
Path("rcb/yolo11x_openvino_model/yolo11x.xml"),
Path("yolo11x.xml"),
Path("rcb/yolo11x.xml"),
Path("yolo11x.pt"),
Path("rcb/yolo11x.pt")
]
for p in search_paths:
if p and p.exists():
return str(p)
raise FileNotFoundError("No suitable YOLOv11x model found for OpenVINO.")
def detect_vehicles(self, frame: np.ndarray, conf_threshold: float = None) -> List[Dict]:
if conf_threshold is None:
conf_threshold = self.confidence_threshold
start = time.time()
input_tensor = self._preprocess(frame)
output = self.compiled_model([input_tensor])[self.output_layer]
detections = self._postprocess(output, frame.shape, conf_threshold)
elapsed = time.time() - start
self._inference_times.append(elapsed)
self._frame_count += 1
self.performance_stats['frames_processed'] = self._frame_count
self.performance_stats['total_detections'] += len(detections)
if len(self._inference_times) > 100:
self._inference_times.pop(0)
self.performance_stats['avg_inference_time'] = float(np.mean(self._inference_times)) if self._inference_times else 0
total_time = time.time() - self._start_time
self.performance_stats['fps'] = self._frame_count / total_time if total_time > 0 else 0
return detections
def _preprocess(self, frame: np.ndarray) -> np.ndarray:
img = cv2.resize(frame, (self.input_width, self.input_height))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.astype(np.float32) / 255.0
img = img.transpose(2, 0, 1)[None]
return img
def _postprocess(self, output: np.ndarray, frame_shape, conf_threshold: float) -> List[Dict]:
# Output: (1, 84, 8400) or (84, 8400) or (8400, 84)
if output.ndim == 3:
output = np.squeeze(output)
if output.shape[0] == 84:
output = output.T # (8400, 84)
boxes = output[:, :4]
scores = output[:, 4:]
class_ids = np.argmax(scores, axis=1)
confidences = np.max(scores, axis=1)
detections = []
h, w = frame_shape[:2]
for i, (box, score, class_id) in enumerate(zip(boxes, confidences, class_ids)):
if score < conf_threshold:
continue
x_c, y_c, bw, bh = box
# If normalized, scale to input size
if all(0.0 <= v <= 1.0 for v in box):
x_c *= self.input_width
y_c *= self.input_height
bw *= self.input_width
bh *= self.input_height
# Scale to original frame size
scale_x = w / self.input_width
scale_y = h / self.input_height
x_c *= scale_x
y_c *= scale_y
bw *= scale_x
bh *= scale_y
x1 = int(round(x_c - bw / 2))
y1 = int(round(y_c - bh / 2))
x2 = int(round(x_c + bw / 2))
y2 = int(round(y_c + bh / 2))
x1 = max(0, min(x1, w - 1))
y1 = max(0, min(y1, h - 1))
x2 = max(0, min(x2, w - 1))
y2 = max(0, min(y2, h - 1))
if x2 <= x1 or y2 <= y1:
continue
# Only keep class 9 as traffic light, rename if found
if class_id == 9:
class_name = "traffic light"
elif class_id < len(TRAFFIC_CLASS_NAMES):
class_name = TRAFFIC_CLASS_NAMES[class_id]
else:
continue # Remove unknown/other classes
detections.append({
'bbox': [x1, y1, x2, y2],
'confidence': float(score),
'class_id': int(class_id),
'class_name': class_name
})
# Apply NMS
if len(detections) > 0:
boxes = np.array([det['bbox'] for det in detections])
scores = np.array([det['confidence'] for det in detections])
indices = cv2.dnn.NMSBoxes(boxes.tolist(), scores.tolist(), conf_threshold, 0.5)
if isinstance(indices, (list, tuple)) and len(indices) > 0:
indices = np.array(indices).flatten()
elif isinstance(indices, np.ndarray) and indices.size > 0:
indices = indices.flatten()
else:
indices = []
detections = [detections[int(i)] for i in indices] if len(indices) > 0 else []
return detections
def draw(self, frame: np.ndarray, detections: List[Dict], box_thickness: int = 2) -> np.ndarray:
# 80+ visually distinct colors for COCO classes (BGR)
COCO_COLORS = [
(255, 56, 56), (255, 157, 151), (255, 112, 31), (255, 178, 29), (207, 210, 49),
(72, 249, 10), (146, 204, 23), (61, 219, 134), (26, 147, 52), (0, 212, 187),
(44, 153, 168), (0, 194, 255), (52, 69, 147), (100, 115, 255), (0, 24, 236),
(132, 56, 255), (82, 0, 133), (203, 56, 255), (255, 149, 200), (255, 55, 199),
(255, 255, 56), (255, 255, 151), (255, 255, 31), (255, 255, 29), (207, 255, 49),
(72, 255, 10), (146, 255, 23), (61, 255, 134), (26, 255, 52), (0, 255, 187),
(44, 255, 168), (0, 255, 255), (52, 255, 147), (100, 255, 255), (0, 255, 236),
(132, 255, 255), (82, 255, 133), (203, 255, 255), (255, 255, 200), (255, 255, 199),
(56, 255, 255), (157, 255, 151), (112, 255, 31), (178, 255, 29), (210, 255, 49),
(249, 255, 10), (204, 255, 23), (219, 255, 134), (147, 255, 52), (212, 255, 187),
(153, 255, 168), (194, 255, 255), (69, 255, 147), (115, 255, 255), (24, 255, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49)
]
for det in detections:
x1, y1, x2, y2 = det['bbox']
label = f"{det['class_name']} {det['confidence']:.2f}"
color = COCO_COLORS[det['class_id'] % len(COCO_COLORS)]
cv2.rectangle(frame, (x1, y1), (x2, y2), color, box_thickness)
cv2.putText(frame, label, (x1, max(y1 - 10, 0)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return frame
# --- Video/Image/Live Inference ---
def run_inference(detector: OpenVINOYOLODetector, source=0, conf_threshold=0.25, flip=False, use_popup=False, video_width=None):
if isinstance(source, str) and not os.path.exists(source):
print(f"Downloading sample video: {source}")
import requests
url = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/video/people.mp4"
r = requests.get(url)
with open(source, 'wb') as f:
f.write(r.content)
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Failed to open video source: {source}")
return
window_name = "YOLOv11x + OpenVINO Detection"
if use_popup:
cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
frame_count = 0
times = []
while True:
ret, frame = cap.read()
if not ret:
break
if flip:
frame = cv2.flip(frame, 1)
if video_width:
scale = video_width / max(frame.shape[:2])
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
start = time.time()
detections = detector.infer(frame, conf_threshold=conf_threshold)
frame = detector.draw(frame, detections)
elapsed = time.time() - start
times.append(elapsed)
if len(times) > 200:
times.pop(0)
fps = 1.0 / np.mean(times) if times else 0
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
if use_popup:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
else:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
def run_inference_async(detector: OpenVINOVehicleDetector, source=0, conf_threshold=0.25, flip=False, use_popup=False, video_width=None, max_pipeline=4):
"""
Run video inference using the async API of OpenVINOVehicleDetector.
"""
if isinstance(source, str) and not os.path.exists(source):
print(f"Downloading sample video: {source}")
import requests
url = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/video/people.mp4"
r = requests.get(url)
with open(source, 'wb') as f:
f.write(r.content)
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Failed to open video source: {source}")
return
window_name = "YOLOv11x + OpenVINO Async Detection"
if use_popup:
cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
frame_count = 0
times = []
pipeline = [] # List of (frame_id, frame, t0)
while True:
# Fill pipeline
while len(pipeline) < max_pipeline:
ret, frame = cap.read()
if not ret:
break
if flip:
frame = cv2.flip(frame, 1)
if video_width:
scale = video_width / max(frame.shape[:2])
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
t0 = time.time()
frame_id = detector.detect_async_start(frame)
pipeline.append((frame_id, frame, t0))
if not pipeline:
break
# Get result for the oldest frame in pipeline
frame_id, frame, t0 = pipeline.pop(0)
detections = detector.detect_async_get_result(frame_id, wait=True, conf_threshold=conf_threshold)
frame = detector.draw(frame, detections)
elapsed = time.time() - t0
times.append(elapsed)
if len(times) > 200:
times.pop(0)
fps = 1.0 / np.mean(times) if times else 0
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
if use_popup:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
else:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
# --- Main Entrypoint ---
if __name__ == "__main__":
# Choose model: yolo11x or yolo11n, etc.
MODEL_NAME = "yolo11x"
DEVICE = "AUTO" # or "CPU", "GPU"
# Step 1: Convert model if needed
ov_xml = convert_yolo_to_openvino(MODEL_NAME)
# Step 2: Quantize (optional, demo skips actual quantization)
ov_xml = quantize_openvino_model(ov_xml, MODEL_NAME)
# Step 3: Create detector
detector = OpenVINOYOLODetector(ov_xml, device=DEVICE)
# Step 4: Run on webcam, video, or image
# Webcam: source=0, Video: source="video.mp4", Image: source="image.jpg"
run_inference(detector, source=0, conf_threshold=0.25, flip=True, use_popup=True, video_width=1280)
# To run on a video file: run_inference(detector, source="people.mp4", conf_threshold=0.25)
# To run on an image: run_inference(detector, source="image.jpg", conf_threshold=0.25)
# To run async or batch, extend the OpenVINOYOLODetector class with async API as needed.
import numpy as np
import cv2
def postprocess_openvino_yolo(output, conf_threshold=0.4, iou_threshold=0.5, input_shape=(640, 640), original_shape=None):
"""
output: OpenVINO raw output tensor (e.g., shape [1, 25200, 85])
conf_threshold: minimum confidence
iou_threshold: for NMS
input_shape: model input size (w, h)
original_shape: original image size (w, h)
"""
# 1. Squeeze batch dimension
output = np.squeeze(output) # [25200, 85]
# 2. Split predictions
boxes = output[:, :4]
obj_conf = output[:, 4]
class_scores = output[:, 5:]
# 3. Get class with highest score
class_ids = np.argmax(class_scores, axis=1)
class_conf = class_scores[np.arange(len(class_scores)), class_ids]
# 4. Multiply objectness confidence with class confidence
scores = obj_conf * class_conf
# 5. Filter by confidence threshold
mask = scores > conf_threshold
boxes = boxes[mask]
scores = scores[mask]
class_ids = class_ids[mask]
if original_shape is not None:
# Rescale boxes from input_shape to original image shape
input_w, input_h = input_shape
orig_w, orig_h = original_shape
scale_x = orig_w / input_w
scale_y = orig_h / input_h
boxes[:, 0] *= scale_x # x1
boxes[:, 1] *= scale_y # y1
boxes[:, 2] *= scale_x # x2
boxes[:, 3] *= scale_y # y2
# 6. Convert boxes to [x, y, w, h] format for OpenCV NMS
boxes_xywh = []
for box in boxes:
x1, y1, x2, y2 = box
boxes_xywh.append([x1, y1, x2 - x1, y2 - y1])
# 7. Apply NMS
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
# 8. Return filtered boxes
result_boxes = []
result_scores = []
result_classes = []
if len(boxes) > 0 and len(scores) > 0:
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
if len(indices) > 0:
indices = np.array(indices).flatten()
for i in indices:
i = int(i)
result_boxes.append(boxes[i])
result_scores.append(scores[i])
result_classes.append(class_ids[i])
return result_boxes, result_scores, result_classes
# Detection logic using OpenVINO models (YOLO, etc.)
import os
import sys
import time
import cv2
import numpy as np
from pathlib import Path
from typing import List, Dict, Tuple, Optional
from red_light_violation_pipeline import RedLightViolationPipeline
# --- Install required packages if missing ---
try:
import openvino as ov
except ImportError:
print("Installing openvino...")
os.system('pip install --quiet "openvino>=2024.0.0"')
import openvino as ov
try:
from ultralytics import YOLO
except ImportError:
print("Installing ultralytics...")
os.system('pip install --quiet "ultralytics==8.3.0"')
from ultralytics import YOLO
try:
import nncf
except ImportError:
print("Installing nncf...")
os.system('pip install --quiet "nncf>=2.9.0"')
import nncf
# --- COCO dataset class names ---
COCO_CLASSES = {
0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus',
6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant',
11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat',
16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear',
22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag',
27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard',
32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove',
36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle',
40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl',
46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli',
51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair',
57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet',
62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard',
67: 'cell phone', 68: 'microwave', 69: 'oven', 70: 'toaster', 71: 'sink',
72: 'refrigerator', 73: 'book', 74: 'clock', 75: 'vase', 76: 'scissors',
77: 'teddy bear', 78: 'hair drier', 79: 'toothbrush'
}
# Traffic-related classes we're interested in (using standard COCO indices)
TRAFFIC_CLASS_NAMES = COCO_CLASSES
# --- Model Conversion and Quantization ---
def convert_yolo_to_openvino(model_name: str = "yolo11x", half: bool = True) -> Path:
"""Convert YOLOv11x PyTorch model to OpenVINO IR format."""
pt_path = Path(f"{model_name}.pt")
ov_dir = Path(f"{model_name}_openvino_model")
ov_xml = ov_dir / f"{model_name}.xml"
if not ov_xml.exists():
print(f"Exporting {pt_path} to OpenVINO IR...")
model = YOLO(str(pt_path))
model.export(format="openvino", dynamic=True, half=half)
else:
print(f"OpenVINO IR already exists: {ov_xml}")
return ov_xml
def quantize_openvino_model(ov_xml: Path, model_name: str = "yolo11x") -> Path:
"""Quantize OpenVINO IR model to INT8 using NNCF."""
int8_dir = Path(f"{model_name}_openvino_int8_model")
int8_xml = int8_dir / f"{model_name}.xml"
if int8_xml.exists():
print(f"INT8 model already exists: {int8_xml}")
return int8_xml
print("Quantization requires a calibration dataset. Skipping actual quantization in this demo.")
return ov_xml # Return FP32 if no quantization
# --- OpenVINO Inference Pipeline ---
class OpenVINOYOLODetector:
def __init__(self, model_xml: Path, device: str = "AUTO"):
self.core = ov.Core()
self.device = device
self.model = self.core.read_model(model_xml)
self.input_shape = self.model.inputs[0].shape
self.input_height = self.input_shape[2]
self.input_width = self.input_shape[3]
self.ov_config = {}
if device != "CPU":
self.model.reshape({0: [1, 3, 640, 640]})
if "GPU" in device or ("AUTO" in device and "GPU" in self.core.available_devices):
self.ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}
self.compiled_model = self.core.compile_model(model=self.model, device_name=self.device, config=self.ov_config)
self.output_layer = self.compiled_model.output(0)
def preprocess(self, frame: np.ndarray) -> np.ndarray:
img = cv2.resize(frame, (self.input_width, self.input_height))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.astype(np.float32) / 255.0
img = img.transpose(2, 0, 1)[None]
return img
def infer(self, frame: np.ndarray, conf_threshold: float = 0.25) -> List[Dict]:
input_tensor = self.preprocess(frame)
output = self.compiled_model([input_tensor])[self.output_layer]
return self.postprocess(output, frame.shape, conf_threshold)
def postprocess(self, output: np.ndarray, frame_shape, conf_threshold: float) -> List[Dict]:
# Output: (1, 84, 8400) or (84, 8400) or (8400, 84)
if output.ndim == 3:
output = np.squeeze(output)
if output.shape[0] == 84:
output = output.T # (8400, 84)
boxes = output[:, :4]
scores = output[:, 4:]
class_ids = np.argmax(scores, axis=1)
confidences = np.max(scores, axis=1)
detections = []
h, w = frame_shape[:2]
for i, (box, score, class_id) in enumerate(zip(boxes, confidences, class_ids)):
if score < conf_threshold:
continue
x_c, y_c, bw, bh = box
# If normalized, scale to input size
if all(0.0 <= v <= 1.0 for v in box):
x_c *= self.input_width
y_c *= self.input_height
bw *= self.input_width
bh *= self.input_height
# Scale to original frame size
scale_x = w / self.input_width
scale_y = h / self.input_height
x_c *= scale_x
y_c *= scale_y
bw *= scale_x
bh *= scale_y
x1 = int(round(x_c - bw / 2))
y1 = int(round(y_c - bh / 2))
x2 = int(round(x_c + bw / 2))
y2 = int(round(y_c + bh / 2))
x1 = max(0, min(x1, w - 1))
y1 = max(0, min(y1, h - 1))
x2 = max(0, min(x2, w - 1))
y2 = max(0, min(y2, h - 1))
if x2 <= x1 or y2 <= y1:
continue
# Only keep class 9 as traffic light, rename if found
if class_id == 9:
class_name = "traffic light"
elif class_id < len(TRAFFIC_CLASS_NAMES):
class_name = TRAFFIC_CLASS_NAMES[class_id]
else:
continue # Remove unknown/other classes
detections.append({
'bbox': [x1, y1, x2, y2],
'confidence': float(score),
'class_id': int(class_id),
'class_name': class_name
})
return detections
def draw(self, frame: np.ndarray, detections: List[Dict], box_thickness: int = 2) -> np.ndarray:
# 80+ visually distinct colors for COCO classes (BGR)
COCO_COLORS = [
(255, 56, 56), (255, 157, 151), (255, 112, 31), (255, 178, 29), (207, 210, 49),
(72, 249, 10), (146, 204, 23), (61, 219, 134), (26, 147, 52), (0, 212, 187),
(44, 153, 168), (0, 194, 255), (52, 69, 147), (100, 115, 255), (0, 24, 236),
(132, 56, 255), (82, 0, 133), (203, 56, 255), (255, 149, 200), (255, 55, 199),
(255, 255, 56), (255, 255, 151), (255, 255, 31), (255, 255, 29), (207, 255, 49),
(72, 255, 10), (146, 255, 23), (61, 255, 134), (26, 255, 52), (0, 255, 187),
(44, 255, 168), (0, 255, 255), (52, 255, 147), (100, 255, 255), (0, 255, 236),
(132, 255, 255), (82, 255, 133), (203, 255, 255), (255, 255, 200), (255, 255, 199),
(56, 255, 255), (157, 255, 151), (112, 255, 31), (178, 255, 29), (210, 255, 49),
(249, 255, 10), (204, 255, 23), (219, 255, 134), (147, 255, 52), (212, 255, 187),
(153, 255, 168), (194, 255, 255), (69, 255, 147), (115, 255, 255), (24, 255, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49)
]
for det in detections:
x1, y1, x2, y2 = det['bbox']
label = f"{det['class_name']} {det['confidence']:.2f}"
color = COCO_COLORS[det['class_id'] % len(COCO_COLORS)]
cv2.rectangle(frame, (x1, y1), (x2, y2), color, box_thickness)
cv2.putText(frame, label, (x1, max(y1 - 10, 0)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return frame
# --- Video/Image/Live Inference ---
def run_inference(detector: OpenVINOYOLODetector, source=0, conf_threshold=0.25, flip=False, use_popup=False, video_width=None):
if isinstance(source, str) and not os.path.exists(source):
print(f"Downloading sample video: {source}")
import requests
url = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/video/people.mp4"
r = requests.get(url)
with open(source, 'wb') as f:
f.write(r.content)
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Failed to open video source: {source}")
return
window_name = "YOLOv11x + OpenVINO Detection"
if use_popup:
cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
frame_count = 0
times = []
while True:
ret, frame = cap.read()
if not ret:
break
if flip:
frame = cv2.flip(frame, 1)
if video_width:
scale = video_width / max(frame.shape[:2])
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
start = time.time()
detections = detector.infer(frame, conf_threshold=conf_threshold)
frame = detector.draw(frame, detections)
elapsed = time.time() - start
times.append(elapsed)
if len(times) > 200:
times.pop(0)
fps = 1.0 / np.mean(times) if times else 0
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
if use_popup:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
else:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
# --- Main Entrypoint ---
if __name__ == "__main__":
# Choose model: yolo11x or yolo11n, etc.
MODEL_NAME = "yolo11x"
DEVICE = "AUTO" # or "CPU", "GPU"
# Step 1: Convert model if needed
ov_xml = convert_yolo_to_openvino(MODEL_NAME)
# Step 2: Quantize (optional, demo skips actual quantization)
ov_xml = quantize_openvino_model(ov_xml, MODEL_NAME)
# Step 3: Create detector
detector = OpenVINOYOLODetector(ov_xml, device=DEVICE)
# Step 4: Run on webcam, video, or image
# Webcam: source=0, Video: source="video.mp4", Image: source="image.jpg"
run_inference(detector, source=0, conf_threshold=0.25, flip=True, use_popup=True, video_width=1280)
# To run on a video file: run_inference(detector, source="people.mp4", conf_threshold=0.25)
# To run on an image: run_inference(detector, source="image.jpg", conf_threshold=0.25)
# To run async or batch, extend the OpenVINOYOLODetector class with async API as needed.
import numpy as np
import cv2
def postprocess_openvino_yolo(output, conf_threshold=0.4, iou_threshold=0.5, input_shape=(640, 640), original_shape=None):
"""
output: OpenVINO raw output tensor (e.g., shape [1, 25200, 85])
conf_threshold: minimum confidence
iou_threshold: for NMS
input_shape: model input size (w, h)
original_shape: original image size (w, h)
"""
# 1. Squeeze batch dimension
output = np.squeeze(output) # [25200, 85]
# 2. Split predictions
boxes = output[:, :4]
obj_conf = output[:, 4]
class_scores = output[:, 5:]
# 3. Get class with highest score
class_ids = np.argmax(class_scores, axis=1)
class_conf = class_scores[np.arange(len(class_scores)), class_ids]
# 4. Multiply objectness confidence with class confidence
scores = obj_conf * class_conf
# 5. Filter by confidence threshold
mask = scores > conf_threshold
boxes = boxes[mask]
scores = scores[mask]
class_ids = class_ids[mask]
if original_shape is not None:
# Rescale boxes from input_shape to original image shape
input_w, input_h = input_shape
orig_w, orig_h = original_shape
scale_x = orig_w / input_w
scale_y = orig_h / input_h
boxes[:, 0] *= scale_x # x1
boxes[:, 1] *= scale_y # y1
boxes[:, 2] *= scale_x # x2
boxes[:, 3] *= scale_y # y2
# 6. Convert boxes to [x, y, w, h] format for OpenCV NMS
boxes_xywh = []
for box in boxes:
x1, y1, x2, y2 = box
boxes_xywh.append([x1, y1, x2 - x1, y2 - y1])
# 7. Apply NMS
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
# 8. Return filtered boxes
result_boxes = []
result_scores = []
result_classes = []
if len(boxes) > 0 and len(scores) > 0:
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
if len(indices) > 0:
indices = np.array(indices).flatten()
for i in indices:
i = int(i)
result_boxes.append(boxes[i])
result_scores.append(scores[i])
result_classes.append(class_ids[i])
return result_boxes, result_scores, result_classes
import os
import time
import numpy as np
import cv2
from pathlib import Path
from typing import List, Dict, Optional
# Only traffic-related classes for detection
TRAFFIC_CLASS_NAMES = [
'person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck',
'traffic light', 'stop sign', 'parking meter'
]
class OpenVINOVehicleDetector:
def __init__(self, model_path: str = None, device: str = "AUTO", use_quantized: bool = False, enable_ocr: bool = False, confidence_threshold: float = 0.4):
import openvino as ov
self.device = device
self.confidence_threshold = confidence_threshold
self.ocr_reader = None
self.class_names = TRAFFIC_CLASS_NAMES
self.performance_stats = {
'fps': 0,
'avg_inference_time': 0,
'frames_processed': 0,
'backend': f"OpenVINO-{device}",
'total_detections': 0,
'detection_rate': 0
}
self._inference_times = []
self._start_time = time.time()
self._frame_count = 0
# Model selection logic
self.model_path = self._find_best_model(model_path, use_quantized)
self.core = ov.Core()
self.model = self.core.read_model(self.model_path)
# Always reshape to static shape before accessing .shape
self.model.reshape({0: [1, 3, 640, 640]})
self.input_shape = self.model.inputs[0].shape
self.input_height = self.input_shape[2]
self.input_width = self.input_shape[3]
self.ov_config = {}
if device != "CPU":
# Already reshaped above, so nothing more needed here
pass
if "GPU" in device or ("AUTO" in device and "GPU" in self.core.available_devices):
self.ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}
self.compiled_model = self.core.compile_model(model=self.model, device_name=self.device, config=self.ov_config)
self.output_layer = self.compiled_model.output(0)
def _find_best_model(self, model_path, use_quantized):
# Priority: quantized IR > IR > .pt
search_paths = [
Path(model_path) if model_path else None,
Path("yolo11x_openvino_int8_model/yolo11x.xml") if use_quantized else None,
Path("yolo11x_openvino_model/yolo11x.xml"),
Path("rcb/yolo11x_openvino_model/yolo11x.xml"),
Path("yolo11x.xml"),
Path("rcb/yolo11x.xml"),
Path("yolo11x.pt"),
Path("rcb/yolo11x.pt")
]
for p in search_paths:
if p and p.exists():
return str(p)
raise FileNotFoundError("No suitable YOLOv11x model found for OpenVINO.")
def detect_vehicles(self, frame: np.ndarray, conf_threshold: float = None) -> List[Dict]:
if conf_threshold is None:
conf_threshold = 0.1 # Lowered for debugging
start = time.time()
input_tensor = self._preprocess(frame)
output = self.compiled_model([input_tensor])[self.output_layer]
# Debug: print raw output shape
print(f"[DEBUG] Model output shape: {output.shape}")
detections = self._postprocess(output, frame.shape, conf_threshold)
print(f"[DEBUG] Detections after postprocess: {len(detections)}")
elapsed = time.time() - start
self._inference_times.append(elapsed)
self._frame_count += 1
self.performance_stats['frames_processed'] = self._frame_count
self.performance_stats['total_detections'] += len(detections)
if len(self._inference_times) > 100:
self._inference_times.pop(0)
self.performance_stats['avg_inference_time'] = float(np.mean(self._inference_times)) if self._inference_times else 0
total_time = time.time() - self._start_time
self.performance_stats['fps'] = self._frame_count / total_time if total_time > 0 else 0
return detections
def _preprocess(self, frame: np.ndarray) -> np.ndarray:
img = cv2.resize(frame, (self.input_width, self.input_height))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.astype(np.float32) / 255.0
img = img.transpose(2, 0, 1)[None]
return img
def _postprocess(self, output: np.ndarray, frame_shape, conf_threshold: float) -> List[Dict]:
# Output: (1, 84, 8400) or (84, 8400) or (8400, 84)
if output.ndim == 3:
output = np.squeeze(output)
if output.shape[0] == 84:
output = output.T # (8400, 84)
boxes = output[:, :4]
scores = output[:, 4:]
class_ids = np.argmax(scores, axis=1)
confidences = np.max(scores, axis=1)
detections = []
h, w = frame_shape[:2]
for i, (box, score, class_id) in enumerate(zip(boxes, confidences, class_ids)):
if score < conf_threshold:
continue
x_c, y_c, bw, bh = box
# If normalized, scale to input size
if all(0.0 <= v <= 1.0 for v in box):
x_c *= self.input_width
y_c *= self.input_height
bw *= self.input_width
bh *= self.input_height
# Scale to original frame size
scale_x = w / self.input_width
scale_y = h / self.input_height
x_c *= scale_x
y_c *= scale_y
bw *= scale_x
bh *= scale_y
x1 = int(round(x_c - bw / 2))
y1 = int(round(y_c - bh / 2))
x2 = int(round(x_c + bw / 2))
y2 = int(round(y_c + bh / 2))
x1 = max(0, min(x1, w - 1))
y1 = max(0, min(y1, h - 1))
x2 = max(0, min(x2, w - 1))
y2 = max(0, min(y2, h - 1))
if x2 <= x1 or y2 <= y1:
continue
# Only keep class 9 as traffic light, rename if found
if class_id == 9:
class_name = "traffic light"
elif class_id < len(TRAFFIC_CLASS_NAMES):
class_name = TRAFFIC_CLASS_NAMES[class_id]
else:
continue # Remove unknown/other classes
detections.append({
'bbox': [x1, y1, x2, y2],
'confidence': float(score),
'class_id': int(class_id),
'class_name': class_name
})
print(f"[DEBUG] Raw detections before NMS: {len(detections)}")
# Apply NMS
if len(detections) > 0:
boxes = np.array([det['bbox'] for det in detections])
scores = np.array([det['confidence'] for det in detections])
indices = cv2.dnn.NMSBoxes(boxes.tolist(), scores.tolist(), conf_threshold, 0.5)
if isinstance(indices, (list, tuple)) and len(indices) > 0:
indices = np.array(indices).flatten()
elif isinstance(indices, np.ndarray) and indices.size > 0:
indices = indices.flatten()
else:
indices = []
detections = [detections[int(i)] for i in indices] if len(indices) > 0 else []
print(f"[DEBUG] Detections after NMS: {len(detections)}")
return detections
def draw(self, frame: np.ndarray, detections: List[Dict], box_thickness: int = 2) -> np.ndarray:
# 80+ visually distinct colors for COCO classes (BGR)
COCO_COLORS = [
(255, 56, 56), (255, 157, 151), (255, 112, 31), (255, 178, 29), (207, 210, 49),
(72, 249, 10), (146, 204, 23), (61, 219, 134), (26, 147, 52), (0, 212, 187),
(44, 153, 168), (0, 194, 255), (52, 69, 147), (100, 115, 255), (0, 24, 236),
(132, 56, 255), (82, 0, 133), (203, 56, 255), (255, 149, 200), (255, 55, 199),
(255, 255, 56), (255, 255, 151), (255, 255, 31), (255, 255, 29), (207, 255, 49),
(72, 255, 10), (146, 255, 23), (61, 255, 134), (26, 255, 52), (0, 255, 187),
(44, 255, 168), (0, 255, 255), (52, 255, 147), (100, 255, 255), (0, 255, 236),
(132, 255, 255), (82, 255, 133), (203, 255, 255), (255, 255, 200), (255, 255, 199),
(56, 255, 255), (157, 255, 151), (112, 255, 31), (178, 255, 29), (210, 255, 49),
(249, 255, 10), (204, 255, 23), (219, 255, 134), (147, 255, 52), (212, 255, 187),
(153, 255, 168), (194, 255, 255), (69, 255, 147), (115, 255, 255), (24, 255, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49)
]
for det in detections:
x1, y1, x2, y2 = det['bbox']
label = f"{det['class_name']} {det['confidence']:.2f}"
color = COCO_COLORS[det['class_id'] % len(COCO_COLORS)]
cv2.rectangle(frame, (x1, y1), (x2, y2), color, box_thickness)
cv2.putText(frame, label, (x1, max(y1 - 10, 0)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return frame
# --- Video/Image/Live Inference ---
def run_inference(detector: OpenVINOYOLODetector, source=0, conf_threshold=0.25, flip=False, use_popup=False, video_width=None):
if isinstance(source, str) and not os.path.exists(source):
print(f"Downloading sample video: {source}")
import requests
url = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/video/people.mp4"
r = requests.get(url)
with open(source, 'wb') as f:
f.write(r.content)
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Failed to open video source: {source}")
return
window_name = "YOLOv11x + OpenVINO Detection"
if use_popup:
cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
frame_count = 0
times = []
while True:
ret, frame = cap.read()
if not ret:
break
if flip:
frame = cv2.flip(frame, 1)
if video_width:
scale = video_width / max(frame.shape[:2])
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
start = time.time()
detections = detector.infer(frame, conf_threshold=conf_threshold)
frame = detector.draw(frame, detections)
elapsed = time.time() - start
times.append(elapsed)
if len(times) > 200:
times.pop(0)
fps = 1.0 / np.mean(times) if times else 0
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
if use_popup:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
else:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
# --- Main Entrypoint ---
if __name__ == "__main__":
# Choose model: yolo11x or yolo11n, etc.
MODEL_NAME = "yolo11x"
DEVICE = "AUTO" # or "CPU", "GPU"
# Step 1: Convert model if needed
ov_xml = convert_yolo_to_openvino(MODEL_NAME)
# Step 2: Quantize (optional, demo skips actual quantization)
ov_xml = quantize_openvino_model(ov_xml, MODEL_NAME)
# Step 3: Create detector
detector = OpenVINOYOLODetector(ov_xml, device=DEVICE)
# Step 4: Run on webcam, video, or image
# Webcam: source=0, Video: source="video.mp4", Image: source="image.jpg"
run_inference(detector, source=0, conf_threshold=0.25, flip=True, use_popup=True, video_width=1280)
# To run on a video file: run_inference(detector, source="people.mp4", conf_threshold=0.25)
# To run on an image: run_inference(detector, source="image.jpg", conf_threshold=0.25)
# To run async or batch, extend the OpenVINOYOLODetector class with async API as needed.
import numpy as np
import cv2
def postprocess_openvino_yolo(output, conf_threshold=0.4, iou_threshold=0.5, input_shape=(640, 640), original_shape=None):
"""
output: OpenVINO raw output tensor (e.g., shape [1, 25200, 85])
conf_threshold: minimum confidence
iou_threshold: for NMS
input_shape: model input size (w, h)
original_shape: original image size (w, h)
"""
# 1. Squeeze batch dimension
output = np.squeeze(output) # [25200, 85]
# 2. Split predictions
boxes = output[:, :4]
obj_conf = output[:, 4]
class_scores = output[:, 5:]
# 3. Get class with highest score
class_ids = np.argmax(class_scores, axis=1)
class_conf = class_scores[np.arange(len(class_scores)), class_ids]
# 4. Multiply objectness confidence with class confidence
scores = obj_conf * class_conf
# 5. Filter by confidence threshold
mask = scores > conf_threshold
boxes = boxes[mask]
scores = scores[mask]
class_ids = class_ids[mask]
if original_shape is not None:
# Rescale boxes from input_shape to original image shape
input_w, input_h = input_shape
orig_w, orig_h = original_shape
scale_x = orig_w / input_w
scale_y = orig_h / input_h
boxes[:, 0] *= scale_x # x1
boxes[:, 1] *= scale_y # y1
boxes[:, 2] *= scale_x # x2
boxes[:, 3] *= scale_y # y2
# 6. Convert boxes to [x, y, w, h] format for OpenCV NMS
boxes_xywh = []
for box in boxes:
x1, y1, x2, y2 = box
boxes_xywh.append([x1, y1, x2 - x1, y2 - y1])
# 7. Apply NMS
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
# 8. Return filtered boxes
result_boxes = []
result_scores = []
result_classes = []
if len(boxes) > 0 and len(scores) > 0:
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
if len(indices) > 0:
indices = np.array(indices).flatten()
for i in indices:
i = int(i)
result_boxes.append(boxes[i])
result_scores.append(scores[i])
result_classes.append(class_ids[i])
return result_boxes, result_scores, result_classes
import os
import time
import numpy as np
import cv2
from pathlib import Path
from typing import List, Dict, Optional
# Only traffic-related classes for detection
TRAFFIC_CLASS_NAMES = [
'person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck',
'traffic light', 'stop sign', 'parking meter'
]
class OpenVINOVehicleDetector:
def __init__(self, model_path: str = None, device: str = "AUTO", use_quantized: bool = False, enable_ocr: bool = False, confidence_threshold: float = 0.4):
import openvino as ov
self.device = device
self.confidence_threshold = confidence_threshold
self.ocr_reader = None
self.class_names = TRAFFIC_CLASS_NAMES
self.performance_stats = {
'fps': 0,
'avg_inference_time': 0,
'frames_processed': 0,
'backend': f"OpenVINO-{device}",
'total_detections': 0,
'detection_rate': 0
}
self._inference_times = []
self._start_time = time.time()
self._frame_count = 0
# Model selection logic
self.model_path = self._find_best_model(model_path, use_quantized)
self.core = ov.Core()
self.model = self.core.read_model(self.model_path)
# Always reshape to static shape before accessing .shape
self.model.reshape({0: [1, 3, 640, 640]})
self.input_shape = self.model.inputs[0].shape
self.input_height = self.input_shape[2]
self.input_width = self.input_shape[3]
self.ov_config = {}
if device != "CPU":
# Already reshaped above, so nothing more needed here
pass
if "GPU" in device or ("AUTO" in device and "GPU" in self.core.available_devices):
self.ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}
self.compiled_model = self.core.compile_model(model=self.model, device_name=self.device, config=self.ov_config)
self.output_layer = self.compiled_model.output(0)
def _find_best_model(self, model_path, use_quantized):
# Priority: quantized IR > IR > .pt
search_paths = [
Path(model_path) if model_path else None,
Path("yolo11x_openvino_int8_model/yolo11x.xml") if use_quantized else None,
Path("yolo11x_openvino_model/yolo11x.xml"),
Path("rcb/yolo11x_openvino_model/yolo11x.xml"),
Path("yolo11x.xml"),
Path("rcb/yolo11x.xml"),
Path("yolo11x.pt"),
Path("rcb/yolo11x.pt")
]
for p in search_paths:
if p and p.exists():
return str(p)
raise FileNotFoundError("No suitable YOLOv11x model found for OpenVINO.")
def detect_vehicles(self, frame: np.ndarray, conf_threshold: float = None) -> List[Dict]:
if conf_threshold is None:
conf_threshold = 0.1 # Lowered for debugging
start = time.time()
input_tensor = self._preprocess(frame)
output = self.compiled_model([input_tensor])[self.output_layer]
# Debug: print raw output shape
print(f"[DEBUG] Model output shape: {output.shape}")
detections = self._postprocess(output, frame.shape, conf_threshold)
print(f"[DEBUG] Detections after postprocess: {len(detections)}")
elapsed = time.time() - start
self._inference_times.append(elapsed)
self._frame_count += 1
self.performance_stats['frames_processed'] = self._frame_count
self.performance_stats['total_detections'] += len(detections)
if len(self._inference_times) > 100:
self._inference_times.pop(0)
self.performance_stats['avg_inference_time'] = float(np.mean(self._inference_times)) if self._inference_times else 0
total_time = time.time() - self._start_time
self.performance_stats['fps'] = self._frame_count / total_time if total_time > 0 else 0
return detections
def _preprocess(self, frame: np.ndarray) -> np.ndarray:
img = cv2.resize(frame, (self.input_width, self.input_height))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.astype(np.float32) / 255.0
img = img.transpose(2, 0, 1)[None]
return img
def _postprocess(self, output: np.ndarray, frame_shape, conf_threshold: float) -> List[Dict]:
# Output: (1, 84, 8400) or (84, 8400) or (8400, 84)
if output.ndim == 3:
output = np.squeeze(output)
if output.shape[0] == 84:
output = output.T # (8400, 84)
boxes = output[:, :4]
scores = output[:, 4:]
class_ids = np.argmax(scores, axis=1)
confidences = np.max(scores, axis=1)
detections = []
h, w = frame_shape[:2]
for i, (box, score, class_id) in enumerate(zip(boxes, confidences, class_ids)):
if score < conf_threshold:
continue
x_c, y_c, bw, bh = box
# If normalized, scale to input size
if all(0.0 <= v <= 1.0 for v in box):
x_c *= self.input_width
y_c *= self.input_height
bw *= self.input_width
bh *= self.input_height
# Scale to original frame size
scale_x = w / self.input_width
scale_y = h / self.input_height
x_c *= scale_x
y_c *= scale_y
bw *= scale_x
bh *= scale_y
x1 = int(round(x_c - bw / 2))
y1 = int(round(y_c - bh / 2))
x2 = int(round(x_c + bw / 2))
y2 = int(round(y_c + bh / 2))
x1 = max(0, min(x1, w - 1))
y1 = max(0, min(y1, h - 1))
x2 = max(0, min(x2, w - 1))
y2 = max(0, min(y2, h - 1))
if x2 <= x1 or y2 <= y1:
continue
# Only keep class 9 as traffic light, rename if found
if class_id == 9:
class_name = "traffic light"
elif class_id < len(TRAFFIC_CLASS_NAMES):
class_name = TRAFFIC_CLASS_NAMES[class_id]
else:
continue # Remove unknown/other classes
detections.append({
'bbox': [x1, y1, x2, y2],
'confidence': float(score),
'class_id': int(class_id),
'class_name': class_name
})
print(f"[DEBUG] Raw detections before NMS: {len(detections)}")
# Apply NMS
if len(detections) > 0:
boxes = np.array([det['bbox'] for det in detections])
scores = np.array([det['confidence'] for det in detections])
indices = cv2.dnn.NMSBoxes(boxes.tolist(), scores.tolist(), conf_threshold, 0.5)
if isinstance(indices, (list, tuple)) and len(indices) > 0:
indices = np.array(indices).flatten()
elif isinstance(indices, np.ndarray) and indices.size > 0:
indices = indices.flatten()
else:
indices = []
detections = [detections[int(i)] for i in indices] if len(indices) > 0 else []
print(f"[DEBUG] Detections after NMS: {len(detections)}")
return detections
def draw(self, frame: np.ndarray, detections: List[Dict], box_thickness: int = 2) -> np.ndarray:
# 80+ visually distinct colors for COCO classes (BGR)
COCO_COLORS = [
(255, 56, 56), (255, 157, 151), (255, 112, 31), (255, 178, 29), (207, 210, 49),
(72, 249, 10), (146, 204, 23), (61, 219, 134), (26, 147, 52), (0, 212, 187),
(44, 153, 168), (0, 194, 255), (52, 69, 147), (100, 115, 255), (0, 24, 236),
(132, 56, 255), (82, 0, 133), (203, 56, 255), (255, 149, 200), (255, 55, 199),
(255, 255, 56), (255, 255, 151), (255, 255, 31), (255, 255, 29), (207, 255, 49),
(72, 255, 10), (146, 255, 23), (61, 255, 134), (26, 255, 52), (0, 255, 187),
(44, 255, 168), (0, 255, 255), (52, 255, 147), (100, 255, 255), (0, 255, 236),
(132, 255, 255), (82, 255, 133), (203, 255, 255), (255, 255, 200), (255, 255, 199),
(56, 255, 255), (157, 255, 151), (112, 255, 31), (178, 255, 29), (210, 255, 49),
(249, 255, 10), (204, 255, 23), (219, 255, 134), (147, 255, 52), (212, 255, 187),
(153, 255, 168), (194, 255, 255), (69, 255, 147), (115, 255, 255), (24, 255, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49)
]
for det in detections:
x1, y1, x2, y2 = det['bbox']
label = f"{det['class_name']} {det['confidence']:.2f}"
color = COCO_COLORS[det['class_id'] % len(COCO_COLORS)]
cv2.rectangle(frame, (x1, y1), (x2, y2), color, box_thickness)
cv2.putText(frame, label, (x1, max(y1 - 10, 0)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return frame
# --- Video/Image/Live Inference ---
def run_inference(detector: OpenVINOYOLODetector, source=0, conf_threshold=0.25, flip=False, use_popup=False, video_width=None):
if isinstance(source, str) and not os.path.exists(source):
print(f"Downloading sample video: {source}")
import requests
url = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/video/people.mp4"
r = requests.get(url)
with open(source, 'wb') as f:
f.write(r.content)
cap = cv2.VideoCapture(source)
if not cap.isOpened():
print(f"Failed to open video source: {source}")
return
window_name = "YOLOv11x + OpenVINO Detection"
if use_popup:
cv2.namedWindow(window_name, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)
frame_count = 0
times = []
while True:
ret, frame = cap.read()
if not ret:
break
if flip:
frame = cv2.flip(frame, 1)
if video_width:
scale = video_width / max(frame.shape[:2])
frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
start = time.time()
detections = detector.infer(frame, conf_threshold=conf_threshold)
frame = detector.draw(frame, detections)
elapsed = time.time() - start
times.append(elapsed)
if len(times) > 200:
times.pop(0)
fps = 1.0 / np.mean(times) if times else 0
cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
if use_popup:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
else:
cv2.imshow(window_name, frame)
if cv2.waitKey(1) & 0xFF == 27:
break
frame_count += 1
cap.release()
cv2.destroyAllWindows()
# --- Main Entrypoint ---
if __name__ == "__main__":
# Choose model: yolo11x or yolo11n, etc.
MODEL_NAME = "yolo11x"
DEVICE = "AUTO" # or "CPU", "GPU"
# Step 1: Convert model if needed
ov_xml = convert_yolo_to_openvino(MODEL_NAME)
# Step 2: Quantize (optional, demo skips actual quantization)
ov_xml = quantize_openvino_model(ov_xml, MODEL_NAME)
# Step 3: Create detector
detector = OpenVINOYOLODetector(ov_xml, device=DEVICE)
# Step 4: Run on webcam, video, or image
# Webcam: source=0, Video: source="video.mp4", Image: source="image.jpg"
run_inference(detector, source=0, conf_threshold=0.25, flip=True, use_popup=True, video_width=1280)
# To run on a video file: run_inference(detector, source="people.mp4", conf_threshold=0.25)
# To run on an image: run_inference(detector, source="image.jpg", conf_threshold=0.25)
# To run async or batch, extend the OpenVINOYOLODetector class with async API as needed.
import numpy as np
import cv2
def postprocess_openvino_yolo(output, conf_threshold=0.4, iou_threshold=0.5, input_shape=(640, 640), original_shape=None):
"""
output: OpenVINO raw output tensor (e.g., shape [1, 25200, 85])
conf_threshold: minimum confidence
iou_threshold: for NMS
input_shape: model input size (w, h)
original_shape: original image size (w, h)
"""
# 1. Squeeze batch dimension
output = np.squeeze(output) # [25200, 85]
# 2. Split predictions
boxes = output[:, :4]
obj_conf = output[:, 4]
class_scores = output[:, 5:]
# 3. Get class with highest score
class_ids = np.argmax(class_scores, axis=1)
class_conf = class_scores[np.arange(len(class_scores)), class_ids]
# 4. Multiply objectness confidence with class confidence
scores = obj_conf * class_conf
# 5. Filter by confidence threshold
mask = scores > conf_threshold
boxes = boxes[mask]
scores = scores[mask]
class_ids = class_ids[mask]
if original_shape is not None:
# Rescale boxes from input_shape to original image shape
input_w, input_h = input_shape
orig_w, orig_h = original_shape
scale_x = orig_w / input_w
scale_y = orig_h / input_h
boxes[:, 0] *= scale_x # x1
boxes[:, 1] *= scale_y # y1
boxes[:, 2] *= scale_x # x2
boxes[:, 3] *= scale_y # y2
# 6. Convert boxes to [x, y, w, h] format for OpenCV NMS
boxes_xywh = []
for box in boxes:
x1, y1, x2, y2 = box
boxes_xywh.append([x1, y1, x2 - x1, y2 - y1])
# 7. Apply NMS
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
# 8. Return filtered boxes
result_boxes = []
result_scores = []
result_classes = []
if len(boxes) > 0 and len(scores) > 0:
indices = cv2.dnn.NMSBoxes(boxes_xywh, scores.tolist(), conf_threshold, iou_threshold)
if len(indices) > 0:
indices = np.array(indices).flatten()
for i in indices:
i = int(i)
result_boxes.append(boxes[i])
result_scores.append(scores[i])
result_classes.append(class_ids[i])
return result_boxes, result_scores, result_classes
import os
import time
import numpy as np
import cv2
from pathlib import Path
from typing import List, Dict, Optional
# Only traffic-related classes for detection
TRAFFIC_CLASS_NAMES = [
'person', 'bicycle', 'car', 'motorcycle', 'bus', 'truck',
'traffic light', 'stop sign', 'parking meter'
]
class OpenVINOVehicleDetector:
def __init__(self, model_path: str = None, device: str = "AUTO", use_quantized: bool = False, enable_ocr: bool = False, confidence_threshold: float = 0.4):
import openvino as ov
self.device = device
self.confidence_threshold = confidence_threshold
self.ocr_reader = None
self.class_names = TRAFFIC_CLASS_NAMES
self.performance_stats = {
'fps': 0,
'avg_inference_time': 0,
'frames_processed': 0,
'backend': f"OpenVINO-{device}",
'total_detections': 0,
'detection_rate': 0
}
self._inference_times = []
self._start_time = time.time()
self._frame_count = 0
# Model selection logic
self.model_path = self._find_best_model(model_path, use_quantized)
self.core = ov.Core()
self.model = self.core.read_model(self.model_path)
# Always reshape to static shape before accessing .shape
self.model.reshape({0: [1, 3, 640, 640]})
self.input_shape = self.model.inputs[0].shape
self.input_height = self.input_shape[2]
self.input_width = self.input_shape[3]
self.ov_config = {}
if device != "CPU":
# Already reshaped above, so nothing more needed here
pass
if "GPU" in device or ("AUTO" in device and "GPU" in self.core.available_devices):
self.ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}
self.compiled_model = self.core.compile_model(model=self.model, device_name=self.device, config=self.ov_config)
self.output_layer = self.compiled_model.output(0)
def _find_best_model(self, model_path, use_quantized):
# Priority: quantized IR > IR > .pt
search_paths = [
Path(model_path) if model_path else None,
Path("yolo11x_openvino_int8_model/yolo11x.xml") if use_quantized else None,
Path("yolo11x_openvino_model/yolo11x.xml"),
Path("rcb/yolo11x_openvino_model/yolo11x.xml"),
Path("yolo11x.xml"),
Path("rcb/yolo11x.xml"),
Path("yolo11x.pt"),
Path("rcb/yolo11x.pt")
]
for p in search_paths:
if p and p.exists():
return str(p)
raise FileNotFoundError("No suitable YOLOv11x model found for OpenVINO.")
def detect_vehicles(self, frame: np.ndarray, conf_threshold: float = None) -> List[Dict]:
if conf_threshold is None:
conf_threshold = 0.1 # Lowered for debugging
start = time.time()
input_tensor = self._preprocess(frame)
output = self.compiled_model([input_tensor])[self.output_layer]
# Debug: print raw output shape
print(f"[DEBUG] Model output shape: {output.shape}")
detections = self._postprocess(output, frame.shape, conf_threshold)
print(f"[DEBUG] Detections after postprocess: {len(detections)}")
elapsed = time.time() - start
self._inference_times.append(elapsed)
self._frame_count += 1
self.performance_stats['frames_processed'] = self._frame_count
self.performance_stats['total_detections'] += len(detections)
if len(self._inference_times) > 100:
self._inference_times.pop(0)
self.performance_stats['avg_inference_time'] = float(np.mean(self._inference_times)) if self._inference_times else 0
total_time = time.time() - self._start_time
self.performance_stats['fps'] = self._frame_count / total_time if total_time > 0 else 0
return detections
def _preprocess(self, frame: np.ndarray) -> np.ndarray:
img = cv2.resize(frame, (self.input_width, self.input_height))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.astype(np.float32) / 255.0
img = img.transpose(2, 0, 1)[None]
return img
def _postprocess(self, output: np.ndarray, frame_shape, conf_threshold: float) -> List[Dict]:
# Output: (1, 84, 8400) or (84, 8400) or (8400, 84)
if output.ndim == 3:
output = np.squeeze(output)
if output.shape[0] == 84:
output = output.T # (8400, 84)
boxes = output[:, :4]
scores = output[:, 4:]
class_ids = np.argmax(scores, axis=1)
confidences = np.max(scores, axis=1)
detections = []
h, w = frame_shape[:2]
for i, (box, score, class_id) in enumerate(zip(boxes, confidences, class_ids)):
if score < conf_threshold:
continue
x_c, y_c, bw, bh = box
# If normalized, scale to input size
if all(0.0 <= v <= 1.0 for v in box):
x_c *= self.input_width
y_c *= self.input_height
bw *= self.input_width
bh *= self.input_height
# Scale to original frame size
scale_x = w / self.input_width
scale_y = h / self.input_height
x_c *= scale_x
y_c *= scale_y
bw *= scale_x
bh *= scale_y
x1 = int(round(x_c - bw / 2))
y1 = int(round(y_c - bh / 2))
x2 = int(round(x_c + bw / 2))
y2 = int(round(y_c + bh / 2))
x1 = max(0, min(x1, w - 1))
y1 = max(0, min(y1, h - 1))
x2 = max(0, min(x2, w - 1))
y2 = max(0, min(y2, h - 1))
if x2 <= x1 or y2 <= y1:
continue
# Only keep class 9 as traffic light, rename if found
if class_id == 9:
class_name = "traffic light"
elif class_id < len(TRAFFIC_CLASS_NAMES):
class_name = TRAFFIC_CLASS_NAMES[class_id]
else:
continue # Remove unknown/other classes
detections.append({
'bbox': [x1, y1, x2, y2],
'confidence': float(score),
'class_id': int(class_id),
'class_name': class_name
})
print(f"[DEBUG] Raw detections before NMS: {len(detections)}")
# Apply NMS
if len(detections) > 0:
boxes = np.array([det['bbox'] for det in detections])
scores = np.array([det['confidence'] for det in detections])
indices = cv2.dnn.NMSBoxes(boxes.tolist(), scores.tolist(), conf_threshold, 0.5)
if isinstance(indices, (list, tuple)) and len(indices) > 0:
indices = np.array(indices).flatten()
elif isinstance(indices, np.ndarray) and indices.size > 0:
indices = indices.flatten()
else:
indices = []
detections = [detections[int(i)] for i in indices] if len(indices) > 0 else []
print(f"[DEBUG] Detections after NMS: {len(detections)}")
return detections
def draw(self, frame: np.ndarray, detections: List[Dict], box_thickness: int = 2) -> np.ndarray:
# 80+ visually distinct colors for COCO classes (BGR)
COCO_COLORS = [
(255, 56, 56), (255, 157, 151), (255, 112, 31), (255, 178, 29), (207, 210, 49),
(72, 249, 10), (146, 204, 23), (61, 219, 134), (26, 147, 52), (0, 212, 187),
(44, 153, 168), (0, 194, 255), (52, 69, 147), (100, 115, 255), (0, 24, 236),
(132, 56, 255), (82, 0, 133), (203, 56, 255), (255, 149, 200), (255, 55, 199),
(255, 255, 56), (255, 255, 151), (255, 255, 31), (255, 255, 29), (207, 255, 49),
(72, 255, 10), (146, 255, 23), (61, 255, 134), (26, 255, 52), (0, 255, 187),
(44, 255, 168), (0, 255, 255), (52, 255, 147), (100, 255, 255), (0, 255, 236),
(132, 255, 255), (82, 255, 133), (203, 255, 255), (255, 255, 200), (255, 255, 199),
(56, 255, 255), (157, 255, 151), (112, 255, 31), (178, 255, 29), (210, 255, 49),
(249, 255, 10), (204, 255, 23), (219, 255, 134), (147, 255, 52), (212, 255, 187),
(153, 255, 168), (194, 255, 255), (69, 255, 147), (115, 255, 255), (24, 255, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49),
(249, 72, 10), (204, 146, 23), (219, 61, 134), (147, 26, 52), (212, 0, 187),
(153, 44, 168), (194, 0, 255), (69, 52, 147), (115, 100, 255), (24, 0, 236),
(56, 132, 255), (157, 82, 151), (112, 203, 31), (178, 255, 29), (210, 255, 49)
]
for det in detections:
x1, y1, x2, y2 = det['bbox']
label = f"{det['class_name']} {det['confidence']:.2f}"
color = COCO_COLORS[det['class_id'] % len(COCO_COLORS)]
cv2.rectangle(frame, (x1, y1), (x2, y2), color, box_thickness)
cv2.putText(frame, label, (x1, max(y1 - 10, 0)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
return frame