-
Notifications
You must be signed in to change notification settings - Fork 228
Seg preview workflow block #1647
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
⚡️ Codeflash found optimizations for this PR📄 787% (7.87x) speedup for
|
| if len(prompt_class_ids) == 0: | ||
| prompt_class_ids = [ | ||
| specific_class_id if specific_class_id else 0 | ||
| for _ in range(len(segmentation_predictions)) | ||
| ] | ||
| prompt_class_names = [ | ||
| text_prompt if text_prompt else "foreground" | ||
| for _ in range(len(segmentation_predictions)) | ||
| ] | ||
| prompt_detection_ids = [None for _ in range(len(segmentation_predictions))] | ||
| for prediction, class_id, class_name, detection_id in zip( | ||
| segmentation_predictions, | ||
| prompt_class_ids, | ||
| prompt_class_names, | ||
| prompt_detection_ids, | ||
| ): | ||
| for mask in prediction.masks: | ||
| if len(mask) < 3: | ||
| # skipping empty masks | ||
| continue | ||
| if prediction.confidence < threshold: | ||
| # skipping masks below threshold | ||
| continue | ||
| x_coords = [coord[0] for coord in mask] | ||
| y_coords = [coord[1] for coord in mask] | ||
| min_x = np.min(x_coords) | ||
| max_x = np.max(x_coords) | ||
| min_y = np.min(y_coords) | ||
| max_y = np.max(y_coords) | ||
| center_x = (min_x + max_x) / 2 | ||
| center_y = (min_y + max_y) / 2 | ||
| predictions.append( | ||
| InstanceSegmentationPrediction( | ||
| **{ | ||
| "x": center_x, | ||
| "y": center_y, | ||
| "width": max_x - min_x, | ||
| "height": max_y - min_y, | ||
| "points": [Point(x=point[0], y=point[1]) for point in mask], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚡️Codeflash found 44% (0.44x) speedup for convert_segmentation_response_to_inference_instances_seg_response in inference/core/workflows/core_steps/models/foundation/seg_preview/v1.py
⏱️ Runtime : 84.7 milliseconds → 58.7 milliseconds (best of 79 runs)
📝 Explanation and details
Key optimizations:
- Replaced list comprehensions of
x_coords = [coord[0] for coord in mask]andy_coords = [coord[1] for coord in mask]with a single call tonp.asarray(mask)and slicing, which is far faster especially for large masks. - Avoided duplicate computation when
prompt_class_ids/prompt_class_names/prompt_detection_idsis empty by using multiplications of lists, which are fast. - Skipped confidence check earlier to minimize unnecessary mask looping.
- Preallocated list of
Pointobjects outside the dict constructor, avoiding recomputation or excessive function call overhead. - Kept variable names and behavioral logic exactly as required.
- Preserved all comments and program structure where logic was unchanged.
This should reduce the highest-cost portions relating to np.min, np.max, and object list constructions.
✅ Correctness verification report:
| Test | Status |
|---|---|
| ⏪ Replay Tests | 🔘 None Found |
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | ✅ 28 Passed |
| 📊 Tests Coverage | 100.0% |
🌀 Generated Regression Tests and Runtime
import pytest
from inference.core.workflows.core_steps.models.foundation.seg_preview.v1 import \
convert_segmentation_response_to_inference_instances_seg_response
# Mocks for external classes used in the function
class Point:
def __init__(self, x, y):
self.x = x
self.y = y
def __eq__(self, other):
return isinstance(other, Point) and self.x == other.x and self.y == other.y
class InstanceSegmentationPrediction:
def __init__(self, x, y, width, height, points, confidence, class_, class_id, parent_id):
self.x = x
self.y = y
self.width = width
self.height = height
self.points = points
self.confidence = confidence
self.class_ = class_
self.class_id = class_id
self.parent_id = parent_id
# For assertion
def __eq__(self, other):
return (
isinstance(other, InstanceSegmentationPrediction)
and self.x == other.x
and self.y == other.y
and self.width == other.width
and self.height == other.height
and self.points == other.points
and self.confidence == other.confidence
and self.class_ == other.class_
and self.class_id == other.class_id
and self.parent_id == other.parent_id
)
class InferenceResponseImage:
def __init__(self, width, height):
self.width = width
self.height = height
def __eq__(self, other):
return isinstance(other, InferenceResponseImage) and self.width == other.width and self.height == other.height
class InstanceSegmentationInferenceResponse:
def __init__(self, predictions, image):
self.predictions = predictions
self.image = image
def __eq__(self, other):
return (
isinstance(other, InstanceSegmentationInferenceResponse)
and self.predictions == other.predictions
and self.image == other.image
)
class WorkflowImageData:
def __init__(self, numpy_image):
self.numpy_image = numpy_image
# Helper for numpy-like shape
class DummyNumpyImage:
def __init__(self, shape):
self.shape = shape
# Prediction mock
class PredictionMock:
def __init__(self, masks, confidence):
self.masks = masks
self.confidence = confidence
from inference.core.workflows.core_steps.models.foundation.seg_preview.v1 import \
convert_segmentation_response_to_inference_instances_seg_response
# ----------- UNIT TESTS ------------
# BASIC TEST CASES
def test_basic_single_prediction_single_mask():
# Single prediction, single mask, above threshold
image = WorkflowImageData(DummyNumpyImage((100, 200, 3)))
mask = [(10, 20), (30, 40), (50, 60)]
prediction = PredictionMock([mask], 0.9)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[prediction],
image,
[1],
["cat"],
["abc"],
threshold=0.5,
); result = codeflash_output # 101μs -> 88.0μs (14.9% faster)
pred = result.predictions[0]
def test_basic_multiple_predictions_multiple_masks():
# Multiple predictions, each with multiple masks
image = WorkflowImageData(DummyNumpyImage((50, 50, 3)))
mask1 = [(0, 0), (10, 10), (20, 0)]
mask2 = [(5, 5), (15, 15), (25, 5)]
pred1 = PredictionMock([mask1, mask2], 0.8)
mask3 = [(1, 1), (2, 2), (3, 1)]
pred2 = PredictionMock([mask3], 0.7)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred1, pred2],
image,
[2, 3],
["dog", "bird"],
["id1", "id2"],
threshold=0.6,
); result = codeflash_output # 160μs -> 133μs (20.1% faster)
# Check first prediction
p0 = result.predictions[0]
# Check second prediction
p1 = result.predictions[1]
# Check third prediction
p2 = result.predictions[2]
def test_basic_threshold_filtering():
# Masks below threshold should be skipped
image = WorkflowImageData(DummyNumpyImage((30, 30, 3)))
mask = [(1, 1), (2, 2), (3, 1)]
pred1 = PredictionMock([mask], 0.3)
pred2 = PredictionMock([mask], 0.7)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred1, pred2],
image,
[1, 2],
["low", "high"],
["id1", "id2"],
threshold=0.5,
); result = codeflash_output # 79.7μs -> 68.1μs (16.9% faster)
def test_basic_empty_masks_are_skipped():
# Masks with less than 3 points are skipped
image = WorkflowImageData(DummyNumpyImage((10, 10, 3)))
mask_short = [(1, 2), (3, 4)] # Only 2 points
mask_ok = [(0, 0), (1, 1), (2, 0)]
pred = PredictionMock([mask_short, mask_ok], 0.8)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred],
image,
[1],
["obj"],
["id"],
threshold=0.5,
); result = codeflash_output # 79.3μs -> 67.9μs (16.8% faster)
def test_basic_prompt_defaults_when_empty():
# When prompt_class_ids is empty, defaults are used
image = WorkflowImageData(DummyNumpyImage((20, 20, 3)))
mask = [(0, 0), (1, 1), (2, 2)]
pred = PredictionMock([mask], 0.9)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred],
image,
[],
[],
[],
threshold=0.5,
text_prompt="foo",
specific_class_id=42
); result = codeflash_output # 80.2μs -> 68.6μs (17.0% faster)
p = result.predictions[0]
# EDGE TEST CASES
def test_edge_no_predictions():
# No predictions should result in empty output
image = WorkflowImageData(DummyNumpyImage((10, 10, 3)))
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[],
image,
[],
[],
[],
threshold=0.5,
); result = codeflash_output # 21.6μs -> 20.6μs (4.61% faster)
def test_edge_all_masks_filtered():
# All masks filtered out by threshold and length
image = WorkflowImageData(DummyNumpyImage((10, 10, 3)))
mask_short = [(1, 2)]
mask_ok = [(0, 0), (1, 1), (2, 2)]
pred1 = PredictionMock([mask_short], 0.9)
pred2 = PredictionMock([mask_ok], 0.2)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred1, pred2],
image,
[],
[],
[],
threshold=0.5,
); result = codeflash_output # 22.6μs -> 21.8μs (3.78% faster)
def test_edge_mask_with_negative_and_zero_coords():
# Mask with negative and zero coordinates
image = WorkflowImageData(DummyNumpyImage((10, 10, 3)))
mask = [(-5, 0), (0, -5), (5, 5)]
pred = PredictionMock([mask], 0.7)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred],
image,
[7],
["neg"],
["pid"],
threshold=0.5,
); result = codeflash_output # 80.9μs -> 69.9μs (15.7% faster)
p = result.predictions[0]
def test_edge_mask_with_identical_points():
# Mask with all identical points (degenerate bounding box)
image = WorkflowImageData(DummyNumpyImage((5, 5, 3)))
mask = [(2, 2), (2, 2), (2, 2)]
pred = PredictionMock([mask], 0.8)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred],
image,
[1],
["degenerate"],
["id"],
threshold=0.5,
); result = codeflash_output # 123μs -> 131μs (6.14% slower)
p = result.predictions[0]
# LARGE SCALE TEST CASES
def test_large_many_predictions_and_masks():
# Large number of predictions and masks
image = WorkflowImageData(DummyNumpyImage((100, 100, 3)))
num_preds = 100
num_masks = 5
predictions = []
for i in range(num_preds):
masks = []
for j in range(num_masks):
mask = [(j, j), (j+1, j), (j, j+1)]
masks.append(mask)
predictions.append(PredictionMock(masks, 0.9))
class_ids = list(range(num_preds))
class_names = [f"class_{i}" for i in range(num_preds)]
detection_ids = [f"id_{i}" for i in range(num_preds)]
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
predictions,
image,
class_ids,
class_names,
detection_ids,
threshold=0.5,
); result = codeflash_output # 16.2ms -> 11.3ms (43.1% faster)
# Check a few random predictions for correctness
for idx in [0, 49, 99]:
base = idx * num_masks
for j in range(num_masks):
p = result.predictions[base + j]
# Mask points
expected_mask = [(j, j), (j+1, j), (j, j+1)]
def test_large_all_masks_filtered_out():
# Large input, all masks below threshold
image = WorkflowImageData(DummyNumpyImage((100, 100, 3)))
num_preds = 100
predictions = [PredictionMock([[(0,0), (1,1), (2,2)]], 0.1) for _ in range(num_preds)]
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
predictions,
image,
[None]*num_preds,
[None]*num_preds,
[None]*num_preds,
threshold=0.5,
); result = codeflash_output # 40.0μs -> 33.2μs (20.3% faster)
def test_large_empty_masks():
# Large input, all masks are empty
image = WorkflowImageData(DummyNumpyImage((100, 100, 3)))
num_preds = 100
predictions = [PredictionMock([[(0,0)]], 0.9) for _ in range(num_preds)]
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
predictions,
image,
[None]*num_preds,
[None]*num_preds,
[None]*num_preds,
threshold=0.5,
); result = codeflash_output # 35.4μs -> 36.3μs (2.27% slower)
def test_large_mixed_valid_and_invalid_masks():
# Large input, some masks valid, some invalid
image = WorkflowImageData(DummyNumpyImage((100, 100, 3)))
num_preds = 100
predictions = []
for i in range(num_preds):
if i % 2 == 0:
masks = [[(0,0), (1,1), (2,2)]]
conf = 0.9
else:
masks = [[(0,0)]]
conf = 0.9
predictions.append(PredictionMock(masks, conf))
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
predictions,
image,
[i for i in range(num_preds)],
[f"class_{i}" for i in range(num_preds)],
[f"id_{i}" for i in range(num_preds)],
threshold=0.5,
); result = codeflash_output # 1.71ms -> 1.21ms (41.2% faster)
for idx, p in enumerate(result.predictions):
pass
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
#------------------------------------------------
import pytest
from inference.core.workflows.core_steps.models.foundation.seg_preview.v1 import \
convert_segmentation_response_to_inference_instances_seg_response
# Dummy classes to mimic the real entities for testing (since we can't import actual ones)
class Point:
def __init__(self, x, y):
self.x = x
self.y = y
def __eq__(self, other):
return isinstance(other, Point) and self.x == other.x and self.y == other.y
class InstanceSegmentationPrediction:
def __init__(self, x, y, width, height, points, confidence, class_, class_id, parent_id):
self.x = x
self.y = y
self.width = width
self.height = height
self.points = points
self.confidence = confidence
self.class_ = class_
self.class_id = class_id
self.parent_id = parent_id
# For test comparison
def as_dict(self):
return {
"x": self.x,
"y": self.y,
"width": self.width,
"height": self.height,
"points": self.points,
"confidence": self.confidence,
"class": self.class_,
"class_id": self.class_id,
"parent_id": self.parent_id,
}
class InferenceResponseImage:
def __init__(self, width, height):
self.width = width
self.height = height
class InstanceSegmentationInferenceResponse:
def __init__(self, predictions, image):
self.predictions = predictions
self.image = image
class WorkflowImageData:
def __init__(self, numpy_image):
self.numpy_image = numpy_image
# Dummy SegmentationPrediction for input
class SegmentationPrediction:
def __init__(self, masks, confidence):
self.masks = masks # List of list-of-tuples (coordinates)
self.confidence = confidence
from inference.core.workflows.core_steps.models.foundation.seg_preview.v1 import \
convert_segmentation_response_to_inference_instances_seg_response
# --------- UNIT TESTS ---------
# Helper to create dummy image data
class DummyImage:
def __init__(self, shape):
self.shape = shape
@pytest.fixture
def basic_image():
# 100x200 image
return WorkflowImageData(DummyImage((100, 200)))
# 1. BASIC TEST CASES
def test_single_prediction_single_mask(basic_image):
# Single prediction with one mask above threshold
mask = [(10, 10), (20, 10), (15, 20)]
pred = SegmentationPrediction(masks=[mask], confidence=0.9)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred], basic_image, [1], ["cat"], ["abc"], threshold=0.5
); result = codeflash_output # 88.1μs -> 77.5μs (13.7% faster)
p = result.predictions[0]
# Check center, width, height
xs = [10, 20, 15]
ys = [10, 10, 20]
def test_multiple_predictions_multiple_masks(basic_image):
# Two predictions, each with two masks
mask1 = [(0, 0), (10, 0), (5, 10)]
mask2 = [(50, 50), (60, 50), (55, 60)]
pred1 = SegmentationPrediction(masks=[mask1, mask2], confidence=0.7)
mask3 = [(100, 100), (110, 100), (105, 110)]
mask4 = [(150, 150), (160, 150), (155, 160)]
pred2 = SegmentationPrediction(masks=[mask3, mask4], confidence=0.8)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred1, pred2], basic_image, [1, 2], ["dog", "car"], ["id1", "id2"], threshold=0.6
); result = codeflash_output # 194μs -> 153μs (26.9% faster)
# First two predictions should have class "dog", id 1, parent "id1"
for i in range(2):
pass
# Next two predictions should have class "car", id 2, parent "id2"
for i in range(2, 4):
pass
def test_threshold_filtering(basic_image):
# One prediction above threshold, one below
mask = [(10, 10), (20, 10), (15, 20)]
pred1 = SegmentationPrediction(masks=[mask], confidence=0.6)
pred2 = SegmentationPrediction(masks=[mask], confidence=0.4)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred1, pred2], basic_image, [1, 2], ["a", "b"], ["id1", "id2"], threshold=0.5
); result = codeflash_output # 80.6μs -> 68.5μs (17.5% faster)
def test_empty_masks_are_skipped(basic_image):
# Mask with less than 3 points should be skipped
mask1 = [(1, 1), (2, 2)] # too short
mask2 = [(10, 10), (20, 10), (15, 20)] # valid
pred = SegmentationPrediction(masks=[mask1, mask2], confidence=0.9)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred], basic_image, [1], ["cat"], ["abc"], threshold=0.5
); result = codeflash_output # 78.6μs -> 66.8μs (17.6% faster)
pts = result.predictions[0].points
def test_empty_prompt_lists_uses_defaults(basic_image):
# If prompt_class_ids is empty, use defaults
mask = [(10, 10), (20, 10), (15, 20)]
pred = SegmentationPrediction(masks=[mask], confidence=0.9)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred], basic_image, [], [], [], threshold=0.5, text_prompt="apple", specific_class_id=42
); result = codeflash_output # 79.2μs -> 68.1μs (16.3% faster)
p = result.predictions[0]
def test_empty_prompt_lists_no_text_or_class_id(basic_image):
# If prompt_class_ids is empty, and no text_prompt/specific_class_id, use "foreground" and 0
mask = [(10, 10), (20, 10), (15, 20)]
pred = SegmentationPrediction(masks=[mask], confidence=0.9)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred], basic_image, [], [], [], threshold=0.5
); result = codeflash_output # 78.2μs -> 68.1μs (14.9% faster)
p = result.predictions[0]
# 2. EDGE TEST CASES
def test_no_predictions_returns_empty(basic_image):
# No segmentation predictions
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[], basic_image, [], [], [], threshold=0.5
); result = codeflash_output # 21.2μs -> 20.7μs (2.22% faster)
def test_all_masks_filtered_out(basic_image):
# All masks too short or below threshold
mask1 = [(1, 1), (2, 2)] # too short
mask2 = [(10, 10), (20, 10), (15, 20)] # valid but low confidence
pred = SegmentationPrediction(masks=[mask1, mask2], confidence=0.1)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred], basic_image, [1], ["cat"], ["abc"], threshold=0.5
); result = codeflash_output # 20.7μs -> 19.7μs (4.99% faster)
def test_mask_with_negative_coordinates(basic_image):
# Mask with negative coordinates should still work
mask = [(-10, -10), (-20, -10), (-15, -20)]
pred = SegmentationPrediction(masks=[mask], confidence=0.9)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred], basic_image, [1], ["neg"], ["neg_id"], threshold=0.5
); result = codeflash_output # 81.1μs -> 69.4μs (16.9% faster)
p = result.predictions[0]
xs = [-10, -20, -15]
ys = [-10, -10, -20]
def test_mask_with_duplicate_points(basic_image):
# Mask with duplicate points
mask = [(5, 5), (5, 5), (10, 10)]
pred = SegmentationPrediction(masks=[mask], confidence=0.9)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred], basic_image, [1], ["dup"], ["dup_id"], threshold=0.5
); result = codeflash_output # 78.5μs -> 68.6μs (14.4% faster)
p = result.predictions[0]
xs = [5, 5, 10]
ys = [5, 5, 10]
def test_mask_with_non_integer_coordinates(basic_image):
# Mask with float coordinates
mask = [(1.5, 2.5), (3.5, 4.5), (2.5, 5.5)]
pred = SegmentationPrediction(masks=[mask], confidence=0.95)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred], basic_image, [7], ["float"], ["float_id"], threshold=0.5
); result = codeflash_output # 80.5μs -> 66.8μs (20.5% faster)
p = result.predictions[0]
xs = [1.5, 3.5, 2.5]
ys = [2.5, 4.5, 5.5]
def test_mask_with_large_values(basic_image):
# Mask with very large coordinates
mask = [(100000, 100000), (200000, 100000), (150000, 200000)]
pred = SegmentationPrediction(masks=[mask], confidence=0.99)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred], basic_image, [999], ["large"], ["large_id"], threshold=0.5
); result = codeflash_output # 78.7μs -> 66.5μs (18.4% faster)
p = result.predictions[0]
xs = [100000, 200000, 150000]
ys = [100000, 100000, 200000]
# 3. LARGE SCALE TEST CASES
def test_many_predictions_and_masks(basic_image):
# Test with 100 predictions, each with 10 valid masks
num_preds = 100
num_masks = 10
preds = []
for i in range(num_preds):
masks = []
for j in range(num_masks):
# Triangle mask
base = i * 10 + j * 5
mask = [(base, base), (base + 2, base), (base + 1, base + 3)]
masks.append(mask)
preds.append(SegmentationPrediction(masks=masks, confidence=0.8))
class_ids = list(range(num_preds))
class_names = [f"class_{i}" for i in range(num_preds)]
detection_ids = [f"id_{i}" for i in range(num_preds)]
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
preds, basic_image, class_ids, class_names, detection_ids, threshold=0.5
); result = codeflash_output # 32.6ms -> 22.4ms (45.6% faster)
# Spot check a few predictions
for idx in [0, 99, 500, 999]:
p = result.predictions[idx]
# Check that points are correct
mask_idx = idx % num_masks
base = (idx // num_masks) * 10 + mask_idx * 5
expected_mask = [(base, base), (base + 2, base), (base + 1, base + 3)]
def test_large_image_dimensions():
# Test with a large image
image = WorkflowImageData(DummyImage((999, 888)))
mask = [(1, 1), (2, 2), (3, 3)]
pred = SegmentationPrediction(masks=[mask], confidence=0.8)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred], image, [1], ["big"], ["big_id"], threshold=0.5
); result = codeflash_output # 99.0μs -> 85.5μs (15.9% faster)
def test_large_number_of_masks_per_prediction(basic_image):
# One prediction with 1000 masks
masks = []
for i in range(1000):
mask = [(i, i), (i+1, i), (i, i+1)]
masks.append(mask)
pred = SegmentationPrediction(masks=masks, confidence=0.8)
codeflash_output = convert_segmentation_response_to_inference_instances_seg_response(
[pred], basic_image, [1], ["bulk"], ["bulk_id"], threshold=0.5
); result = codeflash_output # 32.3ms -> 22.1ms (46.1% faster)
# Check a few masks
for idx in [0, 500, 999]:
p = result.predictions[idx]
mask = [(idx, idx), (idx+1, idx), (idx, idx+1)]
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.To test or edit this optimization locally git merge codeflash/optimize-pr1647-2025-10-28T23.09.28
Click to see suggested changes
| if len(prompt_class_ids) == 0: | |
| prompt_class_ids = [ | |
| specific_class_id if specific_class_id else 0 | |
| for _ in range(len(segmentation_predictions)) | |
| ] | |
| prompt_class_names = [ | |
| text_prompt if text_prompt else "foreground" | |
| for _ in range(len(segmentation_predictions)) | |
| ] | |
| prompt_detection_ids = [None for _ in range(len(segmentation_predictions))] | |
| for prediction, class_id, class_name, detection_id in zip( | |
| segmentation_predictions, | |
| prompt_class_ids, | |
| prompt_class_names, | |
| prompt_detection_ids, | |
| ): | |
| for mask in prediction.masks: | |
| if len(mask) < 3: | |
| # skipping empty masks | |
| continue | |
| if prediction.confidence < threshold: | |
| # skipping masks below threshold | |
| continue | |
| x_coords = [coord[0] for coord in mask] | |
| y_coords = [coord[1] for coord in mask] | |
| min_x = np.min(x_coords) | |
| max_x = np.max(x_coords) | |
| min_y = np.min(y_coords) | |
| max_y = np.max(y_coords) | |
| center_x = (min_x + max_x) / 2 | |
| center_y = (min_y + max_y) / 2 | |
| predictions.append( | |
| InstanceSegmentationPrediction( | |
| **{ | |
| "x": center_x, | |
| "y": center_y, | |
| "width": max_x - min_x, | |
| "height": max_y - min_y, | |
| "points": [Point(x=point[0], y=point[1]) for point in mask], | |
| num_preds = len(segmentation_predictions) | |
| if len(prompt_class_ids) == 0: | |
| prompt_class_ids = [ | |
| specific_class_id if specific_class_id is not None else 0 | |
| ] * num_preds | |
| prompt_class_names = [ | |
| text_prompt if text_prompt is not None else "foreground" | |
| ] * num_preds | |
| prompt_detection_ids = [None] * num_preds | |
| # Preallocate and reuse numpy arrays for coords extraction | |
| for prediction, class_id, class_name, detection_id in zip( | |
| segmentation_predictions, | |
| prompt_class_ids, | |
| prompt_class_names, | |
| prompt_detection_ids, | |
| ): | |
| # Avoid expensive masking loop for empty/low confidence | |
| if prediction.confidence < threshold: | |
| continue | |
| masks = prediction.masks | |
| for mask in masks: | |
| # mask must be a sequence of 2-tuples, typically short | |
| if len(mask) < 3: | |
| continue | |
| # Efficient coords extraction using numpy for large masks | |
| # Most masks are short, but numpy is still faster for >3 points | |
| # Avoid reinterpreting or copying unless necessary | |
| mask_np = np.asarray(mask, dtype=np.float32) | |
| # shape is (N,2) | |
| # This is only slightly slower for tiny masks, but much faster for large ones | |
| x_coords = mask_np[:, 0] | |
| y_coords = mask_np[:, 1] | |
| min_x = x_coords.min() | |
| max_x = x_coords.max() | |
| min_y = y_coords.min() | |
| max_y = y_coords.max() | |
| center_x = (min_x + max_x) / 2 | |
| center_y = (min_y + max_y) / 2 | |
| # Preallocate Point objects efficiently | |
| # Numba/cython is not appropriate here, .append is fastest for Python objects | |
| points = [ | |
| Point(x=pt[0], y=pt[1]) for pt in mask | |
| ] # cannot avoid loop for custom objects | |
| predictions.append( | |
| InstanceSegmentationPrediction( | |
| **{ | |
| "x": center_x, | |
| "y": center_y, | |
| "width": max_x - min_x, | |
| "height": max_y - min_y, | |
| "points": points, |
inference/core/env.py
Outdated
| SAM_VERSION_ID = os.getenv("SAM_VERSION_ID", "vit_h") | ||
| SAM2_VERSION_ID = os.getenv("SAM2_VERSION_ID", "hiera_large") | ||
|
|
||
| SEG_PREVIEW_ENDPOINT = os.getenv( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update to roboflow API url + /inferences proxy/seg-preview?
SolomonLake
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, tested locally!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| response = requests.post( | ||
| f"{endpoint}?api_key={api_key}", | ||
| json=payload, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Skip appending
api_key=None when no API key is configured
The block’s constructor accepts api_key: Optional[str], so _api_key is often None when the workflow relies on the internal service headers instead of a user key. The request URL is always built as f"{endpoint}?api_key={api_key}"; when no key is provided this produces …?api_key=None, which many backends treat as an explicit (but invalid) API key and return 401. Because the exception handler then returns an empty prediction set, the block never succeeds in deployments that authenticate only via X‑Roboflow‑Internal-* headers. Guard the query parameter so it is omitted whenever _api_key is falsy.
Useful? React with 👍 / 👎.
Description
Adds a workflow block for a new seg preview model
Type of change
Please delete options that are not relevant.
How has this change been tested, please provide a testcase or example of how you tested the change?
running locally
Any specific deployment considerations
meant for model that will be served by roboflow api endpoint
Docs