diff --git a/.env b/.env
index 12e861496..47479a252 100644
--- a/.env
+++ b/.env
@@ -3,7 +3,7 @@ ELASTICSEARCH_CONTAINER_NAME=terraform-elasticstack-es
 ELASTICSEARCH_PASSWORD=password
 ELASTICSEARCH_PORT=9200
 ELASTICSEARCH_URL=http://localhost:${ELASTICSEARCH_PORT}
-ELASTICSEARCH_JAVA_OPTS="-Xms128m -Xmx2g"
+ELASTICSEARCH_JAVA_OPTS="-Xms128m -Xmx1g"
 KIBANA_CONTAINER_NAME=terraform-elasticstack-kb
 KIBANA_SETTINGS_CONTAINER_NAME=terraform-elasticstack-kb-settings
 FLEET_SETTINGS_CONTAINER_NAME=terraform-elasticstack-fleet-settings
diff --git a/docs/resources/elasticsearch_ml_job_state.md b/docs/resources/elasticsearch_ml_job_state.md
new file mode 100644
index 000000000..7e586c7ce
--- /dev/null
+++ b/docs/resources/elasticsearch_ml_job_state.md
@@ -0,0 +1,145 @@
+---
+page_title: "elasticstack_elasticsearch_ml_job_state Resource - terraform-provider-elasticstack"
+subcategory: ""
+description: |-
+  ML Job State Resource
+  Manages the state of an Elasticsearch Machine Learning (ML) job, allowing you to open or close ML jobs.
+  This resource uses the following Elasticsearch APIs:
+  Open ML Job API https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-open-job.htmlClose ML Job API https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-close-job.htmlGet ML Job Stats API https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-job-stats.html
+  Important Notes
+  This resource manages the state of an existing ML job, not the job configuration itself.The ML job must already exist before using this resource.Opening a job allows it to receive and process data.Closing a job stops data processing and frees up resources.Jobs can be opened and closed multiple times throughout their lifecycle.
+---
+
+# elasticstack_elasticsearch_ml_job_state (Resource)
+
+# ML Job State Resource
+
+Manages the state of an Elasticsearch Machine Learning (ML) job, allowing you to open or close ML jobs.
+
+This resource uses the following Elasticsearch APIs:
+- [Open ML Job API](https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-open-job.html)
+- [Close ML Job API](https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-close-job.html)
+- [Get ML Job Stats API](https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-job-stats.html)
+
+## Important Notes
+
+- This resource manages the **state** of an existing ML job, not the job configuration itself.
+- The ML job must already exist before using this resource.
+- Opening a job allows it to receive and process data.
+- Closing a job stops data processing and frees up resources.
+- Jobs can be opened and closed multiple times throughout their lifecycle.
+
+## Example Usage
+
+```terraform
+provider "elasticstack" {
+  elasticsearch {}
+}
+
+# First create an ML anomaly detection job
+resource "elasticstack_elasticsearch_ml_anomaly_detector" "example" {
+  job_id      = "example-ml-job"
+  description = "Example anomaly detection job"
+
+  analysis_config = {
+    bucket_span = "15m"
+    detectors = [
+      {
+        function             = "count"
+        detector_description = "Count detector"
+      }
+    ]
+  }
+
+  data_description = {
+    time_field  = "@timestamp"
+    time_format = "epoch_ms"
+  }
+}
+
+# Manage the state of the ML job - open it
+resource "elasticstack_elasticsearch_ml_job_state" "example" {
+  job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
+  state  = "opened"
+
+  # Optional settings
+  force       = false
+  job_timeout = "30s"
+
+  # Timeouts for asynchronous operations
+  timeouts {
+    create = "5m"
+    update = "5m"
+  }
+
+  depends_on = [elasticstack_elasticsearch_ml_anomaly_detector.example]
+}
+
+# Example with different configuration options
+resource "elasticstack_elasticsearch_ml_job_state" "example_with_options" {
+  job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
+  state  = "closed"
+
+  # Use force close for quicker shutdown
+  force = true
+
+  # Custom timeout
+  job_timeout = "2m"
+
+  # Custom timeouts for asynchronous operations  
+  timeouts {
+    create = "10m"
+    update = "3m"
+  }
+
+  depends_on = [elasticstack_elasticsearch_ml_anomaly_detector.example]
+}
+```
+
+
+## Schema
+
+### Required
+
+- `job_id` (String) Identifier for the anomaly detection job.
+- `state` (String) The desired state for the ML job. Valid values are `opened` and `closed`.
+
+### Optional
+
+- `elasticsearch_connection` (Block List, Deprecated) Elasticsearch connection configuration block. (see [below for nested schema](#nestedblock--elasticsearch_connection))
+- `force` (Boolean) When closing a job, use to forcefully close it. This method is quicker but can miss important clean up tasks.
+- `job_timeout` (String) Timeout for the operation. Examples: `30s`, `5m`, `1h`. Default is `30s`.
+- `timeouts` (Attributes) (see [below for nested schema](#nestedatt--timeouts))
+
+### Read-Only
+
+- `id` (String) Internal identifier of the resource
+
+
+### Nested Schema for `elasticsearch_connection`
+
+Optional:
+
+- `api_key` (String, Sensitive) API Key to use for authentication to Elasticsearch
+- `bearer_token` (String, Sensitive) Bearer Token to use for authentication to Elasticsearch
+- `ca_data` (String) PEM-encoded custom Certificate Authority certificate
+- `ca_file` (String) Path to a custom Certificate Authority certificate
+- `cert_data` (String) PEM encoded certificate for client auth
+- `cert_file` (String) Path to a file containing the PEM encoded certificate for client auth
+- `endpoints` (List of String, Sensitive) A list of endpoints where the terraform provider will point to, this must include the http(s) schema and port number.
+- `es_client_authentication` (String, Sensitive) ES Client Authentication field to be used with the JWT token
+- `headers` (Map of String, Sensitive) A list of headers to be sent with each request to Elasticsearch.
+- `insecure` (Boolean) Disable TLS certificate validation
+- `key_data` (String, Sensitive) PEM encoded private key for client auth
+- `key_file` (String) Path to a file containing the PEM encoded private key for client auth
+- `password` (String, Sensitive) Password to use for API authentication to Elasticsearch.
+- `username` (String) Username to use for API authentication to Elasticsearch.
+
+
+
+### Nested Schema for `timeouts`
+
+Optional:
+
+- `create` (String) A string that can be [parsed as a duration](https://pkg.go.dev/time#ParseDuration) consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours).
+- `update` (String) A string that can be [parsed as a duration](https://pkg.go.dev/time#ParseDuration) consisting of numbers and unit suffixes, such as "30s" or "2h45m". Valid time units are "s" (seconds), "m" (minutes), "h" (hours).
\ No newline at end of file
diff --git a/examples/resources/elasticstack_elasticsearch_ml_job_state/resource.tf b/examples/resources/elasticstack_elasticsearch_ml_job_state/resource.tf
new file mode 100644
index 000000000..1bfc44c00
--- /dev/null
+++ b/examples/resources/elasticstack_elasticsearch_ml_job_state/resource.tf
@@ -0,0 +1,62 @@
+provider "elasticstack" {
+  elasticsearch {}
+}
+
+# First create an ML anomaly detection job
+resource "elasticstack_elasticsearch_ml_anomaly_detector" "example" {
+  job_id      = "example-ml-job"
+  description = "Example anomaly detection job"
+
+  analysis_config = {
+    bucket_span = "15m"
+    detectors = [
+      {
+        function             = "count"
+        detector_description = "Count detector"
+      }
+    ]
+  }
+
+  data_description = {
+    time_field  = "@timestamp"
+    time_format = "epoch_ms"
+  }
+}
+
+# Manage the state of the ML job - open it
+resource "elasticstack_elasticsearch_ml_job_state" "example" {
+  job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
+  state  = "opened"
+
+  # Optional settings
+  force       = false
+  job_timeout = "30s"
+
+  # Timeouts for asynchronous operations
+  timeouts {
+    create = "5m"
+    update = "5m"
+  }
+
+  depends_on = [elasticstack_elasticsearch_ml_anomaly_detector.example]
+}
+
+# Example with different configuration options
+resource "elasticstack_elasticsearch_ml_job_state" "example_with_options" {
+  job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
+  state  = "closed"
+
+  # Use force close for quicker shutdown
+  force = true
+
+  # Custom timeout
+  job_timeout = "2m"
+
+  # Custom timeouts for asynchronous operations  
+  timeouts {
+    create = "10m"
+    update = "3m"
+  }
+
+  depends_on = [elasticstack_elasticsearch_ml_anomaly_detector.example]
+}
\ No newline at end of file
diff --git a/go.mod b/go.mod
index 7cae924bf..36102e7b5 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
 	github.com/hashicorp/go-version v1.7.0
 	github.com/hashicorp/terraform-plugin-framework v1.16.1
 	github.com/hashicorp/terraform-plugin-framework-jsontypes v0.2.0
+	github.com/hashicorp/terraform-plugin-framework-timeouts v0.6.0
 	github.com/hashicorp/terraform-plugin-framework-validators v0.19.0
 	github.com/hashicorp/terraform-plugin-go v0.29.0
 	github.com/hashicorp/terraform-plugin-log v0.9.0
diff --git a/go.sum b/go.sum
index 9428b20b0..5991270a8 100644
--- a/go.sum
+++ b/go.sum
@@ -379,8 +379,6 @@ github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97
 github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjTM0wiaDU=
 github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM=
 github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
-github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
-github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
 github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
 github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
 github.com/fatih/set v0.2.1 h1:nn2CaJyknWE/6txyUDGwysr3G5QC6xWB/PtVjPBbeaA=
@@ -617,6 +615,8 @@ github.com/hashicorp/terraform-plugin-framework v1.16.1 h1:1+zwFm3MEqd/0K3YBB2v9
 github.com/hashicorp/terraform-plugin-framework v1.16.1/go.mod h1:0xFOxLy5lRzDTayc4dzK/FakIgBhNf/lC4499R9cV4Y=
 github.com/hashicorp/terraform-plugin-framework-jsontypes v0.2.0 h1:SJXL5FfJJm17554Kpt9jFXngdM6fXbnUnZ6iT2IeiYA=
 github.com/hashicorp/terraform-plugin-framework-jsontypes v0.2.0/go.mod h1:p0phD0IYhsu9bR4+6OetVvvH59I6LwjXGnTVEr8ox6E=
+github.com/hashicorp/terraform-plugin-framework-timeouts v0.6.0 h1:Vv16e7EW4nT9668IV0RhdpEmnLl0im7BZx6J+QMlUkg=
+github.com/hashicorp/terraform-plugin-framework-timeouts v0.6.0/go.mod h1:rpHo9hZLn4vEkvNL5xsSdLRdaDZKSinuc0xL+BdOpVA=
 github.com/hashicorp/terraform-plugin-framework-validators v0.19.0 h1:Zz3iGgzxe/1XBkooZCewS0nJAaCFPFPHdNJd8FgE4Ow=
 github.com/hashicorp/terraform-plugin-framework-validators v0.19.0/go.mod h1:GBKTNGbGVJohU03dZ7U8wHqc2zYnMUawgCN+gC0itLc=
 github.com/hashicorp/terraform-plugin-go v0.29.0 h1:1nXKl/nSpaYIUBU1IG/EsDOX0vv+9JxAltQyDMpq5mU=
@@ -1377,9 +1377,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gotest.tools/gotestsum v1.13.0 h1:+Lh454O9mu9AMG1APV4o0y7oDYKyik/3kBOiCqiEpRo=
 gotest.tools/gotestsum v1.13.0/go.mod h1:7f0NS5hFb0dWr4NtcsAsF0y1kzjEFfAil0HiBQJE03Q=
-gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0=
-gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
 gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q=
+gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
diff --git a/internal/clients/elasticsearch/ml_job.go b/internal/clients/elasticsearch/ml_job.go
index 3c8747ca4..fc51037cd 100644
--- a/internal/clients/elasticsearch/ml_job.go
+++ b/internal/clients/elasticsearch/ml_job.go
@@ -15,6 +15,47 @@ import (
 	"github.com/hashicorp/terraform-plugin-framework/diag"
 )
 
+// MLJobStats represents the statistics structure for an ML job
+type MLJobStats struct {
+	Jobs []MLJob `json:"jobs"`
+}
+
+// MLJob represents a single ML job in the stats response
+type MLJob struct {
+	JobId string     `json:"job_id"`
+	State string     `json:"state"`
+	Node  *MLJobNode `json:"node,omitempty"`
+}
+
+// MLJobNode represents the node information for an ML job
+type MLJobNode struct {
+	Id         string                 `json:"id"`
+	Name       string                 `json:"name"`
+	Attributes map[string]interface{} `json:"attributes"`
+}
+
+// OpenMLJob opens a machine learning job
+func OpenMLJob(ctx context.Context, apiClient *clients.ApiClient, jobId string) diag.Diagnostics {
+	var diags diag.Diagnostics
+
+	esClient, err := apiClient.GetESClient()
+	if err != nil {
+		diags.AddError("Failed to get Elasticsearch client", err.Error())
+		return diags
+	}
+
+	res, err := esClient.ML.OpenJob(jobId, esClient.ML.OpenJob.WithContext(ctx))
+	if err != nil {
+		diags.AddError("Failed to open ML job", err.Error())
+		return diags
+	}
+	defer res.Body.Close()
+	fwDiags := diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to open ML job: %s", jobId))
+	diags.Append(fwDiags...)
+
+	return diags
+}
+
 // PutDatafeed creates a machine learning datafeed
 func PutDatafeed(ctx context.Context, apiClient *clients.ApiClient, datafeedId string, createRequest models.DatafeedCreateRequest) diag.Diagnostics {
 	var diags diag.Diagnostics
@@ -45,6 +86,86 @@ func PutDatafeed(ctx context.Context, apiClient *clients.ApiClient, datafeedId s
 	return diags
 }
 
+// CloseMLJob closes a machine learning job
+func CloseMLJob(ctx context.Context, apiClient *clients.ApiClient, jobId string, force bool, timeout time.Duration) diag.Diagnostics {
+	var diags diag.Diagnostics
+
+	esClient, err := apiClient.GetESClient()
+	if err != nil {
+		diags.AddError("Failed to get Elasticsearch client", err.Error())
+		return diags
+	}
+
+	options := []func(*esapi.MLCloseJobRequest){
+		esClient.ML.CloseJob.WithContext(ctx),
+		esClient.ML.CloseJob.WithForce(force),
+		esClient.ML.CloseJob.WithAllowNoMatch(true),
+	}
+
+	if timeout > 0 {
+		options = append(options, esClient.ML.CloseJob.WithTimeout(timeout))
+	}
+
+	res, err := esClient.ML.CloseJob(jobId, options...)
+	if err != nil {
+		diags.AddError("Failed to close ML job", err.Error())
+		return diags
+	}
+	defer res.Body.Close()
+
+	fwDiags := diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to close ML job: %s", jobId))
+	diags.Append(fwDiags...)
+
+	return diags
+}
+
+// GetMLJobStats retrieves the stats for a specific machine learning job
+func GetMLJobStats(ctx context.Context, apiClient *clients.ApiClient, jobId string) (*MLJob, diag.Diagnostics) {
+	var diags diag.Diagnostics
+
+	esClient, err := apiClient.GetESClient()
+	if err != nil {
+		diags.AddError("Failed to get Elasticsearch client", err.Error())
+		return nil, diags
+	}
+	options := []func(*esapi.MLGetJobStatsRequest){
+		esClient.ML.GetJobStats.WithContext(ctx),
+		esClient.ML.GetJobStats.WithJobID(jobId),
+		esClient.ML.GetJobStats.WithAllowNoMatch(true),
+	}
+
+	res, err := esClient.ML.GetJobStats(options...)
+	if err != nil {
+		diags.AddError("Failed to get ML job stats", err.Error())
+		return nil, diags
+	}
+	defer res.Body.Close()
+
+	if res.StatusCode == http.StatusNotFound {
+		return nil, diags
+	}
+	diags.Append(diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to get ML job stats: %s", jobId))...)
+	if diags.HasError() {
+		return nil, diags
+	}
+
+	var jobStats MLJobStats
+	if err := json.NewDecoder(res.Body).Decode(&jobStats); err != nil {
+		diags.AddError("Failed to decode ML job stats response", err.Error())
+		return nil, diags
+	}
+
+	// Find the specific job in the response
+	for _, job := range jobStats.Jobs {
+		if job.JobId == jobId {
+			return &job, diags
+		}
+	}
+
+	// Job not found in response
+	return nil, diags
+}
+
 // GetDatafeed retrieves a machine learning datafeed
 func GetDatafeed(ctx context.Context, apiClient *clients.ApiClient, datafeedId string) (*models.Datafeed, diag.Diagnostics) {
 	var diags diag.Diagnostics
@@ -72,8 +193,7 @@ func GetDatafeed(ctx context.Context, apiClient *clients.ApiClient, datafeedId s
 		return nil, diags
 	}
 
-	fwDiags := diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to get ML datafeed: %s", datafeedId))
-	diags.Append(fwDiags...)
+	diags.Append(diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to get ML datafeed: %s", datafeedId))...)
 	if diags.HasError() {
 		return nil, diags
 	}
@@ -120,8 +240,7 @@ func UpdateDatafeed(ctx context.Context, apiClient *clients.ApiClient, datafeedI
 	}
 	defer res.Body.Close()
 
-	fwDiags := diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to update ML datafeed: %s", datafeedId))
-	diags.Append(fwDiags...)
+	diags.Append(diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to update ML datafeed: %s", datafeedId))...)
 
 	return diags
 }
@@ -148,8 +267,7 @@ func DeleteDatafeed(ctx context.Context, apiClient *clients.ApiClient, datafeedI
 	}
 	defer res.Body.Close()
 
-	fwDiags := diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to delete ML datafeed: %s", datafeedId))
-	diags.Append(fwDiags...)
+	diags.Append(diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to delete ML datafeed: %s", datafeedId))...)
 
 	return diags
 }
@@ -181,8 +299,7 @@ func StopDatafeed(ctx context.Context, apiClient *clients.ApiClient, datafeedId
 	}
 	defer res.Body.Close()
 
-	fwDiags := diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to stop ML datafeed: %s", datafeedId))
-	diags.Append(fwDiags...)
+	diags.Append(diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to stop ML datafeed: %s", datafeedId))...)
 
 	return diags
 }
@@ -220,8 +337,7 @@ func StartDatafeed(ctx context.Context, apiClient *clients.ApiClient, datafeedId
 	}
 	defer res.Body.Close()
 
-	fwDiags := diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to start ML datafeed: %s", datafeedId))
-	diags.Append(fwDiags...)
+	diags.Append(diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to start ML datafeed: %s", datafeedId))...)
 
 	return diags
 }
@@ -253,8 +369,7 @@ func GetDatafeedStats(ctx context.Context, apiClient *clients.ApiClient, datafee
 		return nil, diags
 	}
 
-	fwDiags := diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to get ML datafeed stats: %s", datafeedId))
-	diags.Append(fwDiags...)
+	diags.Append(diagutil.CheckErrorFromFW(res, fmt.Sprintf("Unable to get ML datafeed stats: %s", datafeedId))...)
 	if diags.HasError() {
 		return nil, diags
 	}
diff --git a/internal/elasticsearch/ml/job_state/acc_test.go b/internal/elasticsearch/ml/job_state/acc_test.go
new file mode 100644
index 000000000..aa91e2e5e
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/acc_test.go
@@ -0,0 +1,117 @@
+package job_state_test
+
+import (
+	"fmt"
+	"regexp"
+	"testing"
+
+	"github.com/elastic/terraform-provider-elasticstack/internal/acctest"
+	"github.com/hashicorp/terraform-plugin-testing/config"
+	sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest"
+	"github.com/hashicorp/terraform-plugin-testing/helper/resource"
+	"github.com/hashicorp/terraform-plugin-testing/terraform"
+)
+
+func TestAccResourceMLJobState(t *testing.T) {
+	jobID := fmt.Sprintf("test-ml-job-state-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum))
+
+	resource.Test(t, resource.TestCase{
+		PreCheck: func() { acctest.PreCheck(t) },
+		Steps: []resource.TestStep{
+			{
+				ProtoV6ProviderFactories: acctest.Providers,
+				ConfigDirectory:          acctest.NamedTestCaseDirectory("opened"),
+				ConfigVariables: config.Variables{
+					"job_id": config.StringVariable(jobID),
+				},
+				Check: resource.ComposeTestCheckFunc(
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "job_id", jobID),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "state", "opened"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "force", "false"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "job_timeout", "30s"),
+					resource.TestCheckResourceAttrSet("elasticstack_elasticsearch_ml_job_state.test", "id"),
+					// Verify that the ML job was created by the anomaly detector resource
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_anomaly_detection_job.test", "job_id", jobID),
+				),
+			},
+			{
+				ProtoV6ProviderFactories: acctest.Providers,
+				ConfigDirectory:          acctest.NamedTestCaseDirectory("closed"),
+				ConfigVariables: config.Variables{
+					"job_id": config.StringVariable(jobID),
+				},
+				Check: resource.ComposeTestCheckFunc(
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "job_id", jobID),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "state", "closed"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "force", "false"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "job_timeout", "30s"),
+					resource.TestCheckResourceAttrSet("elasticstack_elasticsearch_ml_job_state.test", "id"),
+				),
+			},
+			{
+				ProtoV6ProviderFactories: acctest.Providers,
+				ConfigDirectory:          acctest.NamedTestCaseDirectory("opened_with_options"),
+				ConfigVariables: config.Variables{
+					"job_id":      config.StringVariable(jobID),
+					"force":       config.BoolVariable(true),
+					"job_timeout": config.StringVariable("1m"),
+				},
+				Check: resource.ComposeTestCheckFunc(
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "job_id", jobID),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "state", "opened"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "force", "true"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "job_timeout", "1m"),
+					resource.TestCheckResourceAttrSet("elasticstack_elasticsearch_ml_job_state.test", "id"),
+				),
+			},
+		},
+	})
+}
+
+func TestAccResourceMLJobStateNonExistent(t *testing.T) {
+	resource.Test(t, resource.TestCase{
+		PreCheck: func() { acctest.PreCheck(t) },
+		Steps: []resource.TestStep{
+			{
+				ProtoV6ProviderFactories: acctest.Providers,
+				ConfigDirectory:          acctest.NamedTestCaseDirectory("non_existent"),
+				ExpectError:              regexp.MustCompile(`ML job .* does not exist`),
+			},
+		},
+	})
+}
+
+func TestAccResourceMLJobStateImport(t *testing.T) {
+	jobID := fmt.Sprintf("test-ml-job-state-import-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum))
+
+	resource.Test(t, resource.TestCase{
+		PreCheck: func() { acctest.PreCheck(t) },
+		Steps: []resource.TestStep{
+			{
+				ProtoV6ProviderFactories: acctest.Providers,
+				ConfigDirectory:          acctest.NamedTestCaseDirectory("opened"),
+				ConfigVariables: config.Variables{
+					"job_id": config.StringVariable(jobID),
+				},
+				Check: resource.ComposeTestCheckFunc(
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "job_id", jobID),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_job_state.test", "state", "opened"),
+				),
+			},
+			{
+				ProtoV6ProviderFactories: acctest.Providers,
+				ConfigDirectory:          acctest.NamedTestCaseDirectory("opened"),
+				ConfigVariables: config.Variables{
+					"job_id": config.StringVariable(jobID),
+				},
+				ResourceName:      "elasticstack_elasticsearch_ml_job_state.test",
+				ImportState:       true,
+				ImportStateVerify: true,
+				ImportStateIdFunc: func(s *terraform.State) (string, error) {
+					rs := s.RootModule().Resources["elasticstack_elasticsearch_ml_job_state.test"]
+					return rs.Primary.ID, nil
+				},
+			},
+		},
+	})
+}
diff --git a/internal/elasticsearch/ml/job_state/create.go b/internal/elasticsearch/ml/job_state/create.go
new file mode 100644
index 000000000..10e7914cd
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/create.go
@@ -0,0 +1,27 @@
+package job_state
+
+import (
+	"context"
+	"time"
+
+	"github.com/hashicorp/terraform-plugin-framework/resource"
+)
+
+func (r *mlJobStateResource) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) {
+	var data MLJobStateData
+	diags := req.Plan.Get(ctx, &data)
+	resp.Diagnostics.Append(diags...)
+	if resp.Diagnostics.HasError() {
+		return
+	}
+
+	// Get create timeout
+	createTimeout, fwDiags := data.Timeouts.Create(ctx, 5*time.Minute) // Default 5 minutes
+	resp.Diagnostics.Append(fwDiags...)
+	if resp.Diagnostics.HasError() {
+		return
+	}
+
+	diags = r.update(ctx, req.Plan, &resp.State, createTimeout)
+	resp.Diagnostics.Append(diags...)
+}
diff --git a/internal/elasticsearch/ml/job_state/delete.go b/internal/elasticsearch/ml/job_state/delete.go
new file mode 100644
index 000000000..fa8457cc6
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/delete.go
@@ -0,0 +1,21 @@
+package job_state
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/hashicorp/terraform-plugin-framework/path"
+	"github.com/hashicorp/terraform-plugin-framework/resource"
+	"github.com/hashicorp/terraform-plugin-framework/types/basetypes"
+	"github.com/hashicorp/terraform-plugin-log/tflog"
+)
+
+func (r *mlJobStateResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) {
+	// ML job state resource only manages the state, not the job itself.
+	// When the resource is deleted, we simply remove it from Terraform state
+	// without affecting the actual ML job state in Elasticsearch.
+	// The job will remain in its current state (opened or closed).
+	var jobId basetypes.StringValue
+	resp.Diagnostics.Append(req.State.GetAttribute(ctx, path.Root("job_id"), &jobId)...)
+	tflog.Info(ctx, fmt.Sprintf(`Dropping ML job state "%s", this does not close the job`, jobId.ValueString()))
+}
diff --git a/internal/elasticsearch/ml/job_state/models.go b/internal/elasticsearch/ml/job_state/models.go
new file mode 100644
index 000000000..fd9638343
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/models.go
@@ -0,0 +1,36 @@
+package job_state
+
+import (
+	"github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes"
+	"github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts"
+	"github.com/hashicorp/terraform-plugin-framework/types"
+)
+
+type MLJobStateData struct {
+	Id                      types.String         `tfsdk:"id"`
+	ElasticsearchConnection types.List           `tfsdk:"elasticsearch_connection"`
+	JobId                   types.String         `tfsdk:"job_id"`
+	State                   types.String         `tfsdk:"state"`
+	Force                   types.Bool           `tfsdk:"force"`
+	Timeout                 customtypes.Duration `tfsdk:"job_timeout"`
+	Timeouts                timeouts.Value       `tfsdk:"timeouts"`
+}
+
+// MLJobStats represents the statistics structure for an ML job
+type MLJobStats struct {
+	Jobs []MLJob `json:"jobs"`
+}
+
+// MLJob represents a single ML job in the stats response
+type MLJob struct {
+	JobId string     `json:"job_id"`
+	State string     `json:"state"`
+	Node  *MLJobNode `json:"node,omitempty"`
+}
+
+// MLJobNode represents the node information for an ML job
+type MLJobNode struct {
+	Id         string                 `json:"id"`
+	Name       string                 `json:"name"`
+	Attributes map[string]interface{} `json:"attributes"`
+}
diff --git a/internal/elasticsearch/ml/job_state/read.go b/internal/elasticsearch/ml/job_state/read.go
new file mode 100644
index 000000000..4dc2d46d7
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/read.go
@@ -0,0 +1,54 @@
+package job_state
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/elastic/terraform-provider-elasticstack/internal/clients"
+	"github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes"
+	"github.com/hashicorp/terraform-plugin-framework/resource"
+	"github.com/hashicorp/terraform-plugin-framework/types"
+	"github.com/hashicorp/terraform-plugin-log/tflog"
+)
+
+func (r *mlJobStateResource) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) {
+	var data MLJobStateData
+	resp.Diagnostics.Append(req.State.Get(ctx, &data)...)
+	if resp.Diagnostics.HasError() {
+		return
+	}
+
+	compId, diags := clients.CompositeIdFromStrFw(data.Id.ValueString())
+	resp.Diagnostics.Append(diags...)
+	if resp.Diagnostics.HasError() {
+		return
+	}
+
+	// Get job stats to check current state
+	jobId := compId.ResourceId
+	currentState, diags := r.getJobState(ctx, jobId)
+	resp.Diagnostics.Append(diags...)
+	if resp.Diagnostics.HasError() {
+		return
+	}
+
+	if currentState == nil {
+		tflog.Warn(ctx, fmt.Sprintf(`ML job "%s" not found, removing from state`, jobId))
+		resp.State.RemoveResource(ctx)
+		return
+	}
+
+	// Update the state with current job information
+	data.JobId = types.StringValue(jobId)
+	data.State = types.StringValue(*currentState)
+
+	// Set defaults for computed attributes if they're not already set (e.g., during import)
+	if data.Force.IsNull() {
+		data.Force = types.BoolValue(false)
+	}
+	if data.Timeout.IsNull() {
+		data.Timeout = customtypes.NewDurationValue("30s")
+	}
+
+	resp.Diagnostics.Append(resp.State.Set(ctx, &data)...)
+}
diff --git a/internal/elasticsearch/ml/job_state/resource-description.md b/internal/elasticsearch/ml/job_state/resource-description.md
new file mode 100644
index 000000000..6da15e32a
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/resource-description.md
@@ -0,0 +1,16 @@
+# ML Job State Resource
+
+Manages the state of an Elasticsearch Machine Learning (ML) job, allowing you to open or close ML jobs.
+
+This resource uses the following Elasticsearch APIs:
+- [Open ML Job API](https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-open-job.html)
+- [Close ML Job API](https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-close-job.html)
+- [Get ML Job Stats API](https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-job-stats.html)
+
+## Important Notes
+
+- This resource manages the **state** of an existing ML job, not the job configuration itself.
+- The ML job must already exist before using this resource.
+- Opening a job allows it to receive and process data.
+- Closing a job stops data processing and frees up resources.
+- Jobs can be opened and closed multiple times throughout their lifecycle.
\ No newline at end of file
diff --git a/internal/elasticsearch/ml/job_state/resource.go b/internal/elasticsearch/ml/job_state/resource.go
new file mode 100644
index 000000000..0067a7689
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/resource.go
@@ -0,0 +1,32 @@
+package job_state
+
+import (
+	"context"
+
+	"github.com/elastic/terraform-provider-elasticstack/internal/clients"
+	"github.com/hashicorp/terraform-plugin-framework/path"
+	"github.com/hashicorp/terraform-plugin-framework/resource"
+)
+
+func NewMLJobStateResource() resource.Resource {
+	return &mlJobStateResource{}
+}
+
+type mlJobStateResource struct {
+	client *clients.ApiClient
+}
+
+func (r *mlJobStateResource) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) {
+	resp.TypeName = req.ProviderTypeName + "_elasticsearch_ml_job_state"
+}
+
+func (r *mlJobStateResource) Configure(_ context.Context, req resource.ConfigureRequest, resp *resource.ConfigureResponse) {
+	client, diags := clients.ConvertProviderData(req.ProviderData)
+	resp.Diagnostics.Append(diags...)
+	r.client = client
+}
+
+func (r *mlJobStateResource) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) {
+	// Retrieve import ID and save to id attribute
+	resource.ImportStatePassthroughID(ctx, path.Root("id"), req, resp)
+}
diff --git a/internal/elasticsearch/ml/job_state/schema.go b/internal/elasticsearch/ml/job_state/schema.go
new file mode 100644
index 000000000..a5ddb67d2
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/schema.go
@@ -0,0 +1,80 @@
+package job_state
+
+import (
+	"context"
+	_ "embed"
+	"regexp"
+
+	"github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes"
+	"github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts"
+	"github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator"
+	"github.com/hashicorp/terraform-plugin-framework/resource"
+	"github.com/hashicorp/terraform-plugin-framework/resource/schema"
+	"github.com/hashicorp/terraform-plugin-framework/resource/schema/booldefault"
+	"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
+	"github.com/hashicorp/terraform-plugin-framework/resource/schema/stringdefault"
+	"github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier"
+	"github.com/hashicorp/terraform-plugin-framework/schema/validator"
+
+	providerschema "github.com/elastic/terraform-provider-elasticstack/internal/schema"
+)
+
+//go:embed resource-description.md
+var mlJobStateResourceDescription string
+
+func (r *mlJobStateResource) Schema(_ context.Context, _ resource.SchemaRequest, resp *resource.SchemaResponse) {
+	resp.Schema = GetSchema()
+}
+
+func GetSchema() schema.Schema {
+	return schema.Schema{
+		MarkdownDescription: mlJobStateResourceDescription,
+		Blocks: map[string]schema.Block{
+			"elasticsearch_connection": providerschema.GetEsFWConnectionBlock("elasticsearch_connection", false),
+		},
+		Attributes: map[string]schema.Attribute{
+			"id": schema.StringAttribute{
+				MarkdownDescription: "Internal identifier of the resource",
+				Computed:            true,
+				PlanModifiers: []planmodifier.String{
+					stringplanmodifier.UseStateForUnknown(),
+				},
+			},
+			"job_id": schema.StringAttribute{
+				MarkdownDescription: "Identifier for the anomaly detection job.",
+				Required:            true,
+				PlanModifiers: []planmodifier.String{
+					stringplanmodifier.RequiresReplace(),
+				},
+				Validators: []validator.String{
+					stringvalidator.LengthBetween(1, 64),
+					stringvalidator.RegexMatches(regexp.MustCompile(`^[a-zA-Z0-9_-]+$`), "must contain only alphanumeric characters, hyphens, and underscores"),
+				},
+			},
+			"state": schema.StringAttribute{
+				MarkdownDescription: "The desired state for the ML job. Valid values are `opened` and `closed`.",
+				Required:            true,
+				Validators: []validator.String{
+					stringvalidator.OneOf("opened", "closed"),
+				},
+			},
+			"force": schema.BoolAttribute{
+				MarkdownDescription: "When closing a job, use to forcefully close it. This method is quicker but can miss important clean up tasks.",
+				Optional:            true,
+				Computed:            true,
+				Default:             booldefault.StaticBool(false),
+			},
+			"job_timeout": schema.StringAttribute{
+				MarkdownDescription: "Timeout for the operation. Examples: `30s`, `5m`, `1h`. Default is `30s`.",
+				Optional:            true,
+				Computed:            true,
+				Default:             stringdefault.StaticString("30s"),
+				CustomType:          customtypes.DurationType{},
+			},
+			"timeouts": timeouts.Attributes(context.Background(), timeouts.Opts{
+				Create: true,
+				Update: true,
+			}),
+		},
+	}
+}
diff --git a/internal/elasticsearch/ml/job_state/state_utils.go b/internal/elasticsearch/ml/job_state/state_utils.go
new file mode 100644
index 000000000..db1e60d6a
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/state_utils.go
@@ -0,0 +1,47 @@
+package job_state
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/elastic/terraform-provider-elasticstack/internal/asyncutils"
+	"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
+	"github.com/elastic/terraform-provider-elasticstack/internal/diagutil"
+	"github.com/hashicorp/terraform-plugin-framework/diag"
+)
+
+var errJobNotFound = fmt.Errorf("ML job not found")
+
+// getJobState returns the current state of a job
+func (r *mlJobStateResource) getJobState(ctx context.Context, jobId string) (*string, diag.Diagnostics) {
+	// Get job stats to check current state
+	currentJob, diags := elasticsearch.GetMLJobStats(ctx, r.client, jobId)
+	if diags.HasError() {
+		return nil, diags
+	}
+
+	if currentJob == nil {
+		return nil, nil
+	}
+
+	return ¤tJob.State, nil
+}
+
+// waitForJobState waits for a job to reach the desired state
+func (r *mlJobStateResource) waitForJobState(ctx context.Context, jobId, desiredState string) diag.Diagnostics {
+	stateChecker := func(ctx context.Context) (bool, error) {
+		currentState, diags := r.getJobState(ctx, jobId)
+		if diags.HasError() {
+			return false, diagutil.FwDiagsAsError(diags)
+		}
+
+		if currentState == nil {
+			return false, errJobNotFound
+		}
+
+		return *currentState == desiredState, nil
+	}
+
+	err := asyncutils.WaitForStateTransition(ctx, "ml_job", jobId, stateChecker)
+	return diagutil.FrameworkDiagFromError(err)
+}
diff --git a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState/closed/job_state.tf b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState/closed/job_state.tf
new file mode 100644
index 000000000..b8a58aa65
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState/closed/job_state.tf
@@ -0,0 +1,39 @@
+variable "job_id" {
+  description = "The job ID for the ML job"
+  type        = string
+}
+
+provider "elasticstack" {
+  elasticsearch {}
+}
+
+# First create an ML anomaly detection job
+resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" {
+  job_id      = var.job_id
+  description = "Test anomaly detection job for state management"
+
+  analysis_config = {
+    bucket_span = "15m"
+    detectors = [
+      {
+        function             = "count"
+        detector_description = "Count detector"
+      }
+    ]
+  }
+
+  analysis_limits = {
+    model_memory_limit = "100mb"
+  }
+
+  data_description = {
+    time_field  = "@timestamp"
+    time_format = "epoch_ms"
+  }
+}
+
+# Then manage the state of that ML job
+resource "elasticstack_elasticsearch_ml_job_state" "test" {
+  job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id
+  state  = "closed"
+}
\ No newline at end of file
diff --git a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState/opened/job_state.tf b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState/opened/job_state.tf
new file mode 100644
index 000000000..6a00ae939
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState/opened/job_state.tf
@@ -0,0 +1,39 @@
+variable "job_id" {
+  description = "The job ID for the ML job"
+  type        = string
+}
+
+provider "elasticstack" {
+  elasticsearch {}
+}
+
+# First create an ML anomaly detection job
+resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" {
+  job_id      = var.job_id
+  description = "Test anomaly detection job for state management"
+
+  analysis_config = {
+    bucket_span = "15m"
+    detectors = [
+      {
+        function             = "count"
+        detector_description = "Count detector"
+      }
+    ]
+  }
+
+  analysis_limits = {
+    model_memory_limit = "100mb"
+  }
+
+  data_description = {
+    time_field  = "@timestamp"
+    time_format = "epoch_ms"
+  }
+}
+
+# Then manage the state of that ML job
+resource "elasticstack_elasticsearch_ml_job_state" "test" {
+  job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id
+  state  = "opened"
+}
\ No newline at end of file
diff --git a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState/opened_with_options/job_state.tf b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState/opened_with_options/job_state.tf
new file mode 100644
index 000000000..f67d25f61
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobState/opened_with_options/job_state.tf
@@ -0,0 +1,53 @@
+variable "job_id" {
+  description = "The job ID for the ML job"
+  type        = string
+}
+
+variable "force" {
+  description = "Whether to force the job state change"
+  type        = bool
+  default     = true
+}
+
+variable "job_timeout" {
+  description = "Timeout for the job state change"
+  type        = string
+  default     = "1m"
+}
+
+provider "elasticstack" {
+  elasticsearch {}
+}
+
+# First create an ML anomaly detection job
+resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" {
+  job_id      = var.job_id
+  description = "Test anomaly detection job for state management with options"
+
+  analysis_config = {
+    bucket_span = "15m"
+    detectors = [
+      {
+        function             = "count"
+        detector_description = "Count detector"
+      }
+    ]
+  }
+
+  analysis_limits = {
+    model_memory_limit = "100mb"
+  }
+
+  data_description = {
+    time_field  = "@timestamp"
+    time_format = "epoch_ms"
+  }
+}
+
+# Then manage the state of that ML job with custom options
+resource "elasticstack_elasticsearch_ml_job_state" "test" {
+  job_id      = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id
+  state       = "opened"
+  force       = var.force
+  job_timeout = var.job_timeout
+}
\ No newline at end of file
diff --git a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobStateImport/opened/job_state.tf b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobStateImport/opened/job_state.tf
new file mode 100644
index 000000000..e99f17133
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobStateImport/opened/job_state.tf
@@ -0,0 +1,39 @@
+variable "job_id" {
+  description = "The job ID for the ML job"
+  type        = string
+}
+
+provider "elasticstack" {
+  elasticsearch {}
+}
+
+# First create an ML anomaly detection job
+resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "test" {
+  job_id      = var.job_id
+  description = "Test anomaly detection job for state management import"
+
+  analysis_config = {
+    bucket_span = "15m"
+    detectors = [
+      {
+        function             = "count"
+        detector_description = "Count detector"
+      }
+    ]
+  }
+
+  analysis_limits = {
+    model_memory_limit = "100mb"
+  }
+
+  data_description = {
+    time_field  = "@timestamp"
+    time_format = "epoch_ms"
+  }
+}
+
+# Then manage the state of that ML job
+resource "elasticstack_elasticsearch_ml_job_state" "test" {
+  job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.test.job_id
+  state  = "opened"
+}
\ No newline at end of file
diff --git a/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobStateNonExistent/non_existent/job_state.tf b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobStateNonExistent/non_existent/job_state.tf
new file mode 100644
index 000000000..32f74cd89
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/testdata/TestAccResourceMLJobStateNonExistent/non_existent/job_state.tf
@@ -0,0 +1,9 @@
+provider "elasticstack" {
+  elasticsearch {}
+}
+
+# Try to manage state of a non-existent ML job
+resource "elasticstack_elasticsearch_ml_job_state" "test" {
+  job_id = "non-existent-ml-job"
+  state  = "opened"
+}
\ No newline at end of file
diff --git a/internal/elasticsearch/ml/job_state/update.go b/internal/elasticsearch/ml/job_state/update.go
new file mode 100644
index 000000000..03be71df3
--- /dev/null
+++ b/internal/elasticsearch/ml/job_state/update.go
@@ -0,0 +1,144 @@
+package job_state
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/elastic/terraform-provider-elasticstack/internal/clients"
+	"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
+	"github.com/hashicorp/terraform-plugin-framework/diag"
+	"github.com/hashicorp/terraform-plugin-framework/resource"
+	"github.com/hashicorp/terraform-plugin-framework/tfsdk"
+	"github.com/hashicorp/terraform-plugin-framework/types"
+	"github.com/hashicorp/terraform-plugin-log/tflog"
+)
+
+func (r *mlJobStateResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) {
+	var data MLJobStateData
+	diags := req.Plan.Get(ctx, &data)
+	resp.Diagnostics.Append(diags...)
+	if resp.Diagnostics.HasError() {
+		return
+	}
+
+	// Get update timeout
+	updateTimeout, fwDiags := data.Timeouts.Update(ctx, 5*time.Minute) // Default 5 minutes
+	resp.Diagnostics.Append(fwDiags...)
+	if resp.Diagnostics.HasError() {
+		return
+	}
+
+	diags = r.update(ctx, req.Plan, &resp.State, updateTimeout)
+	resp.Diagnostics.Append(diags...)
+	if resp.Diagnostics.HasError() {
+		return
+	}
+}
+
+func (r *mlJobStateResource) update(ctx context.Context, plan tfsdk.Plan, state *tfsdk.State, operationTimeout time.Duration) diag.Diagnostics {
+	var data MLJobStateData
+	diags := plan.Get(ctx, &data)
+	if diags.HasError() {
+		return diags
+	}
+
+	client, fwDiags := clients.MaybeNewApiClientFromFrameworkResource(ctx, data.ElasticsearchConnection, r.client)
+	diags.Append(fwDiags...)
+	if diags.HasError() {
+		return diags
+	}
+
+	jobId := data.JobId.ValueString()
+	desiredState := data.State.ValueString()
+
+	// Create context with timeout
+	ctx, cancel := context.WithTimeout(ctx, operationTimeout)
+	defer cancel()
+
+	// First, get the current job stats to check if the job exists and its current state
+	currentState, fwDiags := r.getJobState(ctx, jobId)
+	diags.Append(fwDiags...)
+	if diags.HasError() {
+		return diags
+	}
+
+	if currentState == nil {
+		diags.AddError(
+			"ML Job not found",
+			fmt.Sprintf("ML job %s does not exist", jobId),
+		)
+		return diags
+	}
+
+	// Perform state transition if needed
+	fwDiags = r.performStateTransition(ctx, client, data, *currentState, operationTimeout)
+	diags.Append(fwDiags...)
+	if diags.HasError() {
+		return diags
+	}
+
+	// Generate composite ID
+	compId, sdkDiags := client.ID(ctx, jobId)
+	if len(sdkDiags) > 0 {
+		for _, d := range sdkDiags {
+			diags.AddError(d.Summary, d.Detail)
+		}
+		return diags
+	}
+
+	// Set the response state
+	data.Id = types.StringValue(compId.String())
+	data.JobId = types.StringValue(jobId)
+	data.State = types.StringValue(desiredState)
+
+	diags.Append(state.Set(ctx, data)...)
+	return diags
+}
+
+// performStateTransition handles the ML job state transition process
+func (r *mlJobStateResource) performStateTransition(ctx context.Context, client *clients.ApiClient, data MLJobStateData, currentState string, operationTimeout time.Duration) diag.Diagnostics {
+	jobId := data.JobId.ValueString()
+	desiredState := data.State.ValueString()
+	force := data.Force.ValueBool()
+
+	// Parse timeout duration
+	timeout, parseErrs := data.Timeout.Parse()
+	if parseErrs.HasError() {
+		return parseErrs
+	}
+
+	// Return early if no state change is needed
+	if currentState == desiredState {
+		tflog.Debug(ctx, fmt.Sprintf("ML job %s is already in desired state %s", jobId, desiredState))
+		return nil
+	}
+
+	// Initiate the state change
+	switch desiredState {
+	case "opened":
+		if diags := elasticsearch.OpenMLJob(ctx, client, jobId); diags.HasError() {
+			return diags
+		}
+	case "closed":
+		if diags := elasticsearch.CloseMLJob(ctx, client, jobId, force, timeout); diags.HasError() {
+			return diags
+		}
+	default:
+		return diag.Diagnostics{
+			diag.NewErrorDiagnostic(
+				"Invalid state",
+				fmt.Sprintf("Invalid state %s. Valid states are 'opened' and 'closed'", desiredState),
+			),
+		}
+	}
+
+	// Wait for state transition to complete
+	diags := r.waitForJobState(ctx, jobId, desiredState)
+	if diags.HasError() {
+		return diags
+	}
+
+	tflog.Info(ctx, fmt.Sprintf("ML job %s successfully transitioned to state %s", jobId, desiredState))
+	return nil
+}
diff --git a/provider/plugin_framework.go b/provider/plugin_framework.go
index fc0f901e7..14a0d9939 100644
--- a/provider/plugin_framework.go
+++ b/provider/plugin_framework.go
@@ -13,6 +13,7 @@ import (
 	"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/index/indices"
 	"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/anomaly_detection_job"
 	"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed"
+	"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/job_state"
 	"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/security/api_key"
 	"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/security/role_mapping"
 	"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/security/system_user"
@@ -126,5 +127,6 @@ func (p *Provider) Resources(ctx context.Context) []func() resource.Resource {
 		datafeed.NewDatafeedResource,
 		anomaly_detection_job.NewAnomalyDetectionJobResource,
 		security_detection_rule.NewSecurityDetectionRuleResource,
+		job_state.NewMLJobStateResource,
 	}
 }
diff --git a/templates/resources/elasticsearch_ml_job_state.md.tmpl b/templates/resources/elasticsearch_ml_job_state.md.tmpl
new file mode 100644
index 000000000..113f174f2
--- /dev/null
+++ b/templates/resources/elasticsearch_ml_job_state.md.tmpl
@@ -0,0 +1,18 @@
+---
+page_title: "{{.Name}} {{.Type}} - {{.ProviderName}}"
+subcategory: ""
+description: |-
+{{ .Description | plainmarkdown | trimspace | prefixlines "  " }}
+---
+
+# {{.Name}} ({{.Type}})
+
+{{ .Description | trimspace }}
+
+{{ if .HasExample -}}
+## Example Usage
+
+{{ tffile .ExampleFile }}
+{{- end }}
+
+{{ .SchemaMarkdown | trimspace }}
\ No newline at end of file