Skip to content

Commit 011f989

Browse files
committed
prepare for 0.51.10-public
1 parent 402fe8a commit 011f989

File tree

16 files changed

+422
-30
lines changed

16 files changed

+422
-30
lines changed

odps-sdk/odps-sdk-commons/src/main/java/com/aliyun/odps/utils/StringUtils.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -975,7 +975,7 @@ public static List<String> toLowerCase(List<String> names)
975975
return names.stream().map(String::toLowerCase).collect(Collectors.toList());
976976
}
977977

978-
public static final byte[] encodeQuotedPrintable(final byte[] bytes) {
978+
public static byte[] encodeQuotedPrintable(final byte[] bytes) {
979979
if (bytes == null) {
980980
return null;
981981
}
@@ -999,4 +999,25 @@ public static final byte[] encodeQuotedPrintable(final byte[] bytes) {
999999

10001000
return buffer.toByteArray();
10011001
}
1002+
1003+
public static byte[] decodeQuotedPrintable(byte[] input) {
1004+
if (input == null) {
1005+
return null;
1006+
}
1007+
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
1008+
for (int i = 0; i < input.length; i++) {
1009+
byte c = input[i];
1010+
if (c == PRINTABLE_ESCAPE_CHAR) {
1011+
if (i + 2 < input.length) {
1012+
String hexValue = new String(input, i + 1, 2);
1013+
int byteValue = Integer.parseInt(hexValue, 16);
1014+
buffer.write(byteValue);
1015+
i += 2;
1016+
}
1017+
} else {
1018+
buffer.write(c);
1019+
}
1020+
}
1021+
return buffer.toByteArray();
1022+
}
10021023
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/commons/proto/ProtoWireConstant.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,7 @@ class ProtoWireConstant {
3030
public static final int TUNNEL_META_CHECKSUM = 33554431; // magic num 2^25-1
3131
public static final int TUNNEL_END_RECORD = 33553408; // maigc num 2^25-1024
3232
public static final int SCHEMA_END_TAG = 33553920; //maigc num 2^25-512
33+
34+
public static final int METRICS_TAG = 1;
35+
public static final int METRICS_END_TAG = 33554176; // magic num 2^25-256
3336
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/commons/proto/ProtobufRecordStreamReader.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@
2323
import java.io.IOException;
2424
import java.io.InputStream;
2525
import java.math.BigDecimal;
26-
import java.sql.Timestamp;
2726
import java.time.Instant;
2827
import java.time.LocalDate;
2928
import java.time.LocalDateTime;
3029
import java.time.ZoneId;
3130
import java.time.ZoneOffset;
3231
import java.util.ArrayList;
33-
import java.util.Calendar;
3432
import java.util.HashMap;
3533
import java.util.List;
3634
import java.util.Map;
@@ -69,6 +67,7 @@ public class ProtobufRecordStreamReader implements RecordReader {
6967
private TableSchema schema;
7068
private long count;
7169
private long bytesReaded = 0;
70+
private String tunnelMetricsString;
7271
private Checksum crc = new Checksum();
7372
private Checksum crccrc = new Checksum();
7473
protected boolean shouldTransform = false;
@@ -202,15 +201,24 @@ public Record read(Record reuseRecord) throws IOException {
202201
if (count != in.readSInt64()) {
203202
throw new IOException("count does not match.");
204203
}
205-
206-
if (ProtoWireConstant.TUNNEL_META_CHECKSUM != getTagFieldNumber(in)) {
207-
throw new IOException("Invalid stream.");
204+
// 从服务端拿到 总行数后,预期,拿metrics(如果存在)
205+
int nextTag = getTagFieldNumber(in);
206+
if (ProtoWireConstant.METRICS_TAG == nextTag) {
207+
crc.update(ProtoWireConstant.METRICS_TAG);
208+
tunnelMetricsString = readString();
209+
nextTag = getTagFieldNumber(in);
208210
}
209-
210-
if ((int) crccrc.getValue() != in.readUInt32()) {
211-
throw new IOException("Checksum invalid.");
211+
if (ProtoWireConstant.METRICS_END_TAG == nextTag) {
212+
if ((int) crc.getValue() != in.readUInt32()) {
213+
throw new IOException("Metrics checksum invalid.");
214+
}
215+
nextTag = getTagFieldNumber(in);
216+
}
217+
if (ProtoWireConstant.TUNNEL_META_CHECKSUM == nextTag) {
218+
if ((int) crccrc.getValue() != in.readUInt32()) {
219+
throw new IOException("Checksum invalid.");
220+
}
212221
}
213-
214222
if (!in.isAtEnd()) {
215223
throw new IOException("Expect at the end of stream, but not.");
216224
}
@@ -574,4 +582,8 @@ public Map readMap(OdpsType keyType, OdpsType valueType) throws IOException {
574582
return map;
575583
}
576584

585+
public String getTunnelMetricsString() {
586+
return tunnelMetricsString;
587+
}
588+
577589
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/HttpHeaders.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,6 @@ public class HttpHeaders extends Headers {
3535
public static final String HEADER_ODPS_TUNNEL_TAGS= "odps-tunnel-tags";
3636
public static final String HEADER_ODPS_TUNNEL_SDK_SUPPORT_SCHEMA_EVOLUTION = "odps-tunnel-sdk-support-schema-evolution";
3737
public static final String HEADER_ODPS_TUNNEL_LATEST_SCHEMA_VERSION = "odps-tunnel-latest-schema-version";
38+
public static final String HEADER_ODPS_TUNNEL_METRICS = "odps-tunnel-metrics";
3839

3940
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/InstanceTunnel.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.aliyun.odps.rest.ResourceBuilder;
3939
import com.aliyun.odps.rest.RestClient;
4040
import com.aliyun.odps.tunnel.io.CompressOption;
41+
import com.aliyun.odps.tunnel.io.TunnelBufferedReader;
4142
import com.aliyun.odps.tunnel.io.TunnelRecordReader;
4243
import com.aliyun.odps.utils.StringUtils;
4344
import com.google.gson.JsonObject;
@@ -478,6 +479,23 @@ public TunnelRecordReader openRecordReader(long start, long count, long sizeLimi
478479
return reader;
479480
}
480481

482+
/**
483+
* 打开{@link RecordReader}用来读取记录
484+
* @param start 本次要读取记录的起始位置
485+
* @param count 本次要读取记录的数量
486+
* @param batchSize 每次读取的记录数量
487+
* @param compress 数据传输是否进行压缩;即使设置了压缩选项,如果server 不支持压缩,传输数据也不会被压缩
488+
* @param columns 本次需要下载的列
489+
* @return TunnelBufferedReader
490+
*/
491+
public RecordReader openBufferedRecordReader(long start, long count, long batchSize, CompressOption compress,
492+
List<Column> columns) {
493+
TunnelBufferedReader reader =
494+
new TunnelBufferedReader(start, count, batchSize, columns, compress, this);
495+
reader.setTransform(shouldTransform);
496+
return reader;
497+
}
498+
481499
/**
482500
* initiate a new download session
483501
* @throws TunnelException

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/TableTunnel.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package com.aliyun.odps.tunnel;
2121

2222
import static com.aliyun.odps.tunnel.HttpHeaders.HEADER_ODPS_REQUEST_ID;
23+
import static com.aliyun.odps.tunnel.HttpHeaders.HEADER_ODPS_TUNNEL_METRICS;
2324
import static com.aliyun.odps.tunnel.TunnelConstants.TUNNEL_DATE_TRANSFORM_VERSION;
2425
import static java.lang.Math.max;
2526
import static java.lang.Math.min;
@@ -72,6 +73,7 @@
7273
import com.aliyun.odps.tunnel.io.Checksum;
7374
import com.aliyun.odps.tunnel.io.CompressOption;
7475
import com.aliyun.odps.tunnel.io.ProtobufRecordPack;
76+
import com.aliyun.odps.tunnel.io.TunnelBufferedReader;
7577
import com.aliyun.odps.tunnel.io.TunnelBufferedWriter;
7678
import com.aliyun.odps.tunnel.io.TunnelRecordReader;
7779
import com.aliyun.odps.tunnel.io.TunnelRecordWriter;
@@ -1599,7 +1601,10 @@ private void writeBlockInternal(long blockId, RecordPack pack, long timeout, lon
15991601
try {
16001602
if (pack instanceof ProtobufRecordPack) {
16011603
ProtobufRecordPack protoPack = (ProtobufRecordPack) pack;
1604+
long startTime = System.currentTimeMillis();
16021605
conn = getConnection(blockId, protoPack.getCompressOption(), blockVersion);
1606+
protoPack.addNetworkWallTimeMs(System.currentTimeMillis() - startTime);
1607+
protoPack.addLocalWallTimeMs(System.currentTimeMillis() - startTime);
16031608
sendBlock(protoPack, conn, timeout);
16041609
} else {
16051610
RecordWriter writer = openRecordWriter(blockId);
@@ -1635,6 +1640,7 @@ private void sendBlock(ProtobufRecordPack pack, Connection conn, long timeout) t
16351640
if (null == conn) {
16361641
throw new IOException("Invalid connection");
16371642
}
1643+
long startTime = System.currentTimeMillis();
16381644
pack.checkTransConsistency(shouldTransform);
16391645
pack.complete();
16401646
ByteArrayOutputStream baos = pack.getProtobufStream();
@@ -1663,6 +1669,12 @@ private void sendBlock(ProtobufRecordPack pack, Connection conn, long timeout) t
16631669
response.getStatus());
16641670
throw new IOException(exception.getMessage(), exception);
16651671
}
1672+
// metrics
1673+
String metricsStr = response.getHeader(HEADER_ODPS_TUNNEL_METRICS);
1674+
TunnelMetrics batchMetrics =
1675+
TunnelMetrics.parse(metricsStr, pack.getLocalWallTimeMs() + (System.currentTimeMillis() - startTime),
1676+
pack.getNetworkWallTimeMs() + (System.currentTimeMillis() - startTime));
1677+
pack.addMetrics(batchMetrics);
16661678
}
16671679

16681680
/**
@@ -1717,6 +1729,7 @@ public RecordWriter openRecordWriter(long blockId, CompressOption compress, long
17171729

17181730
private RecordWriter openRecordWriterInternal(long blockId, CompressOption compress, long blockVersion)
17191731
throws TunnelException {
1732+
long startTime = System.currentTimeMillis();
17201733
TunnelRetryHandler retryHandler = new TunnelRetryHandler(conf);
17211734
try {
17221735
return retryHandler.executeWithRetry(() -> {
@@ -1727,6 +1740,7 @@ private RecordWriter openRecordWriterInternal(long blockId, CompressOption compr
17271740
writer =
17281741
new TunnelRecordWriter(schema, conn, compress);
17291742
writer.setTransform(shouldTransform);
1743+
writer.addWallTimeMs(System.currentTimeMillis() - startTime);
17301744
return writer;
17311745
} catch (IOException e) {
17321746
if (conn != null) {
@@ -2398,6 +2412,17 @@ public TunnelRecordReader openRecordReader(long start, long count, CompressOptio
23982412
return reader;
23992413
}
24002414

2415+
public RecordReader openBufferedRecordReader(long start, long count, long batchSize, CompressOption compress,
2416+
List<Column> columns, boolean disableModifiedCheck)
2417+
throws TunnelException {
2418+
if (columns != null && columns.isEmpty()) {
2419+
throw new TunnelException("Specified column list is empty.");
2420+
}
2421+
TunnelBufferedReader reader = new TunnelBufferedReader(start, count, batchSize, columns, compress, this, disableModifiedCheck);
2422+
reader.setTransform(shouldTransform);
2423+
return reader;
2424+
}
2425+
24012426
private Schema arrowSchema;
24022427
public Schema getArrowSchema() {
24032428
if (this.arrowSchema == null){

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/TunnelConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
*/
2828
public interface TunnelConstants {
2929

30-
public static int VERSION = 5;
30+
public static int VERSION = 6;
3131
public static String RES_PARTITION = "partition";
3232
public static String RES_SHARD = "shard";
3333
public static String RES_COLUMNS = "columns";
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.aliyun.odps.tunnel;
2+
3+
import java.util.Map;
4+
5+
import com.fasterxml.jackson.core.type.TypeReference;
6+
import com.fasterxml.jackson.databind.ObjectMapper;
7+
8+
/**
9+
* @author dingxin ([email protected])
10+
*/
11+
public class TunnelMetrics {
12+
13+
private long networkWallCost;
14+
private long clientProcessCost;
15+
private long tunnelProcessCost;
16+
private long storageCost;
17+
private long serverTotalCost;
18+
private long serverIoCost;
19+
private long rateLimitCost;
20+
21+
22+
public TunnelMetrics() {
23+
}
24+
25+
TunnelMetrics(long networkWallCost, long clientProcessCost,
26+
long tunnelProcessCost, long storageCost, long serverTotalCost,
27+
long serverIoCost, long rateLimitCost) {
28+
this.networkWallCost = networkWallCost;
29+
this.clientProcessCost = clientProcessCost;
30+
this.tunnelProcessCost = tunnelProcessCost;
31+
this.storageCost = storageCost;
32+
this.serverTotalCost = serverTotalCost;
33+
this.serverIoCost = serverIoCost;
34+
this.rateLimitCost = rateLimitCost;
35+
}
36+
37+
public static TunnelMetrics parse(String metricsString, long localWallTime,
38+
long networkWallTime) {
39+
ObjectMapper objectMapper = new ObjectMapper();
40+
try {
41+
Map<String, Long>
42+
costMap =
43+
objectMapper.readValue(metricsString, new TypeReference<Map<String, Long>>() {
44+
});
45+
// The unit returned by the server is microseconds
46+
long storageCost = costMap.getOrDefault("PanguIOCost", 0L) / 1000;
47+
long serverIoCost = costMap.getOrDefault("ServerIOCost", 0L) / 1000;
48+
long serverTotalCost = costMap.getOrDefault("ServerTotalCost", 0L) / 1000;
49+
long rateLimitCost = costMap.getOrDefault("RateLimitCost", 0L) / 1000;
50+
51+
return new TunnelMetrics(networkWallTime,
52+
localWallTime - networkWallTime,
53+
serverTotalCost - serverIoCost - storageCost - rateLimitCost,
54+
storageCost,
55+
serverTotalCost,
56+
serverIoCost,
57+
rateLimitCost
58+
);
59+
60+
} catch (Exception ignored) {
61+
// parse failed do not break the process
62+
return new TunnelMetrics();
63+
}
64+
}
65+
66+
public void add(TunnelMetrics other) {
67+
this.networkWallCost += other.networkWallCost;
68+
this.clientProcessCost += other.clientProcessCost;
69+
this.tunnelProcessCost += other.tunnelProcessCost;
70+
this.storageCost += other.storageCost;
71+
this.serverTotalCost += other.serverTotalCost;
72+
this.serverIoCost += other.serverIoCost;
73+
this.rateLimitCost += other.rateLimitCost;
74+
}
75+
76+
@Override
77+
public String toString() {
78+
return "TunnelMetrics{" +
79+
"networkWallCost=" + networkWallCost +
80+
", clientProcessCost=" + clientProcessCost +
81+
", tunnelProcessCost=" + tunnelProcessCost +
82+
", storageCost=" + storageCost +
83+
", serverTotalCost=" + serverTotalCost +
84+
", serverIoCost=" + serverIoCost +
85+
", rateLimitCost=" + rateLimitCost +
86+
'}';
87+
}
88+
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/impl/StreamUploadSessionImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.aliyun.odps.tunnel.impl;
22

33
import static com.aliyun.odps.tunnel.HttpHeaders.HEADER_ODPS_REQUEST_ID;
4+
import static com.aliyun.odps.tunnel.HttpHeaders.HEADER_ODPS_TUNNEL_METRICS;
45

56
import java.io.ByteArrayOutputStream;
67
import java.io.IOException;
@@ -30,6 +31,7 @@
3031
import com.aliyun.odps.tunnel.TableTunnel;
3132
import com.aliyun.odps.tunnel.TunnelConstants;
3233
import com.aliyun.odps.tunnel.TunnelException;
34+
import com.aliyun.odps.tunnel.TunnelMetrics;
3335
import com.aliyun.odps.tunnel.io.CompressOption;
3436
import com.aliyun.odps.tunnel.io.ProtobufRecordPack;
3537
import com.aliyun.odps.tunnel.io.StreamRecordPackImpl;
@@ -416,6 +418,8 @@ private String sendBlock(ProtobufRecordPack pack, Connection conn, Slot slot, lo
416418
if (null == conn) {
417419
throw new IOException("Invalid connection");
418420
}
421+
422+
long startTime = System.currentTimeMillis();
419423
ByteArrayOutputStream baos = pack.getProtobufStream();
420424
if (timeout > 0) {
421425
ConnectionWatcher.getInstance().mark(conn, timeout);
@@ -453,6 +457,12 @@ private String sendBlock(ProtobufRecordPack pack, Connection conn, Slot slot, lo
453457
response.getHeader(HttpHeaders.HEADER_ODPS_ROUTED_SERVER),
454458
Integer.valueOf(response.getHeader(HttpHeaders.HEADER_ODPS_SLOT_NUM)));
455459

460+
String metricsStr = response.getHeader(HEADER_ODPS_TUNNEL_METRICS);
461+
TunnelMetrics batchMetrics =
462+
TunnelMetrics.parse(metricsStr, pack.getLocalWallTimeMs() + (System.currentTimeMillis() - startTime),
463+
pack.getNetworkWallTimeMs() + (System.currentTimeMillis() - startTime));
464+
pack.addMetrics(batchMetrics);
465+
456466
return response.getHeader(HEADER_ODPS_REQUEST_ID);
457467
}
458468

0 commit comments

Comments
 (0)