Skip to content

Commit 59b2230

Browse files
committed
[lake/iceberg] Support MAP type in Iceberg tables
1 parent 71b625f commit 59b2230

File tree

14 files changed

+600
-40
lines changed

14 files changed

+600
-40
lines changed

fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,13 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
196196

197197
case BIGINT:
198198
return (writer, value) -> writer.writeLong((long) value);
199+
200+
case FLOAT:
201+
return (writer, value) -> writer.writeFloat((float) value);
202+
203+
case DOUBLE:
204+
return (writer, value) -> writer.writeDouble((double) value);
205+
199206
// support for nanoseconds come check again after #1195 merge
200207
case TIMESTAMP_WITHOUT_TIME_ZONE:
201208
return (writer, value) -> {
@@ -215,6 +222,14 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
215222
case BYTES:
216223
return (writer, value) -> writer.writeBytes((byte[]) value, true);
217224

225+
case ARRAY:
226+
throw new IllegalArgumentException(
227+
"Array types cannot be used as bucket keys. Bucket keys must be scalar types.");
228+
229+
case MAP:
230+
throw new IllegalArgumentException(
231+
"Map types cannot be used as bucket keys. Bucket keys must be scalar types.");
232+
218233
default:
219234
throw new IllegalArgumentException(
220235
"Unsupported type for Iceberg binary row writer: " + fieldType);

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,19 @@ public Type visit(ArrayType arrayType) {
167167

168168
@Override
169169
public Type visit(MapType mapType) {
170-
throw new UnsupportedOperationException("Unsupported map type");
170+
// According to the Iceberg spec,
171+
// the key and value fields of a map should have consecutive IDs
172+
int keyFieldId = getNextId();
173+
int valueFieldId = getNextId();
174+
175+
Type keyType = mapType.getKeyType().accept(this);
176+
Type valueType = mapType.getValueType().accept(this);
177+
178+
if (mapType.getValueType().isNullable()) {
179+
return Types.MapType.ofOptional(keyFieldId, valueFieldId, keyType, valueType);
180+
} else {
181+
return Types.MapType.ofRequired(keyFieldId, valueFieldId, keyType, valueType);
182+
}
171183
}
172184

173185
@Override

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.lake.iceberg.source;
1919

2020
import org.apache.fluss.row.InternalArray;
21+
import org.apache.fluss.row.InternalMap;
2122
import org.apache.fluss.types.ArrayType;
2223
import org.apache.fluss.types.BigIntType;
2324
import org.apache.fluss.types.BinaryType;
@@ -31,6 +32,7 @@
3132
import org.apache.fluss.types.FloatType;
3233
import org.apache.fluss.types.IntType;
3334
import org.apache.fluss.types.LocalZonedTimestampType;
35+
import org.apache.fluss.types.MapType;
3436
import org.apache.fluss.types.SmallIntType;
3537
import org.apache.fluss.types.StringType;
3638
import org.apache.fluss.types.TimeType;
@@ -104,6 +106,13 @@ public Object get(int index) {
104106
? null
105107
: new FlussArrayAsIcebergList(
106108
innerArray, ((ArrayType) elementType).getElementType());
109+
} else if (elementType instanceof MapType) {
110+
MapType mapType = (MapType) elementType;
111+
InternalMap internalMap = flussArray.getMap(index);
112+
return internalMap == null
113+
? null
114+
: new FlussMapAsIcebergMap(
115+
internalMap, mapType.getKeyType(), mapType.getValueType());
107116
} else {
108117
throw new UnsupportedOperationException(
109118
"Unsupported array element type conversion for Fluss type: "
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.iceberg.source;
19+
20+
import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
21+
import org.apache.fluss.row.InternalArray;
22+
import org.apache.fluss.row.InternalMap;
23+
import org.apache.fluss.row.InternalRow;
24+
import org.apache.fluss.types.ArrayType;
25+
import org.apache.fluss.types.BigIntType;
26+
import org.apache.fluss.types.BinaryType;
27+
import org.apache.fluss.types.BooleanType;
28+
import org.apache.fluss.types.BytesType;
29+
import org.apache.fluss.types.CharType;
30+
import org.apache.fluss.types.DataType;
31+
import org.apache.fluss.types.DateType;
32+
import org.apache.fluss.types.DecimalType;
33+
import org.apache.fluss.types.DoubleType;
34+
import org.apache.fluss.types.FloatType;
35+
import org.apache.fluss.types.IntType;
36+
import org.apache.fluss.types.LocalZonedTimestampType;
37+
import org.apache.fluss.types.MapType;
38+
import org.apache.fluss.types.RowType;
39+
import org.apache.fluss.types.SmallIntType;
40+
import org.apache.fluss.types.StringType;
41+
import org.apache.fluss.types.TimeType;
42+
import org.apache.fluss.types.TimestampType;
43+
import org.apache.fluss.types.TinyIntType;
44+
import org.apache.fluss.utils.DateTimeUtils;
45+
46+
import org.apache.iceberg.types.Types;
47+
48+
import java.nio.ByteBuffer;
49+
import java.time.Instant;
50+
import java.time.OffsetDateTime;
51+
import java.time.ZoneOffset;
52+
import java.util.AbstractMap;
53+
import java.util.AbstractSet;
54+
import java.util.Iterator;
55+
import java.util.Set;
56+
57+
/** Adapter class for converting Fluss InternalMap to a Java Map for Iceberg. */
58+
public class FlussMapAsIcebergMap extends AbstractMap<Object, Object> {
59+
60+
private final InternalMap flussMap;
61+
private final DataType keyType;
62+
private final DataType valueType;
63+
64+
public FlussMapAsIcebergMap(InternalMap flussMap, DataType keyType, DataType valueType) {
65+
this.flussMap = flussMap;
66+
this.keyType = keyType;
67+
this.valueType = valueType;
68+
}
69+
70+
@Override
71+
public int size() {
72+
return flussMap.size();
73+
}
74+
75+
@Override
76+
public Set<Entry<Object, Object>> entrySet() {
77+
return new AbstractSet<>() {
78+
@Override
79+
public Iterator<Entry<Object, Object>> iterator() {
80+
return new Iterator<>() {
81+
private final InternalArray keyArray = flussMap.keyArray();
82+
private final InternalArray valueArray = flussMap.valueArray();
83+
private final int size = flussMap.size();
84+
private int currentIndex = 0;
85+
86+
@Override
87+
public boolean hasNext() {
88+
return currentIndex < size;
89+
}
90+
91+
@Override
92+
public Entry<Object, Object> next() {
93+
Object key = convertElement(keyArray, currentIndex, keyType);
94+
Object value = convertElement(valueArray, currentIndex, valueType);
95+
currentIndex++;
96+
return new AbstractMap.SimpleEntry<>(key, value);
97+
}
98+
};
99+
}
100+
101+
@Override
102+
public int size() {
103+
return flussMap.size();
104+
}
105+
};
106+
}
107+
108+
private Object convertElement(InternalArray array, int index, DataType elementType) {
109+
if (array.isNullAt(index)) {
110+
return null;
111+
}
112+
113+
if (elementType instanceof BooleanType) {
114+
return array.getBoolean(index);
115+
} else if (elementType instanceof TinyIntType) {
116+
return (int) array.getByte(index);
117+
} else if (elementType instanceof SmallIntType) {
118+
return (int) array.getShort(index);
119+
} else if (elementType instanceof IntType) {
120+
return array.getInt(index);
121+
} else if (elementType instanceof BigIntType) {
122+
return array.getLong(index);
123+
} else if (elementType instanceof FloatType) {
124+
return array.getFloat(index);
125+
} else if (elementType instanceof DoubleType) {
126+
return array.getDouble(index);
127+
} else if (elementType instanceof StringType) {
128+
return array.getString(index).toString();
129+
} else if (elementType instanceof CharType) {
130+
CharType charType = (CharType) elementType;
131+
return array.getChar(index, charType.getLength()).toString();
132+
} else if (elementType instanceof DecimalType) {
133+
DecimalType decimalType = (DecimalType) elementType;
134+
return array.getDecimal(index, decimalType.getPrecision(), decimalType.getScale())
135+
.toBigDecimal();
136+
} else if (elementType instanceof LocalZonedTimestampType) {
137+
LocalZonedTimestampType ltzType = (LocalZonedTimestampType) elementType;
138+
return toIcebergTimestampLtz(
139+
array.getTimestampLtz(index, ltzType.getPrecision()).toInstant());
140+
} else if (elementType instanceof TimestampType) {
141+
TimestampType tsType = (TimestampType) elementType;
142+
return array.getTimestampNtz(index, tsType.getPrecision()).toLocalDateTime();
143+
} else if (elementType instanceof DateType) {
144+
return DateTimeUtils.toLocalDate(array.getInt(index));
145+
} else if (elementType instanceof TimeType) {
146+
return DateTimeUtils.toLocalTime(array.getInt(index));
147+
} else if (elementType instanceof BytesType || elementType instanceof BinaryType) {
148+
return ByteBuffer.wrap(array.getBytes(index));
149+
} else if (elementType instanceof ArrayType) {
150+
ArrayType arrayType = (ArrayType) elementType;
151+
InternalArray internalArray = array.getArray(index);
152+
return internalArray == null
153+
? null
154+
: new FlussArrayAsIcebergList(internalArray, arrayType.getElementType());
155+
} else if (elementType instanceof MapType) {
156+
MapType mapType = (MapType) elementType;
157+
InternalMap internalMap = array.getMap(index);
158+
return internalMap == null
159+
? null
160+
: new FlussMapAsIcebergMap(
161+
internalMap, mapType.getKeyType(), mapType.getValueType());
162+
} else if (elementType instanceof RowType) {
163+
RowType rowType = (RowType) elementType;
164+
Types.StructType nestedStructType =
165+
(Types.StructType) rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE);
166+
InternalRow internalRow = array.getRow(index, rowType.getFieldCount());
167+
return internalRow == null
168+
? null
169+
: new FlussRowAsIcebergRecord(nestedStructType, rowType, internalRow);
170+
} else {
171+
throw new UnsupportedOperationException(
172+
"Unsupported array element type conversion for Fluss type: "
173+
+ elementType.getClass().getSimpleName());
174+
}
175+
}
176+
177+
private OffsetDateTime toIcebergTimestampLtz(Instant instant) {
178+
return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
179+
}
180+
}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
2121
import org.apache.fluss.row.InternalArray;
22+
import org.apache.fluss.row.InternalMap;
2223
import org.apache.fluss.row.InternalRow;
2324
import org.apache.fluss.types.ArrayType;
2425
import org.apache.fluss.types.BigIntType;
@@ -33,6 +34,7 @@
3334
import org.apache.fluss.types.FloatType;
3435
import org.apache.fluss.types.IntType;
3536
import org.apache.fluss.types.LocalZonedTimestampType;
37+
import org.apache.fluss.types.MapType;
3638
import org.apache.fluss.types.RowType;
3739
import org.apache.fluss.types.SmallIntType;
3840
import org.apache.fluss.types.StringType;
@@ -189,6 +191,15 @@ private FlussRowToIcebergFieldConverter createTypeConverter(DataType flussType,
189191
InternalRow nestedRow = row.getRow(pos, rowType.getFieldCount());
190192
return new FlussRowAsIcebergRecord(nestedStructType, rowType, nestedRow);
191193
};
194+
} else if (flussType instanceof MapType) {
195+
MapType mapType = (MapType) flussType;
196+
return row -> {
197+
InternalMap map = row.getMap(pos);
198+
return map == null
199+
? null
200+
: new FlussMapAsIcebergMap(
201+
map, mapType.getKeyType(), mapType.getValueType());
202+
};
192203
} else {
193204
throw new UnsupportedOperationException(
194205
"Unsupported data type conversion for Fluss type: "

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.time.LocalDateTime;
3333
import java.time.OffsetDateTime;
3434
import java.util.List;
35+
import java.util.Map;
3536

3637
/** Adapter for Iceberg List as Fluss InternalArray. */
3738
public class IcebergArrayAsFlussArray implements InternalArray {
@@ -139,7 +140,8 @@ public InternalArray getArray(int pos) {
139140

140141
@Override
141142
public InternalMap getMap(int pos) {
142-
throw new UnsupportedOperationException();
143+
Map<?, ?> nestedMap = (Map<?, ?>) icebergList.get(pos);
144+
return nestedMap == null ? null : new IcebergMapAsFlussMap(nestedMap);
143145
}
144146

145147
@Override
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.lake.iceberg.source;
20+
21+
import org.apache.fluss.row.InternalArray;
22+
import org.apache.fluss.row.InternalMap;
23+
24+
import java.util.ArrayList;
25+
import java.util.Map;
26+
27+
/** Adapter for Iceberg Map as Fluss InternalMap. */
28+
public class IcebergMapAsFlussMap implements InternalMap {
29+
30+
private final Map<?, ?> icebergMap;
31+
32+
public IcebergMapAsFlussMap(Map<?, ?> icebergMap) {
33+
this.icebergMap = icebergMap;
34+
}
35+
36+
@Override
37+
public int size() {
38+
return icebergMap.size();
39+
}
40+
41+
@Override
42+
public InternalArray keyArray() {
43+
return new IcebergArrayAsFlussArray(new ArrayList<>(icebergMap.keySet()));
44+
}
45+
46+
@Override
47+
public InternalArray valueArray() {
48+
return new IcebergArrayAsFlussArray(new ArrayList<>(icebergMap.values()));
49+
}
50+
}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.time.LocalDateTime;
3535
import java.time.OffsetDateTime;
3636
import java.util.List;
37+
import java.util.Map;
3738

3839
import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS;
3940

@@ -167,8 +168,12 @@ public InternalArray getArray(int pos) {
167168

168169
@Override
169170
public InternalMap getMap(int pos) {
170-
// TODO: Support Map type conversion from Iceberg to Fluss
171-
throw new UnsupportedOperationException();
171+
Object value = icebergRecord.get(pos);
172+
if (value == null) {
173+
return null;
174+
}
175+
Map<?, ?> icebergMap = (Map<?, ?>) value;
176+
return new IcebergMapAsFlussMap(icebergMap);
172177
}
173178

174179
@Override

0 commit comments

Comments
 (0)