Skip to content

Commit e13e0fa

Browse files
committed
prepare version 0.48.7-public
1 parent d9c99b0 commit e13e0fa

38 files changed

+375
-121
lines changed

CHANGELOG.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,24 @@
11
# Changelog
22

3+
## [0.48.7-public] - 2024-08-07
4+
### Enhancements
5+
- **TableTunnel Configuration Optimization**: Introduced the `tags` attribute to `TableTunnel Configuration`, enabling users to attach custom tags to tunnel operations for enhanced logging and management. These tags are recorded in the tenant-level `information schema`.
6+
```java
7+
Odps odps;
8+
Configuration configuration =
9+
Configuration.builder(odps)
10+
.withTags(Arrays.asList("tag1", "tag2")) // Utilize Arrays.asList for code standardization
11+
.build();
12+
TableTunnel tableTunnel = odps.tableTunnel(configuration);
13+
// Proceed with tunnel operations
14+
```
15+
- **Instance Enhancement**: Added the `waitForTerminatedAndGetResult` method to the `Instance` class, integrating optimization strategies from versions 0.48.6 and 0.48.7 for the `SQLExecutor` interface, enhancing operational efficiency. Refer to `com.aliyun.odps.sqa.SQLExecutorImpl.getOfflineResultSet` for usage.
16+
17+
### Improve
18+
- **SQLExecutor Offline Job Processing Optimization**: Significantly reduced end-to-end latency by enabling immediate result retrieval after critical processing stages of offline jobs executed by `SQLExecutor`, without waiting for the job to fully complete, thus boosting response speed and resource utilization.
19+
### Fixes
20+
- **TunnelRetryHandler NPE Fix**: Rectified a potential null pointer exception issue in the `getRetryPolicy` method when the error code (`error code`) was `null`.
21+
322
## [0.48.6-public] - 2024-07-17
423

524
### Added

CHANGELOG_CN.md

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,63 @@
11
# 更新日志
22

3+
## [0.48.7-public] - 2024-08-07
4+
5+
### 增强
6+
7+
- **TableTunnel 配置优化**:引入 `tags` 属性至 `TableTunnel Configuration`
8+
,旨在允许用户为隧道相关操作附上自定义标签。这些标签会被记录在租户层级的 `information schema`
9+
中,便于日志追踪与管理。
10+
11+
```java
12+
Odps odps;
13+
Configuration configuration=
14+
Configuration.builder(odps)
15+
.withTags(Arrays.asList("tag1","tag2")) // 使用 Arrays.asList 以提升代码规范性
16+
.build();
17+
TableTunnel tableTunnel=odps.tableTunnel(configuration);
18+
// 继续执行隧道相关操作
19+
```
20+
21+
- **Instance 增强**:在 `Instance` 类中新增 `waitForTerminatedAndGetResult` 方法,此方法整合了 0.48.6
22+
及 0.48.7 版本中对 `SQLExecutor`
23+
接口的优化策略,提升了操作效率。使用方式可参考 `com.aliyun.odps.sqa.SQLExecutorImpl.getOfflineResultSet`
24+
方法。
25+
26+
### 优化
27+
28+
- **SQLExecutor 离线作业处理优化**:显著减少了端到端延迟,通过改进使得由 `SQLExecutor`
29+
执行的离线作业能在关键处理阶段完成后即刻获取结果,无需等待作业全部完成,提高了响应速度和资源利用率。
30+
31+
### 修复
32+
33+
- **TunnelRetryHandler NPE修复**:修正了 `getRetryPolicy` 方法中在错误码 (`error code`) 为 `null`
34+
的情况下潜在空指针异常问题。
35+
336
## [0.48.6-public] - 2024-07-17
437

538
### 新增
39+
640
- **支持序列化**
7-
- 主要数据类型如 `ArrayRecord``Column``TableSchema``TypeInfo` 现在支持序列化和反序列化,能够进行缓存和进程间通信。
41+
- 主要数据类型如 `ArrayRecord``Column``TableSchema``TypeInfo` 现在支持序列化和反序列化,能够进行缓存和进程间通信。
842
- **谓词下推**
9-
- 新增 `Attribute` 类型的谓词,用于指定列名。
43+
- 新增 `Attribute` 类型的谓词,用于指定列名。
1044

1145
### 变更
46+
1247
- **Tunnel 接口重构**
13-
- 重构了 Tunnel 相关接口,加入了无感知的重试逻辑,大大增强了稳定性和鲁棒性。
14-
- 删除了 `TunnelRetryStrategy``ConfigurationImpl` 类,分别被 `TunnelRetryHandler``Configuration` 所取代。
48+
- 重构了 Tunnel 相关接口,加入了无感知的重试逻辑,大大增强了稳定性和鲁棒性。
49+
- 删除了 `TunnelRetryStrategy``ConfigurationImpl` 类,分别被 `TunnelRetryHandler`
50+
`Configuration` 所取代。
1551

