Skip to content

Commit 8630ef7

Browse files
committed
Very first runnable BoT-SORT. Implement camera motion compensation with ORB.
1 parent 300e078 commit 8630ef7

File tree

4 files changed

+638
-0
lines changed

4 files changed

+638
-0
lines changed

trackers/core/botsort/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# ------------------------------------------------------------------------
2+
# Trackers
3+
# Copyright (c) 2026 Roboflow. All Rights Reserved.
4+
# Licensed under the Apache License, Version 2.0 [see LICENSE for details]
5+
# ------------------------------------------------------------------------
6+
from .tracker import BoTSORTTracker
7+
8+
__all__ = ["BoTSORTTracker"]

trackers/core/botsort/cmc.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
# ------------------------------------------------------------------------
2+
# Trackers
3+
# Copyright (c) 2026 Roboflow. All Rights Reserved.
4+
# Licensed under the Apache License, Version 2.0 [see LICENSE for details]
5+
# ------------------------------------------------------------------------
6+
7+
from __future__ import annotations
8+
9+
from dataclasses import dataclass
10+
from typing import Optional
11+
12+
import copy
13+
import numpy as np
14+
import cv2
15+
16+
17+
@dataclass
class CMCConfig:
    """Tuning parameters for ORB-based camera motion compensation (CMC).

    All `*_frac` values are fractions of the (downscaled) frame size.
    """

    # Integer factor by which the frame is shrunk before feature detection
    # (clamped to >= 1 in CMC.__init__).
    downscale: int = 2
    # Threshold forwarded to cv2.FastFeatureDetector_create.
    fast_threshold: int = 20

    # Affine estimation
    # RANSAC reprojection threshold, in pixels of the downscaled image,
    # passed to cv2.estimateAffinePartial2D.
    ransac_reproj_threshold: float = 3.0

    # Filtering matches by spatial displacement (fraction of image size)
    max_spatial_distance_frac: float = 0.25

    # Keep features from central ROI (avoid borders)
    roi_min_frac: float = 0.02
    roi_max_frac: float = 0.98
