Skip to content

Commit 17818bb

Browse files
authored
feat(mc2mc): expose retry config (#93)
feat: expose retry config
1 parent 98bb66c commit 17818bb

File tree

5 files changed

+23
-6
lines changed

5 files changed

+23
-6
lines changed

mc2mc/internal/client/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type OdpsClient interface {
1818
SetDefaultProject(project string)
1919
SetLogViewRetentionInDays(days int)
2020
SetDryRun(dryRun bool)
21+
SetRetry(max int, backoffMs int)
2122
}
2223

2324
type Client struct {

mc2mc/internal/client/odps.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type odpsClient struct {
1919

2020
logViewRetentionInDays int
2121
isDryRun bool
22+
retry func(f func() error) error
2223
}
2324

2425
// NewODPSClient creates a new odpsClient instance
@@ -27,6 +28,9 @@ func NewODPSClient(logger *slog.Logger, client *odps.Odps) *odpsClient {
2728
logger: logger,
2829
client: client,
2930
logViewRetentionInDays: 2,
31+
retry: func(f func() error) error {
32+
return retry(logger, 3, 1000, f)
33+
},
3034
}
3135
}
3236

@@ -49,7 +53,7 @@ func (c *odpsClient) ExecSQL(ctx context.Context, query string, additionalHints
4953
// generate log view
5054
url, err := c.generateLogView(taskIns)
5155
if err != nil {
52-
err = e.Join(err, taskIns.Terminate())
56+
err = e.Join(err, c.terminate(taskIns))
5357
return errors.WithStack(err)
5458
}
5559
c.logger.Info(fmt.Sprintf("taskId: %s, log view: %s , hints: (%s)", taskIns.Id(), url, getHintsString(hints)))
@@ -83,6 +87,13 @@ func (c *odpsClient) SetDefaultProject(project string) {
8387
c.client.SetDefaultProjectName(project)
8488
}
8589

90+
// SetRetry sets the retry configuration for the odps client
91+
func (c *odpsClient) SetRetry(max int, backoffMs int) {
92+
c.retry = func(f func() error) error {
93+
return retry(c.logger, max, int64(backoffMs), f)
94+
}
95+
}
96+
8697
// GetPartitionNames returns the partition names of the given table
8798
// by querying the table schema.
8899
func (c *odpsClient) GetPartitionNames(_ context.Context, tableID string) ([]string, error) {
@@ -158,11 +169,6 @@ func (c *odpsClient) wait(taskIns *odps.Instance) <-chan error {
158169
return errChan
159170
}
160171

161-
// retry retries the given function with exponential backoff
162-
func (c *odpsClient) retry(f func() error) error {
163-
return retry(c.logger, 3, 1000, f)
164-
}
165-
166172
func (c *odpsClient) terminate(instance *odps.Instance) error {
167173
if instance == nil {
168174
return nil

mc2mc/internal/client/setup.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,10 @@ func SetupOTelSDK(collectorGRPCEndpoint string, otelAttributes string) SetupFn {
8080
return nil
8181
}
8282
}
83+
84+
func SetupRetry(max int, backoffMs int) SetupFn {
85+
return func(c *Client) error {
86+
c.OdpsClient.SetRetry(max, backoffMs)
87+
return nil
88+
}
89+
}

mc2mc/internal/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type ConfigEnv struct {
2424
LogViewRetentionInDays int `env:"LOG_VIEW_RETENTION_IN_DAYS" envDefault:"2"`
2525
DisableMultiQueryGeneration bool `env:"DISABLE_MULTI_QUERY_GENERATION" envDefault:"false"`
2626
DryRun bool `env:"DRY_RUN" envDefault:"false"`
27+
RetryMax int `env:"RETRY_MAX" envDefault:"3"`
28+
RetryBackoffMs int `env:"RETRY_BACKOFF_MS" envDefault:"1000"`
2729
// TODO: delete this
2830
DevEnablePartitionValue string `env:"DEV__ENABLE_PARTITION_VALUE" envDefault:"false"`
2931
DevEnableAutoPartition string `env:"DEV__ENABLE_AUTO_PARTITION" envDefault:"false"`

mc2mc/mc2mc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func mc2mc(envs []string) error {
4545
client.SetupDefaultProject(cfg.ExecutionProject),
4646
client.SetUpLogViewRetentionInDays(cfg.LogViewRetentionInDays),
4747
client.SetupDryRun(cfg.DryRun),
48+
client.SetupRetry(cfg.RetryMax, cfg.RetryBackoffMs),
4849
)
4950
if err != nil {
5051
return errors.WithStack(err)

0 commit comments

Comments
 (0)