Skip to content

Commit a6c4cc5

Browse files
authored
feat(mc2mc): termination on exhausted retry (#92)
feat: termination on exhausted retry
1 parent 17818bb commit a6c4cc5

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

mc2mc/internal/client/odps.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string, additionalHints
6868
c.logger.Info(msg)
6969
return errors.WithStack(c.terminate(taskIns))
7070
case err := <-c.wait(taskIns):
71-
return errors.WithStack(err)
71+
if err != nil {
72+
c.logger.Error(fmt.Sprintf("task instance %s failed: %s", taskIns.Id(), err))
73+
err = e.Join(err, c.terminate(taskIns)) // terminate task instance on failure
74+
return errors.WithStack(err)
75+
}
76+
return nil
7277
}
7378
}
7479

@@ -157,6 +162,11 @@ func (c *odpsClient) wait(taskIns *odps.Instance) <-chan error {
157162
if err != nil {
158163
err := errors.Wrap(err, fmt.Sprintf("task instance %s failed", taskIns.Id()))
159164
errChan <- errors.WithStack(err)
165+
return
166+
}
167+
if err := taskIns.Load(); err != nil {
168+
c.logger.Warn(fmt.Sprintf("failed to load task instance %s: %s", taskIns.Id(), err))
169+
return
160170
}
161171
c.logger.Info(fmt.Sprintf("task instance %s finished with status: %s", taskIns.Id(), taskIns.Status()))
162172
sum, err := taskIns.GetTaskSummary(taskIns.TaskNameCommitted())

0 commit comments

Comments
 (0)