Add detailed scanner metrics (#15161)

This commit is contained in:
Klaus Post 2022-07-05 14:45:49 -07:00 committed by GitHub
parent df42914da6
commit ac055b09e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 1735 additions and 1753 deletions

View File

@ -28,6 +28,7 @@ import (
"hash/crc32"
"io"
"io/ioutil"
"math"
"math/rand"
"net/http"
"net/url"
@ -50,6 +51,7 @@ import (
"github.com/minio/minio/internal/kms"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/logger/message/log"
"github.com/minio/minio/internal/pubsub"
iampolicy "github.com/minio/pkg/iam/policy"
xnet "github.com/minio/pkg/net"
"github.com/secure-io/sio-go"
@ -343,6 +345,92 @@ func (a adminAPIHandlers) StorageInfoHandler(w http.ResponseWriter, r *http.Requ
writeSuccessResponseJSON(w, jsonBytes)
}
// MetricsHandler - GET /minio/admin/v3/metrics
// ----------
// Get realtime server metrics
func (a adminAPIHandlers) MetricsHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "Metrics")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ServerInfoAdminAction)
if objectAPI == nil {
return
}
const defaultMetricsInterval = time.Second
interval, err := time.ParseDuration(r.Form.Get("interval"))
if err != nil || interval < time.Second {
interval = defaultMetricsInterval
}
n, err := strconv.Atoi(r.Form.Get("n"))
if err != nil || n <= 0 {
n = math.MaxInt32
}
var types madmin.MetricType
if t, _ := strconv.ParseUint(r.Form.Get("types"), 10, 64); t != 0 {
types = madmin.MetricType(t)
} else {
types = madmin.MetricsAll
}
hosts := strings.Split(r.Form.Get("hosts"), ",")
byhost := strings.EqualFold(r.Form.Get("by-host"), "true")
var hostMap map[string]struct{}
if len(hosts) > 0 && hosts[0] != "" {
hostMap = make(map[string]struct{}, len(hosts))
for _, k := range hosts {
if k != "" {
hostMap[k] = struct{}{}
}
}
}
done := ctx.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()
w.Header().Set(xhttp.ContentType, string(mimeJSON))
for n > 0 {
var m madmin.RealtimeMetrics
mLocal := collectLocalMetrics(types, hostMap)
m.Merge(&mLocal)
// Allow half the interval for collecting remote...
cctx, cancel := context.WithTimeout(ctx, interval/2)
mRemote := collectRemoteMetrics(cctx, types, hostMap)
cancel()
m.Merge(&mRemote)
if !byhost {
m.ByHost = nil
}
m.Final = n <= 1
// Marshal API response
jsonBytes, err := json.Marshal(m)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
_, err = w.Write(jsonBytes)
if err != nil {
n = 0
}
n--
if n <= 0 {
break
}
// Flush before waiting for next...
w.(http.Flusher).Flush()
select {
case <-ticker.C:
case <-done:
return
}
}
}
// DataUsageInfoHandler - GET /minio/admin/v3/datausage
// ----------
// Get server/cluster data usage info
@ -1300,72 +1388,45 @@ const (
// - input entry is not of the type *madmin.TraceInfo*
// - errOnly entries are to be traced, not status code 2xx, 3xx.
// - madmin.TraceInfo type is asked by opts
func mustTrace(entry interface{}, opts madmin.ServiceTraceOpts) (shouldTrace bool) {
trcInfo, ok := entry.(madmin.TraceInfo)
if !ok {
func shouldTrace(trcInfo madmin.TraceInfo, opts madmin.ServiceTraceOpts) (shouldTrace bool) {
// Reject all unwanted types.
want := opts.TraceTypes()
if !want.Contains(trcInfo.TraceType) {
return false
}
// Override shouldTrace decision with errOnly filtering
defer func() {
if shouldTrace && opts.OnlyErrors {
shouldTrace = trcInfo.RespInfo.StatusCode >= http.StatusBadRequest
}
}()
isHTTP := trcInfo.TraceType.Overlaps(madmin.TraceInternal|madmin.TraceS3) && trcInfo.HTTP != nil
if opts.Threshold > 0 {
var latency time.Duration
switch trcInfo.TraceType {
case madmin.TraceOS:
latency = trcInfo.OSStats.Duration
case madmin.TraceStorage:
latency = trcInfo.StorageStats.Duration
case madmin.TraceHTTP:
latency = trcInfo.CallStats.Latency
}
if latency < opts.Threshold {
// Check latency...
if opts.Threshold > 0 && trcInfo.Duration < opts.Threshold {
return false
}
// Check internal path
isInternal := isHTTP && HasPrefix(trcInfo.HTTP.ReqInfo.Path, minioReservedBucketPath+SlashSeparator)
if isInternal && !opts.Internal {
return false
}
// Filter non-errors.
if isHTTP && opts.OnlyErrors && trcInfo.HTTP.RespInfo.StatusCode < http.StatusBadRequest {
return false
}
if opts.Internal && trcInfo.TraceType == madmin.TraceHTTP && HasPrefix(trcInfo.ReqInfo.Path, minioReservedBucketPath+SlashSeparator) {
return true
}
if opts.S3 && trcInfo.TraceType == madmin.TraceHTTP && !HasPrefix(trcInfo.ReqInfo.Path, minioReservedBucketPath+SlashSeparator) {
return true
}
if opts.Storage && trcInfo.TraceType == madmin.TraceStorage {
return true
}
return opts.OS && trcInfo.TraceType == madmin.TraceOS
}
func extractTraceOptions(r *http.Request) (opts madmin.ServiceTraceOpts, err error) {
q := r.Form
opts.OnlyErrors = q.Get("err") == "true"
opts.S3 = q.Get("s3") == "true"
opts.Internal = q.Get("internal") == "true"
opts.Storage = q.Get("storage") == "true"
opts.OS = q.Get("os") == "true"
if err := opts.ParseParams(r); err != nil {
return opts, err
}
// Support deprecated 'all' query
if q.Get("all") == "true" {
if r.Form.Get("all") == "true" {
opts.S3 = true
opts.Internal = true
opts.Storage = true
opts.OS = true
}
if t := q.Get("threshold"); t != "" {
d, err := time.ParseDuration(t)
if err != nil {
return opts, err
}
opts.Threshold = d
// Older mc - cannot deal with more types...
}
return
}
@ -1388,20 +1449,21 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL)
return
}
setEventStreamHeaders(w)
// Trace Publisher and peer-trace-client uses nonblocking send and hence does not wait for slow receivers.
// Use buffered channel to take care of burst sends or slow w.Write()
traceCh := make(chan interface{}, 4000)
traceCh := make(chan pubsub.Maskable, 4000)
peers, _ := newPeerRestClients(globalEndpoints)
mask := pubsub.MaskFromMaskable(traceOpts.TraceTypes())
traceFn := func(entry interface{}) bool {
return mustTrace(entry, traceOpts)
err = globalTrace.Subscribe(mask, traceCh, ctx.Done(), func(entry pubsub.Maskable) bool {
if e, ok := entry.(madmin.TraceInfo); ok {
return shouldTrace(e, traceOpts)
}
err = globalTrace.Subscribe(traceCh, ctx.Done(), traceFn)
return false
})
if err != nil {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrSlowDown), r.URL)
return
@ -1442,7 +1504,7 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
}
}
// The handler sends console logs to the connected HTTP client.
// The ConsoleLogHandler handler sends console logs to the connected HTTP client.
func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ConsoleLog")
@ -1460,11 +1522,10 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque
limitLines = 10
}
logKind := r.Form.Get("logType")
if logKind == "" {
logKind = string(logger.All)
logKind := madmin.LogKind(strings.ToUpper(r.Form.Get("logType"))).LogMask()
if logKind == 0 {
logKind = madmin.LogMaskAll
}
logKind = strings.ToUpper(logKind)
// Avoid reusing tcp connection if read timeout is hit
// This is needed to make r.Context().Done() work as
@ -1473,7 +1534,7 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque
setEventStreamHeaders(w)
logCh := make(chan interface{}, 4000)
logCh := make(chan pubsub.Maskable, 4000)
peers, _ := newPeerRestClients(globalEndpoints)

View File

@ -66,6 +66,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/storageinfo").HandlerFunc(gz(httpTraceAll(adminAPI.StorageInfoHandler)))
// DataUsageInfo operations
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/datausageinfo").HandlerFunc(gz(httpTraceAll(adminAPI.DataUsageInfoHandler)))
// Metrics operation
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/metrics").HandlerFunc(gz(httpTraceAll(adminAPI.MetricsHandler)))
if globalIsDistErasure || globalIsErasure {
// Heal operations

View File

@ -22,6 +22,7 @@ import (
"runtime"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/pubsub"
)
// healTask represents what to heal along with options
@ -49,10 +50,10 @@ type healRoutine struct {
workers int
}
func systemIO() int {
func activeListeners() int {
// Bucket notification and http trace are not costly, it is okay to ignore them
// while counting the number of concurrent connections
return int(globalHTTPListen.NumSubscribers()) + int(globalTrace.NumSubscribers())
return int(globalHTTPListen.NumSubscribers(pubsub.MaskAll)) + int(globalTrace.NumSubscribers(pubsub.MaskAll))
}
func waitForLowHTTPReq() {
@ -61,7 +62,7 @@ func waitForLowHTTPReq() {
currentIO = httpServer.GetRequestCount
}
globalHealConfig.Wait(currentIO, systemIO)
globalHealConfig.Wait(currentIO, activeListeners)
}
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {

View File

@ -26,7 +26,7 @@ import (
// ReplicationLatency holds information of bucket operations latency, such us uploads
type ReplicationLatency struct {
// Single & Multipart PUTs latency
UploadHistogram LastMinuteLatencies
UploadHistogram LastMinuteHistogram
}
// Merge two replication latency into a new one
@ -41,7 +41,7 @@ func (rl ReplicationLatency) getUploadLatency() (ret map[string]uint64) {
avg := rl.UploadHistogram.GetAvgData()
for k, v := range avg {
// Convert nanoseconds to milliseconds
ret[sizeTagToString(k)] = v.avg() / uint64(time.Millisecond)
ret[sizeTagToString(k)] = uint64(v.avg() / time.Millisecond)
}
return
}

View File

@ -18,10 +18,11 @@
package cmd
import (
ring "container/ring"
"container/ring"
"context"
"sync"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/logger/message/log"
"github.com/minio/minio/internal/logger/target/console"
@ -71,11 +72,11 @@ func (sys *HTTPConsoleLoggerSys) SetNodeName(nodeName string) {
// HasLogListeners returns true if console log listeners are registered
// for this node or peers
func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool {
return sys != nil && sys.pubsub.NumSubscribers() > 0
return sys != nil && sys.pubsub.NumSubscribers(madmin.LogMaskAll) > 0
}
// Subscribe starts console logging for this node.
func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) error {
func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan pubsub.Maskable, doneCh <-chan struct{}, node string, last int, logKind madmin.LogMask, filter func(entry pubsub.Maskable) bool) error {
// Enable console logging for remote client.
if !sys.HasLogListeners() {
logger.AddSystemTarget(sys)
@ -115,8 +116,7 @@ func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan
}
}
}
return sys.pubsub.Subscribe(subCh, doneCh, filter)
return sys.pubsub.Subscribe(pubsub.MaskFromMaskable(madmin.LogMaskAll), subCh, doneCh, filter)
}
// Init if HTTPConsoleLoggerSys is valid, always returns nil right now
@ -163,9 +163,9 @@ func (sys *HTTPConsoleLoggerSys) Type() types.TargetType {
// Send log message 'e' to console and publish to console
// log pubsub system
func (sys *HTTPConsoleLoggerSys) Send(e interface{}, logKind string) error {
func (sys *HTTPConsoleLoggerSys) Send(entry interface{}) error {
var lg log.Info
switch e := e.(type) {
switch e := entry.(type) {
case log.Entry:
lg = log.Info{Entry: e, NodeName: sys.nodeName}
case string:
@ -179,5 +179,5 @@ func (sys *HTTPConsoleLoggerSys) Send(e interface{}, logKind string) error {
sys.logBuf = sys.logBuf.Next()
sys.Unlock()
return sys.console.Send(e, string(logger.All))
return sys.console.Send(entry, string(logger.All))
}

315
cmd/data-scanner-metric.go Normal file
View File

@ -0,0 +1,315 @@
package cmd
import (
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/bucket/lifecycle"
)
//go:generate stringer -type=scannerMetric -trimprefix=scannerMetric $GOFILE
type scannerMetric uint8
type scannerMetrics struct {
// All fields must be accessed atomically and aligned.
operations [scannerMetricLast]uint64
latency [scannerMetricLastRealtime]lockedLastMinuteLatency
// actions records actions performed.
actions [lifecycle.ActionCount]uint64
actionsLatency [lifecycle.ActionCount]lockedLastMinuteLatency
// currentPaths contains (string,*currentPathTracker) for each disk processing.
// Alignment not required.
currentPaths sync.Map
cycleInfoMu sync.Mutex
cycleInfo *currentScannerCycle
}
var globalScannerMetrics scannerMetrics
const (
// START Realtime metrics, that only to records
// last minute latencies and total operation count.
scannerMetricReadMetadata scannerMetric = iota
scannerMetricCheckMissing
scannerMetricSaveUsage
scannerMetricApplyAll
scannerMetricApplyVersion
scannerMetricTierObjSweep
scannerMetricHealCheck
scannerMetricILM
scannerMetricCheckReplication
scannerMetricYield
// START Trace metrics:
scannerMetricStartTrace
scannerMetricScanObject // Scan object. All operations included.
// END realtime metrics:
scannerMetricLastRealtime
// Trace only metrics:
scannerMetricScanFolder // Scan a folder on disk, recursively.
scannerMetricScanCycle // Full cycle, cluster global
scannerMetricScanBucketDisk // Single bucket on one disk
// Must be last:
scannerMetricLast
)
// log scanner action.
// Use for s > scannerMetricStartTrace
func (p *scannerMetrics) log(s scannerMetric, paths ...string) func() {
startTime := time.Now()
return func() {
duration := time.Since(startTime)
atomic.AddUint64(&p.operations[s], 1)
if s < scannerMetricLastRealtime {
p.latency[s].add(duration)
}
if s > scannerMetricStartTrace && globalTrace.NumSubscribers(madmin.TraceScanner) > 0 {
globalTrace.Publish(scannerTrace(s, startTime, duration, strings.Join(paths, " ")))
}
}
}
// time a scanner action.
// Use for s < scannerMetricLastRealtime
func (p *scannerMetrics) time(s scannerMetric) func() {
startTime := time.Now()
return func() {
duration := time.Since(startTime)
atomic.AddUint64(&p.operations[s], 1)
if s < scannerMetricLastRealtime {
p.latency[s].add(duration)
}
}
}
// timeSize add time and size of a scanner action.
// Use for s < scannerMetricLastRealtime
func (p *scannerMetrics) timeSize(s scannerMetric) func(sz int) {
startTime := time.Now()
return func(sz int) {
duration := time.Since(startTime)
atomic.AddUint64(&p.operations[s], 1)
if s < scannerMetricLastRealtime {
p.latency[s].addSize(duration, int64(sz))
}
}
}
// incTime will increment time on metric s with a specific duration.
// Use for s < scannerMetricLastRealtime
func (p *scannerMetrics) incTime(s scannerMetric, d time.Duration) {
atomic.AddUint64(&p.operations[s], 1)
if s < scannerMetricLastRealtime {
p.latency[s].add(d)
}
}
func (p *scannerMetrics) incNoTime(s scannerMetric) {
atomic.AddUint64(&p.operations[s], 1)
if s < scannerMetricLastRealtime {
p.latency[s].add(0)
}
}
// timeILM times an ILM action.
// lifecycle.NoneAction is ignored.
// Use for s < scannerMetricLastRealtime
func (p *scannerMetrics) timeILM(a lifecycle.Action) func() {
if a == lifecycle.NoneAction || a >= lifecycle.ActionCount {
return func() {}
}
startTime := time.Now()
return func() {
duration := time.Since(startTime)
atomic.AddUint64(&p.actions[a], 1)
p.actionsLatency[a].add(duration)
}
}
type currentPathTracker struct {
name *unsafe.Pointer // contains atomically accessed *string
}
// currentPathUpdater provides a lightweight update function for keeping track of
// current objects for each disk.
// Returns a function that can be used to update the current object
// and a function to call to when processing finished.
func (p *scannerMetrics) currentPathUpdater(disk, initial string) (update func(path string), done func()) {
initialPtr := unsafe.Pointer(&initial)
tracker := &currentPathTracker{
name: &initialPtr,
}
p.currentPaths.Store(disk, tracker)
return func(path string) {
atomic.StorePointer(tracker.name, unsafe.Pointer(&path))
}, func() {
p.currentPaths.Delete(disk)
}
}
// getCurrentPaths returns the paths currently being processed.
func (p *scannerMetrics) getCurrentPaths() []string {
var res []string
prefix := globalMinioAddr + "/"
p.currentPaths.Range(func(key, value interface{}) bool {
// We are a bit paranoid, but better miss an entry than crash.
name, ok := key.(string)
if !ok {
return true
}
obj, ok := value.(*currentPathTracker)
if !ok {
return true
}
strptr := (*string)(atomic.LoadPointer(obj.name))
if strptr != nil {
res = append(res, prefix+name+"/"+*strptr)
}
return true
})
return res
}
// activeDisks returns the number of currently active disks.
// (since this is concurrent it may not be 100% reliable)
func (p *scannerMetrics) activeDisks() int {
var i int
p.currentPaths.Range(func(k, v interface{}) bool {
i++
return true
})
return i
}
// lifetime returns the lifetime count of the specified metric.
func (p *scannerMetrics) lifetime(m scannerMetric) uint64 {
if m < 0 || m >= scannerMetricLast {
return 0
}
val := atomic.LoadUint64(&p.operations[m])
return val
}
// lastMinute returns the last minute statistics of a metric.
// m should be < scannerMetricLastRealtime
func (p *scannerMetrics) lastMinute(m scannerMetric) AccElem {
if m < 0 || m >= scannerMetricLastRealtime {
return AccElem{}
}
val := p.latency[m].total()
return val
}
// lifetimeActions returns the lifetime count of the specified ilm metric.
func (p *scannerMetrics) lifetimeActions(a lifecycle.Action) uint64 {
if a == lifecycle.NoneAction || a >= lifecycle.ActionCount {
return 0
}
val := atomic.LoadUint64(&p.actions[a])
return val
}
// lastMinuteActions returns the last minute statistics of an ilm metric.
func (p *scannerMetrics) lastMinuteActions(a lifecycle.Action) AccElem {
if a == lifecycle.NoneAction || a >= lifecycle.ActionCount {
return AccElem{}
}
val := p.actionsLatency[a].total()
return val
}
// setCycle updates the current cycle metrics.
func (p *scannerMetrics) setCycle(c *currentScannerCycle) {
if c != nil {
c2 := c.clone()
c = &c2
}
p.cycleInfoMu.Lock()
p.cycleInfo = c
p.cycleInfoMu.Unlock()
}
// getCycle returns the current cycle metrics.
// If not nil, the returned value can safely be modified.
func (p *scannerMetrics) getCycle() *currentScannerCycle {
p.cycleInfoMu.Lock()
defer p.cycleInfoMu.Unlock()
if p.cycleInfo == nil {
return nil
}
c := p.cycleInfo.clone()
return &c
}
func (p *scannerMetrics) report() madmin.ScannerMetrics {
var m madmin.ScannerMetrics
cycle := p.getCycle()
if cycle != nil {
m.CurrentCycle = cycle.current
m.CyclesCompletedAt = cycle.cycleCompleted
m.CurrentStarted = cycle.started
}
m.CollectedAt = time.Now()
m.ActivePaths = p.getCurrentPaths()
m.LifeTimeOps = make(map[string]uint64, scannerMetricLast)
for i := scannerMetric(0); i < scannerMetricLast; i++ {
if n := atomic.LoadUint64(&p.operations[i]); n > 0 {
m.LifeTimeOps[i.String()] = n
}
}
if len(m.LifeTimeOps) == 0 {
m.LifeTimeOps = nil
}
m.LastMinute.Actions = make(map[string]madmin.TimedAction, scannerMetricLastRealtime)
for i := scannerMetric(0); i < scannerMetricLastRealtime; i++ {
lm := p.lastMinute(i)
if lm.N > 0 {
m.LastMinute.Actions[i.String()] = lm.asTimedAction()
}
}
if len(m.LastMinute.Actions) == 0 {
m.LastMinute.Actions = nil
}
// ILM
m.LifeTimeILM = make(map[string]uint64)
for i := lifecycle.NoneAction + 1; i < lifecycle.ActionCount; i++ {
if n := atomic.LoadUint64(&p.actions[i]); n > 0 {
m.LifeTimeILM[i.String()] = n
}
}
if len(m.LifeTimeILM) == 0 {
m.LifeTimeILM = nil
}
if len(m.LifeTimeILM) > 0 {
m.LastMinute.ILM = make(map[string]madmin.TimedAction, len(m.LifeTimeILM))
for i := lifecycle.NoneAction + 1; i < lifecycle.ActionCount; i++ {
lm := p.lastMinuteActions(i)
if lm.N > 0 {
m.LastMinute.ILM[i.String()] = madmin.TimedAction{Count: uint64(lm.N), AccTime: uint64(lm.Total)}
}
}
if len(m.LastMinute.ILM) == 0 {
m.LastMinute.ILM = nil
}
}
return m
}

View File

@ -31,7 +31,6 @@ import (
"path"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/bits-and-blooms/bloom/v3"
@ -66,7 +65,7 @@ var (
dataScannerLeaderLockTimeout = newDynamicTimeout(30*time.Second, 10*time.Second)
// Sleeper values are updated when config is loaded.
scannerSleeper = newDynamicSleeper(10, 10*time.Second)
scannerSleeper = newDynamicSleeper(10, 10*time.Second, true)
scannerCycle = uatomic.NewDuration(dataScannerStartDelay)
)
@ -162,35 +161,45 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
// No unlock for "leader" lock.
// Load current bloom cycle
nextBloomCycle := intDataUpdateTracker.current() + 1
var cycleInfo currentScannerCycle
cycleInfo.next = intDataUpdateTracker.current() + 1
buf, _ := readConfig(ctx, objAPI, dataUsageBloomNamePath)
if len(buf) >= 8 {
if err = binary.Read(bytes.NewReader(buf), binary.LittleEndian, &nextBloomCycle); err != nil {
if len(buf) == 8 {
cycleInfo.next = binary.LittleEndian.Uint64(buf)
} else if len(buf) > 8 {
cycleInfo.next = binary.LittleEndian.Uint64(buf[:8])
buf = buf[8:]
_, err = cycleInfo.UnmarshalMsg(buf)
logger.LogIf(ctx, err)
}
}
scannerTimer := time.NewTimer(scannerCycle.Load())
defer scannerTimer.Stop()
defer globalScannerMetrics.setCycle(nil)
for {
select {
case <-ctx.Done():
return
case <-scannerTimer.C:
if intDataUpdateTracker.debug {
console.Debugln("starting scanner cycle")
}
// Reset the timer for next cycle.
// If scanner takes longer we start at once.
scannerTimer.Reset(scannerCycle.Load())
stopFn := globalScannerMetrics.log(scannerMetricScanCycle)
cycleInfo.current = cycleInfo.next
cycleInfo.started = time.Now()
globalScannerMetrics.setCycle(&cycleInfo)
bgHealInfo := readBackgroundHealInfo(ctx, objAPI)
scanMode := getCycleScanMode(nextBloomCycle, bgHealInfo.BitrotStartCycle, bgHealInfo.BitrotStartTime)
scanMode := getCycleScanMode(cycleInfo.current, bgHealInfo.BitrotStartCycle, bgHealInfo.BitrotStartTime)
if bgHealInfo.CurrentScanMode != scanMode {
newHealInfo := bgHealInfo
newHealInfo.CurrentScanMode = scanMode
if scanMode == madmin.HealDeepScan {
newHealInfo.BitrotStartTime = time.Now().UTC()
newHealInfo.BitrotStartCycle = nextBloomCycle
newHealInfo.BitrotStartCycle = cycleInfo.current
}
saveBackgroundHealInfo(ctx, objAPI, newHealInfo)
}
@ -198,23 +207,28 @@ func runDataScanner(pctx context.Context, objAPI ObjectLayer) {
// Wait before starting next cycle and wait on startup.
results := make(chan DataUsageInfo, 1)
go storeDataUsageInBackend(ctx, objAPI, results)
bf, err := globalNotificationSys.updateBloomFilter(ctx, nextBloomCycle)
bf, err := globalNotificationSys.updateBloomFilter(ctx, cycleInfo.current)
logger.LogIf(ctx, err)
err = objAPI.NSScanner(ctx, bf, results, uint32(nextBloomCycle), scanMode)
err = objAPI.NSScanner(ctx, bf, results, uint32(cycleInfo.current), scanMode)
logger.LogIf(ctx, err)
stopFn()
if err == nil {
// Store new cycle...
nextBloomCycle++
var tmp [8]byte
binary.LittleEndian.PutUint64(tmp[:], nextBloomCycle)
if err = saveConfig(ctx, objAPI, dataUsageBloomNamePath, tmp[:]); err != nil {
cycleInfo.next++
cycleInfo.current = 0
cycleInfo.cycleCompleted = append(cycleInfo.cycleCompleted, time.Now())
if len(cycleInfo.cycleCompleted) > dataUsageUpdateDirCycles {
cycleInfo.cycleCompleted = cycleInfo.cycleCompleted[len(cycleInfo.cycleCompleted)-dataUsageUpdateDirCycles:]
}
globalScannerMetrics.setCycle(&cycleInfo)
tmp := make([]byte, 8, 8+cycleInfo.Msgsize())
// Cycle for backward compat.
binary.LittleEndian.PutUint64(tmp, cycleInfo.next)
tmp, _ = cycleInfo.MarshalMsg(tmp)
err = saveConfig(ctx, objAPI, dataUsageBloomNamePath, tmp)
logger.LogIf(ctx, err)
}
}
// Reset the timer for next cycle.
scannerTimer.Reset(scannerCycle.Load())
}
}
}
@ -244,24 +258,11 @@ type folderScanner struct {
// Will not be closed when returned.
updates chan<- dataUsageEntry
lastUpdate time.Time
// updateCurrentPath should be called whenever a new path is scanned.
updateCurrentPath func(string)
}
type scannerStats struct {
// All fields must be accessed atomically and aligned.
accTotalObjects uint64
accTotalVersions uint64
accFolders uint64
bucketsStarted uint64
bucketsFinished uint64
ilmChecks uint64
// actions records actions performed.
actions [lifecycle.ActionCount]uint64
}
var globalScannerStats scannerStats
// Cache structure and compaction:
//
// A cache structure will be kept with a tree of usages.
@ -305,24 +306,14 @@ var globalScannerStats scannerStats
// Before each operation sleepDuration is called which can be used to temporarily halt the scanner.
// If the supplied context is canceled the function will return at the first chance.
func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, cache dataUsageCache, getSize getSizeFn, scanMode madmin.HealScanMode) (dataUsageCache, error) {
t := UTCNow()
logPrefix := color.Green("data-usage: ")
logSuffix := color.Blue("- %v + %v", basePath, cache.Info.Name)
atomic.AddUint64(&globalScannerStats.bucketsStarted, 1)
defer func() {
atomic.AddUint64(&globalScannerStats.bucketsFinished, 1)
}()
if intDataUpdateTracker.debug {
defer func() {
console.Debugf(logPrefix+" Scanner time: %v %s\n", time.Since(t), logSuffix)
}()
}
switch cache.Info.Name {
case "", dataUsageRoot:
return cache, errors.New("internal error: root scan attempted")
}
updatePath, closeDisk := globalScannerMetrics.currentPathUpdater(basePath, cache.Info.Name)
defer closeDisk()
s := folderScanner{
root: basePath,
@ -335,6 +326,7 @@ func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, c
healObjectSelect: 0,
scanMode: scanMode,
updates: cache.Info.updates,
updateCurrentPath: updatePath,
}
// Add disks for set healing.
@ -366,14 +358,8 @@ func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, c
s.withFilter = nil
}
}
if s.dataUsageScannerDebug {
console.Debugf(logPrefix+"Start scanning. Bloom filter: %v %s\n", s.withFilter != nil, logSuffix)
}
done := ctx.Done()
if s.dataUsageScannerDebug {
console.Debugf(logPrefix+"Cycle: %v, Entries: %v %s\n", cache.Info.NextCycle, len(cache.Cache), logSuffix)
}
// Read top level in bucket.
select {
@ -389,9 +375,6 @@ func scanDataFolder(ctx context.Context, poolIdx, setIdx int, basePath string, c
return cache, err
}
if s.dataUsageScannerDebug {
console.Debugf(logPrefix+"Finished scanner, %v entries (%+v) %s \n", len(s.newCache.Cache), *s.newCache.sizeRecursive(s.newCache.Info.Name), logSuffix)
}
s.newCache.Info.LastUpdate = UTCNow()
s.newCache.Info.NextCycle = cache.Info.NextCycle
return s.newCache, nil
@ -420,10 +403,10 @@ func (f *folderScanner) sendUpdate() {
func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, into *dataUsageEntry) error {
done := ctx.Done()
scannerLogPrefix := color.Green("folder-scanner:")
thisHash := hashPath(folder.name)
// Store initial compaction state.
wasCompacted := into.Compacted
atomic.AddUint64(&globalScannerStats.accFolders, 1)
for {
select {
@ -648,7 +631,10 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
f.updateCache.replaceHashed(h, &thisHash, dataUsageEntry{})
}
}
f.updateCurrentPath(folder.name)
stopFn := globalScannerMetrics.log(scannerMetricScanFolder, f.root, folder.name)
scanFolder(folder)
stopFn()
// Add new folders if this is new and we don't have existing.
if !into.Compacted {
parent := f.updateCache.find(thisHash.Key())
@ -676,7 +662,10 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
folder.objectHealProbDiv = f.healFolderInclude
}
}
f.updateCurrentPath(folder.name)
stopFn := globalScannerMetrics.log(scannerMetricScanFolder, f.root, folder.name, "EXISTING")
scanFolder(folder)
stopFn()
}
// Scan for healing
@ -717,9 +706,8 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
healObjectsPrefix := color.Green("healObjects:")
for k := range abandonedChildren {
bucket, prefix := path2BucketObject(k)
if f.dataUsageScannerDebug {
console.Debugf(scannerLogPrefix+" checking disappeared folder: %v/%v\n", bucket, prefix)
}
stopFn := globalScannerMetrics.time(scannerMetricCheckMissing)
f.updateCurrentPath(k)
if bucket != resolver.bucket {
// Bucket might be missing as well with abandoned children.
@ -807,6 +795,7 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
},
})
stopFn()
if f.dataUsageScannerDebug && err != nil && err != errFileNotFound {
console.Debugf(healObjectsPrefix+" checking returned value %v (%T)\n", err, err)
}
@ -814,7 +803,9 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int
// Add unless healing returned an error.
if foundObjs {
this := cachedFolder{name: k, parent: &thisHash, objectHealProbDiv: 1}
stopFn := globalScannerMetrics.log(scannerMetricScanFolder, f.root, this.name, "HEALED")
scanFolder(this)
stopFn()
}
}
break
@ -964,7 +955,6 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
return false, size
}
atomic.AddUint64(&globalScannerStats.ilmChecks, 1)
versionID := oi.VersionID
rCfg, _ := globalBucketObjectLockSys.Get(i.bucket)
action := evalActionFromLifecycle(ctx, *i.lifeCycle, rCfg, oi, false)
@ -975,7 +965,7 @@ func (i *scannerItem) applyLifecycle(ctx context.Context, o ObjectLayer, oi Obje
console.Debugf(applyActionsLogPrefix+" lifecycle: %q Initial scan: %v\n", i.objectPath(), action)
}
}
atomic.AddUint64(&globalScannerStats.actions[action], 1)
defer globalScannerMetrics.timeILM(action)
switch action {
case lifecycle.DeleteAction, lifecycle.DeleteVersionAction, lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
@ -1092,16 +1082,23 @@ func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fi
// The metadata will be compared to consensus on the object layer before any changes are applied.
// If no metadata is supplied, -1 is returned if no action is taken.
func (i *scannerItem) applyActions(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) int64 {
done := globalScannerMetrics.time(scannerMetricILM)
applied, size := i.applyLifecycle(ctx, o, oi)
done()
// For instance, an applied lifecycle means we remove/transitioned an object
// from the current deployment, which means we don't have to call healing
// routine even if we are asked to do via heal flag.
if !applied {
if i.heal.enabled {
done := globalScannerMetrics.time(scannerMetricHealCheck)
size = i.applyHealing(ctx, o, oi)
done()
}
// replicate only if lifecycle rules are not applied.
done := globalScannerMetrics.time(scannerMetricCheckReplication)
i.healReplication(ctx, o, oi.Clone(), sizeS)
done()
}
return size
}
@ -1341,15 +1338,20 @@ type dynamicSleeper struct {
// cycle will be closed
cycle chan struct{}
// isScanner should be set when this is used by the scanner
// to record metrics.
isScanner bool
}
// newDynamicSleeper
func newDynamicSleeper(factor float64, maxWait time.Duration) *dynamicSleeper {
func newDynamicSleeper(factor float64, maxWait time.Duration, isScanner bool) *dynamicSleeper {
return &dynamicSleeper{
factor: factor,
cycle: make(chan struct{}),
maxSleep: maxWait,
minSleep: 100 * time.Microsecond,
isScanner: isScanner,
}
}
@ -1379,15 +1381,24 @@ func (d *dynamicSleeper) Timer(ctx context.Context) func() {
select {
case <-ctx.Done():
if !timer.Stop() {
if d.isScanner {
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
}
<-timer.C
}
return
case <-timer.C:
if d.isScanner {
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
}
return
case <-cycle:
if !timer.Stop() {
// We expired.
<-timer.C
if d.isScanner {
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
}
return
}
}
@ -1418,14 +1429,23 @@ func (d *dynamicSleeper) Sleep(ctx context.Context, base time.Duration) {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
if d.isScanner {
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
}
}
return
case <-timer.C:
if d.isScanner {
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
}
return
case <-cycle:
if !timer.Stop() {
// We expired.
<-timer.C
if d.isScanner {
globalScannerMetrics.incTime(scannerMetricYield, wantSleep)
}
return
}
}

View File

@ -63,7 +63,7 @@ func (t *testingLogger) Type() types.TargetType {
return types.TargetHTTP
}
func (t *testingLogger) Send(entry interface{}, errKind string) error {
func (t *testingLogger) Send(entry interface{}) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.t == nil {
@ -75,7 +75,7 @@ func (t *testingLogger) Send(entry interface{}, errKind string) error {
}
t.t.Helper()
t.t.Log(e.Level, ":", errKind, e.Message)
t.t.Log(e.Level, ":", e.Message)
return nil
}

View File

@ -1323,3 +1323,19 @@ func (z dataUsageHashMap) Msgsize() (s int) {
}
return
}
//msgp:encode ignore currentScannerCycle
//msgp:decode ignore currentScannerCycle
type currentScannerCycle struct {
current uint64
next uint64
started time.Time
cycleCompleted []time.Time
}
// clone returns a clone.
func (z currentScannerCycle) clone() currentScannerCycle {
z.cycleCompleted = append(make([]time.Time, 0, len(z.cycleCompleted)), z.cycleCompleted...)
return z
}

View File

@ -3,6 +3,8 @@ package cmd
// Code generated by github.com/tinylib/msgp DO NOT EDIT.
import (
"time"
"github.com/tinylib/msgp/msgp"
)
@ -284,6 +286,101 @@ func (z *allTierStats) Msgsize() (s int) {
return
}
// MarshalMsg implements msgp.Marshaler
func (z *currentScannerCycle) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 4
// string "current"
o = append(o, 0x84, 0xa7, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74)
o = msgp.AppendUint64(o, z.current)
// string "next"
o = append(o, 0xa4, 0x6e, 0x65, 0x78, 0x74)
o = msgp.AppendUint64(o, z.next)
// string "started"
o = append(o, 0xa7, 0x73, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64)
o = msgp.AppendTime(o, z.started)
// string "cycleCompleted"
o = append(o, 0xae, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64)
o = msgp.AppendArrayHeader(o, uint32(len(z.cycleCompleted)))
for za0001 := range z.cycleCompleted {
o = msgp.AppendTime(o, z.cycleCompleted[za0001])
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *currentScannerCycle) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zb0001 uint32
zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
for zb0001 > 0 {
zb0001--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
switch msgp.UnsafeString(field) {
case "current":
z.current, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "current")
return
}
case "next":
z.next, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "next")
return
}
case "started":
z.started, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "started")
return
}
case "cycleCompleted":
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "cycleCompleted")
return
}
if cap(z.cycleCompleted) >= int(zb0002) {
z.cycleCompleted = (z.cycleCompleted)[:zb0002]
} else {
z.cycleCompleted = make([]time.Time, zb0002)
}
for za0001 := range z.cycleCompleted {
z.cycleCompleted[za0001], bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "cycleCompleted", za0001)
return
}
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
err = msgp.WrapError(err)
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *currentScannerCycle) Msgsize() (s int) {
s = 1 + 8 + msgp.Uint64Size + 5 + msgp.Uint64Size + 8 + msgp.TimeSize + 15 + msgp.ArrayHeaderSize + (len(z.cycleCompleted) * (msgp.TimeSize))
return
}
// DecodeMsg implements msgp.Decodable
func (z *dataUsageCache) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte

