diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 134920883..f0d49f7d7 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -297,7 +297,7 @@ func (a adminAPIHandlers) StorageInfoHandler(w http.ResponseWriter, r *http.Requ storageInfo, _ := objectAPI.StorageInfo(ctx) // Collect any disk healing. - healing, _ := getAggregatedBackgroundHealState(ctx) + healing, _ := getAggregatedBackgroundHealState(ctx, nil) healDisks := make(map[string]struct{}, len(healing.HealDisks)) for _, disk := range healing.HealDisks { healDisks[disk] = struct{}{} @@ -861,16 +861,14 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) { keepConnLive(w, r, respCh) } -func getAggregatedBackgroundHealState(ctx context.Context) (madmin.BgHealState, error) { - var bgHealStates []madmin.BgHealState - - localHealState, ok := getLocalBackgroundHealStatus() - if !ok { - return madmin.BgHealState{}, errServerNotInitialized - } - +// getAggregatedBackgroundHealState returns the heal state of disks. +// If no ObjectLayer is provided no set status is returned. +func getAggregatedBackgroundHealState(ctx context.Context, o ObjectLayer) (madmin.BgHealState, error) { // Get local heal status first - bgHealStates = append(bgHealStates, localHealState) + bgHealStates, ok := getBackgroundHealStatus(ctx, o) + if !ok { + return bgHealStates, errServerNotInitialized + } if globalIsDistErasure { // Get heal status from other peers @@ -885,33 +883,10 @@ func getAggregatedBackgroundHealState(ctx context.Context) (madmin.BgHealState, if errCount == len(nerrs) { return madmin.BgHealState{}, fmt.Errorf("all remote servers failed to report heal status, cluster is unhealthy") } - bgHealStates = append(bgHealStates, peersHealStates...) + bgHealStates.Merge(peersHealStates...) } - // Aggregate healing result - var aggregatedHealStateResult = madmin.BgHealState{ - ScannedItemsCount: bgHealStates[0].ScannedItemsCount, - LastHealActivity: bgHealStates[0].LastHealActivity, - NextHealRound: bgHealStates[0].NextHealRound, - HealDisks: bgHealStates[0].HealDisks, - } - - bgHealStates = bgHealStates[1:] - - for _, state := range bgHealStates { - aggregatedHealStateResult.ScannedItemsCount += state.ScannedItemsCount - aggregatedHealStateResult.HealDisks = append(aggregatedHealStateResult.HealDisks, state.HealDisks...) - if !state.LastHealActivity.IsZero() && aggregatedHealStateResult.LastHealActivity.Before(state.LastHealActivity) { - aggregatedHealStateResult.LastHealActivity = state.LastHealActivity - // The node which has the last heal activity means its - // is the node that is orchestrating self healing operations, - // which also means it is the same node which decides when - // the next self healing operation will be done. 
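The hand-rolled aggregation being removed in this hunk is exactly what the new Merge call has to preserve. As a rough sketch (mirroring the removed logic only, not necessarily the actual madmin.BgHealState.Merge implementation), merging peer states looks like this:

package healstate

import "time"

// bgHealState mirrors the fields the removed loop aggregated; the real type
// is madmin.BgHealState.
type bgHealState struct {
	ScannedItemsCount int64
	LastHealActivity  time.Time
	NextHealRound     time.Time
	HealDisks         []string
}

// merge folds peer states into b: counts are summed, disk lists are
// concatenated, and the most recent LastHealActivity wins together with its
// NextHealRound, since the node with the latest activity is the one
// orchestrating self healing.
func (b *bgHealState) merge(others ...bgHealState) {
	for _, o := range others {
		b.ScannedItemsCount += o.ScannedItemsCount
		b.HealDisks = append(b.HealDisks, o.HealDisks...)
		if !o.LastHealActivity.IsZero() && b.LastHealActivity.Before(o.LastHealActivity) {
			b.LastHealActivity = o.LastHealActivity
			b.NextHealRound = o.NextHealRound
		}
	}
}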
- aggregatedHealStateResult.NextHealRound = state.NextHealRound - } - } - - return aggregatedHealStateResult, nil + return bgHealStates, nil } func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *http.Request) { @@ -930,7 +905,7 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r * return } - aggregateHealStateResult, err := getAggregatedBackgroundHealState(r.Context()) + aggregateHealStateResult, err := getAggregatedBackgroundHealState(r.Context(), objectAPI) if err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return @@ -1568,9 +1543,9 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque // Get the notification target info notifyTarget := fetchLambdaInfo() - server := getLocalServerProperty(globalEndpoints, r) + local := getLocalServerProperty(globalEndpoints, r) servers := globalNotificationSys.ServerInfo() - servers = append(servers, server) + servers = append(servers, local) assignPoolNumbers(servers) @@ -1598,7 +1573,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque // Fetching the backend information backendInfo := objectAPI.BackendInfo() - if backendInfo.Type == BackendType(madmin.Erasure) { + if backendInfo.Type == madmin.Erasure { // Calculate the number of online/offline disks of all nodes var allDisks []madmin.Disk for _, s := range servers { diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go index f868a6902..8e2d9b4bc 100644 --- a/cmd/admin-heal-ops.go +++ b/cmd/admin-heal-ops.go @@ -90,8 +90,9 @@ type allHealState struct { sync.RWMutex // map of heal path to heal sequence - healSeqMap map[string]*healSequence + healSeqMap map[string]*healSequence // Indexed by endpoint healLocalDisks map[Endpoint]struct{} + healStatus map[string]healingTracker // Indexed by disk ID } // newHealState - initialize global heal state management @@ -99,6 +100,7 @@ func newHealState(cleanup bool) *allHealState { hstate := &allHealState{ healSeqMap: make(map[string]*healSequence), healLocalDisks: map[Endpoint]struct{}{}, + healStatus: make(map[string]healingTracker), } if cleanup { go hstate.periodicHealSeqsClean(GlobalContext) @@ -113,7 +115,56 @@ func (ahs *allHealState) healDriveCount() int { return len(ahs.healLocalDisks) } -func (ahs *allHealState) getHealLocalDisks() Endpoints { +func (ahs *allHealState) popHealLocalDisks(healLocalDisks ...Endpoint) { + ahs.Lock() + defer ahs.Unlock() + + for _, ep := range healLocalDisks { + delete(ahs.healLocalDisks, ep) + } + for id, disk := range ahs.healStatus { + for _, ep := range healLocalDisks { + if disk.Endpoint == ep.String() { + delete(ahs.healStatus, id) + } + } + } +} + +// updateHealStatus will update the heal status. +func (ahs *allHealState) updateHealStatus(tracker *healingTracker) { + ahs.Lock() + defer ahs.Unlock() + ahs.healStatus[tracker.ID] = *tracker +} + +// Sort by zone, set and disk index +func sortDisks(disks []madmin.Disk) { + sort.Slice(disks, func(i, j int) bool { + a, b := &disks[i], &disks[j] + if a.PoolIndex != b.PoolIndex { + return a.PoolIndex < b.PoolIndex + } + if a.SetIndex != b.SetIndex { + return a.SetIndex < b.SetIndex + } + return a.DiskIndex < b.DiskIndex + }) +} + +// getLocalHealingDisks returns local healing disks indexed by endpoint. 
+func (ahs *allHealState) getLocalHealingDisks() map[string]madmin.HealingDisk { + ahs.RLock() + defer ahs.RUnlock() + dst := make(map[string]madmin.HealingDisk, len(ahs.healStatus)) + for _, v := range ahs.healStatus { + dst[v.Endpoint] = v.toHealingDisk() + } + + return dst +} + +func (ahs *allHealState) getHealLocalDiskEndpoints() Endpoints { ahs.RLock() defer ahs.RUnlock() @@ -124,15 +175,6 @@ func (ahs *allHealState) getHealLocalDisks() Endpoints { return endpoints } -func (ahs *allHealState) popHealLocalDisks(healLocalDisks ...Endpoint) { - ahs.Lock() - defer ahs.Unlock() - - for _, ep := range healLocalDisks { - delete(ahs.healLocalDisks, ep) - } -} - func (ahs *allHealState) pushHealLocalDisks(healLocalDisks ...Endpoint) { ahs.Lock() defer ahs.Unlock() diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index d98265da9..36c398940 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -17,16 +17,23 @@ package cmd import ( + "bytes" "context" + "encoding/json" "errors" "fmt" + "io" "sort" + "strings" + "sync" "time" "github.com/dustin/go-humanize" + "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/console" + "github.com/minio/minio/pkg/madmin" ) const ( @@ -35,10 +42,200 @@ const ( ) //go:generate msgp -file $GOFILE -unexported -type healingTracker struct { - ID string - // future add more tracking capabilities +// healingTracker is used to persist healing information during a heal. +type healingTracker struct { + disk StorageAPI `msg:"-"` + + ID string + PoolIndex int + SetIndex int + DiskIndex int + Path string + Endpoint string + Started time.Time + LastUpdate time.Time + ObjectsHealed uint64 + ObjectsFailed uint64 + BytesDone uint64 + BytesFailed uint64 + + // Last object scanned. + Bucket string `json:"-"` + Object string `json:"-"` + + // Numbers when current bucket started healing, + // for resuming with correct numbers. + ResumeObjectsHealed uint64 `json:"-"` + ResumeObjectsFailed uint64 `json:"-"` + ResumeBytesDone uint64 `json:"-"` + ResumeBytesFailed uint64 `json:"-"` + + // Filled on startup/restarts. + QueuedBuckets []string + + // Filled during heal. + HealedBuckets []string + // Add future tracking capabilities + // Be sure that they are included in toHealingDisk +} + +// loadHealingTracker will load the healing tracker from the supplied disk. +// The disk ID will be validated against the loaded one. +func loadHealingTracker(ctx context.Context, disk StorageAPI) (*healingTracker, error) { + if disk == nil { + return nil, errors.New("loadHealingTracker: nil disk given") + } + diskID, err := disk.GetDiskID() + if err != nil { + return nil, err + } + b, err := disk.ReadAll(ctx, minioMetaBucket, + pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename)) + if err != nil { + return nil, err + } + var h healingTracker + _, err = h.UnmarshalMsg(b) + if err != nil { + return nil, err + } + if h.ID != diskID && h.ID != "" { + return nil, fmt.Errorf("loadHealingTracker: disk id mismatch expected %s, got %s", h.ID, diskID) + } + h.disk = disk + h.ID = diskID + return &h, nil +} + +// newHealingTracker will create a new healing tracker for the disk. 
+func newHealingTracker(disk StorageAPI) *healingTracker { + diskID, _ := disk.GetDiskID() + h := healingTracker{ + disk: disk, + ID: diskID, + Path: disk.String(), + Endpoint: disk.Endpoint().String(), + Started: time.Now().UTC(), + } + h.PoolIndex, h.SetIndex, h.DiskIndex = disk.GetDiskLoc() + return &h +} + +// update will update the tracker on the disk. +// If the tracker has been deleted an error is returned. +func (h *healingTracker) update(ctx context.Context) error { + if h.disk.Healing() == nil { + return fmt.Errorf("healingTracker: disk %q is not marked as healing", h.ID) + } + if h.ID == "" || h.PoolIndex < 0 || h.SetIndex < 0 || h.DiskIndex < 0 { + h.ID, _ = h.disk.GetDiskID() + h.PoolIndex, h.SetIndex, h.DiskIndex = h.disk.GetDiskLoc() + } + return h.save(ctx) +} + +// save will unconditionally save the tracker and will be created if not existing. +func (h *healingTracker) save(ctx context.Context) error { + if h.PoolIndex < 0 || h.SetIndex < 0 || h.DiskIndex < 0 { + // Attempt to get location. + if api := newObjectLayerFn(); api != nil { + if ep, ok := api.(*erasureServerPools); ok { + h.PoolIndex, h.SetIndex, h.DiskIndex, _ = ep.getPoolAndSet(h.ID) + } + } + } + h.LastUpdate = time.Now().UTC() + htrackerBytes, err := h.MarshalMsg(nil) + if err != nil { + return err + } + globalBackgroundHealState.updateHealStatus(h) + return h.disk.WriteAll(ctx, minioMetaBucket, + pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename), + htrackerBytes) +} + +// delete the tracker on disk. +func (h *healingTracker) delete(ctx context.Context) error { + return h.disk.Delete(ctx, minioMetaBucket, + pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename), + false) +} + +func (h *healingTracker) isHealed(bucket string) bool { + for _, v := range h.HealedBuckets { + if v == bucket { + return true + } + } + return false +} + +// resume will reset progress to the numbers at the start of the bucket. +func (h *healingTracker) resume() { + h.ObjectsHealed = h.ResumeObjectsHealed + h.ObjectsFailed = h.ResumeObjectsFailed + h.BytesDone = h.ResumeBytesDone + h.BytesFailed = h.ResumeBytesFailed +} + +// bucketDone should be called when a bucket is done healing. +// Adds the bucket to the list of healed buckets and updates resume numbers. +func (h *healingTracker) bucketDone(bucket string) { + h.ResumeObjectsHealed = h.ObjectsHealed + h.ResumeObjectsFailed = h.ObjectsFailed + h.ResumeBytesDone = h.BytesDone + h.ResumeBytesFailed = h.BytesFailed + h.HealedBuckets = append(h.HealedBuckets, bucket) + for i, b := range h.QueuedBuckets { + if b == bucket { + // Delete... + h.QueuedBuckets = append(h.QueuedBuckets[:i], h.QueuedBuckets[i+1:]...) + } + } +} + +// setQueuedBuckets will add buckets, but exclude any that is already in h.HealedBuckets. +// Order is preserved. +func (h *healingTracker) setQueuedBuckets(buckets []BucketInfo) { + s := set.CreateStringSet(h.HealedBuckets...) 
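The resume/bucketDone bookkeeping above is what lets a restarted heal skip buckets it already finished. A hypothetical driver (the real loop is healErasureSet further down in this diff) might tie the tracker methods together like this:

// healWithTracker is a hypothetical helper in package cmd showing how the
// tracker methods above are meant to be combined; it is not the actual
// healErasureSet implementation.
func healWithTracker(ctx context.Context, h *healingTracker, buckets []BucketInfo) error {
	h.setQueuedBuckets(buckets) // anything already in HealedBuckets is skipped
	h.resume()                  // roll counters back to the last completed bucket

	for _, bucket := range buckets {
		if h.isHealed(bucket.Name) {
			continue
		}
		// ... heal the bucket's objects here, bumping h.ObjectsHealed,
		// h.BytesDone etc. and calling h.update(ctx) every so often ...

		h.bucketDone(bucket.Name) // record the bucket and snapshot resume counters
		if err := h.save(ctx); err != nil {
			return err
		}
	}
	// Everything done: drop the on-disk tracker.
	return h.delete(ctx)
}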
+ h.QueuedBuckets = make([]string, 0, len(buckets)) + for _, b := range buckets { + if !s.Contains(b.Name) { + h.QueuedBuckets = append(h.QueuedBuckets, b.Name) + } + } +} + +func (h *healingTracker) printTo(writer io.Writer) { + b, err := json.MarshalIndent(h, "", " ") + if err != nil { + writer.Write([]byte(err.Error())) + } + writer.Write(b) +} + +// toHealingDisk converts the information to madmin.HealingDisk +func (h *healingTracker) toHealingDisk() madmin.HealingDisk { + return madmin.HealingDisk{ + ID: h.ID, + Endpoint: h.Endpoint, + PoolIndex: h.PoolIndex, + SetIndex: h.SetIndex, + DiskIndex: h.DiskIndex, + Path: h.Path, + Started: h.Started.UTC(), + LastUpdate: h.LastUpdate.UTC(), + ObjectsHealed: h.ObjectsHealed, + ObjectsFailed: h.ObjectsFailed, + BytesDone: h.BytesDone, + BytesFailed: h.BytesFailed, + Bucket: h.Bucket, + Object: h.Object, + QueuedBuckets: h.QueuedBuckets, + HealedBuckets: h.HealedBuckets, + } } func initAutoHeal(ctx context.Context, objAPI ObjectLayer) { @@ -90,7 +287,7 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) { disk, _, err := connectEndpoint(endpoint) if errors.Is(err, errUnformattedDisk) { disksToHeal = append(disksToHeal, endpoint) - } else if err == nil && disk != nil && disk.Healing() { + } else if err == nil && disk != nil && disk.Healing() != nil { disksToHeal = append(disksToHeal, disk.Endpoint()) } } @@ -114,7 +311,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq // Perform automatic disk healing when a disk is replaced locally. diskCheckTimer := time.NewTimer(defaultMonitorNewDiskInterval) defer diskCheckTimer.Stop() -wait: + for { select { case <-ctx.Done(): @@ -125,7 +322,7 @@ wait: var erasureSetInPoolDisksToHeal []map[int][]StorageAPI - healDisks := globalBackgroundHealState.getHealLocalDisks() + healDisks := globalBackgroundHealState.getHealLocalDiskEndpoints() if len(healDisks) > 0 { // Reformat disks bgSeq.sourceCh <- healSource{bucket: SlashSeparator} @@ -174,55 +371,70 @@ wait: buckets, _ := z.ListBuckets(ctx) + buckets = append(buckets, BucketInfo{ + Name: pathJoin(minioMetaBucket, minioConfigPrefix), + }) + // Heal latest buckets first. sort.Slice(buckets, func(i, j int) bool { + a, b := strings.HasPrefix(buckets[i].Name, minioMetaBucket), strings.HasPrefix(buckets[j].Name, minioMetaBucket) + if a != b { + return a + } return buckets[i].Created.After(buckets[j].Created) }) + // TODO(klauspost): This will block until all heals are done, + // in the future this should be able to start healing other sets at once. + var wg sync.WaitGroup for i, setMap := range erasureSetInPoolDisksToHeal { + i := i for setIndex, disks := range setMap { - for _, disk := range disks { - logger.Info("Healing disk '%s' on %s pool", disk, humanize.Ordinal(i+1)) + if len(disks) == 0 { + continue + } + wg.Add(1) + go func(setIndex int, disks []StorageAPI) { + defer wg.Done() + for _, disk := range disks { + logger.Info("Healing disk '%v' on %s pool", disk, humanize.Ordinal(i+1)) - // So someone changed the drives underneath, healing tracker missing. - if !disk.Healing() { - logger.Info("Healing tracker missing on '%s', disk was swapped again on %s pool", disk, humanize.Ordinal(i+1)) - diskID, err := disk.GetDiskID() + // So someone changed the drives underneath, healing tracker missing. + tracker, err := loadHealingTracker(ctx, disk) if err != nil { - logger.LogIf(ctx, err) - // reading format.json failed or not found, proceed to look - // for new disks to be healed again, we cannot proceed further. 
- goto wait + logger.Info("Healing tracker missing on '%s', disk was swapped again on %s pool", disk, humanize.Ordinal(i+1)) + tracker = newHealingTracker(disk) } - if err := saveHealingTracker(disk, diskID); err != nil { + tracker.PoolIndex, tracker.SetIndex, tracker.DiskIndex = disk.GetDiskLoc() + tracker.setQueuedBuckets(buckets) + if err := tracker.save(ctx); err != nil { logger.LogIf(ctx, err) // Unable to write healing tracker, permission denied or some // other unexpected error occurred. Proceed to look for new // disks to be healed again, we cannot proceed further. - goto wait + return } + + err = z.serverPools[i].sets[setIndex].healErasureSet(ctx, buckets, tracker) + if err != nil { + logger.LogIf(ctx, err) + continue + } + + logger.Info("Healing disk '%s' on %s pool complete", disk, humanize.Ordinal(i+1)) + var buf bytes.Buffer + tracker.printTo(&buf) + logger.Info("Summary:\n%s", buf.String()) + logger.LogIf(ctx, tracker.delete(ctx)) + + // Only upon success pop the healed disk. + globalBackgroundHealState.popHealLocalDisks(disk.Endpoint()) } - - err := z.serverPools[i].sets[setIndex].healErasureSet(ctx, buckets) - if err != nil { - logger.LogIf(ctx, err) - continue - } - - logger.Info("Healing disk '%s' on %s pool complete", disk, humanize.Ordinal(i+1)) - - if err := disk.Delete(ctx, pathJoin(minioMetaBucket, bucketMetaPrefix), - healingTrackerFilename, false); err != nil && !errors.Is(err, errFileNotFound) { - logger.LogIf(ctx, err) - continue - } - - // Only upon success pop the healed disk. - globalBackgroundHealState.popHealLocalDisks(disk.Endpoint()) - } + }(setIndex, disks) } } + wg.Wait() } } } diff --git a/cmd/background-newdisks-heal-ops_gen.go b/cmd/background-newdisks-heal-ops_gen.go index fae339a2c..13ca63c32 100644 --- a/cmd/background-newdisks-heal-ops_gen.go +++ b/cmd/background-newdisks-heal-ops_gen.go @@ -30,6 +30,146 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "ID") return } + case "PoolIndex": + z.PoolIndex, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "PoolIndex") + return + } + case "SetIndex": + z.SetIndex, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "SetIndex") + return + } + case "DiskIndex": + z.DiskIndex, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "DiskIndex") + return + } + case "Path": + z.Path, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Path") + return + } + case "Endpoint": + z.Endpoint, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Endpoint") + return + } + case "Started": + z.Started, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "Started") + return + } + case "LastUpdate": + z.LastUpdate, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } + case "ObjectsHealed": + z.ObjectsHealed, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ObjectsHealed") + return + } + case "ObjectsFailed": + z.ObjectsFailed, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ObjectsFailed") + return + } + case "BytesDone": + z.BytesDone, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "BytesDone") + return + } + case "BytesFailed": + z.BytesFailed, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "BytesFailed") + return + } + case "Bucket": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Object": + z.Object, err 
= dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + case "ResumeObjectsHealed": + z.ResumeObjectsHealed, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ResumeObjectsHealed") + return + } + case "ResumeObjectsFailed": + z.ResumeObjectsFailed, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ResumeObjectsFailed") + return + } + case "ResumeBytesDone": + z.ResumeBytesDone, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ResumeBytesDone") + return + } + case "ResumeBytesFailed": + z.ResumeBytesFailed, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ResumeBytesFailed") + return + } + case "QueuedBuckets": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets") + return + } + if cap(z.QueuedBuckets) >= int(zb0002) { + z.QueuedBuckets = (z.QueuedBuckets)[:zb0002] + } else { + z.QueuedBuckets = make([]string, zb0002) + } + for za0001 := range z.QueuedBuckets { + z.QueuedBuckets[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets", za0001) + return + } + } + case "HealedBuckets": + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "HealedBuckets") + return + } + if cap(z.HealedBuckets) >= int(zb0003) { + z.HealedBuckets = (z.HealedBuckets)[:zb0003] + } else { + z.HealedBuckets = make([]string, zb0003) + } + for za0002 := range z.HealedBuckets { + z.HealedBuckets[za0002], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "HealedBuckets", za0002) + return + } + } default: err = dc.Skip() if err != nil { @@ -42,10 +182,10 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z healingTracker) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 1 +func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 20 // write "ID" - err = en.Append(0x81, 0xa2, 0x49, 0x44) + err = en.Append(0xde, 0x0, 0x14, 0xa2, 0x49, 0x44) if err != nil { return } @@ -54,16 +194,283 @@ func (z healingTracker) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "ID") return } + // write "PoolIndex" + err = en.Append(0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78) + if err != nil { + return + } + err = en.WriteInt(z.PoolIndex) + if err != nil { + err = msgp.WrapError(err, "PoolIndex") + return + } + // write "SetIndex" + err = en.Append(0xa8, 0x53, 0x65, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78) + if err != nil { + return + } + err = en.WriteInt(z.SetIndex) + if err != nil { + err = msgp.WrapError(err, "SetIndex") + return + } + // write "DiskIndex" + err = en.Append(0xa9, 0x44, 0x69, 0x73, 0x6b, 0x49, 0x6e, 0x64, 0x65, 0x78) + if err != nil { + return + } + err = en.WriteInt(z.DiskIndex) + if err != nil { + err = msgp.WrapError(err, "DiskIndex") + return + } + // write "Path" + err = en.Append(0xa4, 0x50, 0x61, 0x74, 0x68) + if err != nil { + return + } + err = en.WriteString(z.Path) + if err != nil { + err = msgp.WrapError(err, "Path") + return + } + // write "Endpoint" + err = en.Append(0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Endpoint) + if err != nil { + err = msgp.WrapError(err, "Endpoint") + return + } + // write "Started" + err = en.Append(0xa7, 0x53, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64) + if err != nil { + return + } + err = 
en.WriteTime(z.Started) + if err != nil { + err = msgp.WrapError(err, "Started") + return + } + // write "LastUpdate" + err = en.Append(0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65) + if err != nil { + return + } + err = en.WriteTime(z.LastUpdate) + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } + // write "ObjectsHealed" + err = en.Append(0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ObjectsHealed) + if err != nil { + err = msgp.WrapError(err, "ObjectsHealed") + return + } + // write "ObjectsFailed" + err = en.Append(0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ObjectsFailed) + if err != nil { + err = msgp.WrapError(err, "ObjectsFailed") + return + } + // write "BytesDone" + err = en.Append(0xa9, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65) + if err != nil { + return + } + err = en.WriteUint64(z.BytesDone) + if err != nil { + err = msgp.WrapError(err, "BytesDone") + return + } + // write "BytesFailed" + err = en.Append(0xab, 0x42, 0x79, 0x74, 0x65, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.BytesFailed) + if err != nil { + err = msgp.WrapError(err, "BytesFailed") + return + } + // write "Bucket" + err = en.Append(0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "Object" + err = en.Append(0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Object) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + // write "ResumeObjectsHealed" + err = en.Append(0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ResumeObjectsHealed) + if err != nil { + err = msgp.WrapError(err, "ResumeObjectsHealed") + return + } + // write "ResumeObjectsFailed" + err = en.Append(0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ResumeObjectsFailed) + if err != nil { + err = msgp.WrapError(err, "ResumeObjectsFailed") + return + } + // write "ResumeBytesDone" + err = en.Append(0xaf, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65) + if err != nil { + return + } + err = en.WriteUint64(z.ResumeBytesDone) + if err != nil { + err = msgp.WrapError(err, "ResumeBytesDone") + return + } + // write "ResumeBytesFailed" + err = en.Append(0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ResumeBytesFailed) + if err != nil { + err = msgp.WrapError(err, "ResumeBytesFailed") + return + } + // write "QueuedBuckets" + err = en.Append(0xad, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.QueuedBuckets))) + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets") + return + } + for za0001 := range z.QueuedBuckets { + err = en.WriteString(z.QueuedBuckets[za0001]) + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets", za0001) + return + } + } + 
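The hard-coded byte prefixes in this generated encoder are plain msgpack: 0xde 0x00 0x14 is a map16 header for the 20 fields, and 0xad is a fixstr prefix for a 13-byte key such as "QueuedBuckets" or "HealedBuckets". A quick way to sanity-check them against the msgp runtime (assuming the usual github.com/tinylib/msgp/msgp import that generated code relies on):

package main

import (
	"fmt"

	"github.com/tinylib/msgp/msgp"
)

func main() {
	// Map header for 20 entries: expected "de 00 14" (msgpack map16).
	fmt.Printf("% x\n", msgp.AppendMapHeader(nil, 20))

	// String key "QueuedBuckets": expected prefix "ad" (fixstr, 0xa0|13).
	key := msgp.AppendString(nil, "QueuedBuckets")
	fmt.Printf("% x\n", key[:1])
}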
// write "HealedBuckets" + err = en.Append(0xad, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.HealedBuckets))) + if err != nil { + err = msgp.WrapError(err, "HealedBuckets") + return + } + for za0002 := range z.HealedBuckets { + err = en.WriteString(z.HealedBuckets[za0002]) + if err != nil { + err = msgp.WrapError(err, "HealedBuckets", za0002) + return + } + } return } // MarshalMsg implements msgp.Marshaler -func (z healingTracker) MarshalMsg(b []byte) (o []byte, err error) { +func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 1 + // map header, size 20 // string "ID" - o = append(o, 0x81, 0xa2, 0x49, 0x44) + o = append(o, 0xde, 0x0, 0x14, 0xa2, 0x49, 0x44) o = msgp.AppendString(o, z.ID) + // string "PoolIndex" + o = append(o, 0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78) + o = msgp.AppendInt(o, z.PoolIndex) + // string "SetIndex" + o = append(o, 0xa8, 0x53, 0x65, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78) + o = msgp.AppendInt(o, z.SetIndex) + // string "DiskIndex" + o = append(o, 0xa9, 0x44, 0x69, 0x73, 0x6b, 0x49, 0x6e, 0x64, 0x65, 0x78) + o = msgp.AppendInt(o, z.DiskIndex) + // string "Path" + o = append(o, 0xa4, 0x50, 0x61, 0x74, 0x68) + o = msgp.AppendString(o, z.Path) + // string "Endpoint" + o = append(o, 0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74) + o = msgp.AppendString(o, z.Endpoint) + // string "Started" + o = append(o, 0xa7, 0x53, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64) + o = msgp.AppendTime(o, z.Started) + // string "LastUpdate" + o = append(o, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65) + o = msgp.AppendTime(o, z.LastUpdate) + // string "ObjectsHealed" + o = append(o, 0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ObjectsHealed) + // string "ObjectsFailed" + o = append(o, 0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ObjectsFailed) + // string "BytesDone" + o = append(o, 0xa9, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65) + o = msgp.AppendUint64(o, z.BytesDone) + // string "BytesFailed" + o = append(o, 0xab, 0x42, 0x79, 0x74, 0x65, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + o = msgp.AppendUint64(o, z.BytesFailed) + // string "Bucket" + o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + o = msgp.AppendString(o, z.Bucket) + // string "Object" + o = append(o, 0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74) + o = msgp.AppendString(o, z.Object) + // string "ResumeObjectsHealed" + o = append(o, 0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ResumeObjectsHealed) + // string "ResumeObjectsFailed" + o = append(o, 0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ResumeObjectsFailed) + // string "ResumeBytesDone" + o = append(o, 0xaf, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65) + o = msgp.AppendUint64(o, z.ResumeBytesDone) + // string "ResumeBytesFailed" + o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64) + o = msgp.AppendUint64(o, z.ResumeBytesFailed) + // string "QueuedBuckets" + o = append(o, 0xad, 0x51, 0x75, 0x65, 
0x75, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.QueuedBuckets))) + for za0001 := range z.QueuedBuckets { + o = msgp.AppendString(o, z.QueuedBuckets[za0001]) + } + // string "HealedBuckets" + o = append(o, 0xad, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.HealedBuckets))) + for za0002 := range z.HealedBuckets { + o = msgp.AppendString(o, z.HealedBuckets[za0002]) + } return } @@ -91,6 +498,146 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "ID") return } + case "PoolIndex": + z.PoolIndex, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "PoolIndex") + return + } + case "SetIndex": + z.SetIndex, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "SetIndex") + return + } + case "DiskIndex": + z.DiskIndex, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "DiskIndex") + return + } + case "Path": + z.Path, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Path") + return + } + case "Endpoint": + z.Endpoint, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Endpoint") + return + } + case "Started": + z.Started, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Started") + return + } + case "LastUpdate": + z.LastUpdate, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } + case "ObjectsHealed": + z.ObjectsHealed, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ObjectsHealed") + return + } + case "ObjectsFailed": + z.ObjectsFailed, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ObjectsFailed") + return + } + case "BytesDone": + z.BytesDone, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "BytesDone") + return + } + case "BytesFailed": + z.BytesFailed, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "BytesFailed") + return + } + case "Bucket": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Object": + z.Object, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + case "ResumeObjectsHealed": + z.ResumeObjectsHealed, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResumeObjectsHealed") + return + } + case "ResumeObjectsFailed": + z.ResumeObjectsFailed, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResumeObjectsFailed") + return + } + case "ResumeBytesDone": + z.ResumeBytesDone, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResumeBytesDone") + return + } + case "ResumeBytesFailed": + z.ResumeBytesFailed, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResumeBytesFailed") + return + } + case "QueuedBuckets": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets") + return + } + if cap(z.QueuedBuckets) >= int(zb0002) { + z.QueuedBuckets = (z.QueuedBuckets)[:zb0002] + } else { + z.QueuedBuckets = make([]string, zb0002) + } + for za0001 := range z.QueuedBuckets { + 
z.QueuedBuckets[za0001], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets", za0001) + return + } + } + case "HealedBuckets": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "HealedBuckets") + return + } + if cap(z.HealedBuckets) >= int(zb0003) { + z.HealedBuckets = (z.HealedBuckets)[:zb0003] + } else { + z.HealedBuckets = make([]string, zb0003) + } + for za0002 := range z.HealedBuckets { + z.HealedBuckets[za0002], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "HealedBuckets", za0002) + return + } + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -104,7 +651,14 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) { } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z healingTracker) Msgsize() (s int) { - s = 1 + 3 + msgp.StringPrefixSize + len(z.ID) +func (z *healingTracker) Msgsize() (s int) { + s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 14 + msgp.Uint64Size + 14 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 20 + msgp.Uint64Size + 20 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize + for za0001 := range z.QueuedBuckets { + s += msgp.StringPrefixSize + len(z.QueuedBuckets[za0001]) + } + s += 14 + msgp.ArrayHeaderSize + for za0002 := range z.HealedBuckets { + s += msgp.StringPrefixSize + len(z.HealedBuckets[za0002]) + } return } diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go index d563868e9..895d651c0 100644 --- a/cmd/erasure-common.go +++ b/cmd/erasure-common.go @@ -38,7 +38,7 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) { // Based on the random shuffling return back randomized disks. for _, i := range hashOrder(UTCNow().String(), len(disks)) { if disks[i-1] != nil && disks[i-1].IsLocal() { - if !disks[i-1].Healing() && disks[i-1].IsOnline() { + if disks[i-1].Healing() == nil && disks[i-1].IsOnline() { newDisks = append(newDisks, disks[i-1]) } } diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index a812a3ab2..44a0cbdbb 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -239,13 +239,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s storageDisks := er.getDisks() storageEndpoints := er.getEndpoints() - // List of disks having latest version of the object er.meta - // (by modtime). - latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) - - // List of disks having all parts as per latest er.meta. 
- availableDisks, dataErrs := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object, scanMode) - // Initialize heal result object result = madmin.HealResultItem{ Type: madmin.HealItemObject, @@ -256,6 +249,21 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s DataBlocks: len(storageDisks) - er.defaultParityCount, } + if !opts.NoLock { + lk := er.NewNSLock(bucket, object) + if ctx, err = lk.GetLock(ctx, globalOperationTimeout); err != nil { + return result, err + } + defer lk.Unlock() + } + + // List of disks having latest version of the object er.meta + // (by modtime). + latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) + + // List of disks having all parts as per latest er.meta. + availableDisks, dataErrs := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object, scanMode) + // Loop to find number of disks with valid data, per-drive // data state and a list of outdated disks on which data needs // to be healed. diff --git a/cmd/erasure-metadata-utils_test.go b/cmd/erasure-metadata-utils_test.go index 44184f208..c74b6f711 100644 --- a/cmd/erasure-metadata-utils_test.go +++ b/cmd/erasure-metadata-utils_test.go @@ -18,6 +18,9 @@ package cmd import ( "context" + "encoding/hex" + "fmt" + "math/rand" "reflect" "testing" ) @@ -199,3 +202,22 @@ func TestEvalDisks(t *testing.T) { z := objLayer.(*erasureServerPools) testShuffleDisks(t, z) } + +func Test_hashOrder(t *testing.T) { + for x := 1; x < 17; x++ { + t.Run(fmt.Sprintf("%d", x), func(t *testing.T) { + var first [17]int + rng := rand.New(rand.NewSource(0)) + var tmp [16]byte + rng.Read(tmp[:]) + prefix := hex.EncodeToString(tmp[:]) + for i := 0; i < 10000; i++ { + rng.Read(tmp[:]) + + y := hashOrder(fmt.Sprintf("%s/%x", prefix, hex.EncodeToString(tmp[:3])), x) + first[y[0]]++ + } + t.Log("first:", first[:x]) + }) + } +} diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 3750af2d8..1a764d50c 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -106,11 +106,10 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ return nil, fmt.Errorf("All serverPools should have same deployment ID expected %s, got %s", deploymentID, formats[i].ID) } - z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i], commonParityDrives) + z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i], commonParityDrives, i) if err != nil { return nil, err } - z.serverPools[i].poolNumber = i } ctx, z.shutdown = context.WithCancel(ctx) go intDataUpdateTracker.start(ctx, localDrives...) @@ -311,8 +310,8 @@ func (z *erasureServerPools) Shutdown(ctx context.Context) error { return nil } -func (z *erasureServerPools) BackendInfo() (b BackendInfo) { - b.Type = BackendErasure +func (z *erasureServerPools) BackendInfo() (b madmin.BackendInfo) { + b.Type = madmin.Erasure scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD) if scParity <= 0 { @@ -1519,25 +1518,6 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) { object = encodeDirObject(object) - var err error - lk := z.NewNSLock(bucket, object) - if bucket == minioMetaBucket { - // For .minio.sys bucket heals we should hold write locks. 
- ctx, err = lk.GetLock(ctx, globalOperationTimeout) - if err != nil { - return madmin.HealResultItem{}, err - } - defer lk.Unlock() - } else { - // Lock the object before healing. Use read lock since healing - // will only regenerate parts & xl.meta of outdated disks. - ctx, err = lk.GetRLock(ctx, globalOperationTimeout) - if err != nil { - return madmin.HealResultItem{}, err - } - defer lk.RUnlock() - } - for _, pool := range z.serverPools { result, err := pool.HealObject(ctx, bucket, object, versionID, opts) result.Object = decodeDirObject(result.Object) @@ -1568,18 +1548,18 @@ func (z *erasureServerPools) GetMetrics(ctx context.Context) (*BackendMetrics, e return &BackendMetrics{}, NotImplemented{} } -func (z *erasureServerPools) getPoolAndSet(id string) (int, int, error) { +func (z *erasureServerPools) getPoolAndSet(id string) (poolIdx, setIdx, diskIdx int, err error) { for poolIdx := range z.serverPools { format := z.serverPools[poolIdx].format for setIdx, set := range format.Erasure.Sets { - for _, diskID := range set { + for i, diskID := range set { if diskID == id { - return poolIdx, setIdx, nil + return poolIdx, setIdx, i, nil } } } } - return 0, 0, fmt.Errorf("DiskID(%s) %w", id, errDiskNotFound) + return -1, -1, -1, fmt.Errorf("DiskID(%s) %w", id, errDiskNotFound) } // HealthOptions takes input options to return sepcific information @@ -1609,7 +1589,7 @@ func (z *erasureServerPools) ReadHealth(ctx context.Context) bool { for _, localDiskIDs := range diskIDs { for _, id := range localDiskIDs { - poolIdx, setIdx, err := z.getPoolAndSet(id) + poolIdx, setIdx, _, err := z.getPoolAndSet(id) if err != nil { logger.LogIf(ctx, err) continue @@ -1648,7 +1628,7 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea for _, localDiskIDs := range diskIDs { for _, id := range localDiskIDs { - poolIdx, setIdx, err := z.getPoolAndSet(id) + poolIdx, setIdx, _, err := z.getPoolAndSet(id) if err != nil { logger.LogIf(ctx, err) continue @@ -1671,7 +1651,7 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea // we need to tell healthy status as 'false' so that this server // is not taken down for maintenance var err error - aggHealStateResult, err = getAggregatedBackgroundHealState(ctx) + aggHealStateResult, err = getAggregatedBackgroundHealState(ctx, nil) if err != nil { logger.LogIf(logger.SetReqInfo(ctx, reqInfo), fmt.Errorf("Unable to verify global heal status: %w", err)) return HealthResult{ diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index ac164db04..ff22ce894 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -83,7 +83,7 @@ type erasureSets struct { setCount, setDriveCount int defaultParityCount int - poolNumber int + poolIndex int disksConnectEvent chan diskConnectInfo @@ -217,7 +217,7 @@ func (s *erasureSets) connectDisks() { } return } - if disk.IsLocal() && disk.Healing() { + if disk.IsLocal() && disk.Healing() != nil { globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint()) logger.Info(fmt.Sprintf("Found the drive %s that needs healing, attempting to heal...", disk)) } @@ -246,6 +246,7 @@ func (s *erasureSets) connectDisks() { disk.SetDiskID(format.Erasure.This) s.erasureDisks[setIndex][diskIndex] = disk } + disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex) s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String() s.erasureDisksMu.Unlock() go func(setIndex int) { @@ -331,7 +332,7 @@ func (s *erasureSets) GetDisks(setIndex int) func() []StorageAPI { const 
defaultMonitorConnectEndpointInterval = defaultMonitorNewDiskInterval + time.Second*5 // Initialize new set of erasure coded sets. -func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3, defaultParityCount int) (*erasureSets, error) { +func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3, defaultParityCount, poolIdx int) (*erasureSets, error) { setCount := len(format.Erasure.Sets) setDriveCount := len(format.Erasure.Sets[0]) @@ -353,6 +354,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto distributionAlgo: format.Erasure.DistributionAlgo, deploymentID: uuid.MustParse(format.ID), mrfOperations: make(map[healSource]int), + poolIndex: poolIdx, } mutex := newNSLock(globalIsDistErasure) @@ -398,13 +400,15 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto if err != nil { continue } + disk.SetDiskLoc(s.poolIndex, m, n) s.endpointStrings[m*setDriveCount+n] = disk.String() s.erasureDisks[m][n] = disk } // Initialize erasure objects for a given set. s.sets[i] = &erasureObjects{ - setNumber: i, + setIndex: i, + poolIndex: poolIdx, setDriveCount: setDriveCount, defaultParityCount: defaultParityCount, getDisks: s.GetDisks(i), @@ -480,7 +484,7 @@ type auditObjectOp struct { Disks []string `json:"disks"` } -func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjects, poolNum int) { +func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjects) { if len(logger.AuditTargets) == 0 { return } @@ -488,8 +492,8 @@ func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjec object = decodeDirObject(object) op := auditObjectOp{ - Pool: poolNum + 1, - Set: set.setNumber + 1, + Pool: set.poolIndex + 1, + Set: set.setIndex + 1, Disks: set.getEndpoints(), } @@ -537,7 +541,7 @@ func (s *erasureSets) StorageUsageInfo(ctx context.Context) StorageInfo { storageUsageInfo := func() StorageInfo { var storageInfo StorageInfo storageInfos := make([]StorageInfo, len(s.sets)) - storageInfo.Backend.Type = BackendErasure + storageInfo.Backend.Type = madmin.Erasure g := errgroup.WithNErrs(len(s.sets)) for index := range s.sets { @@ -572,9 +576,9 @@ func (s *erasureSets) StorageUsageInfo(ctx context.Context) StorageInfo { // StorageInfo - combines output of StorageInfo across all erasure coded object sets. func (s *erasureSets) StorageInfo(ctx context.Context) (StorageInfo, []error) { - var storageInfo StorageInfo + var storageInfo madmin.StorageInfo - storageInfos := make([]StorageInfo, len(s.sets)) + storageInfos := make([]madmin.StorageInfo, len(s.sets)) storageInfoErrs := make([][]error, len(s.sets)) g := errgroup.WithNErrs(len(s.sets)) @@ -839,7 +843,7 @@ func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, er // GetObjectNInfo - returns object info and locked object ReadCloser func (s *erasureSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set, s.poolNumber) + auditObjectErasureSet(ctx, object, set) return set.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) } @@ -853,7 +857,7 @@ func (s *erasureSets) parentDirIsObject(ctx context.Context, bucket, parent stri // PutObject - writes an object to hashedSet based on the object name. 
func (s *erasureSets) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set, s.poolNumber) + auditObjectErasureSet(ctx, object, set) opts.ParentIsObject = s.parentDirIsObject return set.PutObject(ctx, bucket, object, data, opts) } @@ -861,14 +865,14 @@ func (s *erasureSets) PutObject(ctx context.Context, bucket string, object strin // GetObjectInfo - reads object metadata from the hashedSet based on the object name. func (s *erasureSets) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set, s.poolNumber) + auditObjectErasureSet(ctx, object, set) return set.GetObjectInfo(ctx, bucket, object, opts) } // DeleteObject - deletes an object from the hashedSet based on the object name. func (s *erasureSets) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set, s.poolNumber) + auditObjectErasureSet(ctx, object, set) return set.DeleteObject(ctx, bucket, object, opts) } @@ -920,7 +924,7 @@ func (s *erasureSets) DeleteObjects(ctx context.Context, bucket string, objects delErrs[obj.origIndex] = errs[i] delObjects[obj.origIndex] = dobjects[i] if errs[i] == nil { - auditObjectErasureSet(ctx, obj.object.ObjectName, set, s.poolNumber) + auditObjectErasureSet(ctx, obj.object.ObjectName, set) } } } @@ -933,7 +937,7 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB srcSet := s.getHashedSet(srcObject) dstSet := s.getHashedSet(dstObject) - auditObjectErasureSet(ctx, dstObject, dstSet, s.poolNumber) + auditObjectErasureSet(ctx, dstObject, dstSet) cpSrcDstSame := srcSet == dstSet // Check if this request is only metadata update. @@ -1117,14 +1121,14 @@ func (s *erasureSets) ListMultipartUploads(ctx context.Context, bucket, prefix, // In list multipart uploads we are going to treat input prefix as the object, // this means that we are not supporting directory navigation. set := s.getHashedSet(prefix) - auditObjectErasureSet(ctx, prefix, set, s.poolNumber) + auditObjectErasureSet(ctx, prefix, set) return set.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) } // Initiate a new multipart upload on a hashedSet based on object name. 
func (s *erasureSets) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set, s.poolNumber) + auditObjectErasureSet(ctx, object, set) return set.NewMultipartUpload(ctx, bucket, object, opts) } @@ -1132,42 +1136,42 @@ func (s *erasureSets) NewMultipartUpload(ctx context.Context, bucket, object str func (s *erasureSets) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (partInfo PartInfo, err error) { destSet := s.getHashedSet(destObject) - auditObjectErasureSet(ctx, destObject, destSet, s.poolNumber) + auditObjectErasureSet(ctx, destObject, destSet) return destSet.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, NewPutObjReader(srcInfo.Reader), dstOpts) } // PutObjectPart - writes part of an object to hashedSet based on the object name. func (s *erasureSets) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set, s.poolNumber) + auditObjectErasureSet(ctx, object, set) return set.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) } // GetMultipartInfo - return multipart metadata info uploaded at hashedSet. func (s *erasureSets) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (result MultipartInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set, s.poolNumber) + auditObjectErasureSet(ctx, object, set) return set.GetMultipartInfo(ctx, bucket, object, uploadID, opts) } // ListObjectParts - lists all uploaded parts to an object in hashedSet. func (s *erasureSets) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set, s.poolNumber) + auditObjectErasureSet(ctx, object, set) return set.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) } // Aborts an in-progress multipart operation on hashedSet based on the object name. func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set, s.poolNumber) + auditObjectErasureSet(ctx, object, set) return set.AbortMultipartUpload(ctx, bucket, object, uploadID, opts) } // CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name. func (s *erasureSets) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) { set := s.getHashedSet(object) - auditObjectErasureSet(ctx, object, set, s.poolNumber) + auditObjectErasureSet(ctx, object, set) opts.ParentIsObject = s.parentDirIsObject return set.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) } @@ -1351,8 +1355,8 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H res.Before.Drives = make([]madmin.HealDriveInfo, len(beforeDrives)) // Copy "after" drive state too from before. 
for k, v := range beforeDrives { - res.Before.Drives[k] = madmin.HealDriveInfo(v) - res.After.Drives[k] = madmin.HealDriveInfo(v) + res.Before.Drives[k] = v + res.After.Drives[k] = v } if countErrs(sErrs, errUnformattedDisk) == 0 { @@ -1395,7 +1399,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H if s.erasureDisks[m][n] != nil { s.erasureDisks[m][n].Close() } - + storageDisks[index].SetDiskLoc(s.poolIndex, m, n) s.erasureDisks[m][n] = storageDisks[index] s.endpointStrings[m*s.setDriveCount+n] = storageDisks[index].String() } diff --git a/cmd/erasure-sets_test.go b/cmd/erasure-sets_test.go index d1ef54295..7cceb971b 100644 --- a/cmd/erasure-sets_test.go +++ b/cmd/erasure-sets_test.go @@ -189,7 +189,7 @@ func TestNewErasureSets(t *testing.T) { t.Fatalf("Unable to format disks for erasure, %s", err) } - if _, err := newErasureSets(ctx, endpoints, storageDisks, format, ecDrivesNoConfig(16)); err != nil { + if _, err := newErasureSets(ctx, endpoints, storageDisks, format, ecDrivesNoConfig(16), 0); err != nil { t.Fatalf("Unable to initialize erasure") } } diff --git a/cmd/erasure.go b/cmd/erasure.go index b4ca287c6..8b220333c 100644 --- a/cmd/erasure.go +++ b/cmd/erasure.go @@ -53,7 +53,8 @@ type erasureObjects struct { setDriveCount int defaultParityCount int - setNumber int + setIndex int + poolIndex int // getDisks returns list of storageAPIs. getDisks func() []StorageAPI @@ -186,7 +187,7 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Di } info, err := disks[index].DiskInfo(context.TODO()) di := madmin.Disk{ - Endpoint: endpoints[index], + Endpoint: info.Endpoint, DrivePath: info.MountPath, TotalSpace: info.Total, UsedSpace: info.Used, @@ -196,6 +197,13 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Di Healing: info.Healing, State: diskErrToDriveState(err), } + di.PoolIndex, di.SetIndex, di.DiskIndex = disks[index].GetDiskLoc() + if info.Healing { + if hi := disks[index].Healing(); hi != nil { + hd := hi.toHealingDisk() + di.HealInfo = &hd + } + } if info.Total > 0 { di.Utilization = float64(info.Used / info.Total * 100) } @@ -218,7 +226,7 @@ func getStorageInfo(disks []StorageAPI, endpoints []string) (StorageInfo, []erro Disks: disksInfo, } - storageInfo.Backend.Type = BackendErasure + storageInfo.Backend.Type = madmin.Erasure return storageInfo, errs } diff --git a/cmd/format-erasure.go b/cmd/format-erasure.go index 6220d8802..06cb03847 100644 --- a/cmd/format-erasure.go +++ b/cmd/format-erasure.go @@ -340,19 +340,6 @@ func loadFormatErasureAll(storageDisks []StorageAPI, heal bool) ([]*formatErasur return formats, g.Wait() } -func saveHealingTracker(disk StorageAPI, diskID string) error { - htracker := healingTracker{ - ID: diskID, - } - htrackerBytes, err := htracker.MarshalMsg(nil) - if err != nil { - return err - } - return disk.WriteAll(context.TODO(), minioMetaBucket, - pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename), - htrackerBytes) -} - func saveFormatErasure(disk StorageAPI, format *formatErasureV3, heal bool) error { if disk == nil || format == nil { return errDiskNotFound @@ -387,7 +374,9 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3, heal bool) erro disk.SetDiskID(diskID) if heal { - return saveHealingTracker(disk, diskID) + ctx := context.Background() + ht := newHealingTracker(disk) + return ht.save(ctx) } return nil } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index d4a608543..1249f7646 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ 
-202,8 +202,8 @@ func (fs *FSObjects) Shutdown(ctx context.Context) error { } // BackendInfo - returns backend information -func (fs *FSObjects) BackendInfo() BackendInfo { - return BackendInfo{Type: BackendFS} +func (fs *FSObjects) BackendInfo() madmin.BackendInfo { + return madmin.BackendInfo{Type: madmin.FS} } // LocalStorageInfo - returns underlying storage statistics. @@ -232,7 +232,7 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) (StorageInfo, []error) { }, }, } - storageInfo.Backend.Type = BackendFS + storageInfo.Backend.Type = madmin.FS return storageInfo, nil } diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index ee30ad434..6fcd7a183 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -35,8 +35,8 @@ import ( type GatewayUnsupported struct{} // BackendInfo returns the underlying backend information -func (a GatewayUnsupported) BackendInfo() BackendInfo { - return BackendInfo{Type: BackendGateway} +func (a GatewayUnsupported) BackendInfo() madmin.BackendInfo { + return madmin.BackendInfo{Type: madmin.Gateway} } // LocalStorageInfo returns the local disks information, mainly used diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go index 2bb56511b..585885da8 100644 --- a/cmd/gateway/azure/gateway-azure.go +++ b/cmd/gateway/azure/gateway-azure.go @@ -36,19 +36,18 @@ import ( "strings" "time" - "github.com/minio/minio/pkg/env" - "github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-storage-blob-go/azblob" humanize "github.com/dustin/go-humanize" "github.com/minio/cli" miniogopolicy "github.com/minio/minio-go/v7/pkg/policy" + minio "github.com/minio/minio/cmd" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/bucket/policy/condition" - - minio "github.com/minio/minio/cmd" + "github.com/minio/minio/pkg/env" + "github.com/minio/minio/pkg/madmin" ) const ( @@ -545,7 +544,7 @@ func (a *azureObjects) Shutdown(ctx context.Context) error { // StorageInfo - Not relevant to Azure backend. func (a *azureObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) { - si.Backend.Type = minio.BackendGateway + si.Backend.Type = madmin.Gateway host := a.endpoint.Host if a.endpoint.Port() == "" { host = a.endpoint.Host + ":" + a.endpoint.Scheme diff --git a/cmd/gateway/gcs/gateway-gcs.go b/cmd/gateway/gcs/gateway-gcs.go index 573078a11..fd3859ae5 100644 --- a/cmd/gateway/gcs/gateway-gcs.go +++ b/cmd/gateway/gcs/gateway-gcs.go @@ -29,9 +29,8 @@ import ( "net/http" "os" "path" - "strconv" - "regexp" + "strconv" "strings" "time" @@ -39,17 +38,16 @@ import ( humanize "github.com/dustin/go-humanize" "github.com/minio/cli" miniogopolicy "github.com/minio/minio-go/v7/pkg/policy" + minio "github.com/minio/minio/cmd" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/bucket/policy/condition" "github.com/minio/minio/pkg/env" - + "github.com/minio/minio/pkg/madmin" "google.golang.org/api/googleapi" "google.golang.org/api/iterator" "google.golang.org/api/option" - - minio "github.com/minio/minio/cmd" ) var ( @@ -413,7 +411,7 @@ func (l *gcsGateway) Shutdown(ctx context.Context) error { // StorageInfo - Not relevant to GCS backend. 
func (l *gcsGateway) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) { - si.Backend.Type = minio.BackendGateway + si.Backend.Type = madmin.Gateway si.Backend.GatewayOnline = minio.IsBackendOnline(ctx, "storage.googleapis.com:443") return si, nil } diff --git a/cmd/gateway/hdfs/gateway-hdfs.go b/cmd/gateway/hdfs/gateway-hdfs.go index 2ed894b24..0070ce644 100644 --- a/cmd/gateway/hdfs/gateway-hdfs.go +++ b/cmd/gateway/hdfs/gateway-hdfs.go @@ -244,7 +244,7 @@ func (n *hdfsObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, er si.Disks = []madmin.Disk{{ UsedSpace: fsInfo.Used, }} - si.Backend.Type = minio.BackendGateway + si.Backend.Type = madmin.Gateway si.Backend.GatewayOnline = true return si, nil } diff --git a/cmd/gateway/nas/gateway-nas.go b/cmd/gateway/nas/gateway-nas.go index a9b95856d..48ddb03fd 100644 --- a/cmd/gateway/nas/gateway-nas.go +++ b/cmd/gateway/nas/gateway-nas.go @@ -22,6 +22,7 @@ import ( "github.com/minio/cli" minio "github.com/minio/minio/cmd" "github.com/minio/minio/pkg/auth" + "github.com/minio/minio/pkg/madmin" ) func init() { @@ -106,8 +107,8 @@ func (n *nasObjects) IsListenSupported() bool { func (n *nasObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) { si, errs := n.ObjectLayer.StorageInfo(ctx) - si.Backend.GatewayOnline = si.Backend.Type == minio.BackendFS - si.Backend.Type = minio.BackendGateway + si.Backend.GatewayOnline = si.Backend.Type == madmin.FS + si.Backend.Type = madmin.Gateway return si, errs } diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go index 72eacd73e..28f8a9234 100644 --- a/cmd/gateway/s3/gateway-s3.go +++ b/cmd/gateway/s3/gateway-s3.go @@ -29,15 +29,15 @@ import ( "github.com/minio/cli" miniogo "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" - "github.com/minio/minio-go/v7/pkg/tags" - minio "github.com/minio/minio/cmd" - "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/minio/minio-go/v7/pkg/s3utils" + "github.com/minio/minio-go/v7/pkg/tags" + minio "github.com/minio/minio/cmd" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/bucket/policy" + "github.com/minio/minio/pkg/madmin" ) func init() { @@ -276,7 +276,7 @@ func (l *s3Objects) Shutdown(ctx context.Context) error { // StorageInfo is not relevant to S3 backend. 
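// With StorageInfo now expressed in madmin types on both server and gateway
// code paths, callers can branch on a single BackendType value. A hedged,
// caller-side sketch (objAPI and ctx are assumed to be in scope; illustrative
// only, not part of this change):
//
//	si, _ := objAPI.StorageInfo(ctx)
//	switch si.Backend.Type {
//	case madmin.Gateway:
//		// Gateways only report whether the remote endpoint is reachable.
//		_ = si.Backend.GatewayOnline
//	case madmin.Erasure, madmin.FS:
//		// Server backends carry per-disk detail in si.Disks.
//		_ = len(si.Disks)
//	}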
func (l *s3Objects) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) { - si.Backend.Type = minio.BackendGateway + si.Backend.Type = madmin.Gateway host := l.Client.EndpointURL().Host if l.Client.EndpointURL().Port() == "" { host = l.Client.EndpointURL().Host + ":" + l.Client.EndpointURL().Scheme diff --git a/cmd/global-heal.go b/cmd/global-heal.go index 343dff47a..49a877c86 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -19,6 +19,8 @@ package cmd import ( "context" "errors" + "fmt" + "sort" "time" "github.com/minio/minio/cmd/logger" @@ -64,7 +66,8 @@ func newBgHealSequence() *healSequence { } } -func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) { +// getBackgroundHealStatus will return the background heal status of this server. +// When an ObjectLayer is provided, per-set disk status is included as well. +func getBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.BgHealState, bool) { if globalBackgroundHealState == nil { return madmin.BgHealState{}, false } @@ -78,24 +81,51 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) { for _, ep := range getLocalDisksToHeal() { healDisksMap[ep.String()] = struct{}{} } - - for _, ep := range globalBackgroundHealState.getHealLocalDisks() { - if _, ok := healDisksMap[ep.String()]; !ok { - healDisksMap[ep.String()] = struct{}{} - } - } - - var healDisks []string - for disk := range healDisksMap { - healDisks = append(healDisks, disk) - } - - return madmin.BgHealState{ + status := madmin.BgHealState{ ScannedItemsCount: bgSeq.getScannedItemsCount(), - LastHealActivity: bgSeq.lastHealActivity, - HealDisks: healDisks, - NextHealRound: UTCNow(), - }, true + } + + if o == nil { + healing := globalBackgroundHealState.getLocalHealingDisks() + for _, disk := range healing { + status.HealDisks = append(status.HealDisks, disk.Endpoint) + } + + return status, true + } + + // Ignore any errors here. + si, _ := o.StorageInfo(ctx) + + indexed := make(map[string][]madmin.Disk) + for _, disk := range si.Disks { + setIdx := fmt.Sprintf("%d-%d", disk.PoolIndex, disk.SetIndex) + indexed[setIdx] = append(indexed[setIdx], disk) + } + + for id, disks := range indexed { + ss := madmin.SetStatus{ + ID: id, + SetIndex: disks[0].SetIndex, + PoolIndex: disks[0].PoolIndex, + } + for _, disk := range disks { + ss.Disks = append(ss.Disks, disk) + if disk.Healing { + ss.HealStatus = "Healing" + ss.HealPriority = "high" + status.HealDisks = append(status.HealDisks, disk.Endpoint) + } + } + sortDisks(ss.Disks) + status.Sets = append(status.Sets, ss) + } + sort.Slice(status.Sets, func(i, j int) bool { + return status.Sets[i].ID < status.Sets[j].ID + }) + + return status, true + } func mustGetHealSequence(ctx context.Context) *healSequence { @@ -120,7 +150,7 @@ func mustGetHealSequence(ctx context.Context) *healSequence { } // healErasureSet lists and heals all objects in a specific erasure set -func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketInfo) error { +func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketInfo, tracker *healingTracker) error { bgSeq := mustGetHealSequence(ctx) buckets = append(buckets, BucketInfo{ Name: pathJoin(minioMetaBucket, minioConfigPrefix), @@ -135,6 +165,21 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn // Heal all buckets with all objects for _, bucket := range buckets { + if tracker.isHealed(bucket.Name) { + continue + } + var forwardTo string + // If we resume to the same bucket, forward to last known item.
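// Resume semantics of the tracker-driven loop that follows: the healingTracker
// persists the last Bucket/Object it reached (tracker.update flushes state at
// most once a minute), buckets already recorded as healed are skipped via
// tracker.isHealed, and when healing restarts inside the same bucket the
// listing is forwarded to the last recorded object instead of starting over;
// tracker.resume() resets per-bucket progress when a different bucket is
// picked up.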
+ if tracker.Bucket != "" { + if tracker.Bucket == bucket.Name { + forwardTo = tracker.Object + } else { + // Reset to where last bucket ended if resuming. + tracker.resume() + } + } + tracker.Object = "" + tracker.Bucket = bucket.Name // Heal current bucket if _, err := er.HealBucket(ctx, bucket.Name, madmin.HealOpts{}); err != nil { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { @@ -143,7 +188,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn } if serverDebugLog { - console.Debugf(color.Green("healDisk:")+" healing bucket %s content on erasure set %d\n", bucket.Name, er.setNumber+1) + console.Debugf(color.Green("healDisk:")+" healing bucket %s content on erasure set %d\n", bucket.Name, tracker.SetIndex+1) } disks, _ := er.getOnlineDisksWithHealing() @@ -167,17 +212,27 @@ for _, version := range fivs.Versions { if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling}); err != nil { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + // If not deleted, assume they failed. + tracker.ObjectsFailed++ + tracker.BytesFailed += uint64(version.Size) logger.LogIf(ctx, err) } + } else { + tracker.ObjectsHealed++ + tracker.BytesDone += uint64(version.Size) } bgSeq.logHeal(madmin.HealItemObject) } + tracker.Object = entry.name + if time.Since(tracker.LastUpdate) > time.Minute { + logger.LogIf(ctx, tracker.update(ctx)) + } } err := listPathRaw(ctx, listPathRawOptions{ disks: disks, bucket: bucket.Name, recursive: true, - forwardTo: "", //TODO(klauspost): Set this to last known offset when resuming. + forwardTo: forwardTo, minDisks: 1, reportNotFound: false, agreed: healEntry, @@ -189,8 +244,18 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn }, finished: nil, }) - logger.LogIf(ctx, err) + select { + // If context is canceled, don't mark as done... + case <-ctx.Done(): + return ctx.Err() + default: + logger.LogIf(ctx, err) + tracker.bucketDone(bucket.Name) + logger.LogIf(ctx, tracker.update(ctx)) + } } + tracker.Object = "" + tracker.Bucket = "" return nil } diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index dfda50871..487aeb37c 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -66,7 +66,7 @@ func (d *naughtyDisk) Hostname() string { return d.disk.Hostname() } -func (d *naughtyDisk) Healing() bool { +func (d *naughtyDisk) Healing() *healingTracker { return d.disk.Healing() } @@ -90,6 +90,12 @@ func (d *naughtyDisk) calcError() (err error) { return nil } +func (d *naughtyDisk) GetDiskLoc() (poolIdx, setIdx, diskIdx int) { + return -1, -1, -1 +} + +func (d *naughtyDisk) SetDiskLoc(poolIdx, setIdx, diskIdx int) {} + func (d *naughtyDisk) GetDiskID() (string, error) { return d.disk.GetDiskID() } diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index a7b4931b8..d2d20612a 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -32,36 +32,18 @@ type BackendType int // Enum for different backend types. const ( - Unknown BackendType = iota + Unknown = BackendType(madmin.Unknown) // Filesystem backend. - BackendFS + BackendFS = BackendType(madmin.FS) // Multi disk BackendErasure (single, distributed) backend. - BackendErasure + BackendErasure = BackendType(madmin.Erasure) // Gateway backend.
- BackendGateway + BackendGateway = BackendType(madmin.Gateway) // Add your own backend. ) -// BackendInfo - contains info of the underlying backend -type BackendInfo struct { - // Represents various backend types, currently on FS, Erasure and Gateway - Type BackendType - - // Following fields are only meaningful if BackendType is Gateway. - GatewayOnline bool - - // Following fields are only meaningful if BackendType is Erasure. - StandardSCData []int // Data disks for currently configured Standard storage class. - StandardSCParity int // Parity disks for currently configured Standard storage class. - RRSCData []int // Data disks for currently configured Reduced Redundancy storage class. - RRSCParity int // Parity disks for currently configured Reduced Redundancy storage class. -} - // StorageInfo - represents total capacity of underlying storage. -type StorageInfo struct { - Disks []madmin.Disk - Backend BackendInfo -} +type StorageInfo = madmin.StorageInfo // objectHistogramInterval is an interval that will be // used to report the histogram of objects data sizes diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index e9f2d182a..5e22c1205 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -89,7 +89,7 @@ type ObjectLayer interface { Shutdown(context.Context) error NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error - BackendInfo() BackendInfo + BackendInfo() madmin.BackendInfo StorageInfo(ctx context.Context) (StorageInfo, []error) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index f8b82005a..a9aebd4e9 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -947,10 +947,9 @@ func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *h s.writeErrorResponse(w, errors.New("invalid request")) return } - ctx := newContext(r, w, "BackgroundHealStatus") - state, ok := getLocalBackgroundHealStatus() + state, ok := getBackgroundHealStatus(ctx, newObjectLayerFn()) if !ok { s.writeErrorResponse(w, errServerNotInitialized) return diff --git a/cmd/server-startup-msg.go b/cmd/server-startup-msg.go index ebc5afe87..cb7710195 100644 --- a/cmd/server-startup-msg.go +++ b/cmd/server-startup-msg.go @@ -27,6 +27,7 @@ import ( "github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/logger" color "github.com/minio/minio/pkg/color" + "github.com/minio/minio/pkg/madmin" xnet "github.com/minio/minio/pkg/net" ) @@ -206,7 +207,7 @@ func getStorageInfoMsg(storageInfo StorageInfo) string { var msg string var mcMessage string onlineDisks, offlineDisks := getOnlineOfflineDisksStats(storageInfo.Disks) - if storageInfo.Backend.Type == BackendErasure { + if storageInfo.Backend.Type == madmin.Erasure { if offlineDisks.Sum() > 0 { mcMessage = "Use `mc admin info` to look for latest server/disk info\n" } diff --git a/cmd/server-startup-msg_test.go b/cmd/server-startup-msg_test.go index a9e7b2516..ec66cf366 100644 --- a/cmd/server-startup-msg_test.go +++ b/cmd/server-startup-msg_test.go @@ -38,7 +38,7 @@ func TestStorageInfoMsg(t *testing.T) { {Endpoint: "http://127.0.0.1:9001/data/3/", State: madmin.DriveStateOk}, {Endpoint: "http://127.0.0.1:9001/data/4/", State: madmin.DriveStateOffline}, } - infoStorage.Backend.Type = BackendErasure + infoStorage.Backend.Type = madmin.Erasure if msg := getStorageInfoMsg(infoStorage); !strings.Contains(msg, "7 Online, 1 Offline") { t.Fatal("Unexpected storage info message, found:", msg) diff 
--git a/cmd/storage-interface.go b/cmd/storage-interface.go index 8047d4552..220cf3d30 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -36,7 +36,7 @@ type StorageAPI interface { Close() error GetDiskID() (string, error) SetDiskID(id string) - Healing() bool // Returns if disk is healing. + Healing() *healingTracker // Returns nil if disk is not healing. DiskInfo(ctx context.Context) (info DiskInfo, err error) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) @@ -79,6 +79,9 @@ type StorageAPI interface { // Read all. ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) + + GetDiskLoc() (poolIdx, setIdx, diskIdx int) // Retrieve location indexes. + SetDiskLoc(poolIdx, setIdx, diskIdx int) // Set location indexes. } // storageReader is an io.Reader view of a disk diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 5bca3f3f6..0d225bac5 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -29,6 +29,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/minio/minio/cmd/http" xhttp "github.com/minio/minio/cmd/http" @@ -119,6 +120,24 @@ type storageRESTClient struct { endpoint Endpoint restClient *rest.Client diskID string + + // Indexes, will be -1 until assigned a set. + poolIndex, setIndex, diskIndex int + + diskInfoCache timedValue + diskHealCache timedValue +} + +// Retrieve location indexes. +func (client *storageRESTClient) GetDiskLoc() (poolIdx, setIdx, diskIdx int) { + return client.poolIndex, client.setIndex, client.diskIndex +} + +// Set location indexes. +func (client *storageRESTClient) SetDiskLoc(poolIdx, setIdx, diskIdx int) { + client.poolIndex = poolIdx + client.setIndex = setIdx + client.diskIndex = diskIdx } // Wrapper to restClient.Call to handle network errors, in case of network error the connection is makred disconnected @@ -160,14 +179,26 @@ func (client *storageRESTClient) Endpoint() Endpoint { return client.endpoint } -func (client *storageRESTClient) Healing() bool { - // This call should never be called over the network - // this function should always return 'false' - // - // To know if a remote disk is being healed - // perform DiskInfo() call which would return - // back the correct data if disk is being healed. - return false +func (client *storageRESTClient) Healing() *healingTracker { + client.diskHealCache.Once.Do(func() { + // Update at least every second. + client.diskHealCache.TTL = time.Second + client.diskHealCache.Update = func() (interface{}, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + b, err := client.ReadAll(ctx, minioMetaBucket, + pathJoin(bucketMetaPrefix, healingTrackerFilename)) + if err != nil { + // If error, likely not healing. + return (*healingTracker)(nil), nil + } + var h healingTracker + _, err = h.UnmarshalMsg(b) + return &h, err + } + }) + val, _ := client.diskHealCache.Get() + return val.(*healingTracker) } func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) { @@ -207,18 +238,31 @@ func (client *storageRESTClient) SetDiskID(id string) { // DiskInfo - fetch disk information for a remote disk. 
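// Both Healing and DiskInfo on the REST client follow the same pattern: a
// lazily initialised timedValue whose Update func performs the remote call and
// whose Get() serves the cached result until a one-second TTL expires. A
// minimal illustrative sketch of that pattern; the helper name and fetch
// parameter are assumptions for the example, not part of the change:
//
//	func cachedDiskInfo(cache *timedValue, fetch func(context.Context) (DiskInfo, error)) (DiskInfo, error) {
//		cache.Once.Do(func() {
//			cache.TTL = time.Second
//			cache.Update = func() (interface{}, error) {
//				ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//				defer cancel()
//				info, err := fetch(ctx)
//				return info, err
//			}
//		})
//		v, err := cache.Get()
//		if err != nil {
//			return DiskInfo{}, err
//		}
//		return v.(DiskInfo), nil
//	}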
func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, err error) { - respBody, err := client.call(ctx, storageRESTMethodDiskInfo, nil, nil, -1) - if err != nil { - return info, err + client.diskInfoCache.Once.Do(func() { + client.diskInfoCache.TTL = time.Second + client.diskInfoCache.Update = func() (interface{}, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + respBody, err := client.call(ctx, storageRESTMethodDiskInfo, nil, nil, -1) + if err != nil { + return info, err + } + defer http.DrainBody(respBody) + if err = msgp.Decode(respBody, &info); err != nil { + return info, err + } + if info.Error != "" { + return info, toStorageErr(errors.New(info.Error)) + } + return info, nil + } + }) + val, err := client.diskInfoCache.Get() + if err == nil { + info = val.(DiskInfo) } - defer http.DrainBody(respBody) - if err = msgp.Decode(respBody, &info); err != nil { - return info, err - } - if info.Error != "" { - return info, toStorageErr(errors.New(info.Error)) - } - return info, nil + + return info, err } // MakeVolBulk - create multiple volumes in a bulk operation. @@ -651,5 +695,5 @@ func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClien } } - return &storageRESTClient{endpoint: endpoint, restClient: restClient} + return &storageRESTClient{endpoint: endpoint, restClient: restClient, poolIndex: -1, setIndex: -1, diskIndex: -1} } diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index ef4de2a73..908e326e4 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -51,7 +51,7 @@ func (p *xlStorageDiskIDCheck) Hostname() string { return p.storage.Hostname() } -func (p *xlStorageDiskIDCheck) Healing() bool { +func (p *xlStorageDiskIDCheck) Healing() *healingTracker { return p.storage.Healing() } @@ -68,6 +68,14 @@ func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCac return p.storage.NSScanner(ctx, cache) } +func (p *xlStorageDiskIDCheck) GetDiskLoc() (poolIdx, setIdx, diskIdx int) { + return p.storage.GetDiskLoc() +} + +func (p *xlStorageDiskIDCheck) SetDiskLoc(poolIdx, setIdx, diskIdx int) { + p.storage.SetDiskLoc(poolIdx, setIdx, diskIdx) +} + func (p *xlStorageDiskIDCheck) Close() error { return p.storage.Close() } diff --git a/cmd/xl-storage-format-utils.go b/cmd/xl-storage-format-utils.go index c24a78b91..9a1b284d3 100644 --- a/cmd/xl-storage-format-utils.go +++ b/cmd/xl-storage-format-utils.go @@ -94,3 +94,16 @@ func getFileInfo(xlMetaBuf []byte, volume, path, versionID string) (FileInfo, er fi.XLV1 = true // indicates older version return fi, err } + +// getXLDiskLoc will return the pool/set/disk id if it can be located in the object layer. +// Will return -1 for unknown values. +func getXLDiskLoc(diskID string) (poolIdx, setIdx, diskIdx int) { + if api := newObjectLayerFn(); api != nil { + if ep, ok := api.(*erasureServerPools); ok { + if pool, set, disk, err := ep.getPoolAndSet(diskID); err == nil { + return pool, set, disk + } + } + } + return -1, -1, -1 +} diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 0a3dd6f0d..3e13e65bd 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -105,6 +105,9 @@ type xlStorage struct { diskID string + // Indexes, will be -1 until assigned a set. 
+ poolIndex, setIndex, diskIndex int + formatFileInfo os.FileInfo formatLegacy bool formatLastCheck time.Time @@ -268,6 +271,9 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) { ctx: GlobalContext, rootDisk: rootDisk, readODirectSupported: true, + poolIndex: -1, + setIndex: -1, + diskIndex: -1, } // Create all necessary bucket folders if possible. @@ -346,11 +352,33 @@ func (s *xlStorage) IsLocal() bool { return true } -func (s *xlStorage) Healing() bool { +// Retrieve location indexes. +func (s *xlStorage) GetDiskLoc() (poolIdx, setIdx, diskIdx int) { + // If unset, see if we can locate it. + if s.poolIndex < 0 || s.setIndex < 0 || s.diskIndex < 0 { + return getXLDiskLoc(s.diskID) + } + return s.poolIndex, s.setIndex, s.diskIndex +} + +// Set location indexes. +func (s *xlStorage) SetDiskLoc(poolIdx, setIdx, diskIdx int) { + s.poolIndex = poolIdx + s.setIndex = setIdx + s.diskIndex = diskIdx +} + +func (s *xlStorage) Healing() *healingTracker { healingFile := pathJoin(s.diskPath, minioMetaBucket, bucketMetaPrefix, healingTrackerFilename) - _, err := os.Lstat(healingFile) - return err == nil + b, err := ioutil.ReadFile(healingFile) + if err != nil { + return nil + } + var h healingTracker + _, err = h.UnmarshalMsg(b) + logger.LogIf(GlobalContext, err) + return &h } func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) { @@ -461,7 +489,7 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) { } else { // Check if the disk is being healed if GetDiskID // returned any error other than fresh disk - dcinfo.Healing = s.Healing() + dcinfo.Healing = s.Healing() != nil } dcinfo.ID = diskID diff --git a/pkg/madmin/examples/heal-status.go b/pkg/madmin/examples/heal-status.go index ad8e5900d..639420e7f 100644 --- a/pkg/madmin/examples/heal-status.go +++ b/pkg/madmin/examples/heal-status.go @@ -21,6 +21,7 @@ package main import ( "context" + "encoding/json" "log" "github.com/minio/minio/pkg/madmin" @@ -41,6 +42,7 @@ func main() { if err != nil { log.Fatalln(err) } + js, _ := json.MarshalIndent(healStatusResult, "", " ") - log.Printf("Heal status result: %+v\n", healStatusResult) + log.Printf("Heal status result: %s\n", string(js)) } diff --git a/pkg/madmin/examples/server-info.go b/pkg/madmin/examples/server-info.go index 6e54f433c..79f99298a 100644 --- a/pkg/madmin/examples/server-info.go +++ b/pkg/madmin/examples/server-info.go @@ -41,5 +41,5 @@ func main() { if err != nil { log.Fatalln(err) } - log.Println(st) + log.Printf("%+v\n", st) } diff --git a/pkg/madmin/examples/storage-info.go b/pkg/madmin/examples/storage-info.go index 5441e278d..d9ce4c340 100644 --- a/pkg/madmin/examples/storage-info.go +++ b/pkg/madmin/examples/storage-info.go @@ -41,5 +41,5 @@ func main() { if err != nil { log.Fatalln(err) } - log.Println(st) + log.Printf("%+v\n", st) } diff --git a/pkg/madmin/heal-commands.go b/pkg/madmin/heal-commands.go index b49783f70..21455a05d 100644 --- a/pkg/madmin/heal-commands.go +++ b/pkg/madmin/heal-commands.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "net/http" "net/url" + "sort" "time" ) @@ -47,6 +48,7 @@ type HealOpts struct { DryRun bool `json:"dryRun"` Remove bool `json:"remove"` Recreate bool `json:"recreate"` // only used when bucket needs to be healed + NoLock bool `json:"-"` // only used internally. 
ScanMode HealScanMode `json:"scanMode"` } @@ -298,9 +300,96 @@ func (adm *AdminClient) Heal(ctx context.Context, bucket, prefix string, // BgHealState represents the status of the background heal type BgHealState struct { ScannedItemsCount int64 - LastHealActivity time.Time - NextHealRound time.Time - HealDisks []string + + HealDisks []string + + // SetStatus contains information for each set. + Sets []SetStatus `json:"sets"` +} + +// SetStatus contains information about the heal status of a set. +type SetStatus struct { + ID string `json:"id"` + PoolIndex int `json:"pool_index"` + SetIndex int `json:"set_index"` + HealStatus string `json:"heal_status"` + HealPriority string `json:"heal_priority"` + Disks []Disk `json:"disks"` +} + +// HealingDisk contains information about a disk currently being healed. +type HealingDisk struct { + // Copied from cmd/background-newdisks-heal-ops.go + // When adding new field, update (*healingTracker).toHealingDisk + + ID string `json:"id"` + PoolIndex int `json:"pool_index"` + SetIndex int `json:"set_index"` + DiskIndex int `json:"disk_index"` + Endpoint string `json:"endpoint"` + Path string `json:"path"` + Started time.Time `json:"started"` + LastUpdate time.Time `json:"last_update"` + ObjectsHealed uint64 `json:"objects_healed"` + ObjectsFailed uint64 `json:"objects_failed"` + BytesDone uint64 `json:"bytes_done"` + BytesFailed uint64 `json:"bytes_failed"` + + // Last object scanned. + Bucket string `json:"current_bucket"` + Object string `json:"current_object"` + + // Filled on startup/restarts. + QueuedBuckets []string `json:"queued_buckets"` + + // Filled during heal. + HealedBuckets []string `json:"healed_buckets"` + // future add more tracking capabilities +} + +// Merge others into b. +func (b *BgHealState) Merge(others ...BgHealState) { + for _, other := range others { + b.ScannedItemsCount += other.ScannedItemsCount + if len(b.Sets) == 0 { + b.Sets = make([]SetStatus, len(other.Sets)) + copy(b.Sets, other.Sets) + continue + } + + // Add disk if not present. + // If present select the one with latest lastupdate. + addSet := func(set SetStatus) { + for eSetIdx, existing := range b.Sets { + if existing.ID != set.ID { + continue + } + if len(existing.Disks) < len(set.Disks) { + b.Sets[eSetIdx].Disks = set.Disks + } + if len(existing.Disks) < len(set.Disks) { + return + } + for i, disk := range set.Disks { + // Disks should be the same. + if disk.HealInfo != nil { + existing.Disks[i].HealInfo = disk.HealInfo + } + } + return + } + b.Sets = append(b.Sets, set) + } + for _, disk := range other.Sets { + addSet(disk) + } + } + sort.Slice(b.Sets, func(i, j int) bool { + if b.Sets[i].PoolIndex != b.Sets[j].PoolIndex { + return b.Sets[i].PoolIndex < b.Sets[j].PoolIndex + } + return b.Sets[i].SetIndex < b.Sets[j].SetIndex + }) } // BackgroundHealStatus returns the background heal status of the diff --git a/pkg/madmin/info-commands.go b/pkg/madmin/info-commands.go index c7c8cf5c6..dfa65cebe 100644 --- a/pkg/madmin/info-commands.go +++ b/pkg/madmin/info-commands.go @@ -35,6 +35,8 @@ const ( FS // Multi disk Erasure (single, distributed) backend. Erasure + // Gateway to other storage + Gateway // Add your own backend. ) @@ -57,16 +59,26 @@ type StorageInfo struct { Disks []Disk // Backend type. - Backend struct { - // Represents various backend types, currently on FS and Erasure. - Type BackendType + Backend BackendInfo +} - // Following fields are only meaningful if BackendType is Erasure. - OnlineDisks BackendDisks // Online disks during server startup.
- OfflineDisks BackendDisks // Offline disks during server startup. - StandardSCParity int // Parity disks for currently configured Standard storage class. - RRSCParity int // Parity disks for currently configured Reduced Redundancy storage class. - } +// BackendInfo - contains info of the underlying backend +type BackendInfo struct { + // Represents various backend types, currently on FS, Erasure and Gateway + Type BackendType + + // Following fields are only meaningful if BackendType is Gateway. + GatewayOnline bool + + // Following fields are only meaningful if BackendType is Erasure. + OnlineDisks BackendDisks // Online disks during server startup. + OfflineDisks BackendDisks // Offline disks during server startup. + + // Following fields are only meaningful if BackendType is Erasure. + StandardSCData []int // Data disks for currently configured Standard storage class. + StandardSCParity int // Parity disks for currently configured Standard storage class. + RRSCData []int // Data disks for currently configured Reduced Redundancy storage class. + RRSCParity int // Parity disks for currently configured Reduced Redundancy storage class. } // BackendDisks - represents the map of endpoint-disks. @@ -280,21 +292,27 @@ type ServerProperties struct { // Disk holds Disk information type Disk struct { - Endpoint string `json:"endpoint,omitempty"` - RootDisk bool `json:"rootDisk,omitempty"` - DrivePath string `json:"path,omitempty"` - Healing bool `json:"healing,omitempty"` - State string `json:"state,omitempty"` - UUID string `json:"uuid,omitempty"` - Model string `json:"model,omitempty"` - TotalSpace uint64 `json:"totalspace,omitempty"` - UsedSpace uint64 `json:"usedspace,omitempty"` - AvailableSpace uint64 `json:"availspace,omitempty"` - ReadThroughput float64 `json:"readthroughput,omitempty"` - WriteThroughPut float64 `json:"writethroughput,omitempty"` - ReadLatency float64 `json:"readlatency,omitempty"` - WriteLatency float64 `json:"writelatency,omitempty"` - Utilization float64 `json:"utilization,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + RootDisk bool `json:"rootDisk,omitempty"` + DrivePath string `json:"path,omitempty"` + Healing bool `json:"healing,omitempty"` + State string `json:"state,omitempty"` + UUID string `json:"uuid,omitempty"` + Model string `json:"model,omitempty"` + TotalSpace uint64 `json:"totalspace,omitempty"` + UsedSpace uint64 `json:"usedspace,omitempty"` + AvailableSpace uint64 `json:"availspace,omitempty"` + ReadThroughput float64 `json:"readthroughput,omitempty"` + WriteThroughPut float64 `json:"writethroughput,omitempty"` + ReadLatency float64 `json:"readlatency,omitempty"` + WriteLatency float64 `json:"writelatency,omitempty"` + Utilization float64 `json:"utilization,omitempty"` + HealInfo *HealingDisk `json:"heal_info,omitempty"` + + // Indexes, will be -1 until assigned a set. + PoolIndex int `json:"pool_index"` + SetIndex int `json:"set_index"` + DiskIndex int `json:"disk_index"` } // ServerInfo - Connect to a minio server and call Server Admin Info Management API
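// A hedged usage example in the style of pkg/madmin/examples, showing how an
// admin client could read the per-set heal progress surfaced by the new
// BgHealState fields above. Endpoint and credentials are placeholders.
package main

import (
	"context"
	"log"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	// Placeholder endpoint and credentials; replace with real values.
	madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETKEY", true)
	if err != nil {
		log.Fatalln(err)
	}

	st, err := madmClnt.BackgroundHealStatus(context.Background())
	if err != nil {
		log.Fatalln(err)
	}

	log.Printf("scanned items: %d, disks currently healing: %d\n", st.ScannedItemsCount, len(st.HealDisks))
	for _, set := range st.Sets {
		log.Printf("pool %d, set %d: heal status %q\n", set.PoolIndex, set.SetIndex, set.HealStatus)
		for _, disk := range set.Disks {
			if disk.HealInfo == nil {
				continue // disk is not healing
			}
			log.Printf("  %s: healed=%d failed=%d bucket=%q object=%q\n",
				disk.Endpoint, disk.HealInfo.ObjectsHealed, disk.HealInfo.ObjectsFailed,
				disk.HealInfo.Bucket, disk.HealInfo.Object)
		}
	}
}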