-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Description
Search before asking
- I have searched the Supervision issues and found no similar feature requests.
Question
I am using a Lucid Machine Vision Camera and I want to use supervision for detection, speed estimation of objects from real time feed.
Some of the functions/annotators take a video_info argument, which includes fps and total_frames — values that cannot be calculated for a real-time feed. In addition, I am using a machine vision camera connected over an Ethernet cable, so capturing the feed is very different from a webcam: cv2.VideoCapture() doesn't work.
I am attaching the code how I get the camera feed below.
import time
import cv2
import numpy as np
from arena_api import enums
from arena_api.buffer import BufferFactory
from arena_api.system import system
# Dimensions (pixels) of the OpenCV preview window.
window_width = 800
window_height = 600
def select_device_from_user_input():
    """Find and create the Arena camera device.

    Re-scans until at least one device is connected.  The original code
    prompted once and then indexed ``device_infos[selected_index]`` on a
    possibly still-empty list, raising ``IndexError``; the prompt promised
    a re-search that never happened.

    Prefers the device whose serial matches "camera_serial_number",
    otherwise falls back to the first device found.

    Returns:
        The created arena_api device handle.
    """
    device_infos = system.device_infos
    # Keep re-scanning until a camera shows up, as the prompt promises.
    while len(device_infos) == 0:
        print("No camera connected\nPress enter to search again")
        input()
        device_infos = system.device_infos
    print("Devices found:")
    selected_index = 0
    for i, info in enumerate(device_infos):
        if info['serial'] == "camera_serial_number":
            selected_index = i
    selected_model = device_infos[selected_index]['model']
    print(f"\nCreate device: {selected_model}...")
    device = system.create_device(device_infos=device_infos[selected_index])[0]
    return device
def apply_gamma_correction(frame, gamma):
    """Apply power-law (gamma) correction to an 8-bit image.

    The frame is normalised to [0, 1], raised to ``gamma``, then scaled
    back to the 0-255 uint8 range.
    """
    normalised = np.divide(frame, 255.0, dtype=np.float32)
    gamma_applied = normalised ** gamma
    return np.uint8(gamma_applied * 255.0)
def is_moving(frame1, frame2):
    """Return the RMS pixel difference between two frames.

    Frames are cast to float32 before subtracting: subtracting uint8
    arrays directly wraps around (e.g. 10 - 20 == 246), which made the
    original metric meaningless wherever the second frame was brighter.

    NOTE(review): despite the name, this returns the raw RMS value, not
    a boolean -- callers are expected to threshold it themselves.
    """
    diff = np.sqrt(np.mean(np.square(
        frame1.astype(np.float32) - frame2.astype(np.float32))))
    print(diff)
    return diff
