From d920cdeb96ee3cc4b6391654b0d78f33f9838dbf Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Sun, 9 Nov 2025 05:33:06 +0000 Subject: [PATCH 1/6] Add pipeline, fix update to use doc instead of object field (still fallsback to object for bwc) bytes optimize Set the default value of source to null to match REST Support allowExplicitIndex setting Signed-off-by: Karen Xu --- CHANGELOG.md | 1 + .../opensearch/transport/grpc/GrpcPlugin.java | 5 +- .../common/FetchSourceContextProtoUtils.java | 2 +- .../bulk/BulkRequestParserProtoUtils.java | 181 +++++++++++++----- .../document/bulk/BulkRequestProtoUtils.java | 10 +- .../grpc/services/DocumentServiceImpl.java | 8 +- .../FetchSourceContextProtoUtilsTests.java | 9 +- 7 files changed, 158 insertions(+), 58 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5375d13323d5c..41fb815551f37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add S3Repository.LEGACY_MD5_CHECKSUM_CALCULATION to list of repository-s3 settings ([#19788](https://github.com/opensearch-project/OpenSearch/pull/19788)) - Fix NullPointerException when restoring remote snapshot with missing shard size information ([#19684](https://github.com/opensearch-project/OpenSearch/pull/19684)) - Fix NPE of ScriptScoreQuery ([#19650](https://github.com/opensearch-project/OpenSearch/pull/19650)) +- Fix GRPC Bulk ([#x](https://github.com/opensearch-project/OpenSearch/pull/x)) ### Dependencies - Update to Gradle 9.2 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575)) ([#19856](https://github.com/opensearch-project/OpenSearch/pull/19856)) diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java index 45e4a5b1eb022..fa736b1f30f14 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java @@ -210,7 +210,7 @@ public Map> getAuxTransports( return Collections.singletonMap(GRPC_TRANSPORT_SETTING_KEY, () -> { List grpcServices = new ArrayList<>( - List.of(new DocumentServiceImpl(client), new SearchServiceImpl(client, queryUtils)) + List.of(new DocumentServiceImpl(client, settings), new SearchServiceImpl(client, queryUtils)) ); for (GrpcServiceFactory serviceFac : servicesFactory) { List pluginServices = serviceFac.initClient(client) @@ -259,10 +259,9 @@ public Map> getSecureAuxTransports( if (client == null || queryRegistry == null) { throw new RuntimeException("createComponents must be called first to initialize server provided resources."); } - return Collections.singletonMap(GRPC_SECURE_TRANSPORT_SETTING_KEY, () -> { List grpcServices = new ArrayList<>( - List.of(new DocumentServiceImpl(client), new SearchServiceImpl(client, queryUtils)) + List.of(new DocumentServiceImpl(client, settings), new SearchServiceImpl(client, queryUtils)) ); for (GrpcServiceFactory serviceFac : servicesFactory) { List pluginServices = serviceFac.initClient(client) diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java index 07c16fcfd21d8..6a754483c9793 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java @@ -38,7 +38,7 @@ private FetchSourceContextProtoUtils() { * @return A FetchSourceContext object based on the request parameters, or null if no source parameters are provided */ public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.BulkRequest request) { - Boolean fetchSource = true; + Boolean fetchSource = null; String[] sourceExcludes = null; String[] sourceIncludes = null; diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java index b9997013f7700..c6e11724285c6 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java @@ -8,6 +8,7 @@ package org.opensearch.transport.grpc.proto.request.document.bulk; +import com.google.protobuf.ByteString; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.bulk.BulkRequestParser; import org.opensearch.action.delete.DeleteRequest; @@ -106,6 +107,7 @@ static MediaType detectMediaType(byte[] document) { * @param defaultFetchSourceContext * @param defaultPipeline * @param defaultRequireAlias + * @param allowExplicitIndex whether explicit index specification is allowed (security setting) * @return */ public static DocWriteRequest[] getDocWriteRequests( @@ -114,7 +116,8 @@ public static DocWriteRequest[] getDocWriteRequests( String defaultRouting, FetchSourceContext defaultFetchSourceContext, String defaultPipeline, - Boolean defaultRequireAlias + Boolean defaultRequireAlias, + boolean allowExplicitIndex ) { List bulkRequestBodyList = request.getBulkRequestBodyList(); DocWriteRequest[] docWriteRequests = new DocWriteRequest[bulkRequestBodyList.size()]; @@ -152,7 +155,9 @@ public static DocWriteRequest[] getDocWriteRequests( pipeline, ifSeqNo, ifPrimaryTerm, - requireAlias + requireAlias, + defaultIndex, + allowExplicitIndex ); break; case INDEX: @@ -168,13 +173,25 @@ public static DocWriteRequest[] getDocWriteRequests( pipeline, ifSeqNo, ifPrimaryTerm, - requireAlias + requireAlias, + defaultIndex, + allowExplicitIndex ); break; case UPDATE: + // Extract the doc field from UpdateAction (matches REST API structure) + // Use ByteString directly to avoid unnecessary byte array allocation + ByteString updateDocBytes = ByteString.EMPTY; + if (bulkRequestBodyEntry.hasUpdateAction() && bulkRequestBodyEntry.getUpdateAction().hasDoc()) { + updateDocBytes = bulkRequestBodyEntry.getUpdateAction().getDoc(); + } else if (bulkRequestBodyEntry.hasObject()) { + // Fallback to object field for backwards compatibility + // TODO: Remove this fallback once all clients use UpdateAction.doc + updateDocBytes = bulkRequestBodyEntry.getObject(); + } docWriteRequest = buildUpdateRequest( operationContainer.getUpdate(), - bulkRequestBodyEntry.getObject().toByteArray(), + updateDocBytes, bulkRequestBodyEntry, index, id, @@ -184,7 +201,9 @@ public static DocWriteRequest[] getDocWriteRequests( pipeline, ifSeqNo, ifPrimaryTerm, - requireAlias + requireAlias, + defaultIndex, + allowExplicitIndex ); break; case DELETE: @@ -196,7 +215,9 @@ public static DocWriteRequest[] getDocWriteRequests( version, versionType, ifSeqNo, - ifPrimaryTerm + ifPrimaryTerm, + defaultIndex, + allowExplicitIndex ); break; case OPERATIONCONTAINER_NOT_SET: @@ -225,6 +246,8 @@ public static DocWriteRequest[] getDocWriteRequests( * @param ifSeqNo The default sequence number for optimistic concurrency control * @param ifPrimaryTerm The default primary term for optimistic concurrency control * @param requireAlias Whether the index must be an alias + * @param defaultIndex The default index from the request URL (for security check) + * @param allowExplicitIndex Whether explicit index specification is allowed * @return The constructed IndexRequest */ public static IndexRequest buildCreateRequest( @@ -238,9 +261,18 @@ public static IndexRequest buildCreateRequest( String pipeline, long ifSeqNo, long ifPrimaryTerm, - boolean requireAlias + boolean requireAlias, + String defaultIndex, + boolean allowExplicitIndex ) { - index = createOperation.hasXIndex() ? createOperation.getXIndex() : index; + // Check explicit index (matches REST BulkRequestParser line 218-221) + if (createOperation.hasXIndex()) { + if (!allowExplicitIndex && defaultIndex != null) { + throw new IllegalArgumentException("explicit index in bulk is not allowed"); + } + index = createOperation.getXIndex(); + } + id = createOperation.hasXId() ? createOperation.getXId() : id; routing = createOperation.hasRouting() ? createOperation.getRouting() : routing; pipeline = createOperation.hasPipeline() ? createOperation.getPipeline() : pipeline; @@ -275,6 +307,8 @@ public static IndexRequest buildCreateRequest( * @param ifSeqNo The default sequence number for optimistic concurrency control * @param ifPrimaryTerm The default primary term for optimistic concurrency control * @param requireAlias Whether the index must be an alias + * @param defaultIndex The default index from the request URL (for security check) + * @param allowExplicitIndex Whether explicit index specification is allowed * @return The constructed IndexRequest */ public static IndexRequest buildIndexRequest( @@ -289,10 +323,20 @@ public static IndexRequest buildIndexRequest( String pipeline, long ifSeqNo, long ifPrimaryTerm, - boolean requireAlias + boolean requireAlias, + String defaultIndex, + boolean allowExplicitIndex ) { opType = indexOperation.hasOpType() ? indexOperation.getOpType() : opType; - index = indexOperation.hasXIndex() ? indexOperation.getXIndex() : index; + + // Check explicit index (matches REST BulkRequestParser line 218-221) + if (indexOperation.hasXIndex()) { + if (!allowExplicitIndex && defaultIndex != null) { + throw new IllegalArgumentException("explicit index in bulk is not allowed"); + } + index = indexOperation.getXIndex(); + } + id = indexOperation.hasXId() ? indexOperation.getXId() : id; routing = indexOperation.hasRouting() ? indexOperation.getRouting() : routing; version = indexOperation.hasVersion() ? indexOperation.getVersion() : version; @@ -335,7 +379,7 @@ public static IndexRequest buildIndexRequest( * Builds an UpdateRequest from an UpdateOperation protobuf message. * * @param updateOperation The update operation protobuf message - * @param document The document content as bytes + * @param documentBytes The document content as ByteString (zero-copy reference) * @param bulkRequestBody The bulk request body containing additional update options * @param index The default index name * @param id The default document ID @@ -346,11 +390,13 @@ public static IndexRequest buildIndexRequest( * @param ifSeqNo The default sequence number for optimistic concurrency control * @param ifPrimaryTerm The default primary term for optimistic concurrency control * @param requireAlias Whether the index must be an alias + * @param defaultIndex The default index from the request URL (for security check) + * @param allowExplicitIndex Whether explicit index specification is allowed * @return The constructed UpdateRequest */ public static UpdateRequest buildUpdateRequest( UpdateOperation updateOperation, - byte[] document, + ByteString documentBytes, BulkRequestBody bulkRequestBody, String index, String id, @@ -360,19 +406,26 @@ public static UpdateRequest buildUpdateRequest( String pipeline, long ifSeqNo, long ifPrimaryTerm, - boolean requireAlias + boolean requireAlias, + String defaultIndex, + boolean allowExplicitIndex ) { - index = updateOperation.hasXIndex() ? updateOperation.getXIndex() : index; + // Check explicit index (matches REST BulkRequestParser line 218-221) + if (updateOperation.hasXIndex()) { + if (!allowExplicitIndex && defaultIndex != null) { + throw new IllegalArgumentException("explicit index in bulk is not allowed"); + } + index = updateOperation.getXIndex(); + } + id = updateOperation.hasXId() ? updateOperation.getXId() : id; routing = updateOperation.hasRouting() ? updateOperation.getRouting() : routing; - fetchSourceContext = bulkRequestBody.hasUpdateAction() && bulkRequestBody.getUpdateAction().hasXSource() - ? FetchSourceContextProtoUtils.fromProto(bulkRequestBody.getUpdateAction().getXSource()) - : fetchSourceContext; retryOnConflict = updateOperation.hasRetryOnConflict() ? updateOperation.getRetryOnConflict() : retryOnConflict; ifSeqNo = updateOperation.hasIfSeqNo() ? updateOperation.getIfSeqNo() : ifSeqNo; ifPrimaryTerm = updateOperation.hasIfPrimaryTerm() ? updateOperation.getIfPrimaryTerm() : ifPrimaryTerm; requireAlias = updateOperation.hasRequireAlias() ? updateOperation.getRequireAlias() : requireAlias; + // Create UpdateRequest with operation-level fields UpdateRequest updateRequest = new UpdateRequest().index(index) .id(id) .routing(routing) @@ -382,77 +435,106 @@ public static UpdateRequest buildUpdateRequest( .setRequireAlias(requireAlias) .routing(routing); - updateRequest = fromProto(updateRequest, document, bulkRequestBody, updateOperation); + // Populate all document-level fields + updateRequest = fromProto(updateRequest, documentBytes, bulkRequestBody, ifSeqNo, ifPrimaryTerm); + // Apply fetchSourceContext default if (fetchSourceContext != null) { updateRequest.fetchSource(fetchSourceContext); } - // TODO: how is upsertRequest used? - // IndexRequest upsertRequest = updateRequest.upsertRequest(); - // if (upsertRequest != null) { - // upsertRequest.setPipeline(pipeline); - // } + + // Set pipeline on upsert request if it exists + IndexRequest upsertRequest = updateRequest.upsertRequest(); + if (upsertRequest != null) { + upsertRequest.setPipeline(pipeline); + } return updateRequest; } /** * Populates an UpdateRequest with values from protobuf messages. - * Similar to {@link UpdateRequest#fromXContent(XContentParser)} + * Equivalent to {@link UpdateRequest#fromXContent(XContentParser)} for REST API. * - * @param updateRequest The update request to populate - * @param document The document content as bytes + * @param updateRequest The update request to populate (may already have if_seq_no/if_primary_term set) + * @param documentBytes The document content as ByteString (zero-copy reference) * @param bulkRequestBody The bulk request body containing update options - * @param updateOperation The update operation protobuf message + * @param ifSeqNoFromOperation The sequence number + * @param ifPrimaryTermFromOperation The primary term * @return The populated UpdateRequest */ - public static UpdateRequest fromProto( + private static UpdateRequest fromProto( UpdateRequest updateRequest, - byte[] document, + ByteString documentBytes, BulkRequestBody bulkRequestBody, - UpdateOperation updateOperation + long ifSeqNoFromOperation, + long ifPrimaryTermFromOperation ) { + // Start with operation metadata values + long ifSeqNo = ifSeqNoFromOperation; + long ifPrimaryTerm = ifPrimaryTermFromOperation; if (bulkRequestBody.hasUpdateAction()) { UpdateAction updateAction = bulkRequestBody.getUpdateAction(); + // 1. script if (updateAction.hasScript()) { Script script = ScriptProtoUtils.parseFromProtoRequest(updateAction.getScript()); updateRequest.script(script); } + // 2. scripted_upsert if (updateAction.hasScriptedUpsert()) { updateRequest.scriptedUpsert(updateAction.getScriptedUpsert()); } + // 3. upsert if (updateAction.hasUpsert()) { - byte[] upsertBytes = updateAction.getUpsert().toByteArray(); - MediaType upsertMediaType = detectMediaType(upsertBytes); - updateRequest.upsert(upsertBytes, upsertMediaType); + ByteString upsertBytes = updateAction.getUpsert(); + byte[] upsertArray = upsertBytes.toByteArray(); + MediaType upsertMediaType = detectMediaType(upsertArray); + updateRequest.upsert(upsertArray, upsertMediaType); } + } + + // 4. doc + // Only set doc if ByteString is non-empty (empty ByteString = field not provided in proto) + // This check is structural, not business validation: + // - ByteString.EMPTY = no doc field in proto → don't call doc() → keeps doc=null + // - Non-empty ByteString = doc field in proto → call doc() → sets doc!=null + // UpdateRequest.validate() then handles business rules: + // - If script!=null && doc!=null → validation error + // - If script==null && doc==null → validation error + if (documentBytes != null && !documentBytes.isEmpty()) { + byte[] docArray = documentBytes.toByteArray(); + MediaType mediaType = detectMediaType(docArray); + updateRequest.doc(docArray, mediaType); + } + + if (bulkRequestBody.hasUpdateAction()) { + UpdateAction updateAction = bulkRequestBody.getUpdateAction(); + // 5. doc_as_upsert if (updateAction.hasDocAsUpsert()) { updateRequest.docAsUpsert(updateAction.getDocAsUpsert()); } + // 6. detect_noop if (updateAction.hasDetectNoop()) { updateRequest.detectNoop(updateAction.getDetectNoop()); } + // 7. _source if (updateAction.hasXSource()) { updateRequest.fetchSource(FetchSourceContextProtoUtils.fromProto(updateAction.getXSource())); } - } - - MediaType mediaType = detectMediaType(document); - updateRequest.doc(document, mediaType); - if (updateOperation.hasIfSeqNo()) { - updateRequest.setIfSeqNo(updateOperation.getIfSeqNo()); + // 8 + 9. if_seq_no and if_primary_term are excluded from UpdateAction protobufs intentionally, as users can just provide them + // in UpdateOperation. } - if (updateOperation.hasIfPrimaryTerm()) { - updateRequest.setIfPrimaryTerm(updateOperation.getIfPrimaryTerm()); - } + // Set if_seq_no and if_primary_term + updateRequest.setIfSeqNo(ifSeqNo); + updateRequest.setIfPrimaryTerm(ifPrimaryTerm); return updateRequest; } @@ -468,6 +550,8 @@ public static UpdateRequest fromProto( * @param versionType The default version type * @param ifSeqNo The default sequence number for optimistic concurrency control * @param ifPrimaryTerm The default primary term for optimistic concurrency control + * @param defaultIndex The default index from the request URL (for security check) + * @param allowExplicitIndex Whether explicit index specification is allowed * @return The constructed DeleteRequest */ public static DeleteRequest buildDeleteRequest( @@ -478,9 +562,18 @@ public static DeleteRequest buildDeleteRequest( long version, VersionType versionType, long ifSeqNo, - long ifPrimaryTerm + long ifPrimaryTerm, + String defaultIndex, + boolean allowExplicitIndex ) { - index = deleteOperation.hasXIndex() ? deleteOperation.getXIndex() : index; + // Check explicit index (matches REST BulkRequestParser line 218-221) + if (deleteOperation.hasXIndex()) { + if (!allowExplicitIndex && defaultIndex != null) { + throw new IllegalArgumentException("explicit index in bulk is not allowed"); + } + index = deleteOperation.getXIndex(); + } + id = deleteOperation.hasXId() ? deleteOperation.getXId() : id; routing = deleteOperation.hasRouting() ? deleteOperation.getRouting() : routing; version = deleteOperation.hasVersion() ? deleteOperation.getVersion() : version; diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java index 7f336e56e87a9..b796de86d15b4 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java @@ -9,6 +9,7 @@ package org.opensearch.transport.grpc.proto.request.document.bulk; import org.opensearch.action.bulk.BulkShardRequest; +import org.opensearch.common.settings.Settings; import org.opensearch.protobufs.BulkRequest; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.document.RestBulkAction; @@ -37,9 +38,10 @@ private BulkRequestProtoUtils() { * Please ensure to keep both implementations consistent. * * @param request the request to execute + * @param settings node settings for security and configuration * @return a future of the bulk action that was executed */ - public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest request) { + public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest request, Settings settings) { org.opensearch.action.bulk.BulkRequest bulkRequest = Requests.bulkRequest(); String defaultIndex = request.hasIndex() ? request.getIndex() : null; @@ -60,6 +62,9 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest bulkRequest.setRefreshPolicy(RefreshProtoUtils.getRefreshPolicy(request.getRefresh())); + // Read the allowExplicitIndex setting (matches REST BulkAction line 74) + boolean allowExplicitIndex = RestBulkAction.MULTI_ALLOW_EXPLICIT_INDEX.get(settings); + // Note: batch_size is deprecated in OS 3.x. Add batch_size parameter when backporting to OS 2.x /* if (request.hasBatchSize()){ @@ -75,7 +80,8 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest defaultRouting, defaultFetchSourceContext, defaultPipeline, - defaultRequireAlias + defaultRequireAlias, + allowExplicitIndex ) ); diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java index e1348bc78961f..6302fe1f260f6 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.Settings; import org.opensearch.protobufs.services.DocumentServiceGrpc; import org.opensearch.transport.client.Client; import org.opensearch.transport.grpc.listeners.BulkRequestActionListener; @@ -25,14 +26,17 @@ public class DocumentServiceImpl extends DocumentServiceGrpc.DocumentServiceImplBase { private static final Logger logger = LogManager.getLogger(DocumentServiceImpl.class); private final Client client; + private final Settings settings; /** * Creates a new DocumentServiceImpl. * * @param client Client for executing actions on the local node + * @param settings Node settings for security and configuration */ - public DocumentServiceImpl(Client client) { + public DocumentServiceImpl(Client client, Settings settings) { this.client = client; + this.settings = settings; } /** @@ -44,7 +48,7 @@ public DocumentServiceImpl(Client client) { @Override public void bulk(org.opensearch.protobufs.BulkRequest request, StreamObserver responseObserver) { try { - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, settings); BulkRequestActionListener listener = new BulkRequestActionListener(responseObserver); client.bulk(bulkRequest, listener); } catch (RuntimeException e) { diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtilsTests.java index 0b8d170ea51e2..795fe2ac2a9de 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/common/FetchSourceContextProtoUtilsTests.java @@ -109,12 +109,9 @@ public void testParseFromProtoRequestWithNoSourceParams() { FetchSourceContext context = FetchSourceContextProtoUtils.parseFromProtoRequest(request); // Verify the result - // The implementation returns a default FetchSourceContext with fetchSource=true - // and empty includes/excludes arrays when no source parameters are provided - assertNotNull("Context should not be null", context); - assertTrue("fetchSource should be true", context.fetchSource()); - assertArrayEquals("includes should be empty", Strings.EMPTY_ARRAY, context.includes()); - assertArrayEquals("excludes should be empty", Strings.EMPTY_ARRAY, context.excludes()); + // When no source parameters are provided, should return null to match REST API behavior + // This prevents the "get" field from being returned in update/upsert responses + assertNull("Context should be null when no source parameters provided", context); } public void testFromProtoWithFetch() { From 113d3ed890b448e2173534d7fa71f77f82412528 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Mon, 10 Nov 2025 03:16:10 +0000 Subject: [PATCH 2/6] fix tests Signed-off-by: Karen Xu --- .../bulk/BulkRequestParserProtoUtils.java | 2 +- .../BulkRequestParserProtoUtilsTests.java | 136 +++++++++++++----- .../bulk/BulkRequestProtoUtilsTests.java | 17 +-- .../services/BulkRequestProtoUtilsTests.java | 9 +- .../document/DocumentServiceImplTests.java | 3 +- 5 files changed, 117 insertions(+), 50 deletions(-) diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java index c6e11724285c6..a6ee9fb5dc994 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java @@ -463,7 +463,7 @@ public static UpdateRequest buildUpdateRequest( * @param ifPrimaryTermFromOperation The primary term * @return The populated UpdateRequest */ - private static UpdateRequest fromProto( + static UpdateRequest fromProto( UpdateRequest updateRequest, ByteString documentBytes, BulkRequestBody bulkRequestBody, diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java index 11dfe757d9d52..e483e39801721 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java @@ -54,7 +54,9 @@ public void testBuildCreateRequest() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -97,7 +99,9 @@ public void testBuildIndexRequest() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -132,7 +136,9 @@ public void testBuildIndexRequestWithOpType() { "default-pipeline", SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -160,7 +166,9 @@ public void testBuildDeleteRequest() { 1L, VersionType.INTERNAL, 1L, - 2L + 2L, + null, + true ); assertNotNull("DeleteRequest should not be null", deleteRequest); @@ -194,7 +202,7 @@ public void testBuildUpdateRequest() { UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + ByteString.copyFrom(document), bulkRequestBody, "default-index", "default-id", @@ -204,7 +212,9 @@ public void testBuildUpdateRequest() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -257,7 +267,8 @@ public void testGetDocWriteRequests() { "default-routing", null, "default-pipeline", - false + false, + true ); assertNotNull("Requests should not be null", requests); @@ -301,7 +312,8 @@ public void testGetDocWriteRequestsWithInvalidOperation() { "default-routing", null, "default-pipeline", - false + false, + true ) ); } @@ -322,7 +334,9 @@ public void testBuildCreateRequestWithDefaults() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -349,7 +363,9 @@ public void testBuildCreateRequestWithPipeline() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertEquals("Pipeline should use custom value", "custom-pipeline", indexRequest.getPipeline()); @@ -382,7 +398,9 @@ public void testBuildIndexRequestWithAllFields() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -415,7 +433,9 @@ public void testBuildIndexRequestWithNullOpType() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -454,7 +474,7 @@ public void testBuildUpdateRequestWithScript() { UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + ByteString.copyFrom(document), bulkRequestBody, "default-index", "default-id", @@ -464,7 +484,9 @@ public void testBuildUpdateRequestWithScript() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -486,7 +508,7 @@ public void testBuildUpdateRequestWithUpsert() { UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + ByteString.copyFrom(document), bulkRequestBody, "default-index", "default-id", @@ -496,7 +518,9 @@ public void testBuildUpdateRequestWithUpsert() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -516,7 +540,7 @@ public void testBuildUpdateRequestWithScriptedUpsert() { UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + ByteString.copyFrom(document), bulkRequestBody, "default-index", "default-id", @@ -526,7 +550,9 @@ public void testBuildUpdateRequestWithScriptedUpsert() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -550,7 +576,7 @@ public void testBuildUpdateRequestWithFetchSource() { UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + ByteString.copyFrom(document), bulkRequestBody, "default-index", "default-id", @@ -560,7 +586,9 @@ public void testBuildUpdateRequestWithFetchSource() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -579,7 +607,7 @@ public void testBuildUpdateRequestWithoutUpdateAction() { UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - document, + ByteString.copyFrom(document), bulkRequestBody, "default-index", "default-id", @@ -589,7 +617,9 @@ public void testBuildUpdateRequestWithoutUpdateAction() { "default-pipeline", 1L, 2L, - false + false, + null, + true ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -608,7 +638,9 @@ public void testBuildDeleteRequestWithDefaults() { 1L, VersionType.INTERNAL, 1L, - 2L + 2L, + null, + true ); assertNotNull("DeleteRequest should not be null", deleteRequest); @@ -642,7 +674,8 @@ public void testGetDocWriteRequestsWithGlobalValues() { null, // Pass null to test global routing null, null, // Pass null to test global pipeline - null // Pass null to test global requireAlias + null, // Pass null to test global requireAlias + true ); assertNotNull("Requests should not be null", requests); @@ -666,7 +699,8 @@ public void testGetDocWriteRequestsWithEmptyList() { "default-routing", null, "default-pipeline", - false + false, + true ); assertNotNull("Requests should not be null", requests); @@ -705,7 +739,13 @@ public void testFromProtoWithAllUpdateActionFields() { UpdateOperation updateOperation = UpdateOperation.newBuilder().setIfSeqNo(123L).setIfPrimaryTerm(456L).build(); - UpdateRequest result = BulkRequestParserProtoUtils.fromProto(updateRequest, document, bulkRequestBody, updateOperation); + UpdateRequest result = BulkRequestParserProtoUtils.fromProto( + updateRequest, + ByteString.copyFrom(document), + bulkRequestBody, + 123L, + 456L + ); assertNotNull("Result should not be null", result); assertNotNull("Script should be set", result.script()); @@ -735,7 +775,9 @@ public void testBuildCreateRequestWithSmileContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -763,7 +805,9 @@ public void testBuildCreateRequestWithCborContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -792,7 +836,9 @@ public void testBuildIndexRequestWithSmileContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -820,7 +866,9 @@ public void testBuildIndexRequestWithCborContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -843,7 +891,13 @@ public void testUpdateRequestWithCborUpsert() throws Exception { UpdateOperation updateOperation = UpdateOperation.newBuilder().build(); - UpdateRequest result = BulkRequestParserProtoUtils.fromProto(updateRequest, document, bulkRequestBody, updateOperation); + UpdateRequest result = BulkRequestParserProtoUtils.fromProto( + updateRequest, + ByteString.copyFrom(document), + bulkRequestBody, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ); assertNotNull("Result should not be null", result); assertNotNull("Upsert should be set", result.upsertRequest()); @@ -865,7 +919,9 @@ public void testBuildCreateRequestWithEmptyDocument() { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -891,7 +947,9 @@ public void testBuildCreateRequestWithJsonContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -919,7 +977,9 @@ public void testBuildCreateRequestWithYamlContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -948,7 +1008,9 @@ public void testBuildIndexRequestWithJsonContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -976,7 +1038,9 @@ public void testBuildIndexRequestWithYamlContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false + false, + null, + true ); assertNotNull("IndexRequest should not be null", indexRequest); diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtilsTests.java index df22a8edd83d9..dd62edd50acd9 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtilsTests.java @@ -9,6 +9,7 @@ package org.opensearch.transport.grpc.proto.request.document.bulk; import org.opensearch.action.support.WriteRequest; +import org.opensearch.common.settings.Settings; import org.opensearch.protobufs.BulkRequest; import org.opensearch.test.OpenSearchTestCase; @@ -26,7 +27,7 @@ public void testPrepareRequestWithBasicSettings() { .build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -39,7 +40,7 @@ public void testPrepareRequestWithDefaultValues() { BulkRequest request = BulkRequest.newBuilder().build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -52,7 +53,7 @@ public void testPrepareRequestWithTimeout() throws ParseException { BulkRequest request = BulkRequest.newBuilder().setTimeout("5s").build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -69,7 +70,7 @@ public void testPrepareRequestWithWaitForActiveShards() { BulkRequest request = BulkRequest.newBuilder().setWaitForActiveShards(waitForActiveShards).build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -82,7 +83,7 @@ public void testPrepareRequestWithRequireAlias() { BulkRequest request = BulkRequest.newBuilder().setRequireAlias(true).build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -95,7 +96,7 @@ public void testPrepareRequestWithPipeline() { BulkRequest request = BulkRequest.newBuilder().setPipeline("test-pipeline").build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -108,7 +109,7 @@ public void testPrepareRequestWithRefreshWait() { BulkRequest request = BulkRequest.newBuilder().setRefresh(org.opensearch.protobufs.Refresh.REFRESH_WAIT_FOR).build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -120,7 +121,7 @@ public void testPrepareRequestWithRefreshFalse() { BulkRequest request = BulkRequest.newBuilder().setRefresh(org.opensearch.protobufs.Refresh.REFRESH_FALSE).build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/BulkRequestProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/BulkRequestProtoUtilsTests.java index 5a5055320ee99..ceb2db940666a 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/BulkRequestProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/BulkRequestProtoUtilsTests.java @@ -11,6 +11,7 @@ import org.opensearch.action.DocWriteRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.WriteRequest; +import org.opensearch.common.settings.Settings; import org.opensearch.protobufs.BulkRequest; import org.opensearch.protobufs.BulkRequestBody; import org.opensearch.protobufs.DeleteOperation; @@ -43,7 +44,7 @@ public void testPrepareRequestWithIndexOperation() throws IOException { BulkRequest request = createBulkRequestWithIndexOperation(); // Convert to OpenSearch BulkRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the converted request assertEquals("Should have 1 request", 1, bulkRequest.numberOfActions()); @@ -64,7 +65,7 @@ public void testPrepareRequestWithCreateOperation() throws IOException { BulkRequest request = createBulkRequestWithCreateOperation(); // Convert to OpenSearch BulkRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the converted request assertEquals("Should have 1 request", 1, bulkRequest.numberOfActions()); @@ -81,7 +82,7 @@ public void testPrepareRequestWithDeleteOperation() throws IOException { BulkRequest request = createBulkRequestWithDeleteOperation(); // Convert to OpenSearch BulkRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the converted request assertEquals("Should have 1 request", 1, bulkRequest.numberOfActions()); @@ -97,7 +98,7 @@ public void testPrepareRequestWithUpdateOperation() throws IOException { BulkRequest request = createBulkRequestWithUpdateOperation(); // Convert to OpenSearch BulkRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); // Verify the converted request assertEquals("Should have 1 request", 1, bulkRequest.numberOfActions()); diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java index 3215238247bbd..eb2e4d95f309a 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java @@ -9,6 +9,7 @@ package org.opensearch.transport.grpc.services.document; import com.google.protobuf.ByteString; +import org.opensearch.common.settings.Settings; import org.opensearch.protobufs.BulkRequest; import org.opensearch.protobufs.BulkRequestBody; import org.opensearch.protobufs.IndexOperation; @@ -40,7 +41,7 @@ public class DocumentServiceImplTests extends OpenSearchTestCase { @Before public void setup() throws IOException { MockitoAnnotations.openMocks(this); - service = new DocumentServiceImpl(client); + service = new DocumentServiceImpl(client, Settings.EMPTY); } public void testBulkSuccess() throws IOException { From 34bbaf4fa62ea539a078a95fa39d1a7721321738 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Mon, 10 Nov 2025 03:17:03 +0000 Subject: [PATCH 3/6] changelog Signed-off-by: Karen Xu --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41fb815551f37..46f07901bcc01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,7 +63,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add S3Repository.LEGACY_MD5_CHECKSUM_CALCULATION to list of repository-s3 settings ([#19788](https://github.com/opensearch-project/OpenSearch/pull/19788)) - Fix NullPointerException when restoring remote snapshot with missing shard size information ([#19684](https://github.com/opensearch-project/OpenSearch/pull/19684)) - Fix NPE of ScriptScoreQuery ([#19650](https://github.com/opensearch-project/OpenSearch/pull/19650)) -- Fix GRPC Bulk ([#x](https://github.com/opensearch-project/OpenSearch/pull/x)) +- Fix GRPC Bulk ([#19937](https://github.com/opensearch-project/OpenSearch/pull/19937)) ### Dependencies - Update to Gradle 9.2 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575)) ([#19856](https://github.com/opensearch-project/OpenSearch/pull/19856)) From 6d897b887c82d29ae42f1b76c0f6bfc3d95f4f21 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Mon, 10 Nov 2025 22:09:47 +0000 Subject: [PATCH 4/6] add tests Signed-off-by: Karen Xu --- .../BulkRequestParserProtoUtilsTests.java | 337 ++++++++++++++++++ 1 file changed, 337 insertions(+) diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java index e483e39801721..acd435d7b265f 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java @@ -1102,4 +1102,341 @@ public void testDetectMediaTypeUnrecognizable() { MediaType result = BulkRequestParserProtoUtils.detectMediaType(invalidBytes); assertEquals("application/json", result.mediaTypeWithoutParameters()); } + + /** + * Test buildCreateRequest with explicit index when allowExplicitIndex is false + */ + public void testBuildCreateRequestExplicitIndexNotAllowed() { + WriteOperation writeOperation = WriteOperation.newBuilder().setXIndex("explicit-index").build(); + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> BulkRequestParserProtoUtils.buildCreateRequest( + writeOperation, + document, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false, + "default-index", // defaultIndex is not null + false // allowExplicitIndex is false + ) + ); + + assertEquals("explicit index in bulk is not allowed", exception.getMessage()); + } + + /** + * Test buildIndexRequest with explicit index when allowExplicitIndex is false + */ + public void testBuildIndexRequestExplicitIndexNotAllowed() { + IndexOperation indexOperation = IndexOperation.newBuilder().setXIndex("explicit-index").build(); + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> BulkRequestParserProtoUtils.buildIndexRequest( + indexOperation, + document, + null, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false, + "default-index", // defaultIndex is not null + false // allowExplicitIndex is false + ) + ); + + assertEquals("explicit index in bulk is not allowed", exception.getMessage()); + } + + /** + * Test buildUpdateRequest with explicit index when allowExplicitIndex is false + */ + public void testBuildUpdateRequestExplicitIndexNotAllowed() { + UpdateOperation updateOperation = UpdateOperation.newBuilder().setXIndex("explicit-index").build(); + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + + BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) + .setObject(ByteString.copyFrom(document)) + .build(); + + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> BulkRequestParserProtoUtils.buildUpdateRequest( + updateOperation, + ByteString.copyFrom(document), + bulkRequestBody, + "default-index", + "default-id", + null, + null, + 0, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false, + "default-index", // defaultIndex is not null + false // allowExplicitIndex is false + ) + ); + + assertEquals("explicit index in bulk is not allowed", exception.getMessage()); + } + + /** + * Test buildDeleteRequest with explicit index when allowExplicitIndex is false + */ + public void testBuildDeleteRequestExplicitIndexNotAllowed() { + DeleteOperation deleteOperation = DeleteOperation.newBuilder().setXIndex("explicit-index").build(); + + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> BulkRequestParserProtoUtils.buildDeleteRequest( + deleteOperation, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + "default-index", // defaultIndex is not null + false // allowExplicitIndex is false + ) + ); + + assertEquals("explicit index in bulk is not allowed", exception.getMessage()); + } + + /** + * Test buildUpdateRequest with upsert request and pipeline + */ + public void testBuildUpdateRequestWithUpsertAndPipeline() { + UpdateOperation updateOperation = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + byte[] upsertDoc = "{\"upsert_field\":\"upsert_value\"}".getBytes(StandardCharsets.UTF_8); + + BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) + .setObject(ByteString.copyFrom(document)) + .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setUpsert(ByteString.copyFrom(upsertDoc)).build()) + .build(); + + UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( + updateOperation, + ByteString.copyFrom(document), + bulkRequestBody, + "default-index", + "default-id", + null, + null, + 0, + "test-pipeline", // pipeline + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false, + null, + true + ); + + assertNotNull("UpdateRequest should not be null", updateRequest); + assertNotNull("Upsert request should be set", updateRequest.upsertRequest()); + assertEquals("Pipeline should be set on upsert request", "test-pipeline", updateRequest.upsertRequest().getPipeline()); + } + + /** + * Test fromProto with empty document bytes (ByteString.EMPTY) + */ + public void testFromProtoWithEmptyDocumentBytes() { + UpdateRequest updateRequest = new UpdateRequest("test-index", "test-id"); + + BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() + .setUpdateAction( + org.opensearch.protobufs.UpdateAction.newBuilder() + .setScript( + org.opensearch.protobufs.Script.newBuilder() + .setInline( + org.opensearch.protobufs.InlineScript.newBuilder() + .setSource("ctx._source.counter += 1") + .setLang( + org.opensearch.protobufs.ScriptLanguage.newBuilder() + .setBuiltin(org.opensearch.protobufs.BuiltinScriptLanguage.BUILTIN_SCRIPT_LANGUAGE_PAINLESS) + .build() + ) + .build() + ) + .build() + ) + .build() + ) + .build(); + + // Test with ByteString.EMPTY (no doc field) + UpdateRequest result = BulkRequestParserProtoUtils.fromProto( + updateRequest, + ByteString.EMPTY, + bulkRequestBody, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM + ); + + assertNotNull("Result should not be null", result); + assertNotNull("Script should be set", result.script()); + assertNull("Doc should not be set when ByteString is empty", result.doc()); + } + + /** + * Test fromProto with null document bytes + */ + public void testFromProtoWithNullDocumentBytes() { + UpdateRequest updateRequest = new UpdateRequest("test-index", "test-id"); + + BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() + .setUpdateAction( + org.opensearch.protobufs.UpdateAction.newBuilder() + .setScript( + org.opensearch.protobufs.Script.newBuilder() + .setInline( + org.opensearch.protobufs.InlineScript.newBuilder() + .setSource("ctx._source.counter += 1") + .setLang( + org.opensearch.protobufs.ScriptLanguage.newBuilder() + .setBuiltin(org.opensearch.protobufs.BuiltinScriptLanguage.BUILTIN_SCRIPT_LANGUAGE_PAINLESS) + .build() + ) + .build() + ) + .build() + ) + .build() + ) + .build(); + + // Test with null documentBytes + UpdateRequest result = BulkRequestParserProtoUtils.fromProto( + updateRequest, + null, + bulkRequestBody, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM + ); + + assertNotNull("Result should not be null", result); + assertNotNull("Script should be set", result.script()); + assertNull("Doc should not be set when documentBytes is null", result.doc()); + } + + /** + * Test getDocWriteRequests with update operation using UpdateAction.doc field + */ + public void testGetDocWriteRequestsWithUpdateActionDoc() { + UpdateOperation updateOp = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + + BulkRequestBody updateBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build()) + .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setDoc(ByteString.copyFrom(document)).build()) + .build(); + + BulkRequest request = BulkRequest.newBuilder().addBulkRequestBody(updateBody).build(); + + DocWriteRequest[] requests = BulkRequestParserProtoUtils.getDocWriteRequests( + request, + "default-index", + null, + null, + null, + false, + true + ); + + assertNotNull("Requests should not be null", requests); + assertEquals("Should have 1 request", 1, requests.length); + assertTrue("Request should be an UpdateRequest", requests[0] instanceof UpdateRequest); + + UpdateRequest updateRequest = (UpdateRequest) requests[0]; + assertNotNull("Doc should be set from UpdateAction.doc", updateRequest.doc()); + } + + /** + * Test valueOrDefault for String with null value and non-null globalDefault + */ + public void testValueOrDefaultStringWithNullValue() { + UpdateOperation updateOp = UpdateOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + + BulkRequestBody updateBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build()) + .setObject(ByteString.copyFrom(document)) + .build(); + + BulkRequest request = BulkRequest.newBuilder() + .addBulkRequestBody(updateBody) + .setRouting("global-routing") + .setPipeline("global-pipeline") + .build(); + + DocWriteRequest[] requests = BulkRequestParserProtoUtils.getDocWriteRequests( + request, + "default-index", + null, // defaultRouting is null, should use global routing + null, + null, // defaultPipeline is null, should use global pipeline + false, + true + ); + + assertNotNull("Requests should not be null", requests); + assertEquals("Should have 1 request", 1, requests.length); + + UpdateRequest updateRequest = (UpdateRequest) requests[0]; + assertEquals("Routing should use global value", "global-routing", updateRequest.routing()); + } + + /** + * Test valueOrDefault for Boolean with null value and non-null globalDefault + */ + public void testValueOrDefaultBooleanWithNullValue() { + IndexOperation indexOp = IndexOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + BulkRequestBody indexBody = BulkRequestBody.newBuilder() + .setOperationContainer(OperationContainer.newBuilder().setIndex(indexOp).build()) + .setObject(ByteString.copyFromUtf8("{\"field\":\"value\"}")) + .build(); + + BulkRequest request = BulkRequest.newBuilder().addBulkRequestBody(indexBody).setRequireAlias(true).build(); + + DocWriteRequest[] requests = BulkRequestParserProtoUtils.getDocWriteRequests( + request, + "default-index", + null, + null, + null, + null, // defaultRequireAlias is null, should use global requireAlias + true + ); + + assertNotNull("Requests should not be null", requests); + assertEquals("Should have 1 request", 1, requests.length); + + IndexRequest indexRequest = (IndexRequest) requests[0]; + assertTrue("RequireAlias should use global value", indexRequest.isRequireAlias()); + } } From 2a16f306b68816582e0c0ff5ec208769b73fa448 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Wed, 12 Nov 2025 17:07:12 +0000 Subject: [PATCH 5/6] remove allowExplicitIndex Signed-off-by: Karen Xu --- .../opensearch/transport/grpc/GrpcPlugin.java | 4 +- .../bulk/BulkRequestParserProtoUtils.java | 62 +---- .../document/bulk/BulkRequestProtoUtils.java | 16 +- .../grpc/services/DocumentServiceImpl.java | 8 +- .../BulkRequestParserProtoUtilsTests.java | 247 +++--------------- .../bulk/BulkRequestProtoUtilsTests.java | 17 +- .../services/BulkRequestProtoUtilsTests.java | 9 +- .../document/DocumentServiceImplTests.java | 3 +- 8 files changed, 67 insertions(+), 299 deletions(-) diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java index fa736b1f30f14..d8ed8fee05a28 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java @@ -210,7 +210,7 @@ public Map> getAuxTransports( return Collections.singletonMap(GRPC_TRANSPORT_SETTING_KEY, () -> { List grpcServices = new ArrayList<>( - List.of(new DocumentServiceImpl(client, settings), new SearchServiceImpl(client, queryUtils)) + List.of(new DocumentServiceImpl(client), new SearchServiceImpl(client, queryUtils)) ); for (GrpcServiceFactory serviceFac : servicesFactory) { List pluginServices = serviceFac.initClient(client) @@ -261,7 +261,7 @@ public Map> getSecureAuxTransports( } return Collections.singletonMap(GRPC_SECURE_TRANSPORT_SETTING_KEY, () -> { List grpcServices = new ArrayList<>( - List.of(new DocumentServiceImpl(client, settings), new SearchServiceImpl(client, queryUtils)) + List.of(new DocumentServiceImpl(client), new SearchServiceImpl(client, queryUtils)) ); for (GrpcServiceFactory serviceFac : servicesFactory) { List pluginServices = serviceFac.initClient(client) diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java index a6ee9fb5dc994..6b381caa2db07 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java @@ -107,7 +107,6 @@ static MediaType detectMediaType(byte[] document) { * @param defaultFetchSourceContext * @param defaultPipeline * @param defaultRequireAlias - * @param allowExplicitIndex whether explicit index specification is allowed (security setting) * @return */ public static DocWriteRequest[] getDocWriteRequests( @@ -116,8 +115,7 @@ public static DocWriteRequest[] getDocWriteRequests( String defaultRouting, FetchSourceContext defaultFetchSourceContext, String defaultPipeline, - Boolean defaultRequireAlias, - boolean allowExplicitIndex + Boolean defaultRequireAlias ) { List bulkRequestBodyList = request.getBulkRequestBodyList(); DocWriteRequest[] docWriteRequests = new DocWriteRequest[bulkRequestBodyList.size()]; @@ -155,9 +153,7 @@ public static DocWriteRequest[] getDocWriteRequests( pipeline, ifSeqNo, ifPrimaryTerm, - requireAlias, - defaultIndex, - allowExplicitIndex + requireAlias ); break; case INDEX: @@ -173,13 +169,11 @@ public static DocWriteRequest[] getDocWriteRequests( pipeline, ifSeqNo, ifPrimaryTerm, - requireAlias, - defaultIndex, - allowExplicitIndex + requireAlias ); break; case UPDATE: - // Extract the doc field from UpdateAction (matches REST API structure) + // Extract the doc field from UpdateAction // Use ByteString directly to avoid unnecessary byte array allocation ByteString updateDocBytes = ByteString.EMPTY; if (bulkRequestBodyEntry.hasUpdateAction() && bulkRequestBodyEntry.getUpdateAction().hasDoc()) { @@ -201,9 +195,7 @@ public static DocWriteRequest[] getDocWriteRequests( pipeline, ifSeqNo, ifPrimaryTerm, - requireAlias, - defaultIndex, - allowExplicitIndex + requireAlias ); break; case DELETE: @@ -215,9 +207,7 @@ public static DocWriteRequest[] getDocWriteRequests( version, versionType, ifSeqNo, - ifPrimaryTerm, - defaultIndex, - allowExplicitIndex + ifPrimaryTerm ); break; case OPERATIONCONTAINER_NOT_SET: @@ -246,8 +236,6 @@ public static DocWriteRequest[] getDocWriteRequests( * @param ifSeqNo The default sequence number for optimistic concurrency control * @param ifPrimaryTerm The default primary term for optimistic concurrency control * @param requireAlias Whether the index must be an alias - * @param defaultIndex The default index from the request URL (for security check) - * @param allowExplicitIndex Whether explicit index specification is allowed * @return The constructed IndexRequest */ public static IndexRequest buildCreateRequest( @@ -261,15 +249,9 @@ public static IndexRequest buildCreateRequest( String pipeline, long ifSeqNo, long ifPrimaryTerm, - boolean requireAlias, - String defaultIndex, - boolean allowExplicitIndex + boolean requireAlias ) { - // Check explicit index (matches REST BulkRequestParser line 218-221) if (createOperation.hasXIndex()) { - if (!allowExplicitIndex && defaultIndex != null) { - throw new IllegalArgumentException("explicit index in bulk is not allowed"); - } index = createOperation.getXIndex(); } @@ -307,8 +289,6 @@ public static IndexRequest buildCreateRequest( * @param ifSeqNo The default sequence number for optimistic concurrency control * @param ifPrimaryTerm The default primary term for optimistic concurrency control * @param requireAlias Whether the index must be an alias - * @param defaultIndex The default index from the request URL (for security check) - * @param allowExplicitIndex Whether explicit index specification is allowed * @return The constructed IndexRequest */ public static IndexRequest buildIndexRequest( @@ -323,17 +303,11 @@ public static IndexRequest buildIndexRequest( String pipeline, long ifSeqNo, long ifPrimaryTerm, - boolean requireAlias, - String defaultIndex, - boolean allowExplicitIndex + boolean requireAlias ) { opType = indexOperation.hasOpType() ? indexOperation.getOpType() : opType; - // Check explicit index (matches REST BulkRequestParser line 218-221) if (indexOperation.hasXIndex()) { - if (!allowExplicitIndex && defaultIndex != null) { - throw new IllegalArgumentException("explicit index in bulk is not allowed"); - } index = indexOperation.getXIndex(); } @@ -390,8 +364,6 @@ public static IndexRequest buildIndexRequest( * @param ifSeqNo The default sequence number for optimistic concurrency control * @param ifPrimaryTerm The default primary term for optimistic concurrency control * @param requireAlias Whether the index must be an alias - * @param defaultIndex The default index from the request URL (for security check) - * @param allowExplicitIndex Whether explicit index specification is allowed * @return The constructed UpdateRequest */ public static UpdateRequest buildUpdateRequest( @@ -406,15 +378,9 @@ public static UpdateRequest buildUpdateRequest( String pipeline, long ifSeqNo, long ifPrimaryTerm, - boolean requireAlias, - String defaultIndex, - boolean allowExplicitIndex + boolean requireAlias ) { - // Check explicit index (matches REST BulkRequestParser line 218-221) if (updateOperation.hasXIndex()) { - if (!allowExplicitIndex && defaultIndex != null) { - throw new IllegalArgumentException("explicit index in bulk is not allowed"); - } index = updateOperation.getXIndex(); } @@ -550,8 +516,6 @@ static UpdateRequest fromProto( * @param versionType The default version type * @param ifSeqNo The default sequence number for optimistic concurrency control * @param ifPrimaryTerm The default primary term for optimistic concurrency control - * @param defaultIndex The default index from the request URL (for security check) - * @param allowExplicitIndex Whether explicit index specification is allowed * @return The constructed DeleteRequest */ public static DeleteRequest buildDeleteRequest( @@ -562,15 +526,9 @@ public static DeleteRequest buildDeleteRequest( long version, VersionType versionType, long ifSeqNo, - long ifPrimaryTerm, - String defaultIndex, - boolean allowExplicitIndex + long ifPrimaryTerm ) { - // Check explicit index (matches REST BulkRequestParser line 218-221) if (deleteOperation.hasXIndex()) { - if (!allowExplicitIndex && defaultIndex != null) { - throw new IllegalArgumentException("explicit index in bulk is not allowed"); - } index = deleteOperation.getXIndex(); } diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java index b796de86d15b4..6d4ee623b066b 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java @@ -9,7 +9,6 @@ package org.opensearch.transport.grpc.proto.request.document.bulk; import org.opensearch.action.bulk.BulkShardRequest; -import org.opensearch.common.settings.Settings; import org.opensearch.protobufs.BulkRequest; import org.opensearch.rest.RestRequest; import org.opensearch.rest.action.document.RestBulkAction; @@ -37,11 +36,16 @@ private BulkRequestProtoUtils() { * Similar to {@link RestBulkAction#prepareRequest(RestRequest, NodeClient)} * Please ensure to keep both implementations consistent. * + * Note: Unlike REST API, gRPC does not enforce the allowExplicitIndex security setting. + * In REST, this setting provides network-level security by allowing proxies to filter + * requests based on URL paths. In gRPC, both default_index and x_index are in the + * request body, making this check ineffective for network-level security. + * For gRPC security, use mTLS, gRPC interceptors, or service mesh policies instead. + * * @param request the request to execute - * @param settings node settings for security and configuration * @return a future of the bulk action that was executed */ - public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest request, Settings settings) { + public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest request) { org.opensearch.action.bulk.BulkRequest bulkRequest = Requests.bulkRequest(); String defaultIndex = request.hasIndex() ? request.getIndex() : null; @@ -62,9 +66,6 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest bulkRequest.setRefreshPolicy(RefreshProtoUtils.getRefreshPolicy(request.getRefresh())); - // Read the allowExplicitIndex setting (matches REST BulkAction line 74) - boolean allowExplicitIndex = RestBulkAction.MULTI_ALLOW_EXPLICIT_INDEX.get(settings); - // Note: batch_size is deprecated in OS 3.x. Add batch_size parameter when backporting to OS 2.x /* if (request.hasBatchSize()){ @@ -80,8 +81,7 @@ public static org.opensearch.action.bulk.BulkRequest prepareRequest(BulkRequest defaultRouting, defaultFetchSourceContext, defaultPipeline, - defaultRequireAlias, - allowExplicitIndex + defaultRequireAlias ) ); diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java index 6302fe1f260f6..e1348bc78961f 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.common.settings.Settings; import org.opensearch.protobufs.services.DocumentServiceGrpc; import org.opensearch.transport.client.Client; import org.opensearch.transport.grpc.listeners.BulkRequestActionListener; @@ -26,17 +25,14 @@ public class DocumentServiceImpl extends DocumentServiceGrpc.DocumentServiceImplBase { private static final Logger logger = LogManager.getLogger(DocumentServiceImpl.class); private final Client client; - private final Settings settings; /** * Creates a new DocumentServiceImpl. * * @param client Client for executing actions on the local node - * @param settings Node settings for security and configuration */ - public DocumentServiceImpl(Client client, Settings settings) { + public DocumentServiceImpl(Client client) { this.client = client; - this.settings = settings; } /** @@ -48,7 +44,7 @@ public DocumentServiceImpl(Client client, Settings settings) { @Override public void bulk(org.opensearch.protobufs.BulkRequest request, StreamObserver responseObserver) { try { - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, settings); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); BulkRequestActionListener listener = new BulkRequestActionListener(responseObserver); client.bulk(bulkRequest, listener); } catch (RuntimeException e) { diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java index acd435d7b265f..5d8013b34ba14 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java @@ -54,9 +54,7 @@ public void testBuildCreateRequest() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -99,9 +97,7 @@ public void testBuildIndexRequest() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -136,9 +132,7 @@ public void testBuildIndexRequestWithOpType() { "default-pipeline", SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -166,9 +160,7 @@ public void testBuildDeleteRequest() { 1L, VersionType.INTERNAL, 1L, - 2L, - null, - true + 2L ); assertNotNull("DeleteRequest should not be null", deleteRequest); @@ -212,9 +204,7 @@ public void testBuildUpdateRequest() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -267,8 +257,7 @@ public void testGetDocWriteRequests() { "default-routing", null, "default-pipeline", - false, - true + false ); assertNotNull("Requests should not be null", requests); @@ -312,8 +301,7 @@ public void testGetDocWriteRequestsWithInvalidOperation() { "default-routing", null, "default-pipeline", - false, - true + false ) ); } @@ -334,9 +322,7 @@ public void testBuildCreateRequestWithDefaults() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -363,9 +349,7 @@ public void testBuildCreateRequestWithPipeline() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertEquals("Pipeline should use custom value", "custom-pipeline", indexRequest.getPipeline()); @@ -398,9 +382,7 @@ public void testBuildIndexRequestWithAllFields() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -433,9 +415,7 @@ public void testBuildIndexRequestWithNullOpType() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -484,9 +464,7 @@ public void testBuildUpdateRequestWithScript() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -518,9 +496,7 @@ public void testBuildUpdateRequestWithUpsert() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -550,9 +526,7 @@ public void testBuildUpdateRequestWithScriptedUpsert() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -586,9 +560,7 @@ public void testBuildUpdateRequestWithFetchSource() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -617,9 +589,7 @@ public void testBuildUpdateRequestWithoutUpdateAction() { "default-pipeline", 1L, 2L, - false, - null, - true + false ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -638,9 +608,7 @@ public void testBuildDeleteRequestWithDefaults() { 1L, VersionType.INTERNAL, 1L, - 2L, - null, - true + 2L ); assertNotNull("DeleteRequest should not be null", deleteRequest); @@ -674,8 +642,7 @@ public void testGetDocWriteRequestsWithGlobalValues() { null, // Pass null to test global routing null, null, // Pass null to test global pipeline - null, // Pass null to test global requireAlias - true + null // Pass null to test global requireAlias ); assertNotNull("Requests should not be null", requests); @@ -699,8 +666,7 @@ public void testGetDocWriteRequestsWithEmptyList() { "default-routing", null, "default-pipeline", - false, - true + false ); assertNotNull("Requests should not be null", requests); @@ -775,9 +741,7 @@ public void testBuildCreateRequestWithSmileContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -805,9 +769,7 @@ public void testBuildCreateRequestWithCborContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -836,9 +798,7 @@ public void testBuildIndexRequestWithSmileContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -866,9 +826,7 @@ public void testBuildIndexRequestWithCborContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -919,9 +877,7 @@ public void testBuildCreateRequestWithEmptyDocument() { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -947,9 +903,7 @@ public void testBuildCreateRequestWithJsonContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -977,9 +931,7 @@ public void testBuildCreateRequestWithYamlContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -1008,9 +960,7 @@ public void testBuildIndexRequestWithJsonContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -1038,9 +988,7 @@ public void testBuildIndexRequestWithYamlContent() throws Exception { null, SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false, - null, - true + false ); assertNotNull("IndexRequest should not be null", indexRequest); @@ -1103,125 +1051,6 @@ public void testDetectMediaTypeUnrecognizable() { assertEquals("application/json", result.mediaTypeWithoutParameters()); } - /** - * Test buildCreateRequest with explicit index when allowExplicitIndex is false - */ - public void testBuildCreateRequestExplicitIndexNotAllowed() { - WriteOperation writeOperation = WriteOperation.newBuilder().setXIndex("explicit-index").build(); - byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); - - IllegalArgumentException exception = expectThrows( - IllegalArgumentException.class, - () -> BulkRequestParserProtoUtils.buildCreateRequest( - writeOperation, - document, - "default-index", - "default-id", - null, - Versions.MATCH_ANY, - VersionType.INTERNAL, - null, - SequenceNumbers.UNASSIGNED_SEQ_NO, - UNASSIGNED_PRIMARY_TERM, - false, - "default-index", // defaultIndex is not null - false // allowExplicitIndex is false - ) - ); - - assertEquals("explicit index in bulk is not allowed", exception.getMessage()); - } - - /** - * Test buildIndexRequest with explicit index when allowExplicitIndex is false - */ - public void testBuildIndexRequestExplicitIndexNotAllowed() { - IndexOperation indexOperation = IndexOperation.newBuilder().setXIndex("explicit-index").build(); - byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); - - IllegalArgumentException exception = expectThrows( - IllegalArgumentException.class, - () -> BulkRequestParserProtoUtils.buildIndexRequest( - indexOperation, - document, - null, - "default-index", - "default-id", - null, - Versions.MATCH_ANY, - VersionType.INTERNAL, - null, - SequenceNumbers.UNASSIGNED_SEQ_NO, - UNASSIGNED_PRIMARY_TERM, - false, - "default-index", // defaultIndex is not null - false // allowExplicitIndex is false - ) - ); - - assertEquals("explicit index in bulk is not allowed", exception.getMessage()); - } - - /** - * Test buildUpdateRequest with explicit index when allowExplicitIndex is false - */ - public void testBuildUpdateRequestExplicitIndexNotAllowed() { - UpdateOperation updateOperation = UpdateOperation.newBuilder().setXIndex("explicit-index").build(); - byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); - - BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() - .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) - .build(); - - IllegalArgumentException exception = expectThrows( - IllegalArgumentException.class, - () -> BulkRequestParserProtoUtils.buildUpdateRequest( - updateOperation, - ByteString.copyFrom(document), - bulkRequestBody, - "default-index", - "default-id", - null, - null, - 0, - null, - SequenceNumbers.UNASSIGNED_SEQ_NO, - UNASSIGNED_PRIMARY_TERM, - false, - "default-index", // defaultIndex is not null - false // allowExplicitIndex is false - ) - ); - - assertEquals("explicit index in bulk is not allowed", exception.getMessage()); - } - - /** - * Test buildDeleteRequest with explicit index when allowExplicitIndex is false - */ - public void testBuildDeleteRequestExplicitIndexNotAllowed() { - DeleteOperation deleteOperation = DeleteOperation.newBuilder().setXIndex("explicit-index").build(); - - IllegalArgumentException exception = expectThrows( - IllegalArgumentException.class, - () -> BulkRequestParserProtoUtils.buildDeleteRequest( - deleteOperation, - "default-index", - "default-id", - null, - Versions.MATCH_ANY, - VersionType.INTERNAL, - SequenceNumbers.UNASSIGNED_SEQ_NO, - UNASSIGNED_PRIMARY_TERM, - "default-index", // defaultIndex is not null - false // allowExplicitIndex is false - ) - ); - - assertEquals("explicit index in bulk is not allowed", exception.getMessage()); - } - /** * Test buildUpdateRequest with upsert request and pipeline */ @@ -1249,9 +1078,7 @@ public void testBuildUpdateRequestWithUpsertAndPipeline() { "test-pipeline", // pipeline SequenceNumbers.UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, - false, - null, - true + false ); assertNotNull("UpdateRequest should not be null", updateRequest); @@ -1356,15 +1183,7 @@ public void testGetDocWriteRequestsWithUpdateActionDoc() { BulkRequest request = BulkRequest.newBuilder().addBulkRequestBody(updateBody).build(); - DocWriteRequest[] requests = BulkRequestParserProtoUtils.getDocWriteRequests( - request, - "default-index", - null, - null, - null, - false, - true - ); + DocWriteRequest[] requests = BulkRequestParserProtoUtils.getDocWriteRequests(request, "default-index", null, null, null, false); assertNotNull("Requests should not be null", requests); assertEquals("Should have 1 request", 1, requests.length); @@ -1399,8 +1218,7 @@ public void testValueOrDefaultStringWithNullValue() { null, // defaultRouting is null, should use global routing null, null, // defaultPipeline is null, should use global pipeline - false, - true + false ); assertNotNull("Requests should not be null", requests); @@ -1429,8 +1247,7 @@ public void testValueOrDefaultBooleanWithNullValue() { null, null, null, - null, // defaultRequireAlias is null, should use global requireAlias - true + null // defaultRequireAlias is null, should use global requireAlias ); assertNotNull("Requests should not be null", requests); diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtilsTests.java index dd62edd50acd9..df22a8edd83d9 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtilsTests.java @@ -9,7 +9,6 @@ package org.opensearch.transport.grpc.proto.request.document.bulk; import org.opensearch.action.support.WriteRequest; -import org.opensearch.common.settings.Settings; import org.opensearch.protobufs.BulkRequest; import org.opensearch.test.OpenSearchTestCase; @@ -27,7 +26,7 @@ public void testPrepareRequestWithBasicSettings() { .build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -40,7 +39,7 @@ public void testPrepareRequestWithDefaultValues() { BulkRequest request = BulkRequest.newBuilder().build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -53,7 +52,7 @@ public void testPrepareRequestWithTimeout() throws ParseException { BulkRequest request = BulkRequest.newBuilder().setTimeout("5s").build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -70,7 +69,7 @@ public void testPrepareRequestWithWaitForActiveShards() { BulkRequest request = BulkRequest.newBuilder().setWaitForActiveShards(waitForActiveShards).build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -83,7 +82,7 @@ public void testPrepareRequestWithRequireAlias() { BulkRequest request = BulkRequest.newBuilder().setRequireAlias(true).build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -96,7 +95,7 @@ public void testPrepareRequestWithPipeline() { BulkRequest request = BulkRequest.newBuilder().setPipeline("test-pipeline").build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -109,7 +108,7 @@ public void testPrepareRequestWithRefreshWait() { BulkRequest request = BulkRequest.newBuilder().setRefresh(org.opensearch.protobufs.Refresh.REFRESH_WAIT_FOR).build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); @@ -121,7 +120,7 @@ public void testPrepareRequestWithRefreshFalse() { BulkRequest request = BulkRequest.newBuilder().setRefresh(org.opensearch.protobufs.Refresh.REFRESH_FALSE).build(); // Call prepareRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the result assertNotNull("BulkRequest should not be null", bulkRequest); diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/BulkRequestProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/BulkRequestProtoUtilsTests.java index ceb2db940666a..5a5055320ee99 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/BulkRequestProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/BulkRequestProtoUtilsTests.java @@ -11,7 +11,6 @@ import org.opensearch.action.DocWriteRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.WriteRequest; -import org.opensearch.common.settings.Settings; import org.opensearch.protobufs.BulkRequest; import org.opensearch.protobufs.BulkRequestBody; import org.opensearch.protobufs.DeleteOperation; @@ -44,7 +43,7 @@ public void testPrepareRequestWithIndexOperation() throws IOException { BulkRequest request = createBulkRequestWithIndexOperation(); // Convert to OpenSearch BulkRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the converted request assertEquals("Should have 1 request", 1, bulkRequest.numberOfActions()); @@ -65,7 +64,7 @@ public void testPrepareRequestWithCreateOperation() throws IOException { BulkRequest request = createBulkRequestWithCreateOperation(); // Convert to OpenSearch BulkRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the converted request assertEquals("Should have 1 request", 1, bulkRequest.numberOfActions()); @@ -82,7 +81,7 @@ public void testPrepareRequestWithDeleteOperation() throws IOException { BulkRequest request = createBulkRequestWithDeleteOperation(); // Convert to OpenSearch BulkRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the converted request assertEquals("Should have 1 request", 1, bulkRequest.numberOfActions()); @@ -98,7 +97,7 @@ public void testPrepareRequestWithUpdateOperation() throws IOException { BulkRequest request = createBulkRequestWithUpdateOperation(); // Convert to OpenSearch BulkRequest - org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request, Settings.EMPTY); + org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request); // Verify the converted request assertEquals("Should have 1 request", 1, bulkRequest.numberOfActions()); diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java index eb2e4d95f309a..3215238247bbd 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java @@ -9,7 +9,6 @@ package org.opensearch.transport.grpc.services.document; import com.google.protobuf.ByteString; -import org.opensearch.common.settings.Settings; import org.opensearch.protobufs.BulkRequest; import org.opensearch.protobufs.BulkRequestBody; import org.opensearch.protobufs.IndexOperation; @@ -41,7 +40,7 @@ public class DocumentServiceImplTests extends OpenSearchTestCase { @Before public void setup() throws IOException { MockitoAnnotations.openMocks(this); - service = new DocumentServiceImpl(client, Settings.EMPTY); + service = new DocumentServiceImpl(client); } public void testBulkSuccess() throws IOException { From 3b80f634b584d7a3f477f892412d0e425bb4f4c4 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Thu, 13 Nov 2025 00:03:41 +0000 Subject: [PATCH 6/6] add bytesref to index/create ops too Signed-off-by: Karen Xu --- .../bulk/BulkRequestParserProtoUtils.java | 95 ++++++++++++++----- .../BulkRequestParserProtoUtilsTests.java | 77 +++++++-------- 2 files changed, 113 insertions(+), 59 deletions(-) diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java index 6b381caa2db07..0bd71c417d64b 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java @@ -9,6 +9,7 @@ package org.opensearch.transport.grpc.proto.request.document.bulk; import com.google.protobuf.ByteString; +import org.apache.lucene.util.BytesRef; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.bulk.BulkRequestParser; import org.opensearch.action.delete.DeleteRequest; @@ -16,6 +17,7 @@ import org.opensearch.action.update.UpdateRequest; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.core.common.Strings; +import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -32,10 +34,12 @@ import org.opensearch.protobufs.UpdateOperation; import org.opensearch.protobufs.WriteOperation; import org.opensearch.script.Script; +import org.opensearch.search.SearchHit; import org.opensearch.search.fetch.subphase.FetchSourceContext; import org.opensearch.transport.grpc.proto.request.common.FetchSourceContextProtoUtils; import org.opensearch.transport.grpc.proto.request.common.ScriptProtoUtils; import org.opensearch.transport.grpc.proto.response.document.common.VersionTypeProtoUtils; +import org.opensearch.transport.grpc.proto.response.search.SearchHitProtoUtils; import java.util.List; import java.util.Objects; @@ -84,17 +88,60 @@ private static Boolean valueOrDefault(Boolean value, Boolean globalDefault) { } /** - * Detects the media type from the byte content, with fallback to JSON if detection fails. + * Converts a protobuf ByteString to OpenSearch BytesReference with zero-copy optimization. + * + * This method uses asReadOnlyByteBuffer() to get a view of the ByteString's internal data + * without copying. The ByteBuffer is then converted to a BytesRef which wraps the underlying + * byte array. This approach matches the pattern used in + * {@link SearchHitProtoUtils#processSource(SearchHit, org.opensearch.protobufs.HitsMetadataHitsInner.Builder)}. + * + * @param byteString The protobuf ByteString to convert + * @return A BytesReference wrapping the ByteString data without copying + */ + private static BytesReference byteStringToBytesReference(ByteString byteString) { + if (byteString == null || byteString.isEmpty()) { + return BytesArray.EMPTY; + } + + // Use asReadOnlyByteBuffer() to get a zero-copy view of the ByteString's internal data + // Then extract the backing array and wrap it in BytesArray + java.nio.ByteBuffer buffer = byteString.asReadOnlyByteBuffer(); + + if (buffer.hasArray()) { + // Fast path: ByteBuffer is backed by an array + byte[] array = buffer.array(); + int offset = buffer.arrayOffset() + buffer.position(); + int length = buffer.remaining(); + + if (offset == 0 && length == array.length) { + // No offset, can use the array directly + return new BytesArray(array); + } else { + // Has offset or partial array + return new BytesArray(array, offset, length); + } + } else { + // Fallback: ByteBuffer is not array-backed (rare case) + // Must copy in this case + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return new BytesArray(bytes); + } + } + + /** + * Detects the media type from BytesReference content, with fallback to JSON if detection fails. * This enables support for JSON, SMILE, and CBOR formats in gRPC bulk requests. * - * @param document The document content as bytes + * @param document The document content as BytesReference * @return The detected MediaType, or JSON if detection fails or document is empty */ - static MediaType detectMediaType(byte[] document) { - if (document == null || document.length == 0) { + static MediaType detectMediaType(BytesReference document) { + if (document == null || document.length() == 0) { return MediaTypeRegistry.JSON; } - MediaType detectedType = MediaTypeRegistry.mediaTypeFromBytes(document, 0, document.length); + BytesRef bytesRef = document.toBytesRef(); + MediaType detectedType = MediaTypeRegistry.mediaTypeFromBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length); return detectedType != null ? detectedType : MediaTypeRegistry.JSON; } @@ -144,7 +191,7 @@ public static DocWriteRequest[] getDocWriteRequests( case CREATE: docWriteRequest = buildCreateRequest( operationContainer.getCreate(), - bulkRequestBodyEntry.getObject().toByteArray(), + bulkRequestBodyEntry.getObject(), index, id, routing, @@ -159,7 +206,7 @@ public static DocWriteRequest[] getDocWriteRequests( case INDEX: docWriteRequest = buildIndexRequest( operationContainer.getIndex(), - bulkRequestBodyEntry.getObject().toByteArray(), + bulkRequestBodyEntry.getObject(), opType, index, id, @@ -226,7 +273,7 @@ public static DocWriteRequest[] getDocWriteRequests( * Builds an IndexRequest with create flag set to true from a CreateOperation protobuf message. * * @param createOperation The create operation protobuf message - * @param document The document content as bytes + * @param documentBytes The document content as ByteString (zero-copy reference) * @param index The default index name * @param id The default document ID * @param routing The default routing value @@ -240,7 +287,7 @@ public static DocWriteRequest[] getDocWriteRequests( */ public static IndexRequest buildCreateRequest( WriteOperation createOperation, - byte[] document, + ByteString documentBytes, String index, String id, String routing, @@ -260,7 +307,8 @@ public static IndexRequest buildCreateRequest( pipeline = createOperation.hasPipeline() ? createOperation.getPipeline() : pipeline; requireAlias = createOperation.hasRequireAlias() ? createOperation.getRequireAlias() : requireAlias; - MediaType mediaType = detectMediaType(document); + BytesReference documentRef = byteStringToBytesReference(documentBytes); + MediaType mediaType = detectMediaType(documentRef); IndexRequest indexRequest = new IndexRequest(index).id(id) .routing(routing) .version(version) @@ -269,7 +317,7 @@ public static IndexRequest buildCreateRequest( .setPipeline(pipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) - .source(document, mediaType) + .source(documentRef, mediaType) .setRequireAlias(requireAlias); return indexRequest; } @@ -278,7 +326,7 @@ public static IndexRequest buildCreateRequest( * Builds an IndexRequest from an IndexOperation protobuf message. * * @param indexOperation The index operation protobuf message - * @param document The document content as bytes + * @param documentBytes The document content as ByteString (zero-copy reference) * @param opType The default operation type * @param index The default index name * @param id The default document ID @@ -293,7 +341,7 @@ public static IndexRequest buildCreateRequest( */ public static IndexRequest buildIndexRequest( IndexOperation indexOperation, - byte[] document, + ByteString documentBytes, OpType opType, String index, String id, @@ -322,7 +370,8 @@ public static IndexRequest buildIndexRequest( ifPrimaryTerm = indexOperation.hasIfPrimaryTerm() ? indexOperation.getIfPrimaryTerm() : ifPrimaryTerm; requireAlias = indexOperation.hasRequireAlias() ? indexOperation.getRequireAlias() : requireAlias; - MediaType mediaType = detectMediaType(document); + BytesReference documentRef = byteStringToBytesReference(documentBytes); + MediaType mediaType = detectMediaType(documentRef); IndexRequest indexRequest; if (opType == null) { indexRequest = new IndexRequest(index).id(id) @@ -332,7 +381,7 @@ public static IndexRequest buildIndexRequest( .setPipeline(pipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) - .source(document, mediaType) + .source(documentRef, mediaType) .setRequireAlias(requireAlias); } else { indexRequest = new IndexRequest(index).id(id) @@ -343,7 +392,7 @@ public static IndexRequest buildIndexRequest( .setPipeline(pipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) - .source(document, mediaType) + .source(documentRef, mediaType) .setRequireAlias(requireAlias); } return indexRequest; @@ -456,9 +505,10 @@ static UpdateRequest fromProto( // 3. upsert if (updateAction.hasUpsert()) { ByteString upsertBytes = updateAction.getUpsert(); - byte[] upsertArray = upsertBytes.toByteArray(); - MediaType upsertMediaType = detectMediaType(upsertArray); - updateRequest.upsert(upsertArray, upsertMediaType); + BytesReference upsertRef = byteStringToBytesReference(upsertBytes); + MediaType upsertMediaType = detectMediaType(upsertRef); + BytesRef bytesRef = upsertRef.toBytesRef(); + updateRequest.upsert(bytesRef.bytes, bytesRef.offset, bytesRef.length, upsertMediaType); } } @@ -471,9 +521,10 @@ static UpdateRequest fromProto( // - If script!=null && doc!=null → validation error // - If script==null && doc==null → validation error if (documentBytes != null && !documentBytes.isEmpty()) { - byte[] docArray = documentBytes.toByteArray(); - MediaType mediaType = detectMediaType(docArray); - updateRequest.doc(docArray, mediaType); + BytesReference docRef = byteStringToBytesReference(documentBytes); + MediaType mediaType = detectMediaType(docRef); + BytesRef bytesRef = docRef.toBytesRef(); + updateRequest.doc(bytesRef.bytes, bytesRef.offset, bytesRef.length, mediaType); } if (bulkRequestBody.hasUpdateAction()) { diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java index 5d8013b34ba14..88634c6bd18f9 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java @@ -9,11 +9,14 @@ package org.opensearch.transport.grpc.proto.request.document.bulk; import com.google.protobuf.ByteString; +import com.google.protobuf.UnsafeByteOperations; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.common.lucene.uid.Versions; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.xcontent.MediaType; import org.opensearch.index.VersionType; import org.opensearch.index.seqno.SequenceNumbers; @@ -45,7 +48,7 @@ public void testBuildCreateRequest() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - document, + UnsafeByteOperations.unsafeWrap(document), "default-index", "default-id", "default-routing", @@ -87,7 +90,7 @@ public void testBuildIndexRequest() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - document, + UnsafeByteOperations.unsafeWrap(document), null, "default-index", "default-id", @@ -122,7 +125,7 @@ public void testBuildIndexRequestWithOpType() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - document, + UnsafeByteOperations.unsafeWrap(document), opType, "default-index", "default-id", @@ -188,13 +191,13 @@ public void testBuildUpdateRequest() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setDocAsUpsert(true).setDetectNoop(true).build()) .build(); UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - ByteString.copyFrom(document), + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -313,7 +316,7 @@ public void testBuildCreateRequestWithDefaults() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - document, + UnsafeByteOperations.unsafeWrap(document), "default-index", "default-id", "default-routing", @@ -340,7 +343,7 @@ public void testBuildCreateRequestWithPipeline() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - document, + UnsafeByteOperations.unsafeWrap(document), "default-index", "default-id", "default-routing", @@ -372,7 +375,7 @@ public void testBuildIndexRequestWithAllFields() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - document, + UnsafeByteOperations.unsafeWrap(document), OpType.OP_TYPE_INDEX, "default-index", "default-id", @@ -405,7 +408,7 @@ public void testBuildIndexRequestWithNullOpType() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - document, + UnsafeByteOperations.unsafeWrap(document), null, "default-index", "default-id", @@ -431,7 +434,7 @@ public void testBuildUpdateRequestWithScript() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .setUpdateAction( org.opensearch.protobufs.UpdateAction.newBuilder() .setScript( @@ -454,7 +457,7 @@ public void testBuildUpdateRequestWithScript() { UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - ByteString.copyFrom(document), + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -480,13 +483,13 @@ public void testBuildUpdateRequestWithUpsert() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setUpsert(ByteString.copyFrom(upsertDoc)).build()) .build(); UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - ByteString.copyFrom(document), + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -510,13 +513,13 @@ public void testBuildUpdateRequestWithScriptedUpsert() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setScriptedUpsert(true).build()) .build(); UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - ByteString.copyFrom(document), + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -540,7 +543,7 @@ public void testBuildUpdateRequestWithFetchSource() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .setUpdateAction( org.opensearch.protobufs.UpdateAction.newBuilder() .setXSource(org.opensearch.protobufs.SourceConfig.newBuilder().setFetch(true).build()) @@ -550,7 +553,7 @@ public void testBuildUpdateRequestWithFetchSource() { UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - ByteString.copyFrom(document), + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -574,12 +577,12 @@ public void testBuildUpdateRequestWithoutUpdateAction() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .build(); UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - ByteString.copyFrom(document), + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -707,7 +710,7 @@ public void testFromProtoWithAllUpdateActionFields() { UpdateRequest result = BulkRequestParserProtoUtils.fromProto( updateRequest, - ByteString.copyFrom(document), + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, 123L, 456L @@ -732,7 +735,7 @@ public void testBuildCreateRequestWithSmileContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - smileDocument, + UnsafeByteOperations.unsafeWrap(smileDocument), "default-index", "default-id", null, @@ -760,7 +763,7 @@ public void testBuildCreateRequestWithCborContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - cborDocument, + UnsafeByteOperations.unsafeWrap(cborDocument), "default-index", "default-id", null, @@ -788,7 +791,7 @@ public void testBuildIndexRequestWithSmileContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - smileDocument, + UnsafeByteOperations.unsafeWrap(smileDocument), null, "default-index", "default-id", @@ -816,7 +819,7 @@ public void testBuildIndexRequestWithCborContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - cborDocument, + UnsafeByteOperations.unsafeWrap(cborDocument), null, "default-index", "default-id", @@ -851,7 +854,7 @@ public void testUpdateRequestWithCborUpsert() throws Exception { UpdateRequest result = BulkRequestParserProtoUtils.fromProto( updateRequest, - ByteString.copyFrom(document), + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM @@ -868,7 +871,7 @@ public void testBuildCreateRequestWithEmptyDocument() { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - emptyDocument, + UnsafeByteOperations.unsafeWrap(emptyDocument), "default-index", "default-id", null, @@ -894,7 +897,7 @@ public void testBuildCreateRequestWithJsonContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - jsonDocument, + UnsafeByteOperations.unsafeWrap(jsonDocument), "default-index", "default-id", null, @@ -922,7 +925,7 @@ public void testBuildCreateRequestWithYamlContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( writeOperation, - yamlDocument, + UnsafeByteOperations.unsafeWrap(yamlDocument), "default-index", "default-id", null, @@ -950,7 +953,7 @@ public void testBuildIndexRequestWithJsonContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - jsonDocument, + UnsafeByteOperations.unsafeWrap(jsonDocument), null, "default-index", "default-id", @@ -978,7 +981,7 @@ public void testBuildIndexRequestWithYamlContent() throws Exception { IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( indexOperation, - yamlDocument, + UnsafeByteOperations.unsafeWrap(yamlDocument), null, "default-index", "default-id", @@ -1035,10 +1038,10 @@ private byte[] createYamlDocument() throws Exception { * Test detectMediaType with null or empty document */ public void testDetectMediaTypeNullOrEmpty() { - MediaType result = BulkRequestParserProtoUtils.detectMediaType(null); + MediaType result = BulkRequestParserProtoUtils.detectMediaType((BytesReference) null); assertEquals("application/json", result.mediaTypeWithoutParameters()); - result = BulkRequestParserProtoUtils.detectMediaType(new byte[0]); + result = BulkRequestParserProtoUtils.detectMediaType(new BytesArray(new byte[0])); assertEquals("application/json", result.mediaTypeWithoutParameters()); } @@ -1047,7 +1050,7 @@ public void testDetectMediaTypeNullOrEmpty() { */ public void testDetectMediaTypeUnrecognizable() { byte[] invalidBytes = new byte[] { (byte) 0xFF, (byte) 0xFE, (byte) 0xFD, (byte) 0xFC }; - MediaType result = BulkRequestParserProtoUtils.detectMediaType(invalidBytes); + MediaType result = BulkRequestParserProtoUtils.detectMediaType(new BytesArray(invalidBytes)); assertEquals("application/json", result.mediaTypeWithoutParameters()); } @@ -1062,13 +1065,13 @@ public void testBuildUpdateRequestWithUpsertAndPipeline() { BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOperation).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setUpsert(ByteString.copyFrom(upsertDoc)).build()) .build(); UpdateRequest updateRequest = BulkRequestParserProtoUtils.buildUpdateRequest( updateOperation, - ByteString.copyFrom(document), + UnsafeByteOperations.unsafeWrap(document), bulkRequestBody, "default-index", "default-id", @@ -1178,7 +1181,7 @@ public void testGetDocWriteRequestsWithUpdateActionDoc() { BulkRequestBody updateBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build()) - .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setDoc(ByteString.copyFrom(document)).build()) + .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setDoc(UnsafeByteOperations.unsafeWrap(document)).build()) .build(); BulkRequest request = BulkRequest.newBuilder().addBulkRequestBody(updateBody).build(); @@ -1203,7 +1206,7 @@ public void testValueOrDefaultStringWithNullValue() { BulkRequestBody updateBody = BulkRequestBody.newBuilder() .setOperationContainer(OperationContainer.newBuilder().setUpdate(updateOp).build()) - .setObject(ByteString.copyFrom(document)) + .setObject(UnsafeByteOperations.unsafeWrap(document)) .build(); BulkRequest request = BulkRequest.newBuilder()