Skip to content

Commit 8b415da

Browse files
authored
Split event handler from watcher (#36)
* split apart task hydration+execution from watcher/config package with a simple channel * moved task list diff code to task package * implemented reconfigurer * removed env logging from target command execution debug log - it's mostly unnecessary and makes reading logs during dev annoying * written some mocks for tests * written tests for state transitions * moved execution specific environment variables to the ExecutionTask structure
1 parent b61ceac commit 8b415da

20 files changed

+902
-392
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,5 @@ pico
2424
dist
2525
.env
2626
cache
27+
28+
*/.test/

service/executor/cmd.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package executor
2+
3+
import (
4+
"github.com/pkg/errors"
5+
"go.uber.org/zap"
6+
7+
"github.com/picostack/pico/service/secret"
8+
"github.com/picostack/pico/service/task"
9+
)
10+
11+
var _ Executor = &CommandExecutor{}
12+
13+
// CommandExecutor handles command invocation targets
14+
type CommandExecutor struct {
15+
secrets secret.Store
16+
}
17+
18+
// NewCommandExecutor creates a new CommandExecutor
19+
func NewCommandExecutor(secrets secret.Store) CommandExecutor {
20+
return CommandExecutor{
21+
secrets: secrets,
22+
}
23+
}
24+
25+
// Subscribe implements executor.Executor
26+
func (e *CommandExecutor) Subscribe(bus chan task.ExecutionTask) {
27+
for t := range bus {
28+
if err := e.execute(t.Target, t.Path, t.Shutdown); err != nil {
29+
zap.L().Error("executor task unsuccessful",
30+
zap.String("target", t.Target.Name),
31+
zap.Bool("shutdown", t.Shutdown),
32+
zap.Error(err))
33+
}
34+
}
35+
return
36+
}
37+
38+
func (e *CommandExecutor) execute(
39+
target task.Target,
40+
path string,
41+
shutdown bool,
42+
) (err error) {
43+
env, err := e.secrets.GetSecretsForTarget(target.Name)
44+
if err != nil {
45+
return errors.Wrap(err, "failed to get secrets for target")
46+
}
47+
48+
zap.L().Debug("executing with secrets",
49+
zap.String("target", target.Name),
50+
zap.Strings("cmd", target.Up),
51+
zap.String("url", target.RepoURL),
52+
zap.String("dir", path),
53+
zap.Int("secrets", len(env)))
54+
55+
return target.Execute(path, env, shutdown)
56+
}

service/executor/cmd_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package executor_test
2+
3+
import (
4+
"os"
5+
"testing"
6+
"time"
7+
8+
"golang.org/x/sync/errgroup"
9+
10+
"github.com/picostack/pico/service/executor"
11+
"github.com/picostack/pico/service/secret/memory"
12+
"github.com/picostack/pico/service/task"
13+
)
14+
15+
func TestMain(m *testing.M) {
16+
os.Mkdir(".test", os.ModePerm) //nolint:errcheck
17+
os.Exit(m.Run())
18+
}
19+
20+
func TestCommandExecutor(t *testing.T) {
21+
ce := executor.NewCommandExecutor(&memory.MemorySecrets{})
22+
bus := make(chan task.ExecutionTask)
23+
24+
g := errgroup.Group{}
25+
26+
g.Go(func() error {
27+
bus <- task.ExecutionTask{
28+
Target: task.Target{
29+
Name: "test_executor",
30+
Up: []string{"touch", "01"},
31+
},
32+
Path: "./.test",
33+
}
34+
return nil
35+
})
36+
37+
go ce.Subscribe(bus)
38+
39+
if err := g.Wait(); err != nil {
40+
t.Error(err)
41+
}
42+
43+
// wait for the task to be consumed and executed
44+
time.Sleep(time.Second)
45+
46+
if _, err := os.Stat(".test/01"); err == os.ErrNotExist {
47+
t.Error("expected file .test/01 to exist:", err)
48+
}
49+
}

service/executor/executor.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package executor
2+
3+
import "github.com/picostack/pico/service/task"
4+
5+
// Executor describes a type that can handle events and react to them. An
6+
// executor is also responsible for hydrating a target with secrets.
7+
type Executor interface {
8+
Subscribe(chan task.ExecutionTask)
9+
}

service/executor/printer.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package executor
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/picostack/pico/service/task"
7+
)
8+
9+
// Printer implements an executor that doesn't actually execute, just prints.
10+
type Printer struct{}
11+
12+
// Subscribe implements executor.Executor
13+
func (p *Printer) Subscribe(bus chan task.ExecutionTask) {
14+
for t := range bus {
15+
fmt.Printf("received task: %s\n", t.Target.Name)
16+
}
17+
}

