mirror of https://github.com/minio/minio.git
293 lines
7.1 KiB
Go
293 lines
7.1 KiB
Go
|
// Copyright (c) 2014-2015 The Notify Authors. All rights reserved.
|
||
|
// Use of this source code is governed by the MIT license that can be
|
||
|
// found in the LICENSE file.
|
||
|
|
||
|
package notify
|
||
|
|
||
|
import "sync"
|
||
|
|
||
|
// nonrecursiveTree TODO(rjeczalik)
|
||
|
type nonrecursiveTree struct {
|
||
|
rw sync.RWMutex // protects root
|
||
|
root root
|
||
|
w watcher
|
||
|
c chan EventInfo
|
||
|
rec chan EventInfo
|
||
|
}
|
||
|
|
||
|
// newNonrecursiveTree TODO(rjeczalik)
|
||
|
func newNonrecursiveTree(w watcher, c, rec chan EventInfo) *nonrecursiveTree {
|
||
|
if rec == nil {
|
||
|
rec = make(chan EventInfo, buffer)
|
||
|
}
|
||
|
t := &nonrecursiveTree{
|
||
|
root: root{nd: newnode("")},
|
||
|
w: w,
|
||
|
c: c,
|
||
|
rec: rec,
|
||
|
}
|
||
|
go t.dispatch(c)
|
||
|
go t.internal(rec)
|
||
|
return t
|
||
|
}
|
||
|
|
||
|
// dispatch TODO(rjeczalik)
|
||
|
func (t *nonrecursiveTree) dispatch(c <-chan EventInfo) {
|
||
|
for ei := range c {
|
||
|
dbgprintf("dispatching %v on %q", ei.Event(), ei.Path())
|
||
|
go func(ei EventInfo) {
|
||
|
var nd node
|
||
|
var isrec bool
|
||
|
dir, base := split(ei.Path())
|
||
|
fn := func(it node, isbase bool) error {
|
||
|
isrec = isrec || it.Watch.IsRecursive()
|
||
|
if isbase {
|
||
|
nd = it
|
||
|
} else {
|
||
|
it.Watch.Dispatch(ei, recursive)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
t.rw.RLock()
|
||
|
// Notify recursive watchpoints found on the path.
|
||
|
if err := t.root.WalkPath(dir, fn); err != nil {
|
||
|
dbgprint("dispatch did not reach leaf:", err)
|
||
|
t.rw.RUnlock()
|
||
|
return
|
||
|
}
|
||
|
// Notify parent watchpoint.
|
||
|
nd.Watch.Dispatch(ei, 0)
|
||
|
isrec = isrec || nd.Watch.IsRecursive()
|
||
|
// If leaf watchpoint exists, notify it.
|
||
|
if nd, ok := nd.Child[base]; ok {
|
||
|
isrec = isrec || nd.Watch.IsRecursive()
|
||
|
nd.Watch.Dispatch(ei, 0)
|
||
|
}
|
||
|
t.rw.RUnlock()
|
||
|
// If the event describes newly leaf directory created within
|
||
|
if !isrec || ei.Event() != Create {
|
||
|
return
|
||
|
}
|
||
|
if ok, err := ei.(isDirer).isDir(); !ok || err != nil {
|
||
|
return
|
||
|
}
|
||
|
t.rec <- ei
|
||
|
}(ei)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// internal TODO(rjeczalik)
|
||
|
func (t *nonrecursiveTree) internal(rec <-chan EventInfo) {
|
||
|
for ei := range rec {
|
||
|
var nd node
|
||
|
var eset = internal
|
||
|
t.rw.Lock()
|
||
|
t.root.WalkPath(ei.Path(), func(it node, _ bool) error {
|
||
|
if e := it.Watch[t.rec]; e != 0 && e > eset {
|
||
|
eset = e
|
||
|
}
|
||
|
nd = it
|
||
|
return nil
|
||
|
})
|
||
|
if eset == internal {
|
||
|
t.rw.Unlock()
|
||
|
continue
|
||
|
}
|
||
|
err := nd.Add(ei.Path()).AddDir(t.recFunc(eset))
|
||
|
t.rw.Unlock()
|
||
|
if err != nil {
|
||
|
dbgprintf("internal(%p) error: %v", rec, err)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// watchAdd TODO(rjeczalik)
|
||
|
func (t *nonrecursiveTree) watchAdd(nd node, c chan<- EventInfo, e Event) eventDiff {
|
||
|
if e&recursive != 0 {
|
||
|
diff := nd.Watch.Add(t.rec, e|Create|omit)
|
||
|
nd.Watch.Add(c, e)
|
||
|
return diff
|
||
|
}
|
||
|
return nd.Watch.Add(c, e)
|
||
|
}
|
||
|
|
||
|
// watchDelMin TODO(rjeczalik)
|
||
|
func (t *nonrecursiveTree) watchDelMin(min Event, nd node, c chan<- EventInfo, e Event) eventDiff {
|
||
|
old, ok := nd.Watch[t.rec]
|
||
|
if ok {
|
||
|
nd.Watch[t.rec] = min
|
||
|
}
|
||
|
diff := nd.Watch.Del(c, e)
|
||
|
if ok {
|
||
|
switch old &^= diff[0] &^ diff[1]; {
|
||
|
case old|internal == internal:
|
||
|
delete(nd.Watch, t.rec)
|
||
|
if set, ok := nd.Watch[nil]; ok && len(nd.Watch) == 1 && set == 0 {
|
||
|
delete(nd.Watch, nil)
|
||
|
}
|
||
|
default:
|
||
|
nd.Watch.Add(t.rec, old|Create)
|
||
|
switch {
|
||
|
case diff == none:
|
||
|
case diff[1]|Create == diff[0]:
|
||
|
diff = none
|
||
|
default:
|
||
|
diff[1] |= Create
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return diff
|
||
|
}
|
||
|
|
||
|
// watchDel TODO(rjeczalik)
|
||
|
func (t *nonrecursiveTree) watchDel(nd node, c chan<- EventInfo, e Event) eventDiff {
|
||
|
return t.watchDelMin(0, nd, c, e)
|
||
|
}
|
||
|
|
||
|
// Watch TODO(rjeczalik)
|
||
|
func (t *nonrecursiveTree) Watch(path string, c chan<- EventInfo, events ...Event) error {
|
||
|
if c == nil {
|
||
|
panic("notify: Watch using nil channel")
|
||
|
}
|
||
|
// Expanding with empty event set is a nop.
|
||
|
if len(events) == 0 {
|
||
|
return nil
|
||
|
}
|
||
|
path, isrec, err := cleanpath(path)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
eset := joinevents(events)
|
||
|
t.rw.Lock()
|
||
|
defer t.rw.Unlock()
|
||
|
nd := t.root.Add(path)
|
||
|
if isrec {
|
||
|
return t.watchrec(nd, c, eset|recursive)
|
||
|
}
|
||
|
return t.watch(nd, c, eset)
|
||
|
}
|
||
|
|
||
|
func (t *nonrecursiveTree) watch(nd node, c chan<- EventInfo, e Event) (err error) {
|
||
|
diff := nd.Watch.Add(c, e)
|
||
|
switch {
|
||
|
case diff == none:
|
||
|
return nil
|
||
|
case diff[1] == 0:
|
||
|
// TODO(rjeczalik): cleanup this panic after implementation is stable
|
||
|
panic("eset is empty: " + nd.Name)
|
||
|
case diff[0] == 0:
|
||
|
err = t.w.Watch(nd.Name, diff[1])
|
||
|
default:
|
||
|
err = t.w.Rewatch(nd.Name, diff[0], diff[1])
|
||
|
}
|
||
|
if err != nil {
|
||
|
nd.Watch.Del(c, diff.Event())
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (t *nonrecursiveTree) recFunc(e Event) walkFunc {
|
||
|
return func(nd node) error {
|
||
|
switch diff := nd.Watch.Add(t.rec, e|omit|Create); {
|
||
|
case diff == none:
|
||
|
case diff[1] == 0:
|
||
|
// TODO(rjeczalik): cleanup this panic after implementation is stable
|
||
|
panic("eset is empty: " + nd.Name)
|
||
|
case diff[0] == 0:
|
||
|
t.w.Watch(nd.Name, diff[1])
|
||
|
default:
|
||
|
t.w.Rewatch(nd.Name, diff[0], diff[1])
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (t *nonrecursiveTree) watchrec(nd node, c chan<- EventInfo, e Event) error {
|
||
|
var traverse func(walkFunc) error
|
||
|
// Non-recursive tree listens on Create event for every recursive
|
||
|
// watchpoint in order to automagically set a watch for every
|
||
|
// created directory.
|
||
|
switch diff := nd.Watch.dryAdd(t.rec, e|Create); {
|
||
|
case diff == none:
|
||
|
t.watchAdd(nd, c, e)
|
||
|
nd.Watch.Add(t.rec, e|omit|Create)
|
||
|
return nil
|
||
|
case diff[1] == 0:
|
||
|
// TODO(rjeczalik): cleanup this panic after implementation is stable
|
||
|
panic("eset is empty: " + nd.Name)
|
||
|
case diff[0] == 0:
|
||
|
// TODO(rjeczalik): BFS into directories and skip subtree as soon as first
|
||
|
// recursive watchpoint is encountered.
|
||
|
traverse = nd.AddDir
|
||
|
default:
|
||
|
traverse = nd.Walk
|
||
|
}
|
||
|
// TODO(rjeczalik): account every path that failed to be (re)watched
|
||
|
// and retry.
|
||
|
if err := traverse(t.recFunc(e)); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
t.watchAdd(nd, c, e)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// walkWatchpointFunc is the callback used by walkWatchpoint. It is called
// with the node being visited and an Event that walkWatchpoint read from the
// t.rec entry of that node's parent when the node was queued (zero for the
// starting node).
type walkWatchpointFunc func(Event, node) error
|
||
|
|
||
|
func (t *nonrecursiveTree) walkWatchpoint(nd node, fn walkWatchpointFunc) error {
|
||
|
type minode struct {
|
||
|
min Event
|
||
|
nd node
|
||
|
}
|
||
|
mnd := minode{nd: nd}
|
||
|
stack := []minode{mnd}
|
||
|
Traverse:
|
||
|
for n := len(stack); n != 0; n = len(stack) {
|
||
|
mnd, stack = stack[n-1], stack[:n-1]
|
||
|
// There must be no recursive watchpoints if the node has no watchpoints
|
||
|
// itself (every node in subtree rooted at recursive watchpoints must
|
||
|
// have at least nil (total) and t.rec watchpoints).
|
||
|
if len(mnd.nd.Watch) != 0 {
|
||
|
switch err := fn(mnd.min, mnd.nd); err {
|
||
|
case nil:
|
||
|
case errSkip:
|
||
|
continue Traverse
|
||
|
default:
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
for _, nd := range mnd.nd.Child {
|
||
|
stack = append(stack, minode{mnd.nd.Watch[t.rec], nd})
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Stop TODO(rjeczalik)
|
||
|
func (t *nonrecursiveTree) Stop(c chan<- EventInfo) {
|
||
|
fn := func(min Event, nd node) error {
|
||
|
// TODO(rjeczalik): aggregate watcher errors and retry; in worst case
|
||
|
// forward to the user.
|
||
|
switch diff := t.watchDelMin(min, nd, c, all); {
|
||
|
case diff == none:
|
||
|
return nil
|
||
|
case diff[1] == 0:
|
||
|
t.w.Unwatch(nd.Name)
|
||
|
default:
|
||
|
t.w.Rewatch(nd.Name, diff[0], diff[1])
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
t.rw.Lock()
|
||
|
err := t.walkWatchpoint(t.root.nd, fn) // TODO(rjeczalik): store max root per c
|
||
|
t.rw.Unlock()
|
||
|
dbgprintf("Stop(%p) error: %v\n", c, err)
|
||
|
}
|
||
|
|
||
|
// Close TODO(rjeczalik)
|
||
|
func (t *nonrecursiveTree) Close() error {
|
||
|
err := t.w.Close()
|
||
|
close(t.c)
|
||
|
return err
|
||
|
}
|