Skip to content

Commit f71cd3c

Browse files
authored
Merge pull request #9834 from s1ck/object-long-map
Support VARCHAR and BIGINT as external node identifiers
2 parents 64a1e56 + 266e54c commit f71cd3c

File tree

3 files changed

+444
-0
lines changed

3 files changed

+444
-0
lines changed
Lines changed: 259 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,259 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.core.utils.paged;
21+
22+
import com.carrotsearch.hppc.BitMixer;
23+
import com.carrotsearch.hppc.ObjectLongHashMap;
24+
import com.carrotsearch.hppc.ObjectLongMap;
25+
import com.carrotsearch.hppc.procedures.ObjectLongProcedure;
26+
import org.eclipse.collections.impl.collection.mutable.AbstractMultiReaderMutableCollection;
27+
import org.neo4j.gds.api.IdMap;
28+
import org.neo4j.gds.collections.ha.HugeObjectArray;
29+
import org.neo4j.gds.core.concurrency.Concurrency;
30+
import org.neo4j.gds.mem.BitUtil;
31+
32+
import java.util.Arrays;
33+
import java.util.concurrent.atomic.AtomicLong;
34+
import java.util.concurrent.locks.ReentrantLock;
35+
import java.util.stream.IntStream;
36+
37+
public final class ShardedByteArrayLongMap {
38+
39+
private final HugeObjectArray<byte[]> internalNodeMapping;
40+
private final ObjectLongMap<byte[]>[] originalNodeMappingShards;
41+
private final int shardShift;
42+
43+
public static Builder builder(Concurrency concurrency) {
44+
return new Builder(concurrency);
45+
}
46+
47+
public static Builder builder(Concurrency concurrency, long capacity) {
48+
return new Builder(concurrency, capacity);
49+
}
50+
51+
private ShardedByteArrayLongMap(
52+
HugeObjectArray<byte[]> internalNodeMapping,
53+
ObjectLongMap<byte[]>[] originalNodeMappingShards,
54+
int shardShift
55+
) {
56+
this.internalNodeMapping = internalNodeMapping;
57+
this.originalNodeMappingShards = originalNodeMappingShards;
58+
this.shardShift = shardShift;
59+
}
60+
61+
public long toMappedNodeId(byte[] nodeId) {
62+
var shard = findShard(nodeId, this.originalNodeMappingShards, this.shardShift);
63+
return shard.getOrDefault(nodeId, IdMap.NOT_FOUND);
64+
}
65+
66+
public boolean contains(byte[] originalId) {
67+
var shard = findShard(originalId, this.originalNodeMappingShards, this.shardShift);
68+
return shard.containsKey(originalId);
69+
}
70+
71+
public byte[] toOriginalNodeId(long nodeId) {
72+
return internalNodeMapping.get(nodeId);
73+
}
74+
75+
public long size() {
76+
return internalNodeMapping.size();
77+
}
78+
79+
private static <T> T findShard(byte[] key, T[] shards, int shift) {
80+
int idx = shardIdx(key, shift);
81+
return shards[idx];
82+
}
83+
84+
// https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
85+
private static final long FNV1_64_INIT = 0xcbf29ce484222325L;
86+
private static final long FNV1_64_PRIME = 1099511628211L;
87+
88+
// We use FNV-1a 64-bit hash function to hash the byte array key
89+
// and achieve a somewhat uniform distribution of keys across shards.
90+
private static int shardIdx(byte[] key, int shift) {
91+
long hash = FNV1_64_INIT;
92+
93+
for (int i = 0; i < key.length; i++) {
94+
hash ^= (key[i] & 0xff);
95+
hash *= FNV1_64_PRIME;
96+
}
97+
98+
return (int) (hash >>> shift);
99+
}
100+
101+
private static int numberOfShards(Concurrency concurrency) {
102+
return BitUtil.nextHighestPowerOfTwo(concurrency.value() * 4);
103+
}
104+
105+
@SuppressWarnings("unchecked")
106+
private static <S extends MapShard> ShardedByteArrayLongMap build(
107+
long nodeCount,
108+
S[] shards,
109+
int shardShift
110+
) {
111+
var internalNodeMapping = HugeObjectArray.newArray(byte[].class, nodeCount);
112+
var mapShards = new ObjectLongMap[shards.length];
113+
114+
// ignoring concurrency limitation 🤷
115+
Arrays.parallelSetAll(
116+
mapShards, idx -> {
117+
var shard = shards[idx];
118+
var mapping = shard.intoMapping();
119+
mapping.forEach((ObjectLongProcedure<? super byte[]>) (originalId, mappedId) -> {
120+
internalNodeMapping.set(mappedId, originalId);
121+
});
122+
return mapping;
123+
}
124+
);
125+
126+
return new ShardedByteArrayLongMap(
127+
internalNodeMapping,
128+
mapShards,
129+
shardShift
130+
);
131+
}
132+
133+
abstract static class MapShard {
134+
135+
private static final class Map extends ObjectLongHashMap<byte[]> {
136+
137+
Map() {
138+
super();
139+
}
140+
141+
Map(int initialCapacity) {
142+
super(initialCapacity);
143+
}
144+
145+
@Override
146+
protected int hashKey(byte[] key) {
147+
return BitMixer.mix(Arrays.hashCode(key), this.keyMixer);
148+
}
149+
150+
@Override
151+
protected boolean equals(Object v1, Object v2) {
152+
return Arrays.equals((byte[]) v1, (byte[]) v2);
153+
}
154+
}
155+
156+
private final ReentrantLock lock;
157+
private final AbstractMultiReaderMutableCollection.LockWrapper lockWrapper;
158+
final ObjectLongMap<byte[]> mapping;
159+
160+
MapShard() {
161+
this.mapping = new Map();
162+
this.lock = new ReentrantLock();
163+
this.lockWrapper = new AbstractMultiReaderMutableCollection.LockWrapper(lock);
164+
}
165+
166+
MapShard(long capacity) {
167+
this.mapping = new Map(Math.toIntExact(capacity));
168+
this.lock = new ReentrantLock();
169+
this.lockWrapper = new AbstractMultiReaderMutableCollection.LockWrapper(lock);
170+
}
171+
172+
final AbstractMultiReaderMutableCollection.LockWrapper acquireLock() {
173+
this.lock.lock();
174+
return this.lockWrapper;
175+
}
176+
177+
void assertIsUnderLock() {
178+
assert this.lock.isHeldByCurrentThread() : "addNode must only be called while holding the lock";
179+
}
180+
181+
ObjectLongMap<byte[]> intoMapping() {
182+
return mapping;
183+
}
184+
}
185+
186+
public static final class Builder {
187+
188+
private final AtomicLong nodeCount;
189+
private final Shard[] shards;
190+
private final int shardShift;
191+
private final int shardMask;
192+
193+
Builder(Concurrency concurrency) {
194+
this.nodeCount = new AtomicLong();
195+
int numberOfShards = numberOfShards(concurrency);
196+
this.shardShift = Long.SIZE - Integer.numberOfTrailingZeros(numberOfShards);
197+
this.shardMask = numberOfShards - 1;
198+
this.shards = IntStream.range(0, numberOfShards)
199+
.mapToObj(__ -> new Shard(this.nodeCount))
200+
.toArray(Shard[]::new);
201+
}
202+
203+
Builder(Concurrency concurrency, long capacity) {
204+
this.nodeCount = new AtomicLong();
205+
int numberOfShards = numberOfShards(concurrency);
206+
this.shardShift = Long.SIZE - Integer.numberOfTrailingZeros(numberOfShards);
207+
this.shardMask = numberOfShards - 1;
208+
this.shards = IntStream.range(0, numberOfShards)
209+
.mapToObj(__ -> new Shard(this.nodeCount, BitUtil.ceilDiv(capacity, numberOfShards)))
210+
.toArray(Shard[]::new);
211+
}
212+
213+
/**
214+
* Add a node to the mapping.
215+
*
216+
* @return {@code mappedId >= 0} if the node was added,
217+
* or {@code -(mappedId) - 1} if the node was already mapped.
218+
*/
219+
public long addNode(byte[] nodeId) {
220+
var shard = findShard(nodeId, this.shards, this.shardShift);
221+
try (var ignoredLock = shard.acquireLock()) {
222+
return shard.addNode(nodeId);
223+
}
224+
}
225+
226+
public ShardedByteArrayLongMap build() {
227+
return ShardedByteArrayLongMap.build(
228+
this.nodeCount.get(),
229+
this.shards,
230+
this.shardShift
231+
);
232+
}
233+
234+
private static final class Shard extends MapShard {
235+
private final AtomicLong nextId;
236+
237+
private Shard(AtomicLong nextId) {
238+
super();
239+
this.nextId = nextId;
240+
}
241+
242+
private Shard(AtomicLong nextId, long capacity) {
243+
super(capacity);
244+
this.nextId = nextId;
245+
}
246+
247+
long addNode(byte[] nodeId) {
248+
this.assertIsUnderLock();
249+
int index = mapping.indexOf(nodeId);
250+
if (mapping.indexExists(index)) {
251+
return -mapping.indexGet(index) - 1;
252+
}
253+
long mappedId = nextId.getAndIncrement();
254+
mapping.indexInsert(index, nodeId, mappedId);
255+
return mappedId;
256+
}
257+
}
258+
}
259+
}

0 commit comments

Comments
 (0)