service/reconfigurer/git.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package reconfigurer
2+
3+
import (
4+
"context"
5+
"path/filepath"
6+
"time"
7+
8+
"github.com/Southclaws/gitwatch"
9+
"github.com/picostack/pico/service/config"
10+
"github.com/picostack/pico/service/watcher"
11+
"github.com/pkg/errors"
12+
"go.uber.org/zap"
13+
"gopkg.in/src-d/go-git.v4/plumbing/transport"
14+
)
15+
16+
var _ Provider = &GitProvider{}
17+
18+
// GitProvider implements a Provider backed by Git. It will reconfigure its
19+
// watcher process upon commits to its defined configuration repository.
20+
type GitProvider struct {
21+
directory string
22+
hostname string
23+
configRepo string
24+
checkInterval time.Duration
25+
ssh transport.AuthMethod
26+
27+
configWatcher *gitwatch.Session
28+
}
29+
30+
// New creates a new provider with all necessary parameters
31+
func New(
32+
directory string,
33+
hostname string,
34+
configRepo string,
35+
checkInterval time.Duration,
36+
ssh transport.AuthMethod,
37+
) *GitProvider {
38+
return &GitProvider{
39+
directory: directory,
40+
hostname: hostname,
41+
configRepo: configRepo,
42+
checkInterval: checkInterval,
43+
ssh: ssh,
44+
}
45+
}
46+
47+
// Configure implements Provider
48+
func (p *GitProvider) Configure(w watcher.Watcher) error {
49+
if err := p.reconfigure(w); err != nil {
50+
return err
51+
}
52+
53+
for range p.configWatcher.Events {
54+
if err := p.reconfigure(w); err != nil {
55+
return err
56+
}
57+
}
58+
59+
return nil
60+
}
61+
62+
// reconfigure will close the configuration watcher (unless it's the first run)
63+
// then create a watcher for the application's config target repo then wait for
64+
// the first event (either from a fresh clone, a pull, or just a noop event)
65+
// then update the state of the watcher it's in charge of.
66+
func (p *GitProvider) reconfigure(w watcher.Watcher) (err error) {
67+
zap.L().Debug("reconfiguring")
68+
69+
err = p.watchConfig()
70+
if err != nil {
71+
return
72+
}
73+
74+
// generate a new desired state from the config repo
75+
path, err := gitwatch.GetRepoDirectory(p.configRepo)
76+
if err != nil {
77+
return
78+
}
79+
state := getNewState(
80+
filepath.Join(p.directory, path),
81+
p.hostname,
82+
w.GetState(),
83+
)
84+
85+
// Set the HOSTNAME config environment variable if necessary.
86+
if p.hostname != "" {
87+
state.Env["HOSTNAME"] = p.hostname
88+
}
89+
90+
return w.SetState(state)
91+
}
92+
93+
// watchConfig creates or restarts the watcher that reacts to changes to the
94+
// repo that contains pico configuration scripts
95+
func (p *GitProvider) watchConfig() (err error) {
96+
if p.configWatcher != nil {
97+
zap.L().Debug("closing existing watcher")
98+
p.configWatcher.Close()
99+
}
100+
101+
p.configWatcher, err = gitwatch.New(
102+
context.TODO(),
103+
[]gitwatch.Repository{{URL: p.configRepo}},
104+
p.checkInterval,
105+
p.directory,
106+
p.ssh,
107+
false)
108+
if err != nil {
109+
return errors.Wrap(err, "failed to watch config target")
110+
}
111+
112+
go func() {
113+
e := p.configWatcher.Run()
114+
if e != nil && !errors.Is(e, context.Canceled) {
115+
zap.L().Error("config watcher failed", zap.Error(e))
116+
}
117+
}()
118+
zap.L().Debug("created new config watcher, awaiting setup")
119+
120+
<-p.configWatcher.InitialDone
121+
122+
return
123+
}
124+
125+
// getNewState attempts to obtain a new desired state from the given path, if
126+
// any failures occur, it simply returns a fallback state and logs an error
127+
func getNewState(path, hostname string, fallback config.State) (state config.State) {
128+
state, err := config.ConfigFromDirectory(path, hostname)
129+
if err != nil {
130+
zap.L().Error("failed to construct config from repo, falling back to original state",
131+
zap.String("path", path),
132+
zap.String("hostname", hostname),
133+
zap.Error(err))
134+
135+
state = fallback
136+
} else {
137+
zap.L().Debug("constructed desired state",
138+
zap.Int("targets", len(state.Targets)))
139+
}
140+
return
141+
}

service/reconfigurer/reconfigurer.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package reconfigurer
2+
3+
import (
4+
"github.com/picostack/pico/service/watcher"
5+
)
6+
7+
// Provider describes a type that can provide config state events to a target
8+
// watcher. It will reconfigure and restart the watcher whenever necessary.
9+
type Provider interface {
10+
Configure(watcher.Watcher) error
11+
}

service/reconfigurer/static.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package reconfigurer
2+
3+
import (
4+
"github.com/picostack/pico/service/config"
5+
"github.com/picostack/pico/service/watcher"
6+
)
7+
8+
var _ Provider = &Static{}
9+
10+
// Static implements a Provider with a static config state that only sets its
11+
// watcher state once on initialisation.
12+
type Static struct {
13+
state config.State
14+
}
15+
16+
// NewStatic creates and calls SetState
17+
func NewStatic(state config.State, w watcher.Watcher) Static {
18+
s := Static{state: state}
19+
s.Configure(w)
20+
return s
21+
}
22+
23+
// Configure implements Provider
24+
func (s *Static) Configure(w watcher.Watcher) error {
25+
return w.SetState(s.state)
26+
}

0 commit comments

Comments
 (0)