def get_image_buffers(is_color_camera=False):
    """Stream frames from the camera and display them with an FPS overlay.

    Configures stream reliability, resolution (3072x1080), pixel format,
    fixed 30 fps continuous acquisition, manual exposure/gain, automatic
    white balance, and gamma; then loops grabbing buffers, converting
    them to BGR, stamping the measured FPS once per second, and showing
    the result until 'q' is pressed.

    Fixes over the original: the duplicated AcquisitionFrameRateEnable
    write is removed, the unused grayscale conversion and redundant
    reshape are dropped, and the stream/device/window are released on
    exit (the original leaked all three).

    Args:
        is_color_camera: Ignored as a hint; the actual value is detected
            from the camera's IspBayerPattern node, as the original did.
    """
    device = select_device_from_user_input()

    # Stream reliability settings.
    device.tl_stream_nodemap.get_node('StreamBufferHandlingMode').value = 'NewestOnly'
    device.tl_stream_nodemap.get_node('StreamPacketResendEnable').value = True
    device.tl_stream_nodemap.get_node('StreamAutoNegotiatePacketSize').value = True

    # Colour vs. mono is decided by the sensor's Bayer pattern.
    isp_bayer_pattern = device.nodemap.get_node('IspBayerPattern').value
    is_color_camera = isp_bayer_pattern != 'NONE'

    device.nodemap.get_node('Width').value = 3072
    device.nodemap.get_node('Height').value = 1080
    if is_color_camera:
        device.nodemap.get_node('PixelFormat').value = "BayerRG8"
    else:
        device.nodemap.get_node('PixelFormat').value = "Mono8"

    # Acquisition: continuous capture at a fixed 30 fps.
    device.nodemap.get_node('DeviceStreamChannelPacketSize').value = 1500
    device.nodemap.get_node('AcquisitionMode').value = "Continuous"
    device.nodemap.get_node('AcquisitionFrameRateEnable').value = True
    device.nodemap.get_node('AcquisitionFrameRate').value = 30.0

    # Manual gain/exposure, automatic white balance, fixed gamma.
    device.nodemap.get_node('GainAuto').value = "Off"
    device.nodemap.get_node('Gain').value = 0.0
    device.nodemap.get_node('BalanceWhiteEnable').value = True
    device.nodemap.get_node('BalanceWhiteAuto').value = "Continuous"
    device.nodemap['GammaEnable'].value = True
    device.nodemap.get_node('ExposureAuto').value = "Off"
    device.nodemap.get_node('ExposureTime').value = 4000.00
    device.nodemap['Gamma'].value = 0.350
    device.nodemap['ColorTransformationEnable'].value = True

    cv2.namedWindow("Image-1", cv2.WINDOW_NORMAL)
    device.start_stream()

    # FPS measurement window.
    fps_start_time = time.time()
    fps_counter = 0
    try:
        while True:
            image_buffer = device.get_buffer()
            # Wrap the raw buffer as an (H, W, C) view without copying.
            channels = int(image_buffer.bits_per_pixel / 8)
            nparray = np.ctypeslib.as_array(
                image_buffer.pdata,
                shape=(image_buffer.height, image_buffer.width, channels),
            )
            if is_color_camera:
                display_img = cv2.cvtColor(nparray, cv2.COLOR_BayerBG2BGR)
            else:
                display_img = cv2.cvtColor(nparray, cv2.COLOR_GRAY2BGR)
            decoded_img = display_img
            print(decoded_img.shape)

            # Recompute FPS once per second and stamp it on the frame.
            fps_counter += 1
            elapsed = time.time() - fps_start_time
            if elapsed >= 1:
                fps = fps_counter / elapsed
                fps_start_time = time.time()
                fps_counter = 0
                cv2.putText(decoded_img, f'FPS: {fps:.2f}', (50, 150),
                            cv2.FONT_HERSHEY_SIMPLEX, 6, (0, 255, 0), 2)
                print(fps)

            cv2.imshow("Image-1", decoded_img)
            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):
                break
            device.requeue_buffer(image_buffer)
    finally:
        # The original returned without releasing anything.
        device.stop_stream()
        system.destroy_device(device)
        cv2.destroyAllWindows()
# Script entry point.  The True hint is overridden inside
# get_image_buffers by the camera's detected Bayer pattern.
if __name__ == "__main__":
    get_image_buffers(is_color_camera=True)
Don't worry about device.nodemap or device.nodemap.get_node these are the settings to get a clear image from the camera.
🙏 Seeking Expert Help 🙏
Dear @LinasKo, @skylargivens, @iurisilvio, @sberan,
I hope you can assist with a challenging issue I've encountered.
Thanks
Likith
Additional
I've been deeply engaged in refining the following code snippet to achieve a dual goal: detecting objects and estimating their speed. Sadly, I have been unable to achieve what I am looking for. The core of the computation revolves around decoded_img (have a look at the code below). Your expertise in this area could provide crucial insights into overcoming the current challenges.
I hope this context gives you a clear understanding of the work at hand. Your input on this matter would be greatly appreciated.
import argparse
from collections import defaultdict, deque
from pathlib import Path
import cv2
import numpy as np
from shapely.geometry import Polygon, Point
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator, colors
import time
from arena_api import enums
from arena_api.buffer import BufferFactory
from arena_api.system import system
import torch
import supervision as sv
# Quadrilateral (pixel coordinates) delimiting the region of interest in
# the camera image; here the full 3072x1080 frame.
SOURCE = np.array([[0, 0], [3072, 0], [3072, 1080], [0, 1080]])

