@@ -35,7 +35,6 @@ module.exports = {
3535 next ( null , jobs )
3636 } )
3737 } ,
38-
3938 async getJobsByTask ( input ) {
4039 const { task, customer } = input
4140
@@ -304,6 +303,95 @@ module.exports = {
304303 )
305304 } )
306305 } ,
306+ /**
307+ *
308+ * @summary Finalize task execution. Save result and submit to elk
309+ *
310+ * @param {Object } input
311+ * @property {Job } input.job
312+ * @property {Object } input.result
313+ * @param {Function } done
314+ *
315+ */
316+ async finish ( input , done ) {
317+ try {
318+ const { job, user, customer, result = { } } = input
319+
320+ let state
321+ let lifecycle
322+ let eventName
323+ let output
324+
325+ if ( result . killed === true ) {
326+ lifecycle = LifecycleConstants . TERMINATED
327+ state = StateConstants . TIMEOUT
328+ eventName = StateConstants . TIMEOUT
329+ } else {
330+ lifecycle = LifecycleConstants . FINISHED
331+ if (
332+ input . state === StateConstants . SUCCESS ||
333+ input . state === StateConstants . FAILURE
334+ ) {
335+ state = input . state
336+ } else {
337+ // assuming success for backward compatibility
338+ state = ( job . default_state_evaluation || StateConstants . SUCCESS )
339+ }
340+ }
341+
342+ job . lifecycle = lifecycle
343+ job . state = state
344+
345+ const file = await this . storeJobOutput ( { job, customer, result } )
346+ job . result_id = file . _id
347+
348+ // data output, can be anything. stringify for security reason
349+ if ( result ?. output ) {
350+ job . output = this . parseOutputParameters ( result . output )
351+ }
352+
353+ if ( result . lastline ) {
354+ try {
355+ const jsonLastline = JSON . parse ( result . lastline )
356+ // looking for state and output
357+ if ( isObject ( jsonLastline ) ) {
358+ if ( jsonLastline . components ) {
359+ job . components = jsonLastline . components
360+ }
361+ if ( jsonLastline . next ) {
362+ job . next = jsonLastline . next
363+ }
364+ if ( jsonLastline . event_name ) {
365+ eventName = jsonLastline . event_name
366+ }
367+ }
368+ } catch ( err ) {
369+ //logger.log(err)
370+ }
371+ } else if ( input . eventName ) {
372+ eventName = input . eventName
373+ }
374+
375+ job . trigger_name = ( eventName || state )
376+
377+ await job . save ( )
378+
379+ this . finishedPostprocessing ( { job, user } )
380+
381+ done ( null , job )
382+ } catch ( err ) {
383+ logger . error ( err )
384+ done ( err )
385+ }
386+ } ,
387+ finishedPostprocessing ( { job, user } ) {
388+ process . nextTick ( ( ) => {
389+ RegisterOperation . submit ( Constants . UPDATE , TopicsConstants . job . crud , { job, user } )
390+ App . scheduler . cancelScheduledTimeoutVerificationJob ( job ) // async
391+ dispatchFinishedJobExecutionEvent ( job )
392+ emitJobFinishedNotification ( { job } )
393+ } )
394+ } ,
307395 /**
308396 * @return {Promise<Job> }
309397 */
@@ -464,108 +552,21 @@ module.exports = {
464552 } ,
465553 /**
466554 *
467- * @summary Finalize task execution. Save result and submit to elk
468- *
469- * @param {Object } input
470- * @property {Job } input.job
471- * @property {Object } input.result
472- * @param {Function } done
555+ * @summary parse incomming job output -> input parameters.
556+ * @return {[String] } array of strings (json encoded strings)
473557 *
474558 */
475- async finish ( input , done ) {
476- try {
477- const { job, user, customer, result = { } } = input
478-
479- let state
480- let lifecycle
481- let eventName
482- let output
483-
484- if ( result . killed === true ) {
485- lifecycle = LifecycleConstants . TERMINATED
486- state = StateConstants . TIMEOUT
487- eventName = StateConstants . TIMEOUT
559+ parseOutputParameters ( output ) {
560+ if ( typeof output === 'string' ) {
561+ return parseOutputStringAsJSON ( output )
562+ } else {
563+ if ( Array . isArray ( output ) ) {
564+ return filterOutputArray ( output )
488565 } else {
489- lifecycle = LifecycleConstants . FINISHED
490- if (
491- input . state === StateConstants . SUCCESS ||
492- input . state === StateConstants . FAILURE
493- ) {
494- state = input . state
495- } else {
496- // assuming success for backward compatibility
497- state = ( job . default_state_evaluation || StateConstants . SUCCESS )
498- }
499- }
500-
501- job . lifecycle = lifecycle
502- job . state = state
503-
504- const file = await this . storeJobOutput ( { job, customer, result } )
505- job . result_id = file . _id
506-
507- // data output, can be anything. stringify for security reason
508- if ( result ?. output ) {
509- job . output = this . parseOutputParameters ( result . output )
510- }
511-
512- if ( result . lastline ) {
513- try {
514- const jsonLastline = JSON . parse ( result . lastline )
515- // looking for state and output
516- if ( isObject ( jsonLastline ) ) {
517- if ( jsonLastline . components ) {
518- job . components = jsonLastline . components
519- }
520- if ( jsonLastline . next ) {
521- job . next = jsonLastline . next
522- }
523- if ( jsonLastline . event_name ) {
524- eventName = jsonLastline . event_name
525- }
526- }
527- } catch ( err ) {
528- //logger.log(err)
529- }
530- } else if ( input . eventName ) {
531- eventName = input . eventName
566+ return [ JSON . stringify ( output ) ]
532567 }
533-
534- job . trigger_name = ( eventName || state )
535-
536- await job . save ( )
537-
538- this . finishedPostprocessing ( { job, user } )
539-
540- done ( null , job )
541- } catch ( err ) {
542- logger . error ( err )
543- done ( err )
544568 }
545569 } ,
546- async storeJobOutput ( { job, customer, result } ) {
547- // parse result output
548- const data = JSON . stringify ( { result } , null , 2 )
549-
550- const storeParams = {
551- customer,
552- data, // result string
553- filename : `${ job . task_id } _result.json` ,
554- mimetype : 'application/json' ,
555- extension : 'json' ,
556- size : data . length ,
557- description : null ,
558- md5 : null ,
559- tags : [ ] ,
560- }
561-
562- const file = await App . file . create ( storeParams , {
563- encoded_data : false ,
564- pathname : 'outputs'
565- } )
566-
567- return file
568- } ,
569570 /**
570571 *
571572 * @summary Cancel Job execution.
@@ -602,31 +603,6 @@ module.exports = {
602603 next ( err )
603604 }
604605 } ,
605- /**
606- *
607- * @summary parse incomming job output -> input parameters.
608- * @return {[String] } array of strings (json encoded strings)
609- *
610- */
611- parseOutputParameters ( output ) {
612- if ( typeof output === 'string' ) {
613- return parseOutputStringAsJSON ( output )
614- } else {
615- if ( Array . isArray ( output ) ) {
616- return filterOutputArray ( output )
617- } else {
618- return [ JSON . stringify ( output ) ]
619- }
620- }
621- } ,
622- finishedPostprocessing ( { job, user } ) {
623- process . nextTick ( ( ) => {
624- RegisterOperation . submit ( Constants . UPDATE , TopicsConstants . job . crud , { job, user } )
625- App . scheduler . cancelScheduledTimeoutVerificationJob ( job ) // async
626- dispatchFinishedJobExecutionEvent ( job )
627- emitJobFinishedNotification ( { job } )
628- } )
629- } ,
630606 jobMustHaveATask ( job ) {
631607 const result = (
632608 job . _type === JobConstants . SCRAPER_TYPE ||
@@ -701,6 +677,29 @@ module.exports = {
701677 } )
702678
703679 return promise
680+ } ,
681+ async storeJobOutput ( { job, customer, result } ) {
682+ // parse result output
683+ const data = JSON . stringify ( { result } , null , 2 )
684+
685+ const storeParams = {
686+ customer,
687+ data, // result string
688+ filename : `${ job . task_id } _result.json` ,
689+ mimetype : 'application/json' ,
690+ extension : 'json' ,
691+ size : data . length ,
692+ description : null ,
693+ md5 : null ,
694+ tags : [ ] ,
695+ }
696+
697+ const file = await App . file . create ( storeParams , {
698+ encoded_data : false ,
699+ pathname : 'outputs'
700+ } )
701+
702+ return file
704703 }
705704}
706705
0 commit comments