diff --git a/cmd/auth-handler.go b/cmd/auth-handler.go index ee55d90c6..46dd67322 100644 --- a/cmd/auth-handler.go +++ b/cmd/auth-handler.go @@ -226,7 +226,7 @@ func getClaimsFromToken(r *http.Request) (map[string]interface{}, error) { if err != nil { // Base64 decoding fails, we should log to indicate // something is malforming the request sent by client. - logger.LogIf(context.Background(), err, logger.Application) + logger.LogIf(r.Context(), err, logger.Application) return nil, errAuthentication } claims.MapClaims[iampolicy.SessionPolicyName] = string(spBytes) @@ -246,7 +246,7 @@ func checkClaimsFromToken(r *http.Request, cred auth.Credentials) (map[string]in } claims, err := getClaimsFromToken(r) if err != nil { - return nil, toAPIErrorCode(context.Background(), err) + return nil, toAPIErrorCode(r.Context(), err) } return claims, ErrNone } @@ -460,7 +460,7 @@ func (a authHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { a.handler.ServeHTTP(w, r) return } - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrSignatureVersionNotSupported), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrSignatureVersionNotSupported), r.URL, guessIsBrowserReq(r)) } // isPutActionAllowed - check if PUT operation is allowed on the resource, this diff --git a/cmd/background-heal-ops.go b/cmd/background-heal-ops.go index 95a420357..e422ac02b 100644 --- a/cmd/background-heal-ops.go +++ b/cmd/background-heal-ops.go @@ -66,8 +66,7 @@ func waitForLowHTTPReq(tolerance int32) { } // Wait for heal requests and process them -func (h *healRoutine) run() { - ctx := context.Background() +func (h *healRoutine) run(ctx context.Context, objAPI ObjectLayer) { for { select { case task, ok := <-h.tasks: @@ -83,18 +82,18 @@ func (h *healRoutine) run() { bucket, object := path2BucketObject(task.path) switch { case bucket == "" && object == "": - res, err = bgHealDiskFormat(ctx, task.opts) + res, err = healDiskFormat(ctx, objAPI, task.opts) case bucket != "" && object == "": - res, err = bgHealBucket(ctx, bucket, task.opts) + res, err = objAPI.HealBucket(ctx, bucket, task.opts.DryRun, task.opts.Remove) case bucket != "" && object != "": - res, err = bgHealObject(ctx, bucket, object, task.opts) + res, err = objAPI.HealObject(ctx, bucket, object, task.opts) } if task.responseCh != nil { task.responseCh <- healResult{result: res, err: err} } case <-h.doneCh: return - case <-GlobalServiceDoneCh: + case <-ctx.Done(): return } } @@ -108,22 +107,10 @@ func initHealRoutine() *healRoutine { } -func startBackgroundHealing() { - ctx := context.Background() - - var objAPI ObjectLayer - for { - objAPI = newObjectLayerWithoutSafeModeFn() - if objAPI == nil { - time.Sleep(time.Second) - continue - } - break - } - +func startBackgroundHealing(ctx context.Context, objAPI ObjectLayer) { // Run the background healer globalBackgroundHealRoutine = initHealRoutine() - go globalBackgroundHealRoutine.run() + go globalBackgroundHealRoutine.run(ctx, objAPI) // Launch the background healer sequence to track // background healing operations @@ -133,20 +120,14 @@ func startBackgroundHealing() { globalBackgroundHealState.LaunchNewHealSequence(nh) } -func initBackgroundHealing() { - go startBackgroundHealing() +func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) { + go startBackgroundHealing(ctx, objAPI) } -// bgHealDiskFormat - heals format.json, return value indicates if a +// healDiskFormat - heals format.json, return value indicates if a // failure error occurred. -func bgHealDiskFormat(ctx context.Context, opts madmin.HealOpts) (madmin.HealResultItem, error) { - // Get current object layer instance. - objectAPI := newObjectLayerWithoutSafeModeFn() - if objectAPI == nil { - return madmin.HealResultItem{}, errServerNotInitialized - } - - res, err := objectAPI.HealFormat(ctx, opts.DryRun) +func healDiskFormat(ctx context.Context, objAPI ObjectLayer, opts madmin.HealOpts) (madmin.HealResultItem, error) { + res, err := objAPI.HealFormat(ctx, opts.DryRun) // return any error, ignore error returned when disks have // already healed. @@ -167,24 +148,3 @@ func bgHealDiskFormat(ctx context.Context, opts madmin.HealOpts) (madmin.HealRes return res, nil } - -// bghealBucket - traverses and heals given bucket -func bgHealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error) { - // Get current object layer instance. - objectAPI := newObjectLayerWithoutSafeModeFn() - if objectAPI == nil { - return madmin.HealResultItem{}, errServerNotInitialized - } - - return objectAPI.HealBucket(ctx, bucket, opts.DryRun, opts.Remove) -} - -// bgHealObject - heal the given object and record result -func bgHealObject(ctx context.Context, bucket, object string, opts madmin.HealOpts) (madmin.HealResultItem, error) { - // Get current object layer instance. - objectAPI := newObjectLayerWithoutSafeModeFn() - if objectAPI == nil { - return madmin.HealResultItem{}, errServerNotInitialized - } - return objectAPI.HealObject(ctx, bucket, object, opts) -} diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index 91156c713..aa8fb45d1 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -25,32 +25,19 @@ import ( const defaultMonitorNewDiskInterval = time.Minute * 10 -func initLocalDisksAutoHeal() { - go monitorLocalDisksAndHeal() +func initLocalDisksAutoHeal(ctx context.Context, objAPI ObjectLayer) { + go monitorLocalDisksAndHeal(ctx, objAPI) } // monitorLocalDisksAndHeal - ensures that detected new disks are healed // 1. Only the concerned erasure set will be listed and healed // 2. Only the node hosting the disk is responsible to perform the heal -func monitorLocalDisksAndHeal() { - // Wait until the object layer is ready - var objAPI ObjectLayer - for { - objAPI = newObjectLayerWithoutSafeModeFn() - if objAPI == nil { - time.Sleep(time.Second) - continue - } - break - } - +func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) { z, ok := objAPI.(*xlZones) if !ok { return } - ctx := context.Background() - var bgSeq *healSequence var found bool @@ -64,64 +51,67 @@ func monitorLocalDisksAndHeal() { // Perform automatic disk healing when a disk is replaced locally. for { - time.Sleep(defaultMonitorNewDiskInterval) - - // Attempt a heal as the server starts-up first. - localDisksInZoneHeal := make([]Endpoints, len(z.zones)) - for i, ep := range globalEndpoints { - localDisksToHeal := Endpoints{} - for _, endpoint := range ep.Endpoints { - if !endpoint.IsLocal { + select { + case <-ctx.Done(): + return + case <-time.After(defaultMonitorNewDiskInterval): + // Attempt a heal as the server starts-up first. + localDisksInZoneHeal := make([]Endpoints, len(z.zones)) + for i, ep := range globalEndpoints { + localDisksToHeal := Endpoints{} + for _, endpoint := range ep.Endpoints { + if !endpoint.IsLocal { + continue + } + // Try to connect to the current endpoint + // and reformat if the current disk is not formatted + _, _, err := connectEndpoint(endpoint) + if err == errUnformattedDisk { + localDisksToHeal = append(localDisksToHeal, endpoint) + } + } + if len(localDisksToHeal) == 0 { continue } - // Try to connect to the current endpoint - // and reformat if the current disk is not formatted - _, _, err := connectEndpoint(endpoint) - if err == errUnformattedDisk { - localDisksToHeal = append(localDisksToHeal, endpoint) - } + localDisksInZoneHeal[i] = localDisksToHeal } - if len(localDisksToHeal) == 0 { - continue - } - localDisksInZoneHeal[i] = localDisksToHeal - } - // Reformat disks - bgSeq.sourceCh <- SlashSeparator + // Reformat disks + bgSeq.sourceCh <- SlashSeparator - // Ensure that reformatting disks is finished - bgSeq.sourceCh <- nopHeal + // Ensure that reformatting disks is finished + bgSeq.sourceCh <- nopHeal - var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal)) - // Compute the list of erasure set to heal - for i, localDisksToHeal := range localDisksInZoneHeal { - var erasureSetToHeal []int - for _, endpoint := range localDisksToHeal { - // Load the new format of this passed endpoint - _, format, err := connectEndpoint(endpoint) - if err != nil { - logger.LogIf(ctx, err) - continue + var erasureSetInZoneToHeal = make([][]int, len(localDisksInZoneHeal)) + // Compute the list of erasure set to heal + for i, localDisksToHeal := range localDisksInZoneHeal { + var erasureSetToHeal []int + for _, endpoint := range localDisksToHeal { + // Load the new format of this passed endpoint + _, format, err := connectEndpoint(endpoint) + if err != nil { + logger.LogIf(ctx, err) + continue + } + // Calculate the set index where the current endpoint belongs + setIndex, _, err := findDiskIndex(z.zones[i].format, format) + if err != nil { + logger.LogIf(ctx, err) + continue + } + + erasureSetToHeal = append(erasureSetToHeal, setIndex) } - // Calculate the set index where the current endpoint belongs - setIndex, _, err := findDiskIndex(z.zones[i].format, format) - if err != nil { - logger.LogIf(ctx, err) - continue - } - - erasureSetToHeal = append(erasureSetToHeal, setIndex) + erasureSetInZoneToHeal[i] = erasureSetToHeal } - erasureSetInZoneToHeal[i] = erasureSetToHeal - } - // Heal all erasure sets that need - for i, erasureSetToHeal := range erasureSetInZoneToHeal { - for _, setIndex := range erasureSetToHeal { - err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex]) - if err != nil { - logger.LogIf(ctx, err) + // Heal all erasure sets that need + for i, erasureSetToHeal := range erasureSetInZoneToHeal { + for _, setIndex := range erasureSetToHeal { + err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex]) + if err != nil { + logger.LogIf(ctx, err) + } } } } diff --git a/cmd/daily-lifecycle-ops.go b/cmd/daily-lifecycle-ops.go index 23dc1b140..3868579de 100644 --- a/cmd/daily-lifecycle-ops.go +++ b/cmd/daily-lifecycle-ops.go @@ -30,76 +30,30 @@ const ( bgLifecycleTick = time.Hour ) -type lifecycleOps struct { - LastActivity time.Time -} - -// Register to the daily objects listing -var globalLifecycleOps = &lifecycleOps{} - -func getLocalBgLifecycleOpsStatus() BgLifecycleOpsStatus { - return BgLifecycleOpsStatus{ - LastActivity: globalLifecycleOps.LastActivity, - } -} - // initDailyLifecycle starts the routine that receives the daily // listing of all objects and applies any matching bucket lifecycle // rules. -func initDailyLifecycle() { - go startDailyLifecycle() +func initDailyLifecycle(ctx context.Context, objAPI ObjectLayer) { + go startDailyLifecycle(ctx, objAPI) } -func startDailyLifecycle() { - var objAPI ObjectLayer - var ctx = context.Background() - - // Wait until the object API is ready +func startDailyLifecycle(ctx context.Context, objAPI ObjectLayer) { for { - objAPI = newObjectLayerWithoutSafeModeFn() - if objAPI == nil { - time.Sleep(time.Second) - continue - } - break - } - - // Calculate the time of the last lifecycle operation in all peers node of the cluster - computeLastLifecycleActivity := func(status []BgOpsStatus) time.Time { - var lastAct time.Time - for _, st := range status { - if st.LifecycleOps.LastActivity.After(lastAct) { - lastAct = st.LifecycleOps.LastActivity + select { + case <-ctx.Done(): + return + case <-time.NewTimer(bgLifecycleInterval).C: + // Perform one lifecycle operation + err := lifecycleRound(ctx, objAPI) + switch err.(type) { + case OperationTimedOut: + // Unable to hold a lock means there is another + // caller holding write lock, ignore and try next round. + continue + default: + logger.LogIf(ctx, err) } } - return lastAct - } - - for { - // Check if we should perform lifecycle ops based on the last lifecycle activity, sleep one hour otherwise - allLifecycleStatus := []BgOpsStatus{ - {LifecycleOps: getLocalBgLifecycleOpsStatus()}, - } - if globalIsDistXL { - allLifecycleStatus = append(allLifecycleStatus, globalNotificationSys.BackgroundOpsStatus()...) - } - lastAct := computeLastLifecycleActivity(allLifecycleStatus) - if !lastAct.IsZero() && time.Since(lastAct) < bgLifecycleInterval { - time.Sleep(bgLifecycleTick) - } - - // Perform one lifecycle operation - err := lifecycleRound(ctx, objAPI) - switch err.(type) { - // Unable to hold a lock means there is another - // instance doing the lifecycle round round - case OperationTimedOut: - time.Sleep(bgLifecycleTick) - default: - logger.LogIf(ctx, err) - time.Sleep(time.Minute) - continue - } } } @@ -107,13 +61,6 @@ func startDailyLifecycle() { var lifecycleLockTimeout = newDynamicTimeout(60*time.Second, time.Second) func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error { - // Lock to avoid concurrent lifecycle ops from other nodes - sweepLock := objAPI.NewNSLock(ctx, "system", "daily-lifecycle-ops") - if err := sweepLock.GetLock(lifecycleLockTimeout); err != nil { - return err - } - defer sweepLock.Unlock() - buckets, err := objAPI.ListBuckets(ctx) if err != nil { return err diff --git a/cmd/data-usage.go b/cmd/data-usage.go index c18860b78..9b9e95e83 100644 --- a/cmd/data-usage.go +++ b/cmd/data-usage.go @@ -49,50 +49,19 @@ const ( ) // initDataUsageStats will start the crawler unless disabled. -func initDataUsageStats() { +func initDataUsageStats(ctx context.Context, objAPI ObjectLayer) { if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOn { - go runDataUsageInfoUpdateRoutine() + go runDataUsageInfo(ctx, objAPI) } } -// runDataUsageInfoUpdateRoutine will contain the main crawler. -func runDataUsageInfoUpdateRoutine() { - // Wait until the object layer is ready - var objAPI ObjectLayer - for { - objAPI = newObjectLayerWithoutSafeModeFn() - if objAPI == nil { - time.Sleep(time.Second) - continue - } - break - } - - runDataUsageInfo(GlobalContext, objAPI) -} - -var dataUsageLockTimeout = lifecycleLockTimeout - func runDataUsageInfo(ctx context.Context, objAPI ObjectLayer) { - // Make sure only 1 crawler is running on the cluster. - locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader-data-usage-info") - for { - err := locker.GetLock(dataUsageLockTimeout) - if err != nil { - time.Sleep(5 * time.Minute) - continue - } - // Break without unlocking, this node will acquire - // data usage calculator role for its lifetime. - break - } for { select { case <-ctx.Done(): - locker.Unlock() return - // Wait before starting next cycle and wait on startup. case <-time.NewTimer(dataUsageStartDelay).C: + // Wait before starting next cycle and wait on startup. results := make(chan DataUsageInfo, 1) go storeDataUsageInBackend(ctx, objAPI, results) err := objAPI.CrawlAndGetDataUsage(ctx, results) diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go index 04db0790a..54987d501 100644 --- a/cmd/disk-cache-backend.go +++ b/cmd/disk-cache-backend.go @@ -229,7 +229,7 @@ func (c *diskCache) toClear() uint64 { } // Purge cache entries that were not accessed. -func (c *diskCache) purge(ctx context.Context, doneCh <-chan struct{}) { +func (c *diskCache) purge(ctx context.Context) { if c.diskUsageLow() { return } diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index a4084f13c..85127e010 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -689,17 +689,17 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec if migrateSw { go c.migrateCacheFromV1toV2(ctx) } - go c.gc(ctx, GlobalServiceDoneCh) + go c.gc(ctx) return c, nil } -func (c *cacheObjects) gc(ctx context.Context, doneCh <-chan struct{}) { +func (c *cacheObjects) gc(ctx context.Context) { ticker := time.NewTicker(cacheGCInterval) defer ticker.Stop() for { select { - case <-doneCh: + case <-ctx.Done(): return case <-ticker.C: if c.migrating { @@ -714,7 +714,7 @@ func (c *cacheObjects) gc(ctx context.Context, doneCh <-chan struct{}) { go func(d *diskCache) { defer wg.Done() d.resetGCCounter() - d.purge(ctx, doneCh) + d.purge(ctx) }(dcache) } wg.Wait() diff --git a/cmd/gateway-main.go b/cmd/gateway-main.go index 5a4d80eb2..c7df05659 100644 --- a/cmd/gateway-main.go +++ b/cmd/gateway-main.go @@ -267,7 +267,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) { if globalCacheConfig.Enabled { // initialize the new disk cache objects. var cacheAPI CacheObjectLayer - cacheAPI, err = newServerCacheObjects(context.Background(), globalCacheConfig) + cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig) logger.FatalIf(err, "Unable to initialize disk caching") globalObjLayerMutex.Lock() @@ -277,7 +277,7 @@ func StartGateway(ctx *cli.Context, gw Gateway) { // Populate existing buckets to the etcd backend if globalDNSConfig != nil { - buckets, err := newObject.ListBuckets(context.Background()) + buckets, err := newObject.ListBuckets(GlobalContext) if err != nil { logger.Fatal(err, "Unable to list buckets") } diff --git a/cmd/generic-handlers.go b/cmd/generic-handlers.go index ae9b2b5ec..a8f7c208e 100644 --- a/cmd/generic-handlers.go +++ b/cmd/generic-handlers.go @@ -87,7 +87,7 @@ func setRequestHeaderSizeLimitHandler(h http.Handler) http.Handler { // of the user-defined metadata to 2 KB. func (h requestHeaderSizeLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if isHTTPHeaderSizeTooLarge(r.Header) { - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrMetadataTooLarge), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrMetadataTooLarge), r.URL, guessIsBrowserReq(r)) return } h.Handler.ServeHTTP(w, r) @@ -130,7 +130,7 @@ func filterReservedMetadata(h http.Handler) http.Handler { // would be treated as metadata. func (h reservedMetadataHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if containsReservedMetadata(r.Header) { - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrUnsupportedMetadata), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrUnsupportedMetadata), r.URL, guessIsBrowserReq(r)) return } h.Handler.ServeHTTP(w, r) @@ -371,14 +371,14 @@ func (h timeValidityHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // All our internal APIs are sensitive towards Date // header, for all requests where Date header is not // present we will reject such clients. - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(errCode), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(errCode), r.URL, guessIsBrowserReq(r)) return } // Verify if the request date header is shifted by less than globalMaxSkewTime parameter in the past // or in the future, reject request otherwise. curTime := UTCNow() if curTime.Sub(amzDate) > globalMaxSkewTime || amzDate.Sub(curTime) > globalMaxSkewTime { - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrRequestTimeTooSkewed), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrRequestTimeTooSkewed), r.URL, guessIsBrowserReq(r)) return } } @@ -509,14 +509,14 @@ func (h resourceHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // If bucketName is present and not objectName check for bucket level resource queries. if bucketName != "" && objectName == "" { if ignoreNotImplementedBucketResources(r) { - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) return } } // If bucketName and objectName are present check for its resource queries. if bucketName != "" && objectName != "" { if ignoreNotImplementedObjectResources(r) { - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) return } } @@ -589,20 +589,20 @@ func hasMultipleAuth(r *http.Request) bool { func (h requestValidityHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Check for bad components in URL path. if hasBadPathComponent(r.URL.Path) { - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r)) return } // Check for bad components in URL query values. for _, vv := range r.URL.Query() { for _, v := range vv { if hasBadPathComponent(v) { - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInvalidResourceName), r.URL, guessIsBrowserReq(r)) return } } } if hasMultipleAuth(r) { - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL, guessIsBrowserReq(r)) return } h.handler.ServeHTTP(w, r) @@ -648,10 +648,10 @@ func (f bucketForwardingHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques sr, err := globalDNSConfig.Get(bucket) if err != nil { if err == dns.ErrNoEntriesFound { - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNoSuchBucket), + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNoSuchBucket), r.URL, guessIsBrowserReq(r)) } else { - writeErrorResponse(context.Background(), w, toAPIError(context.Background(), err), + writeErrorResponse(r.Context(), w, toAPIError(r.Context(), err), r.URL, guessIsBrowserReq(r)) } return @@ -697,9 +697,9 @@ func (f bucketForwardingHandler) ServeHTTP(w http.ResponseWriter, r *http.Reques sr, err := globalDNSConfig.Get(bucket) if err != nil { if err == dns.ErrNoEntriesFound { - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrNoSuchBucket), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrNoSuchBucket), r.URL, guessIsBrowserReq(r)) } else { - writeErrorResponse(context.Background(), w, toAPIError(context.Background(), err), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, toAPIError(r.Context(), err), r.URL, guessIsBrowserReq(r)) } return } @@ -772,7 +772,7 @@ type criticalErrorHandler struct{ handler http.Handler } func (h criticalErrorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer func() { if err := recover(); err == logger.ErrCritical { // handle - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInternalError), r.URL, guessIsBrowserReq(r)) } else if err != nil { panic(err) // forward other panic calls } @@ -791,7 +791,7 @@ func (h sseTLSHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodHead { writeErrorResponseHeadersOnly(w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest)) } else { - writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest), r.URL, guessIsBrowserReq(r)) + writeErrorResponse(r.Context(), w, errorCodes.ToAPIErr(ErrInsecureSSECustomerRequest), r.URL, guessIsBrowserReq(r)) } return } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 34e55b068..13e986cf7 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -18,7 +18,6 @@ package cmd import ( "context" - "fmt" "sync" "time" @@ -126,65 +125,36 @@ func durationToNextHealRound(lastHeal time.Time) time.Duration { } // Healing leader will take the charge of healing all erasure sets -func execLeaderTasks(z *xlZones) { - ctx := context.Background() - - // Hold a lock so only one server performs auto-healing - leaderLock := z.NewNSLock(ctx, minioMetaBucket, "leader") +func execLeaderTasks(ctx context.Context, z *xlZones) { + lastScanTime := UTCNow() // So that we don't heal immediately, but after one month. for { - err := leaderLock.GetLock(leaderLockTimeout) - if err == nil { - break - } - time.Sleep(leaderTick) - } - - // Hold a lock for healing the erasure set - zeroDuration := time.Millisecond - zeroDynamicTimeout := newDynamicTimeout(zeroDuration, zeroDuration) - - lastScanTime := time.Now() // So that we don't heal immediately, but after one month. - for { - time.Sleep(durationToNextHealRound(lastScanTime)) - for _, zone := range z.zones { - // Heal set by set - for i, set := range zone.sets { - setLock := z.zones[0].NewNSLock(ctx, "system", fmt.Sprintf("erasure-set-heal-%d", i)) - if err := setLock.GetLock(zeroDynamicTimeout); err != nil { - logger.LogIf(ctx, err) - continue + select { + case <-ctx.Done(): + return + case <-time.NewTimer(durationToNextHealRound(lastScanTime)).C: + for _, zone := range z.zones { + // Heal set by set + for i, set := range zone.sets { + if err := healErasureSet(ctx, i, set); err != nil { + logger.LogIf(ctx, err) + continue + } } - if err := healErasureSet(ctx, i, set); err != nil { - setLock.Unlock() - logger.LogIf(ctx, err) - continue - } - setLock.Unlock() } + lastScanTime = UTCNow() } - lastScanTime = time.Now() } } -func startGlobalHeal() { - var objAPI ObjectLayer - for { - objAPI = newObjectLayerWithoutSafeModeFn() - if objAPI == nil { - time.Sleep(time.Second) - continue - } - break - } - +func startGlobalHeal(ctx context.Context, objAPI ObjectLayer) { zones, ok := objAPI.(*xlZones) if !ok { return } - execLeaderTasks(zones) + execLeaderTasks(ctx, zones) } -func initGlobalHeal() { - go startGlobalHeal() +func initGlobalHeal(ctx context.Context, objAPI ObjectLayer) { + go startGlobalHeal(ctx, objAPI) } diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index df92e8144..147994b86 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -293,9 +293,7 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error { } // Start lock maintenance from all lock servers. -func startLockMaintenance() { - var ctx = context.Background() - +func startLockMaintenance(ctx context.Context) { // Wait until the object API is ready // no need to start the lock maintenance // if ObjectAPI is not initialized. @@ -317,7 +315,7 @@ func startLockMaintenance() { for { // Verifies every minute for locks held more than 2 minutes. select { - case <-GlobalServiceDoneCh: + case <-ctx.Done(): return case <-ticker.C: // Start with random sleep time, so as to avoid @@ -357,5 +355,5 @@ func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) { } } - go startLockMaintenance() + go startLockMaintenance(GlobalContext) } diff --git a/cmd/notification.go b/cmd/notification.go index 039a178be..c3388434a 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -267,24 +267,6 @@ func (sys *NotificationSys) BackgroundHealStatus() []madmin.BgHealState { return states } -// BackgroundOpsStatus - returns the status of all background operations of all peers -func (sys *NotificationSys) BackgroundOpsStatus() []BgOpsStatus { - states := make([]BgOpsStatus, len(sys.peerClients)) - for idx, client := range sys.peerClients { - if client == nil { - continue - } - st, err := client.BackgroundOpsStatus() - if err != nil { - logger.LogIf(context.Background(), err) - } else { - states[idx] = st - } - } - - return states -} - // StartProfiling - start profiling on remote peers, by initiating a remote RPC. func (sys *NotificationSys) StartProfiling(profiler string) []NotificationPeerErr { ng := WithNPeers(len(sys.peerClients)) diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 4f874c9ca..c36e8d5df 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -614,32 +614,6 @@ func (client *peerRESTClient) BackgroundHealStatus() (madmin.BgHealState, error) return state, err } -// BgLifecycleOpsStatus describes the status -// of the background lifecycle operations -type BgLifecycleOpsStatus struct { - LastActivity time.Time -} - -// BgOpsStatus describes the status of all operations performed -// in background such as auto-healing and lifecycle. -// Notice: We need to increase peer REST API version when adding -// new fields to this struct. -type BgOpsStatus struct { - LifecycleOps BgLifecycleOpsStatus -} - -func (client *peerRESTClient) BackgroundOpsStatus() (BgOpsStatus, error) { - respBody, err := client.call(peerRESTMethodBackgroundOpsStatus, nil, nil, -1) - if err != nil { - return BgOpsStatus{}, err - } - defer http.DrainBody(respBody) - - state := BgOpsStatus{} - err = gob.NewDecoder(respBody).Decode(&state) - return state, err -} - func (client *peerRESTClient) doTrace(traceCh chan interface{}, doneCh chan struct{}, trcAll, trcErr bool) { values := make(url.Values) values.Set(peerRESTTraceAll, strconv.FormatBool(trcAll)) diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 80a270305..ec6f32376 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -34,7 +34,6 @@ const ( peerRESTMethodServerUpdate = "/serverupdate" peerRESTMethodSignalService = "/signalservice" peerRESTMethodBackgroundHealStatus = "/backgroundhealstatus" - peerRESTMethodBackgroundOpsStatus = "/backgroundopsstatus" peerRESTMethodGetLocks = "/getlocks" peerRESTMethodBucketPolicyRemove = "/removebucketpolicy" peerRESTMethodLoadUser = "/loaduser" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index e174084c0..ef85678ee 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -1129,22 +1129,6 @@ func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *h logger.LogIf(ctx, gob.NewEncoder(w).Encode(state)) } -func (s *peerRESTServer) BackgroundOpsStatusHandler(w http.ResponseWriter, r *http.Request) { - if !s.IsValid(w, r) { - s.writeErrorResponse(w, errors.New("invalid request")) - return - } - - ctx := newContext(r, w, "BackgroundOpsStatus") - - state := BgOpsStatus{ - LifecycleOps: getLocalBgLifecycleOpsStatus(), - } - - defer w.(http.Flusher).Flush() - logger.LogIf(ctx, gob.NewEncoder(w).Encode(state)) -} - // ConsoleLogHandler sends console logs of this node back to peer rest client func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -1230,8 +1214,6 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketLifecycleRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketLifecycleHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketEncryptionSet).HandlerFunc(httpTraceHdrs(server.SetBucketSSEConfigHandler)).Queries(restQueries(peerRESTBucket)...) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBucketEncryptionRemove).HandlerFunc(httpTraceHdrs(server.RemoveBucketSSEConfigHandler)).Queries(restQueries(peerRESTBucket)...) - subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundOpsStatus).HandlerFunc(server.BackgroundOpsStatusHandler) - subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodTrace).HandlerFunc(server.TraceHandler) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodListen).HandlerFunc(httpTraceHdrs(server.ListenHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodBackgroundHealStatus).HandlerFunc(server.BackgroundHealStatusHandler) diff --git a/cmd/posix.go b/cmd/posix.go index 457d0ac80..06e50ec6c 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -672,11 +672,15 @@ func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile st return nil, err } - ch = make(chan FileInfo) + // buffer channel matches the S3 ListObjects implementation + ch = make(chan FileInfo, maxObjectList) go func() { defer close(ch) listDir := func(volume, dirPath, dirEntry string) (emptyDir bool, entries []string) { + // Dynamic time delay. + t := UTCNow() entries, err := s.ListDir(volume, dirPath, -1, leafFile) + sleepDuration(time.Since(t), 10.0) if err != nil { return } @@ -701,7 +705,10 @@ func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile st Mode: os.ModeDir, } } else { + // Dynamic time delay. + t := UTCNow() buf, err := s.ReadAll(volume, pathJoin(walkResult.entry, leafFile)) + sleepDuration(time.Since(t), 10.0) if err != nil { continue } diff --git a/cmd/server-main.go b/cmd/server-main.go index a5b908103..822b028cb 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -26,6 +26,7 @@ import ( "os/signal" "strings" "syscall" + "time" "github.com/minio/cli" "github.com/minio/minio/cmd/config" @@ -285,6 +286,27 @@ func initAllSubsystems(buckets []BucketInfo, newObject ObjectLayer) (err error) return nil } +func startBackgroundOps(ctx context.Context, objAPI ObjectLayer) { + // Make sure only 1 crawler is running on the cluster. + locker := objAPI.NewNSLock(ctx, minioMetaBucket, "leader") + for { + err := locker.GetLock(leaderLockTimeout) + if err != nil { + time.Sleep(leaderTick) + continue + } + break + // No unlock for "leader" lock. + } + + if globalIsXL { + initGlobalHeal(ctx, objAPI) + } + + initDataUsageStats(ctx, objAPI) + initDailyLifecycle(ctx, objAPI) +} + // serverMain handler called for 'minio server' command. func serverMain(ctx *cli.Context) { if ctx.Args().First() == "help" || !endpointsPresent(ctx) { @@ -401,12 +423,11 @@ func serverMain(ctx *cli.Context) { // Enable healing to heal drives if possible if globalIsXL { - initBackgroundHealing() - initLocalDisksAutoHeal() - initGlobalHeal() + initBackgroundHealing(GlobalContext, newObject) + initLocalDisksAutoHeal(GlobalContext, newObject) } - buckets, err := newObject.ListBuckets(context.Background()) + buckets, err := newObject.ListBuckets(GlobalContext) if err != nil { logger.Fatal(err, "Unable to list buckets") } @@ -416,7 +437,7 @@ func serverMain(ctx *cli.Context) { if globalCacheConfig.Enabled { // initialize the new disk cache objects. var cacheAPI CacheObjectLayer - cacheAPI, err = newServerCacheObjects(context.Background(), globalCacheConfig) + cacheAPI, err = newServerCacheObjects(GlobalContext, globalCacheConfig) logger.FatalIf(err, "Unable to initialize disk caching") globalObjLayerMutex.Lock() @@ -429,8 +450,7 @@ func serverMain(ctx *cli.Context) { initFederatorBackend(buckets, newObject) } - initDataUsageStats() - initDailyLifecycle() + go startBackgroundOps(GlobalContext, newObject) // Disable safe mode operation, after all initialization is over. globalObjLayerMutex.Lock() diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 1c793e7a9..3fd3a450d 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -392,6 +392,11 @@ func (s *xlSets) StorageInfo(ctx context.Context, local bool) StorageInfo { storageInfo.Backend.Sets[i] = make([]madmin.DriveInfo, s.drivesPerSet) } + if local { + // if local is true, we don't need to read format.json + return storageInfo + } + storageDisks, dErrs := initStorageDisksWithErrors(s.endpoints) defer closeStorageDisks(storageDisks)