# Real-world size of the region SOURCE maps onto (presumably metres --
# TODO confirm units against the calibration).
TARGET_WIDTH = 0.7388
TARGET_HEIGHT = 0.2594
TARGET = np.array(
    [
        [0, 0],
        [TARGET_WIDTH, 0],
        [TARGET_WIDTH, TARGET_HEIGHT],
        [0, TARGET_HEIGHT],
    ]
)

# Global variables
window_width = 800
window_height = 600

# Pick the inference device: GPU when available, otherwise CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Current Device:", device)

is_color_camera = False

# Pre-computed 256-entry gamma lookup table (gamma = 0.1, i.e. a power
# of 10) for fast per-pixel correction.
gamma = 0.1
gamma_table = np.array(
    [((value / 255.0) ** (1.0 / gamma)) * 255 for value in np.arange(0, 256)]
).astype(np.uint8)
def parse_arguments() -> argparse.Namespace:
    """Build and evaluate the CLI parser for this script.

    Returns:
        argparse.Namespace with a single ``source_video_path`` attribute
        (optional, defaults to a local test video).
    """
    cli = argparse.ArgumentParser(
        description="Speed Estimation using Ultralytics and Supervision"
    )
    cli.add_argument(
        "--source_video_path",
        required=False,
        default="/home/harvestedlabs/Desktop/Codes/39l.mp4",
        help="Path to the source video file",
        type=str,
    )
    return cli.parse_args()
class ViewTransformer:
    """Perspective (homography) transform from image pixels to the
    real-world TARGET plane."""

    def __init__(self, source: np.ndarray, target: np.ndarray):
        """Compute the homography mapping the ``source`` quad onto ``target``.

        The original signature used ``source=np.ndarray`` -- the *class*
        itself as a default value, a typo for a type annotation that
        would crash if either argument were omitted.  Both arguments are
        now required and properly annotated.

        Args:
            source: (4, 2) array of pixel corners.
            target: (4, 2) array of corresponding world-plane corners.
        """
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        """Project an (N, 2) array of points through the homography.

        Returns:
            (N, 2) float32 array of transformed points; an empty 1-D
            array when ``points`` is empty (unchanged from the original).
        """
        if points.size == 0:
            print("Warning: No points to transform.")
            return np.array([])
        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)
def select_device_from_user_input():
    """Locate and create the Arena camera device.

    Re-scans until at least one device is connected (the original
    prompted once and then indexed a possibly still-empty list, raising
    IndexError).  Prefers the device with serial "222600043"; falls back
    to the first device found.

    Returns:
        The created arena_api device handle.
    """
    device_infos = system.device_infos
    # Keep re-scanning until a camera shows up, as the prompt promises.
    while len(device_infos) == 0:
        print("No camera connected\nPress enter to search again")
        input()
        device_infos = system.device_infos
    print("Devices found:")
    selected_index = 0
    # Known serials: 222600043 223200992
    for i, info in enumerate(device_infos):
        if info['serial'] == "222600043":
            selected_index = i
    selected_model = device_infos[selected_index]['model']
    print(f"\nCreate device: {selected_model}...")
    device = system.create_device(device_infos=device_infos[selected_index])[0]
    return device