1652
### 优化
53+
1754
- **SQLExecutor 优化**
18-
- 在使用 `SQLExecutor` 接口执行离线 SQL 作业时进行优化,减少每个作业获取结果时的一次网络请求,从而减少端到端延时。
55+
- 在使用 `SQLExecutor` 接口执行离线 SQL 作业时进行优化,减少每个作业获取结果时的一次网络请求,从而减少端到端延时。
1956

2057
### 修复
58+
2159
- **Table.read Decimal 读取**
22-
- 修复了 `Table.read` 接口在读取 `decimal` 类型时,后面补零不符合预期的问题。
60+
- 修复了 `Table.read` 接口在读取 `decimal` 类型时,后面补零不符合预期的问题。
2361

2462
## [0.48.5-public] - 2024-06-17
2563

odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/BridgeJobRunner.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ protected void setUp() throws OdpsException {
296296
OutputUtils.setTables(infos, job);
297297
}
298298

299+
getProjectModeConf();
299300
processTempResources();
300301

301302
// Adding jobconf jar.
@@ -386,6 +387,16 @@ protected void tearDown() throws OdpsException {
386387
isClean = true;
387388
}
388389

390+
private void getProjectModeConf(){
391+
try {
392+
String mode = metaExplorer.getProjectProperty(SessionState.MR_EXECUTION_MODE);
393+
if (mode != null) {
394+
job.set("odps.mr.project.conf", mode);
395+
}
396+
} catch (OdpsException ignore) {
397+
}
398+
}
399+
389400
abstract protected Instance submitInternal() throws OdpsException;
390401

391402
@Override

odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/UDTFTaskContextImpl.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ protected void configure(ExecutionContext ctx) {
371371
this.pipeIndex = this.pipeIndex < 0 ? 0 :this.pipeIndex;
372372
}
373373
}
374+
resolveSqlModeNodeIndex(exeMode, tid);
374375
this.pipeNode = pipeline.getNode(pipeIndex);
375376
}
376377

@@ -388,6 +389,42 @@ protected void configure(ExecutionContext ctx) {
388389
}
389390
}
390391

