from PySide6.QtCore import QObject, Signal, QThread, Qt, QMutex, QWaitCondition
from PySide6.QtWidgets import QApplication
import os
import sys
import cv2
import numpy as np
from pathlib import Path
from datetime import datetime
import json
from typing import Dict, List, Tuple, Optional
# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# OpenVINO GenAI imports
try:
    import openvino_genai as ov_genai
    import openvino as ov
    print("[VLM DEBUG] OpenVINO GenAI imported successfully")
    OPENVINO_AVAILABLE = True
except ImportError as e:
    print(f"[VLM DEBUG] Failed to import OpenVINO GenAI: {e}")
    OPENVINO_AVAILABLE = False

# PIL for image processing
try:
    from PIL import Image
    print("[VLM DEBUG] PIL imported successfully")
    PIL_AVAILABLE = True
except ImportError as e:
    print(f"[VLM DEBUG] Failed to import PIL: {e}")
    PIL_AVAILABLE = False

class VLMControllerThread(QThread):
    """Worker thread for VLM processing using OpenVINO GenAI."""

    result_ready = Signal(dict)
    error_occurred = Signal(str)
    progress_updated = Signal(int)

    def __init__(self, vlm_dir=None):
        print("[VLM DEBUG] >>> Entering VLMControllerThread.__init__")
        super().__init__()

        # Set VLM directory to the downloaded OpenVINO model
        if vlm_dir is None:
            current_dir = Path(__file__).parent.parent
            self.vlm_dir = current_dir / "llava_openvino_model"
        else:
            self.vlm_dir = Path(vlm_dir).resolve()
        print(f"[VLM DEBUG] vlm_dir resolved to: {self.vlm_dir}")

        self.mutex = QMutex()
        self.condition = QWaitCondition()
        self.abort = False
        self.image = None
        self.prompt = None
        self.vlm_pipeline = None
        self.device = "GPU"  # Default to GPU for maximum performance

        print("[VLM DEBUG] VLMControllerThread initialized (OpenVINO GenAI)")
        print(f"[VLM DEBUG] VLM directory: {self.vlm_dir}")
        print(f"[VLM DEBUG] Directory exists: {self.vlm_dir.exists()}")
        print("[VLM DEBUG] 🚀 DEFAULT DEVICE: GPU (priority)")
        print("[VLM DEBUG] >>> Calling self._load_model()...")
        self._load_model()
        print("[VLM DEBUG] <<< Exiting VLMControllerThread.__init__")

    def _load_model(self):
        print("[VLM DEBUG] >>> Entering _load_model")
        try:
            print("[VLM DEBUG] Starting OpenVINO GenAI model loading...")

            # Check if OpenVINO GenAI is available
            if not OPENVINO_AVAILABLE:
                print("[VLM DEBUG] ❌ OpenVINO GenAI not available")
                return

            # Check if VLM directory exists
            if not self.vlm_dir.exists():
                print(f"[VLM DEBUG] ❌ VLM directory does not exist: {self.vlm_dir}")
                return

            # List files in VLM directory
            files_in_dir = list(self.vlm_dir.glob("*"))
            print(f"[VLM DEBUG] 📁 Files in VLM directory ({len(files_in_dir)}):")
            for file in sorted(files_in_dir):
                print(f"[VLM DEBUG]   - {file.name}")

            # Check for required OpenVINO files
            required_files = [
                "openvino_language_model.xml",
                "openvino_language_model.bin",
                "openvino_vision_embeddings_model.xml",
                "openvino_vision_embeddings_model.bin",
                "openvino_text_embeddings_model.xml",
                "openvino_text_embeddings_model.bin"
            ]
            missing_files = []
            for file in required_files:
                if not (self.vlm_dir / file).exists():
                    missing_files.append(file)
            if missing_files:
                print(f"[VLM DEBUG] ⚠️ Missing files: {missing_files}")
            else:
                print("[VLM DEBUG] ✅ All required OpenVINO files found")

            # Detect available devices with GPU priority
            try:
                print("[VLM DEBUG] >>> Detecting OpenVINO devices...")
                core = ov.Core()
                available_devices = core.available_devices
                print(f"[VLM DEBUG] 🔍 Available OpenVINO devices: {available_devices}")
                gpu_available = "GPU" in available_devices
                print(f"[VLM DEBUG] GPU detected by OpenVINO: {gpu_available}")
                if not gpu_available:
                    print("[VLM DEBUG] ⚠️ GPU not detected by OpenVINO")

                if gpu_available:
                    self.device = "GPU"
                    print("[VLM DEBUG] 🚀 PRIORITY: GPU selected for VLM inference")
                elif "CPU" in available_devices:
                    self.device = "CPU"
                    print("[VLM DEBUG] 🔧 FALLBACK: CPU selected (GPU not available)")
                else:
                    self.device = "AUTO"
                    print("[VLM DEBUG] 🤖 AUTO: Letting OpenVINO choose device")
            except Exception as e:
                print(f"[VLM DEBUG] ⚠️ Device detection failed: {e}")
                print("[VLM DEBUG] 🔄 Defaulting to GPU (will fall back to CPU if needed)")
                self.device = "GPU"

            # Load the VLM pipeline with GPU priority
            try:
                print(f"[VLM DEBUG] 🚀 Loading VLMPipeline from: {self.vlm_dir}")
                print(f"[VLM DEBUG] 🎯 Target device: {self.device}")
                self.vlm_pipeline = ov_genai.VLMPipeline(str(self.vlm_dir), self.device)
                print(f"[VLM DEBUG] ✅ VLMPipeline loaded successfully on {self.device}!")
            except Exception as e:
                print(f"[VLM DEBUG] ❌ Failed to load VLMPipeline: {e}")
                self.vlm_pipeline = None
        except Exception as e:
            print(f"[VLM DEBUG] ❌ Error in _load_model: {e}")
            self.vlm_pipeline = None
        print("[VLM DEBUG] <<< Exiting _load_model")

    def process_request(self, image, prompt):
        """Process a VLM request."""
        self.mutex.lock()
        try:
            self.image = image
            self.prompt = prompt
            self.condition.wakeOne()
        finally:
            self.mutex.unlock()

    def run(self):
        """Main thread loop."""
        while not self.abort:
            self.mutex.lock()
            try:
                if self.image is None or self.prompt is None:
                    self.condition.wait(self.mutex, 100)  # Wait for 100ms
                    continue
                # Take the pending request
                image = self.image
                prompt = self.prompt
                self.image = None
                self.prompt = None
            finally:
                self.mutex.unlock()

            # Process outside the lock
            result = self._process_request(image, prompt)
            self.result_ready.emit(result)

    def _process_request(self, image: np.ndarray, prompt: str) -> dict:
        """Process a single VLM request using OpenVINO GenAI."""
        try:
            if self.vlm_pipeline is None:
                return {
                    "status": "error",
                    "message": "VLM pipeline not loaded",
                    "response": "❌ VLM pipeline failed to load. Check logs for OpenVINO GenAI setup.",
                    "confidence": 0.0,
                    "timestamp": datetime.now().isoformat(),
                    "device": "none",
                    "processing_time": 0.0
                }
            return self._run_genai_inference(image, prompt)
        except Exception as e:
            print(f"[VLM DEBUG] Error in _process_request: {e}")
            return {
                "status": "error",
                "message": str(e),
                "response": f"❌ VLM processing error: {str(e)}",
                "confidence": 0.0,
                "timestamp": datetime.now().isoformat(),
                "device": getattr(self, 'device', 'unknown'),
                "processing_time": 0.0
            }

    def _run_genai_inference(self, image: np.ndarray, prompt: str) -> dict:
        """Run inference using OpenVINO GenAI VLMPipeline."""
        start_time = datetime.now()
        try:
            print("[VLM DEBUG] 🚀 Starting OpenVINO GenAI inference...")
            print(f"[VLM DEBUG] 📝 Prompt: {prompt}")
            print(f"[VLM DEBUG] 🖼️ Image shape: {image.shape}")
            print(f"[VLM DEBUG] 🎯 Device: {self.device}")

            # PIL is required to convert the numpy frame to an image
            if not PIL_AVAILABLE:
                return {
                    "status": "error",
                    "message": "PIL not available",
                    "response": "❌ PIL required for image processing",
                    "confidence": 0.0,
                    "timestamp": start_time.isoformat(),
                    "device": self.device,
                    "processing_time": 0.0
                }

            # Convert BGR (OpenCV default) to RGB if needed
            if len(image.shape) == 3 and image.shape[2] == 3 and image.dtype == np.uint8:
                image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            else:
                image_rgb = image

            # Convert to PIL Image
            pil_image = Image.fromarray(image_rgb.astype(np.uint8))
            print(f"[VLM DEBUG] 🖼️ PIL Image size: {pil_image.size}")

            # Convert PIL image to OpenVINO tensor
            image_array = np.array(pil_image)
            # Ensure NCHW format for OpenVINO
            if len(image_array.shape) == 3:
                image_array = np.transpose(image_array, (2, 0, 1))  # HWC to CHW
                image_array = np.expand_dims(image_array, axis=0)   # Add batch dimension
            image_tensor = ov.Tensor(image_array)
            print(f"[VLM DEBUG] 🔢 Image tensor shape: {image_tensor.shape}")

            # Start chat session
            print("[VLM DEBUG] 💬 Starting chat session...")
            self.vlm_pipeline.start_chat()

            # Generate response
            print("[VLM DEBUG] 🎲 Generating response...")
            response = self.vlm_pipeline.generate(
                prompt,
                image=image_tensor,
                max_new_tokens=100,
                do_sample=False
            )

            # Finish chat session
            self.vlm_pipeline.finish_chat()

            processing_time = (datetime.now() - start_time).total_seconds()
            print("[VLM DEBUG] ✅ Generation complete!")
            print(f"[VLM DEBUG] 📝 Raw response type: {type(response)}")
            print(f"[VLM DEBUG] 📝 Raw response: {response}")

            # Extract text from the VLMDecodedResults object
            response_text = ""
            try:
                if hasattr(response, 'texts'):
                    if isinstance(response.texts, list) and len(response.texts) > 0:
                        response_text = response.texts[0]
                    else:
                        response_text = str(response.texts)
                elif hasattr(response, 'text'):
                    response_text = response.text
                else:
                    response_text = str(response)
                print(f"[VLM DEBUG] 📝 Extracted text: {response_text}")
            except Exception as text_extract_error:
                print(f"[VLM DEBUG] ❌ Error extracting text: {text_extract_error}")
                response_text = f"Text extraction failed: {str(text_extract_error)}"

            print(f"[VLM DEBUG] ⏱️ Processing time: {processing_time:.2f}s")
            print(f"[VLM DEBUG] 🎯 Used device: {self.device}")

            return {
                "status": "success",
                "message": "OpenVINO GenAI inference completed",
                "response": response_text,   # Extracted text instead of the raw object
                "raw_response": response,    # Keep raw response for debugging
                "confidence": 1.0,           # GenAI does not provide confidence scores
                "timestamp": start_time.isoformat(),
                "device": self.device,
                "processing_time": processing_time
            }
        except Exception as e:
            processing_time = (datetime.now() - start_time).total_seconds()
            error_msg = f"OpenVINO GenAI inference failed: {str(e)}"
            print(f"[VLM DEBUG] ❌ {error_msg}")
            # Try to finish the chat session in case of error
            try:
                self.vlm_pipeline.finish_chat()
            except Exception:
                pass
            return {
                "status": "error",
                "message": error_msg,
                "response": f"❌ VLM inference error: {str(e)}",
                "confidence": 0.0,
                "timestamp": start_time.isoformat(),
                "device": self.device,
                "processing_time": processing_time
            }

    def stop(self):
        """Stop the thread."""
        self.mutex.lock()
        self.abort = True
        self.condition.wakeOne()
        self.mutex.unlock()

class VLMController(QObject):
    """Main VLM controller class using OpenVINO GenAI."""

    result_ready = Signal(dict)
    error_occurred = Signal(str)

    def __init__(self, vlm_dir=None):
        super().__init__()
        print("[VLM DEBUG] Initializing VLM Controller (OpenVINO GenAI)")

        # Set VLM directory to the downloaded OpenVINO model
        if vlm_dir is None:
            current_dir = Path(__file__).parent.parent
            vlm_dir = current_dir / "llava_openvino_model"
        else:
            vlm_dir = Path(vlm_dir)  # Accept str or Path so .exists() below is valid
        print(f"[VLM DEBUG] VLM directory: {vlm_dir}")
        print(f"[VLM DEBUG] VLM directory exists: {vlm_dir.exists()}")

        # Store comprehensive data for analysis
        self.data_context = {
            'detection_data': None,
            'frame_analysis': None,
            'scene_context': None,
            'traffic_state': None
        }

        # Create worker thread
        self.worker_thread = VLMControllerThread(vlm_dir)
        self.worker_thread.result_ready.connect(self.result_ready.emit)
        self.worker_thread.error_occurred.connect(self.error_occurred.emit)
        self.worker_thread.start()
        print("[VLM DEBUG] VLM Controller initialized successfully (OpenVINO GenAI)")

    def process_image(self, image: np.ndarray, prompt: str):
        """Process an image with VLM."""
        if self.worker_thread and self.worker_thread.isRunning():
            self.worker_thread.process_request(image, prompt)
        else:
            self.error_occurred.emit("VLM worker thread not running")

    def stop(self):
        """Stop the VLM controller."""
        if self.worker_thread:
            self.worker_thread.stop()
            self.worker_thread.wait(5000)  # Wait up to 5 seconds for the thread to finish
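

# --- Minimal usage sketch (illustrative only, not part of the original module) ---
# Shows how the controller is intended to be wired into a Qt application:
# connect to result_ready / error_occurred, submit a BGR frame, and stop on shutdown.
# The image path "test_frame.jpg" and the prompt below are placeholder assumptions.
if __name__ == "__main__":
    app = QApplication(sys.argv)
    controller = VLMController()  # Uses the default llava_openvino_model directory

    def _on_result(result: dict):
        # The worker emits a dict with "status", "response", "device", etc.
        print("[VLM DEBUG] Result:", result.get("response"))
        controller.stop()
        app.quit()

    controller.result_ready.connect(_on_result)
    controller.error_occurred.connect(lambda msg: print("[VLM DEBUG] Error:", msg))

    frame = cv2.imread("test_frame.jpg")  # Hypothetical BGR test frame
    if frame is not None:
        controller.process_image(frame, "Describe the traffic situation in this image.")
        sys.exit(app.exec())
    else:
        print("[VLM DEBUG] No test image found; exiting")
        controller.stop()
        sys.exit(0)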