1919
2020package com .aliyun .odps ;
2121
22+ import static com .aliyun .odps .task .SQLTask .parseCsvRecord ;
23+
24+ import java .io .InputStream ;
2225import java .util .ArrayList ;
2326import java .util .HashMap ;
2427import java .util .Iterator ;
2528import java .util .List ;
2629import java .util .Map ;
30+ import java .util .Properties ;
31+ import java .util .concurrent .atomic .AtomicReference ;
2732
2833import com .aliyun .odps .Schema .SchemaModel ;
2934import com .aliyun .odps .commons .transport .Headers ;
35+ import com .aliyun .odps .data .Record ;
3036import com .aliyun .odps .rest .ResourceBuilder ;
3137import com .aliyun .odps .rest .RestClient ;
3238import com .aliyun .odps .rest .SimpleXmlUtils ;
3339import com .aliyun .odps .simpleframework .xml .Element ;
3440import com .aliyun .odps .simpleframework .xml .ElementList ;
3541import com .aliyun .odps .simpleframework .xml .Root ;
3642import com .aliyun .odps .simpleframework .xml .convert .Convert ;
43+ import com .aliyun .odps .task .SQLTask ;
3744import com .aliyun .odps .utils .ExceptionUtils ;
3845import com .aliyun .odps .utils .StringUtils ;
3946
@@ -289,9 +296,62 @@ public String getMarker() {
289296 return params .get ("marker" );
290297 }
291298
299+
300+ /**
301+ * getExternalProjectSchemaList , special for EPV2
302+ * @param projectName
303+ * @return
304+ * @throws OdpsException
305+ */
306+ public List <Schema > getExternalProjectSchemaList (String projectName ) throws OdpsException {
307+ //Temporary code for openlake Demo(YunQi big conference):
308+ // Long-term this code block should be converted to SQL for both internal and external projects,
309+ // and should not have any setting flag, except for opening the 3 layer model based on project attributes(user can choose use 2 or 3tier for 3 layer Project ).
310+ //warning filter not support in EPV2
311+ Map <String , String > queryHint = new HashMap <>();
312+ InputStream is = null ;
313+ try {
314+ is = Schemas .class .getResourceAsStream ("/com/aliyun/odps/core/base.conf" );
315+ Properties properties = new Properties ();
316+ properties .load (is );
317+ String majorVersion = properties .getProperty ("epv2flighting" );
318+ if (majorVersion != null && !majorVersion .isEmpty () && !"default" .equals (majorVersion )) {
319+ queryHint .put ("odps.task.major.version" , majorVersion );
320+ }
321+
322+ } catch (Exception e ) {
323+ } finally {
324+ org .apache .commons .io .IOUtils .closeQuietly (is );
325+ }
326+ queryHint .put ("odps.sql.select.output.format" , "csv" );
327+ Instance i = SQLTask .run (odps , projectName , "show schemas;" , queryHint , null );
328+ i .waitForSuccess ();
329+ Instance .InstanceResultModel .TaskResult taskResult = i .getRawTaskResults ().get (0 );
330+ Instance .TaskStatus .Status taskStatus =
331+ Instance .TaskStatus .Status .valueOf (taskResult .status .toUpperCase ());
332+ if (taskStatus != Instance .TaskStatus .Status .SUCCESS ) {
333+ throw new RuntimeException ("show schemas failed. instanceId:" + i .getId ());
334+ }
335+
336+ String result = taskResult .result .getString ();
337+ List <Record > schemalist = parseCsvRecord (result );
338+ if (schemalist == null || schemalist .isEmpty ()) {
339+ return null ;
340+ }
341+ ArrayList <Schema > schemas = new ArrayList <>();
342+ for (Record s : schemalist ) {
343+ SchemaModel model = new SchemaModel ();
344+ model .name = s .get (0 ).toString ();
345+ Schema schema = new Schema (model , projectName , odps );
346+ schema .setLoaded (true );
347+ schemas .add (schema );
348+ }
349+ return schemas ;
350+ }
351+
292352 @ Override
293353 protected List <Schema > list () {
294- ArrayList < Schema > schemas = new ArrayList <>();
354+ AtomicReference < List < Schema >> schemas = new AtomicReference <>( new ArrayList <>() );
295355 params .put ("expectmarker" , "true" );
296356 String lastMarker = params .get ("marker" );
297357 if (params .containsKey ("marker" ) && StringUtils .isNullOrEmpty (lastMarker )) {
@@ -307,26 +367,28 @@ protected List<Schema> list() {
307367 params .put ("owner" , filter .getOwner ());
308368 }
309369 }
310-
311370 String resource = ResourceBuilder .buildSchemaResource (projectName );
312-
313371 try {
314-
315- ListSchemasResponse resp = client .request (
316- ListSchemasResponse .class , resource , "GET" , params );
317-
318- for (SchemaModel model : resp .schemas ) {
319- Schema schema = new Schema (model , projectName , odps );
320- schemas .add (schema );
321- }
322-
323- params .put ("marker" , resp .marker );
324-
372+ return odps .projects ().get (projectName ).executeIfEpv2 (() -> {
373+ schemas .set (getExternalProjectSchemaList (projectName ));
374+ if (schemas .get () == null || schemas .get ().isEmpty ()) {
375+ return null ;
376+ }
377+ params .put ("marker" , "" );
378+ return schemas .get ();
379+ }, () -> {
380+ ListSchemasResponse resp = client .request (
381+ ListSchemasResponse .class , resource , "GET" , params );
382+ for (SchemaModel model : resp .schemas ) {
383+ Schema schema = new Schema (model , projectName , odps );
384+ schemas .get ().add (schema );
385+ }
386+ params .put ("marker" , resp .marker );
387+ return schemas .get ();
388+ });
325389 } catch (OdpsException e ) {
326- throw new RuntimeException ( e . getMessage (), e );
390+ throw new UncheckedOdpsException ( e );
327391 }
328-
329- return schemas ;
330392 }
331393 }
332394}
0 commit comments