View File

@ -122,6 +122,64 @@ func BenchmarkDecodeallTierStats(b *testing.B) {
}
}
func TestMarshalUnmarshalcurrentScannerCycle(t *testing.T) {
v := currentScannerCycle{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
}
left, err := v.UnmarshalMsg(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
}
left, err = msgp.Skip(bts)
if err != nil {
t.Fatal(err)
}
if len(left) > 0 {
t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
}
}
func BenchmarkMarshalMsgcurrentScannerCycle(b *testing.B) {
v := currentScannerCycle{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v.MarshalMsg(nil)
}
}
func BenchmarkAppendMsgcurrentScannerCycle(b *testing.B) {
v := currentScannerCycle{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bts, _ = v.MarshalMsg(bts[0:0])
}
}
func BenchmarkUnmarshalcurrentScannerCycle(b *testing.B) {
v := currentScannerCycle{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := v.UnmarshalMsg(bts)
if err != nil {
b.Fatal(err)
}
}
}
func TestMarshalUnmarshaldataUsageCache(t *testing.T) {
v := dataUsageCache{}
bts, err := v.MarshalMsg(nil)

View File

@ -466,7 +466,7 @@ func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks [
getDisks: s.GetDisks(i),
getLockers: s.GetLockers(i),
getEndpoints: s.GetEndpoints(i),
deletedCleanupSleeper: newDynamicSleeper(10, 2*time.Second),
deletedCleanupSleeper: newDynamicSleeper(10, 2*time.Second, false),
nsMutex: mutex,
bp: bp,
bpOld: bpOld,

View File

@ -90,7 +90,7 @@ func newErasureSingle(ctx context.Context, storageDisk StorageAPI, format *forma
format: format,
nsMutex: newNSLock(false),
bp: bp,
deletedCleanupSleeper: newDynamicSleeper(10, 2*time.Second),
deletedCleanupSleeper: newDynamicSleeper(10, 2*time.Second, false),
}
// start cleanup stale uploads go-routine.

View File

@ -213,11 +213,13 @@ func getDisksInfo(disks []StorageAPI, endpoints []Endpoint) (disksInfo []madmin.
}
}
di.Metrics = &madmin.DiskMetrics{
APILatencies: make(map[string]interface{}),
APICalls: make(map[string]uint64),
LastMinute: make(map[string]madmin.TimedAction, len(info.Metrics.LastMinute)),
APICalls: make(map[string]uint64, len(info.Metrics.APICalls)),
}
for k, v := range info.Metrics.LastMinute {
if v.N > 0 {
di.Metrics.LastMinute[k] = v.asTimedAction()
}
for k, v := range info.Metrics.APILatencies {
di.Metrics.APILatencies[k] = v
}
for k, v := range info.Metrics.APICalls {
di.Metrics.APICalls[k] = v

View File

@ -31,7 +31,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
jsoniter "github.com/json-iterator/go"
@ -349,6 +348,7 @@ func (fs *FSObjects) NSScanner(ctx context.Context, bf *bloomFilter, updates cha
// A partially updated bucket may be returned.
func (fs *FSObjects) scanBucket(ctx context.Context, bucket string, cache dataUsageCache) (dataUsageCache, error) {
defer close(cache.Info.updates)
defer globalScannerMetrics.log(scannerMetricScanBucketDisk, fs.fsPath, bucket)()
// Get bucket policy
// Check if the current bucket has a configured lifecycle policy
@ -363,6 +363,16 @@ func (fs *FSObjects) scanBucket(ctx context.Context, bucket string, cache dataUs
// Load bucket info.
cache, err = scanDataFolder(ctx, -1, -1, fs.fsPath, cache, func(item scannerItem) (sizeSummary, error) {
bucket, object := item.bucket, item.objectPath()
stopFn := globalScannerMetrics.log(scannerMetricScanObject, fs.fsPath, PathJoin(item.bucket, item.objectPath()))
defer stopFn()
var fsMetaBytes []byte
done := globalScannerMetrics.timeSize(scannerMetricReadMetadata)
defer func() {
if done != nil {
done(len(fsMetaBytes))
}
}()
fsMetaBytes, err := xioutil.ReadFile(pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile))
if err != nil && !osIsNotExist(err) {
if intDataUpdateTracker.debug {
@ -391,11 +401,16 @@ func (fs *FSObjects) scanBucket(ctx context.Context, bucket string, cache dataUs
}
return sizeSummary{}, errSkipFile
}
done(len(fsMetaBytes))
done = nil
// FS has no "all versions". Increment the counter, though
globalScannerMetrics.incNoTime(scannerMetricApplyAll)
oi := fsMeta.ToObjectInfo(bucket, object, fi)
atomic.AddUint64(&globalScannerStats.accTotalVersions, 1)
atomic.AddUint64(&globalScannerStats.accTotalObjects, 1)
doneVer := globalScannerMetrics.time(scannerMetricApplyVersion)
sz := item.applyActions(ctx, fs, oi, &sizeSummary{})
doneVer()
if sz >= 0 {
return sizeSummary{totalSize: sz, versions: 1}, nil
}

View File

@ -230,6 +230,7 @@ var (
globalTrace = pubsub.New(8)
// global Listen system to send S3 API events to registered listeners
// Objects are expected to be event.Event
globalHTTPListen = pubsub.New(0)
// global console system to send console logs to

View File

@ -125,7 +125,7 @@ type traceCtxt struct {
// otherwise, generate a trace event with request information but no response.
func httpTracer(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if globalTrace.NumSubscribers() == 0 {
if globalTrace.NumSubscribers(madmin.TraceS3|madmin.TraceInternal) == 0 {
h.ServeHTTP(w, r)
return
}
@ -149,6 +149,14 @@ func httpTracer(h http.Handler) http.Handler {
h.ServeHTTP(respRecorder, r)
reqEndTime := time.Now().UTC()
tt := madmin.TraceInternal
if strings.HasPrefix(tc.funcName, "s3.") {
tt = madmin.TraceS3
}
// No need to continue if no subscribers for actual type...
if globalTrace.NumSubscribers(tt) == 0 {
return
}
// Calculate input body size with headers
reqHeaders := r.Header.Clone()
reqHeaders.Set("Host", r.Host)
@ -185,7 +193,15 @@ func httpTracer(h http.Handler) http.Handler {
funcName = "<unknown>"
}
rq := madmin.TraceRequestInfo{
t := madmin.TraceInfo{
TraceType: tt,
FuncName: funcName,
NodeName: nodeName,
Time: reqStartTime,
Duration: reqEndTime.Sub(respRecorder.StartTime),
Path: reqPath,
HTTP: &madmin.TraceHTTPStats{
ReqInfo: madmin.TraceRequestInfo{
Time: reqStartTime,
Proto: r.Proto,
Method: r.Method,
@ -194,30 +210,20 @@ func httpTracer(h http.Handler) http.Handler {
Headers: reqHeaders,
Path: reqPath,
Body: reqRecorder.Data(),
}
rs := madmin.TraceResponseInfo{
},
RespInfo: madmin.TraceResponseInfo{
Time: reqEndTime,
Headers: respRecorder.Header().Clone(),
StatusCode: respRecorder.StatusCode,
Body: respRecorder.Body(),
}
cs := madmin.TraceCallStats{
Latency: rs.Time.Sub(respRecorder.StartTime),
},
CallStats: madmin.TraceCallStats{
Latency: reqEndTime.Sub(respRecorder.StartTime),
InputBytes: inputBytes,
OutputBytes: respRecorder.Size(),
TimeToFirstByte: respRecorder.TimeToFirstByte,
}
t := madmin.TraceInfo{
TraceType: madmin.TraceHTTP,
FuncName: funcName,
NodeName: nodeName,
Time: reqStartTime,
ReqInfo: rq,
RespInfo: rs,
CallStats: cs,
},
},
}
globalTrace.Publish(t)

View File

@ -21,6 +21,8 @@ package cmd
import (
"time"
"github.com/minio/madmin-go"
)
const (
@ -75,29 +77,49 @@ func sizeTagToString(tag int) string {
// AccElem holds information for calculating an average value
type AccElem struct {
Total int64
Size int64
N int64
}
// Add a duration to a single element.
func (a *AccElem) add(dur time.Duration) {
if dur < 0 {
dur = 0
}
a.Total += int64(dur)
a.N++
}
// Add a duration to a single element.
func (a *AccElem) addSize(dur time.Duration, sz int64) {
if dur < 0 {
dur = 0
}
a.Total += int64(dur)
a.Size += sz
a.N++
}
// Merge b into a.
func (a *AccElem) merge(b AccElem) {
a.N += b.N
a.Total += b.Total
a.Size += b.Size
}
// Avg converts total to average.
func (a AccElem) avg() uint64 {
// Avg returns average time spent.
func (a AccElem) avg() time.Duration {
if a.N >= 1 && a.Total > 0 {
return uint64(a.Total / a.N)
return time.Duration(a.Total / a.N)
}
return 0
}
// asTimedAction returns the element as a madmin.TimedAction.
func (a AccElem) asTimedAction() madmin.TimedAction {
return madmin.TimedAction{AccTime: uint64(a.Total), Count: uint64(a.N), Bytes: uint64(a.Size)}
}
// lastMinuteLatency keeps track of last minute latency.
type lastMinuteLatency struct {
Totals [60]AccElem
@ -118,6 +140,7 @@ func (l lastMinuteLatency) merge(o lastMinuteLatency) (merged lastMinuteLatency)
merged.Totals[i] = AccElem{
Total: l.Totals[i].Total + o.Totals[i].Total,
N: l.Totals[i].N + o.Totals[i].N,
Size: l.Totals[i].Size + o.Totals[i].Size,
}
}
return merged
@ -132,8 +155,17 @@ func (l *lastMinuteLatency) add(t time.Duration) {
l.LastSec = sec
}
// Add a new duration data
func (l *lastMinuteLatency) addSize(t time.Duration, sz int64) {
sec := time.Now().Unix()
l.forwardTo(sec)
winIdx := sec % 60
l.Totals[winIdx].addSize(t, sz)
l.LastSec = sec
}
// Merge all recorded latencies of last minute into one
func (l *lastMinuteLatency) getAvgData() AccElem {
func (l *lastMinuteLatency) getTotal() AccElem {
var res AccElem
sec := time.Now().Unix()
l.forwardTo(sec)
@ -160,11 +192,11 @@ func (l *lastMinuteLatency) forwardTo(t int64) {
}
}
// LastMinuteLatencies keeps track of last minute latencies.
type LastMinuteLatencies [sizeLastElemMarker]lastMinuteLatency
// LastMinuteHistogram keeps track of last minute sizes added.
type LastMinuteHistogram [sizeLastElemMarker]lastMinuteLatency
// Merge safely merges two LastMinuteLatencies structures into one
func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLatencies) {
// Merge safely merges two LastMinuteHistogram structures into one
func (l LastMinuteHistogram) Merge(o LastMinuteHistogram) (merged LastMinuteHistogram) {
for i := range l {
merged[i] = l[i].merge(o[i])
}
@ -172,16 +204,16 @@ func (l LastMinuteLatencies) Merge(o LastMinuteLatencies) (merged LastMinuteLate
}
// Add latency t from object with the specified size.
func (l *LastMinuteLatencies) Add(size int64, t time.Duration) {
func (l *LastMinuteHistogram) Add(size int64, t time.Duration) {
l[sizeToTag(size)].add(t)
}
// GetAvgData will return the average for each bucket from the last time minute.
// The number of objects is also included.
func (l *LastMinuteLatencies) GetAvgData() [sizeLastElemMarker]AccElem {
func (l *LastMinuteHistogram) GetAvgData() [sizeLastElemMarker]AccElem {
var res [sizeLastElemMarker]AccElem
for i, elem := range l[:] {
res[i] = elem.getAvgData()
res[i] = elem.getTotal()
}
return res
}

View File

@ -30,6 +30,12 @@ func (z *AccElem) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Total")
return
}
case "Size":
z.Size, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Size")
return
}
case "N":
z.N, err = dc.ReadInt64()
if err != nil {
@ -49,9 +55,9 @@ func (z *AccElem) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z AccElem) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// map header, size 3
// write "Total"
err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
err = en.Append(0x83, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
if err != nil {
return
}
@ -60,6 +66,16 @@ func (z AccElem) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Total")
return
}
// write "Size"
err = en.Append(0xa4, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteInt64(z.Size)
if err != nil {
err = msgp.WrapError(err, "Size")
return
}
// write "N"
err = en.Append(0xa1, 0x4e)
if err != nil {
@ -76,10 +92,13 @@ func (z AccElem) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z AccElem) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// map header, size 3
// string "Total"
o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = append(o, 0x83, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = msgp.AppendInt64(o, z.Total)
// string "Size"
o = append(o, 0xa4, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendInt64(o, z.Size)
// string "N"
o = append(o, 0xa1, 0x4e)
o = msgp.AppendInt64(o, z.N)
@ -110,6 +129,12 @@ func (z *AccElem) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Total")
return
}
case "Size":
z.Size, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Size")
return
}
case "N":
z.N, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
@ -130,12 +155,12 @@ func (z *AccElem) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z AccElem) Msgsize() (s int) {
s = 1 + 6 + msgp.Int64Size + 2 + msgp.Int64Size
s = 1 + 6 + msgp.Int64Size + 5 + msgp.Int64Size + 2 + msgp.Int64Size
return
}
// DecodeMsg implements msgp.Decodable
func (z *LastMinuteLatencies) DecodeMsg(dc *msgp.Reader) (err error) {
func (z *LastMinuteHistogram) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0001 uint32
zb0001, err = dc.ReadArrayHeader()
if err != nil {
@ -195,6 +220,12 @@ func (z *LastMinuteLatencies) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, za0001, "Totals", za0002, "Total")
return
}
case "Size":
z[za0001].Totals[za0002].Size, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002, "Size")
return
}
case "N":
z[za0001].Totals[za0002].N, err = dc.ReadInt64()
if err != nil {
@ -229,7 +260,7 @@ func (z *LastMinuteLatencies) DecodeMsg(dc *msgp.Reader) (err error) {
}
// EncodeMsg implements msgp.Encodable
func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) {
func (z *LastMinuteHistogram) EncodeMsg(en *msgp.Writer) (err error) {
err = en.WriteArrayHeader(uint32(sizeLastElemMarker))
if err != nil {
err = msgp.WrapError(err)
@ -248,9 +279,9 @@ func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) {
return
}
for za0002 := range z[za0001].Totals {
// map header, size 2
// map header, size 3
// write "Total"
err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
err = en.Append(0x83, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
if err != nil {
return
}
@ -259,6 +290,16 @@ func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, za0001, "Totals", za0002, "Total")
return
}
// write "Size"
err = en.Append(0xa4, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteInt64(z[za0001].Totals[za0002].Size)
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002, "Size")
return
}
// write "N"
err = en.Append(0xa1, 0x4e)
if err != nil {
@ -285,7 +326,7 @@ func (z *LastMinuteLatencies) EncodeMsg(en *msgp.Writer) (err error) {
}
// MarshalMsg implements msgp.Marshaler
func (z *LastMinuteLatencies) MarshalMsg(b []byte) (o []byte, err error) {
func (z *LastMinuteHistogram) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendArrayHeader(o, uint32(sizeLastElemMarker))
for za0001 := range z {
@ -294,10 +335,13 @@ func (z *LastMinuteLatencies) MarshalMsg(b []byte) (o []byte, err error) {
o = append(o, 0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73)
o = msgp.AppendArrayHeader(o, uint32(60))
for za0002 := range z[za0001].Totals {
// map header, size 2
// map header, size 3
// string "Total"
o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = append(o, 0x83, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = msgp.AppendInt64(o, z[za0001].Totals[za0002].Total)
// string "Size"
o = append(o, 0xa4, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendInt64(o, z[za0001].Totals[za0002].Size)
// string "N"
o = append(o, 0xa1, 0x4e)
o = msgp.AppendInt64(o, z[za0001].Totals[za0002].N)
@ -310,7 +354,7 @@ func (z *LastMinuteLatencies) MarshalMsg(b []byte) (o []byte, err error) {
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) {
func (z *LastMinuteHistogram) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zb0001 uint32
zb0001, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
@ -370,6 +414,12 @@ func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, za0001, "Totals", za0002, "Total")
return
}
case "Size":
z[za0001].Totals[za0002].Size, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, za0001, "Totals", za0002, "Size")
return
}
case "N":
z[za0001].Totals[za0002].N, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
@ -405,8 +455,8 @@ func (z *LastMinuteLatencies) UnmarshalMsg(bts []byte) (o []byte, err error) {
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *LastMinuteLatencies) Msgsize() (s int) {
s = msgp.ArrayHeaderSize + (sizeLastElemMarker * (16 + (60 * (9 + msgp.Int64Size + msgp.Int64Size)) + msgp.Int64Size))
func (z *LastMinuteHistogram) Msgsize() (s int) {
s = msgp.ArrayHeaderSize + (sizeLastElemMarker * (16 + (60 * (14 + msgp.Int64Size + msgp.Int64Size + msgp.Int64Size)) + msgp.Int64Size))
return
}
@ -460,6 +510,12 @@ func (z *lastMinuteLatency) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Totals", za0001, "Total")
return
}
case "Size":
z.Totals[za0001].Size, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, "Size")
return
}
case "N":
z.Totals[za0001].N, err = dc.ReadInt64()
if err != nil {
@ -506,9 +562,9 @@ func (z *lastMinuteLatency) EncodeMsg(en *msgp.Writer) (err error) {
return
}
for za0001 := range z.Totals {
// map header, size 2
// map header, size 3
// write "Total"
err = en.Append(0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
err = en.Append(0x83, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
if err != nil {
return
}
@ -517,6 +573,16 @@ func (z *lastMinuteLatency) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Totals", za0001, "Total")
return
}
// write "Size"
err = en.Append(0xa4, 0x53, 0x69, 0x7a, 0x65)
if err != nil {
return
}
err = en.WriteInt64(z.Totals[za0001].Size)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, "Size")
return
}
// write "N"
err = en.Append(0xa1, 0x4e)
if err != nil {
@ -549,10 +615,13 @@ func (z *lastMinuteLatency) MarshalMsg(b []byte) (o []byte, err error) {
o = append(o, 0x82, 0xa6, 0x54, 0x6f, 0x74, 0x61, 0x6c, 0x73)
o = msgp.AppendArrayHeader(o, uint32(60))
for za0001 := range z.Totals {
// map header, size 2
// map header, size 3
// string "Total"
o = append(o, 0x82, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = append(o, 0x83, 0xa5, 0x54, 0x6f, 0x74, 0x61, 0x6c)
o = msgp.AppendInt64(o, z.Totals[za0001].Total)
// string "Size"
o = append(o, 0xa4, 0x53, 0x69, 0x7a, 0x65)
o = msgp.AppendInt64(o, z.Totals[za0001].Size)
// string "N"
o = append(o, 0xa1, 0x4e)
o = msgp.AppendInt64(o, z.Totals[za0001].N)
@ -613,6 +682,12 @@ func (z *lastMinuteLatency) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Totals", za0001, "Total")
return
}
case "Size":
z.Totals[za0001].Size, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Totals", za0001, "Size")
return
}
case "N":
z.Totals[za0001].N, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
@ -648,6 +723,6 @@ func (z *lastMinuteLatency) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *lastMinuteLatency) Msgsize() (s int) {
s = 1 + 7 + msgp.ArrayHeaderSize + (60 * (9 + msgp.Int64Size + msgp.Int64Size)) + 8 + msgp.Int64Size
s = 1 + 7 + msgp.ArrayHeaderSize + (60 * (14 + msgp.Int64Size + msgp.Int64Size + msgp.Int64Size)) + 8 + msgp.Int64Size
return
}

View File

@ -122,8 +122,8 @@ func BenchmarkDecodeAccElem(b *testing.B) {
}
}
func TestMarshalUnmarshalLastMinuteLatencies(t *testing.T) {
v := LastMinuteLatencies{}
func TestMarshalUnmarshalLastMinuteHistogram(t *testing.T) {
v := LastMinuteHistogram{}
bts, err := v.MarshalMsg(nil)
if err != nil {
t.Fatal(err)
@ -145,8 +145,8 @@ func TestMarshalUnmarshalLastMinuteLatencies(t *testing.T) {
}
}
func BenchmarkMarshalMsgLastMinuteLatencies(b *testing.B) {
v := LastMinuteLatencies{}
func BenchmarkMarshalMsgLastMinuteHistogram(b *testing.B) {
v := LastMinuteHistogram{}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
@ -154,8 +154,8 @@ func BenchmarkMarshalMsgLastMinuteLatencies(b *testing.B) {
}
}
func BenchmarkAppendMsgLastMinuteLatencies(b *testing.B) {
v := LastMinuteLatencies{}
func BenchmarkAppendMsgLastMinuteHistogram(b *testing.B) {
v := LastMinuteHistogram{}
bts := make([]byte, 0, v.Msgsize())
bts, _ = v.MarshalMsg(bts[0:0])
b.SetBytes(int64(len(bts)))
@ -166,8 +166,8 @@ func BenchmarkAppendMsgLastMinuteLatencies(b *testing.B) {
}
}
func BenchmarkUnmarshalLastMinuteLatencies(b *testing.B) {
v := LastMinuteLatencies{}
func BenchmarkUnmarshalLastMinuteHistogram(b *testing.B) {
v := LastMinuteHistogram{}
bts, _ := v.MarshalMsg(nil)
b.ReportAllocs()
b.SetBytes(int64(len(bts)))
@ -180,17 +180,17 @@ func BenchmarkUnmarshalLastMinuteLatencies(b *testing.B) {
}
}
func TestEncodeDecodeLastMinuteLatencies(t *testing.T) {
v := LastMinuteLatencies{}
func TestEncodeDecodeLastMinuteHistogram(t *testing.T) {
v := LastMinuteHistogram{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
m := v.Msgsize()
if buf.Len() > m {
t.Log("WARNING: TestEncodeDecodeLastMinuteLatencies Msgsize() is inaccurate")
t.Log("WARNING: TestEncodeDecodeLastMinuteHistogram Msgsize() is inaccurate")
}
vn := LastMinuteLatencies{}
vn := LastMinuteHistogram{}
err := msgp.Decode(&buf, &vn)
if err != nil {
t.Error(err)
@ -204,8 +204,8 @@ func TestEncodeDecodeLastMinuteLatencies(t *testing.T) {
}
}
func BenchmarkEncodeLastMinuteLatencies(b *testing.B) {
v := LastMinuteLatencies{}
func BenchmarkEncodeLastMinuteHistogram(b *testing.B) {
v := LastMinuteHistogram{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))
@ -218,8 +218,8 @@ func BenchmarkEncodeLastMinuteLatencies(b *testing.B) {
en.Flush()
}
func BenchmarkDecodeLastMinuteLatencies(b *testing.B) {
v := LastMinuteLatencies{}
func BenchmarkDecodeLastMinuteHistogram(b *testing.B) {
v := LastMinuteHistogram{}
var buf bytes.Buffer
msgp.Encode(&buf, &v)
b.SetBytes(int64(buf.Len()))

View File

@ -25,7 +25,8 @@ import (
"github.com/gorilla/mux"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
policy "github.com/minio/pkg/bucket/policy"
"github.com/minio/minio/internal/pubsub"
"github.com/minio/pkg/bucket/policy"
)
func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r *http.Request) {
@ -100,13 +101,14 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r
pattern := event.NewPattern(prefix, suffix)
var eventNames []event.Name
var mask pubsub.Mask
for _, s := range values[peerRESTListenEvents] {
eventName, err := event.ParseName(s)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
mask.MergeMaskable(eventName)
eventNames = append(eventNames, eventName)
}
@ -123,11 +125,11 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r
// Listen Publisher and peer-listen-client uses nonblocking send and hence does not wait for slow receivers.
// Use buffered channel to take care of burst sends or slow w.Write()
listenCh := make(chan interface{}, 4000)
listenCh := make(chan pubsub.Maskable, 4000)
peers, _ := newPeerRestClients(globalEndpoints)
listenFn := func(evI interface{}) bool {
err := globalHTTPListen.Subscribe(mask, listenCh, ctx.Done(), func(evI pubsub.Maskable) bool {
ev, ok := evI.(event.Event)
if !ok {
return false
@ -138,14 +140,11 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r
}
}
return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key)
}
err := globalHTTPListen.Subscribe(listenCh, ctx.Done(), listenFn)
})
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSlowDown), r.URL)
return
}
if bucketName != "" {
values.Set(peerRESTListenBucket, bucketName)
}
@ -173,7 +172,10 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r
return
}
}
if len(listenCh) == 0 {
// Flush if nothing is queued
w.(http.Flusher).Flush()
}
case <-keepAliveTicker.C:
if _, err := w.Write([]byte(" ")); err != nil {
return

109
cmd/metrics-realtime.go Normal file
View File

@ -0,0 +1,109 @@
// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"time"
"github.com/minio/madmin-go"
)
func collectLocalMetrics(types madmin.MetricType, hosts map[string]struct{}) (m madmin.RealtimeMetrics) {
if types == madmin.MetricsNone {
return
}
if len(hosts) > 0 {
if _, ok := hosts[globalMinioAddr]; !ok {
return
}
}
if types.Contains(madmin.MetricsScanner) {
metrics := globalScannerMetrics.report()
m.Aggregated.Scanner = &metrics
}
if types.Contains(madmin.MetricsDisk) && !globalIsGateway {
m.Aggregated.Disk = collectDiskMetrics()
}
if types.Contains(madmin.MetricsOS) {
metrics := globalOSMetrics.report()
m.Aggregated.OS = &metrics
}
// Add types...
// ByHost is a shallow reference, so careful about sharing.
m.ByHost = map[string]madmin.Metrics{globalMinioAddr: m.Aggregated}
m.Hosts = append(m.Hosts, globalMinioAddr)
return m
}
func collectDiskMetrics() *madmin.DiskMetric {
objLayer := newObjectLayerFn()
disks := madmin.DiskMetric{
CollectedAt: time.Now(),
}
if objLayer == nil {
return nil
}
// only need Disks information in server mode.
storageInfo, errs := objLayer.LocalStorageInfo(GlobalContext)
for _, err := range errs {
if err != nil {
disks.Merge(&madmin.DiskMetric{NDisks: 1, Offline: 1})
}
}
for i, disk := range storageInfo.Disks {
if errs[i] != nil {
continue
}
var d madmin.DiskMetric
d.NDisks = 1
if disk.Healing {
d.Healing++
}
if disk.Metrics != nil {
d.LifeTimeOps = make(map[string]uint64, len(disk.Metrics.APICalls))
for k, v := range disk.Metrics.APICalls {
if v != 0 {
d.LifeTimeOps[k] = v
}
}
d.LastMinute.Operations = make(map[string]madmin.TimedAction, len(disk.Metrics.APICalls))
for k, v := range disk.Metrics.LastMinute {
if v.Count != 0 {
d.LastMinute.Operations[k] = v
}
}
}
disks.Merge(&d)
}
return &disks
}
func collectRemoteMetrics(ctx context.Context, types madmin.MetricType, hosts map[string]struct{}) (m madmin.RealtimeMetrics) {
if !globalIsDistErasure {
return
}
all := globalNotificationSys.GetMetrics(ctx, types, hosts)
for _, remote := range all {
m.Merge(&remote)
}
return m
}

View File

@ -1222,7 +1222,7 @@ func getScannerNodeMetrics() *MetricsGroup {
Help: "Total number of unique objects scanned since server start",
Type: counterMetric,
},
Value: float64(atomic.LoadUint64(&globalScannerStats.accTotalObjects)),
Value: float64(globalScannerMetrics.lifetime(scannerMetricScanObject)),
},
{
Description: MetricDescription{
@ -1232,7 +1232,7 @@ func getScannerNodeMetrics() *MetricsGroup {
Help: "Total number of object versions scanned since server start",
Type: counterMetric,
},
Value: float64(atomic.LoadUint64(&globalScannerStats.accTotalVersions)),
Value: float64(globalScannerMetrics.lifetime(scannerMetricApplyVersion)),
},
{
Description: MetricDescription{
@ -1242,7 +1242,7 @@ func getScannerNodeMetrics() *MetricsGroup {
Help: "Total number of directories scanned since server start",
Type: counterMetric,
},
Value: float64(atomic.LoadUint64(&globalScannerStats.accFolders)),
Value: float64(globalScannerMetrics.lifetime(scannerMetricScanFolder)),
},
{
Description: MetricDescription{
@ -1252,7 +1252,7 @@ func getScannerNodeMetrics() *MetricsGroup {
Help: "Total number of bucket scans started since server start",
Type: counterMetric,
},
Value: float64(atomic.LoadUint64(&globalScannerStats.bucketsStarted)),
Value: float64(globalScannerMetrics.lifetime(scannerMetricScanBucketDisk) + uint64(globalScannerMetrics.activeDisks())),
},
{
Description: MetricDescription{
@ -1262,7 +1262,7 @@ func getScannerNodeMetrics() *MetricsGroup {
Help: "Total number of bucket scans finished since server start",
Type: counterMetric,
},
Value: float64(atomic.LoadUint64(&globalScannerStats.bucketsFinished)),
Value: float64(globalScannerMetrics.lifetime(scannerMetricScanBucketDisk)),
},
{
Description: MetricDescription{
@ -1272,12 +1272,12 @@ func getScannerNodeMetrics() *MetricsGroup {
Help: "Total number of object versions checked for ilm actions since server start",
Type: counterMetric,
},
Value: float64(atomic.LoadUint64(&globalScannerStats.ilmChecks)),
Value: float64(globalScannerMetrics.lifetime(scannerMetricILM)),
},
}
for i := range globalScannerStats.actions {
for i := range globalScannerMetrics.actions {
action := lifecycle.Action(i)
v := atomic.LoadUint64(&globalScannerStats.actions[action])
v := globalScannerMetrics.lifetimeActions(action)
if v == 0 {
continue
}
@ -1861,11 +1861,10 @@ func getLocalDiskStorageMetrics() *MetricsGroup {
if disk.Metrics == nil {
continue
}
for apiName, latency := range disk.Metrics.APILatencies {
val := latency.(uint64)
for apiName, latency := range disk.Metrics.LastMinute {
metrics = append(metrics, Metric{
Description: getNodeDiskAPILatencyMD(),
Value: float64(val / 1000),
Value: float64(latency.Avg().Microseconds()),
VariableLabels: map[string]string{"disk": disk.DrivePath, "api": "storage." + apiName},
})
}

View File

@ -929,6 +929,38 @@ func (sys *NotificationSys) GetOSInfo(ctx context.Context) []madmin.OSInfo {
return reply
}
// GetMetrics - Get metrics from all peers.
func (sys *NotificationSys) GetMetrics(ctx context.Context, t madmin.MetricType, hosts map[string]struct{}) []madmin.RealtimeMetrics {
reply := make([]madmin.RealtimeMetrics, len(sys.peerClients))
g := errgroup.WithNErrs(len(sys.peerClients))
for index, client := range sys.peerClients {
if client == nil {
continue
}
host := client.host.String()
if len(hosts) > 0 {
if _, ok := hosts[host]; !ok {
continue
}
}
index := index
g.Go(func() error {
var err error
reply[index], err = sys.peerClients[index].GetMetrics(ctx, t)
return err
}, index)
}
for index, err := range g.Wait() {
if err != nil {
reply[index].Errors = []string{err.Error()}
}
}
return reply
}
// GetSysConfig - Get information about system config
// (only the config that are of concern to minio)
func (sys *NotificationSys) GetSysConfig(ctx context.Context) []madmin.SysConfig {
@ -1288,8 +1320,7 @@ func sendEvent(args eventArgs) {
if globalNotificationSys == nil {
return
}
if globalHTTPListen.NumSubscribers() > 0 {
if globalHTTPListen.NumSubscribers(args.EventName) > 0 {
globalHTTPListen.Publish(args.ToEvent(false))
}

View File

@ -20,10 +20,12 @@ package cmd
import (
"os"
"strings"
"sync/atomic"
"time"
"github.com/minio/madmin-go"
"github.com/minio/minio/internal/disk"
ioutilx "github.com/minio/minio/internal/ioutil"
)
//go:generate stringer -type=osMetric -trimprefix=osMetric $GOFILE
@ -41,33 +43,67 @@ const (
osMetricRemove
osMetricStat
osMetricAccess
osMetricCreate
osMetricReadDirent
osMetricFdatasync
osMetricSync
// .... add more
osMetricLast
)
var globalOSMetrics osMetrics
func init() {
// Inject metrics.
ioutilx.OsOpenFile = OpenFile
ioutilx.OpenFileDirectIO = OpenFileDirectIO
ioutilx.OsOpen = Open
}
type osMetrics struct {
// All fields must be accessed atomically and aligned.
operations [osMetricLast]uint64
latency [osMetricLast]lockedLastMinuteLatency
}
// time an os action.
func (o *osMetrics) time(s osMetric) func() {
startTime := time.Now()
return func() {
duration := time.Since(startTime)
atomic.AddUint64(&o.operations[s], 1)
o.latency[s].add(duration)
}
}
// incTime will increment time on metric s with a specific duration.
func (o *osMetrics) incTime(s osMetric, d time.Duration) {
atomic.AddUint64(&o.operations[s], 1)
o.latency[s].add(d)
}
func osTrace(s osMetric, startTime time.Time, duration time.Duration, path string) madmin.TraceInfo {
return madmin.TraceInfo{
TraceType: madmin.TraceOS,
Time: startTime,
NodeName: globalLocalNodeName,
FuncName: "os." + s.String(),
OSStats: madmin.TraceOSStats{
Duration: duration,
Path: path,
},
}
}
func updateOSMetrics(s osMetric, paths ...string) func() {
if globalTrace.NumSubscribers() == 0 {
return func() {}
if globalTrace.NumSubscribers(madmin.TraceOS) == 0 {
return globalOSMetrics.time(s)
}
startTime := time.Now()
return func() {
duration := time.Since(startTime)
globalOSMetrics.incTime(s, duration)
globalTrace.Publish(osTrace(s, startTime, duration, strings.Join(paths, " -> ")))
}
}
@ -133,3 +169,47 @@ func Stat(name string) (os.FileInfo, error) {
defer updateOSMetrics(osMetricStat, name)()
return os.Stat(name)
}
// Create captures time taken to call os.Create
func Create(name string) (*os.File, error) {
defer updateOSMetrics(osMetricCreate, name)()
return os.Create(name)
}
// Fdatasync captures time taken to call Fdatasync
func Fdatasync(f *os.File) error {
fn := ""
if f != nil {
fn = f.Name()
}
defer updateOSMetrics(osMetricFdatasync, fn)()
return disk.Fdatasync(f)
}
// report returns all os metrics.
func (o *osMetrics) report() madmin.OSMetrics {
var m madmin.OSMetrics
m.CollectedAt = time.Now()
m.LifeTimeOps = make(map[string]uint64, osMetricLast)
for i := osMetric(0); i < osMetricLast; i++ {
if n := atomic.LoadUint64(&o.operations[i]); n > 0 {
m.LifeTimeOps[i.String()] = n
}
}
if len(m.LifeTimeOps) == 0 {
m.LifeTimeOps = nil
}
m.LastMinute.Operations = make(map[string]madmin.TimedAction, osMetricLast)
for i := osMetric(0); i < osMetricLast; i++ {
lm := o.latency[i].total()
if lm.N > 0 {
m.LastMinute.Operations[i.String()] = lm.asTimedAction()
}
}
if len(m.LastMinute.Operations) == 0 {
m.LastMinute.Operations = nil
}
return m
}

View File

@ -153,5 +153,6 @@ func readDirWithOpts(dirPath string, opts readDirOpts) (entries []string, err er
func globalSync() {
// no-op not sure about plan9/solaris support for syscall support
defer globalOSMetrics.time(osMetricSync)
syscall.Sync()
}

View File

@ -122,7 +122,9 @@ func readDirFn(dirPath string, fn func(name string, typ os.FileMode) error) erro
for {
if boff >= nbuf {
boff = 0
stop := globalOSMetrics.time(osMetricReadDirent)
nbuf, err = syscall.ReadDirent(int(f.Fd()), buf)
stop()
if err != nil {
if isSysErrNotDir(err) {
return nil
@ -150,7 +152,7 @@ func readDirFn(dirPath string, fn func(name string, typ os.FileMode) error) erro
// support Dirent.Type and have DT_UNKNOWN (0) there
// instead.
if typ == unexpectedFileMode || typ&os.ModeSymlink == os.ModeSymlink {
fi, err := os.Stat(pathJoin(dirPath, string(name)))
fi, err := Stat(pathJoin(dirPath, string(name)))
if err != nil {
// It got deleted in the meantime, not found
// or returns too many symlinks ignore this
@ -203,7 +205,9 @@ func readDirWithOpts(dirPath string, opts readDirOpts) (entries []string, err er
for count != 0 {
if boff >= nbuf {
boff = 0
stop := globalOSMetrics.time(osMetricReadDirent)
nbuf, err = syscall.ReadDirent(int(f.Fd()), buf)
stop()
if err != nil {
if isSysErrNotDir(err) {
return nil, errFileNotFound
@ -266,5 +270,6 @@ func readDirWithOpts(dirPath string, opts readDirOpts) (entries []string, err er
}
func globalSync() {
defer globalOSMetrics.time(osMetricSync)
syscall.Sync()
}

View File

@ -35,7 +35,7 @@ func access(name string) error {
// the directory itself, if the dirPath doesn't exist this function doesn't return
// an error.
func readDirFn(dirPath string, filter func(name string, typ os.FileMode) error) error {
f, err := os.Open(dirPath)
f, err := Open(dirPath)
if err != nil {
if osErrToFileErr(err) == errFileNotFound {
return nil
@ -116,7 +116,7 @@ func readDirFn(dirPath string, filter func(name string, typ os.FileMode) error)
// Return N entries at the directory dirPath.
func readDirWithOpts(dirPath string, opts readDirOpts) (entries []string, err error) {
f, err := os.Open(dirPath)
f, err := Open(dirPath)
if err != nil {
return nil, osErrToFileErr(err)
}

View File

@ -18,12 +18,16 @@ func _() {
_ = x[osMetricRemove-7]
_ = x[osMetricStat-8]
_ = x[osMetricAccess-9]
_ = x[osMetricLast-10]
_ = x[osMetricCreate-10]
_ = x[osMetricReadDirent-11]
_ = x[osMetricFdatasync-12]
_ = x[osMetricSync-13]
_ = x[osMetricLast-14]
}
const _osMetric_name = "RemoveAllMkdirAllRenameOpenFileOpenOpenFileDirectIOLstatRemoveStatAccessLast"
const _osMetric_name = "RemoveAllMkdirAllRenameOpenFileOpenOpenFileDirectIOLstatRemoveStatAccessCreateReadDirentFdatasyncSyncLast"
var _osMetric_index = [...]uint8{0, 9, 17, 23, 31, 35, 51, 56, 62, 66, 72, 76}
var _osMetric_index = [...]uint8{0, 9, 17, 23, 31, 35, 51, 56, 62, 66, 72, 78, 88, 97, 101, 105}
func (i osMetric) String() string {
if i >= osMetric(len(_osMetric_index)-1) {

View File

@ -34,6 +34,7 @@ import (
"github.com/minio/minio/internal/http"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/pubsub"
"github.com/minio/minio/internal/rest"
xnet "github.com/minio/pkg/net"
"github.com/tinylib/msgp/msgp"
@ -193,6 +194,19 @@ func (client *peerRESTClient) GetMemInfo(ctx context.Context) (info madmin.MemIn
return info, err
}
// GetMetrics - fetch metrics from a remote node.
func (client *peerRESTClient) GetMetrics(ctx context.Context, t madmin.MetricType) (info madmin.RealtimeMetrics, err error) {
values := make(url.Values)
values.Set(peerRESTTypes, strconv.FormatUint(uint64(t), 10))
respBody, err := client.callWithContext(ctx, peerRESTMethodMetrics, values, nil, -1)
if err != nil {
return
}
defer http.DrainBody(respBody)
err = gob.NewDecoder(respBody).Decode(&info)
return info, err
}
// GetProcInfo - fetch MinIO process information for a remote node.
func (client *peerRESTClient) GetProcInfo(ctx context.Context) (info madmin.ProcInfo, err error) {
respBody, err := client.callWithContext(ctx, peerRESTMethodProcInfo, nil, nil, -1)
@ -532,14 +546,9 @@ func (client *peerRESTClient) LoadTransitionTierConfig(ctx context.Context) erro
return nil
}
func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh <-chan struct{}, traceOpts madmin.ServiceTraceOpts) {
func (client *peerRESTClient) doTrace(traceCh chan<- pubsub.Maskable, doneCh <-chan struct{}, traceOpts madmin.ServiceTraceOpts) {
values := make(url.Values)
values.Set(peerRESTTraceErr, strconv.FormatBool(traceOpts.OnlyErrors))
values.Set(peerRESTTraceS3, strconv.FormatBool(traceOpts.S3))
values.Set(peerRESTTraceStorage, strconv.FormatBool(traceOpts.Storage))
values.Set(peerRESTTraceOS, strconv.FormatBool(traceOpts.OS))
values.Set(peerRESTTraceInternal, strconv.FormatBool(traceOpts.Internal))
values.Set(peerRESTTraceThreshold, traceOpts.Threshold.String())
traceOpts.AddParams(values)
// To cancel the REST request in case doneCh gets closed.
ctx, cancel := context.WithCancel(GlobalContext)
@ -578,7 +587,7 @@ func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh <-chan st
}
}
func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh <-chan struct{}, v url.Values) {
func (client *peerRESTClient) doListen(listenCh chan<- pubsub.Maskable, doneCh <-chan struct{}, v url.Values) {
// To cancel the REST request in case doneCh gets closed.
ctx, cancel := context.WithCancel(GlobalContext)
@ -617,7 +626,7 @@ func (client *peerRESTClient) doListen(listenCh chan interface{}, doneCh <-chan
}
// Listen - listen on peers.
func (client *peerRESTClient) Listen(listenCh chan interface{}, doneCh <-chan struct{}, v url.Values) {
func (client *peerRESTClient) Listen(listenCh chan<- pubsub.Maskable, doneCh <-chan struct{}, v url.Values) {
go func() {
for {
client.doListen(listenCh, doneCh, v)
@ -633,7 +642,7 @@ func (client *peerRESTClient) Listen(listenCh chan interface{}, doneCh <-chan st
}
// Trace - send http trace request to peer nodes
func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh <-chan struct{}, traceOpts madmin.ServiceTraceOpts) {
func (client *peerRESTClient) Trace(traceCh chan<- pubsub.Maskable, doneCh <-chan struct{}, traceOpts madmin.ServiceTraceOpts) {
go func() {
for {
client.doTrace(traceCh, doneCh, traceOpts)
@ -648,7 +657,7 @@ func (client *peerRESTClient) Trace(traceCh chan interface{}, doneCh <-chan stru
}()
}
func (client *peerRESTClient) doConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) {
func (client *peerRESTClient) doConsoleLog(logCh chan pubsub.Maskable, doneCh <-chan struct{}) {
// To cancel the REST request in case doneCh gets closed.
ctx, cancel := context.WithCancel(GlobalContext)
@ -686,7 +695,7 @@ func (client *peerRESTClient) doConsoleLog(logCh chan interface{}, doneCh <-chan
}
// ConsoleLog - sends request to peer nodes to get console logs
func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh <-chan struct{}) {
func (client *peerRESTClient) ConsoleLog(logCh chan pubsub.Maskable, doneCh <-chan struct{}) {
go func() {
for {
client.doConsoleLog(logCh, doneCh)

View File

@ -18,7 +18,7 @@
package cmd
const (
peerRESTVersion = "v22" // Add bulk GetBucketStats
peerRESTVersion = "v23" // Added /metrics
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
@ -70,6 +70,7 @@ const (
peerRESTMethodGetLastDayTierStats = "/getlastdaytierstats"
peerRESTMethodDevNull = "/devnull"
peerRESTMethodNetperf = "/netperf"
peerRESTMethodMetrics = "/metrics"
)
const (
@ -84,16 +85,11 @@ const (
peerRESTSignal = "signal"
peerRESTSubSys = "sub-sys"
peerRESTProfiler = "profiler"
peerRESTTraceErr = "err"
peerRESTTraceInternal = "internal"
peerRESTTraceStorage = "storage"
peerRESTTraceS3 = "s3"
peerRESTTraceOS = "os"
peerRESTTraceThreshold = "threshold"
peerRESTSize = "size"
peerRESTConcurrent = "concurrent"
peerRESTDuration = "duration"
peerRESTStorageClass = "storage-class"
peerRESTTypes = "types"
peerRESTListenBucket = "bucket"
peerRESTListenPrefix = "prefix"

View File

@ -35,6 +35,7 @@ import (
b "github.com/minio/minio/internal/bucket/bandwidth"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/pubsub"
"github.com/tinylib/msgp/msgp"
)
@ -411,6 +412,28 @@ func (s *peerRESTServer) GetMemInfoHandler(w http.ResponseWriter, r *http.Reques
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// GetMetricsHandler - returns server metrics.
func (s *peerRESTServer) GetMetricsHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
var types madmin.MetricType
if t, _ := strconv.ParseUint(r.Form.Get(peerRESTTypes), 10, 64); t != 0 {
types = madmin.MetricType(t)
} else {
types = madmin.MetricsAll
}
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
info := collectLocalMetrics(types, nil)
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
}
// GetSysConfigHandler - returns system config information.
// (only the config that are of concern to minio)
func (s *peerRESTServer) GetSysConfigHandler(w http.ResponseWriter, r *http.Request) {
@ -853,26 +876,26 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
pattern := event.NewPattern(prefix, suffix)
var eventNames []event.Name
var mask pubsub.Mask
for _, ev := range values[peerRESTListenEvents] {
eventName, err := event.ParseName(ev)
if err != nil {
s.writeErrorResponse(w, err)
return
}
mask.MergeMaskable(eventName)
eventNames = append(eventNames, eventName)
}
rulesMap := event.NewRulesMap(eventNames, pattern, event.TargetID{ID: mustGetUUID()})
doneCh := make(chan struct{})
defer close(doneCh)
doneCh := r.Context().Done()
// Listen Publisher uses nonblocking publish and hence does not wait for slow subscribers.
// Use buffered channel to take care of burst sends or slow w.Write()
ch := make(chan interface{}, 2000)
ch := make(chan pubsub.Maskable, 2000)
listenFn := func(evI interface{}) bool {
err := globalHTTPListen.Subscribe(mask, ch, doneCh, func(evI pubsub.Maskable) bool {
ev, ok := evI.(event.Event)
if !ok {
return false
@ -883,14 +906,11 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
}
}
return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key)
}
err := globalHTTPListen.Subscribe(ch, doneCh, listenFn)
})
if err != nil {
s.writeErrorResponse(w, err)
return
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
@ -901,7 +921,10 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
if err := enc.Encode(ev); err != nil {
return
}
if len(ch) == 0 {
// Flush if nothing is queued
w.(http.Flusher).Flush()
}
case <-r.Context().Done():
return
case <-keepAliveTicker.C:
@ -913,23 +936,6 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) {
}
}
func extractTraceOptsFromPeerRequest(r *http.Request) (opts madmin.ServiceTraceOpts, err error) {
opts.S3 = r.Form.Get(peerRESTTraceS3) == "true"
opts.OS = r.Form.Get(peerRESTTraceOS) == "true"
opts.Storage = r.Form.Get(peerRESTTraceStorage) == "true"
opts.Internal = r.Form.Get(peerRESTTraceInternal) == "true"
opts.OnlyErrors = r.Form.Get(peerRESTTraceErr) == "true"
if t := r.Form.Get(peerRESTTraceThreshold); t != "" {
d, err := time.ParseDuration(t)
if err != nil {
return opts, err
}
opts.Threshold = d
}
return
}
// TraceHandler sends http trace messages back to peer rest client
func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@ -937,29 +943,27 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) {
return
}
traceOpts, err := extractTraceOptsFromPeerRequest(r)
var traceOpts madmin.ServiceTraceOpts
err := traceOpts.ParseParams(r)
if err != nil {
s.writeErrorResponse(w, errors.New("Invalid request"))
return
}
doneCh := make(chan struct{})
defer close(doneCh)
// Trace Publisher uses nonblocking publish and hence does not wait for slow subscribers.
// Use buffered channel to take care of burst sends or slow w.Write()
ch := make(chan interface{}, 2000)
traceFn := func(entry interface{}) bool {
return mustTrace(entry, traceOpts)
ch := make(chan pubsub.Maskable, 2000)
mask := pubsub.MaskFromMaskable(traceOpts.TraceTypes())
err = globalTrace.Subscribe(mask, ch, r.Context().Done(), func(entry pubsub.Maskable) bool {
if e, ok := entry.(madmin.TraceInfo); ok {
return shouldTrace(e, traceOpts)
}
err = globalTrace.Subscribe(ch, doneCh, traceFn)
return false
})
if err != nil {
s.writeErrorResponse(w, err)
return
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
@ -970,7 +974,6 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) {
if err := enc.Encode(entry); err != nil {
return
}
w.(http.Flusher).Flush()
case <-r.Context().Done():
return
case <-keepAliveTicker.C:
@ -1042,13 +1045,12 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques
doneCh := make(chan struct{})
defer close(doneCh)
ch := make(chan interface{}, 2000)
err := globalConsoleSys.Subscribe(ch, doneCh, "", 0, string(logger.All), nil)
ch := make(chan pubsub.Maskable, 2000)
err := globalConsoleSys.Subscribe(ch, doneCh, "", 0, madmin.LogMaskAll, nil)
if err != nil {
s.writeErrorResponse(w, err)
return
}
keepAliveTicker := time.NewTicker(500 * time.Millisecond)
defer keepAliveTicker.Stop()
@ -1275,6 +1277,7 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodServerInfo).HandlerFunc(httpTraceHdrs(server.ServerInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodProcInfo).HandlerFunc(httpTraceHdrs(server.GetProcInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodMemInfo).HandlerFunc(httpTraceHdrs(server.GetMemInfoHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodMetrics).HandlerFunc(httpTraceHdrs(server.GetMetricsHandler)).Queries(restQueries(peerRESTTypes)...)
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSysErrors).HandlerFunc(httpTraceHdrs(server.GetSysErrorsHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSysServices).HandlerFunc(httpTraceHdrs(server.GetSysServicesHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSysConfig).HandlerFunc(httpTraceHdrs(server.GetSysConfigHandler))

View File

@ -0,0 +1,39 @@
// Code generated by "stringer -type=scannerMetric -trimprefix=scannerMetric data-scanner-metric.go"; DO NOT EDIT.
package cmd
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[scannerMetricReadMetadata-0]
_ = x[scannerMetricCheckMissing-1]
_ = x[scannerMetricSaveUsage-2]
_ = x[scannerMetricApplyAll-3]
_ = x[scannerMetricApplyVersion-4]
_ = x[scannerMetricTierObjSweep-5]
_ = x[scannerMetricHealCheck-6]
_ = x[scannerMetricILM-7]
_ = x[scannerMetricCheckReplication-8]
_ = x[scannerMetricYield-9]
_ = x[scannerMetricStartTrace-10]
_ = x[scannerMetricScanObject-11]
_ = x[scannerMetricLastRealtime-12]
_ = x[scannerMetricScanFolder-13]
_ = x[scannerMetricScanCycle-14]
_ = x[scannerMetricScanBucketDisk-15]
_ = x[scannerMetricLast-16]
}
const _scannerMetric_name = "ReadMetadataCheckMissingSaveUsageApplyAllApplyVersionTierObjSweepHealCheckILMCheckReplicationYieldStartTraceScanObjectLastRealtimeScanFolderScanCycleScanBucketDiskLast"
var _scannerMetric_index = [...]uint8{0, 12, 24, 33, 41, 53, 65, 74, 77, 93, 98, 108, 118, 130, 140, 149, 163, 167}
func (i scannerMetric) String() string {
if i >= scannerMetric(len(_scannerMetric_index)-1) {
return "scannerMetric(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _scannerMetric_name[_scannerMetric_index[i]:_scannerMetric_index[i+1]]
}

View File

@ -47,7 +47,7 @@ func printStartupMessage(apiEndpoints []string, err error) {
logger.Info(color.Bold("MinIO Object Storage Server"))
if err != nil {
if globalConsoleSys != nil {
globalConsoleSys.Send(fmt.Sprintf("Server startup failed with '%v', some features may be missing", err), string(logger.All))
globalConsoleSys.Send(fmt.Sprintf("Server startup failed with '%v', some features may be missing", err))
}
}

View File

@ -47,7 +47,7 @@ type DiskInfo struct {
// the number of calls of each API and the moving average of
// the duration of each API.
type DiskMetrics struct {
APILatencies map[string]uint64 `json:"apiLatencies,omitempty"`
LastMinute map[string]AccElem `json:"apiLatencies,omitempty"`
APICalls map[string]uint64 `json:"apiCalls,omitempty"`
}

View File

@ -291,35 +291,35 @@ func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) {
return
}
switch msgp.UnsafeString(field) {
case "APILatencies":
case "LastMinute":
var zb0002 uint32
zb0002, err = dc.ReadMapHeader()
if err != nil {
err = msgp.WrapError(err, "APILatencies")
err = msgp.WrapError(err, "LastMinute")
return
}
if z.APILatencies == nil {
z.APILatencies = make(map[string]uint64, zb0002)
} else if len(z.APILatencies) > 0 {
for key := range z.APILatencies {
delete(z.APILatencies, key)
if z.LastMinute == nil {
z.LastMinute = make(map[string]AccElem, zb0002)
} else if len(z.LastMinute) > 0 {
for key := range z.LastMinute {
delete(z.LastMinute, key)
}
}
for zb0002 > 0 {
zb0002--
var za0001 string
var za0002 uint64
var za0002 AccElem
za0001, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "APILatencies")
err = msgp.WrapError(err, "LastMinute")
return
}
za0002, err = dc.ReadUint64()
err = za0002.DecodeMsg(dc)
if err != nil {
err = msgp.WrapError(err, "APILatencies", za0001)
err = msgp.WrapError(err, "LastMinute", za0001)
return
}
z.APILatencies[za0001] = za0002
z.LastMinute[za0001] = za0002
}
case "APICalls":
var zb0003 uint32
@ -365,25 +365,25 @@ func (z *DiskMetrics) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *DiskMetrics) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "APILatencies"
err = en.Append(0x82, 0xac, 0x41, 0x50, 0x49, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x69, 0x65, 0x73)
// write "LastMinute"
err = en.Append(0x82, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x4d, 0x69, 0x6e, 0x75, 0x74, 0x65)
if err != nil {
return
}
err = en.WriteMapHeader(uint32(len(z.APILatencies)))
err = en.WriteMapHeader(uint32(len(z.LastMinute)))
if err != nil {
err = msgp.WrapError(err, "APILatencies")
err = msgp.WrapError(err, "LastMinute")
return
}
for za0001, za0002 := range z.APILatencies {
for za0001, za0002 := range z.LastMinute {
err = en.WriteString(za0001)
if err != nil {
err = msgp.WrapError(err, "APILatencies")
err = msgp.WrapError(err, "LastMinute")
return
}
err = en.WriteUint64(za0002)
err = za0002.EncodeMsg(en)
if err != nil {
err = msgp.WrapError(err, "APILatencies", za0001)
err = msgp.WrapError(err, "LastMinute", za0001)
return
}
}
@ -416,12 +416,16 @@ func (z *DiskMetrics) EncodeMsg(en *msgp.Writer) (err error) {
func (z *DiskMetrics) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "APILatencies"
o = append(o, 0x82, 0xac, 0x41, 0x50, 0x49, 0x4c, 0x61, 0x74, 0x65, 0x6e, 0x63, 0x69, 0x65, 0x73)
o = msgp.AppendMapHeader(o, uint32(len(z.APILatencies)))
for za0001, za0002 := range z.APILatencies {
// string "LastMinute"
o = append(o, 0x82, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x4d, 0x69, 0x6e, 0x75, 0x74, 0x65)
o = msgp.AppendMapHeader(o, uint32(len(z.LastMinute)))
for za0001, za0002 := range z.LastMinute {
o = msgp.AppendString(o, za0001)
o = msgp.AppendUint64(o, za0002)
o, err = za0002.MarshalMsg(o)
if err != nil {
err = msgp.WrapError(err, "LastMinute", za0001)
return
}
}
// string "APICalls"
o = append(o, 0xa8, 0x41, 0x50, 0x49, 0x43, 0x61, 0x6c, 0x6c, 0x73)
@ -451,35 +455,35 @@ func (z *DiskMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) {
return
}
switch msgp.UnsafeString(field) {
case "APILatencies":
case "LastMinute":
var zb0002 uint32
zb0002, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "APILatencies")
err = msgp.WrapError(err, "LastMinute")
return
}
if z.APILatencies == nil {
z.APILatencies = make(map[string]uint64, zb0002)
} else if len(z.APILatencies) > 0 {
for key := range z.APILatencies {
delete(z.APILatencies, key)
if z.LastMinute == nil {
z.LastMinute = make(map[string]AccElem, zb0002)
} else if len(z.LastMinute) > 0 {
for key := range z.LastMinute {
delete(z.LastMinute, key)
}
}
for zb0002 > 0 {
var za0001 string
var za0002 uint64
var za0002 AccElem
zb0002--
za0001, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "APILatencies")
err = msgp.WrapError(err, "LastMinute")
return
}
za0002, bts, err = msgp.ReadUint64Bytes(bts)
bts, err = za0002.UnmarshalMsg(bts)
if err != nil {
err = msgp.WrapError(err, "APILatencies", za0001)
err = msgp.WrapError(err, "LastMinute", za0001)
return
}
z.APILatencies[za0001] = za0002
z.LastMinute[za0001] = za0002
}
case "APICalls":
var zb0003 uint32
@ -525,11 +529,11 @@ func (z *DiskMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *DiskMetrics) Msgsize() (s int) {
s = 1 + 13 + msgp.MapHeaderSize
if z.APILatencies != nil {
for za0001, za0002 := range z.APILatencies {
s = 1 + 11 + msgp.MapHeaderSize
if z.LastMinute != nil {
for za0001, za0002 := range z.LastMinute {
_ = za0002
s += msgp.StringPrefixSize + len(za0001) + msgp.Uint64Size
s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize()
}
}
s += 9 + msgp.MapHeaderSize

View File

@ -95,7 +95,7 @@ func getModTime(path string) (t time.Time, err error) {
// Version is minio non-standard, we will use minio binary's
// ModTime as release time.
fi, err := os.Stat(absPath)
fi, err := Stat(absPath)
if err != nil {
return t, fmt.Errorf("Unable to get ModTime of %s. %w", absPath, err)
}

View File

@ -54,6 +54,7 @@ import (
"github.com/minio/minio/internal/fips"
"github.com/minio/minio/internal/handlers"
xhttp "github.com/minio/minio/internal/http"
ioutilx "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/logger/message/audit"
"github.com/minio/minio/internal/rest"
@ -305,7 +306,7 @@ func startProfiler(profilerType string) (minioProfiler, error) {
return nil, err
}
fn := filepath.Join(dirPath, "cpu.out")
f, err := os.Create(fn)
f, err := Create(fn)
if err != nil {
return nil, err
}
@ -319,8 +320,8 @@ func startProfiler(profilerType string) (minioProfiler, error) {
if err != nil {
return nil, err
}
defer os.RemoveAll(dirPath)
return ioutil.ReadFile(fn)
defer RemoveAll(dirPath)
return ioutilx.ReadFile(fn)
}
case madmin.ProfilerCPUIO:
// at 10k or more goroutines fgprof is likely to become
@ -335,7 +336,7 @@ func startProfiler(profilerType string) (minioProfiler, error) {
return nil, err
}
fn := filepath.Join(dirPath, "cpuio.out")
f, err := os.Create(fn)
f, err := Create(fn)
if err != nil {
return nil, err
}
@ -349,8 +350,8 @@ func startProfiler(profilerType string) (minioProfiler, error) {
if err != nil {
return nil, err
}
defer os.RemoveAll(dirPath)
return ioutil.ReadFile(fn)
defer RemoveAll(dirPath)
return ioutilx.ReadFile(fn)
}
case madmin.ProfilerMEM:
runtime.GC()
@ -400,7 +401,7 @@ func startProfiler(profilerType string) (minioProfiler, error) {
return nil, err
}
fn := filepath.Join(dirPath, "trace.out")
f, err := os.Create(fn)
f, err := Create(fn)
if err != nil {
return nil, err
}
@ -415,8 +416,8 @@ func startProfiler(profilerType string) (minioProfiler, error) {
if err != nil {
return nil, err
}
defer os.RemoveAll(dirPath)
return ioutil.ReadFile(fn)
defer RemoveAll(dirPath)
return ioutilx.ReadFile(fn)
}
default:
return nil, errors.New("profiler type unknown")

View File

@ -86,11 +86,11 @@ func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
p.metricsCache.TTL = 100 * time.Millisecond
p.metricsCache.Update = func() (interface{}, error) {
diskMetric := DiskMetrics{
APILatencies: make(map[string]uint64, len(p.apiLatencies)),
LastMinute: make(map[string]AccElem, len(p.apiLatencies)),
APICalls: make(map[string]uint64, len(p.apiCalls)),
}
for i, v := range p.apiLatencies {
diskMetric.APILatencies[storageMetric(i).String()] = v.value()
diskMetric.LastMinute[storageMetric(i).String()] = v.total()
}
for i := range p.apiCalls {
diskMetric.APICalls[storageMetric(i).String()] = atomic.LoadUint64(&p.apiCalls[i])
@ -113,10 +113,18 @@ func (e *lockedLastMinuteLatency) add(value time.Duration) {
e.lastMinuteLatency.add(value)
}
func (e *lockedLastMinuteLatency) value() uint64 {
// addSize will add a duration and size.
func (e *lockedLastMinuteLatency) addSize(value time.Duration, sz int64) {
e.Lock()
defer e.Unlock()
return e.lastMinuteLatency.getAvgData().avg()
e.lastMinuteLatency.addSize(value, sz)
}
// total returns the total call count and latency for the last minute.
func (e *lockedLastMinuteLatency) total() AccElem {
e.Lock()
defer e.Unlock()
return e.lastMinuteLatency.getTotal()
}
func newXLStorageDiskIDCheck(storage *xlStorage) *xlStorageDiskIDCheck {
@ -508,17 +516,26 @@ func storageTrace(s storageMetric, startTime time.Time, duration time.Duration,
Time: startTime,
NodeName: globalLocalNodeName,
FuncName: "storage." + s.String(),
StorageStats: madmin.TraceStorageStats{
Duration: duration,
Path: path,
},
}
}
func scannerTrace(s scannerMetric, startTime time.Time, duration time.Duration, path string) madmin.TraceInfo {
return madmin.TraceInfo{
TraceType: madmin.TraceScanner,
Time: startTime,
NodeName: globalLocalNodeName,
FuncName: "scanner." + s.String(),
Duration: duration,
Path: path,
}
}
// Update storage metrics
func (p *xlStorageDiskIDCheck) updateStorageMetrics(s storageMetric, paths ...string) func(err *error) {
startTime := time.Now()
trace := globalTrace.NumSubscribers() > 0
trace := globalTrace.NumSubscribers(madmin.TraceStorage) > 0
return func(err *error) {
duration := time.Since(startTime)
@ -655,18 +672,9 @@ func (p *xlStorageDiskIDCheck) TrackDiskHealth(ctx context.Context, s storageMet
atomic.StoreInt64(&p.health.lastStarted, time.Now().UnixNano())
ctx = context.WithValue(ctx, healthDiskCtxKey{}, &healthDiskCtxValue{lastSuccess: &p.health.lastSuccess})
si := p.updateStorageMetrics(s, paths...)
t := time.Now()
var once sync.Once
return ctx, func(errp *error) {
once.Do(func() {
if false {
var ers string
if errp != nil {
err := *errp
ers = fmt.Sprint(err)
}
fmt.Println(time.Now().Format(time.RFC3339), "op", s, "took", time.Since(t), "result:", ers, "disk:", p.storage.String(), "path:", strings.Join(paths, "/"))
}
p.health.tokens <- struct{}{}
if errp != nil {
err := *errp

View File

@ -32,7 +32,6 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
@ -477,8 +476,12 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
// if no xl.meta/xl.json found, skip the file.
return sizeSummary{}, errSkipFile
}
stopFn := globalScannerMetrics.log(scannerMetricScanObject, s.diskPath, PathJoin(item.bucket, item.objectPath()))
defer stopFn()
doneSz := globalScannerMetrics.timeSize(scannerMetricReadMetadata)
buf, err := s.readMetadata(ctx, item.Path)
doneSz(len(buf))
if err != nil {
if intDataUpdateTracker.debug {
console.Debugf(color.Green("scannerBucket:")+" object path missing: %v: %w\n", item.Path, err)
@ -502,8 +505,11 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
if noTiers = globalTierConfigMgr.Empty(); !noTiers {
sizeS.tiers = make(map[string]tierStats)
}
atomic.AddUint64(&globalScannerStats.accTotalObjects, 1)
done := globalScannerMetrics.time(scannerMetricApplyAll)
fivs.Versions, err = item.applyVersionActions(ctx, objAPI, fivs.Versions)
done()
if err != nil {
if intDataUpdateTracker.debug {
console.Debugf(color.Green("scannerBucket:")+" applying version actions failed: %v: %w\n", item.Path, err)
@ -514,9 +520,10 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
versioned := vcfg != nil && vcfg.Versioned(item.objectPath())
for _, version := range fivs.Versions {
atomic.AddUint64(&globalScannerStats.accTotalVersions, 1)
oi := version.ToObjectInfo(item.bucket, item.objectPath(), versioned)
done = globalScannerMetrics.time(scannerMetricApplyVersion)
sz := item.applyActions(ctx, objAPI, oi, &sizeS)
done()
if oi.VersionID != "" && sz == oi.Size {
sizeS.versions++
}
@ -528,7 +535,6 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
// tracking deleted transitioned objects
switch {
case noTiers, oi.DeleteMarker, oi.TransitionedObject.FreeVersion:
continue
}
tier := minioHotTier
@ -541,7 +547,9 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates
// apply tier sweep action on free versions
for _, freeVersion := range fivs.FreeVersions {
oi := freeVersion.ToObjectInfo(item.bucket, item.objectPath(), versioned)
done = globalScannerMetrics.time(scannerMetricTierObjSweep)
item.applyTierObjSweep(ctx, objAPI, oi)
done()
}
return sizeS, nil
}, scanMode)
@ -1847,7 +1855,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
}
defer func() {
disk.Fdatasync(w) // Only interested in flushing the size_t not mtime/atime
Fdatasync(w) // Only interested in flushing the size_t not mtime/atime
w.Close()
}()

12
go.mod
View File

@ -43,12 +43,12 @@ require (
github.com/lib/pq v1.10.4
github.com/miekg/dns v1.1.48
github.com/minio/cli v1.22.0
github.com/minio/console v0.19.0
github.com/minio/console v0.19.1-0.20220705203612-63e2793272dd
github.com/minio/csvparser v1.0.0
github.com/minio/dperf v0.4.2
github.com/minio/highwayhash v1.0.2
github.com/minio/kes v0.19.2
github.com/minio/madmin-go v1.3.20
github.com/minio/madmin-go v1.4.3
github.com/minio/minio-go/v7 v7.0.30
github.com/minio/pkg v1.1.26
github.com/minio/selfupdate v0.4.0
@ -67,6 +67,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.34.0
github.com/prometheus/procfs v0.7.3
github.com/rs/cors v1.7.0
github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417
@ -82,7 +83,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.4
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.21.0
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5
golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
@ -100,7 +101,6 @@ require (
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.2.0 // indirect
github.com/briandowns/spinner v1.18.1 // indirect
github.com/charmbracelet/bubbles v0.10.3 // indirect
github.com/charmbracelet/bubbletea v0.20.0 // indirect
github.com/charmbracelet/lipgloss v0.5.0 // indirect
@ -164,7 +164,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/minio/colorjson v1.0.2 // indirect
github.com/minio/filepath v1.0.0 // indirect
github.com/minio/mc v0.0.0-20220512134321-aa60a8db1e4d // indirect
github.com/minio/mc v0.0.0-20220705180830-01b87ecc02ff // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
@ -184,7 +184,6 @@ require (
github.com/posener/complete v1.2.3 // indirect
github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect
github.com/pquerna/cachecontrol v0.1.0 // indirect
github.com/prometheus/common v0.34.0 // indirect
github.com/prometheus/prom2json v1.3.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rivo/tview v0.0.0-20220216162559-96063d6082f3 // indirect
@ -220,5 +219,4 @@ require (
gopkg.in/h2non/filetype.v1 v1.0.5 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
maze.io/x/duration v0.0.0-20160924141736-faac084b6075 // indirect
)

1237
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -71,7 +71,7 @@ func (opts Config) BitrotScanCycle() (d time.Duration) {
// Wait waits for IOCount to go down or max sleep to elapse before returning.
// usually used in healing paths to wait for specified amount of time to
// throttle healing.
func (opts Config) Wait(currentIO func() int, systemIO func() int) {
func (opts Config) Wait(currentIO func() int, activeListeners func() int) {
configMutex.RLock()
maxIO, maxWait := opts.IOCount, opts.Sleep
configMutex.RUnlock()
@ -87,7 +87,7 @@ func (opts Config) Wait(currentIO func() int, systemIO func() int) {
tmpMaxWait := maxWait
if currentIO != nil {
for currentIO() >= maxIO+systemIO() {
for currentIO() >= maxIO+activeListeners() {
if tmpMaxWait > 0 {
if tmpMaxWait < waitTick {
time.Sleep(tmpMaxWait)

View File

@ -17,6 +17,10 @@
package event
import (
"github.com/minio/madmin-go"
)
const (
// NamespaceFormat - namespace log format used in some event targets.
NamespaceFormat = "namespace"
@ -79,6 +83,12 @@ type Event struct {
ResponseElements map[string]string `json:"responseElements"`
S3 Metadata `json:"s3"`
Source Source `json:"source"`
Type madmin.TraceType `json:"-"`
}
// Mask returns the type as mask.
func (e Event) Mask() uint64 {
return e.EventName.Mask()
}
// Log represents event information for some event targets.

View File

@ -30,12 +30,12 @@ type Name int
// Values of event Name
const (
ObjectAccessedAll Name = 1 + iota
ObjectAccessedGet
// Single event types (does not require expansion)
ObjectAccessedGet Name = 1 + iota
ObjectAccessedGetRetention
ObjectAccessedGetLegalHold
ObjectAccessedHead
ObjectCreatedAll
ObjectCreatedCompleteMultipartUpload
ObjectCreatedCopy
ObjectCreatedPost
@ -44,12 +44,10 @@ const (
ObjectCreatedPutLegalHold
ObjectCreatedPutTagging
ObjectCreatedDeleteTagging
ObjectRemovedAll
ObjectRemovedDelete
ObjectRemovedDeleteMarkerCreated
BucketCreated
BucketRemoved
ObjectReplicationAll
ObjectReplicationFailed
ObjectReplicationComplete
ObjectReplicationMissedThreshold
@ -57,19 +55,28 @@ const (
ObjectReplicationNotTracked
ObjectRestorePostInitiated
ObjectRestorePostCompleted
ObjectRestorePostAll
ObjectTransitionAll
ObjectTransitionFailed
ObjectTransitionComplete
objectSingleTypesEnd
// Start Compound types that require expansion:
ObjectAccessedAll
ObjectCreatedAll
ObjectRemovedAll
ObjectReplicationAll
ObjectRestorePostAll
ObjectTransitionAll
)
// The number of single names should not exceed 64.
// This will break masking. Use bit 63 as extension.
var _ = uint64(1 << objectSingleTypesEnd)
// Expand - returns expanded values of abbreviated event type.
func (name Name) Expand() []Name {
switch name {
case BucketCreated:
return []Name{BucketCreated}
case BucketRemoved:
return []Name{BucketRemoved}
case ObjectAccessedAll:
return []Name{
ObjectAccessedGet, ObjectAccessedHead,
@ -110,6 +117,19 @@ func (name Name) Expand() []Name {
}
}
// Mask returns the type as mask.
// Compound "All" types are expanded.
func (name Name) Mask() uint64 {
if name < objectSingleTypesEnd {
return 1 << (name - 1)
}
var mask uint64
for _, n := range name.Expand() {
mask |= 1 << (n - 1)
}
return mask
}
// String - returns string representation of event type.
func (name Name) String() string {
switch name {

View File

@ -25,12 +25,21 @@ import (
"github.com/minio/minio/internal/disk"
)
var (
// OpenFileDirectIO allows overriding default function.
OpenFileDirectIO = disk.OpenFileDirectIO
// OsOpen allows overriding default function.
OsOpen = os.Open
// OsOpenFile allows overriding default function.
OsOpenFile = os.OpenFile
)
// ReadFileWithFileInfo reads the named file and returns the contents.
// A successful call returns err == nil, not err == EOF.
// Because ReadFile reads the whole file, it does not treat an EOF from Read
// as an error to be reported, additionall returns os.FileInfo
func ReadFileWithFileInfo(name string) ([]byte, fs.FileInfo, error) {
f, err := os.Open(name)
f, err := OsOpen(name)
if err != nil {
return nil, nil, err
}
@ -53,11 +62,11 @@ func ReadFileWithFileInfo(name string) ([]byte, fs.FileInfo, error) {
//
// passes NOATIME flag for reads on Unix systems to avoid atime updates.
func ReadFile(name string) ([]byte, error) {
f, err := disk.OpenFileDirectIO(name, readMode, 0o666)
f, err := OpenFileDirectIO(name, readMode, 0o666)
if err != nil {
// fallback if there is an error to read
// 'name' with O_DIRECT
f, err = os.OpenFile(name, readMode, 0o666)
f, err = OsOpenFile(name, readMode, 0o666)
if err != nil {
return nil, err
}

View File

@ -27,6 +27,7 @@ import (
"time"
"github.com/klauspost/compress/gzhttp"
"github.com/minio/madmin-go"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger/message/audit"
)
@ -234,8 +235,8 @@ func AuditLog(ctx context.Context, w http.ResponseWriter, r *http.Request, reqCl
// Send audit logs only to http targets.
for _, t := range auditTgts {
if err := t.Send(entry, string(All)); err != nil {
LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Audit target (%v): %v", entry, t, err), All)
if err := t.Send(entry); err != nil {
LogAlwaysIf(context.Background(), fmt.Errorf("event(%v) was not sent to Audit target (%v): %v", entry, t, err), madmin.LogKindAll)
}
}
}

View File

@ -31,6 +31,7 @@ import (
"time"
"github.com/minio/highwayhash"
"github.com/minio/madmin-go"
"github.com/minio/minio-go/v7/pkg/set"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger/message/log"
@ -50,6 +51,10 @@ const (
InformationLvl Level = iota + 1
ErrorLvl
FatalLvl
Application = madmin.LogKindApplication
Minio = madmin.LogKindMinio
All = madmin.LogKindAll
)
var trimStrings []string
@ -65,16 +70,15 @@ var matchingFuncNames = [...]string{
}
func (level Level) String() string {
var lvlStr string
switch level {
case InformationLvl:
lvlStr = "INFO"
return "INFO"
case ErrorLvl:
lvlStr = "ERROR"
return "ERROR"
case FatalLvl:
lvlStr = "FATAL"
return "FATAL"
}
return lvlStr
return ""
}
// quietFlag: Hide startup messages if enabled
@ -237,18 +241,6 @@ func hashString(input string) string {
return hex.EncodeToString(hh.Sum(nil))
}
// Kind specifies the kind of error log
type Kind string
const (
// Minio errors
Minio Kind = "MINIO"
// Application errors
Application Kind = "APPLICATION"
// All errors
All Kind = "ALL"
)
// LogAlwaysIf prints a detailed error message during
// the execution of the server.
func LogAlwaysIf(ctx context.Context, err error, errKind ...interface{}) {
@ -279,10 +271,10 @@ func LogIf(ctx context.Context, err error, errKind ...interface{}) {
}
func errToEntry(ctx context.Context, err error, errKind ...interface{}) log.Entry {
logKind := string(Minio)
logKind := madmin.LogKindAll
if len(errKind) > 0 {
if ek, ok := errKind[0].(Kind); ok {
logKind = string(ek)
if ek, ok := errKind[0].(madmin.LogKind); ok {
logKind = ek
}
}
req := GetReqInfo(ctx)
@ -366,7 +358,7 @@ func consoleLogIf(ctx context.Context, err error, errKind ...interface{}) {
if consoleTgt != nil {
entry := errToEntry(ctx, err, errKind...)
consoleTgt.Send(entry, entry.LogKind)
consoleTgt.Send(entry)
}
}
@ -385,10 +377,10 @@ func logIf(ctx context.Context, err error, errKind ...interface{}) {
entry := errToEntry(ctx, err, errKind...)
// Iterate over all logger targets to send the log entry
for _, t := range systemTgts {
if err := t.Send(entry, entry.LogKind); err != nil {
if err := t.Send(entry); err != nil {
if consoleTgt != nil {
entry.Trace.Message = fmt.Sprintf("event(%#v) was not sent to Logger target (%#v): %#v", entry, t, err)
consoleTgt.Send(entry, entry.LogKind)
consoleTgt.Send(entry)
}
}
}

View File

@ -20,6 +20,8 @@ package log
import (
"strings"
"time"
"github.com/minio/madmin-go"
)
// ObjectVersion object version key/versionId
@ -54,7 +56,7 @@ type API struct {
type Entry struct {
DeploymentID string `json:"deploymentid,omitempty"`
Level string `json:"level"`
LogKind string `json:"errKind"`
LogKind madmin.LogKind `json:"errKind"`
Time time.Time `json:"time"`
API *API `json:"api,omitempty"`
RemoteHost string `json:"remotehost,omitempty"`
@ -73,9 +75,15 @@ type Info struct {
Err error `json:"-"`
}
// SendLog returns true if log pertains to node specified in args.
func (l Info) SendLog(node, logKind string) bool {
nodeFltr := (node == "" || strings.EqualFold(node, l.NodeName))
typeFltr := strings.EqualFold(logKind, "all") || strings.EqualFold(l.LogKind, logKind)
return nodeFltr && typeFltr
// Mask returns the mask based on the error level.
func (l Info) Mask() uint64 {
return l.LogKind.LogMask().Mask()
}
// SendLog returns true if log pertains to node specified in args.
func (l Info) SendLog(node string, logKind madmin.LogMask) bool {
if logKind.Contains(l.LogKind.LogMask()) {
return node == "" || strings.EqualFold(node, l.NodeName)
}
return false
}

View File

@ -203,7 +203,7 @@ func New(config Config) *Target {
}
// Send log message 'e' to http target.
func (h *Target) Send(entry interface{}, errKind string) error {
func (h *Target) Send(entry interface{}) error {
select {
case <-h.doneCh:
return nil

View File

@ -48,7 +48,7 @@ type Target struct {
}
// Send log message 'e' to kafka target.
func (h *Target) Send(entry interface{}, errKind string) error {
func (h *Target) Send(entry interface{}) error {
select {
case <-h.doneCh:
return nil

View File

@ -33,7 +33,7 @@ type Target interface {
Endpoint() string
Init() error
Cancel()
Send(entry interface{}, errKind string) error
Send(entry interface{}) error
Type() types.TargetType
}

66
internal/pubsub/mask.go Normal file
View File

@ -0,0 +1,66 @@
package pubsub
import (
"math"
"math/bits"
)
// Mask allows filtering by a bitset mask.
type Mask uint64
const (
// MaskAll is the mask for all entries.
MaskAll Mask = math.MaxUint64
)
// MaskFromMaskable extracts mask from an interface.
func MaskFromMaskable(m Maskable) Mask {
return Mask(m.Mask())
}
// Contains returns whether *all* flags in other is present in t.
func (t Mask) Contains(other Mask) bool {
return t&other == other
}
// Overlaps returns whether *any* flags in t overlaps with other.
func (t Mask) Overlaps(other Mask) bool {
return t&other != 0
}
// SingleType returns whether t has a single type set.
func (t Mask) SingleType() bool {
return bits.OnesCount64(uint64(t)) == 1
}
// FromUint64 will set a mask to the uint64 value.
func (t *Mask) FromUint64(m uint64) {
*t = Mask(m)
}
// Merge will merge other into t.
func (t *Mask) Merge(other Mask) {
*t |= other
}
// MergeMaskable will merge other into t.
func (t *Mask) MergeMaskable(other Maskable) {
*t |= Mask(other.Mask())
}
// SetIf will add other if b is true.
func (t *Mask) SetIf(b bool, other Mask) {
if b {
*t |= other
}
}
// Mask returns the mask as a uint64.
func (t Mask) Mask() uint64 {
return uint64(t)
}
// Maskable implementations must return their mask as a 64 bit uint.
type Maskable interface {
Mask() uint64
}

View File

@ -25,27 +25,31 @@ import (
// Sub - subscriber entity.
type Sub struct {
ch chan interface{}
filter func(entry interface{}) bool
ch chan Maskable
types Mask
filter func(entry Maskable) bool
}
// PubSub holds publishers and subscribers
type PubSub struct {
subs []*Sub
// atomics, keep at top:
types uint64
numSubscribers int32
maxSubscribers int32
// not atomics:
subs []*Sub
sync.RWMutex
}
// Publish message to the subscribers.
// Note that publish is always nob-blocking send so that we don't block on slow receivers.
// Hence receivers should use buffered channel so as not to miss the published events.
func (ps *PubSub) Publish(item interface{}) {
func (ps *PubSub) Publish(item Maskable) {
ps.RLock()
defer ps.RUnlock()
for _, sub := range ps.subs {
if sub.filter == nil || sub.filter(item) {
if sub.types.Contains(Mask(item.Mask())) && (sub.filter == nil || sub.filter(item)) {
select {
case sub.ch <- item:
default:
@ -55,38 +59,54 @@ func (ps *PubSub) Publish(item interface{}) {
}
// Subscribe - Adds a subscriber to pubsub system
func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filter func(entry interface{}) bool) error {
func (ps *PubSub) Subscribe(mask Mask, subCh chan Maskable, doneCh <-chan struct{}, filter func(entry Maskable) bool) error {
totalSubs := atomic.AddInt32(&ps.numSubscribers, 1)
if ps.maxSubscribers > 0 && totalSubs > ps.maxSubscribers {
atomic.AddInt32(&ps.numSubscribers, -1)
return fmt.Errorf("the limit of `%d` subscribers is reached", ps.maxSubscribers)
}
ps.Lock()
defer ps.Unlock()
sub := &Sub{subCh, filter}
sub := &Sub{ch: subCh, types: mask, filter: filter}
ps.subs = append(ps.subs, sub)
// We hold a lock, so we are safe to update
combined := Mask(atomic.LoadUint64(&ps.types))
combined.Merge(mask)
atomic.StoreUint64(&ps.types, uint64(combined))
go func() {
<-doneCh
ps.Lock()
defer ps.Unlock()
var remainTypes Mask
for i, s := range ps.subs {
if s == sub {
ps.subs = append(ps.subs[:i], ps.subs[i+1:]...)
} else {
remainTypes.Merge(s.types)
}
}
atomic.StoreUint64(&ps.types, uint64(remainTypes))
atomic.AddInt32(&ps.numSubscribers, -1)
}()
return nil
}
// NumSubscribers returns the number of current subscribers
func (ps *PubSub) NumSubscribers() int32 {
// NumSubscribers returns the number of current subscribers,
// If t is non-nil, the type is checked against the active subscribed types,
// and 0 will be returned if nobody is subscribed for the type,
// otherwise the *total* number of subscribers is returned.
func (ps *PubSub) NumSubscribers(m Maskable) int32 {
if m != nil {
types := Mask(atomic.LoadUint64(&ps.types))
if !types.Overlaps(Mask(m.Mask())) {
return 0
}
}
return atomic.LoadInt32(&ps.numSubscribers)
}

View File

@ -25,51 +25,87 @@ import (
func TestSubscribe(t *testing.T) {
ps := New(2)
ch1 := make(chan interface{}, 1)
ch2 := make(chan interface{}, 1)
ch1 := make(chan Maskable, 1)
ch2 := make(chan Maskable, 1)
doneCh := make(chan struct{})
defer close(doneCh)
if err := ps.Subscribe(ch1, doneCh, nil); err != nil {
if err := ps.Subscribe(MaskAll, ch1, doneCh, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := ps.Subscribe(ch2, doneCh, nil); err != nil {
if err := ps.Subscribe(MaskAll, ch2, doneCh, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
ps.Lock()
defer ps.Unlock()
if len(ps.subs) != 2 || ps.NumSubscribers(nil) != 2 {
t.Fatalf("expected 2 subscribers")
}
}
func TestNumSubscribersMask(t *testing.T) {
ps := New(2)
ch1 := make(chan Maskable, 1)
ch2 := make(chan Maskable, 1)
doneCh := make(chan struct{})
defer close(doneCh)
if err := ps.Subscribe(1, ch1, doneCh, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := ps.Subscribe(2, ch2, doneCh, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
ps.Lock()
defer ps.Unlock()
if len(ps.subs) != 2 {
t.Fatalf("expected 2 subscribers")
}
if want, got := int32(2), ps.NumSubscribers(Mask(1)); got != want {
t.Fatalf("want %d subscribers, got %d", want, got)
}
if want, got := int32(2), ps.NumSubscribers(Mask(2)); got != want {
t.Fatalf("want %d subscribers, got %d", want, got)
}
if want, got := int32(2), ps.NumSubscribers(Mask(1|2)); got != want {
t.Fatalf("want %d subscribers, got %d", want, got)
}
if want, got := int32(2), ps.NumSubscribers(nil); got != want {
t.Fatalf("want %d subscribers, got %d", want, got)
}
if want, got := int32(0), ps.NumSubscribers(Mask(4)); got != want {
t.Fatalf("want %d subscribers, got %d", want, got)
}
}
func TestSubscribeExceedingLimit(t *testing.T) {
ps := New(2)
ch1 := make(chan interface{}, 1)
ch2 := make(chan interface{}, 1)
ch3 := make(chan interface{}, 1)
ch1 := make(chan Maskable, 1)
ch2 := make(chan Maskable, 1)
ch3 := make(chan Maskable, 1)
doneCh := make(chan struct{})
defer close(doneCh)
if err := ps.Subscribe(ch1, doneCh, nil); err != nil {
if err := ps.Subscribe(MaskAll, ch1, doneCh, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := ps.Subscribe(ch2, doneCh, nil); err != nil {
if err := ps.Subscribe(MaskAll, ch2, doneCh, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := ps.Subscribe(ch3, doneCh, nil); err == nil {
if err := ps.Subscribe(MaskAll, ch3, doneCh, nil); err == nil {
t.Fatalf("unexpected nil err")
}
}
func TestUnsubscribe(t *testing.T) {
ps := New(2)
ch1 := make(chan interface{}, 1)
ch2 := make(chan interface{}, 1)
ch1 := make(chan Maskable, 1)
ch2 := make(chan Maskable, 1)
doneCh1 := make(chan struct{})
doneCh2 := make(chan struct{})
if err := ps.Subscribe(ch1, doneCh1, nil); err != nil {
if err := ps.Subscribe(MaskAll, ch1, doneCh1, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := ps.Subscribe(ch2, doneCh2, nil); err != nil {
if err := ps.Subscribe(MaskAll, ch2, doneCh2, nil); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -84,40 +120,81 @@ func TestUnsubscribe(t *testing.T) {
close(doneCh2)
}
type maskString string
func (m maskString) Mask() uint64 {
return 1
}
func TestPubSub(t *testing.T) {
ps := New(1)
ch1 := make(chan interface{}, 1)
ch1 := make(chan Maskable, 1)
doneCh1 := make(chan struct{})
defer close(doneCh1)
if err := ps.Subscribe(ch1, doneCh1, func(entry interface{}) bool { return true }); err != nil {
if err := ps.Subscribe(MaskAll, ch1, doneCh1, func(entry Maskable) bool { return true }); err != nil {
t.Fatalf("unexpected error: %v", err)
}
val := "hello"
val := maskString("hello")
ps.Publish(val)
msg := <-ch1
if msg != "hello" {
if msg != val {
t.Fatalf(fmt.Sprintf("expected %s , found %s", val, msg))
}
}
func TestMultiPubSub(t *testing.T) {
ps := New(2)
ch1 := make(chan interface{}, 1)
ch2 := make(chan interface{}, 1)
ch1 := make(chan Maskable, 1)
ch2 := make(chan Maskable, 1)
doneCh := make(chan struct{})
defer close(doneCh)
if err := ps.Subscribe(ch1, doneCh, func(entry interface{}) bool { return true }); err != nil {
if err := ps.Subscribe(MaskAll, ch1, doneCh, func(entry Maskable) bool { return true }); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := ps.Subscribe(ch2, doneCh, func(entry interface{}) bool { return true }); err != nil {
if err := ps.Subscribe(MaskAll, ch2, doneCh, func(entry Maskable) bool { return true }); err != nil {
t.Fatalf("unexpected error: %v", err)
}
val := "hello"
val := maskString("hello")
ps.Publish(val)
msg1 := <-ch1
msg2 := <-ch2
if msg1 != "hello" && msg2 != "hello" {
if msg1 != val && msg2 != val {
t.Fatalf(fmt.Sprintf("expected both subscribers to have%s , found %s and %s", val, msg1, msg2))
}
}
func TestMultiPubSubMask(t *testing.T) {
ps := New(3)
ch1 := make(chan Maskable, 1)
ch2 := make(chan Maskable, 1)
ch3 := make(chan Maskable, 1)
doneCh := make(chan struct{})
defer close(doneCh)
// Mask matches maskString, should get result
if err := ps.Subscribe(Mask(1), ch1, doneCh, func(entry Maskable) bool { return true }); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Mask matches maskString, should get result
if err := ps.Subscribe(Mask(1|2), ch2, doneCh, func(entry Maskable) bool { return true }); err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Does NOT overlap maskString
if err := ps.Subscribe(Mask(2), ch3, doneCh, func(entry Maskable) bool { return true }); err != nil {
t.Fatalf("unexpected error: %v", err)
}
val := maskString("hello")
ps.Publish(val)
msg1 := <-ch1
msg2 := <-ch2
if msg1 != val && msg2 != val {
t.Fatalf(fmt.Sprintf("expected both subscribers to have%s , found %s and %s", val, msg1, msg2))
}
select {
case msg := <-ch3:
t.Fatalf(fmt.Sprintf("unexpect msg, f got %s", msg))
default:
}
}