Skip to content

Commit a317b4a

Browse files
committed
feat: update ReorderableStruct
1 parent bfc6e69 commit a317b4a

File tree

2 files changed

+154
-19
lines changed

2 files changed

+154
-19
lines changed

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/data/ReorderableStruct.java

Lines changed: 106 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@
44
import java.util.HashMap;
55
import java.util.List;
66
import java.util.Map;
7+
import java.util.Objects;
78

89
import com.aliyun.odps.type.ArrayTypeInfo;
910
import com.aliyun.odps.type.MapTypeInfo;
1011
import com.aliyun.odps.type.NestedTypeInfo;
1112
import com.aliyun.odps.type.StructTypeInfo;
1213
import com.aliyun.odps.type.TypeInfo;
14+
import com.aliyun.odps.utils.StringUtils;
1315

1416
/**
1517
* @author dingxin ([email protected])
1618
*/
17-
public class ReorderableStruct extends SimpleStruct {
19+
public class ReorderableStruct implements Struct {
20+
21+
protected StructTypeInfo typeInfo;
22+
protected List<Object> values;
1823

1924
/**
2025
* lower field name to index
@@ -29,7 +34,23 @@ public class ReorderableStruct extends SimpleStruct {
2934
* be careful: the struct value list is a reference of this param
3035
*/
3136
public ReorderableStruct(StructTypeInfo type, List<Object> values) {
32-
super(type, values);
37+
if (type == null || values == null || values.size() != type.getFieldCount()) {
38+
throw new IllegalArgumentException("Illegal arguments for StructObject.");
39+
}
40+
this.typeInfo = type;
41+
this.values = values;
42+
rebuildFieldNameIndexMap();
43+
}
44+
45+
public ReorderableStruct(StructTypeInfo type) {
46+
if (type == null) {
47+
throw new IllegalArgumentException("Illegal arguments for StructObject.");
48+
}
49+
this.typeInfo = type;
50+
values = new ArrayList<>(type.getFieldCount());
51+
for (int i = 0; i < type.getFieldCount(); i++) {
52+
values.add(null);
53+
}
3354
rebuildFieldNameIndexMap();
3455
}
3556

@@ -41,14 +62,29 @@ private void rebuildFieldNameIndexMap() {
4162
}
4263
}
4364

44-
public void set(String fieldName, Object value) {
65+
public void setFieldValue(String fieldName, Object value) {
4566
values.set(fieldNameToIndex.get(fieldName.toLowerCase()), value);
4667
}
4768

48-
public Object get(String fieldName) {
69+
public void setFieldValue(int index, Object value) {
70+
values.set(index, value);
71+
}
72+
73+
@Override
74+
public Object getFieldValue(String fieldName) {
4975
return getFieldValue(fieldNameToIndex.get(fieldName.toLowerCase()));
5076
}
5177

78+
@Override
79+
public Object getFieldValue(int index) {
80+
return values.get(index);
81+
}
82+
83+
@Override
84+
public TypeInfo getFieldTypeInfo(String fieldName) {
85+
return typeInfo.getFieldTypeInfos().get(fieldNameToIndex.get(fieldName));
86+
}
87+
5288
public synchronized void reorder(StructTypeInfo orderedType) {
5389
List<String> orderedTypeFieldNames = orderedType.getFieldNames();
5490
if (orderedTypeFieldNames.size() != typeInfo.getFieldNames().size()) {
@@ -91,8 +127,9 @@ public static Object reorderNestedType(Object value, TypeInfo oldTypeInfo, TypeI
91127
oldTypeInfo = ((Struct) value).getTypeInfo();
92128
}
93129
// Automatically wrap ordinary Struct as ReorderableStruct
94-
if (!(value instanceof ReorderableStruct)) {
95-
value = new ReorderableStruct((StructTypeInfo) oldTypeInfo, ((Struct) value).getFieldValues());
130+
if (value instanceof Struct && !(value instanceof ReorderableStruct)) {
131+
value =
132+
new ReorderableStruct((StructTypeInfo) oldTypeInfo, ((Struct) value).getFieldValues());
96133
}
97134
// Recursively process nested structures
98135
ReorderableStruct struct = (ReorderableStruct) value;
@@ -141,4 +178,67 @@ public static Object reorderNestedType(Object value, TypeInfo oldTypeInfo, TypeI
141178
// The basic type or type that does not require reordering will return directly to the original value
142179
return value;
143180
}
181+
182+
@Override
183+
public int getFieldCount() {
184+
return values.size();
185+
}
186+
187+
@Override
188+
public String getFieldName(int index) {
189+
return typeInfo.getFieldNames().get(index);
190+
}
191+
192+
@Override
193+
public TypeInfo getFieldTypeInfo(int index) {
194+
return typeInfo.getFieldTypeInfos().get(index);
195+
}
196+
197+
@Override
198+
public TypeInfo getTypeInfo() {
199+
return typeInfo;
200+
}
201+
202+
@Override
203+
public List<Object> getFieldValues() {
204+
return values;
205+
}
206+
207+
@Override
208+
public String toString() {
209+
String valueStr = "{";
210+
int colCount = getFieldCount();
211+
for (int i = 0; i < colCount; ++i) {
212+
valueStr += getFieldName(i) + ":" + getFieldValue(i);
213+
if (i != colCount - 1) {
214+
valueStr += ", ";
215+
}
216+
}
217+
valueStr += "}";
218+
return valueStr;
219+
}
220+
221+
@Override
222+
public boolean equals(Object o) {
223+
if (this == o) {
224+
return true;
225+
}
226+
227+
if (o == null || getClass() != o.getClass()) {
228+
return false;
229+
}
230+
231+
ReorderableStruct that = (ReorderableStruct) o;
232+
233+
if (!StringUtils.equalsIgnoreCase(this.typeInfo.getFieldNames(), that.typeInfo.getFieldNames())) {
234+
return false;
235+
}
236+
237+
return Objects.equals(this.getFieldValues(), that.getFieldValues());
238+
}
239+
240+
@Override
241+
public int hashCode() {
242+
return Objects.hash(StringUtils.toLowerCase(typeInfo.getFieldNames()), values);
243+
}
144244
}

odps-sdk/odps-sdk-core/src/test/java/com/aliyun/odps/data/ReorderableRecordTest.java

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,55 @@
99
import org.junit.Test;
1010

1111
import com.aliyun.odps.Column;
12+
import com.aliyun.odps.Instance;
13+
import com.aliyun.odps.Odps;
1214
import com.aliyun.odps.TableSchema;
15+
import com.aliyun.odps.commons.transport.OdpsTestUtils;
16+
import com.aliyun.odps.task.SQLTask;
17+
import com.aliyun.odps.tunnel.TableTunnel;
1318
import com.aliyun.odps.type.ArrayTypeInfo;
1419
import com.aliyun.odps.type.MapTypeInfo;
1520
import com.aliyun.odps.type.StructTypeInfo;
1621
import com.aliyun.odps.type.TypeInfo;
1722
import com.aliyun.odps.type.TypeInfoFactory;
23+
import com.google.common.collect.ImmutableMap;
1824

1925
/**
2026
* @author dingxin ([email protected])
2127
*/
2228
public class ReorderableRecordTest {
29+
@Test
30+
public void testE2E() throws Exception {
31+
Odps odps = OdpsTestUtils.newDefaultOdps();
32+
String tableName = "test_ReorderableRecordTest";
33+
odps.tables().delete(tableName, true);
34+
35+
TableSchema schema = TableSchema.builder()
36+
.withStringColumn("c1")
37+
.withBigintColumn("c2")
38+
.withColumn(new Column("c3", getOrderedStructType())).build();
39+
odps.tables().newTableCreator(tableName, schema).withLifeCycle(1L)
40+
.withHints(ImmutableMap.of("odps.sql.type.system.odps2", "true")).ifNotExists().create();
41+
42+
TableTunnel.StreamUploadSession session = odps.tableTunnel()
43+
.buildStreamUploadSession(odps.getDefaultProject(), tableName).build();
44+
45+
Record record = new ReorderableRecord(session.getSchema());
46+
Struct misOrderedStruct = getMisOrderedStruct();
47+
record.set("C3", misOrderedStruct);
48+
record.set("C2", 123L);
49+
record.set("C1", "test");
50+
51+
TableTunnel.StreamRecordPack recordPack = session.newRecordPack();
52+
for (int i = 0; i < 10; i++) {
53+
recordPack.append(record);
54+
}
55+
recordPack.flush();
56+
57+
Instance instance = SQLTask.run(odps, "select c3 from " + tableName + ";");
58+
System.out.println(instance.waitForTerminatedAndGetResult().getString());
59+
}
60+
2361

2462
@Test
2563
public void testSetStruct() {
@@ -141,33 +179,30 @@ private Struct getMisOrderedStruct() {
141179
List<TypeInfo> personFieldTypes = new ArrayList<>();
142180
List<String> personFieldNames = new ArrayList<>();
143181

144-
// Person 结构体字段:name (String), age (Int)
182+
// Person 结构体字段:name (String), age (Int)
145183
personFieldTypes.add(TypeInfoFactory.BIGINT); // money
146184
personFieldTypes.add(TypeInfoFactory.INT); // age
147185
personFieldTypes.add(TypeInfoFactory.STRING); // name
148186
personFieldNames.add("money");
149187
personFieldNames.add("age");
150188
personFieldNames.add("name");
151189

152-
StructTypeInfo
153-
personStructType =
190+
StructTypeInfo personStructType =
154191
TypeInfoFactory.getStructTypeInfo(personFieldNames, personFieldTypes);
192+
ReorderableStruct person = new ReorderableStruct(personStructType);
193+
person.setFieldValue("money", 1234L);
194+
person.setFieldValue("age", 25);
195+
person.setFieldValue("name", "Jason");
155196

156-
List<Object> value = new ArrayList<>();
157-
value.add(1234L);
158-
value.add(25);
159-
value.add("Jason");
160-
Struct person = new SimpleStruct(personStructType, value);
161-
162-
// 2. 创建 Array 类型(元素类型为 Person Struct)
197+
// 2. 创建 Array 类型(元素类型为 Person Struct)
163198
ArrayTypeInfo personArrayType = TypeInfoFactory.getArrayTypeInfo(personStructType);
164199

165200
List<Struct> personArray = new ArrayList<>();
166201
personArray.add(person);
167202
personArray.add(person);
168203
personArray.add(person);
169204

170-
// 3. 创建 Map 类型(键为 String,值为 Person Array)
205+
// 3. 创建 Map 类型(键为 String,值为 Person Array)
171206
MapTypeInfo personMapType = TypeInfoFactory.getMapTypeInfo(
172207
TypeInfoFactory.STRING, // key type: String
173208
personArrayType // value type: Array<Person>
@@ -178,11 +213,11 @@ private Struct getMisOrderedStruct() {
178213
personMap.put("key2", personArray);
179214
personMap.put("key3", personArray);
180215

181-
// 4. 创建外层 Struct 类型(包含 Map 和 Array 的嵌套)
216+
// 4. 创建外层 Struct 类型(包含 Map 和 Array 的嵌套)
182217
List<TypeInfo> outerFieldTypes = new ArrayList<>();
183218
List<String> outerFieldNames = new ArrayList<>();
184219

185-
// 外层结构体字段:config (Array<Person>) metadata (Map<String, Array<Person>>),
220+
// 外层结构体字段:config (Array<Person>) metadata (Map<String, Array<Person>>),
186221
outerFieldTypes.add(personArrayType); // config
187222
outerFieldTypes.add(personMapType); // metadata
188223
outerFieldNames.add("config");

0 commit comments

Comments
 (0)