feat: add dynamic module registration #996
base: main
No description provided.
Conversation
Walkthrough

This change refactors configuration loading and worker integration by introducing a dynamic, reflection-based runner factory system. It removes hardcoded configuration structs and static flag registrations, replacing them with a generic mapping and factory pattern. The update also migrates runner lifecycle management to a new Uber Fx module, decoupling configuration from runner instantiation.
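For orientation, here is a minimal sketch of the registration surface the walkthrough describes, pieced together from the review comments below. The exact signatures in internal/worker/module.go may differ; in particular, `Stop`'s signature is inferred from its use as an fx `OnStop` hook, and `CreateRunner`'s return type is shown as `any` because only fragments of it appear in this review.

```go
package worker

import "context"

// Runner is a long-lived background task managed by the fx lifecycle (sketch).
type Runner interface {
	Run(ctx context.Context) error
	Stop(ctx context.Context) error // inferred: passed directly as an fx OnStop hook
}

// RunnerFactory builds a runner from a strongly typed configuration struct whose
// mapstructure/description/default tags drive the dynamic flag registration (sketch).
type RunnerFactory[Config any] interface {
	// The first result is handed to fx.Provide; its exact type is not fully
	// visible in this review, so it is left as any here.
	CreateRunner(config Config) (any, error)
}
```

A worker package then self-registers, e.g. `worker.RegisterRunnerFactory(&AsyncBlockRunnerFactory{})` in an `init` function, and the CLI discovers its config fields by reflection.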
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant CLI as Cobra Command
    participant Config as MapConfig
    participant Worker as WorkerFactory
    participant FX as Uber Fx App

    CLI->>Config: MapConfig(cmd, cfg)
    Config->>Config: Bind flags, Unmarshal config
    CLI->>Worker: NewFXModule(configLoader)
    Worker->>Worker: For each RunnerFactory
    Worker->>Config: Load config via configLoader
    Worker->>Worker: CreateRunner(config)
    Worker->>FX: fx.Provide(runner), fx.Invoke(lifecycle)
    FX->>Worker: Start runners on app start
    FX->>Worker: Stop runners on app stop
```
Actionable comments posted: 4
🔭 Outside diff range comments (1)

internal/storage/async_block.go (1)

Lines 1-19: Add missing import for worker package interfaces.

The code uses the `Runner` and `RunnerFactory` interfaces defined in `internal/worker/module.go`, but the import statement is missing. This will cause a compilation error. Add the missing import:

```diff
 import (
 	"context"
 	"fmt"
+	"github.com/formancehq/ledger/internal/worker"
 	"github.com/formancehq/go-libs/v3/bun/bunpaginate"
```

Then update the interface references:

```diff
-) (Runner, error) {
+) (worker.Runner, error) {

-var _ RunnerFactory[AsyncBlockRunnerConfig] = (*AsyncBlockRunnerFactory)(nil)
+var _ worker.RunnerFactory[AsyncBlockRunnerConfig] = (*AsyncBlockRunnerFactory)(nil)
```

Also applies to: lines 156 and 173.
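Aside: the `var _ worker.RunnerFactory[AsyncBlockRunnerConfig] = (*AsyncBlockRunnerFactory)(nil)` line is Go's conventional compile-time interface assertion. In general form (`MyInterface` and `MyType` are placeholders):

```go
// The build fails here if *MyType ever stops satisfying MyInterface.
var _ MyInterface = (*MyType)(nil)
```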
♻️ Duplicate comments (1)

pkg/testserver/worker.go (1)

Line 18: Apply the same constant extraction pattern here.
🧹 Nitpick comments (2)

pkg/testserver/worker.go (1)

Line 11: Consider extracting the hardcoded flag names to constants.

While the removal of the `cmd` package dependency aligns with the dynamic configuration approach, hardcoding flag names as string literals can lead to maintenance issues and typos. Consider defining constants for these flag names at the package level:

```diff
+const (
+	WorkerAsyncBlockHasherMaxSizeFlag  = "worker-async-block-hasher-max-block-size"
+	WorkerAsyncBlockHasherScheduleFlag = "worker-async-block-hasher-schedule"
+)
+
 func LogsHashBlockMaxSizeInstrumentation(size int) testservice.InstrumentationFunc {
 	return func(ctx context.Context, runConfiguration *testservice.RunConfiguration) error {
-		runConfiguration.AppendArgs("--worker-async-block-hasher-max-block-size", strconv.Itoa(size))
+		runConfiguration.AppendArgs("--"+WorkerAsyncBlockHasherMaxSizeFlag, strconv.Itoa(size))
 		return nil
 	}
 }
```
internal/storage/async_block.go (1)

Lines 151-171: Consider documenting the CRON format in the config description.

The CRON parser includes seconds (`cron.Second`), making it a 6-field format instead of the standard 5-field one. While the default value shows this correctly, consider updating the description to clarify it:

```diff
-	HashLogsBlockCRONSpec string `mapstructure:"worker-async-block-hasher-schedule" description:"Schedule" default:"0 * * * * *"`
+	HashLogsBlockCRONSpec string `mapstructure:"worker-async-block-hasher-schedule" description:"Schedule (6-field CRON: seconds minutes hours day month weekday)" default:"0 * * * * *"`
```
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (7)

- cmd/config.go (1 hunks)
- cmd/serve.go (2 hunks)
- cmd/worker.go (3 hunks)
- internal/storage/async_block.go (6 hunks)
- internal/worker/fx.go (0 hunks)
- internal/worker/module.go (1 hunks)
- pkg/testserver/worker.go (1 hunks)
💤 Files with no reviewable changes (1)
- internal/worker/fx.go
🧰 Additional context used

🧬 Code Graph Analysis (1)

internal/storage/async_block.go (2)

- internal/storage/bucket/bucket.go (1): `Bucket` (13-21)
- internal/worker/module.go (3): `Runner` (57-60), `RunnerFactory` (62-66), `RegisterRunnerFactory` (74-76)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Tests
- GitHub Check: Dirty
🔇 Additional comments (7)

cmd/serve.go (1)

Lines 151-157: LGTM! Clean integration of the dynamic worker module.

The error handling is appropriate, and the conditional module creation based on `cfg.WorkerEnabled` is well implemented.

cmd/config.go (2)

Lines 9-16: LGTM! Clean separation of concerns.

The refactoring properly separates generic type handling from Viper configuration mapping, enabling reuse in the dynamic configuration system.

Lines 18-29: Well-structured configuration mapping with proper error handling.

The error messages are descriptive and will help with debugging configuration issues.
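A hedged sketch of what `MapConfig` plausibly does, based on the sequence diagram above ("Bind flags, Unmarshal config"); the real implementation in cmd/config.go is not quoted in this review, and the error messages are illustrative:

```go
package cmd

import (
	"fmt"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

// MapConfig binds the command's flags into Viper and unmarshals them onto cfg
// through its mapstructure tags (sketch, not the PR's code).
func MapConfig(cmd *cobra.Command, cfg any) error {
	v := viper.New()
	if err := v.BindPFlags(cmd.Flags()); err != nil {
		return fmt.Errorf("binding flags: %w", err)
	}
	if err := v.Unmarshal(cfg); err != nil {
		return fmt.Errorf("unmarshalling configuration: %w", err)
	}
	return nil
}
```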
cmd/worker.go (1)

Lines 57-62: LGTM! Clean implementation of dynamic worker module creation.

The error handling is appropriate and consistent with the pattern used in serve.go.

internal/storage/async_block.go (3)

Lines 21-24: Config struct changes look good.

The more specific field names (`HashLogsBlockMaxSize` and `HashLogsBlockCRONSpec`) improve clarity, and the mapstructure tags with default values support the new dynamic configuration system.

Lines 26-33: Struct refactoring improves testability.

Replacing the config field with explicit fields (`maxBlockSize` and `schedule`) decouples the runner from its configuration structure, making it more testable and following dependency-injection best practices.

Lines 115-135: Constructor refactoring aligns with DI principles.

The updated constructor with explicit parameters makes dependencies clear and aligns well with the factory pattern implementation.
cmd/worker.go (diff under review):

```diff
-type WorkerConfiguration struct {
-	HashLogsBlockMaxSize  int    `mapstructure:"worker-async-block-hasher-max-block-size"`
-	HashLogsBlockCRONSpec string `mapstructure:"worker-async-block-hasher-schedule"`
-}
-
-func addWorkerFlags(cmd *cobra.Command) {
-	cmd.Flags().Int(WorkerAsyncBlockHasherMaxBlockSizeFlag, 1000, "Max block size")
-	cmd.Flags().String(WorkerAsyncBlockHasherScheduleFlag, "0 * * * * *", "Schedule")
-}
+func addWorkerFlags(cmd *cobra.Command) {
+	for _, runnerFactory := range worker.AllRunnerFactories() {
+		typeOfRunnerFactory := reflect.TypeOf(runnerFactory)
+		method, _ := typeOfRunnerFactory.MethodByName("CreateRunner")
+		configType := method.Type.In(1)
+
+		for i := 0; i < configType.NumField(); i++ {
+			field := configType.Field(i)
+			fieldTag := field.Tag
+			flag := field.Tag.Get("mapstructure")
+			description := fieldTag.Get("description")
+			defaultValue := fieldTag.Get("default")
+
+			switch field.Type.Kind() {
+			case reflect.Int, reflect.Int64, reflect.Int32, reflect.Int16, reflect.Int8:
+				defaultValue, err := strconv.ParseInt(defaultValue, 10, 64)
+				if err != nil {
+					panic(err)
+				}
+				cmd.Flags().Int(flag, int(defaultValue), description)
+			case reflect.String:
+				cmd.Flags().String(flag, defaultValue, description)
+			default:
+				panic(fmt.Sprintf("cannot config flag %s as type %T is not handled", flag, field.Type))
+			}
+		}
+	}
+}
```
🛠️ Refactor suggestion

Improve error handling and add safety checks in the reflection code.

The dynamic flag registration is innovative but has several areas for improvement:

- Method lookup should verify that the method actually exists
- Panicking on errors is too harsh for a configuration system
- Type support is limited

Apply these improvements:
```diff
 func addWorkerFlags(cmd *cobra.Command) {
 	for _, runnerFactory := range worker.AllRunnerFactories() {
 		typeOfRunnerFactory := reflect.TypeOf(runnerFactory)
-		method, _ := typeOfRunnerFactory.MethodByName("CreateRunner")
+		method, ok := typeOfRunnerFactory.MethodByName("CreateRunner")
+		if !ok {
+			continue // Skip factories without a CreateRunner method
+		}
 		configType := method.Type.In(1)
 		for i := 0; i < configType.NumField(); i++ {
 			field := configType.Field(i)
 			fieldTag := field.Tag
 			flag := field.Tag.Get("mapstructure")
+			if flag == "" {
+				continue // Skip fields without a mapstructure tag
+			}
 			description := fieldTag.Get("description")
 			defaultValue := fieldTag.Get("default")
 			switch field.Type.Kind() {
 			case reflect.Int, reflect.Int64, reflect.Int32, reflect.Int16, reflect.Int8:
 				defaultValue, err := strconv.ParseInt(defaultValue, 10, 64)
 				if err != nil {
-					panic(err)
+					defaultValue = 0 // Use zero value on parse error
 				}
 				cmd.Flags().Int(flag, int(defaultValue), description)
 			case reflect.String:
 				cmd.Flags().String(flag, defaultValue, description)
+			case reflect.Bool:
+				boolDefault, _ := strconv.ParseBool(defaultValue)
+				cmd.Flags().Bool(flag, boolDefault, description)
 			default:
-				panic(fmt.Sprintf("cannot config flag %s as type %T is not handled", flag, field.Type))
+				continue // Skip unsupported types silently
 			}
 		}
 	}
 }
```
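Note that `reflect.Type.MethodByName` reports a missing method through its second return value; `reflect.Method` is a plain struct with no validity check of its own, which is why the diff above tests the `ok` flag rather than calling something like `method.IsValid()`.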
🤖 Prompt for AI Agents
In cmd/worker.go lines 18 to 45, improve the reflection code by first checking
if the "CreateRunner" method exists before using it to avoid panics. Replace
panics on errors like strconv.ParseInt failures with proper error handling or
logging to prevent crashing the application. Add safety checks for unsupported
field types and handle them gracefully instead of panicking. Consider extending
type support beyond just int and string for better flexibility.
internal/worker/module.go (code under review):

```go
var runnerFactories = make([]any, 0)

func AllRunnerFactories() []any {
	return runnerFactories
}

func RegisterRunnerFactory[Config any](factory RunnerFactory[Config]) {
	runnerFactories = append(runnerFactories, factory)
}
```
🛠️ Refactor suggestion

Add thread-safety to runner factory registration.

The global `runnerFactories` slice could suffer race conditions if factories are registered concurrently.
```diff
-var runnerFactories = make([]any, 0)
+var (
+	runnerFactories   = make([]any, 0)
+	runnerFactoriesMu sync.RWMutex
+)

 func AllRunnerFactories() []any {
+	runnerFactoriesMu.RLock()
+	defer runnerFactoriesMu.RUnlock()
 	return runnerFactories
 }

 func RegisterRunnerFactory[Config any](factory RunnerFactory[Config]) {
+	runnerFactoriesMu.Lock()
+	defer runnerFactoriesMu.Unlock()
 	runnerFactories = append(runnerFactories, factory)
 }
```
Also add at the top of the file:

```diff
 import (
 	"context"
+	"fmt"
+	"sync"

 	"go.uber.org/fx"
 	"reflect"
 )
```
🤖 Prompt for AI Agents
In internal/worker/module.go around lines 68 to 76, the global runnerFactories
slice is accessed and modified without synchronization, which can cause race
conditions during concurrent registrations. To fix this, add a sync.Mutex or
sync.RWMutex at the top of the file to protect access to runnerFactories. Lock
the mutex before appending to runnerFactories in RegisterRunnerFactory and
unlock it afterward. Also, lock the mutex for reading in AllRunnerFactories to
safely return the slice.
internal/worker/module.go (code under review):

```go
func NewFXModule(configLoader func(v any) error) (fx.Option, error) {
	options := make([]fx.Option, 0)
	for _, factory := range runnerFactories {
		vFactory := reflect.TypeOf(factory)
		method, _ := vFactory.MethodByName("CreateRunner")
		configType := reflect.New(method.Type.In(1)).Interface()
		if err := configLoader(configType); err != nil {
			return nil, err
		}

		ret := reflect.ValueOf(factory).
			MethodByName("CreateRunner").
			Call([]reflect.Value{
				reflect.ValueOf(configType).Elem(),
			})
		if ret[1].Interface() != nil {
			return nil, ret[1].Interface().(error)
		}

		runnerConstructor := ret[0].Interface()

		options = append(options, fx.Provide(
			fx.Annotate(runnerConstructor, fx.ResultTags(`group:"runners"`)),
		))
	}

	options = append(options,
		fx.Invoke(fx.Annotate(func(runners []Runner, lc fx.Lifecycle) {
			for _, runner := range runners {
				lc.Append(fx.Hook{
					OnStart: func(ctx context.Context) error {
						go func() {
							if err := runner.Run(context.WithoutCancel(ctx)); err != nil {
								panic(err)
							}
						}()
						return nil
					},
					OnStop: runner.Stop,
				})
			}
		}, fx.ParamTags(`group:"runners"`))),
	)

	return fx.Options(options...), nil
}
```
Critical issues with error handling and context management.

Several concerns need to be addressed:

- Context management: using `context.WithoutCancel` prevents graceful shutdown
- Error handling: panicking in a goroutine will crash the entire application
- Reflection safety: missing validation before method calls

Apply these critical fixes:
```diff
 func NewFXModule(configLoader func(v any) error) (fx.Option, error) {
 	options := make([]fx.Option, 0)
 	for _, factory := range runnerFactories {
 		vFactory := reflect.TypeOf(factory)
-		method, _ := vFactory.MethodByName("CreateRunner")
+		method, ok := vFactory.MethodByName("CreateRunner")
+		if !ok {
+			return nil, fmt.Errorf("factory %T does not implement CreateRunner method", factory)
+		}
 		configType := reflect.New(method.Type.In(1)).Interface()
 		if err := configLoader(configType); err != nil {
 			return nil, err
 		}
 		ret := reflect.ValueOf(factory).
 			MethodByName("CreateRunner").
 			Call([]reflect.Value{
 				reflect.ValueOf(configType).Elem(),
 			})
 		if ret[1].Interface() != nil {
-			return nil, ret[1].Interface().(error)
+			if err, ok := ret[1].Interface().(error); ok && err != nil {
+				return nil, err
+			}
 		}
 		runnerConstructor := ret[0].Interface()
 		options = append(options, fx.Provide(
 			fx.Annotate(runnerConstructor, fx.ResultTags(`group:"runners"`)),
 		))
 	}
 	options = append(options,
 		fx.Invoke(fx.Annotate(func(runners []Runner, lc fx.Lifecycle) {
 			for _, runner := range runners {
+				runner := runner // capture loop variable
 				lc.Append(fx.Hook{
 					OnStart: func(ctx context.Context) error {
+						errCh := make(chan error, 1)
 						go func() {
-							if err := runner.Run(context.WithoutCancel(ctx)); err != nil {
-								panic(err)
-							}
+							errCh <- runner.Run(ctx)
 						}()
+
+						// Consider how to handle runner errors - log them?
+						go func() {
+							if err := <-errCh; err != nil {
+								// Log error instead of panicking
+								// Consider using a logger injected via DI
+							}
+						}()
 						return nil
 					},
 					OnStop: runner.Stop,
 				})
 			}
 		}, fx.ParamTags(`group:"runners"`))),
 	)
 	return fx.Options(options...), nil
 }
```
🤖 Prompt for AI Agents
In internal/worker/module.go lines 9 to 55, fix context management by replacing
context.WithoutCancel with the original context to allow graceful shutdown.
Change the goroutine error handling to avoid panicking; instead, log the error
or handle it safely without crashing the app. Add validation checks before
calling methods via reflection to ensure the method exists and has the expected
signature, preventing runtime panics.
Force-pushed from a3431e5 to fc9840a (Compare)
Force-pushed from fc9840a to 6c053a1 (Compare)
Bug: Reflection Logic Fails Without Proper Validation

The reflection logic around `RunnerFactory` lacks robust validation, leading to panics if registered factories do not conform to the expected interfaces. The code assumes the "CreateRunner" method exists, has at least two parameters, and that its second parameter is a struct. Ignoring the boolean return from `MethodByName`, not checking the parameter count before calling `Type.In(1)`, and not validating the type kind before `NumField()` can cause nil pointer dereferences, index-out-of-range errors, or calls on zero values.
cmd/worker.go#L18-L24 (lines 18 to 24 in 6c053a1):

```go
func addWorkerFlags(cmd *cobra.Command) {
	for _, runnerFactory := range worker.AllRunnerFactories() {
		typeOfRunnerFactory := reflect.TypeOf(runnerFactory)
		method, _ := typeOfRunnerFactory.MethodByName("CreateRunner")
		configType := method.Type.In(1)
		for i := 0; i < configType.NumField(); i++ {
```
internal/worker/module.go#L12-L24 (lines 12 to 24 in 6c053a1):

```go
	for _, factory := range runnerFactories {
		vFactory := reflect.TypeOf(factory)
		method, _ := vFactory.MethodByName("CreateRunner")
		configType := reflect.New(method.Type.In(1)).Interface()
		if err := configLoader(configType); err != nil {
			return nil, err
		}
		ret := reflect.ValueOf(factory).
			MethodByName("CreateRunner").
			Call([]reflect.Value{
				reflect.ValueOf(configType).Elem(),
			})
```
Bug: Configuration Parsing Fails Gracelessly

The `addWorkerFlags` function panics at startup, crashing the application. This happens while parsing configuration flags when `strconv.ParseInt` fails on an invalid default integer value in a struct tag, or when an unsupported field type is encountered. These panics should be replaced with graceful error handling.
cmd/worker.go#L32-L41 (lines 32 to 41 in 6c053a1):

```go
			switch field.Type.Kind() {
			case reflect.Int, reflect.Int64, reflect.Int32, reflect.Int16, reflect.Int8:
				defaultValue, err := strconv.ParseInt(defaultValue, 10, 64)
				if err != nil {
					panic(err)
				}
				cmd.Flags().Int(flag, int(defaultValue), description)
			case reflect.String:
				cmd.Flags().String(flag, defaultValue, description)
			default:
				panic(fmt.Sprintf("cannot config flag %s as type %T is not handled", flag, field.Type))
```
Bug: Go Loop Closure Captures Last Runner

Classic Go closure bug in the loop: the `runner` variable is captured by reference in the `OnStart` and `OnStop` lifecycle hooks. Consequently, all hooks reference the last `runner` from the iteration, so only the last registered runner starts and the wrong runners may be stopped during shutdown. The `runner` variable must be captured by value within the loop scope.
internal/worker/module.go#L36-L51 (lines 36 to 51 in 6c053a1):

```go
	options = append(options,
		fx.Invoke(fx.Annotate(func(runners []Runner, lc fx.Lifecycle) {
			for _, runner := range runners {
				lc.Append(fx.Hook{
					OnStart: func(ctx context.Context) error {
						go func() {
							if err := runner.Run(context.WithoutCancel(ctx)); err != nil {
								panic(err)
							}
						}()
						return nil
					},
					OnStop: runner.Stop,
				})
			}
		}, fx.ParamTags(`group:"runners"`))),
```
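Two refinements to this report: since Go 1.22 the `range` variable is scoped per iteration, so the capture bug only bites on earlier toolchains (worth checking the module's `go` directive); and `OnStop: runner.Stop` is a method value whose receiver is bound when the hook literal is built, so only the `OnStart` closure is actually mis-bound. For pre-1.22 builds, the conventional fix is to shadow the variable:

```go
for _, runner := range runners {
	runner := runner // pre-Go 1.22: shadow so each closure gets its own copy
	// ... construct the fx.Hook using this iteration's runner ...
}
```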
Codecov Report

Attention: Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main     #996      +/-   ##
==========================================
+ Coverage   82.82%   82.87%   +0.05%
==========================================
  Files         145      145
  Lines        8219     8280      +61
==========================================
+ Hits         6807     6862      +55
- Misses       1085     1090       +5
- Partials      327      328       +1
==========================================
```

☔ View full report in Codecov by Sentry.