"""
|
|
OpenVINO-based embedder for DeepSORT tracking.
|
|
"""
|
|
|
|
import os
|
|
import numpy as np
|
|
from pathlib import Path
|
|
import cv2
|
|
import time
|
|
from typing import List, Optional, Union
|
|
|
|
try:
|
|
import openvino as ov
|
|
except ImportError:
|
|
print("Installing openvino...")
|
|
os.system('pip install --quiet "openvino>=2024.0.0"')
|
|
import openvino as ov
|
|
|
|
class OpenVINOEmbedder:
|
|
"""
|
|
OpenVINO embedder for DeepSORT tracking.
|
|
|
|
This class provides an optimized version of the feature embedder used in DeepSORT,
|
|
using OpenVINO for inference acceleration.
|
|
"""
|
|

    def __init__(
        self,
        model_path: Optional[str] = None,
        device: str = "AUTO",
        input_size: tuple = (128, 64),
        batch_size: int = 16,
        bgr: bool = True,
        half: bool = True,
    ):
        """
        Initialize the OpenVINO embedder.

        Args:
            model_path: Path to the model file. If None, the default
                MobileNetV2 model is located or converted on the fly.
            device: Device to run inference on ('CPU', 'GPU', 'AUTO', etc.)
            input_size: Model input size as (height, width)
            batch_size: Batch size for inference
            bgr: Whether input images are BGR (True) or RGB (False)
            half: Whether to use half precision (FP16)
        """
        self.device = device
        self.input_size = input_size  # (h, w)
        self.batch_size = batch_size
        self.bgr = bgr
        self.half = half

        # Initialize OpenVINO Core
        self.core = ov.Core()

        # Find and load the model
        if model_path is None:
            # Use MobileNetV2 converted to OpenVINO
            model_path = self._find_mobilenet_model()

            # If the model is not found, convert it
            if model_path is None:
                print("⚠️ MobileNetV2 OpenVINO model not found. Creating it...")
                model_path = self._convert_mobilenet()
        else:
            # When model_path is explicitly provided, verify it exists
            if not os.path.exists(model_path):
                print(f"⚠️ Specified model path does not exist: {model_path}")
                print("Falling back to default model search...")
                model_path = self._find_mobilenet_model()
                if model_path is None:
                    print("⚠️ Default model search also failed. Creating new model...")
                    model_path = self._convert_mobilenet()
            else:
                print(f"✅ Using explicitly provided model: {model_path}")

        print(f"📦 Loading embedder model: {model_path} on {device}")

        # Load and compile the model
        self.model = self.core.read_model(model_path)

        # Set up configuration for device
        ov_config = {}
        if device != "CPU":
            # Pin the spatial dims but keep the batch dimension dynamic (-1),
            # since __call__ receives a variable number of crops per frame
            self.model.reshape({0: [-1, 3, self.input_size[0], self.input_size[1]]})
            if "GPU" in device or ("AUTO" in device and "GPU" in self.core.available_devices):
                ov_config = {"GPU_DISABLE_WINOGRAD_CONVOLUTION": "YES"}

        # Compile model for the specified device
        self.compiled_model = self.core.compile_model(
            model=self.model, device_name=self.device, config=ov_config
        )

        # Get input and output tensors
        self.input_layer = self.compiled_model.inputs[0]
        self.output_layer = self.compiled_model.outputs[0]

        # Create two inference requests and rotate between them for async inference
        self.infer_requests = [self.compiled_model.create_infer_request() for _ in range(2)]
        self.current_request_idx = 0

        # Performance stats
        self.total_inference_time = 0.0
        self.inference_count = 0
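
    # Sketch of a convenience accessor for the counters above, which are
    # otherwise never surfaced; the name `avg_inference_ms` is an assumption,
    # not part of any DeepSORT or OpenVINO API.
    @property
    def avg_inference_ms(self) -> float:
        """Average inference latency in milliseconds (0.0 before the first call)."""
        if self.inference_count == 0:
            return 0.0
        return 1000.0 * self.total_inference_time / self.inference_count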

    def _find_mobilenet_model(self) -> Optional[str]:
        """
        Find a MobileNetV2 model already converted to OpenVINO format.

        Returns:
            Path to the model file, or None if not found
        """
        search_paths = [
            # Standard locations
            "mobilenetv2_embedder/mobilenetv2.xml",
            "../mobilenetv2_embedder/mobilenetv2.xml",
            "../../mobilenetv2_embedder/mobilenetv2.xml",
            # Look in models directory
            "../models/mobilenetv2.xml",
            "../../models/mobilenetv2.xml",
            # Look relative to DeepSORT location
            os.path.join(os.path.dirname(__file__), "models/mobilenetv2.xml"),
            # Look in openvino_models
            "../openvino_models/mobilenetv2.xml",
            "../../openvino_models/mobilenetv2.xml",
        ]

        for path in search_paths:
            if os.path.exists(path):
                return path

        return None

    def _convert_mobilenet(self) -> Optional[str]:
        """
        Convert MobileNetV2 to OpenVINO IR format.

        Returns:
            Path to the converted model, or None if conversion failed
        """
        try:
            # Create directory for the model
            output_dir = Path("mobilenetv2_embedder")
            output_dir.mkdir(exist_ok=True)

            # First, we need to download the PyTorch model
            import torch
            import torch.nn as nn
            from torchvision.models import mobilenet_v2, MobileNet_V2_Weights

            print("⬇️ Downloading MobileNetV2 model...")
            model = mobilenet_v2(weights=MobileNet_V2_Weights.IMAGENET1K_V1)

            # Modify for feature extraction: keep the convolutional backbone,
            # pool the spatial dims away, and flatten to a (N, 1280) embedding.
            # A plain .squeeze() would leave spatial dims behind for 128x64 input.
            class FeatureExtractor(nn.Module):
                def __init__(self, model):
                    super().__init__()
                    self.features = model.features
                    self.pool = nn.AdaptiveAvgPool2d((1, 1))

                def forward(self, x):
                    x = self.pool(self.features(x))
                    return torch.flatten(x, 1)

            feature_model = FeatureExtractor(model)
            feature_model.eval()

            # Export to ONNX with a dynamic batch dimension
            onnx_path = output_dir / "mobilenetv2.onnx"
            print(f"💾 Converting to ONNX: {onnx_path}")
            dummy_input = torch.randn(1, 3, self.input_size[0], self.input_size[1])

            torch.onnx.export(
                feature_model,
                dummy_input,
                str(onnx_path),
                input_names=["input"],
                output_names=["output"],
                dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
                opset_version=11,
            )

            # Convert ONNX to OpenVINO IR
            ir_path = output_dir / "mobilenetv2.xml"
            print(f"💾 Converting to OpenVINO IR: {ir_path}")

            try:
                # openvino>=2023.1 exposes the conversion API directly;
                # the legacy openvino.tools.mo module was removed in 2024
                print("Converting ONNX model using openvino.convert_model...")
                ov_model = ov.convert_model(str(onnx_path))
                ov.save_model(ov_model, str(ir_path), compress_to_fp16=self.half)
                print("✅ Model successfully converted using openvino.convert_model")
            except Exception as e:
                print(f"Error with convert_model: {e}, trying legacy Model Optimizer...")

                # Fall back to the legacy mo.py script for old installs
                import subprocess
                import sys

                # Try to find mo.py in the OpenVINO installation
                mo_paths = [
                    os.path.join(os.environ.get("INTEL_OPENVINO_DIR", ""), "tools", "mo", "mo.py"),
                    os.path.join(os.path.dirname(os.path.dirname(os.__file__)), "openvino", "tools", "mo", "mo.py"),
                    "C:/Program Files (x86)/Intel/openvino_2021/tools/mo/mo.py",
                    "C:/Program Files (x86)/Intel/openvino/tools/mo/mo.py",
                ]

                mo_script = None
                for path in mo_paths:
                    if os.path.exists(path):
                        mo_script = path
                        break

                if not mo_script:
                    raise FileNotFoundError("Cannot find OpenVINO Model Optimizer (mo.py)")

                # Legacy MO requires a static input shape
                cmd = [
                    sys.executable,
                    mo_script,
                    "--input_model", str(onnx_path),
                    "--output_dir", str(output_dir),
                    "--input_shape", f"[{self.batch_size},3,{self.input_size[0]},{self.input_size[1]}]",
                    "--data_type", "FP16" if self.half else "FP32",
                ]

                print(f"Running Model Optimizer: {' '.join(cmd)}")
                result = subprocess.run(cmd, capture_output=True, text=True)

                if result.returncode != 0:
                    print(f"Error running Model Optimizer: {result.stderr}")
                    raise RuntimeError(f"Model Optimizer failed: {result.stderr}")

            print(f"✅ Model converted: {ir_path}")
            return str(ir_path)

        except Exception as e:
            print(f"❌ Error converting model: {e}")
            import traceback
            traceback.print_exc()
            return None

    def preprocess(self, crops: List[np.ndarray]) -> np.ndarray:
        """
        Preprocess image crops for model input.

        Args:
            crops: List of image crops

        Returns:
            Preprocessed NCHW batch tensor
        """
        # ImageNet statistics used to train the torchvision backbone
        mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
        std = np.array([0.229, 0.224, 0.225], dtype=np.float32)

        processed = []
        for crop in crops:
            # Resize to expected input size (cv2.resize takes (width, height))
            crop = cv2.resize(crop, (self.input_size[1], self.input_size[0]))

            # The torchvision backbone expects RGB, so swap channels when the
            # incoming crops are BGR (the OpenCV default)
            if self.bgr and crop.shape[2] == 3:
                crop = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)

            # Scale to 0-1 and apply ImageNet normalization
            crop = (crop.astype(np.float32) / 255.0 - mean) / std

            # Change to CHW format
            crop = crop.transpose(2, 0, 1)
            processed.append(crop)

        # Stack into a batch
        batch = np.stack(processed)
        return batch

    def __call__(self, crops: List[np.ndarray]) -> np.ndarray:
        """
        Get embeddings for the image crops.

        Args:
            crops: List of image crops

        Returns:
            L2-normalized embeddings, one row per crop
        """
        if not crops:
            return np.array([])

        # Preprocess crops
        batch = self.preprocess(crops)

        # Run inference
        start_time = time.time()

        # Rotate between the pre-created inference requests
        request = self.infer_requests[self.current_request_idx]
        self.current_request_idx = (self.current_request_idx + 1) % len(self.infer_requests)

        request.start_async({self.input_layer.any_name: batch})
        request.wait()

        # Copy the output so the tensor buffer can be reused by later requests
        embeddings = request.get_output_tensor().data.copy()

        # Track inference time
        inference_time = time.time() - start_time
        self.total_inference_time += inference_time
        self.inference_count += 1

        # L2-normalize embeddings (epsilon guards against division by zero)
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        embeddings = embeddings / np.maximum(norms, 1e-12)

        return embeddings
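

# Minimal smoke-test sketch (assumed usage, not part of the original API
# surface): random uint8 crops stand in for real detection crops; on first
# run the embedder downloads and converts MobileNetV2.
if __name__ == "__main__":
    embedder = OpenVINOEmbedder(device="AUTO")
    dummy_crops = [
        np.random.randint(0, 256, (200, 80, 3), dtype=np.uint8) for _ in range(4)
    ]
    feats = embedder(dummy_crops)
    print(f"Embeddings shape: {feats.shape}")  # expected (4, 1280)
    print(f"Average inference time: {embedder.avg_inference_ms:.2f} ms")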