From b124b8f3a9ca14c49b6edc4ee627f2401e06ac12 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Tue, 22 Jul 2025 14:52:58 -0400 Subject: [PATCH 1/3] [ML] Use adaptive allocations in test --- .../test/ml/update_trained_model_deployment.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/update_trained_model_deployment.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/update_trained_model_deployment.yml index d02585de51e9a..d46b031dadeef 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/update_trained_model_deployment.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/update_trained_model_deployment.yml @@ -6,5 +6,9 @@ model_id: "missing-model" body: > { - "number_of_allocations": 4 + "adaptive_allocations": { + "enabled": true, + "min_number_of_allocations": 0, + "max_number_of_allocations": 1 + } } From 9ddaec47f83408669b5f0db820b66332aac6c5dc Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Wed, 23 Jul 2025 16:52:56 -0400 Subject: [PATCH 2/3] Flag when updates come from the inference endpoint --- .../org/elasticsearch/TransportVersions.java | 1 + .../UpdateTrainedModelDeploymentAction.java | 20 +++++++++++++++++-- .../TransportUpdateInferenceModelAction.java | 2 +- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 0e96bfd6ebc78..ed125d98645d3 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -351,6 +351,7 @@ static TransportVersion def(int id) { public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_126_0_00); public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS = def(9_127_0_00); public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_128_0_00); + public static final TransportVersion INFERENCE_UPDATE_ML = def(9_129_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java index 2018c9526ec83..15224a551bdef 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.TransportVersions.INFERENCE_UPDATE_ML; import static org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction.Request.ADAPTIVE_ALLOCATIONS; import static org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction.Request.MODEL_ID; import static org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction.Request.NUMBER_OF_ALLOCATIONS; @@ -74,6 +75,7 @@ public static Request parseRequest(String deploymentId, XContentParser parser) { private Integer numberOfAllocations; private AdaptiveAllocationsSettings adaptiveAllocationsSettings; private boolean isInternal; + private boolean fromInference; private Request() { super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT); @@ -96,6 +98,7 @@ public Request(StreamInput in) throws IOException { adaptiveAllocationsSettings = in.readOptionalWriteable(AdaptiveAllocationsSettings::new); isInternal = in.readBoolean(); } + fromInference = in.getTransportVersion().onOrAfter(INFERENCE_UPDATE_ML) && in.readBoolean(); } public final void setDeploymentId(String deploymentId) { @@ -126,6 +129,15 @@ public void setIsInternal(boolean isInternal) { this.isInternal = isInternal; } + public boolean fromInference() { + return fromInference; + } + + public void setFromInference(boolean fromInference) { + this.fromInference = fromInference; + this.isInternal = fromInference; + } + public AdaptiveAllocationsSettings getAdaptiveAllocationsSettings() { return adaptiveAllocationsSettings; } @@ -141,6 +153,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(adaptiveAllocationsSettings); out.writeBoolean(isInternal); } + if (out.getTransportVersion().onOrAfter(INFERENCE_UPDATE_ML)) { + out.writeBoolean(fromInference); + } } @Override @@ -183,7 +198,7 @@ public ActionRequestValidationException validate() { @Override public int hashCode() { - return Objects.hash(deploymentId, numberOfAllocations, adaptiveAllocationsSettings, isInternal); + return Objects.hash(deploymentId, numberOfAllocations, adaptiveAllocationsSettings, isInternal, fromInference); } @Override @@ -198,7 +213,8 @@ public boolean equals(Object obj) { return Objects.equals(deploymentId, other.deploymentId) && Objects.equals(numberOfAllocations, other.numberOfAllocations) && Objects.equals(adaptiveAllocationsSettings, other.adaptiveAllocationsSettings) - && isInternal == other.isInternal; + && isInternal == other.isInternal + && fromInference == other.fromInference; } @Override diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportUpdateInferenceModelAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportUpdateInferenceModelAction.java index 77a370b2ef3dc..a4f8fd0bd5b0e 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportUpdateInferenceModelAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportUpdateInferenceModelAction.java @@ -294,7 +294,7 @@ private void updateInClusterEndpoint( var updateRequest = new UpdateTrainedModelDeploymentAction.Request(deploymentId); updateRequest.setNumberOfAllocations(elasticServiceSettings.getNumAllocations()); updateRequest.setAdaptiveAllocationsSettings(elasticServiceSettings.getAdaptiveAllocationsSettings()); - updateRequest.setIsInternal(true); + updateRequest.setFromInference(true); var delegate = listener.delegateFailure((l2, response) -> { modelRegistry.updateModelTransaction(newModel, existingParsedModel, l2); From 0107a7ce1f8a6171b7781fab2591cd7c629102bb Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Tue, 29 Jul 2025 12:16:04 -0400 Subject: [PATCH 3/3] Change booleans to enum --- .../UpdateTrainedModelDeploymentAction.java | 55 +++++++++++-------- .../TransportUpdateInferenceModelAction.java | 2 +- .../AdaptiveAllocationsScalerService.java | 2 +- ...AdaptiveAllocationsScalerServiceTests.java | 6 +- 4 files changed, 36 insertions(+), 29 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java index 15224a551bdef..ee8f4c10dd9f2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java @@ -33,6 +33,11 @@ import static org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction.Request.NUMBER_OF_ALLOCATIONS; public class UpdateTrainedModelDeploymentAction extends ActionType { + public enum Source { + API, + ADAPTIVE_ALLOCATIONS, + INFERENCE + } public static final UpdateTrainedModelDeploymentAction INSTANCE = new UpdateTrainedModelDeploymentAction(); public static final String NAME = "cluster:admin/xpack/ml/trained_models/deployment/update"; @@ -74,8 +79,7 @@ public static Request parseRequest(String deploymentId, XContentParser parser) { private String deploymentId; private Integer numberOfAllocations; private AdaptiveAllocationsSettings adaptiveAllocationsSettings; - private boolean isInternal; - private boolean fromInference; + private Source source = Source.API; private Request() { super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT); @@ -92,13 +96,18 @@ public Request(StreamInput in) throws IOException { if (in.getTransportVersion().before(TransportVersions.V_8_16_0)) { numberOfAllocations = in.readVInt(); adaptiveAllocationsSettings = null; - isInternal = false; + source = Source.API; } else { numberOfAllocations = in.readOptionalVInt(); adaptiveAllocationsSettings = in.readOptionalWriteable(AdaptiveAllocationsSettings::new); - isInternal = in.readBoolean(); + if (in.getTransportVersion().before(INFERENCE_UPDATE_ML)) { + // we changed over from a boolean to an enum + // when it was a boolean, true came from adaptive allocations and false came from the rest api + source = in.readBoolean() ? Source.ADAPTIVE_ALLOCATIONS : Source.API; + } else { + source = in.readEnum(Source.class); + } } - fromInference = in.getTransportVersion().onOrAfter(INFERENCE_UPDATE_ML) && in.readBoolean(); } public final void setDeploymentId(String deploymentId) { @@ -122,20 +131,15 @@ public void setAdaptiveAllocationsSettings(AdaptiveAllocationsSettings adaptiveA } public boolean isInternal() { - return isInternal; - } - - public void setIsInternal(boolean isInternal) { - this.isInternal = isInternal; + return source == Source.INFERENCE || source == Source.ADAPTIVE_ALLOCATIONS; } - public boolean fromInference() { - return fromInference; + public void setSource(Source source) { + this.source = source != null ? source : this.source; } - public void setFromInference(boolean fromInference) { - this.fromInference = fromInference; - this.isInternal = fromInference; + public Source getSource() { + return source; } public AdaptiveAllocationsSettings getAdaptiveAllocationsSettings() { @@ -151,10 +155,14 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeOptionalVInt(numberOfAllocations); out.writeOptionalWriteable(adaptiveAllocationsSettings); - out.writeBoolean(isInternal); - } - if (out.getTransportVersion().onOrAfter(INFERENCE_UPDATE_ML)) { - out.writeBoolean(fromInference); + if (out.getTransportVersion().before(INFERENCE_UPDATE_ML)) { + // we changed over from a boolean to an enum + // when it was a boolean, true came from adaptive allocations and false came from the rest api + // treat "inference" as if it came from the api + out.writeBoolean(isInternal()); + } else { + out.writeEnum(source); + } } } @@ -176,10 +184,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public ActionRequestValidationException validate() { ActionRequestValidationException validationException = new ActionRequestValidationException(); if (numberOfAllocations != null) { - if (numberOfAllocations < 0 || (isInternal == false && numberOfAllocations == 0)) { + if (numberOfAllocations < 0 || (isInternal() == false && numberOfAllocations == 0)) { validationException.addValidationError("[" + NUMBER_OF_ALLOCATIONS + "] must be a positive integer"); } - if (isInternal == false + if (isInternal() == false && adaptiveAllocationsSettings != null && adaptiveAllocationsSettings.getEnabled() == Boolean.TRUE) { validationException.addValidationError( @@ -198,7 +206,7 @@ public ActionRequestValidationException validate() { @Override public int hashCode() { - return Objects.hash(deploymentId, numberOfAllocations, adaptiveAllocationsSettings, isInternal, fromInference); + return Objects.hash(deploymentId, numberOfAllocations, adaptiveAllocationsSettings, source); } @Override @@ -213,8 +221,7 @@ public boolean equals(Object obj) { return Objects.equals(deploymentId, other.deploymentId) && Objects.equals(numberOfAllocations, other.numberOfAllocations) && Objects.equals(adaptiveAllocationsSettings, other.adaptiveAllocationsSettings) - && isInternal == other.isInternal - && fromInference == other.fromInference; + && source == other.source; } @Override diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportUpdateInferenceModelAction.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportUpdateInferenceModelAction.java index a4f8fd0bd5b0e..903b4b59c2cd8 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportUpdateInferenceModelAction.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportUpdateInferenceModelAction.java @@ -294,7 +294,7 @@ private void updateInClusterEndpoint( var updateRequest = new UpdateTrainedModelDeploymentAction.Request(deploymentId); updateRequest.setNumberOfAllocations(elasticServiceSettings.getNumAllocations()); updateRequest.setAdaptiveAllocationsSettings(elasticServiceSettings.getAdaptiveAllocationsSettings()); - updateRequest.setFromInference(true); + updateRequest.setSource(UpdateTrainedModelDeploymentAction.Source.INFERENCE); var delegate = listener.delegateFailure((l2, response) -> { modelRegistry.updateModelTransaction(newModel, existingParsedModel, l2); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java index a7812a5dfa0b3..f8095df922c2c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java @@ -520,7 +520,7 @@ private void updateNumberOfAllocations( ) { UpdateTrainedModelDeploymentAction.Request updateRequest = new UpdateTrainedModelDeploymentAction.Request(deploymentId); updateRequest.setNumberOfAllocations(numberOfAllocations); - updateRequest.setIsInternal(true); + updateRequest.setSource(UpdateTrainedModelDeploymentAction.Source.ADAPTIVE_ALLOCATIONS); ClientHelper.executeAsyncWithOrigin( client, ClientHelper.ML_ORIGIN, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java index 9e1f0c280ba54..e91e07b9e1532 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java @@ -229,7 +229,7 @@ public void test_scaleUp() { verify(client, times(1)).execute(eq(GetDeploymentStatsAction.INSTANCE), any(), any()); var updateRequest = new UpdateTrainedModelDeploymentAction.Request("test-deployment"); updateRequest.setNumberOfAllocations(2); - updateRequest.setIsInternal(true); + updateRequest.setSource(UpdateTrainedModelDeploymentAction.Source.ADAPTIVE_ALLOCATIONS); verify(client, times(1)).execute(eq(UpdateTrainedModelDeploymentAction.INSTANCE), eq(updateRequest), any()); verifyNoMoreInteractions(client, clusterService); reset(client, clusterService); @@ -323,7 +323,7 @@ public void test_scaleDownToZero_whenNoRequests() { verify(client, times(1)).execute(eq(GetDeploymentStatsAction.INSTANCE), any(), any()); var updateRequest = new UpdateTrainedModelDeploymentAction.Request("test-deployment"); updateRequest.setNumberOfAllocations(0); - updateRequest.setIsInternal(true); + updateRequest.setSource(UpdateTrainedModelDeploymentAction.Source.ADAPTIVE_ALLOCATIONS); verify(client, times(1)).execute(eq(UpdateTrainedModelDeploymentAction.INSTANCE), eq(updateRequest), any()); verifyNoMoreInteractions(client, clusterService); @@ -468,7 +468,7 @@ public void test_noScaleDownToZero_whenRecentlyScaledUpByOtherNode() { verify(client, times(1)).execute(eq(GetDeploymentStatsAction.INSTANCE), any(), any()); var updateRequest = new UpdateTrainedModelDeploymentAction.Request("test-deployment"); updateRequest.setNumberOfAllocations(0); - updateRequest.setIsInternal(true); + updateRequest.setSource(UpdateTrainedModelDeploymentAction.Source.ADAPTIVE_ALLOCATIONS); verify(client, times(1)).execute(eq(UpdateTrainedModelDeploymentAction.INSTANCE), eq(updateRequest), any()); verifyNoMoreInteractions(client, clusterService);