Skip to content

Commit c02d798

Browse files
committed
Add manual switch to allow non-distributed execution.
1 parent 29fe525 commit c02d798

File tree

6 files changed

+151
-115
lines changed

6 files changed

+151
-115
lines changed

contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSGroupScan.java

Lines changed: 132 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,15 @@
5555
import org.slf4j.LoggerFactory;
5656

5757
import java.io.IOException;
58-
import java.util.ArrayList;
58+
import java.util.HashMap;
5959
import java.util.LinkedHashMap;
60+
import java.util.LinkedList;
6061
import java.util.List;
6162
import java.util.Map;
6263
import java.util.Optional;
6364
import java.util.Random;
6465
import java.util.concurrent.ForkJoinPool;
66+
import java.util.concurrent.ForkJoinTask;
6567
import java.util.concurrent.RecursiveTask;
6668
import java.util.concurrent.TimeUnit;
6769
import java.util.stream.Collectors;
@@ -116,34 +118,21 @@ private void init() {
116118

117119
Multihash topHash = ipfsScanSpec.getTargetHash(ipfsHelper);
118120
try {
119-
Map<Multihash, String> leafAddrMap = getLeafAddrMappings(topHash);
120-
logger.debug("Iterating on {} leaves...", leafAddrMap.size());
121-
ClusterCoordinator coordinator = ipfsContext.getStoragePlugin().getContext().getClusterCoordinator();
122-
for (Multihash leaf : leafAddrMap.keySet()) {
123-
String peerHostname = leafAddrMap.get(leaf);
121+
Map<Multihash, IPFSPeer> leafPeerMap = getLeafPeerMappings(topHash);
122+
logger.debug("Iterating on {} leaves...", leafPeerMap.size());
124123

125-
Optional<DrillbitEndpoint> oep = coordinator.getAvailableEndpoints()
126-
.stream()
127-
.filter(a -> a.getAddress().equals(peerHostname))
128-
.findAny();
124+
ClusterCoordinator coordinator = ipfsContext.getStoragePlugin().getContext().getClusterCoordinator();
125+
for (Multihash leaf : leafPeerMap.keySet()) {
129126
DrillbitEndpoint ep;
130-
if (oep.isPresent()) {
131-
ep = oep.get();
132-
logger.debug("Using existing endpoint {}", ep.getAddress());
127+
if (config.isDistributedMode()) {
128+
String peerHostname = leafPeerMap
129+
.get(leaf)
130+
.getDrillbitAddress()
131+
.orElseThrow(() -> new RuntimeException("Chosen IPFS peer does not have drillbit address"));
132+
ep = registerEndpoint(coordinator, peerHostname);
133133
} else {
134-
logger.debug("created new endpoint on the fly {}", peerHostname);
135-
//DRILL-7754: read ports & version info from IPFS instead of hard-coded
136-
ep = DrillbitEndpoint.newBuilder()
137-
.setAddress(peerHostname)
138-
.setUserPort(DEFAULT_USER_PORT)
139-
.setControlPort(DEFAULT_CONTROL_PORT)
140-
.setDataPort(DEFAULT_DATA_PORT)
141-
.setHttpPort(DEFAULT_HTTP_PORT)
142-
.setVersion(DrillVersionInfo.getVersion())
143-
.setState(DrillbitEndpoint.State.ONLINE)
144-
.build();
145-
//DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed?
146-
ClusterCoordinator.RegistrationHandle handle = coordinator.register(ep);
134+
// the foreman is used to execute the plan
135+
ep = ipfsContext.getStoragePlugin().getContext().getEndpoint();
147136
}
148137

149138
IPFSWork work = new IPFSWork(leaf);
@@ -161,15 +150,56 @@ private void init() {
161150
}
162151
}
163152

164-
Map<Multihash, String> getLeafAddrMappings(Multihash topHash) {
153+
private DrillbitEndpoint registerEndpoint(ClusterCoordinator coordinator, String peerHostname) {
154+
Optional<DrillbitEndpoint> oep = coordinator.getAvailableEndpoints()
155+
.stream()
156+
.filter(ep -> ep.getAddress().equals(peerHostname))
157+
.findAny();
158+
DrillbitEndpoint ep;
159+
if (oep.isPresent()) {
160+
ep = oep.get();
161+
logger.debug("Using existing endpoint {}", ep.getAddress());
162+
} else {
163+
logger.debug("created new endpoint on the fly {}", peerHostname);
164+
//DRILL-7754: read ports & version info from IPFS instead of hard-coded
165+
ep = DrillbitEndpoint.newBuilder()
166+
.setAddress(peerHostname)
167+
.setUserPort(DEFAULT_USER_PORT)
168+
.setControlPort(DEFAULT_CONTROL_PORT)
169+
.setDataPort(DEFAULT_DATA_PORT)
170+
.setHttpPort(DEFAULT_HTTP_PORT)
171+
.setVersion(DrillVersionInfo.getVersion())
172+
.setState(DrillbitEndpoint.State.ONLINE)
173+
.build();
174+
//DRILL-7777: how to safely remove endpoints that are no longer needed once the query is completed?
175+
ClusterCoordinator.RegistrationHandle handle = coordinator.register(ep);
176+
}
177+
178+
return ep;
179+
}
180+
181+
Map<Multihash, IPFSPeer> getLeafPeerMappings(Multihash topHash) {
165182
logger.debug("start to recursively expand nested IPFS hashes, topHash={}", topHash);
166183
Stopwatch watch = Stopwatch.createStarted();
167184
ForkJoinPool forkJoinPool = new ForkJoinPool(config.getNumWorkerThreads());
168-
IPFSTreeFlattener topTask = new IPFSTreeFlattener(topHash, false, ipfsContext);
169-
Map<Multihash, String> leafAddrMap = forkJoinPool.invoke(topTask);
185+
IPFSTreeFlattener topTask = new IPFSTreeFlattener(topHash, ipfsContext);
186+
List<Multihash> leaves = forkJoinPool.invoke(topTask);
170187
logger.debug("Took {} ms to expand hash leaves", watch.elapsed(TimeUnit.MILLISECONDS));
171188

172-
return leafAddrMap;
189+
logger.debug("Start to resolve providers");
190+
watch.reset().start();
191+
Map<Multihash, IPFSPeer> leafPeerMap;
192+
if (config.isDistributedMode()) {
193+
leafPeerMap = forkJoinPool.invoke(new IPFSProviderResolver(leaves, ipfsContext));
194+
} else {
195+
leafPeerMap = new HashMap<>();
196+
for (Multihash leaf : leaves) {
197+
leafPeerMap.put(leaf, ipfsContext.getMyself());
198+
}
199+
}
200+
logger.debug("Took {} ms to resolve providers", watch.elapsed(TimeUnit.MILLISECONDS));
201+
202+
return leafPeerMap;
173203
}
174204

175205
private IPFSGroupScan(IPFSGroupScan that) {
@@ -330,50 +360,93 @@ public String toString() {
330360
}
331361
}
332362

333-
//DRILL-7756: detect and warn about loops/recursions in case of a malformed tree
334-
static class IPFSTreeFlattener extends RecursiveTask<Map<Multihash, String>> {
335-
private final Multihash hash;
336-
private final boolean isProvider;
337-
private final Map<Multihash, String> ret = new LinkedHashMap<>();
363+
static class IPFSProviderResolver extends RecursiveTask<Map<Multihash, IPFSPeer>> {
364+
private final List<Multihash> leaves;
365+
private final Map<Multihash, IPFSPeer> ret = new LinkedHashMap<>();
338366
private final IPFSPeer myself;
339367
private final IPFSHelper helper;
340368
private final LoadingCache<Multihash, IPFSPeer> peerCache;
341369
private final LoadingCache<Multihash, List<Multihash>> providerCache;
342370

343-
public IPFSTreeFlattener(Multihash hash, boolean isProvider, IPFSContext context) {
371+
public IPFSProviderResolver(List<Multihash> leaves, IPFSContext context) {
372+
this(leaves, context.getMyself(), context.getIPFSHelper(), context.getIPFSPeerCache(), context.getProviderCache());
373+
}
374+
375+
public IPFSProviderResolver(IPFSProviderResolver reference, List<Multihash> leaves) {
376+
this(leaves, reference.myself, reference.helper, reference.peerCache, reference.providerCache);
377+
}
378+
379+
IPFSProviderResolver(List<Multihash> leaves, IPFSPeer myself, IPFSHelper helper, LoadingCache<Multihash, IPFSPeer> peerCache, LoadingCache<Multihash, List<Multihash>> providerCache) {
380+
this.leaves = leaves;
381+
this.myself = myself;
382+
this.helper = helper;
383+
this.peerCache = peerCache;
384+
this.providerCache = providerCache;
385+
}
386+
387+
@Override
388+
protected Map<Multihash, IPFSPeer> compute() {
389+
int totalLeaves = leaves.size();
390+
if (totalLeaves == 1) {
391+
Multihash hash = leaves.get(0);
392+
List<IPFSPeer> providers = providerCache.getUnchecked(hash).parallelStream()
393+
.map(peerCache::getUnchecked)
394+
.filter(IPFSPeer::isDrillReady)
395+
.filter(IPFSPeer::hasDrillbitAddress)
396+
.collect(Collectors.toList());
397+
if (providers.size() < 1) {
398+
logger.warn("No drill-ready provider found for leaf {}, adding foreman as the provider", hash);
399+
providers.add(myself);
400+
}
401+
logger.debug("Got {} providers for {} from IPFS", providers.size(), hash);
402+
403+
//DRILL-7753: better peer selection algorithm
404+
Random random = new Random();
405+
IPFSPeer chosenPeer = providers.get(random.nextInt(providers.size()));
406+
ret.put(hash, chosenPeer);
407+
logger.debug("Use peer {} for leaf {}", chosenPeer, hash);
408+
return ret;
409+
}
410+
411+
int firstHalf = totalLeaves / 2;
412+
ImmutableList<IPFSProviderResolver> resolvers = ImmutableList.of(
413+
new IPFSProviderResolver(this, leaves.subList(0, firstHalf)),
414+
new IPFSProviderResolver(this, leaves.subList(firstHalf, totalLeaves))
415+
);
416+
resolvers.forEach(ForkJoinTask::fork);
417+
resolvers.reverse().forEach(resolver -> ret.putAll(resolver.join()));
418+
return ret;
419+
}
420+
}
421+
422+
//DRILL-7756: detect and warn about loops/recursions in case of a malformed tree
423+
static class IPFSTreeFlattener extends RecursiveTask<List<Multihash>> {
424+
private final Multihash hash;
425+
private final List<Multihash> ret = new LinkedList<>();
426+
private final IPFSPeer myself;
427+
private final IPFSHelper helper;
428+
429+
public IPFSTreeFlattener(Multihash hash, IPFSContext context) {
344430
this(
345431
hash,
346-
isProvider,
347432
context.getMyself(),
348-
context.getIPFSHelper(),
349-
context.getIPFSPeerCache(),
350-
context.getProviderCache()
433+
context.getIPFSHelper()
351434
);
352435
}
353436

354-
IPFSTreeFlattener(Multihash hash, boolean isProvider, IPFSPeer myself, IPFSHelper ipfsHelper,
355-
LoadingCache<Multihash, IPFSPeer> peerCache, LoadingCache<Multihash, List<Multihash>> providerCache) {
437+
IPFSTreeFlattener(Multihash hash, IPFSPeer myself, IPFSHelper ipfsHelper) {
356438
this.hash = hash;
357-
this.isProvider = isProvider;
358439
this.myself = myself;
359440
this.helper = ipfsHelper;
360-
this.peerCache = peerCache;
361-
this.providerCache = providerCache;
362441
}
363442

364-
public IPFSTreeFlattener(IPFSTreeFlattener reference, Multihash hash, boolean isProvider) {
365-
this(hash, isProvider, reference.myself, reference.helper, reference.peerCache, reference.providerCache);
443+
public IPFSTreeFlattener(IPFSTreeFlattener reference, Multihash hash) {
444+
this(hash, reference.myself, reference.helper);
366445
}
367446

368447
@Override
369-
public Map<Multihash, String> compute() {
448+
public List<Multihash> compute() {
370449
try {
371-
if (isProvider) {
372-
IPFSPeer peer = peerCache.getUnchecked(hash);
373-
ret.put(hash, peer.getDrillbitAddress().orElse(null));
374-
return ret;
375-
}
376-
377450
MerkleNode metaOrSimpleNode = helper.getObjectLinksTimeout(hash);
378451
if (metaOrSimpleNode.links.size() > 0) {
379452
logger.debug("{} is a meta node", hash);
@@ -382,68 +455,19 @@ public Map<Multihash, String> compute() {
382455

383456
ImmutableList.Builder<IPFSTreeFlattener> builder = ImmutableList.builder();
384457
for (Multihash intermediate : intermediates.subList(1, intermediates.size())) {
385-
builder.add(new IPFSTreeFlattener(this, intermediate, false));
458+
builder.add(new IPFSTreeFlattener(this, intermediate));
386459
}
387460
ImmutableList<IPFSTreeFlattener> subtasks = builder.build();
388461
subtasks.forEach(IPFSTreeFlattener::fork);
389462

390-
IPFSTreeFlattener first = new IPFSTreeFlattener(this, intermediates.get(0), false);
391-
ret.putAll(first.compute());
463+
IPFSTreeFlattener first = new IPFSTreeFlattener(this, intermediates.get(0));
464+
ret.addAll(first.compute());
392465
subtasks.reverse().forEach(
393-
subtask -> ret.putAll(subtask.join())
466+
subtask -> ret.addAll(subtask.join())
394467
);
395468
} else {
396469
logger.debug("{} is a simple node", hash);
397-
List<IPFSPeer> providers = providerCache.getUnchecked(hash).stream()
398-
.map(peerCache::getUnchecked)
399-
.collect(Collectors.toList());
400-
providers = providers.stream()
401-
.filter(IPFSPeer::isDrillReady)
402-
.collect(Collectors.toList());
403-
if (providers.size() < 1) {
404-
logger.warn("No drill-ready provider found for leaf {}, adding foreman as the provider", hash);
405-
providers.add(myself);
406-
}
407-
408-
logger.debug("Got {} providers for {} from IPFS", providers.size(), hash);
409-
ImmutableList.Builder<IPFSTreeFlattener> builder = ImmutableList.builder();
410-
for (IPFSPeer provider : providers.subList(1, providers.size())) {
411-
builder.add(new IPFSTreeFlattener(this, provider.getId(), true));
412-
}
413-
ImmutableList<IPFSTreeFlattener> subtasks = builder.build();
414-
subtasks.forEach(IPFSTreeFlattener::fork);
415-
416-
List<String> possibleAddrs = new ArrayList<>();
417-
Multihash firstProvider = providers.get(0).getId();
418-
IPFSTreeFlattener firstTask = new IPFSTreeFlattener(this, firstProvider, true);
419-
String firstAddr = firstTask.compute().get(firstProvider);
420-
if (firstAddr != null) {
421-
possibleAddrs.add(firstAddr);
422-
}
423-
424-
subtasks.reverse().forEach(
425-
subtask -> {
426-
String addr = subtask.join().get(subtask.hash);
427-
if (addr != null) {
428-
possibleAddrs.add(addr);
429-
}
430-
}
431-
);
432-
433-
if (possibleAddrs.size() < 1) {
434-
logger.error("All attempts to find an appropriate provider address for {} have failed", hash);
435-
throw UserException
436-
.planError()
437-
.message("No address found for any provider for leaf " + hash)
438-
.build(logger);
439-
} else {
440-
//DRILL-7753: better peer selection algorithm
441-
Random random = new Random();
442-
String chosenAddr = possibleAddrs.get(random.nextInt(possibleAddrs.size()));
443-
ret.clear();
444-
ret.put(hash, chosenAddr);
445-
logger.debug("Got peer host {} for leaf {}", chosenAddr, hash);
446-
}
470+
ret.add(hash);
447471
}
448472
} catch (IOException e) {
449473
throw UserException.planError(e).message("Exception during planning").build(logger);

contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ public class IPFSStoragePluginConfig extends StoragePluginConfigBase {
4646
@JsonProperty("max-nodes-per-leaf")
4747
private final int maxNodesPerLeaf;
4848

49+
@JsonProperty("distributed-mode")
50+
private final boolean distributedMode;
51+
4952
@JsonProperty("ipfs-timeouts")
5053
private final Map<IPFSTimeOut, Integer> ipfsTimeouts;
5154

@@ -156,13 +159,15 @@ public IPFSStoragePluginConfig(
156159
@JsonProperty("host") String host,
157160
@JsonProperty("port") int port,
158161
@JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf,
162+
@JsonProperty("distributed-mode") boolean distributedMode,
159163
@JsonProperty("ipfs-timeouts") Map<IPFSTimeOut, Integer> ipfsTimeouts,
160164
@JsonProperty("ipfs-caches") Map<IPFSCacheType, IPFSCache> ipfsCaches,
161165
@JsonProperty("groupscan-worker-threads") int numWorkerThreads,
162166
@JsonProperty("formats") Map<String, FormatPluginConfig> formats) {
163167
this.host = host;
164168
this.port = port;
165169
this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1;
170+
this.distributedMode = distributedMode;
166171
this.ipfsTimeouts = applyDefaultMap(ipfsTimeouts, ipfsTimeoutDefaults);
167172
this.ipfsCaches = applyDefaultMap(ipfsCaches, ipfsCacheDefaults);
168173
this.numWorkerThreads = numWorkerThreads > 0 ? numWorkerThreads : 1;
@@ -196,6 +201,11 @@ public int getMaxNodesPerLeaf() {
196201
return maxNodesPerLeaf;
197202
}
198203

204+
@JsonProperty("distributed-mode")
205+
public boolean isDistributedMode() {
206+
return distributedMode;
207+
}
208+
199209
@JsonIgnore
200210
public int getIPFSTimeout(IPFSTimeOut which) {
201211
return ipfsTimeouts.get(which);
@@ -228,7 +238,7 @@ public Map<String, FormatPluginConfig> getFormats() {
228238

229239
@Override
230240
public int hashCode() {
231-
return Objects.hashCode(host, port, maxNodesPerLeaf, ipfsTimeouts, ipfsCaches, formats);
241+
return Objects.hashCode(host, port, maxNodesPerLeaf, distributedMode, ipfsTimeouts, ipfsCaches, formats);
232242
}
233243

234244
@Override
@@ -246,6 +256,7 @@ public boolean equals(Object obj) {
246256
&& Objects.equal(ipfsCaches, other.ipfsTimeouts)
247257
&& port == other.port
248258
&& maxNodesPerLeaf == other.maxNodesPerLeaf
259+
&& distributedMode == other.distributedMode
249260
&& numWorkerThreads == other.numWorkerThreads;
250261
}
251262
}

contrib/storage-ipfs/src/main/resources/bootstrap-storage-plugins.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"host": "127.0.0.1",
66
"port": 5001,
77
"max-nodes-per-leaf": 3,
8+
"distributed-mode": false,
89
"ipfs-timeouts": {
910
"find-provider": 4,
1011
"find-peer-info": 4,

0 commit comments

Comments
 (0)