def get_image_buffers(is_color_camera=False):
    """Configure the Arena camera and start streaming.

    Sets up stream reliability, resolution (3072x2048), pixel format,
    manual exposure/gain, automatic white balance, and gamma, then
    starts the stream and returns the handles the processing loop needs.

    Args:
        is_color_camera: Ignored as a hint; the real value is detected
            from the camera's IspBayerPattern node.

    Returns:
        Tuple of (device, fps_start_time, fps_counter).
    """
    device = select_device_from_user_input()

    # Stream reliability configuration.
    device.tl_stream_nodemap.get_node('StreamBufferHandlingMode').value = 'NewestOnly'
    device.tl_stream_nodemap.get_node('StreamPacketResendEnable').value = True
    device.tl_stream_nodemap.get_node('StreamAutoNegotiatePacketSize').value = True

    isp_bayer_pattern = device.nodemap.get_node('IspBayerPattern').value
    detected_color = isp_bayer_pattern != 'NONE'
    # BUG FIX: process_frames() reads the module-level is_color_camera
    # flag, but the original assignment here only created a local that
    # was silently discarded, so color frames were never debayered.
    # The name is also a parameter, so `global` would be a SyntaxError;
    # publish the detected value to the module namespace explicitly.
    globals()['is_color_camera'] = detected_color

    device.nodemap.get_node('Width').value = 3072
    device.nodemap.get_node('Height').value = 2048
    if detected_color:
        device.nodemap.get_node('PixelFormat').value = "BayerRG8"
    else:
        device.nodemap.get_node('PixelFormat').value = "Mono8"

    # Features
    device.nodemap.get_node('BalanceWhiteAuto').value = "Continuous"
    device.nodemap.get_node('DeviceStreamChannelPacketSize').value = 1500
    device.nodemap.get_node('AcquisitionMode').value = "Continuous"
    device.nodemap.get_node('AcquisitionFrameRateEnable').value = True
    device.nodemap['ColorTransformationEnable'].value = True
    device.nodemap['BalanceWhiteEnable'].value = True
    device.nodemap['GammaEnable'].value = True
    device.nodemap['Gamma'].value = 0.350
    device.nodemap.get_node('ExposureAuto').value = "Off"
    device.nodemap.get_node('ExposureTime').value = 2000.00
    device.nodemap.get_node('GainAuto').value = "Off"
    device.nodemap.get_node('Gain').value = 0.0

    device.start_stream()
    fps_start_time = time.time()  # Start of the current FPS window
    fps_counter = 0               # Frames seen in the current window
    return device, fps_start_time, fps_counter
