Nothing Special   »   [go: up one dir, main page]

Skip to content

Commit

Permalink
Create one Stack per manifest directory (k0sproject#264)
Browse files Browse the repository at this point in the history
Signed-off-by: Leo Sjöberg <lsjoberg@mirantis.com>

Co-authored-by: Leo Sjöberg <lsjoberg@mirantis.com>
  • Loading branch information
phroggyy and Leo Sjöberg authored Oct 12, 2020
1 parent a84aea2 commit 6954a82
Show file tree
Hide file tree
Showing 13 changed files with 271 additions and 43 deletions.
6 changes: 5 additions & 1 deletion docs/manifests.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

MKE embeds a manifest deployer on controllers which allows easy way to deploy manifests automatically. By default MKE reads all manifests in `/var/lib/mke/manifests` and ensures their state matches on the cluster. When you remove a manifest file MKE will automatically prune all the resources associated with it.

Each directory that is a **direct descendant** of `/var/lib/mke/manifests` is considered
to be its own stack, but nested directories are not considered new stacks.

**Note:** MKE uses this mechanism for some of it's internal in-cluster components and other resources. Make sure you only touch the manifests not managed by MKE.

## Future

We intend to re-write the manifest handling partially. Currently all manifests are managed as a single "stack", meaning they will automatically get some of the same labels etc to allow automated pruning. We're planning to make this work in a stack per directory basis.
We may in the future support nested directories, but those will not be considered
_stacks_, but rather subresources of a parent stack. Stacks are exclusively top-level.

34 changes: 28 additions & 6 deletions pkg/applier/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"io/ioutil"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"path"
"path/filepath"

Expand All @@ -31,20 +32,18 @@ type Applier struct {
}

// NewApplier creates new Applier
func NewApplier(dir string) (Applier, error) {
func NewApplier(dir string) Applier {
name := filepath.Base(dir)
log := logrus.WithFields(logrus.Fields{
"component": "applier",
"bundle": name,
})

a := Applier{
return Applier{
log: log,
Dir: dir,
Name: name,
}

return a, nil
}

func (a *Applier) init() error {
Expand Down Expand Up @@ -72,27 +71,50 @@ func (a *Applier) init() error {
// Apply resources
func (a *Applier) Apply() error {
if a.client == nil {
a.init()
err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
return true
}, a.init)

if err != nil {
return err
}
}
files, err := filepath.Glob(path.Join(a.Dir, "*.yaml"))
if err != nil {
return err
}
resources, err := a.parseFiles(files)
stack := Stack{
Name: "mke-stack",
Name: a.Name,
Resources: resources,
Client: a.client,
Discovery: a.discoveryClient,
}
a.log.Debug("applying stack")
err = stack.Apply(context.Background(), true)
if err != nil {
a.log.WithError(err).Warn("stack apply failed")
a.discoveryClient.Invalidate()
} else {
a.log.Debug("successfully applied stack")
}

return err
}

// Delete deletes the entire stack by applying it with empty set of resources
func (a *Applier) Delete() error {
stack := Stack{
Name: a.Name,
Resources: []*unstructured.Unstructured{},
Client: a.client,
Discovery: a.discoveryClient,
}
logrus.Debugf("about to delete a stack %s with empty apply", a.Name)
err := stack.Apply(context.Background(), true)
return err
}

func (a *Applier) parseFiles(files []string) ([]*unstructured.Unstructured, error) {
resources := []*unstructured.Unstructured{}
for _, file := range files {
Expand Down
2 changes: 1 addition & 1 deletion pkg/applier/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ spec:
ioutil.WriteFile(fmt.Sprintf("%s/test.yaml", dir), []byte(template), 0400)
ioutil.WriteFile(fmt.Sprintf("%s/test-pod.yaml", dir), []byte(template2), 0400)
assert.Nil(t, err)
a, err := NewApplier(dir)
a := NewApplier(dir)
assert.Nil(t, err)

a.client = fake.NewSimpleDynamicClient(runtime.NewScheme())
Expand Down
111 changes: 88 additions & 23 deletions pkg/applier/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package applier

import (
"context"
"path"
"time"

"github.com/Mirantis/mke/pkg/constant"
"github.com/Mirantis/mke/pkg/debounce"
kubeutil "github.com/Mirantis/mke/pkg/kubernetes"
"github.com/Mirantis/mke/pkg/leaderelection"
"github.com/Mirantis/mke/pkg/util"
Expand All @@ -23,6 +23,7 @@ type Manager struct {
cancelWatcher context.CancelFunc
cancelLeaderElection context.CancelFunc
log *logrus.Entry
stacks map[string]*StackApplier
}

// Init initializes the Manager
Expand All @@ -32,8 +33,10 @@ func (m *Manager) Init() error {
return errors.Wrapf(err, "failed to create manifest bundle dir %s", constant.ManifestsDir)
}
m.log = logrus.WithField("component", "applier-manager")
m.stacks = make(map[string]*StackApplier)
m.bundlePath = constant.ManifestsDir

m.applier, err = NewApplier(constant.ManifestsDir)
m.applier = NewApplier(constant.ManifestsDir)
return err
}

Expand Down Expand Up @@ -93,7 +96,9 @@ func (m *Manager) watchLeaseEvents(events *leaderelection.LeaseEvents) {
log.Info("acquired leader lease")
ctx, cancel := context.WithCancel(context.Background())
m.cancelWatcher = cancel
go m.runFSWatcher(ctx)
go func() {
_ = m.runWatchers(ctx)
}()
case <-events.LostLease:
log.Info("lost leader lease")
if m.cancelWatcher != nil {
Expand All @@ -103,41 +108,101 @@ func (m *Manager) watchLeaseEvents(events *leaderelection.LeaseEvents) {
}
}

func (m *Manager) runFSWatcher(ctx context.Context) {
func (m *Manager) runWatchers(ctx context.Context) error {
log := logrus.WithField("component", "applier-manager")
watcher, err := fsnotify.NewWatcher()

dirs, err := util.GetAllDirs(m.bundlePath)
if err != nil {
log.Errorf("failed to create fs watcher for %s: %s", constant.ManifestsDir, err.Error())
return
return err
}
defer watcher.Close()

// Apply once after becoming leader, to make everything sync even if there's no FS events
log.Debug("Running initial apply after we've become the leader")
if err := m.applier.Apply(); err != nil {
log.Warnf("failed to apply manifests: %s", err.Error())
for _, dir := range dirs {
if err := m.createStack(path.Join(m.bundlePath, dir)); err != nil {
log.WithError(err).Error("failed to create stack")
return err
}
}

debouncer := debounce.New(5*time.Second, watcher.Events, func(arg fsnotify.Event) {
log.Debug("debouncer triggering, applying...")
if err := m.applier.Apply(); err != nil {
log.Warnf("failed to apply manifests: %s", err.Error())
}
})
defer debouncer.Stop()
go debouncer.Start()
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.WithError(err).Error("failed to create watcher")
return err
}
defer watcher.Close()

err = watcher.Add(constant.ManifestsDir)
err = watcher.Add(m.bundlePath)
if err != nil {
log.Warnf("Failed to start watcher: %s", err.Error())
}
for {
select {
case err := <-watcher.Errors:
case err, ok := <-watcher.Errors:
if !ok {
return err
}

log.Warnf("watch error: %s", err.Error())
case event, ok := <-watcher.Events:
if !ok {
return nil
}
switch event.Op {
case fsnotify.Create:
if util.IsDirectory(event.Name) {
if err := m.createStack(event.Name); err != nil {
return err
}
}
case fsnotify.Remove:
_ = m.removeStack(event.Name)
}
case <-ctx.Done():
log.Info("manifest watcher done")
return
return nil
}
}
}

func (m *Manager) createStack(name string) error {
// safeguard in case the fswatcher would trigger an event for an already existing watcher
if _, ok := m.stacks[name]; ok {
return nil
}
m.log.WithField("stack", name).Info("registering new stack")
sa, err := NewStackApplier(name)
if err != nil {
return err
}

go func() {
_ = sa.Start()
}()

m.stacks[name] = sa
return nil
}

func (m *Manager) removeStack(name string) error {
sa, ok := m.stacks[name]

if !ok {
m.log.
WithField("path", name).
Debug("attempted to remove non-existent stack, probably not a directory")
return nil
}
err := sa.Stop()
if err != nil {
m.log.WithField("stack", name).WithError(err).Warn("failed to stop stack applier")
return err
}
err = sa.DeleteStack()
if err != nil {
m.log.WithField("stack", name).WithError(err).Warn("failed to stop and delete a stack applier")
return err
}

delete(m.stacks, name)

return nil
}
79 changes: 79 additions & 0 deletions pkg/applier/stackapplier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package applier

import (
"k8s.io/client-go/util/retry"
"time"

"github.com/Mirantis/mke/pkg/debounce"
"github.com/sirupsen/logrus"
"gopkg.in/fsnotify.v1"
)

// StackApplier handles each directory as a Stack and watches for changes
type StackApplier struct {
Path string

fsWatcher *fsnotify.Watcher
applier Applier
log *logrus.Entry
done chan bool
}

// NewStackApplier crates new stack applier to manage a stack
func NewStackApplier(path string) (*StackApplier, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
err = watcher.Add(path)
if err != nil {
return nil, err
}
applier := NewApplier(path)
log := logrus.WithField("component", "applier-"+applier.Name)
log.WithField("path", path).Debug("created stack applier")

return &StackApplier{
Path: path,
fsWatcher: watcher,
applier: applier,
log: log,
done: make(chan bool, 1),
}, nil
}

// Start both the initial apply and also the watch for a single stack
func (s *StackApplier) Start() error {
debouncer := debounce.New(5*time.Second, s.fsWatcher.Events, func(arg fsnotify.Event) {
s.log.Debug("debouncer triggering, applying...")
err := retry.OnError(retry.DefaultRetry, func(err error) bool {
return true
}, s.applier.Apply)
if err != nil {
s.log.Warnf("failed to apply manifests: %s", err.Error())
}
})
defer debouncer.Stop()
go debouncer.Start()

// apply all changes on start
s.fsWatcher.Events <- fsnotify.Event{}

<-s.done

return nil
}

// Stop stops the stack applier and removes the stack
func (s *StackApplier) Stop() error {
s.log.WithField("stack", s.Path).Info("stopping and deleting stack")
s.done <- true
close(s.done)

return nil
}

// DeleteStack deletes the associated stack
func (s *StackApplier) DeleteStack() error {
return s.applier.Delete()
}
11 changes: 9 additions & 2 deletions pkg/component/server/calico.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package server
import (
"fmt"
"github.com/Mirantis/mke/static"
"os"
"path"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -53,6 +55,11 @@ func (c *Calico) Init() error {

// Run runs the calico reconciler
func (c *Calico) Run() error {
calicoDir := path.Join(constant.DataDir, "manifests", "calico")
err := os.MkdirAll(calicoDir, constant.ManifestsDirMode)
if err != nil {
return err
}
c.tickerDone = make(chan struct{})
var emptyStruct struct{}

Expand All @@ -74,7 +81,7 @@ func (c *Calico) Run() error {
Name: fmt.Sprintf("calico-crd-%s", strings.TrimSuffix(filename, filepath.Ext(filename))),
Template: string(contents),
Data: emptyStruct,
Path: filepath.Join(constant.DataDir, "manifests", fmt.Sprintf("calico-crd-%s", filename)),
Path: filepath.Join(calicoDir, fmt.Sprintf("calico-crd-%s", filename)),
}
err = tw.Write()
if err != nil {
Expand Down Expand Up @@ -142,7 +149,7 @@ func (c *Calico) work(previousConfig calicoConfig) *calicoConfig {
Name: fmt.Sprintf("calico-%s-%s", dir, strings.TrimSuffix(filename, filepath.Ext(filename))),
Template: string(contents),
Data: config,
Path: filepath.Join(constant.DataDir, "manifests", fmt.Sprintf("calico-%s-%s", dir, filename)),
Path: filepath.Join(constant.DataDir, "manifests", "calico", fmt.Sprintf("calico-%s-%s", dir, filename)),
}
err = tw.Write()
if err != nil {
Expand Down
Loading

0 comments on commit 6954a82

Please sign in to comment.