31+
32+
33+
class CMC:
    """Camera motion compensation via sparse FAST/ORB feature matching.

    Estimates a 2x3 partial-affine transform between the previous and the
    current frame from matched background features, so that callers can
    warp Kalman track states and camera motion is not mistaken for object
    motion (BoT-SORT style GMC).
    """

    def __init__(self, cfg: Optional[CMCConfig] = None) -> None:
        """Build the detect/describe/match pipeline.

        Args:
            cfg: Optional configuration; defaults to ``CMCConfig()``.
        """
        self.cfg = cfg or CMCConfig()
        # Guard against downscale <= 0 coming from a user-supplied config.
        self.downscale = max(1, int(self.cfg.downscale))

        # FAST finds keypoints, ORB computes binary descriptors, and a
        # brute-force Hamming matcher pairs them. No cross-check here:
        # filtering is done below via knnMatch + Lowe-style ratio test.
        self.detector = cv2.FastFeatureDetector_create(self.cfg.fast_threshold)
        self.extractor = cv2.ORB_create()
        self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING)

        # Previous-frame features; filled in by the first estimate() call.
        self._initialized = False
        self._prev_kps = None
        self._prev_desc: Optional[np.ndarray] = None

    def reset(self) -> None:
        """Drop the stored previous frame (e.g. when the video changes)."""
        self._initialized = False
        self._prev_kps = None
        self._prev_desc = None

    def estimate(self, frame_bgr: np.ndarray, dets_xyxy: Optional[np.ndarray] = None) -> np.ndarray:
        """Estimate inter-frame camera motion as a 2x3 affine matrix.

        Args:
            frame_bgr: Current frame as an (H, W, 3) BGR image.
            dets_xyxy: Optional (N, 4) array of [x1, y1, x2, y2] detection
                boxes in full-resolution pixel coordinates. Features inside
                these boxes are masked out so that moving objects do not
                pollute the background-motion estimate.

        Returns:
            A float32 2x3 partial-affine matrix mapping previous-frame
            coordinates to current-frame coordinates, with the translation
            expressed in full-resolution pixels. Identity is returned on
            the first frame or whenever estimation is not possible.
        """
        if frame_bgr is None:
            return np.eye(2, 3, dtype=np.float32)

        H_img, W_img = frame_bgr.shape[:2]
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)

        # Downscale for speed / robustness
        if self.downscale > 1:
            gray = cv2.resize(gray, (W_img // self.downscale, H_img // self.downscale))
        # H, W are the *downscaled* dimensions; everything below (mask,
        # boxes, spatial thresholds, affine fit) works in this space.
        H, W = gray.shape[:2]

        # Build mask: central ROI + remove detections (background features)
        mask = np.zeros_like(gray, dtype=np.uint8)
        y0 = int(self.cfg.roi_min_frac * H)
        y1 = int(self.cfg.roi_max_frac * H)
        x0 = int(self.cfg.roi_min_frac * W)
        x1 = int(self.cfg.roi_max_frac * W)
        mask[y0:y1, x0:x1] = 255

        if dets_xyxy is not None and len(dets_xyxy) > 0:
            # Bring boxes into downscaled coordinates and clip into frame.
            dets = np.asarray(dets_xyxy, dtype=np.float32) / float(self.downscale)
            dets = dets.astype(np.int32)
            dets[:, 0] = np.clip(dets[:, 0], 0, W - 1)
            dets[:, 2] = np.clip(dets[:, 2], 0, W - 1)
            dets[:, 1] = np.clip(dets[:, 1], 0, H - 1)
            dets[:, 3] = np.clip(dets[:, 3], 0, H - 1)
            for x1b, y1b, x2b, y2b in dets:
                if x2b > x1b and y2b > y1b:
                    mask[y1b:y2b, x1b:x2b] = 0

        # Detect + describe
        kps = self.detector.detect(gray, mask)
        # Note: compute() may reorder/drop keypoints and returns desc=None
        # when there are no keypoints; both cases are handled below.
        kps, desc = self.extractor.compute(gray, kps)

        # Fallback result: identity (no compensation).
        H_aff = np.eye(2, 3, dtype=np.float32)

        # First frame: only initialize
        if not self._initialized:
            self._prev_kps = copy.copy(kps)
            self._prev_desc = None if desc is None else copy.copy(desc)
            self._initialized = True
            return H_aff

        # If missing descriptors
        if self._prev_desc is None or desc is None or len(desc) == 0:
            self._prev_kps = copy.copy(kps)
            self._prev_desc = None if desc is None else copy.copy(desc)
            return H_aff

        # KNN match (k=2) + ratio test
        # query = previous frame, train = current frame.
        knn = self.matcher.knnMatch(self._prev_desc, desc, k=2)
        if len(knn) == 0:
            self._prev_kps = copy.copy(kps)
            self._prev_desc = copy.copy(desc)
            return H_aff

        # Per-axis displacement cap, in downscaled pixels.
        max_spatial = self.cfg.max_spatial_distance_frac * np.array([W, H], dtype=np.float32)

        prev_pts = []
        curr_pts = []
        spatial = []

        for pair in knn:
            # knnMatch can return fewer than 2 neighbors per query when the
            # train set is tiny; the ratio test needs both.
            if len(pair) < 2:
                continue
            m, n = pair
            # Lenient ratio test (0.9): keep matches clearly better than
            # their runner-up.
            if m.distance < 0.9 * n.distance:
                p_prev = np.array(self._prev_kps[m.queryIdx].pt, dtype=np.float32)
                p_curr = np.array(kps[m.trainIdx].pt, dtype=np.float32)
                d = p_prev - p_curr
                # Reject implausibly large jumps between frames.
                if (abs(d[0]) < max_spatial[0]) and (abs(d[1]) < max_spatial[1]):
                    spatial.append(d)
                    prev_pts.append(p_prev)
                    curr_pts.append(p_curr)

        # Need a handful of correspondences before fitting; otherwise keep
        # the identity fallback.
        if len(prev_pts) >= 5:
            # Statistical inlier gate: keep matches whose displacement lies
            # within 2.5 sigma of the mean displacement on each axis.
            spatial = np.asarray(spatial, dtype=np.float32)
            mean = spatial.mean(axis=0)
            std = spatial.std(axis=0) + 1e-6
            inl = np.logical_and(
                np.abs(spatial[:, 0] - mean[0]) < 2.5 * std[0],
                np.abs(spatial[:, 1] - mean[1]) < 2.5 * std[1],
            )
            prev_pts_np = np.asarray(prev_pts, dtype=np.float32)[inl]
            curr_pts_np = np.asarray(curr_pts, dtype=np.float32)[inl]

            if len(prev_pts_np) >= 5:
                # Fit a 4-DOF (rotation/scale/translation) affine from
                # previous-frame points to current-frame points.
                H_est, _ = cv2.estimateAffinePartial2D(
                    prev_pts_np,
                    curr_pts_np,
                    method=cv2.RANSAC,
                    ransacReprojThreshold=self.cfg.ransac_reproj_threshold,
                )
                if H_est is not None:
                    H_aff = H_est.astype(np.float32)
                    # The rotation/scale block is scale-invariant; only the
                    # translation must be mapped back to full resolution.
                    if self.downscale > 1:
                        H_aff[0, 2] *= self.downscale
                        H_aff[1, 2] *= self.downscale

        # Update prev
        self._prev_kps = copy.copy(kps)
        self._prev_desc = copy.copy(desc)

        return H_aff

    @staticmethod
    def apply_to_tracks(tracks: list, H: np.ndarray) -> None:
        """Warp Kalman track states in place by the affine motion ``H``.

        Args:
            tracks: Objects exposing an (8, 1) ``state`` vector laid out as
                [x1, y1, x2, y2, vx1, vy1, vx2, vy2] and an (8, 8)
                covariance ``P`` — the layout used by
                BoTSORTKalmanBoxTracker in this package.
            H: 2x3 affine matrix as returned by :meth:`estimate`.
                ``None`` is a no-op.
        """
        if H is None or len(tracks) == 0:
            return

        H = H.astype(np.float32)
        R = H[:2, :2]
        t = H[:2, 2:3]  # (2,1)

        # A4 maps [x1,y1,x2,y2]
        A4 = np.zeros((4, 4), dtype=np.float32)
        A4[0:2, 0:2] = R
        A4[2:4, 2:4] = R

        # A8 maps state (pos and vel blocks)
        A8 = np.zeros((8, 8), dtype=np.float32)
        A8[0:4, 0:4] = A4
        A8[4:8, 4:8] = A4

        trans4 = np.array([t[0, 0], t[1, 0], t[0, 0], t[1, 0]], dtype=np.float32).reshape(4, 1)

        for trk in tracks:
            # Rotate/scale both position and velocity blocks, but add the
            # translation only to positions: velocities are coordinate
            # differences and are translation-invariant.
            trk.state = (A8 @ trk.state).astype(np.float32)
            trk.state[0:4] += trans4
            trk.P = (A8 @ trk.P @ A8.T).astype(np.float32)
trackers/core/botsort/kalman.py (filename not captured by the page extraction — inferred from the `BoTSORTKalmanBoxTracker` class it defines; verify against the commit)

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# ------------------------------------------------------------------------
2+
# Trackers
3+
# Copyright (c) 2026 Roboflow. All Rights Reserved.
4+
# Licensed under the Apache License, Version 2.0 [see LICENSE for details]
5+
# ------------------------------------------------------------------------
6+
7+
import numpy as np
8+
9+
class BoTSORTKalmanBoxTracker:
    """
    Single tracked bounding box with a constant-velocity Kalman filter.

    The filter state is the 8x1 column vector
    ``[x1, y1, x2, y2, vx1, vy1, vx2, vy2]`` — the two box corners plus
    their per-frame velocities. Measurements are raw ``[x1, y1, x2, y2]``
    detections.

    Attributes:
        tracker_id: Unique identifier of the track (-1 until assigned).
        number_of_successful_updates: How many detections have been folded
            into this track (starts at 1 for the initial detection).
        time_since_update: Frames elapsed since the last matched detection.
        state: 8x1 filter state vector.
        F: 8x8 state-transition matrix (constant velocity).
        H: 4x8 measurement matrix (reads the four corner coordinates).
        Q: 8x8 process-noise covariance.
        R: 4x4 measurement-noise covariance.
        P: 8x8 state-error covariance.
        count_id: Class-level counter used to hand out unique ids.

    Args:
        bbox: Initial bounding box in the form [x1, y1, x2, y2].
    """

    count_id = 0

    @classmethod
    def get_next_tracker_id(cls) -> int:
        """
        Hand out the next unused tracker id and advance the class counter.

        Returns:
            The id to assign to a newly confirmed track.
        """
        assigned_id = cls.count_id
        cls.count_id += 1
        return assigned_id

    def __init__(self, bbox: np.ndarray):
        # New tracks carry the sentinel id -1; a real id is assigned only
        # once the track is considered mature.
        self.tracker_id = -1

        # The initial detection counts as the first successful update.
        self.number_of_successful_updates = 1
        self.time_since_update = 0

        # Seed positions from the detection; velocities start at zero.
        self.state = np.zeros((8, 1), dtype=np.float32)
        self.state[0, 0] = bbox[0]
        self.state[1, 0] = bbox[1]
        self.state[2, 0] = bbox[2]
        self.state[3, 0] = bbox[3]

        # Set up the constant-velocity filter matrices.
        self._initialize_kalman_filter()

    def _initialize_kalman_filter(self) -> None:
        """
        Build the matrices of the constant-velocity Kalman filter.
        """
        # F: identity plus an off-diagonal identity block that couples each
        # corner coordinate to its velocity (x += vx each step).
        self.F = np.eye(8, dtype=np.float32)
        self.F[:4, 4:] = np.eye(4, dtype=np.float32)

        # H: the measurement observes the four corner coordinates directly.
        self.H = np.eye(4, 8, dtype=np.float32)  # 4x8

        # Q / R: small isotropic process and measurement noise.
        self.Q = np.eye(8, dtype=np.float32) * 0.01
        self.R = np.eye(4, dtype=np.float32) * 0.1

        # P: initial state uncertainty.
        self.P = np.eye(8, dtype=np.float32)

    def predict(self) -> None:
        """
        Advance state and covariance one frame under the motion model.
        """
        self.state = self.F @ self.state
        self.P = self.F @ self.P @ self.F.T + self.Q

        # Another frame has passed without a matched detection.
        self.time_since_update += 1

    def update(self, bbox: np.ndarray) -> None:
        """
        Fold a matched detection into the state estimate.

        Args:
            bbox: Detected bounding box in the form [x1, y1, x2, y2].
        """
        self.time_since_update = 0
        self.number_of_successful_updates += 1

        # Innovation: measurement minus predicted measurement.
        z = bbox.reshape((4, 1))
        innovation = z - self.H @ self.state

        # Kalman gain.
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)

        # Correct state and covariance with the gated innovation.
        self.state = self.state + K @ innovation
        self.P = (np.eye(8, dtype=np.float32) - K @ self.H) @ self.P

    def get_state_bbox(self) -> np.ndarray:
        """
        Return the current bounding box estimate from the state vector.

        Returns:
            The bounding box [x1, y1, x2, y2] as a float (4,) array.
        """
        # First four state entries are the corner coordinates.
        return self.state[:4, 0].astype(float)

0 commit comments

Comments
 (0)