392+
private void resolveSqlModeNodeIndex(String exeMode, String tid) {
393+
if (exeMode != null && exeMode.equalsIgnoreCase("lot")) {
394+
return;
395+
}
396+
TableInfo[] infos = InputUtils.getTables(conf);
397+
if (infos == null || infos.length < 2) {
398+
return;
399+
}
400+
String projectConfMode = "";
401+
if (conf.get("odps.mr.project.conf") != null) {
402+
projectConfMode = conf.get("odps.mr.project.conf").toLowerCase();
403+
}
404+
if (projectConfMode.equals("lot")) {
405+
return;
406+
}
407+
if (tid.startsWith("M")) {
408+
this.pipeIndex = 0;
409+
} else {
410+
int pipePrefix = Integer.parseInt(tid.split("_")[0].substring(1));
411+
String taskIdPrefix = "R" + pipePrefix;
412+
int prefixLength = pipePrefix > 9 ? 9 : pipePrefix;
413+
for (int i = 1; i < prefixLength; i++) {
414+
taskIdPrefix += "_" + i;
415+
}
416+
if (tid.startsWith(taskIdPrefix)) {
417+
this.pipeIndex = 1;
418+
} else if (tid.startsWith("R3_2_")) {
419+
this.pipeIndex = 2;
420+
} else if (this.pipeIndex > 2 &&
421+
(projectConfMode.equals("sql") || projectConfMode.equals("hybrid"))) {
422+
this.pipeIndex = pipePrefix - infos.length;
423+
this.pipeIndex = this.pipeIndex < 0 ? 0 : this.pipeIndex;
424+
}
425+
}
426+
}
427+
391428
protected boolean hasLabel(String label) {
392429
if (label2offset.get(label) == null) {
393430
return false;

odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/sqlgen/SqlGenContext.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,27 +201,32 @@ public String getId() {
201201
public String getMapStreamProcessor() {
202202
String cmd = job.getStreamProcessor("map");
203203
if (cmd != null) {
204-
try {
205-
return URLDecoder.decode(cmd, "UTF-8");
206-
} catch (UnsupportedEncodingException e) {
207-
e.printStackTrace();
208-
}
204+
return decodeStreamProcessor(cmd);
209205
}
210206
return null;
211207
}
212208

213209
public String getReduceStreamProcessor() {
214210
String cmd = job.getStreamProcessor("reduce");
215211
if (cmd != null) {
216-
try {
217-
return URLDecoder.decode(cmd, "UTF-8");
218-
} catch (UnsupportedEncodingException e) {
219-
e.printStackTrace();
220-
}
212+
return decodeStreamProcessor(cmd);
221213
}
222214
return null;
223215
}
224216

217+
String decodeStreamProcessor(String cmd) {
218+
String result = null;
219+
try {
220+
result = URLDecoder.decode(cmd, "UTF-8");
221+
if (result.contains("'")) {
222+
result = result.replaceAll("'", "\\\\'");
223+
}
224+
} catch (UnsupportedEncodingException e) {
225+
e.printStackTrace();
226+
}
227+
return result;
228+
}
229+
225230
public String getFunctionCreateText() {
226231
return job.getFunctionCreateText();
227232
}

odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/sqlgen/SqlGenerator.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@
4545
public class SqlGenerator {
4646

4747
public static String generate(JobConf job, String id, MetaExplorer metaExplorer, Map<String, String> aliasToTempResource) {
48-
if(!isSqlMode(job, metaExplorer)) {
49-
return null;
50-
}
5148
createFunction((BridgeJobConf) job, id, aliasToTempResource);
5249
Properties p = new Properties();
5350
p.setProperty("resource.loader", "class");
@@ -103,6 +100,13 @@ static void createFunction(BridgeJobConf job, String jobId, Map<String, String>
103100
resouceText.append(s).append(",");
104101
}
105102
if (job.isStreamJob()) {
103+
if (job.getResources() != null && aliasToTempResource != null) {
104+
for (String s : job.getResources()) {
105+
if (!aliasToTempResource.containsKey(s)) {
106+
resouceText.append(s).append(",");
107+
}
108+
}
109+
}
106110
resouceText.deleteCharAt(resouceText.lastIndexOf(","));
107111
createText.append(String.format("set %s=", SessionState.MR_EXECUTION_SESSION_RESOURCES));
108112
if (job.get(SessionState.MR_EXECUTION_SESSION_RESOURCES) != null) {

odps-sdk-impl/odps-mapred-bridge/src/main/resources/pl.vm

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ $!{ctx.FunctionCreateText}
7171
#end
7272
#if(${ctx.isNoInputTableInfos()})
7373
#if(!${ctx.mapOnly()})
74-
SELECT mr2sql_mapper_${ctx.Id}_${nodeId}() as (${node.IntermediateColsJoinedMapOut}) FROM values('') _t (_c)
74+
SELECT mr2sql_mapper_${ctx.Id}_${nodeId}() as (${node.IntermediateColsJoinedMapOut}) FROM values('') _t (_c) where false
7575
#else
76-
SELECT mr2sql_mapper_${ctx.Id}_${nodeId}() as (${ctx.PackagedColsJoined}) FROM values('') _t (_c)
76+
SELECT mr2sql_mapper_${ctx.Id}_${nodeId}() as (${ctx.PackagedColsJoined}) FROM values('') _t (_c) where false
7777
#end##
7878
#end
7979
) open_mr_alias_${nodeId}_1

odps-sdk-impl/odps-mapred-bridge/src/main/resources/sql.vm

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ $!{ctx.functionCreateText}
6868
#end##
6969
#if(${ctx.isNoInputTableInfos()})
7070
#if(!${ctx.mapOnly()})
71-
SELECT mr2sql_mapper_${ctx.Id}() as (${ctx.IntermediateColsJoinedMapOut}) FROM values('') _t (_c)
71+
SELECT mr2sql_mapper_${ctx.Id}() as (${ctx.IntermediateColsJoinedMapOut}) FROM values('') _t (_c) where false
7272
#else
73-
SELECT mr2sql_mapper_${ctx.Id}() as (${ctx.PackagedColsJoined}) FROM values('') _t (_c)
73+
SELECT mr2sql_mapper_${ctx.Id}() as (${ctx.PackagedColsJoined}) FROM values('') _t (_c) where false
7474
#end##
7575
#end
7676
) open_mr_alias1
@@ -104,4 +104,4 @@ SELECT #collist($output_table)
104104
#else
105105
SELECT *
106106
#end
107-
;
107+
;

odps-sdk-impl/odps-mapred-bridge/src/main/resources/stream.vm

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,31 +48,43 @@ ${entry.Key} = "${entry.Value}" #if(${foreach.hasNext}) AND #end
4848

4949
$!{ctx.functionCreateText}
5050
@sub_query_mapper :=
51-
FROM
52-
(
51+
select * FROM (
5352
#foreach ( $input_table in ${ctx.InputTableInfos} )
54-
SELECT #selist($input_table ${ctx.InputTableInfos[0].Cols}) FROM #tblname($input_table)
55-
#if (${input_table.PartSpec} && ${input_table.PartSpec.size()} > 0)
56-
WHERE #partfilter($input_table)
53+
FROM (SELECT * FROM #tblname($input_table)
54+
#if (${input_table.PartSpec} && ${input_table.PartSpec.size()} > 0)
55+
WHERE #partfilter($input_table)
56+
#end##
57+
)
58+
MAP #collist($input_table) $!{ctx.mapInputSeparator}
59+
USING '$!{ctx.MapStreamProcessor}'
60+
#if(!${ctx.mapOnly()})
61+
AS ${ctx.IntermediateColsJoinedMapOut} $!{ctx.mapOutputSeparator}
62+
#else
63+
AS #outputColsJoined() $!{ctx.mapOutputSeparator}
64+
#end##
65+
#if(${foreach.hasNext})UNION ALL
5766
#end##
58-
#if(${foreach.hasNext})UNION ALL#end##
5967
#end##
6068
#if(${ctx.isNoInputTableInfos()})
61-
select _c from values('') _t (_c)
69+
FROM (select _c from values('') _t (_c) where false)
70+
MAP '' $!{ctx.mapInputSeparator}
71+
USING '$!{ctx.MapStreamProcessor}'
72+
#if(!${ctx.mapOnly()})
73+
AS ${ctx.IntermediateColsJoinedMapOut} $!{ctx.mapOutputSeparator}
74+
#else
75+
AS #outputColsJoined() $!{ctx.mapOutputSeparator}
76+
#end##
6277
#end
6378
) open_mr_alias1
64-
MAP #inputColsJoined()
65-
USING '$!{ctx.MapStreamProcessor}' $!{ctx.mapInputSeparator}
6679
#if(!${ctx.mapOnly()})
67-
AS ${ctx.IntermediateColsJoined} $!{ctx.mapOutputSeparator};
68-
#else
69-
AS #outputColsJoined() $!{ctx.mapOutputSeparator};
70-
#end##
80+
DISTRIBUTE BY ${ctx.PartitionColsJoined} SORT BY ${ctx.SortColsJoined}
81+
#end
82+
;
7183
#if(!${ctx.mapOnly()})
7284

7385
@sub_query_reducer :=
74-
REDUCE ${ctx.IntermediateColsJoined}
75-
USING '$!{ctx.ReduceStreamProcessor}' $!{ctx.reduceInputSeparator}
86+
REDUCE ${ctx.IntermediateColsJoined} $!{ctx.reduceInputSeparator}
87+
USING '$!{ctx.ReduceStreamProcessor}'
7688
AS #outputColsJoined() $!{ctx.reduceOutputSeparator}
7789
FROM @sub_query_mapper;
7890
#end##

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Instance.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -903,7 +903,7 @@ public void waitForSuccess() throws OdpsException {
903903
* Instance失败
904904
*/
905905
public void waitForSuccess(long interval) throws OdpsException {
906-
waitForTerminated(interval);
906+
waitForTerminated(interval, false);
907907

908908
if (!isSuccessful()) {
909909
for (Entry<String, TaskStatus> e : getTaskStatus().entrySet()) {
@@ -921,9 +921,14 @@ public void waitForSuccess(long interval) throws OdpsException {
921921
*
922922
* @param interval
923923
* 内部轮询间隔
924+
* @param isBlock
925+
* 是否阻塞,
926+
* 不开启 block,将在客户端长轮询,直到作业结束
927+
* 开启 block 请求将会在服务端等待一段时间(每次请求等待5s,直到作业结束。long-polling),
928+
* block 模式能够跳过 instance post running 阶段(key-path-end optimize)
924929
*/
925-
public void waitForTerminated(long interval) {
926-
while (!isTerminated()) {
930+
public void waitForTerminated(long interval, boolean isBlock) {
931+
while (getStatus(isBlock) != Instance.Status.TERMINATED) {
927932
try {
928933
Thread.sleep(interval);
929934
} catch (InterruptedException e) {
@@ -937,7 +942,7 @@ public void waitForTerminated(long interval) {
937942
* 该方法仅适用于离线作业(1个Instance对应1个Task)
938943
*/
939944
public String waitForTerminatedAndGetResult() throws OdpsException {
940-
waitForTerminated(1000);
945+
waitForTerminated(1000, true);
941946

942947
TaskResult taskResult = getRawTaskResults().get(0);
943948
String resultStr = taskResult.getResult().getString();

0 commit comments

Comments
 (0)