diff --git a/Makefile b/Makefile index bc934c0cd..16ceab3fb 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ verifiers: getdeps fmt lint ruleguard check-gen check-gen: @go generate ./... >/dev/null - @git diff --exit-code >/dev/null || echo "Non-committed changes in auto-generated code are detected, please check." + @git diff --exit-code >/dev/null || echo "Non-committed changes detected, please commit them to proceed." fmt: @echo "Running $@ check" diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index e84cc4285..d8d1acb55 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -164,8 +164,8 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS for _, disk := range disks { logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1)) - lbDisks := z.zones[i].sets[setIndex].getLoadBalancedDisks() - if err := healErasureSet(ctx, setIndex, buckets, lbDisks, z.zones[i].setDriveCount); err != nil { + lbDisks := z.zones[i].sets[setIndex].getLoadBalancedNDisks(z.zones[i].listTolerancePerSet) + if err := healErasureSet(ctx, setIndex, buckets, lbDisks); err != nil { logger.LogIf(ctx, err) continue } diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go index 69fe57eee..d6a64ad08 100644 --- a/cmd/erasure-common.go +++ b/cmd/erasure-common.go @@ -19,6 +19,7 @@ package cmd import ( "context" "path" + "sync" "github.com/minio/minio/pkg/sync/errgroup" ) @@ -53,16 +54,37 @@ func (er erasureObjects) getLoadBalancedNDisks(ndisks int) (newDisks []StorageAP // getLoadBalancedDisks - fetches load balanced (sufficiently randomized) disk slice. // ensures to skip disks if they are not healing and online. -func (er erasureObjects) getLoadBalancedDisks() (newDisks []StorageAPI) { +func (er erasureObjects) getLoadBalancedDisks() []StorageAPI { disks := er.getDisks() + var wg sync.WaitGroup + var mu sync.Mutex + var newDisks []StorageAPI // Based on the random shuffling return back randomized disks. for _, i := range hashOrder(UTCNow().String(), len(disks)) { - // Do not consume disks which are being healed. - if disks[i-1] != nil && !disks[i-1].Healing() && disks[i-1].IsOnline() { + i := i + wg.Add(1) + go func() { + defer wg.Done() + if disks[i-1] == nil { + return + } + di, err := disks[i-1].DiskInfo(context.Background()) + if err != nil || di.Healing { + // - Do not consume disks which are not reachable + // unformatted or simply not accessible for some reason. + // + // - Do not consume disks which are being healed + // + // - Future: skip busy disks + return + } + mu.Lock() newDisks = append(newDisks, disks[i-1]) - } + mu.Unlock() + }() } + wg.Wait() return newDisks } diff --git a/cmd/erasure-zones.go b/cmd/erasure-zones.go index bdaca636b..0733852e2 100644 --- a/cmd/erasure-zones.go +++ b/cmd/erasure-zones.go @@ -662,7 +662,7 @@ func (z *erasureZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, for _, zone := range z.zones { zonesEntryChs = append(zonesEntryChs, zone.startMergeWalksN(ctx, bucket, prefix, "", true, endWalkCh, zone.listTolerancePerSet, false)) - zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-1) } var objInfos []ObjectInfo @@ -784,7 +784,7 @@ func (z *erasureZones) listObjectsSplunk(ctx context.Context, bucket, prefix, ma } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) - zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-1) } entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet) @@ -876,7 +876,7 @@ func (z *erasureZones) listObjects(ctx context.Context, bucket, prefix, marker, } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) - zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-1) } entries := mergeZonesEntriesCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet) @@ -1278,7 +1278,7 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m } zonesEntryChs = append(zonesEntryChs, entryChs) zonesEndWalkCh = append(zonesEndWalkCh, endWalkCh) - zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet) + zonesListTolerancePerSet = append(zonesListTolerancePerSet, zone.listTolerancePerSet-1) } entries := mergeZonesEntriesVersionsCh(zonesEntryChs, maxKeys, zonesListTolerancePerSet) diff --git a/cmd/global-heal.go b/cmd/global-heal.go index bd06bbc6d..34a135dfa 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -97,7 +97,7 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) { } // healErasureSet lists and heals all objects in a specific erasure set -func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI, setDriveCount int) error { +func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, disks []StorageAPI) error { // Get background heal sequence to send elements to heal var bgSeq *healSequence var ok bool @@ -131,10 +131,6 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis var mu sync.Mutex var wg sync.WaitGroup for _, disk := range disks { - if disk == nil { - // Disk can be offline - continue - } disk := disk wg.Add(1) go func() { @@ -157,16 +153,11 @@ func healErasureSet(ctx context.Context, setIndex int, buckets []BucketInfo, dis entries := make([]FileInfoVersions, len(entryChs)) for { - entry, quorumCount, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid) + entry, _, ok := lexicallySortedEntryVersions(entryChs, entries, entriesValid) if !ok { break } - if quorumCount == setDriveCount { - // Skip good entries. - continue - } - for _, version := range entry.Versions { bgSeq.sourceCh <- healSource{ bucket: bucket.Name, diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 035477261..2ca8a9314 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -161,6 +161,12 @@ func (client *storageRESTClient) Endpoint() Endpoint { } func (client *storageRESTClient) Healing() bool { + // This call should never be called over the network + // this function should always return 'false' + // + // To know if a remote disk is being healed + // perform DiskInfo() call which would return + // back the correct data if disk is being healed. return false } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 6a311a842..df2db5c70 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -105,6 +105,8 @@ type xlStorage struct { formatFileInfo os.FileInfo formatLastCheck time.Time + diskInfoCache timedValue + ctx context.Context sync.RWMutex } @@ -446,6 +448,7 @@ type DiskInfo struct { Total uint64 Free uint64 Used uint64 + FSType string RootDisk bool Healing bool Endpoint string @@ -462,23 +465,41 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) { atomic.AddInt32(&s.activeIOCount, -1) }() - di, err := getDiskInfo(s.diskPath) - if err != nil { - return info, err - } + s.diskInfoCache.Once.Do(func() { + s.diskInfoCache.TTL = time.Second + s.diskInfoCache.Update = func() (interface{}, error) { + dcinfo := DiskInfo{ + RootDisk: s.rootDisk, + MountPath: s.diskPath, + Endpoint: s.endpoint.String(), + } + di, err := getDiskInfo(s.diskPath) + if err != nil { + return dcinfo, err + } + dcinfo.Total = di.Total + dcinfo.Free = di.Free + dcinfo.Used = di.Total - di.Free + dcinfo.FSType = di.FSType - info = DiskInfo{ - Total: di.Total, - Free: di.Free, - Used: di.Total - di.Free, - Healing: s.Healing(), - RootDisk: s.rootDisk, - MountPath: s.diskPath, - Endpoint: s.endpoint.String(), - } + diskID, err := s.GetDiskID() + if errors.Is(err, errUnformattedDisk) { + // if we found an unformatted disk then + // healing is automatically true. + dcinfo.Healing = true + } else { + // Check if the disk is being healed if GetDiskID + // returned any error other than fresh disk + dcinfo.Healing = s.Healing() + } - diskID, err := s.GetDiskID() - info.ID = diskID + dcinfo.ID = diskID + return dcinfo, err + } + }) + + v, err := s.diskInfoCache.Get() + info = v.(DiskInfo) return info, err } @@ -503,7 +524,7 @@ func (s *xlStorage) GetDiskID() (string, error) { s.RUnlock() // check if we have a valid disk ID that is less than 1 second old. - if fileInfo != nil && diskID != "" && time.Now().Before(lastCheck.Add(time.Second)) { + if fileInfo != nil && diskID != "" && time.Since(lastCheck) <= time.Second { return diskID, nil }