Skip to content

Commit 39f2ec1

Browse files
committed
prepare 0.51.0-public.rc2
1 parent b6a3194 commit 39f2ec1

File tree

19 files changed

+229
-59
lines changed

19 files changed

+229
-59
lines changed

docs/docs/core-concept/execute-sql/offline.md

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ public static List<Record> getResult(Instance instance,String taskName)throws Od
126126

127127
```java
128128
String taskName="test_select_sql_task";
129-
Instance instance=SQLTask.run(odps,odps.getDefaultProject(),"SELECT * FROM test_select_sql_result;",taskName,null,null,null);
130-
instance.waitForSuccess();
131-
List<Record> records=SQLTask.getResult(instance,taskName);
129+
Instance instance=SQLTask.run(odps,odps.getDefaultProject(),"SELECT * FROM test_select_sql_result;",taskName,null,null,null);
130+
instance.waitForSuccess();
131+
List<Record> records=SQLTask.getResult(instance,taskName);
132132
```
133133

134134
#### 方法二:使用 Instance Tunnel 获取查询结果 (最多1w条)
@@ -161,15 +161,16 @@ public static List<Record> getResultByInstanceTunnel(Instance instance,String ta
161161

162162
```java
163163
String taskName="test_select_sql_task";
164-
Instance instance=SQLTask.run(odps,odps.getDefaultProject(),"SELECT * FROM test_select_sql_result;",taskName,null,null,3);
165-
instance.waitForSuccess();
166-
List<Record> records=SQLTask.getResultByInstanceTunnel(instance,taskName,1000L);
164+
Instance instance=SQLTask.run(odps,odps.getDefaultProject(),"SELECT * FROM test_select_sql_result;",taskName,null,null,3);
165+
instance.waitForSuccess();
166+
List<Record> records=SQLTask.getResultByInstanceTunnel(instance,taskName,1000L);
167167
```
168168

169169
#### 方法三:使用 Instance Tunnel 分页获取查询结果
170170

171171
使用`getResultSet`方法可以使用 instance tunnel 获取记录迭代器,从而可以让用户通过迭代器逐条获取记录来避免一次性获取全量数据到本地时撑爆内存的问题。
172172
ResultSet 实现了 `Iterator<Record>`, `Iterable<Record>` 接口,可以直接使用迭代器进行遍历。
173+
173174
::: note
174175
只有instance的owner本人可以使用本接口。
175176
:::
@@ -203,7 +204,7 @@ public static ResultSet getResultSet(Instance instance,String taskName,
203204
- `taskName`:任务名称。默认值为`AnonymousSQLTask`
204205
- `limit`:获取结果的数量,默认值为`null`
205206
- `limitHint`
206-
:是否限制结果数量(可选)。当limitHint为true时,结果最多只能获得1条记录,超过将截断,但无需进行逐表的权限检查;当limitHint为false时,没有记录数限制,可获取instance对应query结果集的全量数据。但前提是需要逐表(SQL中
207+
:是否限制结果数量(可选)。当limitHint为true时,结果最多只能获得1w条记录,超过将截断,但无需进行逐表的权限检查;当limitHint为false时,没有记录数限制,可获取instance对应query结果集的全量数据。但前提是需要逐表(SQL中
207208
涉及的表与视图)对用户进行权限检查,所以当查询涉及表所在project打开protection时,需要提前在policy中为相应表和视图添加exception,否则无权下载。
208209
- `tunnelEndpoint`:指定的tunnel endpoint(可选),默认通过 endpoint 和 project 自动路由。
209210
- `instanceTunnel`:允许用户使用自己创建的`InstanceTunnel`对象(可选)。
@@ -215,14 +216,14 @@ public static ResultSet getResultSet(Instance instance,String taskName,
215216
**示例代码**
216217

217218
```java
218-
Instance instance=SQLTask.run(odps,"SELECT * FROM test_select_sql_result;");
219-
instance.waitForSuccess();
220-
ResultSet resultSet=SQLTask.getResultSet(instance);
219+
Instance instance = SQLTask.run(odps,"SELECT * FROM test_select_sql_result;");
220+
instance.waitForSuccess();
221+
ResultSet resultSet = SQLTask.getResultSet(instance);
221222

222-
while(resultSet.hasNext()){
223-
Record record=resultSet.next();
223+
while(resultSet.hasNext()){
224+
Record record = resultSet.next();
224225
System.out.println(record);
225-
}
226+
}
226227
```
227228

228229
## SQLExecutor

odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/BridgeJobRunner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,10 @@ private void getProjectModeConf(){
393393
if (mode != null) {
394394
job.set("odps.mr.project.conf", mode);
395395
}
396+
String disable = metaExplorer.getProjectProperty("odps.mr.mode.disable");
397+
if (disable != null) {
398+
job.set("odps.mr.project.disable", disable);
399+
}
396400
} catch (OdpsException ignore) {
397401
}
398402
}

odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/LotReducerUDTF.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,9 @@ public com.aliyun.odps.udf.OdpsType[] resolve(com.aliyun.odps.udf.OdpsType[] unu
271271
int nodeId = Integer.parseInt(funtionName.split("_")[3]);
272272
Pipeline.TransformNode pipeNode = pipeline.getNode(nodeId);
273273
Column[] intermediateFields = (Column[]) ArrayUtils.addAll(pipeNode.getOutputKeySchema(), pipeNode.getOutputValueSchema());
274+
if (nodeId == pipeline.getNodeNum() - 1) {
275+
intermediateFields = appendMultiInsertField(intermediateFields);
276+
}
274277
if (pipeNode.getPartitionerClass() != null) {
275278
intermediateFields = (Column[]) ArrayUtils.addAll(SchemaUtils.fromString("__partition_id__:BIGINT"), intermediateFields);
276279
}
@@ -304,6 +307,9 @@ public com.aliyun.odps.type.TypeInfo[] resolve(com.aliyun.odps.type.TypeInfo[] u
304307
int nodeId = Integer.parseInt(funtionName.split("_")[3]);
305308
Pipeline.TransformNode pipeNode = pipeline.getNode(nodeId);
306309
Column[] intermediateFields = (Column[]) ArrayUtils.addAll(pipeNode.getOutputKeySchema(), pipeNode.getOutputValueSchema());
310+
if (nodeId == pipeline.getNodeNum() - 1) {
311+
intermediateFields = appendMultiInsertField(intermediateFields);
312+
}
307313
if (pipeNode.getPartitionerClass() != null) {
308314
intermediateFields = (Column[]) ArrayUtils.addAll(SchemaUtils.fromString("__partition_id__:BIGINT"), intermediateFields);
309315
}
@@ -315,6 +321,18 @@ public com.aliyun.odps.type.TypeInfo[] resolve(com.aliyun.odps.type.TypeInfo[] u
315321
return SchemaUtils.getTypeInfos(udtfCtx.getPackagedOutputSchema());
316322
}
317323

324+
Column[] appendMultiInsertField(Column[] intermediateFields) {
325+
if (intermediateFields == null || conf == null) {
326+
return intermediateFields;
327+
}
328+
TableInfo[] tables = com.aliyun.odps.mapred.utils.OutputUtils.getTables(conf);
329+
if (tables != null && tables.length > 1) {
330+
intermediateFields = (Column[]) ArrayUtils.addAll(
331+
intermediateFields, SchemaUtils.fromString("MULTIDEST_LABEL:STRING"));
332+
}
333+
return intermediateFields;
334+
}
335+
318336
@Override
319337
public void setup(ExecutionContext eCtx) {
320338
ctx = new ReduceContextImpl(conf);

odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/UDTFTaskContextImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,8 @@ protected void configure(ExecutionContext ctx) {
390390
}
391391

392392
private void resolveSqlModeNodeIndex(String exeMode, String tid) {
393-
if (exeMode != null && exeMode.equalsIgnoreCase("lot")) {
393+
if (exeMode != null && exeMode.equalsIgnoreCase("lot") &&
394+
(conf.get("odps.mr.project.disable") == null || !Boolean.parseBoolean(conf.get("odps.mr.project.disable")))) {
394395
return;
395396
}
396397
TableInfo[] infos = InputUtils.getTables(conf);

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Instance.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,6 @@ public TaskSummary getTaskSummary(String taskName) throws OdpsException {
679679
params.put("instancesummary", null);
680680
params.put("taskname", taskName);
681681
Response result = client.request(getResource(), "GET", params, null, null);
682-
System.out.println(new String(result.getBody()));
683682
TaskSummary summary = null;
684683
try {
685684
Gson gson = GsonObjectBuilder.get();

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/LogView.java

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,26 @@ public class LogView {
3232
private static final String HOST_DEFAULT = "http://logview.aliyun.com";
3333
private String logViewHost = "";
3434

35+
private static final String HOST_DEFAULT_V2 = "https://maxcompute.console.aliyun.com";
36+
37+
private int version = 1;
38+
3539
Odps odps;
3640

3741
public LogView(Odps odps) {
42+
this(odps, 1);
43+
}
44+
45+
public LogView(Odps odps, int version) {
3846
this.odps = odps;
47+
this.version = version;
3948
}
4049

4150
private String getLogviewHost() {
51+
if (2 == version) {
52+
return HOST_DEFAULT_V2;
53+
}
54+
4255
if (odps.getLogViewHost() != null) {
4356
return odps.getLogViewHost();
4457
} else {
@@ -93,14 +106,7 @@ public void setLogViewHost(String logViewHost) {
93106
* @throws OdpsException
94107
*/
95108
public String generateLogView(Instance instance, long hours) throws OdpsException {
96-
if (StringUtils.isNullOrEmpty(logViewHost)) {
97-
logViewHost = getLogviewHost();
98-
}
99-
100-
String token = generateInstanceToken(instance, hours);
101-
String logview = logViewHost + "/logview/?h=" + odps.getEndpoint() + "&p="
102-
+ instance.getProject() + "&i=" + instance.getId() + "&token=" + token;
103-
return logview;
109+
return generateLogView(instance, hours, null, null);
104110
}
105111

106112
/**
@@ -116,14 +122,7 @@ public String generateLogView(Instance instance, long hours) throws OdpsExceptio
116122
* @throws OdpsException
117123
*/
118124
public String generateSubQueryLogView(Instance instance, int queryId, long hours) throws OdpsException {
119-
if (StringUtils.isNullOrEmpty(logViewHost)) {
120-
logViewHost = getLogviewHost();
121-
}
122-
123-
String token = generateInstanceToken(instance, hours);
124-
String logview = logViewHost + "/logview/?h=" + odps.getEndpoint() + "&p="
125-
+ instance.getProject() + "&i=" + instance.getId() + "&subQuery=" + queryId +"&token=" + token;
126-
return logview;
125+
return generateLogView(instance, hours, queryId, null);
127126
}
128127

129128
/**
@@ -137,14 +136,42 @@ public String generateSubQueryLogView(Instance instance, int queryId, long hours
137136
* 同一个attach session 可以复用已有的token
138137
* @return logview
139138
*/
140-
public String generateSubQueryLogView(Instance instance, int queryId, String token) {
139+
public String generateSubQueryLogView(Instance instance, int queryId, String token)
140+
throws OdpsException {
141+
return generateLogView(instance, 24, queryId, token);
142+
}
143+
144+
private String generateLogView(Instance instance, long hours, Integer queryId, String token)
145+
throws OdpsException {
141146
if (StringUtils.isNullOrEmpty(logViewHost)) {
142147
logViewHost = getLogviewHost();
143148
}
144149

145-
String logview = logViewHost + "/logview/?h=" + odps.getEndpoint() + "&p="
146-
+ instance.getProject() + "&i=" + instance.getId() + "&subQuery=" + queryId +"&token=" + token;
147-
return logview;
150+
if (1 == version) {
151+
StringBuilder urlBuilder = new StringBuilder(logViewHost);
152+
urlBuilder.append("/logview/?h=").append(odps.getEndpoint())
153+
.append("&p=").append(instance.getProject())
154+
.append("&i=").append(instance.getId());
155+
if (queryId != null) {
156+
urlBuilder.append("&subQuery=").append(queryId);
157+
}
158+
if (token == null) {
159+
token = generateInstanceToken(instance, hours);
160+
}
161+
urlBuilder.append("&token=").append(token);
162+
return urlBuilder.toString();
163+
} else if (2 == version) {
164+
String url = logViewHost + "/" + odps.projects().get().getRegionId()
165+
+ "/job-insights?h=" + odps.getEndpoint()
166+
+ "&p=" + odps.getDefaultProject()
167+
+ "&i=" + instance.getId();
168+
if (queryId != null) {
169+
url += "&subQuery=" + queryId;
170+
}
171+
return url;
172+
} else {
173+
throw new IllegalArgumentException("logview version must be 1 or 2");
174+
}
148175
}
149176

150177
/**

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Project.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,18 @@ public Cluster(String name, String quotaID) {
271271
@Convert(SimpleXmlUtils.EmptyStringConverter.class)
272272
String regionId;
273273

274+
@Element(name = "ClusterRole", required = false)
275+
@Convert(SimpleXmlUtils.EmptyStringConverter.class)
276+
String clusterRole;
277+
278+
@Element(name = "JobDataPath", required = false)
279+
@Convert(SimpleXmlUtils.EmptyStringConverter.class)
280+
String jobDataPath;
281+
282+
@Element(name = "ZoneId", required = false)
283+
@Convert(SimpleXmlUtils.EmptyStringConverter.class)
284+
String zoneId;
285+
274286
@Element(name = "IsDefaultInRegion", required = false)
275287
@Convert(SimpleXmlUtils.EmptyStringConverter.class)
276288
String defaultInRegion;
@@ -290,6 +302,18 @@ public String getRegionId() {
290302
return regionId;
291303
}
292304

305+
public String getClusterRole() {
306+
return clusterRole;
307+
}
308+
309+
public String getJobDataPath() {
310+
return jobDataPath;
311+
}
312+
313+
public String getZoneId() {
314+
return zoneId;
315+
}
316+
293317
public boolean isDefaultInRegion() {
294318
return Boolean.parseBoolean(defaultInRegion);
295319
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/ProjectFilter.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@
2222
*/
2323
package com.aliyun.odps;
2424

25-
import com.aliyun.odps.utils.StringUtils;
26-
2725
import java.util.Map;
28-
import java.util.TreeMap;
26+
27+
import com.aliyun.odps.utils.StringUtils;
2928

3029
/**
3130
* ProjectFilter用于查询所有项目时根据条件过滤表
@@ -67,6 +66,8 @@ public class ProjectFilter {
6766
private String quotaType = null;
6867

6968
private String quotaName = null;
69+
70+
private Boolean enableDr = null;
7071
/**
7172
* 获得表所有者
7273
*
@@ -209,6 +210,18 @@ public void setQuotaName(String quotaName) {
209210
this.quotaName = quotaName;
210211
}
211212

213+
/**
214+
* 判断是否开启了存储容灾 "odps.project.dr.enable" == "true"
215+
* @return
216+
*/
217+
public Boolean getEnableDr() {
218+
return enableDr;
219+
}
220+
221+
public void setEnableDr(Boolean enableDr) {
222+
this.enableDr = enableDr;
223+
}
224+
212225
/**
213226
* Put this to @params as key-value. Replace value if key exists and value not empty.
214227
*/
@@ -240,5 +253,8 @@ public void addTo(Map<String, String> params) {
240253
if (!StringUtils.isNullOrEmpty(name)) {
241254
params.put("name", name);
242255
}
256+
if (enableDr != null) {
257+
params.put("enabledr", String.valueOf(enableDr));
258+
}
243259
}
244260
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Session.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ public String getLogView() throws OdpsException {
112112
return logView;
113113
}
114114

115+
public String getLogView(int version) throws OdpsException {
116+
if (logView == null && odps != null) {
117+
logView = new LogView(odps, version).generateLogView(instance, 7 * 24 /* by default one week. can be set by config */);
118+
}
119+
return logView;
120+
}
121+
115122
public void setLogView(String logView) {
116123
this.logView = logView;
117124
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Table.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ public enum TableType {
9999
/**
100100
* Materialized view
101101
*/
102-
MATERIALIZED_VIEW
102+
MATERIALIZED_VIEW,
103+
OBJECT_TABLE
103104
}
104105

105106
/**
@@ -203,6 +204,7 @@ static class Schema {
203204
boolean isMaterializedViewRewriteEnabled;
204205
boolean isMaterializedViewOutdated;
205206
boolean isExternalTable;
207+
boolean isObjectTable;
206208
long life = -1L;
207209
long hubLifecycle = -1L;
208210
String viewText;
@@ -964,6 +966,17 @@ public boolean isExternalTable() {
964966
}
965967
}
966968

969+
public boolean isObjectTable() {
970+
if (isLoaded()) {
971+
return model.isObjectTable;
972+
} else if (model.type != null) {
973+
return TableType.OBJECT_TABLE.equals(model.type);
974+
} else {
975+
lazyLoad();
976+
return model.isObjectTable;
977+
}
978+
}
979+
967980
/**
968981
* 获取视图的文本内容
969982
*
@@ -1410,6 +1423,10 @@ private TableSchema loadSchemaFromJson(String json) {
14101423
model.isExternalTable = tree.get("isExternal").getAsBoolean();
14111424
}
14121425

1426+
if (tree.has("isObjectTable")) {
1427+
model.isObjectTable = tree.get("isObjectTable").getAsBoolean();
1428+
}
1429+
14131430
if (tree.has("lifecycle")) {
14141431
model.life = tree.get("lifecycle").getAsLong();
14151432
}
@@ -2119,7 +2136,7 @@ public void changeOwner(String newOwner) throws OdpsException {
21192136
if (isVirtualView() || isMaterializedView()) {
21202137
target = "view";
21212138
}
2122-
runSQL(String.format("ALTER %s %s CHANGE OWNER TO %s;", target, getCoordinate(), OdpsCommonUtils.quoteStr(newOwner)));
2139+
runSQL(String.format("ALTER %s %s CHANGEOWNER TO %s;", target, getCoordinate(), OdpsCommonUtils.quoteStr(newOwner)));
21232140
}
21242141

21252142
/**

0 commit comments

Comments
 (0)