Skip to content

Commit fc2a42e

Browse files
Backport to branch(3.15) : Add close for storage, transaction in data loader core (#2718)
Co-authored-by: inv-jishnu <[email protected]>
1 parent 42fd5a7 commit fc2a42e

File tree

2 files changed

+139
-1
lines changed

2 files changed

+139
-1
lines changed

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/ImportManager.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,45 @@ public void onTaskComplete(ImportTaskResult taskResult) {
152152
/** {@inheritDoc} Forwards the event to all registered listeners. */
153153
@Override
154154
public void onAllDataChunksCompleted() {
155+
Throwable firstException = null;
156+
155157
for (ImportEventListener listener : listeners) {
156-
listener.onAllDataChunksCompleted();
158+
try {
159+
listener.onAllDataChunksCompleted();
160+
} catch (Throwable e) {
161+
if (firstException == null) {
162+
firstException = e;
163+
} else {
164+
firstException.addSuppressed(e);
165+
}
166+
}
167+
}
168+
169+
try {
170+
closeResources();
171+
} catch (Throwable e) {
172+
if (firstException != null) {
173+
firstException.addSuppressed(e);
174+
} else {
175+
firstException = e;
176+
}
177+
}
178+
179+
if (firstException != null) {
180+
throw new RuntimeException("Error during completion", firstException);
181+
}
182+
}
183+
184+
/** Close resources properly once the process is completed */
185+
public void closeResources() {
186+
try {
187+
if (distributedStorage != null) {
188+
distributedStorage.close();
189+
} else if (distributedTransactionManager != null) {
190+
distributedTransactionManager.close();
191+
}
192+
} catch (Throwable e) {
193+
throw new RuntimeException("Failed to close the resource", e);
157194
}
158195
}
159196

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package com.scalar.db.dataloader.core.dataimport;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertThrows;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
import static org.mockito.Mockito.doThrow;
7+
import static org.mockito.Mockito.mock;
8+
import static org.mockito.Mockito.verify;
9+
10+
import com.scalar.db.api.DistributedStorage;
11+
import com.scalar.db.api.DistributedTransactionManager;
12+
import com.scalar.db.api.TableMetadata;
13+
import com.scalar.db.dataloader.core.ScalarDbMode;
14+
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
15+
import java.io.BufferedReader;
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
import org.junit.jupiter.api.BeforeEach;
19+
import org.junit.jupiter.api.Test;
20+
21+
public class ImportManagerTest {
22+
23+
private ImportManager importManager;
24+
private ImportEventListener listener1;
25+
private ImportEventListener listener2;
26+
private DistributedStorage distributedStorage;
27+
private DistributedTransactionManager distributedTransactionManager;
28+
29+
@BeforeEach
30+
void setUp() {
31+
Map<String, TableMetadata> tableMetadata = new HashMap<>();
32+
BufferedReader reader = mock(BufferedReader.class);
33+
ImportOptions options = mock(ImportOptions.class);
34+
ImportProcessorFactory processorFactory = mock(ImportProcessorFactory.class);
35+
36+
listener1 = mock(ImportEventListener.class);
37+
listener2 = mock(ImportEventListener.class);
38+
distributedStorage = mock(DistributedStorage.class);
39+
distributedTransactionManager = mock(DistributedTransactionManager.class);
40+
41+
importManager =
42+
new ImportManager(
43+
tableMetadata,
44+
reader,
45+
options,
46+
processorFactory,
47+
ScalarDbMode.STORAGE,
48+
distributedStorage,
49+
null); // Only one resource present
50+
importManager.addListener(listener1);
51+
importManager.addListener(listener2);
52+
}
53+
54+
@Test
55+
void onAllDataChunksCompleted_shouldNotifyListenersAndCloseStorage() {
56+
importManager.onAllDataChunksCompleted();
57+
58+
verify(listener1).onAllDataChunksCompleted();
59+
verify(listener2).onAllDataChunksCompleted();
60+
verify(distributedStorage).close();
61+
}
62+
63+
@Test
64+
void onAllDataChunksCompleted_shouldAggregateListenerExceptionAndStillCloseResources() {
65+
doThrow(new RuntimeException("Listener1 failed")).when(listener1).onAllDataChunksCompleted();
66+
67+
RuntimeException thrown =
68+
assertThrows(RuntimeException.class, () -> importManager.onAllDataChunksCompleted());
69+
70+
assertTrue(thrown.getMessage().contains("Error during completion"));
71+
assertEquals("Listener1 failed", thrown.getCause().getMessage());
72+
verify(distributedStorage).close();
73+
}
74+
75+
@Test
76+
void closeResources_shouldCloseTransactionManagerIfStorageIsNull() {
77+
ImportManager managerWithTx =
78+
new ImportManager(
79+
new HashMap<>(),
80+
mock(BufferedReader.class),
81+
mock(ImportOptions.class),
82+
mock(ImportProcessorFactory.class),
83+
ScalarDbMode.TRANSACTION,
84+
null,
85+
distributedTransactionManager);
86+
87+
managerWithTx.closeResources();
88+
verify(distributedTransactionManager).close();
89+
}
90+
91+
@Test
92+
void closeResources_shouldThrowIfResourceCloseFails() {
93+
doThrow(new RuntimeException("Close failed")).when(distributedStorage).close();
94+
95+
RuntimeException ex =
96+
assertThrows(RuntimeException.class, () -> importManager.closeResources());
97+
98+
assertEquals("Failed to close the resource", ex.getMessage());
99+
assertEquals("Close failed", ex.getCause().getMessage());
100+
}
101+
}

0 commit comments

Comments
 (0)