minio/cmd/daily-sweeper.go

159 lines
4.1 KiB
Go

/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"math/rand"
"sync"
"time"
"github.com/minio/minio/cmd/logger"
)
// The list of modules listening for the daily listing of all objects
// such as the daily heal ops, disk usage and bucket lifecycle management.
var globalDailySweepListeners = make([]chan string, 0)
var globalDailySweepListenersMu = sync.Mutex{}
// Add a new listener to the daily objects listing
func registerDailySweepListener(ch chan string) {
globalDailySweepListenersMu.Lock()
defer globalDailySweepListenersMu.Unlock()
globalDailySweepListeners = append(globalDailySweepListeners, ch)
}
// Safe copy of globalDailySweepListeners content
func copyDailySweepListeners() []chan string {
globalDailySweepListenersMu.Lock()
defer globalDailySweepListenersMu.Unlock()
var listenersCopy = make([]chan string, len(globalDailySweepListeners))
copy(listenersCopy, globalDailySweepListeners)
return listenersCopy
}
var sweepTimeout = newDynamicTimeout(60*time.Second, time.Second)
// sweepRound will list all objects, having read quorum or not and
// feeds to all listeners, such as the background healing
func sweepRound(ctx context.Context, objAPI ObjectLayer) error {
// General lock so we avoid parallel daily sweep by different instances.
sweepLock := globalNSMutex.NewNSLock(ctx, "system", "daily-sweep")
if err := sweepLock.GetLock(sweepTimeout); err != nil {
return err
}
defer sweepLock.Unlock()
buckets, err := objAPI.ListBuckets(ctx)
if err != nil {
return err
}
// List all objects, having read quorum or not in all buckets
// and send them to all the registered sweep listeners
for _, bucket := range buckets {
// Send bucket names to all listeners
for _, l := range copyDailySweepListeners() {
l <- bucket.Name
}
marker := ""
for {
if globalHTTPServer != nil {
// Wait at max 10 minute for an inprogress request before proceeding to heal
waitCount := 600
// Any requests in progress, delay the heal.
for (globalHTTPServer.GetRequestCount() >= int32(globalXLSetCount*globalXLSetDriveCount)) &&
waitCount > 0 {
waitCount--
time.Sleep(1 * time.Second)
}
}
res, err := objAPI.ListObjectsHeal(ctx, bucket.Name, "", marker, "", 1000)
if err != nil {
continue
}
for _, obj := range res.Objects {
for _, l := range copyDailySweepListeners() {
l <- pathJoin(bucket.Name, obj.Name)
}
}
if !res.IsTruncated {
break
} else {
marker = res.NextMarker
}
}
}
return nil
}
// initDailySweeper creates a go-routine which will list all
// objects in all buckets in a daily basis
func initDailySweeper() {
go dailySweeper()
}
// List all objects in all buckets in a daily basis
func dailySweeper() {
var lastSweepTime time.Time
var objAPI ObjectLayer
var ctx = context.Background()
// Wait until the object layer is ready
for {
objAPI = newObjectLayerFn()
if objAPI == nil {
time.Sleep(time.Second)
continue
}
break
}
// Start with random sleep time, so as to avoid "synchronous checks" between servers
time.Sleep(time.Duration(rand.Float64() * float64(time.Hour)))
for {
if time.Since(lastSweepTime) < 30*24*time.Hour {
time.Sleep(time.Hour)
continue
}
err := sweepRound(ctx, objAPI)
if err != nil {
switch err.(type) {
// Unable to hold a lock means there is another
// instance doing the sweep round
case OperationTimedOut:
lastSweepTime = time.Now()
continue
}
logger.LogIf(ctx, err)
time.Sleep(time.Minute)
continue
}
lastSweepTime = time.Now()
}
}