Skip to content

Commit 784455a

Browse files
committed
pkg/cdi: implement automatic refresh.
Implement automatic refresh and enable it by default. The new WithAutoRefresh(bool) option can be used to explicitly change the auto-refreshing behavior of the cache. When auto-refresh is enabled, CDI Spec directories are monitored for changes. A cache refresh is internally triggered whenever a relevant change to a cache file or directory is detected. Signed-off-by: Krisztian Litkey <krisztian.litkey@intel.com>
1 parent 4b00593 commit 784455a

5 files changed

Lines changed: 557 additions & 66 deletions

File tree

pkg/cdi/cache.go

Lines changed: 239 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"strings"
2323
"sync"
2424

25+
"github.com/fsnotify/fsnotify"
2526
"github.com/hashicorp/go-multierror"
2627
oci "github.com/opencontainers/runtime-spec/specs-go"
2728
"github.com/pkg/errors"
@@ -33,30 +34,46 @@ type Option func(*Cache) error
3334
// Cache stores CDI Specs loaded from Spec directories.
3435
type Cache struct {
3536
sync.Mutex
36-
specDirs []string
37-
specs map[string][]*Spec
38-
devices map[string]*Device
39-
errors map[string][]error
37+
specDirs []string
38+
specs map[string][]*Spec
39+
devices map[string]*Device
40+
errors map[string][]error
41+
dirErrors map[string]error
42+
43+
autoRefresh bool
44+
watch *watch
45+
}
46+
47+
// WithAutoRefresh returns an option to control automatic Cache refresh.
48+
// By default auto-refresh is enabled, the list of Spec directories are
49+
// monitored and the Cache is automatically refreshed whenever a change
50+
// is detected. This option can be used to disable this behavior when a
51+
// manually refreshed mode is preferable.
52+
func WithAutoRefresh(autoRefresh bool) Option {
53+
return func(c *Cache) error {
54+
c.autoRefresh = autoRefresh
55+
return nil
56+
}
4057
}
4158

