diff --git a/cmd/admin-handlers-pools.go b/cmd/admin-handlers-pools.go
new file mode 100644
index 000000000..3b3107905
--- /dev/null
+++ b/cmd/admin-handlers-pools.go
@@ -0,0 +1,179 @@
+// Copyright (c) 2015-2021 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package cmd
+
+import (
+ "encoding/json"
+ "net/http"
+
+ "github.com/gorilla/mux"
+ "github.com/minio/minio/internal/logger"
+ iampolicy "github.com/minio/pkg/iam/policy"
+)
+
+func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Request) {
+ ctx := newContext(r, w, "StartDecommission")
+
+ defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
+
+ objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.DecommissionAdminAction)
+ if objectAPI == nil {
+ return
+ }
+
+ // Legacy args style such as non-ellipses style is not supported with this API.
+ if globalEndpoints.Legacy() {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
+ return
+ }
+
+ pools, ok := objectAPI.(*erasureServerPools)
+ if !ok {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
+ return
+ }
+
+ vars := mux.Vars(r)
+ v := vars["pool"]
+
+ idx := globalEndpoints.GetPoolIdx(v)
+ if idx == -1 {
+ // We didn't find any matching pools, invalid input
+ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL)
+ return
+ }
+
+ if err := pools.Decommission(r.Context(), idx); err != nil {
+ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+ return
+ }
+}
+
+func (a adminAPIHandlers) CancelDecommission(w http.ResponseWriter, r *http.Request) {
+ ctx := newContext(r, w, "CancelDecommission")
+
+ defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
+
+ objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.DecommissionAdminAction)
+ if objectAPI == nil {
+ return
+ }
+
+ // Legacy args style such as non-ellipses style is not supported with this API.
+ if globalEndpoints.Legacy() {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
+ return
+ }
+
+ pools, ok := objectAPI.(*erasureServerPools)
+ if !ok {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
+ return
+ }
+
+ vars := mux.Vars(r)
+ v := vars["pool"]
+
+ idx := globalEndpoints.GetPoolIdx(v)
+ if idx == -1 {
+ // We didn't find any matching pools, invalid input
+ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL)
+ return
+ }
+
+ if err := pools.DecommissionCancel(ctx, idx); err != nil {
+ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+ return
+ }
+}
+
+func (a adminAPIHandlers) StatusPool(w http.ResponseWriter, r *http.Request) {
+ ctx := newContext(r, w, "StatusPool")
+
+ defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
+
+ objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ServerInfoAdminAction, iampolicy.DecommissionAdminAction)
+ if objectAPI == nil {
+ return
+ }
+
+ // Legacy args style such as non-ellipses style is not supported with this API.
+ if globalEndpoints.Legacy() {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
+ return
+ }
+
+ pools, ok := objectAPI.(*erasureServerPools)
+ if !ok {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
+ return
+ }
+
+ vars := mux.Vars(r)
+ v := vars["pool"]
+
+ idx := globalEndpoints.GetPoolIdx(v)
+ if idx == -1 {
+ // We didn't find any matching pools, invalid input
+ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL)
+ return
+ }
+
+ status, err := pools.Status(r.Context(), idx)
+ if err != nil {
+ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+ return
+ }
+
+ logger.LogIf(r.Context(), json.NewEncoder(w).Encode(&status))
+}
+
+func (a adminAPIHandlers) ListPools(w http.ResponseWriter, r *http.Request) {
+ ctx := newContext(r, w, "ListPools")
+
+ defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
+
+ objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ServerInfoAdminAction, iampolicy.DecommissionAdminAction)
+ if objectAPI == nil {
+ return
+ }
+
+ // Legacy args style such as non-ellipses style is not supported with this API.
+ if globalEndpoints.Legacy() {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
+ return
+ }
+
+ pools, ok := objectAPI.(*erasureServerPools)
+ if !ok {
+ writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
+ return
+ }
+
+ poolsStatus := make([]PoolStatus, len(globalEndpoints))
+ for idx := range globalEndpoints {
+ status, err := pools.Status(r.Context(), idx)
+ if err != nil {
+ writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
+ return
+ }
+ poolsStatus[idx] = status
+ }
+
+ logger.LogIf(r.Context(), json.NewEncoder(w).Encode(poolsStatus))
+}
diff --git a/cmd/admin-router.go b/cmd/admin-router.go
index 44b99221f..6abdb5e4c 100644
--- a/cmd/admin-router.go
+++ b/cmd/admin-router.go
@@ -74,8 +74,14 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) {
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/").HandlerFunc(gz(httpTraceAll(adminAPI.HealHandler)))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}").HandlerFunc(gz(httpTraceAll(adminAPI.HealHandler)))
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}/{prefix:.*}").HandlerFunc(gz(httpTraceAll(adminAPI.HealHandler)))
-
adminRouter.Methods(http.MethodPost).Path(adminVersion + "/background-heal/status").HandlerFunc(gz(httpTraceAll(adminAPI.BackgroundHealStatusHandler)))
+
+ // Pool operations
+ adminRouter.Methods(http.MethodGet).Path(adminVersion + "/pools/list").HandlerFunc(gz(httpTraceAll(adminAPI.ListPools)))
+ adminRouter.Methods(http.MethodGet).Path(adminVersion+"/pools/status").HandlerFunc(gz(httpTraceAll(adminAPI.StatusPool))).Queries("pool", "{pool:.*}")
+
+ adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/decommission").HandlerFunc(gz(httpTraceAll(adminAPI.StartDecommission))).Queries("pool", "{pool:.*}")
+ adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/cancel").HandlerFunc(gz(httpTraceAll(adminAPI.CancelDecommission))).Queries("pool", "{pool:.*}")
}
// Profiling operations
diff --git a/cmd/endpoint-ellipses.go b/cmd/endpoint-ellipses.go
index 8e3398762..eedd1435b 100644
--- a/cmd/endpoint-ellipses.go
+++ b/cmd/endpoint-ellipses.go
@@ -353,9 +353,11 @@ func createServerEndpoints(serverAddr string, args ...string) (
return nil, -1, err
}
endpointServerPools = append(endpointServerPools, PoolEndpoints{
+ Legacy: true,
SetCount: len(setArgs),
DrivesPerSet: len(setArgs[0]),
Endpoints: endpointList,
+ CmdLine: strings.Join(args, " "),
})
setupType = newSetupType
return endpointServerPools, setupType, nil
@@ -376,6 +378,7 @@ func createServerEndpoints(serverAddr string, args ...string) (
SetCount: len(setArgs),
DrivesPerSet: len(setArgs[0]),
Endpoints: endpointList,
+ CmdLine: arg,
}); err != nil {
return nil, -1, err
}
diff --git a/cmd/endpoint.go b/cmd/endpoint.go
index e6381d464..d6088ca06 100644
--- a/cmd/endpoint.go
+++ b/cmd/endpoint.go
@@ -197,14 +197,28 @@ func NewEndpoint(arg string) (ep Endpoint, e error) {
// PoolEndpoints represent endpoints in a given pool
// along with its setCount and setDriveCount.
type PoolEndpoints struct {
+ // indicates if endpoints are provided in non-ellipses style
+ Legacy bool
SetCount int
DrivesPerSet int
Endpoints Endpoints
+ CmdLine string
}
// EndpointServerPools - list of list of endpoints
type EndpointServerPools []PoolEndpoints
+// GetPoolIdx return pool index
+func (l EndpointServerPools) GetPoolIdx(pool string) int {
+ for id, ep := range globalEndpoints {
+ if ep.CmdLine != pool {
+ continue
+ }
+ return id
+ }
+ return -1
+}
+
// GetLocalPoolIdx returns the pool which endpoint belongs to locally.
// if ep is remote this code will return -1 poolIndex
func (l EndpointServerPools) GetLocalPoolIdx(ep Endpoint) int {
@@ -220,6 +234,13 @@ func (l EndpointServerPools) GetLocalPoolIdx(ep Endpoint) int {
return -1
}
+// Legacy returns 'true' if the MinIO server commandline was
+// provided with no ellipses pattern, those are considered
+// legacy deployments.
+func (l EndpointServerPools) Legacy() bool {
+ return len(l) == 1 && l[0].Legacy
+}
+
// Add add pool endpoints
func (l *EndpointServerPools) Add(zeps PoolEndpoints) error {
existSet := set.NewStringSet()
diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go
index 575192674..9cfbb9808 100644
--- a/cmd/erasure-common.go
+++ b/cmd/erasure-common.go
@@ -22,6 +22,38 @@ import (
"sync"
)
+func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) {
+ disks := er.getDisks()
+ var wg sync.WaitGroup
+ var mu sync.Mutex
+ for _, i := range hashOrder(UTCNow().String(), len(disks)) {
+ i := i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if disks[i-1] == nil {
+ return
+ }
+ di, err := disks[i-1].DiskInfo(context.Background())
+ if err != nil || di.Healing {
+ // - Do not consume disks which are not reachable
+ // unformatted or simply not accessible for some reason.
+ //
+ // - Do not consume disks which are being healed
+ //
+ // - Future: skip busy disks
+ return
+ }
+
+ mu.Lock()
+ newDisks = append(newDisks, disks[i-1])
+ mu.Unlock()
+ }()
+ }
+ wg.Wait()
+ return newDisks
+}
+
func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) {
disks := er.getDisks()
// Based on the random shuffling return back randomized disks.
diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index 38183ae24..6b6cb721e 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -408,7 +408,8 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
}
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
- if reducedErr == errErasureReadQuorum && bucket != minioMetaBucket {
+ if errors.Is(reducedErr, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) {
+ // Skip buckets that live at `.minio.sys` bucket.
if _, ok := isObjectDangling(metaArr, errs, nil); ok {
reducedErr = errFileNotFound
if opts.VersionID != "" {
diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go
new file mode 100644
index 000000000..5d3305133
--- /dev/null
+++ b/cmd/erasure-server-pool-decom.go
@@ -0,0 +1,919 @@
+// Copyright (c) 2015-2021 MinIO, Inc.
+//
+// This file is part of MinIO Object Storage stack
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package cmd
+
+import (
+ "context"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "net/http"
+ "sort"
+ "time"
+
+ "github.com/dustin/go-humanize"
+ "github.com/minio/madmin-go"
+ "github.com/minio/minio/internal/config/storageclass"
+ "github.com/minio/minio/internal/hash"
+ "github.com/minio/minio/internal/logger"
+)
+
+// PoolDecommissionInfo currently decommissioning information
+type PoolDecommissionInfo struct {
+ StartTime time.Time `json:"startTime" msg:"st"`
+ StartSize int64 `json:"startSize" msg:"ss"`
+ TotalSize int64 `json:"totalSize" msg:"ts"`
+ CurrentSize int64 `json:"currentSize" msg:"cs"`
+
+ Complete bool `json:"complete" msg:"cmp"`
+ Failed bool `json:"failed" msg:"fl"`
+ Canceled bool `json:"canceled" msg:"cnl"`
+
+ // Internal information.
+ QueuedBuckets []string `json:"-" msg:"bkts"`
+ DecommissionedBuckets []string `json:"-" msg:"dbkts"`
+
+ // Last bucket/object decommissioned.
+ Bucket string `json:"-" msg:"bkt"`
+ Object string `json:"-" msg:"obj"`
+
+ // Verbose information
+ ItemsDecommissioned uint64 `json:"-" msg:"id"`
+ ItemsDecommissionFailed uint64 `json:"-" msg:"idf"`
+ BytesDone uint64 `json:"-" msg:"bd"`
+ BytesFailed uint64 `json:"-" msg:"bf"`
+}
+
+// bucketPop should be called when a bucket is done decommissioning.
+// Adds the bucket to the list of decommissioned buckets and updates resume numbers.
+func (pd *PoolDecommissionInfo) bucketPop(bucket string) {
+ pd.DecommissionedBuckets = append(pd.DecommissionedBuckets, bucket)
+ for i, b := range pd.QueuedBuckets {
+ if b == bucket {
+ // Bucket is done.
+ pd.QueuedBuckets = append(pd.QueuedBuckets[:i], pd.QueuedBuckets[i+1:]...)
+ }
+ }
+}
+
+func (pd *PoolDecommissionInfo) bucketsToDecommission() []string {
+ return pd.QueuedBuckets
+}
+
+func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool {
+ for _, b := range pd.DecommissionedBuckets {
+ if b == bucket {
+ return true
+ }
+ }
+ return false
+}
+
+func (pd *PoolDecommissionInfo) bucketPush(bucket string) {
+ for _, b := range pd.QueuedBuckets {
+ if pd.isBucketDecommissioned(b) {
+ return
+ }
+ if b == bucket {
+ return
+ }
+ }
+ pd.QueuedBuckets = append(pd.QueuedBuckets, bucket)
+}
+
+// PoolStatus captures current pool status
+type PoolStatus struct {
+ ID int `json:"id" msg:"id"`
+ CmdLine string `json:"cmdline" msg:"cl"`
+ LastUpdate time.Time `json:"lastUpdate" msg:"lu"`
+ Decommission *PoolDecommissionInfo `json:"decommissionInfo,omitempty" msg:"dec"`
+}
+
+//go:generate msgp -file $GOFILE -unexported
+type poolMeta struct {
+ Version int `msg:"v"`
+ Pools []PoolStatus `msg:"pls"`
+}
+
+// A decommission resumable tells us if decommission is worth
+// resuming upon restart of a cluster.
+func (p *poolMeta) returnResumablePools(n int) []PoolStatus {
+ var newPools []PoolStatus
+ for _, pool := range p.Pools {
+ if pool.Decommission == nil {
+ continue
+ }
+ if pool.Decommission.Complete || pool.Decommission.Canceled {
+ // Do not resume decommission upon startup for
+ // - decommission complete
+ // - decommission canceled
+ continue
+ } // In all other situations we need to resume
+ newPools = append(newPools, pool)
+ if n > 0 && len(newPools) == n {
+ return newPools
+ }
+ }
+ return nil
+}
+
+func (p *poolMeta) DecommissionComplete(idx int) bool {
+ if p.Pools[idx].Decommission != nil {
+ p.Pools[idx].LastUpdate = time.Now().UTC()
+ p.Pools[idx].Decommission.Complete = true
+ p.Pools[idx].Decommission.Failed = false
+ p.Pools[idx].Decommission.Canceled = false
+ return true
+ }
+ return false
+}
+
+func (p *poolMeta) DecommissionFailed(idx int) bool {
+ if p.Pools[idx].Decommission != nil {
+ p.Pools[idx].LastUpdate = time.Now().UTC()
+ p.Pools[idx].Decommission.StartTime = time.Time{}
+ p.Pools[idx].Decommission.Complete = false
+ p.Pools[idx].Decommission.Failed = true
+ p.Pools[idx].Decommission.Canceled = false
+ return true
+ }
+ return false
+}
+
+func (p *poolMeta) DecommissionCancel(idx int) bool {
+ if p.Pools[idx].Decommission != nil {
+ p.Pools[idx].LastUpdate = time.Now().UTC()
+ p.Pools[idx].Decommission.StartTime = time.Time{}
+ p.Pools[idx].Decommission.Complete = false
+ p.Pools[idx].Decommission.Failed = false
+ p.Pools[idx].Decommission.Canceled = true
+ return true
+ }
+ return false
+}
+
+func (p poolMeta) isBucketDecommissioned(idx int, bucket string) bool {
+ return p.Pools[idx].Decommission.isBucketDecommissioned(bucket)
+}
+
+func (p *poolMeta) BucketDone(idx int, bucket string) {
+ if p.Pools[idx].Decommission == nil {
+ // Decommission not in progress.
+ return
+ }
+ p.Pools[idx].Decommission.bucketPop(bucket)
+}
+
+func (p poolMeta) ResumeBucketObject(idx int) (bucket, object string) {
+ if p.Pools[idx].Decommission != nil {
+ bucket = p.Pools[idx].Decommission.Bucket
+ object = p.Pools[idx].Decommission.Object
+ }
+ return
+}
+
+func (p *poolMeta) TrackCurrentBucketObject(idx int, bucket string, object string) {
+ if p.Pools[idx].Decommission == nil {
+ // Decommission not in progress.
+ return
+ }
+ p.Pools[idx].Decommission.Bucket = bucket
+ p.Pools[idx].Decommission.Object = object
+}
+
+func (p *poolMeta) PendingBuckets(idx int) []string {
+ if p.Pools[idx].Decommission == nil {
+ // Decommission not in progress.
+ return nil
+ }
+
+ return p.Pools[idx].Decommission.bucketsToDecommission()
+}
+
+func (p *poolMeta) QueueBuckets(idx int, buckets []BucketInfo) {
+ // add new queued buckets
+ for _, bucket := range buckets {
+ p.Pools[idx].Decommission.bucketPush(bucket.Name)
+ }
+}
+
+var (
+ errDecommissionAlreadyRunning = errors.New("decommission is already in progress")
+ errDecommissionComplete = errors.New("decommission is complete, please remove the servers from command-line")
+)
+
+func (p *poolMeta) Decommission(idx int, info StorageInfo) error {
+ for i, pool := range p.Pools {
+ if idx == i {
+ continue
+ }
+ if pool.Decommission != nil {
+ // Do not allow multiple decommissions at the same time.
+ // We shall for now only allow one pool decommission at
+ // a time.
+ return fmt.Errorf("%w at index: %d", errDecommissionAlreadyRunning, i)
+ }
+ }
+ if p.Pools[idx].Decommission == nil {
+ startSize := TotalUsableCapacityFree(info)
+ totalSize := TotalUsableCapacity(info)
+ p.Pools[idx].LastUpdate = time.Now().UTC()
+ p.Pools[idx].Decommission = &PoolDecommissionInfo{
+ StartTime: UTCNow(),
+ StartSize: startSize,
+ CurrentSize: startSize,
+ TotalSize: totalSize,
+ }
+ return nil
+ }
+
+ // Completed pool doesn't need to be decommissioned again.
+ if p.Pools[idx].Decommission.Complete {
+ return errDecommissionComplete
+ }
+
+ // Canceled or Failed decommission can be triggered again.
+ if p.Pools[idx].Decommission.StartTime.IsZero() {
+ if p.Pools[idx].Decommission.Canceled || p.Pools[idx].Decommission.Failed {
+ startSize := TotalUsableCapacityFree(info)
+ totalSize := TotalUsableCapacity(info)
+ p.Pools[idx].LastUpdate = time.Now().UTC()
+ p.Pools[idx].Decommission = &PoolDecommissionInfo{
+ StartTime: UTCNow(),
+ StartSize: startSize,
+ CurrentSize: startSize,
+ TotalSize: totalSize,
+ }
+ return nil
+ }
+ } // In-progress pool doesn't need to be decommissioned again.
+
+ // In all other scenarios an active decommissioning is in progress.
+ return errDecommissionAlreadyRunning
+}
+
+func (p poolMeta) IsSuspended(idx int) bool {
+ return p.Pools[idx].Decommission != nil
+}
+
+func (p *poolMeta) load(ctx context.Context, pool *erasureSets, pools []*erasureSets) (bool, error) {
+ data, err := readConfig(ctx, pool, poolMetaName)
+ if err != nil {
+ if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) {
+ return true, nil
+ }
+ return false, err
+ }
+ if len(data) == 0 {
+ return true, nil
+ }
+ if len(data) <= 4 {
+ return false, fmt.Errorf("poolMeta: no data")
+ }
+ // Read header
+ switch binary.LittleEndian.Uint16(data[0:2]) {
+ case poolMetaFormat:
+ default:
+ return false, fmt.Errorf("poolMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2]))
+ }
+ switch binary.LittleEndian.Uint16(data[2:4]) {
+ case poolMetaVersion:
+ default:
+ return false, fmt.Errorf("poolMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4]))
+ }
+
+ // OK, parse data.
+ if _, err = p.UnmarshalMsg(data[4:]); err != nil {
+ return false, err
+ }
+
+ switch p.Version {
+ case poolMetaVersionV1:
+ default:
+ return false, fmt.Errorf("unexpected pool meta version: %d", p.Version)
+ }
+
+ type poolInfo struct {
+ position int
+ completed bool
+ }
+
+ rememberedPools := make(map[string]poolInfo)
+ for idx, pool := range p.Pools {
+ complete := false
+ if pool.Decommission != nil && pool.Decommission.Complete {
+ complete = true
+ }
+ rememberedPools[pool.CmdLine] = poolInfo{
+ position: idx,
+ completed: complete,
+ }
+ }
+
+ specifiedPools := make(map[string]int)
+ for idx, pool := range pools {
+ specifiedPools[pool.endpoints.CmdLine] = idx
+ }
+
+ // Check if specified pools need to remove decommissioned pool.
+ for k := range specifiedPools {
+ pi, ok := rememberedPools[k]
+ if ok && pi.completed {
+ return false, fmt.Errorf("pool(%s) = %s is decommissioned, please remove from server command line", humanize.Ordinal(pi.position+1), k)
+ }
+ }
+
+ // check if remembered pools are in right position or missing from command line.
+ for k, pi := range rememberedPools {
+ if pi.completed {
+ continue
+ }
+ _, ok := specifiedPools[k]
+ if !ok {
+ return false, fmt.Errorf("pool(%s) = %s is not specified, please specify on server command line", humanize.Ordinal(pi.position+1), k)
+ }
+ }
+
+ // check when remembered pools and specified pools are same they are at the expected position
+ if len(rememberedPools) == len(specifiedPools) {
+ for k, pi := range rememberedPools {
+ pos, ok := specifiedPools[k]
+ if !ok {
+ return false, fmt.Errorf("pool(%s) = %s is not specified, please specify on server command line", humanize.Ordinal(pi.position+1), k)
+ }
+ if pos != pi.position {
+ return false, fmt.Errorf("pool order change detected for %s, expected position is (%s) but found (%s)", k, humanize.Ordinal(pi.position+1), humanize.Ordinal(pos+1))
+ }
+ }
+ }
+
+ return len(rememberedPools) != len(specifiedPools), nil
+}
+
+func (p *poolMeta) CountItem(idx int, size int64, failed bool) {
+ pd := p.Pools[idx].Decommission
+ if pd != nil {
+ if failed {
+ pd.ItemsDecommissionFailed++
+ pd.BytesFailed += uint64(size)
+ } else {
+ pd.ItemsDecommissioned++
+ pd.BytesDone += uint64(size)
+ }
+ p.Pools[idx].Decommission = pd
+ }
+}
+
+func (p *poolMeta) updateAfter(ctx context.Context, idx int, pools []*erasureSets, duration time.Duration) error {
+ if p.Pools[idx].Decommission == nil {
+ return errInvalidArgument
+ }
+ if time.Since(p.Pools[idx].LastUpdate) > duration {
+ p.Pools[idx].LastUpdate = time.Now().UTC()
+ return p.save(ctx, pools)
+ }
+ return nil
+}
+
+func (p poolMeta) save(ctx context.Context, pools []*erasureSets) error {
+ data := make([]byte, 4, p.Msgsize()+4)
+
+ // Initialize the header.
+ binary.LittleEndian.PutUint16(data[0:2], poolMetaFormat)
+ binary.LittleEndian.PutUint16(data[2:4], poolMetaVersion)
+
+ buf, err := p.MarshalMsg(data)
+ if err != nil {
+ return err
+ }
+
+ // Saves on all pools to make sure decommissioning of first pool is allowed.
+ for _, eset := range pools {
+ if err = saveConfig(ctx, eset, poolMetaName, buf); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+const (
+ poolMetaName = "pool.bin"
+ poolMetaFormat = 1
+ poolMetaVersionV1 = 1
+ poolMetaVersion = poolMetaVersionV1
+)
+
+// Init() initializes pools and saves additional information about them
+// in 'pool.bin', this is eventually used for decommissioning the pool.
+func (z *erasureServerPools) Init(ctx context.Context) error {
+ meta := poolMeta{}
+
+ update, err := meta.load(ctx, z.serverPools[0], z.serverPools)
+ if err != nil {
+ return err
+ }
+
+ // if no update is needed return right away.
+ if !update {
+ // We are only supporting single pool decommission at this time
+ // so it makes sense to only resume single pools at any given
+ // time, in future meta.returnResumablePools() might take
+ // '-1' as argument to decommission multiple pools at a time
+ // but this is not a priority at the moment.
+ for _, pool := range meta.returnResumablePools(1) {
+ err := z.Decommission(ctx, pool.ID)
+ switch err {
+ case errDecommissionAlreadyRunning:
+ fallthrough
+ case nil:
+ go z.doDecommissionInRoutine(ctx, pool.ID)
+ }
+ }
+ z.poolMeta = meta
+
+ return nil
+ }
+
+ // looks like new pool was added we need to update,
+ // or this is a fresh installation (or an existing
+ // installation with pool removed)
+ meta.Version = poolMetaVersion
+ for idx, pool := range z.serverPools {
+ meta.Pools = append(meta.Pools, PoolStatus{
+ CmdLine: pool.endpoints.CmdLine,
+ ID: idx,
+ LastUpdate: time.Now().UTC(),
+ })
+ }
+ if err = meta.save(ctx, z.serverPools); err != nil {
+ return err
+ }
+ z.poolMeta = meta
+ return nil
+}
+
+func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) {
+ defer gr.Close()
+ objInfo := gr.ObjInfo
+ if objInfo.isMultipart() {
+ uploadID, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name,
+ ObjectOptions{
+ VersionID: objInfo.VersionID,
+ MTime: objInfo.ModTime,
+ UserDefined: objInfo.UserDefined,
+ })
+ if err != nil {
+ return err
+ }
+ defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, uploadID, ObjectOptions{})
+ parts := make([]CompletePart, 0, len(objInfo.Parts))
+ for _, part := range objInfo.Parts {
+ hr, err := hash.NewReader(gr, part.Size, "", "", part.Size)
+ if err != nil {
+ return err
+ }
+ _, err = z.PutObjectPart(ctx, bucket, objInfo.Name, uploadID,
+ part.Number,
+ NewPutObjReader(hr),
+ ObjectOptions{})
+ if err != nil {
+ return err
+ }
+ parts = append(parts, CompletePart{
+ PartNumber: part.Number,
+ ETag: part.ETag,
+ })
+ }
+ _, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, uploadID, parts, ObjectOptions{
+ MTime: objInfo.ModTime,
+ })
+ return err
+ }
+ hr, err := hash.NewReader(gr, objInfo.Size, "", "", objInfo.Size)
+ if err != nil {
+ return err
+ }
+ _, err = z.PutObject(ctx,
+ bucket,
+ objInfo.Name,
+ NewPutObjReader(hr),
+ ObjectOptions{
+ VersionID: objInfo.VersionID,
+ MTime: objInfo.ModTime,
+ UserDefined: objInfo.UserDefined,
+ })
+ return err
+}
+
+// versionsSorter sorts FileInfo slices by version.
+//msgp:ignore versionsSorter
+type versionsSorter []FileInfo
+
+func (v versionsSorter) reverse() {
+ sort.Slice(v, func(i, j int) bool {
+ return v[i].ModTime.Before(v[j].ModTime)
+ })
+}
+
+func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error {
+ var forwardTo string
+ // If we resume to the same bucket, forward to last known item.
+ rbucket, robject := z.poolMeta.ResumeBucketObject(idx)
+ if rbucket != "" {
+ if rbucket == bName {
+ forwardTo = robject
+ }
+ }
+
+ versioned := globalBucketVersioningSys.Enabled(bName)
+ for _, set := range pool.sets {
+ disks := set.getOnlineDisks()
+ if len(disks) == 0 {
+ logger.LogIf(GlobalContext, fmt.Errorf("no online disks found for set with endpoints %s",
+ set.getEndpoints()))
+ continue
+ }
+
+ decommissionEntry := func(entry metaCacheEntry) {
+ if entry.isDir() {
+ return
+ }
+
+ fivs, err := entry.fileInfoVersions(bName)
+ if err != nil {
+ return
+ }
+
+ // We need a reversed order for Decommissioning,
+ // to create the appropriate stack.
+ versionsSorter(fivs.Versions).reverse()
+
+ for _, version := range fivs.Versions {
+ // TODO: Skip transitioned objects for now.
+ if version.IsRemote() {
+ continue
+ }
+ // We will skip decommissioning delete markers
+ // with single version, its as good as there
+ // is no data associated with the object.
+ if version.Deleted && len(fivs.Versions) == 1 {
+ continue
+ }
+ if version.Deleted {
+ _, err := z.DeleteObject(ctx,
+ bName,
+ version.Name,
+ ObjectOptions{
+ Versioned: versioned,
+ VersionID: version.VersionID,
+ MTime: version.ModTime,
+ DeleteReplication: version.ReplicationState,
+ })
+ if err != nil {
+ logger.LogIf(ctx, err)
+ z.poolMetaMutex.Lock()
+ z.poolMeta.CountItem(idx, 0, true)
+ z.poolMetaMutex.Unlock()
+ } else {
+ set.DeleteObject(ctx,
+ bName,
+ version.Name,
+ ObjectOptions{
+ VersionID: version.VersionID,
+ })
+ z.poolMetaMutex.Lock()
+ z.poolMeta.CountItem(idx, 0, false)
+ z.poolMetaMutex.Unlock()
+ }
+ continue
+ }
+ gr, err := set.GetObjectNInfo(ctx,
+ bName,
+ version.Name,
+ nil,
+ http.Header{},
+ noLock, // all mutations are blocked reads are safe without locks.
+ ObjectOptions{
+ VersionID: version.VersionID,
+ })
+ if err != nil {
+ logger.LogIf(ctx, err)
+ z.poolMetaMutex.Lock()
+ z.poolMeta.CountItem(idx, version.Size, true)
+ z.poolMetaMutex.Unlock()
+ continue
+ }
+ // gr.Close() is ensured by decommissionObject().
+ if err = z.decommissionObject(ctx, bName, gr); err != nil {
+ logger.LogIf(ctx, err)
+ z.poolMetaMutex.Lock()
+ z.poolMeta.CountItem(idx, version.Size, true)
+ z.poolMetaMutex.Unlock()
+ continue
+ }
+ set.DeleteObject(ctx,
+ bName,
+ version.Name,
+ ObjectOptions{
+ VersionID: version.VersionID,
+ })
+ z.poolMetaMutex.Lock()
+ z.poolMeta.CountItem(idx, version.Size, false)
+ z.poolMetaMutex.Unlock()
+ }
+ z.poolMetaMutex.Lock()
+ z.poolMeta.TrackCurrentBucketObject(idx, bName, entry.name)
+ logger.LogIf(ctx, z.poolMeta.updateAfter(ctx, idx, z.serverPools, time.Minute))
+ z.poolMetaMutex.Unlock()
+ }
+
+ // How to resolve partial results.
+ resolver := metadataResolutionParams{
+ dirQuorum: set.defaultRQuorum(),
+ objQuorum: set.defaultRQuorum(),
+ bucket: bName,
+ }
+
+ if err := listPathRaw(ctx, listPathRawOptions{
+ disks: disks,
+ bucket: bName,
+ recursive: true,
+ forwardTo: forwardTo,
+ minDisks: len(disks),
+ reportNotFound: false,
+ agreed: decommissionEntry,
+ partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
+ entry, ok := entries.resolve(&resolver)
+ if ok {
+ decommissionEntry(*entry)
+ }
+ },
+ finished: nil,
+ }); err != nil {
+ // Decommissioning failed and won't continue
+ return err
+ }
+ }
+ return nil
+}
+
+func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx int) error {
+ pool := z.serverPools[idx]
+ for _, bucket := range z.poolMeta.PendingBuckets(idx) {
+ if z.poolMeta.isBucketDecommissioned(idx, bucket) {
+ z.poolMetaMutex.Lock()
+ z.poolMeta.BucketDone(idx, bucket) // remove from pendingBuckets and persist.
+ z.poolMeta.save(ctx, z.serverPools)
+ z.poolMetaMutex.Unlock()
+ continue
+ }
+ if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil {
+ return err
+ }
+ z.poolMetaMutex.Lock()
+ z.poolMeta.BucketDone(idx, bucket)
+ z.poolMeta.save(ctx, z.serverPools)
+ z.poolMetaMutex.Unlock()
+ }
+ return nil
+}
+
+func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx int) {
+ z.poolMetaMutex.Lock()
+ var dctx context.Context
+ dctx, z.decommissionCancelers[idx] = context.WithCancel(GlobalContext)
+ z.poolMetaMutex.Unlock()
+
+ if err := z.decommissionInBackground(dctx, idx); err != nil {
+ logger.LogIf(GlobalContext, err)
+ logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
+ return
+ }
+ // Complete the decommission..
+ logger.LogIf(GlobalContext, z.CompleteDecommission(dctx, idx))
+}
+
+// Decommission - start decommission session.
+func (z *erasureServerPools) Decommission(ctx context.Context, idx int) error {
+ if idx < 0 {
+ return errInvalidArgument
+ }
+
+ if z.SinglePool() {
+ return errInvalidArgument
+ }
+
+ // Make pool unwritable before decommissioning.
+ if err := z.StartDecommission(ctx, idx); err != nil {
+ return err
+ }
+
+ go z.doDecommissionInRoutine(ctx, idx)
+
+ // Successfully started decommissioning.
+ return nil
+}
+
+func (z *erasureServerPools) Status(ctx context.Context, idx int) (PoolStatus, error) {
+ if idx < 0 {
+ return PoolStatus{}, errInvalidArgument
+ }
+
+ z.poolMetaMutex.RLock()
+ defer z.poolMetaMutex.RUnlock()
+
+ if idx+1 > len(z.poolMeta.Pools) {
+ return PoolStatus{}, errInvalidArgument
+ }
+
+ pool := z.serverPools[idx]
+ info, _ := pool.StorageInfo(ctx)
+ info.Backend.Type = madmin.Erasure
+
+ scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
+ if scParity <= 0 {
+ scParity = z.serverPools[0].defaultParityCount
+ }
+ rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS)
+ info.Backend.StandardSCData = append(info.Backend.StandardSCData, pool.SetDriveCount()-scParity)
+ info.Backend.RRSCData = append(info.Backend.RRSCData, pool.SetDriveCount()-rrSCParity)
+ info.Backend.StandardSCParity = scParity
+ info.Backend.RRSCParity = rrSCParity
+
+ currentSize := TotalUsableCapacityFree(info)
+
+ poolInfo := z.poolMeta.Pools[idx]
+ if poolInfo.Decommission != nil {
+ poolInfo.Decommission.CurrentSize = currentSize
+ } else {
+ poolInfo.Decommission = &PoolDecommissionInfo{
+ CurrentSize: currentSize,
+ TotalSize: TotalUsableCapacity(info),
+ }
+ }
+ return poolInfo, nil
+}
+
+func (z *erasureServerPools) ReloadPoolMeta(ctx context.Context) (err error) {
+ meta := poolMeta{}
+
+ if _, err = meta.load(ctx, z.serverPools[0], z.serverPools); err != nil {
+ return err
+ }
+
+ z.poolMetaMutex.Lock()
+ defer z.poolMetaMutex.Unlock()
+
+ z.poolMeta = meta
+ return nil
+}
+
+func (z *erasureServerPools) DecommissionCancel(ctx context.Context, idx int) (err error) {
+ if idx < 0 {
+ return errInvalidArgument
+ }
+
+ if z.SinglePool() {
+ return errInvalidArgument
+ }
+
+ z.poolMetaMutex.Lock()
+ defer z.poolMetaMutex.Unlock()
+
+ if z.poolMeta.DecommissionCancel(idx) {
+ z.decommissionCancelers[idx]() // cancel any active thread.
+ if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
+ return err
+ }
+ globalNotificationSys.ReloadPoolMeta(ctx)
+ }
+ return nil
+}
+
+func (z *erasureServerPools) DecommissionFailed(ctx context.Context, idx int) (err error) {
+ if idx < 0 {
+ return errInvalidArgument
+ }
+
+ if z.SinglePool() {
+ return errInvalidArgument
+ }
+
+ z.poolMetaMutex.Lock()
+ defer z.poolMetaMutex.Unlock()
+
+ if z.poolMeta.DecommissionFailed(idx) {
+ z.decommissionCancelers[idx]() // cancel any active thread.
+ if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
+ return err
+ }
+ globalNotificationSys.ReloadPoolMeta(ctx)
+ }
+ return nil
+}
+
+func (z *erasureServerPools) CompleteDecommission(ctx context.Context, idx int) (err error) {
+ if idx < 0 {
+ return errInvalidArgument
+ }
+
+ if z.SinglePool() {
+ return errInvalidArgument
+ }
+
+ z.poolMetaMutex.Lock()
+ defer z.poolMetaMutex.Unlock()
+
+ if z.poolMeta.DecommissionComplete(idx) {
+ if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
+ return err
+ }
+ globalNotificationSys.ReloadPoolMeta(ctx)
+ }
+ return nil
+}
+
+func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (err error) {
+ if idx < 0 {
+ return errInvalidArgument
+ }
+
+ if z.SinglePool() {
+ return errInvalidArgument
+ }
+
+ buckets, err := z.ListBuckets(ctx)
+ if err != nil {
+ return err
+ }
+
+ // TODO: Support decommissioning transition tiers.
+ for _, bucket := range buckets {
+ if lc, err := globalLifecycleSys.Get(bucket.Name); err == nil {
+ if lc.HasTransition() {
+ return fmt.Errorf("Bucket is part of transitioned tier %s: decommission is not allowed in Tier'd setups", bucket.Name)
+ }
+ }
+ }
+
+ // Buckets data are dispersed in multiple zones/sets, make
+ // sure to decommission the necessary metadata.
+ buckets = append(buckets, BucketInfo{
+ Name: pathJoin(minioMetaBucket, minioConfigPrefix),
+ }, BucketInfo{
+ Name: pathJoin(minioMetaBucket, bucketMetaPrefix),
+ })
+
+ var pool *erasureSets
+ for pidx := range z.serverPools {
+ if pidx == idx {
+ pool = z.serverPools[idx]
+ break
+ }
+ }
+
+ if pool == nil {
+ return errInvalidArgument
+ }
+
+ info, _ := pool.StorageInfo(ctx)
+ info.Backend.Type = madmin.Erasure
+
+ scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD)
+ if scParity <= 0 {
+ scParity = z.serverPools[0].defaultParityCount
+ }
+ rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS)
+ info.Backend.StandardSCData = append(info.Backend.StandardSCData, pool.SetDriveCount()-scParity)
+ info.Backend.RRSCData = append(info.Backend.RRSCData, pool.SetDriveCount()-rrSCParity)
+ info.Backend.StandardSCParity = scParity
+ info.Backend.RRSCParity = rrSCParity
+
+ z.poolMetaMutex.Lock()
+ defer z.poolMetaMutex.Unlock()
+
+ if err = z.poolMeta.Decommission(idx, info); err != nil {
+ return err
+ }
+ z.poolMeta.QueueBuckets(idx, buckets)
+ if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
+ return err
+ }
+ globalNotificationSys.ReloadPoolMeta(ctx)
+ return nil
+}
diff --git a/cmd/erasure-server-pool-decom_gen.go b/cmd/erasure-server-pool-decom_gen.go
new file mode 100644
index 000000000..07294cd3f
--- /dev/null
+++ b/cmd/erasure-server-pool-decom_gen.go
@@ -0,0 +1,932 @@
+package cmd
+
+// Code generated by github.com/tinylib/msgp DO NOT EDIT.
+
+import (
+ "github.com/tinylib/msgp/msgp"
+)
+
+// DecodeMsg implements msgp.Decodable
+func (z *PoolDecommissionInfo) DecodeMsg(dc *msgp.Reader) (err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, err = dc.ReadMapHeader()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, err = dc.ReadMapKeyPtr()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "st":
+ z.StartTime, err = dc.ReadTime()
+ if err != nil {
+ err = msgp.WrapError(err, "StartTime")
+ return
+ }
+ case "ss":
+ z.StartSize, err = dc.ReadInt64()
+ if err != nil {
+ err = msgp.WrapError(err, "StartSize")
+ return
+ }
+ case "ts":
+ z.TotalSize, err = dc.ReadInt64()
+ if err != nil {
+ err = msgp.WrapError(err, "TotalSize")
+ return
+ }
+ case "cs":
+ z.CurrentSize, err = dc.ReadInt64()
+ if err != nil {
+ err = msgp.WrapError(err, "CurrentSize")
+ return
+ }
+ case "cmp":
+ z.Complete, err = dc.ReadBool()
+ if err != nil {
+ err = msgp.WrapError(err, "Complete")
+ return
+ }
+ case "fl":
+ z.Failed, err = dc.ReadBool()
+ if err != nil {
+ err = msgp.WrapError(err, "Failed")
+ return
+ }
+ case "cnl":
+ z.Canceled, err = dc.ReadBool()
+ if err != nil {
+ err = msgp.WrapError(err, "Canceled")
+ return
+ }
+ case "bkts":
+ var zb0002 uint32
+ zb0002, err = dc.ReadArrayHeader()
+ if err != nil {
+ err = msgp.WrapError(err, "QueuedBuckets")
+ return
+ }
+ if cap(z.QueuedBuckets) >= int(zb0002) {
+ z.QueuedBuckets = (z.QueuedBuckets)[:zb0002]
+ } else {
+ z.QueuedBuckets = make([]string, zb0002)
+ }
+ for za0001 := range z.QueuedBuckets {
+ z.QueuedBuckets[za0001], err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "QueuedBuckets", za0001)
+ return
+ }
+ }
+ case "dbkts":
+ var zb0003 uint32
+ zb0003, err = dc.ReadArrayHeader()
+ if err != nil {
+ err = msgp.WrapError(err, "DecommissionedBuckets")
+ return
+ }
+ if cap(z.DecommissionedBuckets) >= int(zb0003) {
+ z.DecommissionedBuckets = (z.DecommissionedBuckets)[:zb0003]
+ } else {
+ z.DecommissionedBuckets = make([]string, zb0003)
+ }
+ for za0002 := range z.DecommissionedBuckets {
+ z.DecommissionedBuckets[za0002], err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "DecommissionedBuckets", za0002)
+ return
+ }
+ }
+ case "bkt":
+ z.Bucket, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "Bucket")
+ return
+ }
+ case "obj":
+ z.Object, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "Object")
+ return
+ }
+ case "id":
+ z.ItemsDecommissioned, err = dc.ReadUint64()
+ if err != nil {
+ err = msgp.WrapError(err, "ItemsDecommissioned")
+ return
+ }
+ case "idf":
+ z.ItemsDecommissionFailed, err = dc.ReadUint64()
+ if err != nil {
+ err = msgp.WrapError(err, "ItemsDecommissionFailed")
+ return
+ }
+ case "bd":
+ z.BytesDone, err = dc.ReadUint64()
+ if err != nil {
+ err = msgp.WrapError(err, "BytesDone")
+ return
+ }
+ case "bf":
+ z.BytesFailed, err = dc.ReadUint64()
+ if err != nil {
+ err = msgp.WrapError(err, "BytesFailed")
+ return
+ }
+ default:
+ err = dc.Skip()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) {
+ // map header, size 15
+ // write "st"
+ err = en.Append(0x8f, 0xa2, 0x73, 0x74)
+ if err != nil {
+ return
+ }
+ err = en.WriteTime(z.StartTime)
+ if err != nil {
+ err = msgp.WrapError(err, "StartTime")
+ return
+ }
+ // write "ss"
+ err = en.Append(0xa2, 0x73, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteInt64(z.StartSize)
+ if err != nil {
+ err = msgp.WrapError(err, "StartSize")
+ return
+ }
+ // write "ts"
+ err = en.Append(0xa2, 0x74, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteInt64(z.TotalSize)
+ if err != nil {
+ err = msgp.WrapError(err, "TotalSize")
+ return
+ }
+ // write "cs"
+ err = en.Append(0xa2, 0x63, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteInt64(z.CurrentSize)
+ if err != nil {
+ err = msgp.WrapError(err, "CurrentSize")
+ return
+ }
+ // write "cmp"
+ err = en.Append(0xa3, 0x63, 0x6d, 0x70)
+ if err != nil {
+ return
+ }
+ err = en.WriteBool(z.Complete)
+ if err != nil {
+ err = msgp.WrapError(err, "Complete")
+ return
+ }
+ // write "fl"
+ err = en.Append(0xa2, 0x66, 0x6c)
+ if err != nil {
+ return
+ }
+ err = en.WriteBool(z.Failed)
+ if err != nil {
+ err = msgp.WrapError(err, "Failed")
+ return
+ }
+ // write "cnl"
+ err = en.Append(0xa3, 0x63, 0x6e, 0x6c)
+ if err != nil {
+ return
+ }
+ err = en.WriteBool(z.Canceled)
+ if err != nil {
+ err = msgp.WrapError(err, "Canceled")
+ return
+ }
+ // write "bkts"
+ err = en.Append(0xa4, 0x62, 0x6b, 0x74, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteArrayHeader(uint32(len(z.QueuedBuckets)))
+ if err != nil {
+ err = msgp.WrapError(err, "QueuedBuckets")
+ return
+ }
+ for za0001 := range z.QueuedBuckets {
+ err = en.WriteString(z.QueuedBuckets[za0001])
+ if err != nil {
+ err = msgp.WrapError(err, "QueuedBuckets", za0001)
+ return
+ }
+ }
+ // write "dbkts"
+ err = en.Append(0xa5, 0x64, 0x62, 0x6b, 0x74, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteArrayHeader(uint32(len(z.DecommissionedBuckets)))
+ if err != nil {
+ err = msgp.WrapError(err, "DecommissionedBuckets")
+ return
+ }
+ for za0002 := range z.DecommissionedBuckets {
+ err = en.WriteString(z.DecommissionedBuckets[za0002])
+ if err != nil {
+ err = msgp.WrapError(err, "DecommissionedBuckets", za0002)
+ return
+ }
+ }
+ // write "bkt"
+ err = en.Append(0xa3, 0x62, 0x6b, 0x74)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.Bucket)
+ if err != nil {
+ err = msgp.WrapError(err, "Bucket")
+ return
+ }
+ // write "obj"
+ err = en.Append(0xa3, 0x6f, 0x62, 0x6a)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.Object)
+ if err != nil {
+ err = msgp.WrapError(err, "Object")
+ return
+ }
+ // write "id"
+ err = en.Append(0xa2, 0x69, 0x64)
+ if err != nil {
+ return
+ }
+ err = en.WriteUint64(z.ItemsDecommissioned)
+ if err != nil {
+ err = msgp.WrapError(err, "ItemsDecommissioned")
+ return
+ }
+ // write "idf"
+ err = en.Append(0xa3, 0x69, 0x64, 0x66)
+ if err != nil {
+ return
+ }
+ err = en.WriteUint64(z.ItemsDecommissionFailed)
+ if err != nil {
+ err = msgp.WrapError(err, "ItemsDecommissionFailed")
+ return
+ }
+ // write "bd"
+ err = en.Append(0xa2, 0x62, 0x64)
+ if err != nil {
+ return
+ }
+ err = en.WriteUint64(z.BytesDone)
+ if err != nil {
+ err = msgp.WrapError(err, "BytesDone")
+ return
+ }
+ // write "bf"
+ err = en.Append(0xa2, 0x62, 0x66)
+ if err != nil {
+ return
+ }
+ err = en.WriteUint64(z.BytesFailed)
+ if err != nil {
+ err = msgp.WrapError(err, "BytesFailed")
+ return
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z *PoolDecommissionInfo) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ // map header, size 15
+ // string "st"
+ o = append(o, 0x8f, 0xa2, 0x73, 0x74)
+ o = msgp.AppendTime(o, z.StartTime)
+ // string "ss"
+ o = append(o, 0xa2, 0x73, 0x73)
+ o = msgp.AppendInt64(o, z.StartSize)
+ // string "ts"
+ o = append(o, 0xa2, 0x74, 0x73)
+ o = msgp.AppendInt64(o, z.TotalSize)
+ // string "cs"
+ o = append(o, 0xa2, 0x63, 0x73)
+ o = msgp.AppendInt64(o, z.CurrentSize)
+ // string "cmp"
+ o = append(o, 0xa3, 0x63, 0x6d, 0x70)
+ o = msgp.AppendBool(o, z.Complete)
+ // string "fl"
+ o = append(o, 0xa2, 0x66, 0x6c)
+ o = msgp.AppendBool(o, z.Failed)
+ // string "cnl"
+ o = append(o, 0xa3, 0x63, 0x6e, 0x6c)
+ o = msgp.AppendBool(o, z.Canceled)
+ // string "bkts"
+ o = append(o, 0xa4, 0x62, 0x6b, 0x74, 0x73)
+ o = msgp.AppendArrayHeader(o, uint32(len(z.QueuedBuckets)))
+ for za0001 := range z.QueuedBuckets {
+ o = msgp.AppendString(o, z.QueuedBuckets[za0001])
+ }
+ // string "dbkts"
+ o = append(o, 0xa5, 0x64, 0x62, 0x6b, 0x74, 0x73)
+ o = msgp.AppendArrayHeader(o, uint32(len(z.DecommissionedBuckets)))
+ for za0002 := range z.DecommissionedBuckets {
+ o = msgp.AppendString(o, z.DecommissionedBuckets[za0002])
+ }
+ // string "bkt"
+ o = append(o, 0xa3, 0x62, 0x6b, 0x74)
+ o = msgp.AppendString(o, z.Bucket)
+ // string "obj"
+ o = append(o, 0xa3, 0x6f, 0x62, 0x6a)
+ o = msgp.AppendString(o, z.Object)
+ // string "id"
+ o = append(o, 0xa2, 0x69, 0x64)
+ o = msgp.AppendUint64(o, z.ItemsDecommissioned)
+ // string "idf"
+ o = append(o, 0xa3, 0x69, 0x64, 0x66)
+ o = msgp.AppendUint64(o, z.ItemsDecommissionFailed)
+ // string "bd"
+ o = append(o, 0xa2, 0x62, 0x64)
+ o = msgp.AppendUint64(o, z.BytesDone)
+ // string "bf"
+ o = append(o, 0xa2, 0x62, 0x66)
+ o = msgp.AppendUint64(o, z.BytesFailed)
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *PoolDecommissionInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, bts, err = msgp.ReadMapKeyZC(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "st":
+ z.StartTime, bts, err = msgp.ReadTimeBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "StartTime")
+ return
+ }
+ case "ss":
+ z.StartSize, bts, err = msgp.ReadInt64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "StartSize")
+ return
+ }
+ case "ts":
+ z.TotalSize, bts, err = msgp.ReadInt64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "TotalSize")
+ return
+ }
+ case "cs":
+ z.CurrentSize, bts, err = msgp.ReadInt64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "CurrentSize")
+ return
+ }
+ case "cmp":
+ z.Complete, bts, err = msgp.ReadBoolBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Complete")
+ return
+ }
+ case "fl":
+ z.Failed, bts, err = msgp.ReadBoolBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Failed")
+ return
+ }
+ case "cnl":
+ z.Canceled, bts, err = msgp.ReadBoolBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Canceled")
+ return
+ }
+ case "bkts":
+ var zb0002 uint32
+ zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "QueuedBuckets")
+ return
+ }
+ if cap(z.QueuedBuckets) >= int(zb0002) {
+ z.QueuedBuckets = (z.QueuedBuckets)[:zb0002]
+ } else {
+ z.QueuedBuckets = make([]string, zb0002)
+ }
+ for za0001 := range z.QueuedBuckets {
+ z.QueuedBuckets[za0001], bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "QueuedBuckets", za0001)
+ return
+ }
+ }
+ case "dbkts":
+ var zb0003 uint32
+ zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "DecommissionedBuckets")
+ return
+ }
+ if cap(z.DecommissionedBuckets) >= int(zb0003) {
+ z.DecommissionedBuckets = (z.DecommissionedBuckets)[:zb0003]
+ } else {
+ z.DecommissionedBuckets = make([]string, zb0003)
+ }
+ for za0002 := range z.DecommissionedBuckets {
+ z.DecommissionedBuckets[za0002], bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "DecommissionedBuckets", za0002)
+ return
+ }
+ }
+ case "bkt":
+ z.Bucket, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Bucket")
+ return
+ }
+ case "obj":
+ z.Object, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Object")
+ return
+ }
+ case "id":
+ z.ItemsDecommissioned, bts, err = msgp.ReadUint64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "ItemsDecommissioned")
+ return
+ }
+ case "idf":
+ z.ItemsDecommissionFailed, bts, err = msgp.ReadUint64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "ItemsDecommissionFailed")
+ return
+ }
+ case "bd":
+ z.BytesDone, bts, err = msgp.ReadUint64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "BytesDone")
+ return
+ }
+ case "bf":
+ z.BytesFailed, bts, err = msgp.ReadUint64Bytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "BytesFailed")
+ return
+ }
+ default:
+ bts, err = msgp.Skip(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z *PoolDecommissionInfo) Msgsize() (s int) {
+ s = 1 + 3 + msgp.TimeSize + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.BoolSize + 3 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.ArrayHeaderSize
+ for za0001 := range z.QueuedBuckets {
+ s += msgp.StringPrefixSize + len(z.QueuedBuckets[za0001])
+ }
+ s += 6 + msgp.ArrayHeaderSize
+ for za0002 := range z.DecommissionedBuckets {
+ s += msgp.StringPrefixSize + len(z.DecommissionedBuckets[za0002])
+ }
+ s += 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Uint64Size + 4 + msgp.Uint64Size + 3 + msgp.Uint64Size + 3 + msgp.Uint64Size
+ return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *PoolStatus) DecodeMsg(dc *msgp.Reader) (err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, err = dc.ReadMapHeader()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, err = dc.ReadMapKeyPtr()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "id":
+ z.ID, err = dc.ReadInt()
+ if err != nil {
+ err = msgp.WrapError(err, "ID")
+ return
+ }
+ case "cl":
+ z.CmdLine, err = dc.ReadString()
+ if err != nil {
+ err = msgp.WrapError(err, "CmdLine")
+ return
+ }
+ case "lu":
+ z.LastUpdate, err = dc.ReadTime()
+ if err != nil {
+ err = msgp.WrapError(err, "LastUpdate")
+ return
+ }
+ case "dec":
+ if dc.IsNil() {
+ err = dc.ReadNil()
+ if err != nil {
+ err = msgp.WrapError(err, "Decommission")
+ return
+ }
+ z.Decommission = nil
+ } else {
+ if z.Decommission == nil {
+ z.Decommission = new(PoolDecommissionInfo)
+ }
+ err = z.Decommission.DecodeMsg(dc)
+ if err != nil {
+ err = msgp.WrapError(err, "Decommission")
+ return
+ }
+ }
+ default:
+ err = dc.Skip()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z *PoolStatus) EncodeMsg(en *msgp.Writer) (err error) {
+ // map header, size 4
+ // write "id"
+ err = en.Append(0x84, 0xa2, 0x69, 0x64)
+ if err != nil {
+ return
+ }
+ err = en.WriteInt(z.ID)
+ if err != nil {
+ err = msgp.WrapError(err, "ID")
+ return
+ }
+ // write "cl"
+ err = en.Append(0xa2, 0x63, 0x6c)
+ if err != nil {
+ return
+ }
+ err = en.WriteString(z.CmdLine)
+ if err != nil {
+ err = msgp.WrapError(err, "CmdLine")
+ return
+ }
+ // write "lu"
+ err = en.Append(0xa2, 0x6c, 0x75)
+ if err != nil {
+ return
+ }
+ err = en.WriteTime(z.LastUpdate)
+ if err != nil {
+ err = msgp.WrapError(err, "LastUpdate")
+ return
+ }
+ // write "dec"
+ err = en.Append(0xa3, 0x64, 0x65, 0x63)
+ if err != nil {
+ return
+ }
+ if z.Decommission == nil {
+ err = en.WriteNil()
+ if err != nil {
+ return
+ }
+ } else {
+ err = z.Decommission.EncodeMsg(en)
+ if err != nil {
+ err = msgp.WrapError(err, "Decommission")
+ return
+ }
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z *PoolStatus) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ // map header, size 4
+ // string "id"
+ o = append(o, 0x84, 0xa2, 0x69, 0x64)
+ o = msgp.AppendInt(o, z.ID)
+ // string "cl"
+ o = append(o, 0xa2, 0x63, 0x6c)
+ o = msgp.AppendString(o, z.CmdLine)
+ // string "lu"
+ o = append(o, 0xa2, 0x6c, 0x75)
+ o = msgp.AppendTime(o, z.LastUpdate)
+ // string "dec"
+ o = append(o, 0xa3, 0x64, 0x65, 0x63)
+ if z.Decommission == nil {
+ o = msgp.AppendNil(o)
+ } else {
+ o, err = z.Decommission.MarshalMsg(o)
+ if err != nil {
+ err = msgp.WrapError(err, "Decommission")
+ return
+ }
+ }
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *PoolStatus) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, bts, err = msgp.ReadMapKeyZC(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "id":
+ z.ID, bts, err = msgp.ReadIntBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "ID")
+ return
+ }
+ case "cl":
+ z.CmdLine, bts, err = msgp.ReadStringBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "CmdLine")
+ return
+ }
+ case "lu":
+ z.LastUpdate, bts, err = msgp.ReadTimeBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "LastUpdate")
+ return
+ }
+ case "dec":
+ if msgp.IsNil(bts) {
+ bts, err = msgp.ReadNilBytes(bts)
+ if err != nil {
+ return
+ }
+ z.Decommission = nil
+ } else {
+ if z.Decommission == nil {
+ z.Decommission = new(PoolDecommissionInfo)
+ }
+ bts, err = z.Decommission.UnmarshalMsg(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Decommission")
+ return
+ }
+ }
+ default:
+ bts, err = msgp.Skip(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z *PoolStatus) Msgsize() (s int) {
+ s = 1 + 3 + msgp.IntSize + 3 + msgp.StringPrefixSize + len(z.CmdLine) + 3 + msgp.TimeSize + 4
+ if z.Decommission == nil {
+ s += msgp.NilSize
+ } else {
+ s += z.Decommission.Msgsize()
+ }
+ return
+}
+
+// DecodeMsg implements msgp.Decodable
+func (z *poolMeta) DecodeMsg(dc *msgp.Reader) (err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, err = dc.ReadMapHeader()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, err = dc.ReadMapKeyPtr()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "v":
+ z.Version, err = dc.ReadInt()
+ if err != nil {
+ err = msgp.WrapError(err, "Version")
+ return
+ }
+ case "pls":
+ var zb0002 uint32
+ zb0002, err = dc.ReadArrayHeader()
+ if err != nil {
+ err = msgp.WrapError(err, "Pools")
+ return
+ }
+ if cap(z.Pools) >= int(zb0002) {
+ z.Pools = (z.Pools)[:zb0002]
+ } else {
+ z.Pools = make([]PoolStatus, zb0002)
+ }
+ for za0001 := range z.Pools {
+ err = z.Pools[za0001].DecodeMsg(dc)
+ if err != nil {
+ err = msgp.WrapError(err, "Pools", za0001)
+ return
+ }
+ }
+ default:
+ err = dc.Skip()
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ return
+}
+
+// EncodeMsg implements msgp.Encodable
+func (z *poolMeta) EncodeMsg(en *msgp.Writer) (err error) {
+ // map header, size 2
+ // write "v"
+ err = en.Append(0x82, 0xa1, 0x76)
+ if err != nil {
+ return
+ }
+ err = en.WriteInt(z.Version)
+ if err != nil {
+ err = msgp.WrapError(err, "Version")
+ return
+ }
+ // write "pls"
+ err = en.Append(0xa3, 0x70, 0x6c, 0x73)
+ if err != nil {
+ return
+ }
+ err = en.WriteArrayHeader(uint32(len(z.Pools)))
+ if err != nil {
+ err = msgp.WrapError(err, "Pools")
+ return
+ }
+ for za0001 := range z.Pools {
+ err = z.Pools[za0001].EncodeMsg(en)
+ if err != nil {
+ err = msgp.WrapError(err, "Pools", za0001)
+ return
+ }
+ }
+ return
+}
+
+// MarshalMsg implements msgp.Marshaler
+func (z *poolMeta) MarshalMsg(b []byte) (o []byte, err error) {
+ o = msgp.Require(b, z.Msgsize())
+ // map header, size 2
+ // string "v"
+ o = append(o, 0x82, 0xa1, 0x76)
+ o = msgp.AppendInt(o, z.Version)
+ // string "pls"
+ o = append(o, 0xa3, 0x70, 0x6c, 0x73)
+ o = msgp.AppendArrayHeader(o, uint32(len(z.Pools)))
+ for za0001 := range z.Pools {
+ o, err = z.Pools[za0001].MarshalMsg(o)
+ if err != nil {
+ err = msgp.WrapError(err, "Pools", za0001)
+ return
+ }
+ }
+ return
+}
+
+// UnmarshalMsg implements msgp.Unmarshaler
+func (z *poolMeta) UnmarshalMsg(bts []byte) (o []byte, err error) {
+ var field []byte
+ _ = field
+ var zb0001 uint32
+ zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ for zb0001 > 0 {
+ zb0001--
+ field, bts, err = msgp.ReadMapKeyZC(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ switch msgp.UnsafeString(field) {
+ case "v":
+ z.Version, bts, err = msgp.ReadIntBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Version")
+ return
+ }
+ case "pls":
+ var zb0002 uint32
+ zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Pools")
+ return
+ }
+ if cap(z.Pools) >= int(zb0002) {
+ z.Pools = (z.Pools)[:zb0002]
+ } else {
+ z.Pools = make([]PoolStatus, zb0002)
+ }
+ for za0001 := range z.Pools {
+ bts, err = z.Pools[za0001].UnmarshalMsg(bts)
+ if err != nil {
+ err = msgp.WrapError(err, "Pools", za0001)
+ return
+ }
+ }
+ default:
+ bts, err = msgp.Skip(bts)
+ if err != nil {
+ err = msgp.WrapError(err)
+ return
+ }
+ }
+ }
+ o = bts
+ return
+}
+
+// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
+func (z *poolMeta) Msgsize() (s int) {
+ s = 1 + 2 + msgp.IntSize + 4 + msgp.ArrayHeaderSize
+ for za0001 := range z.Pools {
+ s += z.Pools[za0001].Msgsize()
+ }
+ return
+}
diff --git a/cmd/erasure-server-pool-decom_gen_test.go b/cmd/erasure-server-pool-decom_gen_test.go
new file mode 100644
index 000000000..b479ceb1e
--- /dev/null
+++ b/cmd/erasure-server-pool-decom_gen_test.go
@@ -0,0 +1,349 @@
+package cmd
+
+// Code generated by github.com/tinylib/msgp DO NOT EDIT.
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/tinylib/msgp/msgp"
+)
+
+func TestMarshalUnmarshalPoolDecommissionInfo(t *testing.T) {
+ v := PoolDecommissionInfo{}
+ bts, err := v.MarshalMsg(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ left, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+ }
+
+ left, err = msgp.Skip(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+ }
+}
+
+func BenchmarkMarshalMsgPoolDecommissionInfo(b *testing.B) {
+ v := PoolDecommissionInfo{}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.MarshalMsg(nil)
+ }
+}
+
+func BenchmarkAppendMsgPoolDecommissionInfo(b *testing.B) {
+ v := PoolDecommissionInfo{}
+ bts := make([]byte, 0, v.Msgsize())
+ bts, _ = v.MarshalMsg(bts[0:0])
+ b.SetBytes(int64(len(bts)))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bts, _ = v.MarshalMsg(bts[0:0])
+ }
+}
+
+func BenchmarkUnmarshalPoolDecommissionInfo(b *testing.B) {
+ v := PoolDecommissionInfo{}
+ bts, _ := v.MarshalMsg(nil)
+ b.ReportAllocs()
+ b.SetBytes(int64(len(bts)))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestEncodeDecodePoolDecommissionInfo(t *testing.T) {
+ v := PoolDecommissionInfo{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+
+ m := v.Msgsize()
+ if buf.Len() > m {
+ t.Log("WARNING: TestEncodeDecodePoolDecommissionInfo Msgsize() is inaccurate")
+ }
+
+ vn := PoolDecommissionInfo{}
+ err := msgp.Decode(&buf, &vn)
+ if err != nil {
+ t.Error(err)
+ }
+
+ buf.Reset()
+ msgp.Encode(&buf, &v)
+ err = msgp.NewReader(&buf).Skip()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func BenchmarkEncodePoolDecommissionInfo(b *testing.B) {
+ v := PoolDecommissionInfo{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ en := msgp.NewWriter(msgp.Nowhere)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.EncodeMsg(en)
+ }
+ en.Flush()
+}
+
+func BenchmarkDecodePoolDecommissionInfo(b *testing.B) {
+ v := PoolDecommissionInfo{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ rd := msgp.NewEndlessReader(buf.Bytes(), b)
+ dc := msgp.NewReader(rd)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ err := v.DecodeMsg(dc)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestMarshalUnmarshalPoolStatus(t *testing.T) {
+ v := PoolStatus{}
+ bts, err := v.MarshalMsg(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ left, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+ }
+
+ left, err = msgp.Skip(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+ }
+}
+
+func BenchmarkMarshalMsgPoolStatus(b *testing.B) {
+ v := PoolStatus{}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.MarshalMsg(nil)
+ }
+}
+
+func BenchmarkAppendMsgPoolStatus(b *testing.B) {
+ v := PoolStatus{}
+ bts := make([]byte, 0, v.Msgsize())
+ bts, _ = v.MarshalMsg(bts[0:0])
+ b.SetBytes(int64(len(bts)))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bts, _ = v.MarshalMsg(bts[0:0])
+ }
+}
+
+func BenchmarkUnmarshalPoolStatus(b *testing.B) {
+ v := PoolStatus{}
+ bts, _ := v.MarshalMsg(nil)
+ b.ReportAllocs()
+ b.SetBytes(int64(len(bts)))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestEncodeDecodePoolStatus(t *testing.T) {
+ v := PoolStatus{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+
+ m := v.Msgsize()
+ if buf.Len() > m {
+ t.Log("WARNING: TestEncodeDecodePoolStatus Msgsize() is inaccurate")
+ }
+
+ vn := PoolStatus{}
+ err := msgp.Decode(&buf, &vn)
+ if err != nil {
+ t.Error(err)
+ }
+
+ buf.Reset()
+ msgp.Encode(&buf, &v)
+ err = msgp.NewReader(&buf).Skip()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func BenchmarkEncodePoolStatus(b *testing.B) {
+ v := PoolStatus{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ en := msgp.NewWriter(msgp.Nowhere)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.EncodeMsg(en)
+ }
+ en.Flush()
+}
+
+func BenchmarkDecodePoolStatus(b *testing.B) {
+ v := PoolStatus{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ rd := msgp.NewEndlessReader(buf.Bytes(), b)
+ dc := msgp.NewReader(rd)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ err := v.DecodeMsg(dc)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestMarshalUnmarshalpoolMeta(t *testing.T) {
+ v := poolMeta{}
+ bts, err := v.MarshalMsg(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ left, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
+ }
+
+ left, err = msgp.Skip(bts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(left) > 0 {
+ t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
+ }
+}
+
+func BenchmarkMarshalMsgpoolMeta(b *testing.B) {
+ v := poolMeta{}
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.MarshalMsg(nil)
+ }
+}
+
+func BenchmarkAppendMsgpoolMeta(b *testing.B) {
+ v := poolMeta{}
+ bts := make([]byte, 0, v.Msgsize())
+ bts, _ = v.MarshalMsg(bts[0:0])
+ b.SetBytes(int64(len(bts)))
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ bts, _ = v.MarshalMsg(bts[0:0])
+ }
+}
+
+func BenchmarkUnmarshalpoolMeta(b *testing.B) {
+ v := poolMeta{}
+ bts, _ := v.MarshalMsg(nil)
+ b.ReportAllocs()
+ b.SetBytes(int64(len(bts)))
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := v.UnmarshalMsg(bts)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
+
+func TestEncodeDecodepoolMeta(t *testing.T) {
+ v := poolMeta{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+
+ m := v.Msgsize()
+ if buf.Len() > m {
+ t.Log("WARNING: TestEncodeDecodepoolMeta Msgsize() is inaccurate")
+ }
+
+ vn := poolMeta{}
+ err := msgp.Decode(&buf, &vn)
+ if err != nil {
+ t.Error(err)
+ }
+
+ buf.Reset()
+ msgp.Encode(&buf, &v)
+ err = msgp.NewReader(&buf).Skip()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func BenchmarkEncodepoolMeta(b *testing.B) {
+ v := poolMeta{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ en := msgp.NewWriter(msgp.Nowhere)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ v.EncodeMsg(en)
+ }
+ en.Flush()
+}
+
+func BenchmarkDecodepoolMeta(b *testing.B) {
+ v := poolMeta{}
+ var buf bytes.Buffer
+ msgp.Encode(&buf, &v)
+ b.SetBytes(int64(buf.Len()))
+ rd := msgp.NewEndlessReader(buf.Bytes(), b)
+ dc := msgp.NewReader(rd)
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ err := v.DecodeMsg(dc)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index b0c51b76a..21ad5cc1b 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -42,10 +42,15 @@ import (
type erasureServerPools struct {
GatewayUnsupported
- serverPools []*erasureSets
+ poolMetaMutex sync.RWMutex
+ poolMeta poolMeta
+ serverPools []*erasureSets
// Shut down async operations
shutdown context.CancelFunc
+
+ // Active decommission canceler
+ decommissionCancelers []context.CancelFunc
}
func (z *erasureServerPools) SinglePool() bool {
@@ -62,7 +67,9 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
formats = make([]*formatErasureV3, len(endpointServerPools))
storageDisks = make([][]StorageAPI, len(endpointServerPools))
- z = &erasureServerPools{serverPools: make([]*erasureSets, len(endpointServerPools))}
+ z = &erasureServerPools{
+ serverPools: make([]*erasureSets, len(endpointServerPools)),
+ }
)
var localDrives []string
@@ -108,11 +115,26 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ
return nil, fmt.Errorf("All serverPools should have same deployment ID expected %s, got %s", deploymentID, formats[i].ID)
}
- z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i], commonParityDrives, i)
+ z.serverPools[i], err = newErasureSets(ctx, ep, storageDisks[i], formats[i], commonParityDrives, i)
if err != nil {
return nil, err
}
}
+
+ z.decommissionCancelers = make([]context.CancelFunc, len(z.serverPools))
+ r := rand.New(rand.NewSource(time.Now().UnixNano()))
+ for {
+ err := z.Init(ctx)
+ if err != nil {
+ if !configRetriableErrors(err) {
+ logger.Fatal(err, "Unable to initialize backend")
+ }
+ time.Sleep(time.Duration(r.Float64() * float64(5*time.Second)))
+ continue
+ }
+ break
+ }
+
ctx, z.shutdown = context.WithCancel(ctx)
go intDataUpdateTracker.start(ctx, localDrives...)
return z, nil
@@ -249,6 +271,10 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b
g := errgroup.WithNErrs(len(z.serverPools))
for index := range z.serverPools {
index := index
+ // skip suspended pools for any new I/O.
+ if z.poolMeta.IsSuspended(index) {
+ continue
+ }
g.Go(func() error {
// Get the set where it would be placed.
storageInfos[index] = getDiskInfos(ctx, z.serverPools[index].getHashedSet(object).getDisks())
@@ -292,19 +318,24 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
}
poolObjInfos := make([]poolObjInfo, len(z.serverPools))
+ poolOpts := make([]ObjectOptions, len(z.serverPools))
+ for i := range z.serverPools {
+ poolOpts[i] = opts
+ }
var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
- go func(i int, pool *erasureSets) {
+ go func(i int, pool *erasureSets, opts ObjectOptions) {
defer wg.Done()
// remember the pool index, we may sort the slice original index might be lost.
pinfo := poolObjInfo{
PoolIndex: i,
}
+ opts.VersionID = "" // no need to check for specific versionId
pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, opts)
poolObjInfos[i] = pinfo
- }(i, pool)
+ }(i, pool, poolOpts[i])
}
wg.Wait()
@@ -322,6 +353,12 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
return -1, pinfo.Err
}
+
+ // skip all objects from suspended pools for mutating calls.
+ if z.poolMeta.IsSuspended(pinfo.PoolIndex) && opts.Mutate {
+ continue
+ }
+
if isErrObjectNotFound(pinfo.Err) {
// No object exists or its a delete marker,
// check objInfo to confirm.
@@ -333,23 +370,23 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc
// exist proceed to next pool.
continue
}
+
return pinfo.PoolIndex, nil
}
return -1, toObjectErr(errFileNotFound, bucket, object)
}
-func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucket, object string) (idx int, err error) {
- return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{NoLock: true})
-}
-
-// getPoolIdxExisting returns the (first) found object pool index containing an object.
+// getPoolIdxExistingNoLock returns the (first) found object pool index containing an object.
// If the object exists, but the latest version is a delete marker, the index with it is still returned.
// If the object does not exist ObjectNotFound error is returned.
// If any other error is found, it is returned.
// The check is skipped if there is only one zone, and 0, nil is always returned in that case.
-func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, object string) (idx int, err error) {
- return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{})
+func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucket, object string) (idx int, err error) {
+ return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{
+ NoLock: true,
+ Mutate: true,
+ })
}
func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
@@ -369,9 +406,10 @@ func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, objec
}
// getPoolIdx returns the found previous object and its corresponding pool idx,
-// if none are found falls back to most available space pool.
+// if none are found falls back to most available space pool, this function is
+// designed to be only used by PutObject, CopyObject (newObject creation) and NewMultipartUpload.
func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
- idx, err = z.getPoolIdxExisting(ctx, bucket, object)
+ idx, err = z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{Mutate: true})
if err != nil && !isErrObjectNotFound(err) {
return idx, err
}
@@ -847,7 +885,11 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
}
func (z *erasureServerPools) deletePrefix(ctx context.Context, bucket string, prefix string) error {
- for _, zone := range z.serverPools {
+ for idx, zone := range z.serverPools {
+ if z.poolMeta.IsSuspended(idx) {
+ logger.LogIf(ctx, fmt.Errorf("pool %d is suspended, all writes are suspended", idx+1))
+ continue
+ }
_, err := zone.DeleteObject(ctx, bucket, prefix, ObjectOptions{DeletePrefix: true})
if err != nil {
return err
@@ -871,7 +913,8 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob
return z.serverPools[0].DeleteObject(ctx, bucket, object, opts)
}
- idx, err := z.getPoolIdxExisting(ctx, bucket, object)
+ opts.Mutate = true
+ idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return objInfo, err
}
@@ -2006,7 +2049,8 @@ func (z *erasureServerPools) PutObjectMetadata(ctx context.Context, bucket, obje
}
// We don't know the size here set 1GiB atleast.
- idx, err := z.getPoolIdxExisting(ctx, bucket, object)
+ opts.Mutate = true
+ idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return ObjectInfo{}, err
}
@@ -2022,7 +2066,8 @@ func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object s
}
// We don't know the size here set 1GiB atleast.
- idx, err := z.getPoolIdxExisting(ctx, bucket, object)
+ opts.Mutate = true
+ idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return ObjectInfo{}, err
}
@@ -2037,7 +2082,8 @@ func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, objec
return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts)
}
- idx, err := z.getPoolIdxExisting(ctx, bucket, object)
+ opts.Mutate = true
+ idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return ObjectInfo{}, err
}
@@ -2052,7 +2098,8 @@ func (z *erasureServerPools) GetObjectTags(ctx context.Context, bucket, object s
return z.serverPools[0].GetObjectTags(ctx, bucket, object, opts)
}
- idx, err := z.getPoolIdxExisting(ctx, bucket, object)
+ opts.Mutate = false
+ idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return nil, err
}
@@ -2067,7 +2114,8 @@ func (z *erasureServerPools) TransitionObject(ctx context.Context, bucket, objec
return z.serverPools[0].TransitionObject(ctx, bucket, object, opts)
}
- idx, err := z.getPoolIdxExisting(ctx, bucket, object)
+ opts.Mutate = true
+ idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return err
}
@@ -2082,7 +2130,8 @@ func (z *erasureServerPools) RestoreTransitionedObject(ctx context.Context, buck
return z.serverPools[0].RestoreTransitionedObject(ctx, bucket, object, opts)
}
- idx, err := z.getPoolIdxExisting(ctx, bucket, object)
+ opts.Mutate = true
+ idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts)
if err != nil {
return err
}
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index fb848a75a..edc2e1cff 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -71,7 +71,7 @@ type erasureSets struct {
erasureLockOwner string
// List of endpoints provided on the command line.
- endpoints Endpoints
+ endpoints PoolEndpoints
// String version of all the endpoints, an optimization
// to avoid url.String() conversion taking CPU on
@@ -208,7 +208,7 @@ func (s *erasureSets) connectDisks() {
var wg sync.WaitGroup
setsJustConnected := make([]bool, s.setCount)
diskMap := s.getDiskMap()
- for _, endpoint := range s.endpoints {
+ for _, endpoint := range s.endpoints.Endpoints {
if isEndpointConnectionStable(diskMap, endpoint, s.lastConnectDisksOpTime) {
continue
}
@@ -328,7 +328,7 @@ func (s *erasureSets) GetEndpoints(setIndex int) func() []Endpoint {
eps := make([]Endpoint, s.setDriveCount)
for i := 0; i < s.setDriveCount; i++ {
- eps[i] = s.endpoints[setIndex*s.setDriveCount+i]
+ eps[i] = s.endpoints.Endpoints[setIndex*s.setDriveCount+i]
}
return eps
}
@@ -350,12 +350,12 @@ func (s *erasureSets) GetDisks(setIndex int) func() []StorageAPI {
const defaultMonitorConnectEndpointInterval = defaultMonitorNewDiskInterval + time.Second*5
// Initialize new set of erasure coded sets.
-func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3, defaultParityCount, poolIdx int) (*erasureSets, error) {
+func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks []StorageAPI, format *formatErasureV3, defaultParityCount, poolIdx int) (*erasureSets, error) {
setCount := len(format.Erasure.Sets)
setDriveCount := len(format.Erasure.Sets[0])
- endpointStrings := make([]string, len(endpoints))
- for i, endpoint := range endpoints {
+ endpointStrings := make([]string, len(endpoints.Endpoints))
+ for i, endpoint := range endpoints.Endpoints {
endpointStrings[i] = endpoint.String()
}
@@ -399,7 +399,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
}
erasureLockers := map[string]dsync.NetLocker{}
- for _, endpoint := range endpoints {
+ for _, endpoint := range endpoints.Endpoints {
if _, ok := erasureLockers[endpoint.Host]; !ok {
erasureLockers[endpoint.Host] = newLockAPI(endpoint)
}
@@ -408,8 +408,8 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
for i := 0; i < setCount; i++ {
lockerEpSet := set.NewStringSet()
for j := 0; j < setDriveCount; j++ {
- endpoint := endpoints[i*setDriveCount+j]
- // Only add lockers per endpoint.
+ endpoint := endpoints.Endpoints[i*setDriveCount+j]
+ // Only add lockers only one per endpoint and per erasure set.
if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) {
lockerEpSet.Add(endpoint.Host)
s.erasureLockers[i] = append(s.erasureLockers[i], locker)
@@ -1234,7 +1234,7 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
// HealFormat - heals missing `format.json` on fresh unformatted disks.
func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
- storageDisks, _ := initStorageDisksWithErrorsWithoutHealthCheck(s.endpoints)
+ storageDisks, _ := initStorageDisksWithErrorsWithoutHealthCheck(s.endpoints.Endpoints)
defer func(storageDisks []StorageAPI) {
if err != nil {
@@ -1264,7 +1264,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
}
// Fetch all the drive info status.
- beforeDrives := formatsToDrivesInfo(s.endpoints, formats, sErrs)
+ beforeDrives := formatsToDrivesInfo(s.endpoints.Endpoints, formats, sErrs)
res.After.Drives = make([]madmin.HealDriveInfo, len(beforeDrives))
res.Before.Drives = make([]madmin.HealDriveInfo, len(beforeDrives))
diff --git a/cmd/erasure-sets_test.go b/cmd/erasure-sets_test.go
index 84ee2a738..57b826d93 100644
--- a/cmd/erasure-sets_test.go
+++ b/cmd/erasure-sets_test.go
@@ -190,7 +190,8 @@ func TestNewErasureSets(t *testing.T) {
t.Fatalf("Unable to format disks for erasure, %s", err)
}
- if _, err := newErasureSets(ctx, endpoints, storageDisks, format, ecDrivesNoConfig(16), 0); err != nil {
+ ep := PoolEndpoints{Endpoints: endpoints}
+ if _, err := newErasureSets(ctx, ep, storageDisks, format, ecDrivesNoConfig(16), 0); err != nil {
t.Fatalf("Unable to initialize erasure")
}
}
diff --git a/cmd/notification.go b/cmd/notification.go
index a5c6af8ba..8ca7ef365 100644
--- a/cmd/notification.go
+++ b/cmd/notification.go
@@ -616,6 +616,26 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam
return bucketStats
}
+// ReloadPoolMeta reloads on disk updates on pool metadata
+func (sys *NotificationSys) ReloadPoolMeta(ctx context.Context) {
+ ng := WithNPeers(len(sys.peerClients))
+ for idx, client := range sys.peerClients {
+ if client == nil {
+ continue
+ }
+ client := client
+ ng.Go(ctx, func() error {
+ return client.ReloadPoolMeta(ctx)
+ }, idx, *client.host)
+ }
+ for _, nErr := range ng.Wait() {
+ reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String())
+ if nErr.Err != nil {
+ logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err)
+ }
+ }
+}
+
// LoadTransitionTierConfig notifies remote peers to load their remote tier
// configs from config store.
func (sys *NotificationSys) LoadTransitionTierConfig(ctx context.Context) {
diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go
index 6bf4bfd45..84c759527 100644
--- a/cmd/object-api-datatypes.go
+++ b/cmd/object-api-datatypes.go
@@ -46,6 +46,16 @@ const (
// StorageInfo - represents total capacity of underlying storage.
type StorageInfo = madmin.StorageInfo
+// TotalUsableCapacity - total usable capacity
+func TotalUsableCapacity(s StorageInfo) int64 {
+ return int64(GetTotalUsableCapacity(s.Disks, s))
+}
+
+// TotalUsableCapacityFree - total usable capacity free
+func TotalUsableCapacityFree(s StorageInfo) int64 {
+ return int64(GetTotalUsableCapacityFree(s.Disks, s))
+}
+
// objectHistogramInterval is an interval that will be
// used to report the histogram of objects data sizes
type objectHistogramInterval struct {
diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go
index 576d924d4..fa7714148 100644
--- a/cmd/object-api-interface.go
+++ b/cmd/object-api-interface.go
@@ -71,6 +71,9 @@ type ObjectOptions struct {
// Use the maximum parity (N/2), used when saving server configuration files
MaxParity bool
+
+ // Mutate set to 'true' if the call is namespace mutation call
+ Mutate bool
}
// ExpirationOptions represents object options for object expiration at objectLayer.
diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go
index 6b9571b0f..896ae5799 100644
--- a/cmd/peer-rest-client.go
+++ b/cmd/peer-rest-client.go
@@ -767,6 +767,16 @@ func (client *peerRESTClient) UpdateMetacacheListing(ctx context.Context, m meta
return resp, msgp.Decode(respBody, &resp)
}
+func (client *peerRESTClient) ReloadPoolMeta(ctx context.Context) error {
+ respBody, err := client.callWithContext(ctx, peerRESTMethodReloadPoolMeta, nil, nil, 0)
+ if err != nil {
+ logger.LogIf(ctx, err)
+ return err
+ }
+ defer http.DrainBody(respBody)
+ return nil
+}
+
func (client *peerRESTClient) LoadTransitionTierConfig(ctx context.Context) error {
respBody, err := client.callWithContext(ctx, peerRESTMethodLoadTransitionTierConfig, nil, nil, 0)
if err != nil {
diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go
index e86e201ab..b6efdd3bf 100644
--- a/cmd/peer-rest-common.go
+++ b/cmd/peer-rest-common.go
@@ -18,7 +18,7 @@
package cmd
const (
- peerRESTVersion = "v17" // Add "storage-class" option for SpeedTest
+ peerRESTVersion = "v18" // Add LoadPoolMeta
peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
peerRESTPrefix = minioReservedBucketPath + "/peer"
peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix
@@ -67,6 +67,7 @@ const (
peerRESTMethodLoadTransitionTierConfig = "/loadtransitiontierconfig"
peerRESTMethodSpeedtest = "/speedtest"
peerRESTMethodReloadSiteReplicationConfig = "/reloadsitereplicationconfig"
+ peerRESTMethodReloadPoolMeta = "/reloadpoolmeta"
)
const (
diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go
index 70d17e5ea..a5815924c 100644
--- a/cmd/peer-rest-server.go
+++ b/cmd/peer-rest-server.go
@@ -1023,6 +1023,27 @@ func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *h
logger.LogIf(ctx, gob.NewEncoder(w).Encode(state))
}
+func (s *peerRESTServer) ReloadPoolMetaHandler(w http.ResponseWriter, r *http.Request) {
+ if !s.IsValid(w, r) {
+ s.writeErrorResponse(w, errors.New("invalid request"))
+ return
+ }
+ objAPI := newObjectLayerFn()
+ if objAPI == nil {
+ s.writeErrorResponse(w, errServerNotInitialized)
+ return
+ }
+
+ pools, ok := objAPI.(*erasureServerPools)
+ if !ok {
+ return
+ }
+ if err := pools.ReloadPoolMeta(r.Context()); err != nil {
+ s.writeErrorResponse(w, err)
+ return
+ }
+}
+
func (s *peerRESTServer) LoadTransitionTierConfigHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
s.writeErrorResponse(w, errors.New("invalid request"))
@@ -1353,4 +1374,5 @@ func registerPeerRESTHandlers(router *mux.Router) {
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(httpTraceHdrs(server.LoadTransitionTierConfigHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler))
subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler))
+ subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadPoolMeta).HandlerFunc(httpTraceHdrs(server.ReloadPoolMetaHandler))
}
diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go
index af85153d4..e571a118f 100644
--- a/cmd/storage-datatypes.go
+++ b/cmd/storage-datatypes.go
@@ -72,11 +72,12 @@ type FilesInfo struct {
IsTruncated bool
}
-// FilesInfoVersions represents a list of file versions,
-// additionally indicates if the list is last.
-type FilesInfoVersions struct {
- FilesVersions []FileInfoVersions
- IsTruncated bool
+// Size returns size of all versions for the object 'Name'
+func (f FileInfoVersions) Size() (size int64) {
+ for _, v := range f.Versions {
+ size += v.Size
+ }
+ return size
}
// FileInfoVersions represent a list of versions for a given file.
@@ -235,7 +236,9 @@ func (fi *FileInfo) SetInlineData() {
}
// VersionPurgeStatusKey denotes purge status in metadata
-const VersionPurgeStatusKey = "purgestatus"
+const (
+ VersionPurgeStatusKey = ReservedMetadataPrefixLower + "purgestatus"
+)
// newFileInfo - initializes new FileInfo, allocates a fresh erasure info.
func newFileInfo(object string, dataBlocks, parityBlocks int) (fi FileInfo) {
diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go
index 317b2c1d0..5d40784b3 100644
--- a/cmd/storage-datatypes_gen.go
+++ b/cmd/storage-datatypes_gen.go
@@ -1530,178 +1530,6 @@ func (z *FilesInfo) Msgsize() (s int) {
return
}
-// DecodeMsg implements msgp.Decodable
-func (z *FilesInfoVersions) DecodeMsg(dc *msgp.Reader) (err error) {
- var field []byte
- _ = field
- var zb0001 uint32
- zb0001, err = dc.ReadMapHeader()
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- for zb0001 > 0 {
- zb0001--
- field, err = dc.ReadMapKeyPtr()
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- switch msgp.UnsafeString(field) {
- case "FilesVersions":
- var zb0002 uint32
- zb0002, err = dc.ReadArrayHeader()
- if err != nil {
- err = msgp.WrapError(err, "FilesVersions")
- return
- }
- if cap(z.FilesVersions) >= int(zb0002) {
- z.FilesVersions = (z.FilesVersions)[:zb0002]
- } else {
- z.FilesVersions = make([]FileInfoVersions, zb0002)
- }
- for za0001 := range z.FilesVersions {
- err = z.FilesVersions[za0001].DecodeMsg(dc)
- if err != nil {
- err = msgp.WrapError(err, "FilesVersions", za0001)
- return
- }
- }
- case "IsTruncated":
- z.IsTruncated, err = dc.ReadBool()
- if err != nil {
- err = msgp.WrapError(err, "IsTruncated")
- return
- }
- default:
- err = dc.Skip()
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- }
- }
- return
-}
-
-// EncodeMsg implements msgp.Encodable
-func (z *FilesInfoVersions) EncodeMsg(en *msgp.Writer) (err error) {
- // map header, size 2
- // write "FilesVersions"
- err = en.Append(0x82, 0xad, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73)
- if err != nil {
- return
- }
- err = en.WriteArrayHeader(uint32(len(z.FilesVersions)))
- if err != nil {
- err = msgp.WrapError(err, "FilesVersions")
- return
- }
- for za0001 := range z.FilesVersions {
- err = z.FilesVersions[za0001].EncodeMsg(en)
- if err != nil {
- err = msgp.WrapError(err, "FilesVersions", za0001)
- return
- }
- }
- // write "IsTruncated"
- err = en.Append(0xab, 0x49, 0x73, 0x54, 0x72, 0x75, 0x6e, 0x63, 0x61, 0x74, 0x65, 0x64)
- if err != nil {
- return
- }
- err = en.WriteBool(z.IsTruncated)
- if err != nil {
- err = msgp.WrapError(err, "IsTruncated")
- return
- }
- return
-}
-
-// MarshalMsg implements msgp.Marshaler
-func (z *FilesInfoVersions) MarshalMsg(b []byte) (o []byte, err error) {
- o = msgp.Require(b, z.Msgsize())
- // map header, size 2
- // string "FilesVersions"
- o = append(o, 0x82, 0xad, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73)
- o = msgp.AppendArrayHeader(o, uint32(len(z.FilesVersions)))
- for za0001 := range z.FilesVersions {
- o, err = z.FilesVersions[za0001].MarshalMsg(o)
- if err != nil {
- err = msgp.WrapError(err, "FilesVersions", za0001)
- return
- }
- }
- // string "IsTruncated"
- o = append(o, 0xab, 0x49, 0x73, 0x54, 0x72, 0x75, 0x6e, 0x63, 0x61, 0x74, 0x65, 0x64)
- o = msgp.AppendBool(o, z.IsTruncated)
- return
-}
-
-// UnmarshalMsg implements msgp.Unmarshaler
-func (z *FilesInfoVersions) UnmarshalMsg(bts []byte) (o []byte, err error) {
- var field []byte
- _ = field
- var zb0001 uint32
- zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- for zb0001 > 0 {
- zb0001--
- field, bts, err = msgp.ReadMapKeyZC(bts)
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- switch msgp.UnsafeString(field) {
- case "FilesVersions":
- var zb0002 uint32
- zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts)
- if err != nil {
- err = msgp.WrapError(err, "FilesVersions")
- return
- }
- if cap(z.FilesVersions) >= int(zb0002) {
- z.FilesVersions = (z.FilesVersions)[:zb0002]
- } else {
- z.FilesVersions = make([]FileInfoVersions, zb0002)
- }
- for za0001 := range z.FilesVersions {
- bts, err = z.FilesVersions[za0001].UnmarshalMsg(bts)
- if err != nil {
- err = msgp.WrapError(err, "FilesVersions", za0001)
- return
- }
- }
- case "IsTruncated":
- z.IsTruncated, bts, err = msgp.ReadBoolBytes(bts)
- if err != nil {
- err = msgp.WrapError(err, "IsTruncated")
- return
- }
- default:
- bts, err = msgp.Skip(bts)
- if err != nil {
- err = msgp.WrapError(err)
- return
- }
- }
- }
- o = bts
- return
-}
-
-// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
-func (z *FilesInfoVersions) Msgsize() (s int) {
- s = 1 + 14 + msgp.ArrayHeaderSize
- for za0001 := range z.FilesVersions {
- s += z.FilesVersions[za0001].Msgsize()
- }
- s += 12 + msgp.BoolSize
- return
-}
-
// DecodeMsg implements msgp.Decodable
func (z *VolInfo) DecodeMsg(dc *msgp.Reader) (err error) {
var zb0001 uint32
diff --git a/cmd/storage-datatypes_gen_test.go b/cmd/storage-datatypes_gen_test.go
index 3bbe8555b..1495d113b 100644
--- a/cmd/storage-datatypes_gen_test.go
+++ b/cmd/storage-datatypes_gen_test.go
@@ -574,119 +574,6 @@ func BenchmarkDecodeFilesInfo(b *testing.B) {
}
}
-func TestMarshalUnmarshalFilesInfoVersions(t *testing.T) {
- v := FilesInfoVersions{}
- bts, err := v.MarshalMsg(nil)
- if err != nil {
- t.Fatal(err)
- }
- left, err := v.UnmarshalMsg(bts)
- if err != nil {
- t.Fatal(err)
- }
- if len(left) > 0 {
- t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left)
- }
-
- left, err = msgp.Skip(bts)
- if err != nil {
- t.Fatal(err)
- }
- if len(left) > 0 {
- t.Errorf("%d bytes left over after Skip(): %q", len(left), left)
- }
-}
-
-func BenchmarkMarshalMsgFilesInfoVersions(b *testing.B) {
- v := FilesInfoVersions{}
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- v.MarshalMsg(nil)
- }
-}
-
-func BenchmarkAppendMsgFilesInfoVersions(b *testing.B) {
- v := FilesInfoVersions{}
- bts := make([]byte, 0, v.Msgsize())
- bts, _ = v.MarshalMsg(bts[0:0])
- b.SetBytes(int64(len(bts)))
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- bts, _ = v.MarshalMsg(bts[0:0])
- }
-}
-
-func BenchmarkUnmarshalFilesInfoVersions(b *testing.B) {
- v := FilesInfoVersions{}
- bts, _ := v.MarshalMsg(nil)
- b.ReportAllocs()
- b.SetBytes(int64(len(bts)))
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- _, err := v.UnmarshalMsg(bts)
- if err != nil {
- b.Fatal(err)
- }
- }
-}
-
-func TestEncodeDecodeFilesInfoVersions(t *testing.T) {
- v := FilesInfoVersions{}
- var buf bytes.Buffer
- msgp.Encode(&buf, &v)
-
- m := v.Msgsize()
- if buf.Len() > m {
- t.Log("WARNING: TestEncodeDecodeFilesInfoVersions Msgsize() is inaccurate")
- }
-
- vn := FilesInfoVersions{}
- err := msgp.Decode(&buf, &vn)
- if err != nil {
- t.Error(err)
- }
-
- buf.Reset()
- msgp.Encode(&buf, &v)
- err = msgp.NewReader(&buf).Skip()
- if err != nil {
- t.Error(err)
- }
-}
-
-func BenchmarkEncodeFilesInfoVersions(b *testing.B) {
- v := FilesInfoVersions{}
- var buf bytes.Buffer
- msgp.Encode(&buf, &v)
- b.SetBytes(int64(buf.Len()))
- en := msgp.NewWriter(msgp.Nowhere)
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- v.EncodeMsg(en)
- }
- en.Flush()
-}
-
-func BenchmarkDecodeFilesInfoVersions(b *testing.B) {
- v := FilesInfoVersions{}
- var buf bytes.Buffer
- msgp.Encode(&buf, &v)
- b.SetBytes(int64(buf.Len()))
- rd := msgp.NewEndlessReader(buf.Bytes(), b)
- dc := msgp.NewReader(rd)
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- err := v.DecodeMsg(dc)
- if err != nil {
- b.Fatal(err)
- }
- }
-}
-
func TestMarshalUnmarshalVolInfo(t *testing.T) {
v := VolInfo{}
bts, err := v.MarshalMsg(nil)
diff --git a/docs/distributed/DECOMMISSION.md b/docs/distributed/DECOMMISSION.md
new file mode 100644
index 000000000..61c619ad0
--- /dev/null
+++ b/docs/distributed/DECOMMISSION.md
@@ -0,0 +1,73 @@
+## Decommissioning
+
+### How to decommission a pool?
+```
+λ mc admin decommission start alias/ http://minio{1...2}/data{1...4}
+```
+
+### Status decommissioning a pool
+
+#### Decommissioning without args lists all pools
+```
+λ mc admin decommission status alias/
+┌─────┬─────────────────────────────────┬──────────────────────────────────┬────────┐
+│ ID │ Pools │ Capacity │ Status │
+│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Active │
+│ 2nd │ http://minio{3...4}/data{1...4} │ 329 GiB (used) / 421 GiB (total) │ Active │
+└─────┴─────────────────────────────────┴──────────────────────────────────┴────────┘
+```
+
+#### Decommissioning status
+```
+λ mc admin decommission status alias/ http://minio{1...2}/data{1...4}
+Progress: ===================> [1GiB/sec] [15%] [4TiB/50TiB]
+Time Remaining: 4 hours (started 3 hours ago)
+```
+
+#### A pool not under decommissioning will throw an error
+```
+λ mc admin decommission status alias/ http://minio{1...2}/data{1...4}
+ERROR: This pool is not scheduled for decommissioning currently.
+```
+
+### Canceling a decommission?
+Stop an on-going decommission in progress, mainly used in situations when the load may be
+too high and you may want to schedule the decommission at a later point in time.
+
+`mc admin decommission cancel` without an argument, lists out any on-going decommission in progress.
+
+```
+λ mc admin decommission cancel alias/
+┌─────┬─────────────────────────────────┬──────────────────────────────────┬──────────┐
+│ ID │ Pools │ Capacity │ Status │
+│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Draining │
+└─────┴─────────────────────────────────┴──────────────────────────────────┴──────────┘
+```
+
+> NOTE: Canceled decommission will not make the pool active again, since we might have
+> Potentially partial duplicate content on the other pools, to avoid this scenario be
+> absolutely sure to start decommissioning as a planned activity.
+
+```
+λ mc admin decommission cancel alias/ http://minio{1...2}/data{1...4}
+┌─────┬─────────────────────────────────┬──────────────────────────────────┬────────────────────┐
+│ ID │ Pools │ Capacity │ Status │
+│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Draining(Canceled) │
+└─────┴─────────────────────────────────┴──────────────────────────────────┴────────────────────┘
+```
+
+If for some reason decommission fails in between, the `status` will indicate decommission as failed instead.
+```
+λ mc admin decommission status alias/
+┌─────┬─────────────────────────────────┬──────────────────────────────────┬──────────────────┐
+│ ID │ Pools │ Capacity │ Status │
+│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Draining(Failed) │
+│ 2nd │ http://minio{3...4}/data{1...4} │ 329 GiB (used) / 421 GiB (total) │ Active │
+└─────┴─────────────────────────────────┴──────────────────────────────────┴──────────────────┘
+```
+
+### Restart a canceled or failed decommission?
+
+```
+λ mc admin decommission start alias/ http://minio{1...2}/data{1...4}
+```
diff --git a/internal/bucket/lifecycle/lifecycle.go b/internal/bucket/lifecycle/lifecycle.go
index 570fa5a94..594726415 100644
--- a/internal/bucket/lifecycle/lifecycle.go
+++ b/internal/bucket/lifecycle/lifecycle.go
@@ -74,6 +74,16 @@ type Lifecycle struct {
Rules []Rule `xml:"Rule"`
}
+// HasTransition returns 'true' if lifecycle document has Transition enabled.
+func (lc Lifecycle) HasTransition() bool {
+ for _, rule := range lc.Rules {
+ if rule.Transition.IsEnabled() {
+ return true
+ }
+ }
+ return false
+}
+
// UnmarshalXML - decodes XML data.
func (lc *Lifecycle) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (err error) {
switch start.Name.Local {
diff --git a/internal/bucket/lifecycle/transition.go b/internal/bucket/lifecycle/transition.go
index ed54bd129..948510d01 100644
--- a/internal/bucket/lifecycle/transition.go
+++ b/internal/bucket/lifecycle/transition.go
@@ -106,6 +106,11 @@ type Transition struct {
set bool
}
+// IsEnabled returns if transition is enabled.
+func (t Transition) IsEnabled() bool {
+ return t.set
+}
+
// MarshalXML encodes transition field into an XML form.
func (t Transition) MarshalXML(enc *xml.Encoder, start xml.StartElement) error {
if !t.set {