Skip to content

Commit 0b14049

Browse files
fix size calculation in bulk ingester (#1167) (#1169)
Co-authored-by: Laura Trotta <153528055+l-trotta@users.noreply.github.com>
1 parent a4d0ffd commit 0b14049

File tree

2 files changed

+51
-34
lines changed

2 files changed

+51
-34
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public class BulkIngester<Context> implements AutoCloseable {
7373
private BackoffPolicy backoffPolicy;
7474

7575
// Current state
76-
private List<RetryableBulkOperation<Context>> operations = new ArrayList<>();
76+
private List<IngesterOperation<Context>> operations = new ArrayList<>();
7777
private long currentSize;
7878
private int requestsInFlightCount;
7979
private volatile boolean isClosed = false;
@@ -190,7 +190,7 @@ public Duration flushInterval() {
190190
* The number of operations that have been buffered, waiting to be sent.
191191
*/
192192
public int pendingOperations() {
193-
List<RetryableBulkOperation<Context>> operations = this.operations;
193+
List<IngesterOperation<Context>> operations = this.operations;
194194
return operations == null ? 0 : operations.size();
195195
}
196196

@@ -296,26 +296,27 @@ private void failsafeFlush() {
296296
}
297297

298298
public void flush() {
299-
List<RetryableBulkOperation<Context>> sentRequests = new ArrayList<>();
299+
List<IngesterOperation<Context>> sentRequests = new ArrayList<>();
300300
RequestExecution<Context> exec = sendRequestCondition.whenReadyIf(
301301
() -> {
302302
// May happen on manual and periodic flushes
303303
return !operations.isEmpty() && operations.stream()
304-
.anyMatch(RetryableBulkOperation::isSendable);
304+
.anyMatch(IngesterOperation::isSendable);
305305
},
306306
() -> {
307307
// Selecting operations that can be sent immediately,
308308
// Dividing actual operations from contexts
309309
List<BulkOperation> immediateOps = new ArrayList<>();
310310
List<Context> contexts = new ArrayList<>();
311311

312-
for(Iterator<RetryableBulkOperation<Context>> it = operations.iterator(); it.hasNext();){
313-
RetryableBulkOperation<Context> op = it.next();
312+
for(Iterator<IngesterOperation<Context>> it = operations.iterator(); it.hasNext();){
313+
IngesterOperation<Context> op = it.next();
314314
if (op.isSendable()) {
315315
immediateOps.add(op.operation());
316316
contexts.add(op.context());
317317

318318
sentRequests.add(op);
319+
currentSize -= op.size();
319320
it.remove();
320321
}
321322
}
@@ -324,7 +325,6 @@ public void flush() {
324325
BulkRequest request = newRequest().operations(immediateOps).build();
325326

326327
// Prepare for next round
327-
currentSize = operations.size();
328328
addCondition.signalIfReady();
329329

330330
long id = sendRequestCondition.invocations();
@@ -362,7 +362,7 @@ public void flush() {
362362
// Partial success, retrying failed requests if policy allows it
363363
// Keeping list of retryable requests/responses, to exclude them for calling
364364
// listener later
365-
List<RetryableBulkOperation<Context>> retryableReq = new ArrayList<>();
365+
List<IngesterOperation<Context>> retryableReq = new ArrayList<>();
366366
List<RetryableBulkOperation<Context>> refires = new ArrayList<>();
367367
List<BulkResponseItem> retryableResp = new ArrayList<>();
368368

@@ -381,7 +381,7 @@ public void flush() {
381381
// Creating partial BulkRequest
382382
List<BulkOperation> partialOps = new ArrayList<>();
383383
List<Context> partialCtx = new ArrayList<>();
384-
for (RetryableBulkOperation<Context> op : sentRequests) {
384+
for (IngesterOperation<Context> op : sentRequests) {
385385
partialOps.add(op.operation());
386386
partialCtx.add(op.context());
387387
}
@@ -428,16 +428,17 @@ public void flush() {
428428
}
429429

430430
private void selectingRetries(int index, BulkResponseItem bulkItemResponse,
431-
List<RetryableBulkOperation<Context>> sentRequests,
431+
List<IngesterOperation<Context>> sentRequests,
432432
List<BulkResponseItem> retryableResp,
433-
List<RetryableBulkOperation<Context>> retryableReq,
433+
List<IngesterOperation<Context>> retryableReq,
434434
List<RetryableBulkOperation<Context>> refires) {
435435

436436
// Getting original failed, requests and keeping successful ones to send to the listener
437-
RetryableBulkOperation<Context> original = sentRequests.get(index);
437+
IngesterOperation<Context> original = sentRequests.get(index);
438438
if (original.canRetry()) {
439439
retryableResp.add(bulkItemResponse);
440-
Iterator<Long> retryTimes = Optional.ofNullable(original.retries()).orElse(backoffPolicy.iterator());
440+
RetryableBulkOperation<Context> repeatableOp = original.repeatableOperation();
441+
Iterator<Long> retryTimes = Optional.ofNullable(repeatableOp.retries()).orElse(backoffPolicy.iterator());
441442
RetryableBulkOperation<Context> refire = new RetryableBulkOperation<>(original.operation(), original.context(), retryTimes);
442443
retryableReq.add(original);
443444
refires.add(refire);
@@ -517,10 +518,10 @@ private void addRetry(RetryableBulkOperation<Context> repeatableOp) {
517518
}
518519

519520
private void innerAdd(RetryableBulkOperation<Context> repeatableOp) {
520-
IngesterOperation ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper());
521+
IngesterOperation<Context> ingestOp = IngesterOperation.of(repeatableOp, client._jsonpMapper());
521522

522523
addCondition.whenReady(() -> {
523-
operations.add(ingestOp.repeatableOperation());
524+
operations.add(ingestOp);
524525
currentSize += ingestOp.size();
525526

526527
if (!canAddOperation()) {

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,16 @@
3434
/**
3535
* A bulk operation whose size has been calculated and content turned to a binary blob (to compute its size).
3636
*/
37-
class IngesterOperation {
38-
private final RetryableBulkOperation repeatableOp;
37+
class IngesterOperation<Context> {
38+
private final RetryableBulkOperation<Context> repeatableOp;
3939
private final long size;
4040

41-
IngesterOperation(RetryableBulkOperation repeatableOp, long size) {
41+
IngesterOperation(RetryableBulkOperation<Context> repeatableOp, long size) {
4242
this.repeatableOp = repeatableOp;
4343
this.size = size;
4444
}
4545

46-
public static IngesterOperation of(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
46+
public static <Context> IngesterOperation<Context> of(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
4747
switch (repeatableOp.operation()._kind()) {
4848
case Create:
4949
return createOperation(repeatableOp, mapper);
@@ -58,17 +58,33 @@ public static IngesterOperation of(RetryableBulkOperation repeatableOp, JsonpMap
5858
}
5959
}
6060

61-
public RetryableBulkOperation repeatableOperation() {
61+
public RetryableBulkOperation<Context> repeatableOperation() {
6262
return this.repeatableOp;
6363
}
6464

6565
public long size() {
6666
return this.size;
6767
}
6868

69-
private static IngesterOperation createOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
69+
public BulkOperation operation() {
70+
return repeatableOp.operation();
71+
}
72+
73+
public Context context() {
74+
return repeatableOp.context();
75+
}
76+
77+
public boolean isSendable() {
78+
return repeatableOp.isSendable();
79+
}
80+
81+
public boolean canRetry() {
82+
return repeatableOp.canRetry();
83+
}
84+
85+
private static <Context> IngesterOperation<Context> createOperation(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
7086
CreateOperation<?> create = repeatableOp.operation().create();
71-
RetryableBulkOperation newOperation;
87+
RetryableBulkOperation<Context> newOperation;
7288

7389
long size = basePropertiesSize(create);
7490

@@ -79,18 +95,18 @@ private static IngesterOperation createOperation(RetryableBulkOperation repeatab
7995
} else {
8096
BinaryData binaryDoc = BinaryData.of(create.document(), mapper);
8197
size += binaryDoc.size();
82-
newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.create(idx -> {
98+
newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.create(idx -> {
8399
copyCreateProperties(create, idx);
84100
return idx.document(binaryDoc);
85101
})),repeatableOp.context(),repeatableOp.retries());
86102
}
87103

88-
return new IngesterOperation(newOperation, size);
104+
return new IngesterOperation<>(newOperation, size);
89105
}
90106

91-
private static IngesterOperation indexOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
107+
private static <Context> IngesterOperation<Context> indexOperation(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
92108
IndexOperation<?> index = repeatableOp.operation().index();
93-
RetryableBulkOperation newOperation;
109+
RetryableBulkOperation<Context> newOperation;
94110

95111
long size = basePropertiesSize(index);
96112

@@ -101,18 +117,18 @@ private static IngesterOperation indexOperation(RetryableBulkOperation repeatabl
101117
} else {
102118
BinaryData binaryDoc = BinaryData.of(index.document(), mapper);
103119
size += binaryDoc.size();
104-
newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.index(idx -> {
120+
newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.index(idx -> {
105121
copyIndexProperties(index, idx);
106122
return idx.document(binaryDoc);
107123
})),repeatableOp.context(),repeatableOp.retries());
108124
}
109125

110-
return new IngesterOperation(newOperation, size);
126+
return new IngesterOperation<>(newOperation, size);
111127
}
112128

113-
private static IngesterOperation updateOperation(RetryableBulkOperation repeatableOp, JsonpMapper mapper) {
129+
private static <Context> IngesterOperation<Context> updateOperation(RetryableBulkOperation<Context> repeatableOp, JsonpMapper mapper) {
114130
UpdateOperation<?, ?> update = repeatableOp.operation().update();
115-
RetryableBulkOperation newOperation;
131+
RetryableBulkOperation<Context> newOperation;
116132

117133
long size = basePropertiesSize(update) +
118134
size("retry_on_conflict", update.retryOnConflict()) +
@@ -125,7 +141,7 @@ private static IngesterOperation updateOperation(RetryableBulkOperation repeatab
125141
} else {
126142
BinaryData action = BinaryData.of(update.action(), mapper);
127143
size += action.size();
128-
newOperation = new RetryableBulkOperation(BulkOperation.of(bo -> bo.update(u -> {
144+
newOperation = new RetryableBulkOperation<>(BulkOperation.of(bo -> bo.update(u -> {
129145
copyBaseProperties(update, u);
130146
return u
131147
.binaryAction(action)
@@ -134,12 +150,12 @@ private static IngesterOperation updateOperation(RetryableBulkOperation repeatab
134150
})),repeatableOp.context(),repeatableOp.retries());
135151
}
136152

137-
return new IngesterOperation(newOperation, size);
153+
return new IngesterOperation<>(newOperation, size);
138154
}
139155

140-
private static IngesterOperation deleteOperation(RetryableBulkOperation repeatableOp) {
156+
private static <Context> IngesterOperation<Context> deleteOperation(RetryableBulkOperation<Context> repeatableOp) {
141157
DeleteOperation delete = repeatableOp.operation().delete();
142-
return new IngesterOperation(repeatableOp, basePropertiesSize(delete));
158+
return new IngesterOperation<>(repeatableOp, basePropertiesSize(delete));
143159
}
144160

145161

0 commit comments

Comments
 (0)