4259
// NewCache creates a new CDI Cache. The cache is populated from a set
4360
// of CDI Spec directories. These can be specified using a WithSpecDirs
4461
// option. The default set of directories is exposed in DefaultSpecDirs.
4562
func NewCache(options ...Option) (*Cache, error) {
46-
c := &Cache{}
47-
48-
if err := c.Configure(options...); err != nil {
49-
return nil, err
50-
}
51-
if len(c.specDirs) == 0 {
52-
c.Configure(WithSpecDirs(DefaultSpecDirs...))
63+
c := &Cache{
64+
autoRefresh: true,
65+
watch: &watch{},
5366
}
5467

55-
return c, c.Refresh()
68+
WithSpecDirs(DefaultSpecDirs...)(c)
69+
c.Lock()
70+
defer c.Unlock()
71+
72+
return c, c.configure(options...)
5673
}
5774

58-
// Configure applies options to the cache. Updates the cache if options have
59-
// changed.
75+
// Configure applies options to the Cache. Updates and refreshes the
76+
// Cache if options have changed.
6077
func (c *Cache) Configure(options ...Option) error {
6178
if len(options) == 0 {
6279
return nil
@@ -65,17 +82,54 @@ func (c *Cache) Configure(options ...Option) error {
6582
c.Lock()
6683
defer c.Unlock()
6784

85+
return c.configure(options...)
86+
}
87+
88+
// configure applies options to the Cache. Start/stop CDI Spec directory watch, refresh
89+
// the Cache if necessary.
90+
func (c *Cache) configure(options ...Option) error {
91+
var err error
92+
6893
for _, o := range options {
69-
if err := o(c); err != nil {
94+
if err = o(c); err != nil {
7095
return errors.Wrapf(err, "failed to apply cache options")
7196
}
7297
}
7398

99+
c.dirErrors = make(map[string]error)
100+
101+
c.watch.stop()
102+
if c.autoRefresh {
103+
c.watch.setup(c.specDirs, c.dirErrors)
104+
c.watch.start(&c.Mutex, c.refresh, c.dirErrors)
105+
}
106+
c.refresh()
107+
74108
return nil
75109
}
76110

77111
// Refresh rescans the CDI Spec directories and refreshes the Cache.
112+
// In manual refresh mode the cache is always refreshed. In auto-
113+
// refresh mode the cache is only refreshed if it is out of date.
78114
func (c *Cache) Refresh() error {
115+
c.Lock()
116+
defer c.Unlock()
117+
118+
// force a refresh in manual mode
119+
if refreshed, err := c.refreshIfRequired(!c.autoRefresh); refreshed {
120+
return err
121+
}
122+
123+
// collect and return cached errors, much like refresh() does it
124+
var result error
125+
for _, err := range c.errors {
126+
result = multierror.Append(result, err...)
127+
}
128+
return result
129+
}
130+
131+
// refresh rescans the CDI Spec directories and files and updates the Cache.
132+
func (c *Cache) refresh() error {
79133
var (
80134
specs = map[string][]*Spec{}
81135
devices = map[string]*Device{}
@@ -135,9 +189,6 @@ func (c *Cache) Refresh() error {
135189
delete(devices, conflict)
136190
}
137191

138-
c.Lock()
139-
defer c.Unlock()
140-
141192
c.specs = specs
142193
c.devices = devices
143194
c.errors = specErrors
@@ -149,6 +200,17 @@ func (c *Cache) Refresh() error {
149200
return nil
150201
}
151202

203+
// RefreshIfRequired triggers a refresh if necessary.
204+
func (c *Cache) refreshIfRequired(force bool) (bool, error) {
205+
// We need to refresh if
206+
// - it's forced by an explicitly call to Refresh() in manual mode
207+
// - a missing Spec dir appears (added to watch) in auto-refresh mode
208+
if force || (c.autoRefresh && c.watch.update(c.dirErrors)) {
209+
return true, c.refresh()
210+
}
211+
return false, nil
212+
}
213+
152214
// InjectDevices injects the given qualified devices to an OCI Spec. It
153215
// returns any unresolvable devices and an error if injection fails for
154216
// any of the devices.
@@ -162,6 +224,8 @@ func (c *Cache) InjectDevices(ociSpec *oci.Spec, devices ...string) ([]string, e
162224
c.Lock()
163225
defer c.Unlock()
164226

227+
c.refreshIfRequired(false)
228+
165229
edits := &ContainerEdits{}
166230
specs := map[*Spec]struct{}{}
167231

@@ -195,6 +259,8 @@ func (c *Cache) GetDevice(device string) *Device {
195259
c.Lock()
196260
defer c.Unlock()
197261

262+
c.refreshIfRequired(false)
263+
198264
return c.devices[device]
199265
}
200266

@@ -205,6 +271,8 @@ func (c *Cache) ListDevices() []string {
205271
c.Lock()
206272
defer c.Unlock()
207273

274+
c.refreshIfRequired(false)
275+
208276
for name := range c.devices {
209277
devices = append(devices, name)
210278
}
@@ -220,6 +288,8 @@ func (c *Cache) ListVendors() []string {
220288
c.Lock()
221289
defer c.Unlock()
222290

291+
c.refreshIfRequired(false)
292+
223293
for vendor := range c.specs {
224294
vendors = append(vendors, vendor)
225295
}
@@ -238,6 +308,8 @@ func (c *Cache) ListClasses() []string {
238308
c.Lock()
239309
defer c.Unlock()
240310

311+
c.refreshIfRequired(false)
312+
241313
for _, specs := range c.specs {
242314
for _, spec := range specs {
243315
cmap[spec.GetClass()] = struct{}{}
@@ -256,6 +328,8 @@ func (c *Cache) GetVendorSpecs(vendor string) []*Spec {
256328
c.Lock()
257329
defer c.Unlock()
258330

331+
c.refreshIfRequired(false)
332+
259333
return c.specs[vendor]
260334
}
261335

@@ -268,12 +342,158 @@ func (c *Cache) GetSpecErrors(spec *Spec) []error {
268342
// GetErrors returns all errors encountered during the last
269343
// cache refresh.
270344
func (c *Cache) GetErrors() map[string][]error {
271-
return c.errors
345+
c.Lock()
346+
defer c.Unlock()
347+
348+
errors := map[string][]error{}
349+
for path, errs := range c.errors {
350+
errors[path] = errs
351+
}
352+
for path, err := range c.dirErrors {
353+
errors[path] = []error{err}
354+
}
355+
356+
return errors
272357
}
273358

274359
// GetSpecDirectories returns the CDI Spec directories currently in use.
275360
func (c *Cache) GetSpecDirectories() []string {
361+
c.Lock()
362+
defer c.Unlock()
363+
276364
dirs := make([]string, len(c.specDirs))
277365
copy(dirs, c.specDirs)
278366
return dirs
279367
}
368+
369+
// GetSpecDirErrors returns any errors related to configured Spec directories.
370+
func (c *Cache) GetSpecDirErrors() map[string]error {
371+
if c.dirErrors == nil {
372+
return nil
373+
}
374+
375+
c.Lock()
376+
defer c.Unlock()
377+
378+
errors := make(map[string]error)
379+
for dir, err := range c.dirErrors {
380+
errors[dir] = err
381+
}
382+
return errors
383+
}
384+
385+
// Our fsnotify helper wrapper.
386+
type watch struct {
387+
watcher *fsnotify.Watcher
388+
tracked map[string]bool
389+
}
390+
391+
// Setup monitoring for the given Spec directories.
392+
func (w *watch) setup(dirs []string, dirErrors map[string]error) {
393+
var (
394+
dir string
395+
err error
396+
)
397+
w.tracked = make(map[string]bool)
398+
for _, dir = range dirs {
399+
w.tracked[dir] = false
400+
}
401+
402+
w.watcher, err = fsnotify.NewWatcher()
403+
if err != nil {
404+
for _, dir := range dirs {
405+
dirErrors[dir] = errors.Wrap(err, "failed to create watcher")
406+
}
407+
return
408+
}
409+
410+
w.update(dirErrors)
411+
}
412+
413+
// Start watching Spec directories for relevant changes.
414+
func (w *watch) start(m *sync.Mutex, refresh func() error, dirErrors map[string]error) {
415+
go w.watch(m, refresh, dirErrors)
416+
}
417+
418+
// Stop watching directories.
419+
func (w *watch) stop() {
420+
if w.watcher == nil {
421+
return
422+
}
423+
424+
w.watcher.Close()
425+
w.tracked = nil
426+
}
427+
428+
// Watch Spec directory changes, triggering a refresh if necessary.
429+
func (w *watch) watch(m *sync.Mutex, refresh func() error, dirErrors map[string]error) {
430+
watch := w.watcher
431+
if watch == nil {
432+
return
433+
}
434+
for {
435+
select {
436+
case event, ok := <-watch.Events:
437+
if !ok {
438+
return
439+
}
440+
441+
if (event.Op & (fsnotify.Rename | fsnotify.Remove | fsnotify.Write)) == 0 {
442+
continue
443+
}
444+
if event.Op == fsnotify.Write {
445+
if ext := filepath.Ext(event.Name); ext != ".json" && ext != ".yaml" {
446+
continue
447+
}
448+
}
449+
450+
m.Lock()
451+
if event.Op == fsnotify.Remove && w.tracked[event.Name] {
452+
w.update(dirErrors, event.Name)
453+
} else {
454+
w.update(dirErrors)
455+
}
456+
refresh()
457+
m.Unlock()
458+
459+
case _, ok := <-watch.Errors:
460+
if !ok {
461+
return
462+
}
463+
}
464+
}
465+
}
466+
467+
// Update watch with pending/missing or removed directories.
468+
func (w *watch) update(dirErrors map[string]error, removed ...string) bool {
469+
var (
470+
dir string
471+
ok bool
472+
err error
473+
update bool
474+
)
475+
476+
for dir, ok = range w.tracked {
477+
if ok {
478+
continue
479+
}
480+
481+
err = w.watcher.Add(dir)
482+
if err == nil {
483+
w.tracked[dir] = true
484+
delete(dirErrors, dir)
485+
update = true
486+
} else {
487+
w.tracked[dir] = false
488+
dirErrors[dir] = errors.Wrap(err, "failed to monitor for changes")
489+
}
490+
}
491+
492+
for _, dir = range removed {
493+
w.tracked[dir] = false
494+
dirErrors[dir] = errors.New("directory removed")
495+
update = true
496+
}
497+
498+
return update
499+
}

0 commit comments

Comments
 (0)