Improve healing and reporting (#11312)

* Provide information on disks that are *actively* healing: buckets healed/queued, objects healed/failed.
* Add concurrent healing of multiple sets (typically on startup).
* Add bucket level resume, so restarts will only heal non-healed buckets.
* Print summary after healing a disk is done.
Klaus Post 2021-03-04 14:36:23 -08:00 committed by GitHub
parent 97e7a902d0
commit fa9cf1251b
36 changed files with 1357 additions and 307 deletions
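
The bullet points above summarize the new flow; as orientation before the diff, here is a minimal sketch of the per-disk healing lifecycle, assuming the cmd package context of this repository (healReplacedDisk is a hypothetical name; error handling, locking and logging are trimmed, and the real flow lives in monitorLocalDisksAndHeal further down):

// Illustrative only: condensed from the functions added in this commit.
func healReplacedDisk(ctx context.Context, disk StorageAPI, buckets []BucketInfo, set *erasureObjects) error {
	// Reuse an existing tracker if a previous heal was interrupted,
	// otherwise start a fresh one for the replaced drive.
	tracker, err := loadHealingTracker(ctx, disk)
	if err != nil {
		tracker = newHealingTracker(disk)
	}
	// Queue every bucket that is not already recorded as healed.
	tracker.setQueuedBuckets(buckets)
	if err := tracker.save(ctx); err != nil {
		return err
	}
	// healErasureSet drains the queue; the per-bucket bookkeeping
	// (bucketDone, update) happens inside it, outside the hunks shown here.
	if err := set.healErasureSet(ctx, buckets, tracker); err != nil {
		return err
	}
	// Print the summary and drop the on-disk tracker once the disk is done.
	var buf bytes.Buffer
	tracker.printTo(&buf)
	logger.Info("Summary:\n%s", buf.String())
	return tracker.delete(ctx)
}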


@ -297,7 +297,7 @@ func (a adminAPIHandlers) StorageInfoHandler(w http.ResponseWriter, r *http.Requ
storageInfo, _ := objectAPI.StorageInfo(ctx) storageInfo, _ := objectAPI.StorageInfo(ctx)
// Collect any disk healing. // Collect any disk healing.
healing, _ := getAggregatedBackgroundHealState(ctx) healing, _ := getAggregatedBackgroundHealState(ctx, nil)
healDisks := make(map[string]struct{}, len(healing.HealDisks)) healDisks := make(map[string]struct{}, len(healing.HealDisks))
for _, disk := range healing.HealDisks { for _, disk := range healing.HealDisks {
healDisks[disk] = struct{}{} healDisks[disk] = struct{}{}
@ -861,16 +861,14 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
keepConnLive(w, r, respCh) keepConnLive(w, r, respCh)
} }
func getAggregatedBackgroundHealState(ctx context.Context) (madmin.BgHealState, error) { // getAggregatedBackgroundHealState returns the heal state of disks.
var bgHealStates []madmin.BgHealState // If no ObjectLayer is provided no set status is returned.
func getAggregatedBackgroundHealState(ctx context.Context, o ObjectLayer) (madmin.BgHealState, error) {
localHealState, ok := getLocalBackgroundHealStatus()
if !ok {
return madmin.BgHealState{}, errServerNotInitialized
}
// Get local heal status first // Get local heal status first
bgHealStates = append(bgHealStates, localHealState) bgHealStates, ok := getBackgroundHealStatus(ctx, o)
if !ok {
return bgHealStates, errServerNotInitialized
}
if globalIsDistErasure { if globalIsDistErasure {
// Get heal status from other peers // Get heal status from other peers
@ -885,33 +883,10 @@ func getAggregatedBackgroundHealState(ctx context.Context) (madmin.BgHealState,
if errCount == len(nerrs) { if errCount == len(nerrs) {
return madmin.BgHealState{}, fmt.Errorf("all remote servers failed to report heal status, cluster is unhealthy") return madmin.BgHealState{}, fmt.Errorf("all remote servers failed to report heal status, cluster is unhealthy")
} }
bgHealStates = append(bgHealStates, peersHealStates...) bgHealStates.Merge(peersHealStates...)
} }
// Aggregate healing result return bgHealStates, nil
var aggregatedHealStateResult = madmin.BgHealState{
ScannedItemsCount: bgHealStates[0].ScannedItemsCount,
LastHealActivity: bgHealStates[0].LastHealActivity,
NextHealRound: bgHealStates[0].NextHealRound,
HealDisks: bgHealStates[0].HealDisks,
}
bgHealStates = bgHealStates[1:]
for _, state := range bgHealStates {
aggregatedHealStateResult.ScannedItemsCount += state.ScannedItemsCount
aggregatedHealStateResult.HealDisks = append(aggregatedHealStateResult.HealDisks, state.HealDisks...)
if !state.LastHealActivity.IsZero() && aggregatedHealStateResult.LastHealActivity.Before(state.LastHealActivity) {
aggregatedHealStateResult.LastHealActivity = state.LastHealActivity
// The node which has the last heal activity means its
// is the node that is orchestrating self healing operations,
// which also means it is the same node which decides when
// the next self healing operation will be done.
aggregatedHealStateResult.NextHealRound = state.NextHealRound
}
}
return aggregatedHealStateResult, nil
} }
func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *http.Request) { func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *http.Request) {
@ -930,7 +905,7 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r *
return return
} }
aggregateHealStateResult, err := getAggregatedBackgroundHealState(r.Context()) aggregateHealStateResult, err := getAggregatedBackgroundHealState(r.Context(), objectAPI)
if err != nil { if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return return
@ -1568,9 +1543,9 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
// Get the notification target info // Get the notification target info
notifyTarget := fetchLambdaInfo() notifyTarget := fetchLambdaInfo()
server := getLocalServerProperty(globalEndpoints, r) local := getLocalServerProperty(globalEndpoints, r)
servers := globalNotificationSys.ServerInfo() servers := globalNotificationSys.ServerInfo()
servers = append(servers, server) servers = append(servers, local)
assignPoolNumbers(servers) assignPoolNumbers(servers)
@ -1598,7 +1573,7 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque
// Fetching the backend information // Fetching the backend information
backendInfo := objectAPI.BackendInfo() backendInfo := objectAPI.BackendInfo()
if backendInfo.Type == BackendType(madmin.Erasure) { if backendInfo.Type == madmin.Erasure {
// Calculate the number of online/offline disks of all nodes // Calculate the number of online/offline disks of all nodes
var allDisks []madmin.Disk var allDisks []madmin.Disk
for _, s := range servers { for _, s := range servers {


@ -90,8 +90,9 @@ type allHealState struct {
sync.RWMutex sync.RWMutex
// map of heal path to heal sequence // map of heal path to heal sequence
healSeqMap map[string]*healSequence healSeqMap map[string]*healSequence // Indexed by endpoint
healLocalDisks map[Endpoint]struct{} healLocalDisks map[Endpoint]struct{}
healStatus map[string]healingTracker // Indexed by disk ID
} }
// newHealState - initialize global heal state management // newHealState - initialize global heal state management
@ -99,6 +100,7 @@ func newHealState(cleanup bool) *allHealState {
hstate := &allHealState{ hstate := &allHealState{
healSeqMap: make(map[string]*healSequence), healSeqMap: make(map[string]*healSequence),
healLocalDisks: map[Endpoint]struct{}{}, healLocalDisks: map[Endpoint]struct{}{},
healStatus: make(map[string]healingTracker),
} }
if cleanup { if cleanup {
go hstate.periodicHealSeqsClean(GlobalContext) go hstate.periodicHealSeqsClean(GlobalContext)
@ -113,7 +115,56 @@ func (ahs *allHealState) healDriveCount() int {
return len(ahs.healLocalDisks) return len(ahs.healLocalDisks)
} }
func (ahs *allHealState) getHealLocalDisks() Endpoints { func (ahs *allHealState) popHealLocalDisks(healLocalDisks ...Endpoint) {
ahs.Lock()
defer ahs.Unlock()
for _, ep := range healLocalDisks {
delete(ahs.healLocalDisks, ep)
}
for id, disk := range ahs.healStatus {
for _, ep := range healLocalDisks {
if disk.Endpoint == ep.String() {
delete(ahs.healStatus, id)
}
}
}
}
// updateHealStatus will update the heal status.
func (ahs *allHealState) updateHealStatus(tracker *healingTracker) {
ahs.Lock()
defer ahs.Unlock()
ahs.healStatus[tracker.ID] = *tracker
}
// Sort by zone, set and disk index
func sortDisks(disks []madmin.Disk) {
sort.Slice(disks, func(i, j int) bool {
a, b := &disks[i], &disks[j]
if a.PoolIndex != b.PoolIndex {
return a.PoolIndex < b.PoolIndex
}
if a.SetIndex != b.SetIndex {
return a.SetIndex < b.SetIndex
}
return a.DiskIndex < b.DiskIndex
})
}
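
A small usage example for sortDisks; the disk values are made up, the field names come from the function above:

disks := []madmin.Disk{
	{PoolIndex: 1, SetIndex: 0, DiskIndex: 2},
	{PoolIndex: 0, SetIndex: 1, DiskIndex: 0},
	{PoolIndex: 0, SetIndex: 0, DiskIndex: 3},
}
sortDisks(disks)
// Order afterwards: pool 0/set 0/disk 3, pool 0/set 1/disk 0, pool 1/set 0/disk 2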
// getLocalHealingDisks returns local healing disks indexed by endpoint.
func (ahs *allHealState) getLocalHealingDisks() map[string]madmin.HealingDisk {
ahs.RLock()
defer ahs.RUnlock()
dst := make(map[string]madmin.HealingDisk, len(ahs.healStatus))
for _, v := range ahs.healStatus {
dst[v.Endpoint] = v.toHealingDisk()
}
return dst
}
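
A hedged example of consuming the new per-disk status map; the loop and log format are illustrative and not part of this change:

for ep, d := range globalBackgroundHealState.getLocalHealingDisks() {
	logger.Info("healing %s: %d objects healed, %d failed, %d bytes done",
		ep, d.ObjectsHealed, d.ObjectsFailed, d.BytesDone)
}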
func (ahs *allHealState) getHealLocalDiskEndpoints() Endpoints {
ahs.RLock() ahs.RLock()
defer ahs.RUnlock() defer ahs.RUnlock()
@ -124,15 +175,6 @@ func (ahs *allHealState) getHealLocalDisks() Endpoints {
return endpoints return endpoints
} }
func (ahs *allHealState) popHealLocalDisks(healLocalDisks ...Endpoint) {
ahs.Lock()
defer ahs.Unlock()
for _, ep := range healLocalDisks {
delete(ahs.healLocalDisks, ep)
}
}
func (ahs *allHealState) pushHealLocalDisks(healLocalDisks ...Endpoint) { func (ahs *allHealState) pushHealLocalDisks(healLocalDisks ...Endpoint) {
ahs.Lock() ahs.Lock()
defer ahs.Unlock() defer ahs.Unlock()


@ -17,16 +17,23 @@
package cmd package cmd
import ( import (
"bytes"
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"sort" "sort"
"strings"
"sync"
"time" "time"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/color" "github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/console" "github.com/minio/minio/pkg/console"
"github.com/minio/minio/pkg/madmin"
) )
const ( const (
@ -35,10 +42,200 @@ const (
) )
//go:generate msgp -file $GOFILE -unexported //go:generate msgp -file $GOFILE -unexported
type healingTracker struct {
ID string
// future add more tracking capabilities // healingTracker is used to persist healing information during a heal.
type healingTracker struct {
disk StorageAPI `msg:"-"`
ID string
PoolIndex int
SetIndex int
DiskIndex int
Path string
Endpoint string
Started time.Time
LastUpdate time.Time
ObjectsHealed uint64
ObjectsFailed uint64
BytesDone uint64
BytesFailed uint64
// Last object scanned.
Bucket string `json:"-"`
Object string `json:"-"`
// Numbers when current bucket started healing,
// for resuming with correct numbers.
ResumeObjectsHealed uint64 `json:"-"`
ResumeObjectsFailed uint64 `json:"-"`
ResumeBytesDone uint64 `json:"-"`
ResumeBytesFailed uint64 `json:"-"`
// Filled on startup/restarts.
QueuedBuckets []string
// Filled during heal.
HealedBuckets []string
// Add future tracking capabilities
// Be sure that they are included in toHealingDisk
}
// loadHealingTracker will load the healing tracker from the supplied disk.
// The disk ID will be validated against the loaded one.
func loadHealingTracker(ctx context.Context, disk StorageAPI) (*healingTracker, error) {
if disk == nil {
return nil, errors.New("loadHealingTracker: nil disk given")
}
diskID, err := disk.GetDiskID()
if err != nil {
return nil, err
}
b, err := disk.ReadAll(ctx, minioMetaBucket,
pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename))
if err != nil {
return nil, err
}
var h healingTracker
_, err = h.UnmarshalMsg(b)
if err != nil {
return nil, err
}
if h.ID != diskID && h.ID != "" {
return nil, fmt.Errorf("loadHealingTracker: disk id mismatch expected %s, got %s", h.ID, diskID)
}
h.disk = disk
h.ID = diskID
return &h, nil
}
// newHealingTracker will create a new healing tracker for the disk.
func newHealingTracker(disk StorageAPI) *healingTracker {
diskID, _ := disk.GetDiskID()
h := healingTracker{
disk: disk,
ID: diskID,
Path: disk.String(),
Endpoint: disk.Endpoint().String(),
Started: time.Now().UTC(),
}
h.PoolIndex, h.SetIndex, h.DiskIndex = disk.GetDiskLoc()
return &h
}
// update will update the tracker on the disk.
// If the tracker has been deleted an error is returned.
func (h *healingTracker) update(ctx context.Context) error {
if h.disk.Healing() == nil {
return fmt.Errorf("healingTracker: disk %q is not marked as healing", h.ID)
}
if h.ID == "" || h.PoolIndex < 0 || h.SetIndex < 0 || h.DiskIndex < 0 {
h.ID, _ = h.disk.GetDiskID()
h.PoolIndex, h.SetIndex, h.DiskIndex = h.disk.GetDiskLoc()
}
return h.save(ctx)
}
// save will unconditionally save the tracker and will be created if not existing.
func (h *healingTracker) save(ctx context.Context) error {
if h.PoolIndex < 0 || h.SetIndex < 0 || h.DiskIndex < 0 {
// Attempt to get location.
if api := newObjectLayerFn(); api != nil {
if ep, ok := api.(*erasureServerPools); ok {
h.PoolIndex, h.SetIndex, h.DiskIndex, _ = ep.getPoolAndSet(h.ID)
}
}
}
h.LastUpdate = time.Now().UTC()
htrackerBytes, err := h.MarshalMsg(nil)
if err != nil {
return err
}
globalBackgroundHealState.updateHealStatus(h)
return h.disk.WriteAll(ctx, minioMetaBucket,
pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename),
htrackerBytes)
}
// delete the tracker on disk.
func (h *healingTracker) delete(ctx context.Context) error {
return h.disk.Delete(ctx, minioMetaBucket,
pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename),
false)
}
func (h *healingTracker) isHealed(bucket string) bool {
for _, v := range h.HealedBuckets {
if v == bucket {
return true
}
}
return false
}
// resume will reset progress to the numbers at the start of the bucket.
func (h *healingTracker) resume() {
h.ObjectsHealed = h.ResumeObjectsHealed
h.ObjectsFailed = h.ResumeObjectsFailed
h.BytesDone = h.ResumeBytesDone
h.BytesFailed = h.ResumeBytesFailed
}
// bucketDone should be called when a bucket is done healing.
// Adds the bucket to the list of healed buckets and updates resume numbers.
func (h *healingTracker) bucketDone(bucket string) {
h.ResumeObjectsHealed = h.ObjectsHealed
h.ResumeObjectsFailed = h.ObjectsFailed
h.ResumeBytesDone = h.BytesDone
h.ResumeBytesFailed = h.BytesFailed
h.HealedBuckets = append(h.HealedBuckets, bucket)
for i, b := range h.QueuedBuckets {
if b == bucket {
// Delete...
h.QueuedBuckets = append(h.QueuedBuckets[:i], h.QueuedBuckets[i+1:]...)
}
}
}
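
The Resume* counters are what make bucket-level resume safe: bucketDone snapshots the running totals after each finished bucket, and resume rolls the live counters back to that snapshot after a restart, so a partially healed bucket is recounted rather than double counted. A minimal restart sketch (resumeDiskHeal is a hypothetical caller; error handling omitted):

func resumeDiskHeal(ctx context.Context, disk StorageAPI, buckets []BucketInfo) {
	tracker, err := loadHealingTracker(ctx, disk)
	if err != nil {
		return // no interrupted heal to resume
	}
	tracker.resume() // fall back to the last bucketDone snapshot
	for _, b := range buckets {
		if tracker.isHealed(b.Name) {
			continue // finished in a previous run
		}
		// ... re-heal bucket b, incrementing ObjectsHealed / BytesDone ...
		tracker.bucketDone(b.Name) // record progress for the next restart
		_ = tracker.update(ctx)
	}
	_ = tracker.delete(ctx) // all buckets healed, remove the tracker
}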
// setQueuedBuckets will add buckets, but exclude any that is already in h.HealedBuckets.
// Order is preserved.
func (h *healingTracker) setQueuedBuckets(buckets []BucketInfo) {
s := set.CreateStringSet(h.HealedBuckets...)
h.QueuedBuckets = make([]string, 0, len(buckets))
for _, b := range buckets {
if !s.Contains(b.Name) {
h.QueuedBuckets = append(h.QueuedBuckets, b.Name)
}
}
}
func (h *healingTracker) printTo(writer io.Writer) {
b, err := json.MarshalIndent(h, "", " ")
if err != nil {
writer.Write([]byte(err.Error()))
}
writer.Write(b)
}
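
printTo writes the tracker as indented JSON; fields tagged json:"-" (Bucket, Object and the Resume* counters) are left out of that dump. A hypothetical debugging helper built on it (assumes an extra "os" import):

func dumpHealingState(ctx context.Context, disk StorageAPI) {
	t, err := loadHealingTracker(ctx, disk)
	if err != nil {
		fmt.Println("no healing tracker on this disk:", err)
		return
	}
	t.printTo(os.Stdout)
}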
// toHealingDisk converts the information to madmin.HealingDisk
func (h *healingTracker) toHealingDisk() madmin.HealingDisk {
return madmin.HealingDisk{
ID: h.ID,
Endpoint: h.Endpoint,
PoolIndex: h.PoolIndex,
SetIndex: h.SetIndex,
DiskIndex: h.DiskIndex,
Path: h.Path,
Started: h.Started.UTC(),
LastUpdate: h.LastUpdate.UTC(),
ObjectsHealed: h.ObjectsHealed,
ObjectsFailed: h.ObjectsFailed,
BytesDone: h.BytesDone,
BytesFailed: h.BytesFailed,
Bucket: h.Bucket,
Object: h.Object,
QueuedBuckets: h.QueuedBuckets,
HealedBuckets: h.HealedBuckets,
}
} }
func initAutoHeal(ctx context.Context, objAPI ObjectLayer) { func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
@ -90,7 +287,7 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) {
disk, _, err := connectEndpoint(endpoint) disk, _, err := connectEndpoint(endpoint)
if errors.Is(err, errUnformattedDisk) { if errors.Is(err, errUnformattedDisk) {
disksToHeal = append(disksToHeal, endpoint) disksToHeal = append(disksToHeal, endpoint)
} else if err == nil && disk != nil && disk.Healing() { } else if err == nil && disk != nil && disk.Healing() != nil {
disksToHeal = append(disksToHeal, disk.Endpoint()) disksToHeal = append(disksToHeal, disk.Endpoint())
} }
} }
@ -114,7 +311,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq
// Perform automatic disk healing when a disk is replaced locally. // Perform automatic disk healing when a disk is replaced locally.
diskCheckTimer := time.NewTimer(defaultMonitorNewDiskInterval) diskCheckTimer := time.NewTimer(defaultMonitorNewDiskInterval)
defer diskCheckTimer.Stop() defer diskCheckTimer.Stop()
wait:
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -125,7 +322,7 @@ wait:
var erasureSetInPoolDisksToHeal []map[int][]StorageAPI var erasureSetInPoolDisksToHeal []map[int][]StorageAPI
healDisks := globalBackgroundHealState.getHealLocalDisks() healDisks := globalBackgroundHealState.getHealLocalDiskEndpoints()
if len(healDisks) > 0 { if len(healDisks) > 0 {
// Reformat disks // Reformat disks
bgSeq.sourceCh <- healSource{bucket: SlashSeparator} bgSeq.sourceCh <- healSource{bucket: SlashSeparator}
@ -174,55 +371,70 @@ wait:
buckets, _ := z.ListBuckets(ctx) buckets, _ := z.ListBuckets(ctx)
buckets = append(buckets, BucketInfo{
Name: pathJoin(minioMetaBucket, minioConfigPrefix),
})
// Heal latest buckets first. // Heal latest buckets first.
sort.Slice(buckets, func(i, j int) bool { sort.Slice(buckets, func(i, j int) bool {
a, b := strings.HasPrefix(buckets[i].Name, minioMetaBucket), strings.HasPrefix(buckets[j].Name, minioMetaBucket)
if a != b {
return a
}
return buckets[i].Created.After(buckets[j].Created) return buckets[i].Created.After(buckets[j].Created)
}) })
// TODO(klauspost): This will block until all heals are done,
// in the future this should be able to start healing other sets at once.
var wg sync.WaitGroup
for i, setMap := range erasureSetInPoolDisksToHeal { for i, setMap := range erasureSetInPoolDisksToHeal {
i := i
for setIndex, disks := range setMap { for setIndex, disks := range setMap {
for _, disk := range disks { if len(disks) == 0 {
logger.Info("Healing disk '%s' on %s pool", disk, humanize.Ordinal(i+1)) continue
}
wg.Add(1)
go func(setIndex int, disks []StorageAPI) {
defer wg.Done()
for _, disk := range disks {
logger.Info("Healing disk '%v' on %s pool", disk, humanize.Ordinal(i+1))
// So someone changed the drives underneath, healing tracker missing. // So someone changed the drives underneath, healing tracker missing.
if !disk.Healing() { tracker, err := loadHealingTracker(ctx, disk)
logger.Info("Healing tracker missing on '%s', disk was swapped again on %s pool", disk, humanize.Ordinal(i+1))
diskID, err := disk.GetDiskID()
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.Info("Healing tracker missing on '%s', disk was swapped again on %s pool", disk, humanize.Ordinal(i+1))
// reading format.json failed or not found, proceed to look tracker = newHealingTracker(disk)
// for new disks to be healed again, we cannot proceed further.
goto wait
} }
if err := saveHealingTracker(disk, diskID); err != nil { tracker.PoolIndex, tracker.SetIndex, tracker.DiskIndex = disk.GetDiskLoc()
tracker.setQueuedBuckets(buckets)
if err := tracker.save(ctx); err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
// Unable to write healing tracker, permission denied or some // Unable to write healing tracker, permission denied or some
// other unexpected error occurred. Proceed to look for new // other unexpected error occurred. Proceed to look for new
// disks to be healed again, we cannot proceed further. // disks to be healed again, we cannot proceed further.
goto wait return
} }
err = z.serverPools[i].sets[setIndex].healErasureSet(ctx, buckets, tracker)
if err != nil {
logger.LogIf(ctx, err)
continue
}
logger.Info("Healing disk '%s' on %s pool complete", disk, humanize.Ordinal(i+1))
var buf bytes.Buffer
tracker.printTo(&buf)
logger.Info("Summary:\n%s", buf.String())
logger.LogIf(ctx, tracker.delete(ctx))
// Only upon success pop the healed disk.
globalBackgroundHealState.popHealLocalDisks(disk.Endpoint())
} }
}(setIndex, disks)
err := z.serverPools[i].sets[setIndex].healErasureSet(ctx, buckets)
if err != nil {
logger.LogIf(ctx, err)
continue
}
logger.Info("Healing disk '%s' on %s pool complete", disk, humanize.Ordinal(i+1))
if err := disk.Delete(ctx, pathJoin(minioMetaBucket, bucketMetaPrefix),
healingTrackerFilename, false); err != nil && !errors.Is(err, errFileNotFound) {
logger.LogIf(ctx, err)
continue
}
// Only upon success pop the healed disk.
globalBackgroundHealState.popHealLocalDisks(disk.Endpoint())
}
} }
} }
wg.Wait()
} }
} }
} }


@ -30,6 +30,146 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "ID") err = msgp.WrapError(err, "ID")
return return
} }
case "PoolIndex":
z.PoolIndex, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "PoolIndex")
return
}
case "SetIndex":
z.SetIndex, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "SetIndex")
return
}
case "DiskIndex":
z.DiskIndex, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "DiskIndex")
return
}
case "Path":
z.Path, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Path")
return
}
case "Endpoint":
z.Endpoint, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Endpoint")
return
}
case "Started":
z.Started, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "Started")
return
}
case "LastUpdate":
z.LastUpdate, err = dc.ReadTime()
if err != nil {
err = msgp.WrapError(err, "LastUpdate")
return
}
case "ObjectsHealed":
z.ObjectsHealed, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ObjectsHealed")
return
}
case "ObjectsFailed":
z.ObjectsFailed, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ObjectsFailed")
return
}
case "BytesDone":
z.BytesDone, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "BytesDone")
return
}
case "BytesFailed":
z.BytesFailed, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "BytesFailed")
return
}
case "Bucket":
z.Bucket, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Bucket")
return
}
case "Object":
z.Object, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Object")
return
}
case "ResumeObjectsHealed":
z.ResumeObjectsHealed, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsHealed")
return
}
case "ResumeObjectsFailed":
z.ResumeObjectsFailed, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsFailed")
return
}
case "ResumeBytesDone":
z.ResumeBytesDone, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ResumeBytesDone")
return
}
case "ResumeBytesFailed":
z.ResumeBytesFailed, err = dc.ReadUint64()
if err != nil {
err = msgp.WrapError(err, "ResumeBytesFailed")
return
}
case "QueuedBuckets":
var zb0002 uint32
zb0002, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "QueuedBuckets")
return
}
if cap(z.QueuedBuckets) >= int(zb0002) {
z.QueuedBuckets = (z.QueuedBuckets)[:zb0002]
} else {
z.QueuedBuckets = make([]string, zb0002)
}
for za0001 := range z.QueuedBuckets {
z.QueuedBuckets[za0001], err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "QueuedBuckets", za0001)
return
}
}
case "HealedBuckets":
var zb0003 uint32
zb0003, err = dc.ReadArrayHeader()
if err != nil {
err = msgp.WrapError(err, "HealedBuckets")
return
}
if cap(z.HealedBuckets) >= int(zb0003) {
z.HealedBuckets = (z.HealedBuckets)[:zb0003]
} else {
z.HealedBuckets = make([]string, zb0003)
}
for za0002 := range z.HealedBuckets {
z.HealedBuckets[za0002], err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "HealedBuckets", za0002)
return
}
}
default: default:
err = dc.Skip() err = dc.Skip()
if err != nil { if err != nil {
@ -42,10 +182,10 @@ func (z *healingTracker) DecodeMsg(dc *msgp.Reader) (err error) {
} }
// EncodeMsg implements msgp.Encodable // EncodeMsg implements msgp.Encodable
func (z healingTracker) EncodeMsg(en *msgp.Writer) (err error) { func (z *healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 1 // map header, size 20
// write "ID" // write "ID"
err = en.Append(0x81, 0xa2, 0x49, 0x44) err = en.Append(0xde, 0x0, 0x14, 0xa2, 0x49, 0x44)
if err != nil { if err != nil {
return return
} }
@ -54,16 +194,283 @@ func (z healingTracker) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "ID") err = msgp.WrapError(err, "ID")
return return
} }
// write "PoolIndex"
err = en.Append(0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78)
if err != nil {
return
}
err = en.WriteInt(z.PoolIndex)
if err != nil {
err = msgp.WrapError(err, "PoolIndex")
return
}
// write "SetIndex"
err = en.Append(0xa8, 0x53, 0x65, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78)
if err != nil {
return
}
err = en.WriteInt(z.SetIndex)
if err != nil {
err = msgp.WrapError(err, "SetIndex")
return
}
// write "DiskIndex"
err = en.Append(0xa9, 0x44, 0x69, 0x73, 0x6b, 0x49, 0x6e, 0x64, 0x65, 0x78)
if err != nil {
return
}
err = en.WriteInt(z.DiskIndex)
if err != nil {
err = msgp.WrapError(err, "DiskIndex")
return
}
// write "Path"
err = en.Append(0xa4, 0x50, 0x61, 0x74, 0x68)
if err != nil {
return
}
err = en.WriteString(z.Path)
if err != nil {
err = msgp.WrapError(err, "Path")
return
}
// write "Endpoint"
err = en.Append(0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74)
if err != nil {
return
}
err = en.WriteString(z.Endpoint)
if err != nil {
err = msgp.WrapError(err, "Endpoint")
return
}
// write "Started"
err = en.Append(0xa7, 0x53, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteTime(z.Started)
if err != nil {
err = msgp.WrapError(err, "Started")
return
}
// write "LastUpdate"
err = en.Append(0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65)
if err != nil {
return
}
err = en.WriteTime(z.LastUpdate)
if err != nil {
err = msgp.WrapError(err, "LastUpdate")
return
}
// write "ObjectsHealed"
err = en.Append(0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ObjectsHealed)
if err != nil {
err = msgp.WrapError(err, "ObjectsHealed")
return
}
// write "ObjectsFailed"
err = en.Append(0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ObjectsFailed)
if err != nil {
err = msgp.WrapError(err, "ObjectsFailed")
return
}
// write "BytesDone"
err = en.Append(0xa9, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.BytesDone)
if err != nil {
err = msgp.WrapError(err, "BytesDone")
return
}
// write "BytesFailed"
err = en.Append(0xab, 0x42, 0x79, 0x74, 0x65, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.BytesFailed)
if err != nil {
err = msgp.WrapError(err, "BytesFailed")
return
}
// write "Bucket"
err = en.Append(0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
if err != nil {
return
}
err = en.WriteString(z.Bucket)
if err != nil {
err = msgp.WrapError(err, "Bucket")
return
}
// write "Object"
err = en.Append(0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74)
if err != nil {
return
}
err = en.WriteString(z.Object)
if err != nil {
err = msgp.WrapError(err, "Object")
return
}
// write "ResumeObjectsHealed"
err = en.Append(0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ResumeObjectsHealed)
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsHealed")
return
}
// write "ResumeObjectsFailed"
err = en.Append(0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ResumeObjectsFailed)
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsFailed")
return
}
// write "ResumeBytesDone"
err = en.Append(0xaf, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65)
if err != nil {
return
}
err = en.WriteUint64(z.ResumeBytesDone)
if err != nil {
err = msgp.WrapError(err, "ResumeBytesDone")
return
}
// write "ResumeBytesFailed"
err = en.Append(0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
if err != nil {
return
}
err = en.WriteUint64(z.ResumeBytesFailed)
if err != nil {
err = msgp.WrapError(err, "ResumeBytesFailed")
return
}
// write "QueuedBuckets"
err = en.Append(0xad, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(len(z.QueuedBuckets)))
if err != nil {
err = msgp.WrapError(err, "QueuedBuckets")
return
}
for za0001 := range z.QueuedBuckets {
err = en.WriteString(z.QueuedBuckets[za0001])
if err != nil {
err = msgp.WrapError(err, "QueuedBuckets", za0001)
return
}
}
// write "HealedBuckets"
err = en.Append(0xad, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73)
if err != nil {
return
}
err = en.WriteArrayHeader(uint32(len(z.HealedBuckets)))
if err != nil {
err = msgp.WrapError(err, "HealedBuckets")
return
}
for za0002 := range z.HealedBuckets {
err = en.WriteString(z.HealedBuckets[za0002])
if err != nil {
err = msgp.WrapError(err, "HealedBuckets", za0002)
return
}
}
return return
} }
// MarshalMsg implements msgp.Marshaler // MarshalMsg implements msgp.Marshaler
func (z healingTracker) MarshalMsg(b []byte) (o []byte, err error) { func (z *healingTracker) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize()) o = msgp.Require(b, z.Msgsize())
// map header, size 1 // map header, size 20
// string "ID" // string "ID"
o = append(o, 0x81, 0xa2, 0x49, 0x44) o = append(o, 0xde, 0x0, 0x14, 0xa2, 0x49, 0x44)
o = msgp.AppendString(o, z.ID) o = msgp.AppendString(o, z.ID)
// string "PoolIndex"
o = append(o, 0xa9, 0x50, 0x6f, 0x6f, 0x6c, 0x49, 0x6e, 0x64, 0x65, 0x78)
o = msgp.AppendInt(o, z.PoolIndex)
// string "SetIndex"
o = append(o, 0xa8, 0x53, 0x65, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78)
o = msgp.AppendInt(o, z.SetIndex)
// string "DiskIndex"
o = append(o, 0xa9, 0x44, 0x69, 0x73, 0x6b, 0x49, 0x6e, 0x64, 0x65, 0x78)
o = msgp.AppendInt(o, z.DiskIndex)
// string "Path"
o = append(o, 0xa4, 0x50, 0x61, 0x74, 0x68)
o = msgp.AppendString(o, z.Path)
// string "Endpoint"
o = append(o, 0xa8, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74)
o = msgp.AppendString(o, z.Endpoint)
// string "Started"
o = append(o, 0xa7, 0x53, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64)
o = msgp.AppendTime(o, z.Started)
// string "LastUpdate"
o = append(o, 0xaa, 0x4c, 0x61, 0x73, 0x74, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65)
o = msgp.AppendTime(o, z.LastUpdate)
// string "ObjectsHealed"
o = append(o, 0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ObjectsHealed)
// string "ObjectsFailed"
o = append(o, 0xad, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ObjectsFailed)
// string "BytesDone"
o = append(o, 0xa9, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65)
o = msgp.AppendUint64(o, z.BytesDone)
// string "BytesFailed"
o = append(o, 0xab, 0x42, 0x79, 0x74, 0x65, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.BytesFailed)
// string "Bucket"
o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74)
o = msgp.AppendString(o, z.Bucket)
// string "Object"
o = append(o, 0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74)
o = msgp.AppendString(o, z.Object)
// string "ResumeObjectsHealed"
o = append(o, 0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ResumeObjectsHealed)
// string "ResumeObjectsFailed"
o = append(o, 0xb3, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ResumeObjectsFailed)
// string "ResumeBytesDone"
o = append(o, 0xaf, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x44, 0x6f, 0x6e, 0x65)
o = msgp.AppendUint64(o, z.ResumeBytesDone)
// string "ResumeBytesFailed"
o = append(o, 0xb1, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64)
o = msgp.AppendUint64(o, z.ResumeBytesFailed)
// string "QueuedBuckets"
o = append(o, 0xad, 0x51, 0x75, 0x65, 0x75, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73)
o = msgp.AppendArrayHeader(o, uint32(len(z.QueuedBuckets)))
for za0001 := range z.QueuedBuckets {
o = msgp.AppendString(o, z.QueuedBuckets[za0001])
}
// string "HealedBuckets"
o = append(o, 0xad, 0x48, 0x65, 0x61, 0x6c, 0x65, 0x64, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73)
o = msgp.AppendArrayHeader(o, uint32(len(z.HealedBuckets)))
for za0002 := range z.HealedBuckets {
o = msgp.AppendString(o, z.HealedBuckets[za0002])
}
return return
} }
@ -91,6 +498,146 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "ID") err = msgp.WrapError(err, "ID")
return return
} }
case "PoolIndex":
z.PoolIndex, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "PoolIndex")
return
}
case "SetIndex":
z.SetIndex, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "SetIndex")
return
}
case "DiskIndex":
z.DiskIndex, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "DiskIndex")
return
}
case "Path":
z.Path, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Path")
return
}
case "Endpoint":
z.Endpoint, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Endpoint")
return
}
case "Started":
z.Started, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Started")
return
}
case "LastUpdate":
z.LastUpdate, bts, err = msgp.ReadTimeBytes(bts)
if err != nil {
err = msgp.WrapError(err, "LastUpdate")
return
}
case "ObjectsHealed":
z.ObjectsHealed, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ObjectsHealed")
return
}
case "ObjectsFailed":
z.ObjectsFailed, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ObjectsFailed")
return
}
case "BytesDone":
z.BytesDone, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "BytesDone")
return
}
case "BytesFailed":
z.BytesFailed, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "BytesFailed")
return
}
case "Bucket":
z.Bucket, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Bucket")
return
}
case "Object":
z.Object, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Object")
return
}
case "ResumeObjectsHealed":
z.ResumeObjectsHealed, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsHealed")
return
}
case "ResumeObjectsFailed":
z.ResumeObjectsFailed, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ResumeObjectsFailed")
return
}
case "ResumeBytesDone":
z.ResumeBytesDone, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ResumeBytesDone")
return
}
case "ResumeBytesFailed":
z.ResumeBytesFailed, bts, err = msgp.ReadUint64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "ResumeBytesFailed")
return
}
case "QueuedBuckets":
var zb0002 uint32
zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "QueuedBuckets")
return
}
if cap(z.QueuedBuckets) >= int(zb0002) {
z.QueuedBuckets = (z.QueuedBuckets)[:zb0002]
} else {
z.QueuedBuckets = make([]string, zb0002)
}
for za0001 := range z.QueuedBuckets {
z.QueuedBuckets[za0001], bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "QueuedBuckets", za0001)
return
}
}
case "HealedBuckets":
var zb0003 uint32
zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
err = msgp.WrapError(err, "HealedBuckets")
return
}
if cap(z.HealedBuckets) >= int(zb0003) {
z.HealedBuckets = (z.HealedBuckets)[:zb0003]
} else {
z.HealedBuckets = make([]string, zb0003)
}
for za0002 := range z.HealedBuckets {
z.HealedBuckets[za0002], bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "HealedBuckets", za0002)
return
}
}
default: default:
bts, err = msgp.Skip(bts) bts, err = msgp.Skip(bts)
if err != nil { if err != nil {
@ -104,7 +651,14 @@ func (z *healingTracker) UnmarshalMsg(bts []byte) (o []byte, err error) {
} }
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z healingTracker) Msgsize() (s int) { func (z *healingTracker) Msgsize() (s int) {
s = 1 + 3 + msgp.StringPrefixSize + len(z.ID) s = 3 + 3 + msgp.StringPrefixSize + len(z.ID) + 10 + msgp.IntSize + 9 + msgp.IntSize + 10 + msgp.IntSize + 5 + msgp.StringPrefixSize + len(z.Path) + 9 + msgp.StringPrefixSize + len(z.Endpoint) + 8 + msgp.TimeSize + 11 + msgp.TimeSize + 14 + msgp.Uint64Size + 14 + msgp.Uint64Size + 10 + msgp.Uint64Size + 12 + msgp.Uint64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) + 20 + msgp.Uint64Size + 20 + msgp.Uint64Size + 16 + msgp.Uint64Size + 18 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize
for za0001 := range z.QueuedBuckets {
s += msgp.StringPrefixSize + len(z.QueuedBuckets[za0001])
}
s += 14 + msgp.ArrayHeaderSize
for za0002 := range z.HealedBuckets {
s += msgp.StringPrefixSize + len(z.HealedBuckets[za0002])
}
return return
} }


@ -38,7 +38,7 @@ func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
// Based on the random shuffling return back randomized disks. // Based on the random shuffling return back randomized disks.
for _, i := range hashOrder(UTCNow().String(), len(disks)) { for _, i := range hashOrder(UTCNow().String(), len(disks)) {
if disks[i-1] != nil && disks[i-1].IsLocal() { if disks[i-1] != nil && disks[i-1].IsLocal() {
if !disks[i-1].Healing() && disks[i-1].IsOnline() { if disks[i-1].Healing() == nil && disks[i-1].IsOnline() {
newDisks = append(newDisks, disks[i-1]) newDisks = append(newDisks, disks[i-1])
} }
} }


@ -239,13 +239,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
storageDisks := er.getDisks() storageDisks := er.getDisks()
storageEndpoints := er.getEndpoints() storageEndpoints := er.getEndpoints()
// List of disks having latest version of the object er.meta
// (by modtime).
latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// List of disks having all parts as per latest er.meta.
availableDisks, dataErrs := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object, scanMode)
// Initialize heal result object // Initialize heal result object
result = madmin.HealResultItem{ result = madmin.HealResultItem{
Type: madmin.HealItemObject, Type: madmin.HealItemObject,
@ -256,6 +249,21 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
DataBlocks: len(storageDisks) - er.defaultParityCount, DataBlocks: len(storageDisks) - er.defaultParityCount,
} }
if !opts.NoLock {
lk := er.NewNSLock(bucket, object)
if ctx, err = lk.GetLock(ctx, globalOperationTimeout); err != nil {
return result, err
}
defer lk.Unlock()
}
// List of disks having latest version of the object er.meta
// (by modtime).
latestDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// List of disks having all parts as per latest er.meta.
availableDisks, dataErrs := disksWithAllParts(ctx, latestDisks, partsMetadata, errs, bucket, object, scanMode)
// Loop to find number of disks with valid data, per-drive // Loop to find number of disks with valid data, per-drive
// data state and a list of outdated disks on which data needs // data state and a list of outdated disks on which data needs
// to be healed. // to be healed.


@ -18,6 +18,9 @@ package cmd
import ( import (
"context" "context"
"encoding/hex"
"fmt"
"math/rand"
"reflect" "reflect"
"testing" "testing"
) )
@ -199,3 +202,22 @@ func TestEvalDisks(t *testing.T) {
z := objLayer.(*erasureServerPools) z := objLayer.(*erasureServerPools)
testShuffleDisks(t, z) testShuffleDisks(t, z)
} }
func Test_hashOrder(t *testing.T) {
for x := 1; x < 17; x++ {
t.Run(fmt.Sprintf("%d", x), func(t *testing.T) {
var first [17]int
rng := rand.New(rand.NewSource(0))
var tmp [16]byte
rng.Read(tmp[:])
prefix := hex.EncodeToString(tmp[:])
for i := 0; i < 10000; i++ {
rng.Read(tmp[:])
y := hashOrder(fmt.Sprintf("%s/%x", prefix, hex.EncodeToString(tmp[:3])), x)
first[y[0]]++
}
t.Log("first:", first[:x])
})
}
}


@ -106,11 +106,10 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
return nil, fmt.Errorf("All serverPools should have same deployment ID expected %s, got %s", deploymentID, formats[i].ID) return nil, fmt.Errorf("All serverPools should have same deployment ID expected %s, got %s", deploymentID, formats[i].ID)
} }
z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i], commonParityDrives) z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i], commonParityDrives, i)
if err != nil { if err != nil {
return nil, err return nil, err
} }
z.serverPools[i].poolNumber = i
} }
ctx, z.shutdown = context.WithCancel(ctx) ctx, z.shutdown = context.WithCancel(ctx)
go intDataUpdateTracker.start(ctx, localDrives...) go intDataUpdateTracker.start(ctx, localDrives...)
@ -311,8 +310,8 @@ func (z *erasureServerPools) Shutdown(ctx context.Context) error {
return nil return nil
} }
func (z *erasureServerPools) BackendInfo() (b BackendInfo) { func (z *erasureServerPools) BackendInfo() (b madmin.BackendInfo) {
b.Type = BackendErasure b.Type = madmin.Erasure
scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD) scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
if scParity <= 0 { if scParity <= 0 {
@ -1519,25 +1518,6 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) { func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
object = encodeDirObject(object) object = encodeDirObject(object)
var err error
lk := z.NewNSLock(bucket, object)
if bucket == minioMetaBucket {
// For .minio.sys bucket heals we should hold write locks.
ctx, err = lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return madmin.HealResultItem{}, err
}
defer lk.Unlock()
} else {
// Lock the object before healing. Use read lock since healing
// will only regenerate parts & xl.meta of outdated disks.
ctx, err = lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return madmin.HealResultItem{}, err
}
defer lk.RUnlock()
}
for _, pool := range z.serverPools { for _, pool := range z.serverPools {
result, err := pool.HealObject(ctx, bucket, object, versionID, opts) result, err := pool.HealObject(ctx, bucket, object, versionID, opts)
result.Object = decodeDirObject(result.Object) result.Object = decodeDirObject(result.Object)
@ -1568,18 +1548,18 @@ func (z *erasureServerPools) GetMetrics(ctx context.Context) (*BackendMetrics, e
return &BackendMetrics{}, NotImplemented{} return &BackendMetrics{}, NotImplemented{}
} }
func (z *erasureServerPools) getPoolAndSet(id string) (int, int, error) { func (z *erasureServerPools) getPoolAndSet(id string) (poolIdx, setIdx, diskIdx int, err error) {
for poolIdx := range z.serverPools { for poolIdx := range z.serverPools {
format := z.serverPools[poolIdx].format format := z.serverPools[poolIdx].format
for setIdx, set := range format.Erasure.Sets { for setIdx, set := range format.Erasure.Sets {
for _, diskID := range set { for i, diskID := range set {
if diskID == id { if diskID == id {
return poolIdx, setIdx, nil return poolIdx, setIdx, i, nil
} }
} }
} }
} }
return 0, 0, fmt.Errorf("DiskID(%s) %w", id, errDiskNotFound) return -1, -1, -1, fmt.Errorf("DiskID(%s) %w", id, errDiskNotFound)
} }
// HealthOptions takes input options to return sepcific information // HealthOptions takes input options to return sepcific information
@ -1609,7 +1589,7 @@ func (z *erasureServerPools) ReadHealth(ctx context.Context) bool {
for _, localDiskIDs := range diskIDs { for _, localDiskIDs := range diskIDs {
for _, id := range localDiskIDs { for _, id := range localDiskIDs {
poolIdx, setIdx, err := z.getPoolAndSet(id) poolIdx, setIdx, _, err := z.getPoolAndSet(id)
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
@ -1648,7 +1628,7 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea
for _, localDiskIDs := range diskIDs { for _, localDiskIDs := range diskIDs {
for _, id := range localDiskIDs { for _, id := range localDiskIDs {
poolIdx, setIdx, err := z.getPoolAndSet(id) poolIdx, setIdx, _, err := z.getPoolAndSet(id)
if err != nil { if err != nil {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
continue continue
@ -1671,7 +1651,7 @@ func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) Hea
// we need to tell healthy status as 'false' so that this server // we need to tell healthy status as 'false' so that this server
// is not taken down for maintenance // is not taken down for maintenance
var err error var err error
aggHealStateResult, err = getAggregatedBackgroundHealState(ctx) aggHealStateResult, err = getAggregatedBackgroundHealState(ctx, nil)
if err != nil { if err != nil {
logger.LogIf(logger.SetReqInfo(ctx, reqInfo), fmt.Errorf("Unable to verify global heal status: %w", err)) logger.LogIf(logger.SetReqInfo(ctx, reqInfo), fmt.Errorf("Unable to verify global heal status: %w", err))
return HealthResult{ return HealthResult{


@ -83,7 +83,7 @@ type erasureSets struct {
setCount, setDriveCount int setCount, setDriveCount int
defaultParityCount int defaultParityCount int
poolNumber int poolIndex int
disksConnectEvent chan diskConnectInfo disksConnectEvent chan diskConnectInfo
@ -217,7 +217,7 @@ func (s *erasureSets) connectDisks() {
} }
return return
} }
if disk.IsLocal() && disk.Healing() { if disk.IsLocal() && disk.Healing() != nil {
globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint()) globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint())
logger.Info(fmt.Sprintf("Found the drive %s that needs healing, attempting to heal...", disk)) logger.Info(fmt.Sprintf("Found the drive %s that needs healing, attempting to heal...", disk))
} }
@ -246,6 +246,7 @@ func (s *erasureSets) connectDisks() {
disk.SetDiskID(format.Erasure.This) disk.SetDiskID(format.Erasure.This)
s.erasureDisks[setIndex][diskIndex] = disk s.erasureDisks[setIndex][diskIndex] = disk
} }
disk.SetDiskLoc(s.poolIndex, setIndex, diskIndex)
s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String() s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String()
s.erasureDisksMu.Unlock() s.erasureDisksMu.Unlock()
go func(setIndex int) { go func(setIndex int) {
@ -331,7 +332,7 @@ func (s *erasureSets) GetDisks(setIndex int) func() []StorageAPI {
const defaultMonitorConnectEndpointInterval = defaultMonitorNewDiskInterval + time.Second*5 const defaultMonitorConnectEndpointInterval = defaultMonitorNewDiskInterval + time.Second*5
// Initialize new set of erasure coded sets. // Initialize new set of erasure coded sets.
func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3, defaultParityCount int) (*erasureSets, error) { func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3, defaultParityCount, poolIdx int) (*erasureSets, error) {
setCount := len(format.Erasure.Sets) setCount := len(format.Erasure.Sets)
setDriveCount := len(format.Erasure.Sets[0]) setDriveCount := len(format.Erasure.Sets[0])
@ -353,6 +354,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
distributionAlgo: format.Erasure.DistributionAlgo, distributionAlgo: format.Erasure.DistributionAlgo,
deploymentID: uuid.MustParse(format.ID), deploymentID: uuid.MustParse(format.ID),
mrfOperations: make(map[healSource]int), mrfOperations: make(map[healSource]int),
poolIndex: poolIdx,
} }
mutex := newNSLock(globalIsDistErasure) mutex := newNSLock(globalIsDistErasure)
@ -398,13 +400,15 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
if err != nil { if err != nil {
continue continue
} }
disk.SetDiskLoc(s.poolIndex, m, n)
s.endpointStrings[m*setDriveCount+n] = disk.String() s.endpointStrings[m*setDriveCount+n] = disk.String()
s.erasureDisks[m][n] = disk s.erasureDisks[m][n] = disk
} }
// Initialize erasure objects for a given set. // Initialize erasure objects for a given set.
s.sets[i] = &erasureObjects{ s.sets[i] = &erasureObjects{
setNumber: i, setIndex: i,
poolIndex: poolIdx,
setDriveCount: setDriveCount, setDriveCount: setDriveCount,
defaultParityCount: defaultParityCount, defaultParityCount: defaultParityCount,
getDisks: s.GetDisks(i), getDisks: s.GetDisks(i),
@ -480,7 +484,7 @@ type auditObjectOp struct {
Disks []string `json:"disks"` Disks []string `json:"disks"`
} }
func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjects, poolNum int) { func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjects) {
if len(logger.AuditTargets) == 0 { if len(logger.AuditTargets) == 0 {
return return
} }
@ -488,8 +492,8 @@ func auditObjectErasureSet(ctx context.Context, object string, set *erasureObjec
object = decodeDirObject(object) object = decodeDirObject(object)
op := auditObjectOp{ op := auditObjectOp{
Pool: poolNum + 1, Pool: set.poolIndex + 1,
Set: set.setNumber + 1, Set: set.setIndex + 1,
Disks: set.getEndpoints(), Disks: set.getEndpoints(),
} }
@ -537,7 +541,7 @@ func (s *erasureSets) StorageUsageInfo(ctx context.Context) StorageInfo {
storageUsageInfo := func() StorageInfo { storageUsageInfo := func() StorageInfo {
var storageInfo StorageInfo var storageInfo StorageInfo
storageInfos := make([]StorageInfo, len(s.sets)) storageInfos := make([]StorageInfo, len(s.sets))
storageInfo.Backend.Type = BackendErasure storageInfo.Backend.Type = madmin.Erasure
g := errgroup.WithNErrs(len(s.sets)) g := errgroup.WithNErrs(len(s.sets))
for index := range s.sets { for index := range s.sets {
@ -572,9 +576,9 @@ func (s *erasureSets) StorageUsageInfo(ctx context.Context) StorageInfo {
// StorageInfo - combines output of StorageInfo across all erasure coded object sets. // StorageInfo - combines output of StorageInfo across all erasure coded object sets.
func (s *erasureSets) StorageInfo(ctx context.Context) (StorageInfo, []error) { func (s *erasureSets) StorageInfo(ctx context.Context) (StorageInfo, []error) {
var storageInfo StorageInfo var storageInfo madmin.StorageInfo
storageInfos := make([]StorageInfo, len(s.sets)) storageInfos := make([]madmin.StorageInfo, len(s.sets))
storageInfoErrs := make([][]error, len(s.sets)) storageInfoErrs := make([][]error, len(s.sets))
g := errgroup.WithNErrs(len(s.sets)) g := errgroup.WithNErrs(len(s.sets))
@ -839,7 +843,7 @@ func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, er
// GetObjectNInfo - returns object info and locked object ReadCloser // GetObjectNInfo - returns object info and locked object ReadCloser
func (s *erasureSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { func (s *erasureSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
set := s.getHashedSet(object) set := s.getHashedSet(object)
auditObjectErasureSet(ctx, object, set, s.poolNumber) auditObjectErasureSet(ctx, object, set)
return set.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) return set.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
} }
@ -853,7 +857,7 @@ func (s *erasureSets) parentDirIsObject(ctx context.Context, bucket, parent stri
// PutObject - writes an object to hashedSet based on the object name. // PutObject - writes an object to hashedSet based on the object name.
func (s *erasureSets) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (s *erasureSets) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
set := s.getHashedSet(object) set := s.getHashedSet(object)
auditObjectErasureSet(ctx, object, set, s.poolNumber) auditObjectErasureSet(ctx, object, set)
opts.ParentIsObject = s.parentDirIsObject opts.ParentIsObject = s.parentDirIsObject
return set.PutObject(ctx, bucket, object, data, opts) return set.PutObject(ctx, bucket, object, data, opts)
} }
@ -861,14 +865,14 @@ func (s *erasureSets) PutObject(ctx context.Context, bucket string, object strin
// GetObjectInfo - reads object metadata from the hashedSet based on the object name. // GetObjectInfo - reads object metadata from the hashedSet based on the object name.
func (s *erasureSets) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (s *erasureSets) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
set := s.getHashedSet(object) set := s.getHashedSet(object)
auditObjectErasureSet(ctx, object, set, s.poolNumber) auditObjectErasureSet(ctx, object, set)
return set.GetObjectInfo(ctx, bucket, object, opts) return set.GetObjectInfo(ctx, bucket, object, opts)
} }
// DeleteObject - deletes an object from the hashedSet based on the object name. // DeleteObject - deletes an object from the hashedSet based on the object name.
func (s *erasureSets) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (s *erasureSets) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
set := s.getHashedSet(object) set := s.getHashedSet(object)
auditObjectErasureSet(ctx, object, set, s.poolNumber) auditObjectErasureSet(ctx, object, set)
return set.DeleteObject(ctx, bucket, object, opts) return set.DeleteObject(ctx, bucket, object, opts)
} }
@ -920,7 +924,7 @@ func (s *erasureSets) DeleteObjects(ctx context.Context, bucket string, objects
delErrs[obj.origIndex] = errs[i] delErrs[obj.origIndex] = errs[i]
delObjects[obj.origIndex] = dobjects[i] delObjects[obj.origIndex] = dobjects[i]
if errs[i] == nil { if errs[i] == nil {
auditObjectErasureSet(ctx, obj.object.ObjectName, set, s.poolNumber) auditObjectErasureSet(ctx, obj.object.ObjectName, set)
} }
} }
} }
@ -933,7 +937,7 @@ func (s *erasureSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstB
srcSet := s.getHashedSet(srcObject) srcSet := s.getHashedSet(srcObject)
dstSet := s.getHashedSet(dstObject) dstSet := s.getHashedSet(dstObject)
auditObjectErasureSet(ctx, dstObject, dstSet, s.poolNumber) auditObjectErasureSet(ctx, dstObject, dstSet)
cpSrcDstSame := srcSet == dstSet cpSrcDstSame := srcSet == dstSet
// Check if this request is only metadata update. // Check if this request is only metadata update.
@ -1117,14 +1121,14 @@ func (s *erasureSets) ListMultipartUploads(ctx context.Context, bucket, prefix,
// In list multipart uploads we are going to treat input prefix as the object, // In list multipart uploads we are going to treat input prefix as the object,
// this means that we are not supporting directory navigation. // this means that we are not supporting directory navigation.
set := s.getHashedSet(prefix) set := s.getHashedSet(prefix)
auditObjectErasureSet(ctx, prefix, set, s.poolNumber) auditObjectErasureSet(ctx, prefix, set)
return set.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads) return set.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
} }
// Initiate a new multipart upload on a hashedSet based on object name. // Initiate a new multipart upload on a hashedSet based on object name.
func (s *erasureSets) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) { func (s *erasureSets) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (uploadID string, err error) {
set := s.getHashedSet(object) set := s.getHashedSet(object)
auditObjectErasureSet(ctx, object, set, s.poolNumber) auditObjectErasureSet(ctx, object, set)
return set.NewMultipartUpload(ctx, bucket, object, opts) return set.NewMultipartUpload(ctx, bucket, object, opts)
} }
@ -1132,42 +1136,42 @@ func (s *erasureSets) NewMultipartUpload(ctx context.Context, bucket, object str
func (s *erasureSets) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, func (s *erasureSets) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int,
startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (partInfo PartInfo, err error) { startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (partInfo PartInfo, err error) {
destSet := s.getHashedSet(destObject) destSet := s.getHashedSet(destObject)
auditObjectErasureSet(ctx, destObject, destSet, s.poolNumber) auditObjectErasureSet(ctx, destObject, destSet)
return destSet.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, NewPutObjReader(srcInfo.Reader), dstOpts) return destSet.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, NewPutObjReader(srcInfo.Reader), dstOpts)
} }
// PutObjectPart - writes part of an object to hashedSet based on the object name. // PutObjectPart - writes part of an object to hashedSet based on the object name.
func (s *erasureSets) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) { func (s *erasureSets) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (info PartInfo, err error) {
set := s.getHashedSet(object) set := s.getHashedSet(object)
auditObjectErasureSet(ctx, object, set, s.poolNumber) auditObjectErasureSet(ctx, object, set)
return set.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts) return set.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
} }
// GetMultipartInfo - return multipart metadata info uploaded at hashedSet. // GetMultipartInfo - return multipart metadata info uploaded at hashedSet.
func (s *erasureSets) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (result MultipartInfo, err error) { func (s *erasureSets) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (result MultipartInfo, err error) {
set := s.getHashedSet(object) set := s.getHashedSet(object)
auditObjectErasureSet(ctx, object, set, s.poolNumber) auditObjectErasureSet(ctx, object, set)
return set.GetMultipartInfo(ctx, bucket, object, uploadID, opts) return set.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
} }
// ListObjectParts - lists all uploaded parts to an object in hashedSet. // ListObjectParts - lists all uploaded parts to an object in hashedSet.
func (s *erasureSets) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) { func (s *erasureSets) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
set := s.getHashedSet(object) set := s.getHashedSet(object)
auditObjectErasureSet(ctx, object, set, s.poolNumber) auditObjectErasureSet(ctx, object, set)
return set.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts) return set.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
} }
// Aborts an in-progress multipart operation on hashedSet based on the object name. // Aborts an in-progress multipart operation on hashedSet based on the object name.
func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error { func (s *erasureSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
set := s.getHashedSet(object) set := s.getHashedSet(object)
auditObjectErasureSet(ctx, object, set, s.poolNumber) auditObjectErasureSet(ctx, object, set)
return set.AbortMultipartUpload(ctx, bucket, object, uploadID, opts) return set.AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
} }
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name. // CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.
func (s *erasureSets) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) { func (s *erasureSets) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) {
set := s.getHashedSet(object) set := s.getHashedSet(object)
auditObjectErasureSet(ctx, object, set, s.poolNumber) auditObjectErasureSet(ctx, object, set)
opts.ParentIsObject = s.parentDirIsObject opts.ParentIsObject = s.parentDirIsObject
return set.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts) return set.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
} }
@ -1351,8 +1355,8 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
res.Before.Drives = make([]madmin.HealDriveInfo, len(beforeDrives)) res.Before.Drives = make([]madmin.HealDriveInfo, len(beforeDrives))
// Copy "after" drive state too from before. // Copy "after" drive state too from before.
for k, v := range beforeDrives { for k, v := range beforeDrives {
res.Before.Drives[k] = madmin.HealDriveInfo(v) res.Before.Drives[k] = v
res.After.Drives[k] = madmin.HealDriveInfo(v) res.After.Drives[k] = v
} }
if countErrs(sErrs, errUnformattedDisk) == 0 { if countErrs(sErrs, errUnformattedDisk) == 0 {
@ -1395,7 +1399,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
if s.erasureDisks[m][n] != nil { if s.erasureDisks[m][n] != nil {
s.erasureDisks[m][n].Close() s.erasureDisks[m][n].Close()
} }
storageDisks[index].SetDiskLoc(s.poolIndex, m, n)
s.erasureDisks[m][n] = storageDisks[index] s.erasureDisks[m][n] = storageDisks[index]
s.endpointStrings[m*s.setDriveCount+n] = storageDisks[index].String() s.endpointStrings[m*s.setDriveCount+n] = storageDisks[index].String()
} }
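Editor's note: the new SetDiskLoc call stamps every freshly formatted disk with its pool/set/drive coordinates, which is what later enables per-set heal reporting. A minimal sketch of that indexing, using an illustrative diskLoc type rather than the real StorageAPI:

package main

import "fmt"

// diskLoc is an illustrative stand-in for the location every storage
// endpoint now carries: pool, set and drive indexes.
type diskLoc struct{ pool, set, drive int }

func main() {
	const (
		poolIdx       = 0
		setCount      = 2
		setDriveCount = 4
	)
	// Disks arrive as a flat slice; index m*setDriveCount+n maps set m,
	// drive n back to that flat position, mirroring the HealFormat loop
	// that calls SetDiskLoc(poolIndex, m, n) on each formatted disk.
	disks := make([]diskLoc, setCount*setDriveCount)
	for m := 0; m < setCount; m++ {
		for n := 0; n < setDriveCount; n++ {
			index := m*setDriveCount + n
			disks[index] = diskLoc{pool: poolIdx, set: m, drive: n}
		}
	}
	fmt.Println(disks)
}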


@ -189,7 +189,7 @@ func TestNewErasureSets(t *testing.T) {
t.Fatalf("Unable to format disks for erasure, %s", err) t.Fatalf("Unable to format disks for erasure, %s", err)
} }
if _, err := newErasureSets(ctx, endpoints, storageDisks, format, ecDrivesNoConfig(16)); err != nil { if _, err := newErasureSets(ctx, endpoints, storageDisks, format, ecDrivesNoConfig(16), 0); err != nil {
t.Fatalf("Unable to initialize erasure") t.Fatalf("Unable to initialize erasure")
} }
} }


@ -53,7 +53,8 @@ type erasureObjects struct {
setDriveCount int setDriveCount int
defaultParityCount int defaultParityCount int
setNumber int setIndex int
poolIndex int
// getDisks returns list of storageAPIs. // getDisks returns list of storageAPIs.
getDisks func() []StorageAPI getDisks func() []StorageAPI
@ -186,7 +187,7 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Di
} }
info, err := disks[index].DiskInfo(context.TODO()) info, err := disks[index].DiskInfo(context.TODO())
di := madmin.Disk{ di := madmin.Disk{
Endpoint: endpoints[index], Endpoint: info.Endpoint,
DrivePath: info.MountPath, DrivePath: info.MountPath,
TotalSpace: info.Total, TotalSpace: info.Total,
UsedSpace: info.Used, UsedSpace: info.Used,
@ -196,6 +197,13 @@ func getDisksInfo(disks []StorageAPI, endpoints []string) (disksInfo []madmin.Di
Healing: info.Healing, Healing: info.Healing,
State: diskErrToDriveState(err), State: diskErrToDriveState(err),
} }
di.PoolIndex, di.SetIndex, di.DiskIndex = disks[index].GetDiskLoc()
if info.Healing {
if hi := disks[index].Healing(); hi != nil {
hd := hi.toHealingDisk()
di.HealInfo = &hd
}
}
if info.Total > 0 { if info.Total > 0 {
di.Utilization = float64(info.Used) / float64(info.Total) * 100 di.Utilization = float64(info.Used) / float64(info.Total) * 100
} }
@ -218,7 +226,7 @@ func getStorageInfo(disks []StorageAPI, endpoints []string) (StorageInfo, []erro
Disks: disksInfo, Disks: disksInfo,
} }
storageInfo.Backend.Type = BackendErasure storageInfo.Backend.Type = madmin.Erasure
return storageInfo, errs return storageInfo, errs
} }
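Editor's note: the important addition in getDisksInfo is that each reported disk now carries its pool/set/drive location and, when it is healing, a snapshot of the healing tracker. A self-contained sketch of that enrichment using trimmed stand-in types (not the real madmin.Disk or StorageAPI):

package main

import "fmt"

// Trimmed stand-ins for the madmin types; only the fields this hunk
// populates are kept.
type healSnapshot struct {
	Endpoint      string
	ObjectsHealed uint64
}

type diskInfo struct {
	Endpoint                       string
	Healing                        bool
	PoolIndex, SetIndex, DiskIndex int
	HealInfo                       *healSnapshot
}

// drive mimics the two calls getDisksInfo now makes per disk:
// GetDiskLoc for coordinates and Healing for a live tracker snapshot.
type drive interface {
	GetDiskLoc() (poolIdx, setIdx, diskIdx int)
	Healing() *healSnapshot
}

type fakeDrive struct {
	pool, set, idx int
	heal           *healSnapshot
}

func (f fakeDrive) GetDiskLoc() (int, int, int) { return f.pool, f.set, f.idx }
func (f fakeDrive) Healing() *healSnapshot      { return f.heal }

func describe(endpoint string, d drive) diskInfo {
	di := diskInfo{Endpoint: endpoint}
	di.PoolIndex, di.SetIndex, di.DiskIndex = d.GetDiskLoc()
	if hi := d.Healing(); hi != nil {
		di.Healing = true
		di.HealInfo = hi // attach progress so admin calls can show it
	}
	return di
}

func main() {
	d := fakeDrive{pool: 0, set: 1, idx: 2, heal: &healSnapshot{Endpoint: "http://node1/disk3", ObjectsHealed: 42}}
	fmt.Printf("%+v\n", describe("http://node1/disk3", d))
}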


@ -340,19 +340,6 @@ func loadFormatErasureAll(storageDisks []StorageAPI, heal bool) ([]*formatErasur
return formats, g.Wait() return formats, g.Wait()
} }
func saveHealingTracker(disk StorageAPI, diskID string) error {
htracker := healingTracker{
ID: diskID,
}
htrackerBytes, err := htracker.MarshalMsg(nil)
if err != nil {
return err
}
return disk.WriteAll(context.TODO(), minioMetaBucket,
pathJoin(bucketMetaPrefix, slashSeparator, healingTrackerFilename),
htrackerBytes)
}
func saveFormatErasure(disk StorageAPI, format *formatErasureV3, heal bool) error { func saveFormatErasure(disk StorageAPI, format *formatErasureV3, heal bool) error {
if disk == nil || format == nil { if disk == nil || format == nil {
return errDiskNotFound return errDiskNotFound
@ -387,7 +374,9 @@ func saveFormatErasure(disk StorageAPI, format *formatErasureV3, heal bool) erro
disk.SetDiskID(diskID) disk.SetDiskID(diskID)
if heal { if heal {
return saveHealingTracker(disk, diskID) ctx := context.Background()
ht := newHealingTracker(disk)
return ht.save(ctx)
} }
return nil return nil
} }
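Editor's note: saveHealingTracker is folded into the tracker itself; a freshly formatted disk now persists a healingTracker via its save method. A simplified sketch of the pattern, with JSON and a pluggable write function standing in for the real msgp encoding and disk.WriteAll (path and fields here are illustrative):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// trackerSketch is a cut-down stand-in for healingTracker; the real
// tracker records many more counters and is msgp-encoded.
type trackerSketch struct {
	ID      string    `json:"id"`
	Started time.Time `json:"started"`
}

// save serializes the tracker and hands it to writeAll, standing in
// for writing the healing marker file on the new disk.
func (h trackerSketch) save(writeAll func(path string, b []byte) error) error {
	b, err := json.Marshal(h)
	if err != nil {
		return err
	}
	return writeAll("buckets/.healing.bin", b)
}

func main() {
	store := map[string][]byte{}
	ht := trackerSketch{ID: "disk-uuid", Started: time.Now().UTC()}
	if err := ht.save(func(path string, b []byte) error { store[path] = b; return nil }); err != nil {
		fmt.Println("save failed:", err)
		return
	}
	fmt.Println(string(store["buckets/.healing.bin"]))
}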


@ -202,8 +202,8 @@ func (fs *FSObjects) Shutdown(ctx context.Context) error {
} }
// BackendInfo - returns backend information // BackendInfo - returns backend information
func (fs *FSObjects) BackendInfo() BackendInfo { func (fs *FSObjects) BackendInfo() madmin.BackendInfo {
return BackendInfo{Type: BackendFS} return madmin.BackendInfo{Type: madmin.FS}
} }
// LocalStorageInfo - returns underlying storage statistics. // LocalStorageInfo - returns underlying storage statistics.
@ -232,7 +232,7 @@ func (fs *FSObjects) StorageInfo(ctx context.Context) (StorageInfo, []error) {
}, },
}, },
} }
storageInfo.Backend.Type = BackendFS storageInfo.Backend.Type = madmin.FS
return storageInfo, nil return storageInfo, nil
} }


@ -35,8 +35,8 @@ import (
type GatewayUnsupported struct{} type GatewayUnsupported struct{}
// BackendInfo returns the underlying backend information // BackendInfo returns the underlying backend information
func (a GatewayUnsupported) BackendInfo() BackendInfo { func (a GatewayUnsupported) BackendInfo() madmin.BackendInfo {
return BackendInfo{Type: BackendGateway} return madmin.BackendInfo{Type: madmin.Gateway}
} }
// LocalStorageInfo returns the local disks information, mainly used // LocalStorageInfo returns the local disks information, mainly used


@ -36,19 +36,18 @@ import (
"strings" "strings"
"time" "time"
"github.com/minio/minio/pkg/env"
"github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-blob-go/azblob" "github.com/Azure/azure-storage-blob-go/azblob"
humanize "github.com/dustin/go-humanize" humanize "github.com/dustin/go-humanize"
"github.com/minio/cli" "github.com/minio/cli"
miniogopolicy "github.com/minio/minio-go/v7/pkg/policy" miniogopolicy "github.com/minio/minio-go/v7/pkg/policy"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/bucket/policy/condition" "github.com/minio/minio/pkg/bucket/policy/condition"
"github.com/minio/minio/pkg/env"
minio "github.com/minio/minio/cmd" "github.com/minio/minio/pkg/madmin"
) )
const ( const (
@ -545,7 +544,7 @@ func (a *azureObjects) Shutdown(ctx context.Context) error {
// StorageInfo - Not relevant to Azure backend. // StorageInfo - Not relevant to Azure backend.
func (a *azureObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) { func (a *azureObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) {
si.Backend.Type = minio.BackendGateway si.Backend.Type = madmin.Gateway
host := a.endpoint.Host host := a.endpoint.Host
if a.endpoint.Port() == "" { if a.endpoint.Port() == "" {
host = a.endpoint.Host + ":" + a.endpoint.Scheme host = a.endpoint.Host + ":" + a.endpoint.Scheme


@ -29,9 +29,8 @@ import (
"net/http" "net/http"
"os" "os"
"path" "path"
"strconv"
"regexp" "regexp"
"strconv"
"strings" "strings"
"time" "time"
@ -39,17 +38,16 @@ import (
humanize "github.com/dustin/go-humanize" humanize "github.com/dustin/go-humanize"
"github.com/minio/cli" "github.com/minio/cli"
miniogopolicy "github.com/minio/minio-go/v7/pkg/policy" miniogopolicy "github.com/minio/minio-go/v7/pkg/policy"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/bucket/policy/condition" "github.com/minio/minio/pkg/bucket/policy/condition"
"github.com/minio/minio/pkg/env" "github.com/minio/minio/pkg/env"
"github.com/minio/minio/pkg/madmin"
"google.golang.org/api/googleapi" "google.golang.org/api/googleapi"
"google.golang.org/api/iterator" "google.golang.org/api/iterator"
"google.golang.org/api/option" "google.golang.org/api/option"
minio "github.com/minio/minio/cmd"
) )
var ( var (
@ -413,7 +411,7 @@ func (l *gcsGateway) Shutdown(ctx context.Context) error {
// StorageInfo - Not relevant to GCS backend. // StorageInfo - Not relevant to GCS backend.
func (l *gcsGateway) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) { func (l *gcsGateway) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) {
si.Backend.Type = minio.BackendGateway si.Backend.Type = madmin.Gateway
si.Backend.GatewayOnline = minio.IsBackendOnline(ctx, "storage.googleapis.com:443") si.Backend.GatewayOnline = minio.IsBackendOnline(ctx, "storage.googleapis.com:443")
return si, nil return si, nil
} }


@ -244,7 +244,7 @@ func (n *hdfsObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, er
si.Disks = []madmin.Disk{{ si.Disks = []madmin.Disk{{
UsedSpace: fsInfo.Used, UsedSpace: fsInfo.Used,
}} }}
si.Backend.Type = minio.BackendGateway si.Backend.Type = madmin.Gateway
si.Backend.GatewayOnline = true si.Backend.GatewayOnline = true
return si, nil return si, nil
} }


@ -22,6 +22,7 @@ import (
"github.com/minio/cli" "github.com/minio/cli"
minio "github.com/minio/minio/cmd" minio "github.com/minio/minio/cmd"
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/madmin"
) )
func init() { func init() {
@ -106,8 +107,8 @@ func (n *nasObjects) IsListenSupported() bool {
func (n *nasObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) { func (n *nasObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) {
si, errs := n.ObjectLayer.StorageInfo(ctx) si, errs := n.ObjectLayer.StorageInfo(ctx)
si.Backend.GatewayOnline = si.Backend.Type == minio.BackendFS si.Backend.GatewayOnline = si.Backend.Type == madmin.FS
si.Backend.Type = minio.BackendGateway si.Backend.Type = madmin.Gateway
return si, errs return si, errs
} }


@ -29,15 +29,15 @@ import (
"github.com/minio/cli" "github.com/minio/cli"
miniogo "github.com/minio/minio-go/v7" miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/tags"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio-go/v7/pkg/encrypt" "github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/s3utils" "github.com/minio/minio-go/v7/pkg/s3utils"
"github.com/minio/minio-go/v7/pkg/tags"
minio "github.com/minio/minio/cmd"
xhttp "github.com/minio/minio/cmd/http" xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
"github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/bucket/policy"
"github.com/minio/minio/pkg/madmin"
) )
func init() { func init() {
@ -276,7 +276,7 @@ func (l *s3Objects) Shutdown(ctx context.Context) error {
// StorageInfo is not relevant to S3 backend. // StorageInfo is not relevant to S3 backend.
func (l *s3Objects) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) { func (l *s3Objects) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) {
si.Backend.Type = minio.BackendGateway si.Backend.Type = madmin.Gateway
host := l.Client.EndpointURL().Host host := l.Client.EndpointURL().Host
if l.Client.EndpointURL().Port() == "" { if l.Client.EndpointURL().Port() == "" {
host = l.Client.EndpointURL().Host + ":" + l.Client.EndpointURL().Scheme host = l.Client.EndpointURL().Host + ":" + l.Client.EndpointURL().Scheme


@ -19,6 +19,8 @@ package cmd
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"sort"
"time" "time"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
@ -64,7 +66,8 @@ func newBgHealSequence() *healSequence {
} }
} }
func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) { // getBackgroundHealStatus returns the background heal status of this server; when an ObjectLayer is provided, per-set disk state is included as well.
func getBackgroundHealStatus(ctx context.Context, o ObjectLayer) (madmin.BgHealState, bool) {
if globalBackgroundHealState == nil { if globalBackgroundHealState == nil {
return madmin.BgHealState{}, false return madmin.BgHealState{}, false
} }
@ -78,24 +81,51 @@ func getLocalBackgroundHealStatus() (madmin.BgHealState, bool) {
for _, ep := range getLocalDisksToHeal() { for _, ep := range getLocalDisksToHeal() {
healDisksMap[ep.String()] = struct{}{} healDisksMap[ep.String()] = struct{}{}
} }
status := madmin.BgHealState{
for _, ep := range globalBackgroundHealState.getHealLocalDisks() {
if _, ok := healDisksMap[ep.String()]; !ok {
healDisksMap[ep.String()] = struct{}{}
}
}
var healDisks []string
for disk := range healDisksMap {
healDisks = append(healDisks, disk)
}
return madmin.BgHealState{
ScannedItemsCount: bgSeq.getScannedItemsCount(), ScannedItemsCount: bgSeq.getScannedItemsCount(),
LastHealActivity: bgSeq.lastHealActivity, }
HealDisks: healDisks,
NextHealRound: UTCNow(), if o == nil {
}, true healing := globalBackgroundHealState.getLocalHealingDisks()
for _, disk := range healing {
status.HealDisks = append(status.HealDisks, disk.Endpoint)
}
return status, true
}
// ignores any errors here.
si, _ := o.StorageInfo(ctx)
indexed := make(map[string][]madmin.Disk)
for _, disk := range si.Disks {
setIdx := fmt.Sprintf("%d-%d", disk.PoolIndex, disk.SetIndex)
indexed[setIdx] = append(indexed[setIdx], disk)
}
for id, disks := range indexed {
ss := madmin.SetStatus{
ID: id,
SetIndex: disks[0].SetIndex,
PoolIndex: disks[0].PoolIndex,
}
for _, disk := range disks {
ss.Disks = append(ss.Disks, disk)
if disk.Healing {
ss.HealStatus = "Healing"
ss.HealPriority = "high"
status.HealDisks = append(status.HealDisks, disk.Endpoint)
}
}
sortDisks(ss.Disks)
status.Sets = append(status.Sets, ss)
}
sort.Slice(status.Sets, func(i, j int) bool {
return status.Sets[i].ID < status.Sets[j].ID
})
return status, true
} }
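Editor's note: the grouping key is the composite pool-set pair, so identical set indexes in different pools stay separate, and the resulting Sets slice is sorted for stable output. A small standalone sketch of the same bucketing step (simplified disk type, illustrative endpoints):

package main

import (
	"fmt"
	"sort"
)

type disk struct {
	Endpoint            string
	PoolIndex, SetIndex int
	Healing             bool
}

func main() {
	disks := []disk{
		{"http://n1/d1", 0, 0, false},
		{"http://n1/d2", 0, 1, true},
		{"http://n2/d1", 1, 0, true},
	}
	// Group by "pool-set", the same key format the handler builds.
	indexed := map[string][]disk{}
	for _, d := range disks {
		key := fmt.Sprintf("%d-%d", d.PoolIndex, d.SetIndex)
		indexed[key] = append(indexed[key], d)
	}
	// Sort keys for deterministic output, like sorting status.Sets by ID.
	keys := make([]string, 0, len(indexed))
	for k := range indexed {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		healing := 0
		for _, d := range indexed[k] {
			if d.Healing {
				healing++
			}
		}
		fmt.Printf("set %s: %d disks, %d healing\n", k, len(indexed[k]), healing)
	}
}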
func mustGetHealSequence(ctx context.Context) *healSequence { func mustGetHealSequence(ctx context.Context) *healSequence {
@ -120,7 +150,7 @@ func mustGetHealSequence(ctx context.Context) *healSequence {
} }
// healErasureSet lists and heals all objects in a specific erasure set // healErasureSet lists and heals all objects in a specific erasure set
func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketInfo) error { func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketInfo, tracker *healingTracker) error {
bgSeq := mustGetHealSequence(ctx) bgSeq := mustGetHealSequence(ctx)
buckets = append(buckets, BucketInfo{ buckets = append(buckets, BucketInfo{
Name: pathJoin(minioMetaBucket, minioConfigPrefix), Name: pathJoin(minioMetaBucket, minioConfigPrefix),
@ -135,6 +165,21 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
// Heal all buckets with all objects // Heal all buckets with all objects
for _, bucket := range buckets { for _, bucket := range buckets {
if tracker.isHealed(bucket.Name) {
continue
}
var forwardTo string
// If we resume to the same bucket, forward to last known item.
if tracker.Bucket != "" {
if tracker.Bucket == bucket.Name {
forwardTo = tracker.Object
} else {
// Reset to where last bucket ended if resuming.
tracker.resume()
}
}
tracker.Object = ""
tracker.Bucket = bucket.Name
// Heal current bucket // Heal current bucket
if _, err := er.HealBucket(ctx, bucket.Name, madmin.HealOpts{}); err != nil { if _, err := er.HealBucket(ctx, bucket.Name, madmin.HealOpts{}); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
@ -143,7 +188,7 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
} }
if serverDebugLog { if serverDebugLog {
console.Debugf(color.Green("healDisk:")+" healing bucket %s content on erasure set %d\n", bucket.Name, er.setNumber+1) console.Debugf(color.Green("healDisk:")+" healing bucket %s content on erasure set %d\n", bucket.Name, tracker.SetIndex+1)
} }
disks, _ := er.getOnlineDisksWithHealing() disks, _ := er.getOnlineDisksWithHealing()
@ -167,17 +212,27 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
for _, version := range fivs.Versions { for _, version := range fivs.Versions {
if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling}); err != nil { if _, err := er.HealObject(ctx, bucket.Name, version.Name, version.VersionID, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: healDeleteDangling}); err != nil {
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
// If not deleted, assume they failed.
tracker.ObjectsFailed++
tracker.BytesFailed += uint64(version.Size)
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }
} else {
tracker.ObjectsHealed++
tracker.BytesDone += uint64(version.Size)
} }
bgSeq.logHeal(madmin.HealItemObject) bgSeq.logHeal(madmin.HealItemObject)
} }
tracker.Object = entry.name
if time.Since(tracker.LastUpdate) > time.Minute {
logger.LogIf(ctx, tracker.update(ctx))
}
} }
err := listPathRaw(ctx, listPathRawOptions{ err := listPathRaw(ctx, listPathRawOptions{
disks: disks, disks: disks,
bucket: bucket.Name, bucket: bucket.Name,
recursive: true, recursive: true,
forwardTo: "", //TODO(klauspost): Set this to last known offset when resuming. forwardTo: forwardTo,
minDisks: 1, minDisks: 1,
reportNotFound: false, reportNotFound: false,
agreed: healEntry, agreed: healEntry,
@ -189,8 +244,18 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []BucketIn
}, },
finished: nil, finished: nil,
}) })
logger.LogIf(ctx, err) select {
// If context is canceled don't mark as done...
case <-ctx.Done():
return ctx.Err()
default:
logger.LogIf(ctx, err)
tracker.bucketDone(bucket.Name)
logger.LogIf(ctx, tracker.update(ctx))
}
} }
tracker.Object = ""
tracker.Bucket = ""
return nil return nil
} }
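Editor's note: restart behaviour is now driven by the tracker: already-healed buckets are skipped, the current bucket and last object are checkpointed (roughly once a minute in the real loop), and a bucket is only marked done when its listing finished without cancellation. A toy sketch of that control flow with a stripped-down tracker:

package main

import "fmt"

// tracker keeps only the resume-related state used here.
type tracker struct {
	Bucket, Object string
	HealedBuckets  map[string]bool
}

func (t *tracker) isHealed(b string) bool { return t.HealedBuckets[b] }
func (t *tracker) bucketDone(b string)    { t.HealedBuckets[b] = true; t.Bucket, t.Object = "", "" }

// healBuckets mirrors the shape of the loop: skip healed buckets and
// forward to the last object if we resume inside the same bucket.
func healBuckets(t *tracker, buckets []string, heal func(bucket, forwardTo string) error) {
	for _, b := range buckets {
		if t.isHealed(b) {
			continue
		}
		var forwardTo string
		if t.Bucket == b {
			forwardTo = t.Object // resume inside the interrupted bucket
		}
		t.Bucket, t.Object = b, ""
		if err := heal(b, forwardTo); err != nil {
			// e.g. context cancelled: leave the bucket unmarked so a
			// restart resumes here instead of starting over.
			return
		}
		t.bucketDone(b)
	}
}

func main() {
	t := &tracker{Bucket: "photos", Object: "2021/03/x.jpg", HealedBuckets: map[string]bool{"logs": true}}
	healBuckets(t, []string{"logs", "photos", "backups"}, func(b, fwd string) error {
		fmt.Printf("heal %s (forward to %q)\n", b, fwd)
		return nil
	})
	fmt.Println("healed:", t.HealedBuckets)
}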


@ -66,7 +66,7 @@ func (d *naughtyDisk) Hostname() string {
return d.disk.Hostname() return d.disk.Hostname()
} }
func (d *naughtyDisk) Healing() bool { func (d *naughtyDisk) Healing() *healingTracker {
return d.disk.Healing() return d.disk.Healing()
} }
@ -90,6 +90,12 @@ func (d *naughtyDisk) calcError() (err error) {
return nil return nil
} }
func (d *naughtyDisk) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
return -1, -1, -1
}
func (d *naughtyDisk) SetDiskLoc(poolIdx, setIdx, diskIdx int) {}
func (d *naughtyDisk) GetDiskID() (string, error) { func (d *naughtyDisk) GetDiskID() (string, error) {
return d.disk.GetDiskID() return d.disk.GetDiskID()
} }


@ -32,36 +32,18 @@ type BackendType int
// Enum for different backend types. // Enum for different backend types.
const ( const (
Unknown BackendType = iota Unknown = BackendType(madmin.Unknown)
// Filesystem backend. // Filesystem backend.
BackendFS BackendFS = BackendType(madmin.FS)
// Multi disk BackendErasure (single, distributed) backend. // Multi disk BackendErasure (single, distributed) backend.
BackendErasure BackendErasure = BackendType(madmin.Erasure)
// Gateway backend. // Gateway backend.
BackendGateway BackendGateway = BackendType(madmin.Gateway)
// Add your own backend. // Add your own backend.
) )
// BackendInfo - contains info of the underlying backend
type BackendInfo struct {
// Represents various backend types, currently on FS, Erasure and Gateway
Type BackendType
// Following fields are only meaningful if BackendType is Gateway.
GatewayOnline bool
// Following fields are only meaningful if BackendType is Erasure.
StandardSCData []int // Data disks for currently configured Standard storage class.
StandardSCParity int // Parity disks for currently configured Standard storage class.
RRSCData []int // Data disks for currently configured Reduced Redundancy storage class.
RRSCParity int // Parity disks for currently configured Reduced Redundancy storage class.
}
// StorageInfo - represents total capacity of underlying storage. // StorageInfo - represents total capacity of underlying storage.
type StorageInfo struct { type StorageInfo = madmin.StorageInfo
Disks []madmin.Disk
Backend BackendInfo
}
// objectHistogramInterval is an interval that will be // objectHistogramInterval is an interval that will be
// used to report the histogram of objects data sizes // used to report the histogram of objects data sizes
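Editor's note: StorageInfo becomes a type alias of madmin.StorageInfo rather than a new named type, so values cross the cmd/madmin boundary with no conversion. A tiny self-contained illustration of that distinction using local types:

package main

import "fmt"

type base struct{ Disks int }

type aliasInfo = base // alias: aliasInfo and base are the same type
type namedInfo base   // definition: a distinct type

func main() {
	var b base = aliasInfo{Disks: 4} // assignable with no conversion
	n := namedInfo(b)                // distinct type: explicit conversion required
	fmt.Println(b.Disks, n.Disks)
}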


@ -89,7 +89,7 @@ type ObjectLayer interface {
Shutdown(context.Context) error Shutdown(context.Context) error
NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error
BackendInfo() BackendInfo BackendInfo() madmin.BackendInfo
StorageInfo(ctx context.Context) (StorageInfo, []error) StorageInfo(ctx context.Context) (StorageInfo, []error)
LocalStorageInfo(ctx context.Context) (StorageInfo, []error) LocalStorageInfo(ctx context.Context) (StorageInfo, []error)


@ -947,10 +947,9 @@ func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *h
s.writeErrorResponse(w, errors.New("invalid request")) s.writeErrorResponse(w, errors.New("invalid request"))
return return
} }
ctx := newContext(r, w, "BackgroundHealStatus") ctx := newContext(r, w, "BackgroundHealStatus")
state, ok := getLocalBackgroundHealStatus() state, ok := getBackgroundHealStatus(ctx, newObjectLayerFn())
if !ok { if !ok {
s.writeErrorResponse(w, errServerNotInitialized) s.writeErrorResponse(w, errServerNotInitialized)
return return


@ -27,6 +27,7 @@ import (
"github.com/minio/minio/cmd/config" "github.com/minio/minio/cmd/config"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
color "github.com/minio/minio/pkg/color" color "github.com/minio/minio/pkg/color"
"github.com/minio/minio/pkg/madmin"
xnet "github.com/minio/minio/pkg/net" xnet "github.com/minio/minio/pkg/net"
) )
@ -206,7 +207,7 @@ func getStorageInfoMsg(storageInfo StorageInfo) string {
var msg string var msg string
var mcMessage string var mcMessage string
onlineDisks, offlineDisks := getOnlineOfflineDisksStats(storageInfo.Disks) onlineDisks, offlineDisks := getOnlineOfflineDisksStats(storageInfo.Disks)
if storageInfo.Backend.Type == BackendErasure { if storageInfo.Backend.Type == madmin.Erasure {
if offlineDisks.Sum() > 0 { if offlineDisks.Sum() > 0 {
mcMessage = "Use `mc admin info` to look for latest server/disk info\n" mcMessage = "Use `mc admin info` to look for latest server/disk info\n"
} }


@ -38,7 +38,7 @@ func TestStorageInfoMsg(t *testing.T) {
{Endpoint: "http://127.0.0.1:9001/data/3/", State: madmin.DriveStateOk}, {Endpoint: "http://127.0.0.1:9001/data/3/", State: madmin.DriveStateOk},
{Endpoint: "http://127.0.0.1:9001/data/4/", State: madmin.DriveStateOffline}, {Endpoint: "http://127.0.0.1:9001/data/4/", State: madmin.DriveStateOffline},
} }
infoStorage.Backend.Type = BackendErasure infoStorage.Backend.Type = madmin.Erasure
if msg := getStorageInfoMsg(infoStorage); !strings.Contains(msg, "7 Online, 1 Offline") { if msg := getStorageInfoMsg(infoStorage); !strings.Contains(msg, "7 Online, 1 Offline") {
t.Fatal("Unexpected storage info message, found:", msg) t.Fatal("Unexpected storage info message, found:", msg)


@ -36,7 +36,7 @@ type StorageAPI interface {
Close() error Close() error
GetDiskID() (string, error) GetDiskID() (string, error)
SetDiskID(id string) SetDiskID(id string)
Healing() bool // Returns if disk is healing. Healing() *healingTracker // Returns nil if disk is not healing.
DiskInfo(ctx context.Context) (info DiskInfo, err error) DiskInfo(ctx context.Context) (info DiskInfo, err error)
NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error)
@ -79,6 +79,9 @@ type StorageAPI interface {
// Read all. // Read all.
ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error)
GetDiskLoc() (poolIdx, setIdx, diskIdx int) // Retrieve location indexes.
SetDiskLoc(poolIdx, setIdx, diskIdx int) // Set location indexes.
} }
// storageReader is an io.Reader view of a disk // storageReader is an io.Reader view of a disk


@ -29,6 +29,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/http"
xhttp "github.com/minio/minio/cmd/http" xhttp "github.com/minio/minio/cmd/http"
@ -119,6 +120,24 @@ type storageRESTClient struct {
endpoint Endpoint endpoint Endpoint
restClient *rest.Client restClient *rest.Client
diskID string diskID string
// Indexes, will be -1 until assigned a set.
poolIndex, setIndex, diskIndex int
diskInfoCache timedValue
diskHealCache timedValue
}
// Retrieve location indexes.
func (client *storageRESTClient) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
return client.poolIndex, client.setIndex, client.diskIndex
}
// Set location indexes.
func (client *storageRESTClient) SetDiskLoc(poolIdx, setIdx, diskIdx int) {
client.poolIndex = poolIdx
client.setIndex = setIdx
client.diskIndex = diskIdx
} }
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected // Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
@ -160,14 +179,26 @@ func (client *storageRESTClient) Endpoint() Endpoint {
return client.endpoint return client.endpoint
} }
func (client *storageRESTClient) Healing() bool { func (client *storageRESTClient) Healing() *healingTracker {
// This call should never be called over the network client.diskHealCache.Once.Do(func() {
// this function should always return 'false' // Update at least every second.
// client.diskHealCache.TTL = time.Second
// To know if a remote disk is being healed client.diskHealCache.Update = func() (interface{}, error) {
// perform DiskInfo() call which would return ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// back the correct data if disk is being healed. defer cancel()
return false b, err := client.ReadAll(ctx, minioMetaBucket,
pathJoin(bucketMetaPrefix, healingTrackerFilename))
if err != nil {
// If error, likely not healing.
return (*healingTracker)(nil), nil
}
var h healingTracker
_, err = h.UnmarshalMsg(b)
return &h, err
}
})
val, _ := client.diskHealCache.Get()
return val.(*healingTracker)
} }
func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) { func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
@ -207,18 +238,31 @@ func (client *storageRESTClient) SetDiskID(id string) {
// DiskInfo - fetch disk information for a remote disk. // DiskInfo - fetch disk information for a remote disk.
func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, err error) { func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, err error) {
respBody, err := client.call(ctx, storageRESTMethodDiskInfo, nil, nil, -1) client.diskInfoCache.Once.Do(func() {
if err != nil { client.diskInfoCache.TTL = time.Second
return info, err client.diskInfoCache.Update = func() (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
respBody, err := client.call(ctx, storageRESTMethodDiskInfo, nil, nil, -1)
if err != nil {
return info, err
}
defer http.DrainBody(respBody)
if err = msgp.Decode(respBody, &info); err != nil {
return info, err
}
if info.Error != "" {
return info, toStorageErr(errors.New(info.Error))
}
return info, nil
}
})
val, err := client.diskInfoCache.Get()
if err == nil {
info = val.(DiskInfo)
} }
defer http.DrainBody(respBody)
if err = msgp.Decode(respBody, &info); err != nil { return info, err
return info, err
}
if info.Error != "" {
return info, toStorageErr(errors.New(info.Error))
}
return info, nil
} }
// MakeVolBulk - create multiple volumes in a bulk operation. // MakeVolBulk - create multiple volumes in a bulk operation.
@ -651,5 +695,5 @@ func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClien
} }
} }
return &storageRESTClient{endpoint: endpoint, restClient: restClient} return &storageRESTClient{endpoint: endpoint, restClient: restClient, poolIndex: -1, setIndex: -1, diskIndex: -1}
} }
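Editor's note: both Healing() and DiskInfo() on the REST client are now served from a short-lived cache (the internal timedValue with a one-second TTL), so hot paths stop paying a network round trip per call and tolerate data up to a second old. A self-contained sketch of that caching shape; this is not the actual timedValue implementation:

package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedValue refreshes its payload at most once per TTL, mirroring
// how the REST client wraps the DiskInfo/Healing calls.
type cachedValue struct {
	mu      sync.Mutex
	ttl     time.Duration
	last    time.Time
	value   interface{}
	lastErr error
	update  func() (interface{}, error)
}

func (c *cachedValue) Get() (interface{}, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if time.Since(c.last) < c.ttl {
		return c.value, c.lastErr
	}
	c.value, c.lastErr = c.update()
	c.last = time.Now()
	return c.value, c.lastErr
}

func main() {
	calls := 0
	c := &cachedValue{
		ttl: time.Second,
		update: func() (interface{}, error) {
			calls++ // stands in for the remote DiskInfo RPC
			return fmt.Sprintf("disk info #%d", calls), nil
		},
	}
	for i := 0; i < 3; i++ {
		v, _ := c.Get()
		fmt.Println(v) // same cached value within the 1s TTL
	}
}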

View File

@ -51,7 +51,7 @@ func (p *xlStorageDiskIDCheck) Hostname() string {
return p.storage.Hostname() return p.storage.Hostname()
} }
func (p *xlStorageDiskIDCheck) Healing() bool { func (p *xlStorageDiskIDCheck) Healing() *healingTracker {
return p.storage.Healing() return p.storage.Healing()
} }
@ -68,6 +68,14 @@ func (p *xlStorageDiskIDCheck) NSScanner(ctx context.Context, cache dataUsageCac
return p.storage.NSScanner(ctx, cache) return p.storage.NSScanner(ctx, cache)
} }
func (p *xlStorageDiskIDCheck) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
return p.storage.GetDiskLoc()
}
func (p *xlStorageDiskIDCheck) SetDiskLoc(poolIdx, setIdx, diskIdx int) {
p.storage.SetDiskLoc(poolIdx, setIdx, diskIdx)
}
func (p *xlStorageDiskIDCheck) Close() error { func (p *xlStorageDiskIDCheck) Close() error {
return p.storage.Close() return p.storage.Close()
} }


@ -94,3 +94,16 @@ func getFileInfo(xlMetaBuf []byte, volume, path, versionID string) (FileInfo, er
fi.XLV1 = true // indicates older version fi.XLV1 = true // indicates older version
return fi, err return fi, err
} }
// getXLDiskLoc will return the pool/set/disk id if it can be located in the object layer.
// Will return -1 for unknown values.
func getXLDiskLoc(diskID string) (poolIdx, setIdx, diskIdx int) {
if api := newObjectLayerFn(); api != nil {
if ep, ok := api.(*erasureServerPools); ok {
if pool, set, disk, err := ep.getPoolAndSet(diskID); err == nil {
return pool, set, disk
}
}
}
return -1, -1, -1
}


@ -105,6 +105,9 @@ type xlStorage struct {
diskID string diskID string
// Indexes, will be -1 until assigned a set.
poolIndex, setIndex, diskIndex int
formatFileInfo os.FileInfo formatFileInfo os.FileInfo
formatLegacy bool formatLegacy bool
formatLastCheck time.Time formatLastCheck time.Time
@ -268,6 +271,9 @@ func newXLStorage(ep Endpoint) (*xlStorage, error) {
ctx: GlobalContext, ctx: GlobalContext,
rootDisk: rootDisk, rootDisk: rootDisk,
readODirectSupported: true, readODirectSupported: true,
poolIndex: -1,
setIndex: -1,
diskIndex: -1,
} }
// Create all necessary bucket folders if possible. // Create all necessary bucket folders if possible.
@ -346,11 +352,33 @@ func (s *xlStorage) IsLocal() bool {
return true return true
} }
func (s *xlStorage) Healing() bool { // Retrieve location indexes.
func (s *xlStorage) GetDiskLoc() (poolIdx, setIdx, diskIdx int) {
// If unset, see if we can locate it.
if s.poolIndex < 0 || s.setIndex < 0 || s.diskIndex < 0 {
return getXLDiskLoc(s.diskID)
}
return s.poolIndex, s.setIndex, s.diskIndex
}
// Set location indexes.
func (s *xlStorage) SetDiskLoc(poolIdx, setIdx, diskIdx int) {
s.poolIndex = poolIdx
s.setIndex = setIdx
s.diskIndex = diskIdx
}
func (s *xlStorage) Healing() *healingTracker {
healingFile := pathJoin(s.diskPath, minioMetaBucket, healingFile := pathJoin(s.diskPath, minioMetaBucket,
bucketMetaPrefix, healingTrackerFilename) bucketMetaPrefix, healingTrackerFilename)
_, err := os.Lstat(healingFile) b, err := ioutil.ReadFile(healingFile)
return err == nil if err != nil {
return nil
}
var h healingTracker
_, err = h.UnmarshalMsg(b)
logger.LogIf(GlobalContext, err)
return &h
} }
func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) { func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache) (dataUsageCache, error) {
@ -461,7 +489,7 @@ func (s *xlStorage) DiskInfo(context.Context) (info DiskInfo, err error) {
} else { } else {
// Check if the disk is being healed if GetDiskID // Check if the disk is being healed if GetDiskID
// returned any error other than fresh disk // returned any error other than fresh disk
dcinfo.Healing = s.Healing() dcinfo.Healing = s.Healing() != nil
} }
dcinfo.ID = diskID dcinfo.ID = diskID
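Editor's note: on the local path the marker file is no longer just stat'ed; its contents are decoded and returned, and DiskInfo reports Healing simply as s.Healing() != nil. A small self-contained sketch of that file-as-state check, with JSON standing in for msgp and the on-disk path written out for illustration:

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"path/filepath"
)

// trackerSketch keeps just enough fields to show the idea; the real
// tracker is msgp-encoded and much richer.
type trackerSketch struct {
	ID            string `json:"id"`
	ObjectsHealed uint64 `json:"objects_healed"`
}

// healingState returns the decoded tracker when the marker file exists
// and nil otherwise, so healingState(p) != nil doubles as the
// Healing flag reported by DiskInfo.
func healingState(diskPath string) *trackerSketch {
	b, err := ioutil.ReadFile(filepath.Join(diskPath, ".minio.sys", "buckets", ".healing.bin"))
	if err != nil {
		return nil
	}
	var t trackerSketch
	if err := json.Unmarshal(b, &t); err != nil {
		return nil
	}
	return &t
}

func main() {
	fmt.Println("healing:", healingState("/tmp/disk1") != nil)
}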


@ -21,6 +21,7 @@ package main
import ( import (
"context" "context"
"encoding/json"
"log" "log"
"github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/madmin"
@ -41,6 +42,7 @@ func main() {
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
js, _ := json.MarshalIndent(healStatusResult, "", " ")
log.Printf("Heal status result: %+v\n", healStatusResult) log.Printf("Heal status result: %s\n", string(js))
} }
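Editor's note: for completeness, a full version of this docs example as it would look after the change; the endpoint and credentials are placeholders, and madmin.New plus BackgroundHealStatus are the existing admin-client calls the example relies on:

package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/minio/minio/pkg/madmin"
)

func main() {
	// Placeholder endpoint and credentials; replace with your own.
	madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEY", "YOUR-SECRETKEY", true)
	if err != nil {
		log.Fatalln(err)
	}

	healStatusResult, err := madmClnt.BackgroundHealStatus(context.Background())
	if err != nil {
		log.Fatalln(err)
	}

	// With the new per-set data, indented JSON is far easier to read
	// than a %+v dump of the struct.
	js, err := json.MarshalIndent(healStatusResult, "", "  ")
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("Heal status result: %s\n", js)
}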


@ -41,5 +41,5 @@ func main() {
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
log.Println(st) log.Printf("%+v\n", st)
} }


@ -41,5 +41,5 @@ func main() {
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
log.Println(st) log.Printf("%+v\n", st)
} }


@ -24,6 +24,7 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"sort"
"time" "time"
) )
@ -47,6 +48,7 @@ type HealOpts struct {
DryRun bool `json:"dryRun"` DryRun bool `json:"dryRun"`
Remove bool `json:"remove"` Remove bool `json:"remove"`
Recreate bool `json:"recreate"` // only used when bucket needs to be healed Recreate bool `json:"recreate"` // only used when bucket needs to be healed
NoLock bool `json:"-"` // only used internally.
ScanMode HealScanMode `json:"scanMode"` ScanMode HealScanMode `json:"scanMode"`
} }
@ -298,9 +300,96 @@ func (adm *AdminClient) Heal(ctx context.Context, bucket, prefix string,
// BgHealState represents the status of the background heal // BgHealState represents the status of the background heal
type BgHealState struct { type BgHealState struct {
ScannedItemsCount int64 ScannedItemsCount int64
LastHealActivity time.Time
NextHealRound time.Time HealDisks []string
HealDisks []string
// SetStatus contains information for each set.
Sets []SetStatus `json:"sets"`
}
// SetStatus contains information about the heal status of a set.
type SetStatus struct {
ID string `json:"id"`
PoolIndex int `json:"pool_index"`
SetIndex int `json:"set_index"`
HealStatus string `json:"heal_status"`
HealPriority string `json:"heal_priority"`
Disks []Disk `json:"disks"`
}
// HealingDisk contains information about a disk that is currently healing.
type HealingDisk struct {
// Copied from cmd/background-newdisks-heal-ops.go
// When adding new field, update (*healingTracker).toHealingDisk
ID string `json:"id"`
PoolIndex int `json:"pool_index"`
SetIndex int `json:"set_index"`
DiskIndex int `json:"disk_index"`
Endpoint string `json:"endpoint"`
Path string `json:"path"`
Started time.Time `json:"started"`
LastUpdate time.Time `json:"last_update"`
ObjectsHealed uint64 `json:"objects_healed"`
ObjectsFailed uint64 `json:"objects_failed"`
BytesDone uint64 `json:"bytes_done"`
BytesFailed uint64 `json:"bytes_failed"`
// Last object scanned.
Bucket string `json:"current_bucket"`
Object string `json:"current_object"`
// Filled on startup/restarts.
QueuedBuckets []string `json:"queued_buckets"`
// Filled during heal.
HealedBuckets []string `json:"healed_buckets"`
// Future: add more tracking capabilities.
}
// Merge others into b.
func (b *BgHealState) Merge(others ...BgHealState) {
for _, other := range others {
b.ScannedItemsCount += other.ScannedItemsCount
if len(b.Sets) == 0 {
b.Sets = make([]SetStatus, len(other.Sets))
copy(b.Sets, other.Sets)
continue
}
// Add disk if not present.
// If present select the one with latest lastupdate.
addSet := func(set SetStatus) {
for eSetIdx, existing := range b.Sets {
if existing.ID != set.ID {
continue
}
if len(existing.Disks) < len(set.Disks) {
b.Sets[eSetIdx].Disks = set.Disks
}
if len(existing.Disks) < len(set.Disks) {
return
}
for i, disk := range set.Disks {
// Disks should be the same.
if disk.HealInfo != nil {
existing.Disks[i].HealInfo = disk.HealInfo
}
}
return
}
b.Sets = append(b.Sets, set)
}
for _, disk := range other.Sets {
addSet(disk)
}
}
sort.Slice(b.Sets, func(i, j int) bool {
if b.Sets[i].PoolIndex != b.Sets[j].PoolIndex {
return b.Sets[i].PoolIndex < b.Sets[j].PoolIndex
}
return b.Sets[i].SetIndex < b.Sets[j].SetIndex
})
} }
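Editor's note: once per-node states are merged, a caller can walk Sets and Disks to produce a per-set progress summary for operators. A hedged sketch using only the madmin fields introduced above; the data in main is illustrative toy input:

package main

import (
	"fmt"

	"github.com/minio/minio/pkg/madmin"
)

// summarize walks an aggregated heal state and reports, per erasure
// set, how many disks are healing plus the object counters from their
// heal snapshots.
func summarize(st madmin.BgHealState) {
	for _, set := range st.Sets {
		var healing int
		var healed, failed uint64
		for _, d := range set.Disks {
			if d.HealInfo == nil {
				continue
			}
			healing++
			healed += d.HealInfo.ObjectsHealed
			failed += d.HealInfo.ObjectsFailed
		}
		fmt.Printf("pool %d set %d: %d/%d disks healing, %d objects healed, %d failed\n",
			set.PoolIndex, set.SetIndex, healing, len(set.Disks), healed, failed)
	}
}

func main() {
	// Toy data; in practice this comes from BackgroundHealStatus or
	// from merging several per-node states with (*BgHealState).Merge.
	local := madmin.BgHealState{Sets: []madmin.SetStatus{{
		ID: "0-0", PoolIndex: 0, SetIndex: 0,
		Disks: []madmin.Disk{{Endpoint: "http://n1/d1", HealInfo: &madmin.HealingDisk{ObjectsHealed: 120, ObjectsFailed: 2}}},
	}}}
	summarize(local)
}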
// BackgroundHealStatus returns the background heal status of the // BackgroundHealStatus returns the background heal status of the


@ -35,6 +35,8 @@ const (
FS FS
// Multi disk Erasure (single, distributed) backend. // Multi disk Erasure (single, distributed) backend.
Erasure Erasure
// Gateway to other storage
Gateway
// Add your own backend. // Add your own backend.
) )
@ -57,16 +59,26 @@ type StorageInfo struct {
Disks []Disk Disks []Disk
// Backend type. // Backend type.
Backend struct { Backend BackendInfo
// Represents various backend types, currently on FS and Erasure. }
Type BackendType
// Following fields are only meaningful if BackendType is Erasure. // BackendInfo - contains info of the underlying backend
OnlineDisks BackendDisks // Online disks during server startup. type BackendInfo struct {
OfflineDisks BackendDisks // Offline disks during server startup. // Represents various backend types, currently on FS, Erasure and Gateway
StandardSCParity int // Parity disks for currently configured Standard storage class. Type BackendType
RRSCParity int // Parity disks for currently configured Reduced Redundancy storage class.
} // Following fields are only meaningful if BackendType is Gateway.
GatewayOnline bool
// Following fields are only meaningful if BackendType is Erasure.
OnlineDisks BackendDisks // Online disks during server startup.
OfflineDisks BackendDisks // Offline disks during server startup.
// Following fields are only meaningful if BackendType is Erasure.
StandardSCData []int // Data disks for currently configured Standard storage class.
StandardSCParity int // Parity disks for currently configured Standard storage class.
RRSCData []int // Data disks for currently configured Reduced Redundancy storage class.
RRSCParity int // Parity disks for currently configured Reduced Redundancy storage class.
} }
// BackendDisks - represents the map of endpoint-disks. // BackendDisks - represents the map of endpoint-disks.
@ -280,21 +292,27 @@ type ServerProperties struct {
// Disk holds Disk information // Disk holds Disk information
type Disk struct { type Disk struct {
Endpoint string `json:"endpoint,omitempty"` Endpoint string `json:"endpoint,omitempty"`
RootDisk bool `json:"rootDisk,omitempty"` RootDisk bool `json:"rootDisk,omitempty"`
DrivePath string `json:"path,omitempty"` DrivePath string `json:"path,omitempty"`
Healing bool `json:"healing,omitempty"` Healing bool `json:"healing,omitempty"`
State string `json:"state,omitempty"` State string `json:"state,omitempty"`
UUID string `json:"uuid,omitempty"` UUID string `json:"uuid,omitempty"`
Model string `json:"model,omitempty"` Model string `json:"model,omitempty"`
TotalSpace uint64 `json:"totalspace,omitempty"` TotalSpace uint64 `json:"totalspace,omitempty"`
UsedSpace uint64 `json:"usedspace,omitempty"` UsedSpace uint64 `json:"usedspace,omitempty"`
AvailableSpace uint64 `json:"availspace,omitempty"` AvailableSpace uint64 `json:"availspace,omitempty"`
ReadThroughput float64 `json:"readthroughput,omitempty"` ReadThroughput float64 `json:"readthroughput,omitempty"`
WriteThroughPut float64 `json:"writethroughput,omitempty"` WriteThroughPut float64 `json:"writethroughput,omitempty"`
ReadLatency float64 `json:"readlatency,omitempty"` ReadLatency float64 `json:"readlatency,omitempty"`
WriteLatency float64 `json:"writelatency,omitempty"` WriteLatency float64 `json:"writelatency,omitempty"`
Utilization float64 `json:"utilization,omitempty"` Utilization float64 `json:"utilization,omitempty"`
HealInfo *HealingDisk `json:"heal_info,omitempty"`
// Indexes, will be -1 until assigned a set.
PoolIndex int `json:"pool_index"`
SetIndex int `json:"set_index"`
DiskIndex int `json:"disk_index"`
} }
// ServerInfo - Connect to a minio server and call Server Admin Info Management API // ServerInfo - Connect to a minio server and call Server Admin Info Management API