def process_frames(device, fps_start_time, fps_counter):
    """Process frames from the device and calculate FPS.

    Per frame: run YOLO on a 960x540 resize, filter detections to the
    SOURCE polygon, track with ByteTrack, project bottom-center anchors
    through the perspective transform, label each track with a speed,
    annotate, write to 'target.mp4', and display until 'q' is pressed.

    NOTE(review): this reads the module-level `is_color_camera`, which
    get_image_buffers() never actually updates (it only sets a local of
    the same name) -- confirm the flag is really propagated, otherwise
    color frames are never debayered.
    """
    model = YOLO("/home/harvestedlabs/Desktop/Codes/Likith/token.pt")
    print("YOLO model loaded.")
    byte_track = sv.ByteTrack(frame_rate=0)  # Placeholder, will be updated later
    print("ByteTrack initialized.")
    # Obtain the resolution of the camera feed
    width = device.nodemap.get_node('Width').value
    height = device.nodemap.get_node('Height').value
    resolution_wh = (width, height)
    thickness = sv.calculate_optimal_line_thickness(resolution_wh=resolution_wh)
    text_scale = sv.calculate_optimal_text_scale(resolution_wh=resolution_wh)
    bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=thickness, color_lookup=sv.ColorLookup.TRACK)
    label_annotator = sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.BOTTOM_CENTER, color_lookup=sv.ColorLookup.TRACK)
    trace_annotator = sv.TraceAnnotator(thickness=thickness, trace_length=0, position=sv.Position.BOTTOM_CENTER, color_lookup=sv.ColorLookup.TRACK)  # Placeholder, will be updated later
    polygon_zone = sv.PolygonZone(SOURCE)
    zone_annotator = sv.PolygonZoneAnnotator(zone=polygon_zone, color=sv.Color.WHITE, thickness=6, text_thickness=6, text_scale=4)
    view_transformer = ViewTransformer(SOURCE, TARGET)
    # NOTE(review): maxlen=0 means every append is discarded until the
    # first FPS measurement replaces default_factory below -- and even
    # then, only deques created *after* that point get the new maxlen.
    coordinates = defaultdict(lambda: deque(maxlen=0))  # Placeholder, will be updated later
    # Define video_info
    video_info = sv.VideoInfo(width=width, height=height, fps=30, total_frames=None)  # Change fps if necessary
    with sv.VideoSink(target_path='target.mp4', video_info=video_info) as sink:
        while True:
            image_buffer = device.get_buffer()
            # Wrap the raw buffer as an (H, W, C) array without copying.
            nparray = np.ctypeslib.as_array(image_buffer.pdata, shape=(image_buffer.height, image_buffer.width, int(
                image_buffer.bits_per_pixel / 8))).reshape(image_buffer.height, image_buffer.width, int(image_buffer.bits_per_pixel / 8))
            if is_color_camera:
                display_img = cv2.cvtColor(nparray, cv2.COLOR_BayerBG2BGR)
                nparray = cv2.cvtColor(display_img, cv2.COLOR_BGR2GRAY)
            else:
                display_img = cv2.cvtColor(nparray, cv2.COLOR_GRAY2BGR)
            decoded_img = display_img
            image = cv2.resize(decoded_img, (960, 540))
            # Calculate and display FPS
            fps_counter += 1
            if time.time() - fps_start_time >= 1:
                fps = fps_counter / (time.time() - fps_start_time)
                fps_start_time = time.time()
                fps_counter = 0
                cv2.putText(decoded_img, f'FPS: {fps:.2f}', (50, 150),
                            cv2.FONT_HERSHEY_SIMPLEX, 6, (0, 255, 0), 2)
                print(fps)
                # Retune the tracker, trace length, and coordinate history
                # to the measured frame rate (once per second).
                byte_track.frame_rate = fps
                trace_annotator.trace_length = fps * 2
                coordinates.default_factory = lambda: deque(maxlen=fps)
            try:
                result = model(image)
                print("Frame processed by model.")
                if not result:
                    print("No result for the frame, skipping.")
                    continue
                detections = sv.Detections.from_ultralytics(result[0])
                detections = detections[polygon_zone.trigger(detections)]
                detections = byte_track.update_with_detections(detections=detections)
                points = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
                if points.size > 0:
                    points = view_transformer.transform_points(points=points)
                else:
                    print("No points detected in the frame.")
                labels = []
                # NOTE(review): `[_, point]` keeps only the y-coordinate of
                # each transformed point, so the speed below is computed from
                # 1-D positions -- confirm that is intended.  Rebinding
                # `points` inside the loop shadows the anchor array; safe
                # only because zip() captured it already.
                for tracker_id, [_, point] in zip(detections.tracker_id, points):
                    coordinates[tracker_id].append(point)
                    points = np.array(coordinates[tracker_id], np.int32)
                    if points.size > 0:
                        # NOTE(review): sv.calculate_speed does not appear in
                        # supervision's public API -- verify it exists in this
                        # install, otherwise this raises on every frame.
                        speeds = sv.calculate_speed(
                            points=points, fps=byte_track.frame_rate, scaler=3600 / 1000)
                        label = f'{speeds[-1]:.2f} km/h'
                        labels.append(label)
                zone_annotator.annotate(frame=image)
                bounding_box_annotator.annotate(frame=image, detections=detections)
                label_annotator.annotate(frame=image, detections=detections, labels=labels)
                trace_annotator.annotate(frame=image, detections=detections, tracker_coordinates=coordinates)
            except Exception as e:
                # NOTE(review): broad catch keeps the stream alive but hides
                # real failures; consider logging the full traceback.
                print(f"Error processing frame: {e}")
            sink.write_frame(image)
            cv2.imshow("Processed Frame", image)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    device.stop_stream()
    system.destroy_device(device)
def main() -> None:
    """Entry point: parse CLI options, open and configure the camera,
    then hand the device off to the frame-processing loop."""
    args = parse_arguments()  # parsed but not yet consumed downstream
    device, fps_start_time, fps_counter = get_image_buffers()
    process_frames(device, fps_start_time, fps_counter)
# Run the full pipeline when executed as a script.
if __name__ == "__main__":
    main()