From 76b21de0c62ab02862c204d3a5b1bb75f17a55b7 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Mon, 10 Jan 2022 09:07:49 -0800
Subject: [PATCH] feat: decommission feature for pools (#14012)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

```
λ mc admin decommission start alias/ http://minio{1...2}/data{1...4}
```

```
λ mc admin decommission status alias/
┌─────┬─────────────────────────────────┬──────────────────────────────────┬────────┐
│ ID  │ Pools                           │ Capacity                         │ Status │
│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Active │
│ 2nd │ http://minio{3...4}/data{1...4} │ 329 GiB (used) / 421 GiB (total) │ Active │
└─────┴─────────────────────────────────┴──────────────────────────────────┴────────┘
```

```
λ mc admin decommission status alias/ http://minio{1...2}/data{1...4}
Progress: ===================> [1GiB/sec] [15%] [4TiB/50TiB]
Time Remaining: 4 hours (started 3 hours ago)
```

```
λ mc admin decommission status alias/ http://minio{1...2}/data{1...4}
ERROR: This pool is not scheduled for decommissioning currently.
```

```
λ mc admin decommission cancel alias/
┌─────┬─────────────────────────────────┬──────────────────────────────────┬──────────┐
│ ID  │ Pools                           │ Capacity                         │ Status   │
│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Draining │
└─────┴─────────────────────────────────┴──────────────────────────────────┴──────────┘
```

> NOTE: Canceling a decommission does not make the pool active again, since the
> remaining pools may already hold partially duplicated content. To avoid this
> scenario, be sure to start decommissioning only as a planned activity.

```
λ mc admin decommission cancel alias/ http://minio{1...2}/data{1...4}
┌─────┬─────────────────────────────────┬──────────────────────────────────┬────────────────────┐
│ ID  │ Pools                           │ Capacity                         │ Status             │
│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Draining(Canceled) │
└─────┴─────────────────────────────────┴──────────────────────────────────┴────────────────────┘
```
---
 cmd/admin-handlers-pools.go               | 179 +++++
 cmd/admin-router.go                       |   8 +-
 cmd/endpoint-ellipses.go                  |   3 +
 cmd/endpoint.go                           |  21 +
 cmd/erasure-common.go                     |  32 +
 cmd/erasure-object.go                     |   3 +-
 cmd/erasure-server-pool-decom.go          | 919 +++++++++++++++++++++
 cmd/erasure-server-pool-decom_gen.go      | 932 ++++++++++++++++++++++
 cmd/erasure-server-pool-decom_gen_test.go | 349 ++++++++
 cmd/erasure-server-pool.go                |  93 ++-
 cmd/erasure-sets.go                       |  22 +-
 cmd/erasure-sets_test.go                  |   3 +-
 cmd/notification.go                       |  20 +
 cmd/object-api-datatypes.go               |  10 +
 cmd/object-api-interface.go               |   3 +
 cmd/peer-rest-client.go                   |  10 +
 cmd/peer-rest-common.go                   |   3 +-
 cmd/peer-rest-server.go                   |  22 +
 cmd/storage-datatypes.go                  |  15 +-
 cmd/storage-datatypes_gen.go              | 172 ----
 cmd/storage-datatypes_gen_test.go         | 113 ---
 docs/distributed/DECOMMISSION.md          |  73 ++
 internal/bucket/lifecycle/lifecycle.go    |  10 +
 internal/bucket/lifecycle/transition.go   |   5 +
 24 files changed, 2692 insertions(+), 328 deletions(-)
 create mode 100644 cmd/admin-handlers-pools.go
 create mode 100644 cmd/erasure-server-pool-decom.go
 create mode 100644 cmd/erasure-server-pool-decom_gen.go
 create mode 100644 cmd/erasure-server-pool-decom_gen_test.go
 create mode 100644 docs/distributed/DECOMMISSION.md

diff --git a/cmd/admin-handlers-pools.go b/cmd/admin-handlers-pools.go
new file mode 100644
index 000000000..3b3107905
--- /dev/null
+++ b/cmd/admin-handlers-pools.go
@@ -0,0 +1,179 @@
+// Copyright (c) 2015-2021 MinIO, Inc.
+// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "encoding/json" + "net/http" + + "github.com/gorilla/mux" + "github.com/minio/minio/internal/logger" + iampolicy "github.com/minio/pkg/iam/policy" +) + +func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "StartDecommission") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.DecommissionAdminAction) + if objectAPI == nil { + return + } + + // Legacy args style such as non-ellipses style is not supported with this API. + if globalEndpoints.Legacy() { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + pools, ok := objectAPI.(*erasureServerPools) + if !ok { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + vars := mux.Vars(r) + v := vars["pool"] + + idx := globalEndpoints.GetPoolIdx(v) + if idx == -1 { + // We didn't find any matching pools, invalid input + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL) + return + } + + if err := pools.Decommission(r.Context(), idx); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } +} + +func (a adminAPIHandlers) CancelDecommission(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "CancelDecommission") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.DecommissionAdminAction) + if objectAPI == nil { + return + } + + // Legacy args style such as non-ellipses style is not supported with this API. + if globalEndpoints.Legacy() { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + pools, ok := objectAPI.(*erasureServerPools) + if !ok { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + vars := mux.Vars(r) + v := vars["pool"] + + idx := globalEndpoints.GetPoolIdx(v) + if idx == -1 { + // We didn't find any matching pools, invalid input + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL) + return + } + + if err := pools.DecommissionCancel(ctx, idx); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } +} + +func (a adminAPIHandlers) StatusPool(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "StatusPool") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ServerInfoAdminAction, iampolicy.DecommissionAdminAction) + if objectAPI == nil { + return + } + + // Legacy args style such as non-ellipses style is not supported with this API. 
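For illustration, the handlers above resolve the target pool from the `pool` query variable, which must be the pool's verbatim command-line argument. A minimal client-side sketch of building such a request URL (host and admin path prefix are assumptions, and real admin requests must additionally be signed with admin credentials):

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// The "pool" value must match the pool argument exactly as it appears
	// on the `minio server` command line, e.g. the ellipses pattern.
	q := url.Values{}
	q.Set("pool", "http://minio{1...2}/data{1...4}")

	u := url.URL{
		Scheme:   "http",
		Host:     "minio1:9000",                        // assumed endpoint
		Path:     "/minio/admin/v3/pools/decommission", // assumed admin prefix
		RawQuery: q.Encode(),
	}
	fmt.Println(u.String())
}
```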
+ if globalEndpoints.Legacy() { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + pools, ok := objectAPI.(*erasureServerPools) + if !ok { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + vars := mux.Vars(r) + v := vars["pool"] + + idx := globalEndpoints.GetPoolIdx(v) + if idx == -1 { + // We didn't find any matching pools, invalid input + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errInvalidArgument), r.URL) + return + } + + status, err := pools.Status(r.Context(), idx) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + + logger.LogIf(r.Context(), json.NewEncoder(w).Encode(&status)) +} + +func (a adminAPIHandlers) ListPools(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ListPools") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.ServerInfoAdminAction, iampolicy.DecommissionAdminAction) + if objectAPI == nil { + return + } + + // Legacy args style such as non-ellipses style is not supported with this API. + if globalEndpoints.Legacy() { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + pools, ok := objectAPI.(*erasureServerPools) + if !ok { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + poolsStatus := make([]PoolStatus, len(globalEndpoints)) + for idx := range globalEndpoints { + status, err := pools.Status(r.Context(), idx) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + poolsStatus[idx] = status + } + + logger.LogIf(r.Context(), json.NewEncoder(w).Encode(poolsStatus)) +} diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 44b99221f..6abdb5e4c 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -74,8 +74,14 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/").HandlerFunc(gz(httpTraceAll(adminAPI.HealHandler))) adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}").HandlerFunc(gz(httpTraceAll(adminAPI.HealHandler))) adminRouter.Methods(http.MethodPost).Path(adminVersion + "/heal/{bucket}/{prefix:.*}").HandlerFunc(gz(httpTraceAll(adminAPI.HealHandler))) - adminRouter.Methods(http.MethodPost).Path(adminVersion + "/background-heal/status").HandlerFunc(gz(httpTraceAll(adminAPI.BackgroundHealStatusHandler))) + + // Pool operations + adminRouter.Methods(http.MethodGet).Path(adminVersion + "/pools/list").HandlerFunc(gz(httpTraceAll(adminAPI.ListPools))) + adminRouter.Methods(http.MethodGet).Path(adminVersion+"/pools/status").HandlerFunc(gz(httpTraceAll(adminAPI.StatusPool))).Queries("pool", "{pool:.*}") + + adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/decommission").HandlerFunc(gz(httpTraceAll(adminAPI.StartDecommission))).Queries("pool", "{pool:.*}") + adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/cancel").HandlerFunc(gz(httpTraceAll(adminAPI.CancelDecommission))).Queries("pool", "{pool:.*}") } // Profiling operations diff --git a/cmd/endpoint-ellipses.go b/cmd/endpoint-ellipses.go index 8e3398762..eedd1435b 100644 --- a/cmd/endpoint-ellipses.go +++ b/cmd/endpoint-ellipses.go @@ -353,9 +353,11 @@ func createServerEndpoints(serverAddr string, args ...string) ( return nil, -1, err } endpointServerPools = append(endpointServerPools, PoolEndpoints{ + Legacy: 
true, SetCount: len(setArgs), DrivesPerSet: len(setArgs[0]), Endpoints: endpointList, + CmdLine: strings.Join(args, " "), }) setupType = newSetupType return endpointServerPools, setupType, nil @@ -376,6 +378,7 @@ func createServerEndpoints(serverAddr string, args ...string) ( SetCount: len(setArgs), DrivesPerSet: len(setArgs[0]), Endpoints: endpointList, + CmdLine: arg, }); err != nil { return nil, -1, err } diff --git a/cmd/endpoint.go b/cmd/endpoint.go index e6381d464..d6088ca06 100644 --- a/cmd/endpoint.go +++ b/cmd/endpoint.go @@ -197,14 +197,28 @@ func NewEndpoint(arg string) (ep Endpoint, e error) { // PoolEndpoints represent endpoints in a given pool // along with its setCount and setDriveCount. type PoolEndpoints struct { + // indicates if endpoints are provided in non-ellipses style + Legacy bool SetCount int DrivesPerSet int Endpoints Endpoints + CmdLine string } // EndpointServerPools - list of list of endpoints type EndpointServerPools []PoolEndpoints +// GetPoolIdx return pool index +func (l EndpointServerPools) GetPoolIdx(pool string) int { + for id, ep := range globalEndpoints { + if ep.CmdLine != pool { + continue + } + return id + } + return -1 +} + // GetLocalPoolIdx returns the pool which endpoint belongs to locally. // if ep is remote this code will return -1 poolIndex func (l EndpointServerPools) GetLocalPoolIdx(ep Endpoint) int { @@ -220,6 +234,13 @@ func (l EndpointServerPools) GetLocalPoolIdx(ep Endpoint) int { return -1 } +// Legacy returns 'true' if the MinIO server commandline was +// provided with no ellipses pattern, those are considered +// legacy deployments. +func (l EndpointServerPools) Legacy() bool { + return len(l) == 1 && l[0].Legacy +} + // Add add pool endpoints func (l *EndpointServerPools) Add(zeps PoolEndpoints) error { existSet := set.NewStringSet() diff --git a/cmd/erasure-common.go b/cmd/erasure-common.go index 575192674..9cfbb9808 100644 --- a/cmd/erasure-common.go +++ b/cmd/erasure-common.go @@ -22,6 +22,38 @@ import ( "sync" ) +func (er erasureObjects) getOnlineDisks() (newDisks []StorageAPI) { + disks := er.getDisks() + var wg sync.WaitGroup + var mu sync.Mutex + for _, i := range hashOrder(UTCNow().String(), len(disks)) { + i := i + wg.Add(1) + go func() { + defer wg.Done() + if disks[i-1] == nil { + return + } + di, err := disks[i-1].DiskInfo(context.Background()) + if err != nil || di.Healing { + // - Do not consume disks which are not reachable + // unformatted or simply not accessible for some reason. + // + // - Do not consume disks which are being healed + // + // - Future: skip busy disks + return + } + + mu.Lock() + newDisks = append(newDisks, disks[i-1]) + mu.Unlock() + }() + } + wg.Wait() + return newDisks +} + func (er erasureObjects) getLoadBalancedLocalDisks() (newDisks []StorageAPI) { disks := er.getDisks() // Based on the random shuffling return back randomized disks. diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 38183ae24..6b6cb721e 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -408,7 +408,8 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s } if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { - if reducedErr == errErasureReadQuorum && bucket != minioMetaBucket { + if errors.Is(reducedErr, errErasureReadQuorum) && !strings.HasPrefix(bucket, minioMetaBucket) { + // Skip buckets that live at `.minio.sys` bucket. 
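For reference, `GetPoolIdx` introduced above is an exact string comparison against each pool's original command-line argument. A self-contained stand-in showing the same lookup (the type and values here are illustrative only):

```go
package main

import "fmt"

type pool struct{ CmdLine string }

// getPoolIdx mirrors EndpointServerPools.GetPoolIdx: return the index of the
// pool whose command-line argument matches exactly, or -1 if none does.
func getPoolIdx(pools []pool, arg string) int {
	for i, p := range pools {
		if p.CmdLine == arg {
			return i
		}
	}
	return -1
}

func main() {
	pools := []pool{
		{CmdLine: "http://minio{1...2}/data{1...4}"},
		{CmdLine: "http://minio{3...4}/data{1...4}"},
	}
	fmt.Println(getPoolIdx(pools, "http://minio{1...2}/data{1...4}")) // 0
	fmt.Println(getPoolIdx(pools, "http://minio{5...6}/data{1...4}")) // -1
}
```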
if _, ok := isObjectDangling(metaArr, errs, nil); ok { reducedErr = errFileNotFound if opts.VersionID != "" { diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go new file mode 100644 index 000000000..5d3305133 --- /dev/null +++ b/cmd/erasure-server-pool-decom.go @@ -0,0 +1,919 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "net/http" + "sort" + "time" + + "github.com/dustin/go-humanize" + "github.com/minio/madmin-go" + "github.com/minio/minio/internal/config/storageclass" + "github.com/minio/minio/internal/hash" + "github.com/minio/minio/internal/logger" +) + +// PoolDecommissionInfo currently decommissioning information +type PoolDecommissionInfo struct { + StartTime time.Time `json:"startTime" msg:"st"` + StartSize int64 `json:"startSize" msg:"ss"` + TotalSize int64 `json:"totalSize" msg:"ts"` + CurrentSize int64 `json:"currentSize" msg:"cs"` + + Complete bool `json:"complete" msg:"cmp"` + Failed bool `json:"failed" msg:"fl"` + Canceled bool `json:"canceled" msg:"cnl"` + + // Internal information. + QueuedBuckets []string `json:"-" msg:"bkts"` + DecommissionedBuckets []string `json:"-" msg:"dbkts"` + + // Last bucket/object decommissioned. + Bucket string `json:"-" msg:"bkt"` + Object string `json:"-" msg:"obj"` + + // Verbose information + ItemsDecommissioned uint64 `json:"-" msg:"id"` + ItemsDecommissionFailed uint64 `json:"-" msg:"idf"` + BytesDone uint64 `json:"-" msg:"bd"` + BytesFailed uint64 `json:"-" msg:"bf"` +} + +// bucketPop should be called when a bucket is done decommissioning. +// Adds the bucket to the list of decommissioned buckets and updates resume numbers. +func (pd *PoolDecommissionInfo) bucketPop(bucket string) { + pd.DecommissionedBuckets = append(pd.DecommissionedBuckets, bucket) + for i, b := range pd.QueuedBuckets { + if b == bucket { + // Bucket is done. + pd.QueuedBuckets = append(pd.QueuedBuckets[:i], pd.QueuedBuckets[i+1:]...) 
+ } + } +} + +func (pd *PoolDecommissionInfo) bucketsToDecommission() []string { + return pd.QueuedBuckets +} + +func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool { + for _, b := range pd.DecommissionedBuckets { + if b == bucket { + return true + } + } + return false +} + +func (pd *PoolDecommissionInfo) bucketPush(bucket string) { + for _, b := range pd.QueuedBuckets { + if pd.isBucketDecommissioned(b) { + return + } + if b == bucket { + return + } + } + pd.QueuedBuckets = append(pd.QueuedBuckets, bucket) +} + +// PoolStatus captures current pool status +type PoolStatus struct { + ID int `json:"id" msg:"id"` + CmdLine string `json:"cmdline" msg:"cl"` + LastUpdate time.Time `json:"lastUpdate" msg:"lu"` + Decommission *PoolDecommissionInfo `json:"decommissionInfo,omitempty" msg:"dec"` +} + +//go:generate msgp -file $GOFILE -unexported +type poolMeta struct { + Version int `msg:"v"` + Pools []PoolStatus `msg:"pls"` +} + +// A decommission resumable tells us if decommission is worth +// resuming upon restart of a cluster. +func (p *poolMeta) returnResumablePools(n int) []PoolStatus { + var newPools []PoolStatus + for _, pool := range p.Pools { + if pool.Decommission == nil { + continue + } + if pool.Decommission.Complete || pool.Decommission.Canceled { + // Do not resume decommission upon startup for + // - decommission complete + // - decommission canceled + continue + } // In all other situations we need to resume + newPools = append(newPools, pool) + if n > 0 && len(newPools) == n { + return newPools + } + } + return nil +} + +func (p *poolMeta) DecommissionComplete(idx int) bool { + if p.Pools[idx].Decommission != nil { + p.Pools[idx].LastUpdate = time.Now().UTC() + p.Pools[idx].Decommission.Complete = true + p.Pools[idx].Decommission.Failed = false + p.Pools[idx].Decommission.Canceled = false + return true + } + return false +} + +func (p *poolMeta) DecommissionFailed(idx int) bool { + if p.Pools[idx].Decommission != nil { + p.Pools[idx].LastUpdate = time.Now().UTC() + p.Pools[idx].Decommission.StartTime = time.Time{} + p.Pools[idx].Decommission.Complete = false + p.Pools[idx].Decommission.Failed = true + p.Pools[idx].Decommission.Canceled = false + return true + } + return false +} + +func (p *poolMeta) DecommissionCancel(idx int) bool { + if p.Pools[idx].Decommission != nil { + p.Pools[idx].LastUpdate = time.Now().UTC() + p.Pools[idx].Decommission.StartTime = time.Time{} + p.Pools[idx].Decommission.Complete = false + p.Pools[idx].Decommission.Failed = false + p.Pools[idx].Decommission.Canceled = true + return true + } + return false +} + +func (p poolMeta) isBucketDecommissioned(idx int, bucket string) bool { + return p.Pools[idx].Decommission.isBucketDecommissioned(bucket) +} + +func (p *poolMeta) BucketDone(idx int, bucket string) { + if p.Pools[idx].Decommission == nil { + // Decommission not in progress. + return + } + p.Pools[idx].Decommission.bucketPop(bucket) +} + +func (p poolMeta) ResumeBucketObject(idx int) (bucket, object string) { + if p.Pools[idx].Decommission != nil { + bucket = p.Pools[idx].Decommission.Bucket + object = p.Pools[idx].Decommission.Object + } + return +} + +func (p *poolMeta) TrackCurrentBucketObject(idx int, bucket string, object string) { + if p.Pools[idx].Decommission == nil { + // Decommission not in progress. 
+ return + } + p.Pools[idx].Decommission.Bucket = bucket + p.Pools[idx].Decommission.Object = object +} + +func (p *poolMeta) PendingBuckets(idx int) []string { + if p.Pools[idx].Decommission == nil { + // Decommission not in progress. + return nil + } + + return p.Pools[idx].Decommission.bucketsToDecommission() +} + +func (p *poolMeta) QueueBuckets(idx int, buckets []BucketInfo) { + // add new queued buckets + for _, bucket := range buckets { + p.Pools[idx].Decommission.bucketPush(bucket.Name) + } +} + +var ( + errDecommissionAlreadyRunning = errors.New("decommission is already in progress") + errDecommissionComplete = errors.New("decommission is complete, please remove the servers from command-line") +) + +func (p *poolMeta) Decommission(idx int, info StorageInfo) error { + for i, pool := range p.Pools { + if idx == i { + continue + } + if pool.Decommission != nil { + // Do not allow multiple decommissions at the same time. + // We shall for now only allow one pool decommission at + // a time. + return fmt.Errorf("%w at index: %d", errDecommissionAlreadyRunning, i) + } + } + if p.Pools[idx].Decommission == nil { + startSize := TotalUsableCapacityFree(info) + totalSize := TotalUsableCapacity(info) + p.Pools[idx].LastUpdate = time.Now().UTC() + p.Pools[idx].Decommission = &PoolDecommissionInfo{ + StartTime: UTCNow(), + StartSize: startSize, + CurrentSize: startSize, + TotalSize: totalSize, + } + return nil + } + + // Completed pool doesn't need to be decommissioned again. + if p.Pools[idx].Decommission.Complete { + return errDecommissionComplete + } + + // Canceled or Failed decommission can be triggered again. + if p.Pools[idx].Decommission.StartTime.IsZero() { + if p.Pools[idx].Decommission.Canceled || p.Pools[idx].Decommission.Failed { + startSize := TotalUsableCapacityFree(info) + totalSize := TotalUsableCapacity(info) + p.Pools[idx].LastUpdate = time.Now().UTC() + p.Pools[idx].Decommission = &PoolDecommissionInfo{ + StartTime: UTCNow(), + StartSize: startSize, + CurrentSize: startSize, + TotalSize: totalSize, + } + return nil + } + } // In-progress pool doesn't need to be decommissioned again. + + // In all other scenarios an active decommissioning is in progress. + return errDecommissionAlreadyRunning +} + +func (p poolMeta) IsSuspended(idx int) bool { + return p.Pools[idx].Decommission != nil +} + +func (p *poolMeta) load(ctx context.Context, pool *erasureSets, pools []*erasureSets) (bool, error) { + data, err := readConfig(ctx, pool, poolMetaName) + if err != nil { + if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) { + return true, nil + } + return false, err + } + if len(data) == 0 { + return true, nil + } + if len(data) <= 4 { + return false, fmt.Errorf("poolMeta: no data") + } + // Read header + switch binary.LittleEndian.Uint16(data[0:2]) { + case poolMetaFormat: + default: + return false, fmt.Errorf("poolMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2])) + } + switch binary.LittleEndian.Uint16(data[2:4]) { + case poolMetaVersion: + default: + return false, fmt.Errorf("poolMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4])) + } + + // OK, parse data. 
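The checks above read a fixed 4-byte header ahead of the msgp payload. A standalone sketch of that layout, using the `poolMetaFormat` and `poolMetaVersion` values (both 1) defined later in this file:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// pool.bin layout: [0:2] format, [2:4] version (both little endian),
	// followed by the msgp-encoded poolMeta payload.
	hdr := make([]byte, 4)
	binary.LittleEndian.PutUint16(hdr[0:2], 1) // poolMetaFormat
	binary.LittleEndian.PutUint16(hdr[2:4], 1) // poolMetaVersion

	fmt.Println("format :", binary.LittleEndian.Uint16(hdr[0:2]))
	fmt.Println("version:", binary.LittleEndian.Uint16(hdr[2:4]))
}
```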
+ if _, err = p.UnmarshalMsg(data[4:]); err != nil { + return false, err + } + + switch p.Version { + case poolMetaVersionV1: + default: + return false, fmt.Errorf("unexpected pool meta version: %d", p.Version) + } + + type poolInfo struct { + position int + completed bool + } + + rememberedPools := make(map[string]poolInfo) + for idx, pool := range p.Pools { + complete := false + if pool.Decommission != nil && pool.Decommission.Complete { + complete = true + } + rememberedPools[pool.CmdLine] = poolInfo{ + position: idx, + completed: complete, + } + } + + specifiedPools := make(map[string]int) + for idx, pool := range pools { + specifiedPools[pool.endpoints.CmdLine] = idx + } + + // Check if specified pools need to remove decommissioned pool. + for k := range specifiedPools { + pi, ok := rememberedPools[k] + if ok && pi.completed { + return false, fmt.Errorf("pool(%s) = %s is decommissioned, please remove from server command line", humanize.Ordinal(pi.position+1), k) + } + } + + // check if remembered pools are in right position or missing from command line. + for k, pi := range rememberedPools { + if pi.completed { + continue + } + _, ok := specifiedPools[k] + if !ok { + return false, fmt.Errorf("pool(%s) = %s is not specified, please specify on server command line", humanize.Ordinal(pi.position+1), k) + } + } + + // check when remembered pools and specified pools are same they are at the expected position + if len(rememberedPools) == len(specifiedPools) { + for k, pi := range rememberedPools { + pos, ok := specifiedPools[k] + if !ok { + return false, fmt.Errorf("pool(%s) = %s is not specified, please specify on server command line", humanize.Ordinal(pi.position+1), k) + } + if pos != pi.position { + return false, fmt.Errorf("pool order change detected for %s, expected position is (%s) but found (%s)", k, humanize.Ordinal(pi.position+1), humanize.Ordinal(pos+1)) + } + } + } + + return len(rememberedPools) != len(specifiedPools), nil +} + +func (p *poolMeta) CountItem(idx int, size int64, failed bool) { + pd := p.Pools[idx].Decommission + if pd != nil { + if failed { + pd.ItemsDecommissionFailed++ + pd.BytesFailed += uint64(size) + } else { + pd.ItemsDecommissioned++ + pd.BytesDone += uint64(size) + } + p.Pools[idx].Decommission = pd + } +} + +func (p *poolMeta) updateAfter(ctx context.Context, idx int, pools []*erasureSets, duration time.Duration) error { + if p.Pools[idx].Decommission == nil { + return errInvalidArgument + } + if time.Since(p.Pools[idx].LastUpdate) > duration { + p.Pools[idx].LastUpdate = time.Now().UTC() + return p.save(ctx, pools) + } + return nil +} + +func (p poolMeta) save(ctx context.Context, pools []*erasureSets) error { + data := make([]byte, 4, p.Msgsize()+4) + + // Initialize the header. + binary.LittleEndian.PutUint16(data[0:2], poolMetaFormat) + binary.LittleEndian.PutUint16(data[2:4], poolMetaVersion) + + buf, err := p.MarshalMsg(data) + if err != nil { + return err + } + + // Saves on all pools to make sure decommissioning of first pool is allowed. + for _, eset := range pools { + if err = saveConfig(ctx, eset, poolMetaName, buf); err != nil { + return err + } + } + return nil +} + +const ( + poolMetaName = "pool.bin" + poolMetaFormat = 1 + poolMetaVersionV1 = 1 + poolMetaVersion = poolMetaVersionV1 +) + +// Init() initializes pools and saves additional information about them +// in 'pool.bin', this is eventually used for decommissioning the pool. 
+func (z *erasureServerPools) Init(ctx context.Context) error { + meta := poolMeta{} + + update, err := meta.load(ctx, z.serverPools[0], z.serverPools) + if err != nil { + return err + } + + // if no update is needed return right away. + if !update { + // We are only supporting single pool decommission at this time + // so it makes sense to only resume single pools at any given + // time, in future meta.returnResumablePools() might take + // '-1' as argument to decommission multiple pools at a time + // but this is not a priority at the moment. + for _, pool := range meta.returnResumablePools(1) { + err := z.Decommission(ctx, pool.ID) + switch err { + case errDecommissionAlreadyRunning: + fallthrough + case nil: + go z.doDecommissionInRoutine(ctx, pool.ID) + } + } + z.poolMeta = meta + + return nil + } + + // looks like new pool was added we need to update, + // or this is a fresh installation (or an existing + // installation with pool removed) + meta.Version = poolMetaVersion + for idx, pool := range z.serverPools { + meta.Pools = append(meta.Pools, PoolStatus{ + CmdLine: pool.endpoints.CmdLine, + ID: idx, + LastUpdate: time.Now().UTC(), + }) + } + if err = meta.save(ctx, z.serverPools); err != nil { + return err + } + z.poolMeta = meta + return nil +} + +func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) { + defer gr.Close() + objInfo := gr.ObjInfo + if objInfo.isMultipart() { + uploadID, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, + ObjectOptions{ + VersionID: objInfo.VersionID, + MTime: objInfo.ModTime, + UserDefined: objInfo.UserDefined, + }) + if err != nil { + return err + } + defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, uploadID, ObjectOptions{}) + parts := make([]CompletePart, 0, len(objInfo.Parts)) + for _, part := range objInfo.Parts { + hr, err := hash.NewReader(gr, part.Size, "", "", part.Size) + if err != nil { + return err + } + _, err = z.PutObjectPart(ctx, bucket, objInfo.Name, uploadID, + part.Number, + NewPutObjReader(hr), + ObjectOptions{}) + if err != nil { + return err + } + parts = append(parts, CompletePart{ + PartNumber: part.Number, + ETag: part.ETag, + }) + } + _, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, uploadID, parts, ObjectOptions{ + MTime: objInfo.ModTime, + }) + return err + } + hr, err := hash.NewReader(gr, objInfo.Size, "", "", objInfo.Size) + if err != nil { + return err + } + _, err = z.PutObject(ctx, + bucket, + objInfo.Name, + NewPutObjReader(hr), + ObjectOptions{ + VersionID: objInfo.VersionID, + MTime: objInfo.ModTime, + UserDefined: objInfo.UserDefined, + }) + return err +} + +// versionsSorter sorts FileInfo slices by version. +//msgp:ignore versionsSorter +type versionsSorter []FileInfo + +func (v versionsSorter) reverse() { + sort.Slice(v, func(i, j int) bool { + return v[i].ModTime.Before(v[j].ModTime) + }) +} + +func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error { + var forwardTo string + // If we resume to the same bucket, forward to last known item. 
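As `versionsSorter.reverse()` above indicates, object versions are replayed oldest-first so that the newest version ends up current on the destination pool. A tiny standalone illustration of that ordering (the type and timestamps are made up for the example):

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

type version struct {
	ID      string
	ModTime time.Time
}

func main() {
	now := time.Now()
	vs := []version{
		{"v3", now},                     // newest, as typically listed first
		{"v2", now.Add(-time.Hour)},
		{"v1", now.Add(-2 * time.Hour)}, // oldest
	}
	// Oldest ModTime first, mirroring versionsSorter.reverse() above.
	sort.Slice(vs, func(i, j int) bool { return vs[i].ModTime.Before(vs[j].ModTime) })
	fmt.Println(vs[0].ID, vs[1].ID, vs[2].ID) // v1 v2 v3
}
```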
+ rbucket, robject := z.poolMeta.ResumeBucketObject(idx) + if rbucket != "" { + if rbucket == bName { + forwardTo = robject + } + } + + versioned := globalBucketVersioningSys.Enabled(bName) + for _, set := range pool.sets { + disks := set.getOnlineDisks() + if len(disks) == 0 { + logger.LogIf(GlobalContext, fmt.Errorf("no online disks found for set with endpoints %s", + set.getEndpoints())) + continue + } + + decommissionEntry := func(entry metaCacheEntry) { + if entry.isDir() { + return + } + + fivs, err := entry.fileInfoVersions(bName) + if err != nil { + return + } + + // We need a reversed order for Decommissioning, + // to create the appropriate stack. + versionsSorter(fivs.Versions).reverse() + + for _, version := range fivs.Versions { + // TODO: Skip transitioned objects for now. + if version.IsRemote() { + continue + } + // We will skip decommissioning delete markers + // with single version, its as good as there + // is no data associated with the object. + if version.Deleted && len(fivs.Versions) == 1 { + continue + } + if version.Deleted { + _, err := z.DeleteObject(ctx, + bName, + version.Name, + ObjectOptions{ + Versioned: versioned, + VersionID: version.VersionID, + MTime: version.ModTime, + DeleteReplication: version.ReplicationState, + }) + if err != nil { + logger.LogIf(ctx, err) + z.poolMetaMutex.Lock() + z.poolMeta.CountItem(idx, 0, true) + z.poolMetaMutex.Unlock() + } else { + set.DeleteObject(ctx, + bName, + version.Name, + ObjectOptions{ + VersionID: version.VersionID, + }) + z.poolMetaMutex.Lock() + z.poolMeta.CountItem(idx, 0, false) + z.poolMetaMutex.Unlock() + } + continue + } + gr, err := set.GetObjectNInfo(ctx, + bName, + version.Name, + nil, + http.Header{}, + noLock, // all mutations are blocked reads are safe without locks. + ObjectOptions{ + VersionID: version.VersionID, + }) + if err != nil { + logger.LogIf(ctx, err) + z.poolMetaMutex.Lock() + z.poolMeta.CountItem(idx, version.Size, true) + z.poolMetaMutex.Unlock() + continue + } + // gr.Close() is ensured by decommissionObject(). + if err = z.decommissionObject(ctx, bName, gr); err != nil { + logger.LogIf(ctx, err) + z.poolMetaMutex.Lock() + z.poolMeta.CountItem(idx, version.Size, true) + z.poolMetaMutex.Unlock() + continue + } + set.DeleteObject(ctx, + bName, + version.Name, + ObjectOptions{ + VersionID: version.VersionID, + }) + z.poolMetaMutex.Lock() + z.poolMeta.CountItem(idx, version.Size, false) + z.poolMetaMutex.Unlock() + } + z.poolMetaMutex.Lock() + z.poolMeta.TrackCurrentBucketObject(idx, bName, entry.name) + logger.LogIf(ctx, z.poolMeta.updateAfter(ctx, idx, z.serverPools, time.Minute)) + z.poolMetaMutex.Unlock() + } + + // How to resolve partial results. 
+ resolver := metadataResolutionParams{ + dirQuorum: set.defaultRQuorum(), + objQuorum: set.defaultRQuorum(), + bucket: bName, + } + + if err := listPathRaw(ctx, listPathRawOptions{ + disks: disks, + bucket: bName, + recursive: true, + forwardTo: forwardTo, + minDisks: len(disks), + reportNotFound: false, + agreed: decommissionEntry, + partial: func(entries metaCacheEntries, nAgreed int, errs []error) { + entry, ok := entries.resolve(&resolver) + if ok { + decommissionEntry(*entry) + } + }, + finished: nil, + }); err != nil { + // Decommissioning failed and won't continue + return err + } + } + return nil +} + +func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx int) error { + pool := z.serverPools[idx] + for _, bucket := range z.poolMeta.PendingBuckets(idx) { + if z.poolMeta.isBucketDecommissioned(idx, bucket) { + z.poolMetaMutex.Lock() + z.poolMeta.BucketDone(idx, bucket) // remove from pendingBuckets and persist. + z.poolMeta.save(ctx, z.serverPools) + z.poolMetaMutex.Unlock() + continue + } + if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil { + return err + } + z.poolMetaMutex.Lock() + z.poolMeta.BucketDone(idx, bucket) + z.poolMeta.save(ctx, z.serverPools) + z.poolMetaMutex.Unlock() + } + return nil +} + +func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx int) { + z.poolMetaMutex.Lock() + var dctx context.Context + dctx, z.decommissionCancelers[idx] = context.WithCancel(GlobalContext) + z.poolMetaMutex.Unlock() + + if err := z.decommissionInBackground(dctx, idx); err != nil { + logger.LogIf(GlobalContext, err) + logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx)) + return + } + // Complete the decommission.. + logger.LogIf(GlobalContext, z.CompleteDecommission(dctx, idx)) +} + +// Decommission - start decommission session. +func (z *erasureServerPools) Decommission(ctx context.Context, idx int) error { + if idx < 0 { + return errInvalidArgument + } + + if z.SinglePool() { + return errInvalidArgument + } + + // Make pool unwritable before decommissioning. + if err := z.StartDecommission(ctx, idx); err != nil { + return err + } + + go z.doDecommissionInRoutine(ctx, idx) + + // Successfully started decommissioning. 
+ return nil +} + +func (z *erasureServerPools) Status(ctx context.Context, idx int) (PoolStatus, error) { + if idx < 0 { + return PoolStatus{}, errInvalidArgument + } + + z.poolMetaMutex.RLock() + defer z.poolMetaMutex.RUnlock() + + if idx+1 > len(z.poolMeta.Pools) { + return PoolStatus{}, errInvalidArgument + } + + pool := z.serverPools[idx] + info, _ := pool.StorageInfo(ctx) + info.Backend.Type = madmin.Erasure + + scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD) + if scParity <= 0 { + scParity = z.serverPools[0].defaultParityCount + } + rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS) + info.Backend.StandardSCData = append(info.Backend.StandardSCData, pool.SetDriveCount()-scParity) + info.Backend.RRSCData = append(info.Backend.RRSCData, pool.SetDriveCount()-rrSCParity) + info.Backend.StandardSCParity = scParity + info.Backend.RRSCParity = rrSCParity + + currentSize := TotalUsableCapacityFree(info) + + poolInfo := z.poolMeta.Pools[idx] + if poolInfo.Decommission != nil { + poolInfo.Decommission.CurrentSize = currentSize + } else { + poolInfo.Decommission = &PoolDecommissionInfo{ + CurrentSize: currentSize, + TotalSize: TotalUsableCapacity(info), + } + } + return poolInfo, nil +} + +func (z *erasureServerPools) ReloadPoolMeta(ctx context.Context) (err error) { + meta := poolMeta{} + + if _, err = meta.load(ctx, z.serverPools[0], z.serverPools); err != nil { + return err + } + + z.poolMetaMutex.Lock() + defer z.poolMetaMutex.Unlock() + + z.poolMeta = meta + return nil +} + +func (z *erasureServerPools) DecommissionCancel(ctx context.Context, idx int) (err error) { + if idx < 0 { + return errInvalidArgument + } + + if z.SinglePool() { + return errInvalidArgument + } + + z.poolMetaMutex.Lock() + defer z.poolMetaMutex.Unlock() + + if z.poolMeta.DecommissionCancel(idx) { + z.decommissionCancelers[idx]() // cancel any active thread. + if err = z.poolMeta.save(ctx, z.serverPools); err != nil { + return err + } + globalNotificationSys.ReloadPoolMeta(ctx) + } + return nil +} + +func (z *erasureServerPools) DecommissionFailed(ctx context.Context, idx int) (err error) { + if idx < 0 { + return errInvalidArgument + } + + if z.SinglePool() { + return errInvalidArgument + } + + z.poolMetaMutex.Lock() + defer z.poolMetaMutex.Unlock() + + if z.poolMeta.DecommissionFailed(idx) { + z.decommissionCancelers[idx]() // cancel any active thread. + if err = z.poolMeta.save(ctx, z.serverPools); err != nil { + return err + } + globalNotificationSys.ReloadPoolMeta(ctx) + } + return nil +} + +func (z *erasureServerPools) CompleteDecommission(ctx context.Context, idx int) (err error) { + if idx < 0 { + return errInvalidArgument + } + + if z.SinglePool() { + return errInvalidArgument + } + + z.poolMetaMutex.Lock() + defer z.poolMetaMutex.Unlock() + + if z.poolMeta.DecommissionComplete(idx) { + if err = z.poolMeta.save(ctx, z.serverPools); err != nil { + return err + } + globalNotificationSys.ReloadPoolMeta(ctx) + } + return nil +} + +func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (err error) { + if idx < 0 { + return errInvalidArgument + } + + if z.SinglePool() { + return errInvalidArgument + } + + buckets, err := z.ListBuckets(ctx) + if err != nil { + return err + } + + // TODO: Support decommissioning transition tiers. 
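The status and start paths above derive the data shards per erasure set as the set drive count minus the storage-class parity. A trivial standalone illustration of that arithmetic (drive and parity counts are assumptions, not from a live cluster):

```go
package main

import "fmt"

func main() {
	setDriveCount := 16 // assumed drives per erasure set
	standardParity := 4 // assumed STANDARD storage-class parity
	rrsParity := 2      // assumed REDUCED_REDUNDANCY parity

	fmt.Println("STANDARD data shards:", setDriveCount-standardParity) // 12
	fmt.Println("RRS data shards:", setDriveCount-rrsParity)           // 14
}
```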
+ for _, bucket := range buckets { + if lc, err := globalLifecycleSys.Get(bucket.Name); err == nil { + if lc.HasTransition() { + return fmt.Errorf("Bucket is part of transitioned tier %s: decommission is not allowed in Tier'd setups", bucket.Name) + } + } + } + + // Buckets data are dispersed in multiple zones/sets, make + // sure to decommission the necessary metadata. + buckets = append(buckets, BucketInfo{ + Name: pathJoin(minioMetaBucket, minioConfigPrefix), + }, BucketInfo{ + Name: pathJoin(minioMetaBucket, bucketMetaPrefix), + }) + + var pool *erasureSets + for pidx := range z.serverPools { + if pidx == idx { + pool = z.serverPools[idx] + break + } + } + + if pool == nil { + return errInvalidArgument + } + + info, _ := pool.StorageInfo(ctx) + info.Backend.Type = madmin.Erasure + + scParity := globalStorageClass.GetParityForSC(storageclass.STANDARD) + if scParity <= 0 { + scParity = z.serverPools[0].defaultParityCount + } + rrSCParity := globalStorageClass.GetParityForSC(storageclass.RRS) + info.Backend.StandardSCData = append(info.Backend.StandardSCData, pool.SetDriveCount()-scParity) + info.Backend.RRSCData = append(info.Backend.RRSCData, pool.SetDriveCount()-rrSCParity) + info.Backend.StandardSCParity = scParity + info.Backend.RRSCParity = rrSCParity + + z.poolMetaMutex.Lock() + defer z.poolMetaMutex.Unlock() + + if err = z.poolMeta.Decommission(idx, info); err != nil { + return err + } + z.poolMeta.QueueBuckets(idx, buckets) + if err = z.poolMeta.save(ctx, z.serverPools); err != nil { + return err + } + globalNotificationSys.ReloadPoolMeta(ctx) + return nil +} diff --git a/cmd/erasure-server-pool-decom_gen.go b/cmd/erasure-server-pool-decom_gen.go new file mode 100644 index 000000000..07294cd3f --- /dev/null +++ b/cmd/erasure-server-pool-decom_gen.go @@ -0,0 +1,932 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. 
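The file below is generated by msgp from the `//go:generate` directive shown earlier; its encode/decode methods are plain MessagePack. For orientation, the same msgp primitives can be exercised directly, e.g. this standalone round trip of a single string field:

```go
package main

import (
	"fmt"

	"github.com/tinylib/msgp/msgp"
)

func main() {
	// Append a MessagePack string, then read it back from the buffer.
	b := msgp.AppendString(nil, "pool.bin")

	s, rest, err := msgp.ReadStringBytes(b)
	if err != nil {
		panic(err)
	}
	fmt.Println(s, len(rest)) // "pool.bin" 0
}
```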
+ +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *PoolDecommissionInfo) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "st": + z.StartTime, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + case "ss": + z.StartSize, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "StartSize") + return + } + case "ts": + z.TotalSize, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "TotalSize") + return + } + case "cs": + z.CurrentSize, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "CurrentSize") + return + } + case "cmp": + z.Complete, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Complete") + return + } + case "fl": + z.Failed, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Failed") + return + } + case "cnl": + z.Canceled, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Canceled") + return + } + case "bkts": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets") + return + } + if cap(z.QueuedBuckets) >= int(zb0002) { + z.QueuedBuckets = (z.QueuedBuckets)[:zb0002] + } else { + z.QueuedBuckets = make([]string, zb0002) + } + for za0001 := range z.QueuedBuckets { + z.QueuedBuckets[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets", za0001) + return + } + } + case "dbkts": + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "DecommissionedBuckets") + return + } + if cap(z.DecommissionedBuckets) >= int(zb0003) { + z.DecommissionedBuckets = (z.DecommissionedBuckets)[:zb0003] + } else { + z.DecommissionedBuckets = make([]string, zb0003) + } + for za0002 := range z.DecommissionedBuckets { + z.DecommissionedBuckets[za0002], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "DecommissionedBuckets", za0002) + return + } + } + case "bkt": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "obj": + z.Object, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + case "id": + z.ItemsDecommissioned, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ItemsDecommissioned") + return + } + case "idf": + z.ItemsDecommissionFailed, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "ItemsDecommissionFailed") + return + } + case "bd": + z.BytesDone, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "BytesDone") + return + } + case "bf": + z.BytesFailed, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "BytesFailed") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 15 + // write "st" + err = en.Append(0x8f, 0xa2, 0x73, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.StartTime) + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + // write "ss" + 
err = en.Append(0xa2, 0x73, 0x73) + if err != nil { + return + } + err = en.WriteInt64(z.StartSize) + if err != nil { + err = msgp.WrapError(err, "StartSize") + return + } + // write "ts" + err = en.Append(0xa2, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteInt64(z.TotalSize) + if err != nil { + err = msgp.WrapError(err, "TotalSize") + return + } + // write "cs" + err = en.Append(0xa2, 0x63, 0x73) + if err != nil { + return + } + err = en.WriteInt64(z.CurrentSize) + if err != nil { + err = msgp.WrapError(err, "CurrentSize") + return + } + // write "cmp" + err = en.Append(0xa3, 0x63, 0x6d, 0x70) + if err != nil { + return + } + err = en.WriteBool(z.Complete) + if err != nil { + err = msgp.WrapError(err, "Complete") + return + } + // write "fl" + err = en.Append(0xa2, 0x66, 0x6c) + if err != nil { + return + } + err = en.WriteBool(z.Failed) + if err != nil { + err = msgp.WrapError(err, "Failed") + return + } + // write "cnl" + err = en.Append(0xa3, 0x63, 0x6e, 0x6c) + if err != nil { + return + } + err = en.WriteBool(z.Canceled) + if err != nil { + err = msgp.WrapError(err, "Canceled") + return + } + // write "bkts" + err = en.Append(0xa4, 0x62, 0x6b, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.QueuedBuckets))) + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets") + return + } + for za0001 := range z.QueuedBuckets { + err = en.WriteString(z.QueuedBuckets[za0001]) + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets", za0001) + return + } + } + // write "dbkts" + err = en.Append(0xa5, 0x64, 0x62, 0x6b, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.DecommissionedBuckets))) + if err != nil { + err = msgp.WrapError(err, "DecommissionedBuckets") + return + } + for za0002 := range z.DecommissionedBuckets { + err = en.WriteString(z.DecommissionedBuckets[za0002]) + if err != nil { + err = msgp.WrapError(err, "DecommissionedBuckets", za0002) + return + } + } + // write "bkt" + err = en.Append(0xa3, 0x62, 0x6b, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "obj" + err = en.Append(0xa3, 0x6f, 0x62, 0x6a) + if err != nil { + return + } + err = en.WriteString(z.Object) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + // write "id" + err = en.Append(0xa2, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.ItemsDecommissioned) + if err != nil { + err = msgp.WrapError(err, "ItemsDecommissioned") + return + } + // write "idf" + err = en.Append(0xa3, 0x69, 0x64, 0x66) + if err != nil { + return + } + err = en.WriteUint64(z.ItemsDecommissionFailed) + if err != nil { + err = msgp.WrapError(err, "ItemsDecommissionFailed") + return + } + // write "bd" + err = en.Append(0xa2, 0x62, 0x64) + if err != nil { + return + } + err = en.WriteUint64(z.BytesDone) + if err != nil { + err = msgp.WrapError(err, "BytesDone") + return + } + // write "bf" + err = en.Append(0xa2, 0x62, 0x66) + if err != nil { + return + } + err = en.WriteUint64(z.BytesFailed) + if err != nil { + err = msgp.WrapError(err, "BytesFailed") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *PoolDecommissionInfo) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 15 + // string "st" + o = append(o, 0x8f, 0xa2, 0x73, 0x74) + o = msgp.AppendTime(o, z.StartTime) + // string "ss" + o = append(o, 0xa2, 0x73, 0x73) + o = 
msgp.AppendInt64(o, z.StartSize) + // string "ts" + o = append(o, 0xa2, 0x74, 0x73) + o = msgp.AppendInt64(o, z.TotalSize) + // string "cs" + o = append(o, 0xa2, 0x63, 0x73) + o = msgp.AppendInt64(o, z.CurrentSize) + // string "cmp" + o = append(o, 0xa3, 0x63, 0x6d, 0x70) + o = msgp.AppendBool(o, z.Complete) + // string "fl" + o = append(o, 0xa2, 0x66, 0x6c) + o = msgp.AppendBool(o, z.Failed) + // string "cnl" + o = append(o, 0xa3, 0x63, 0x6e, 0x6c) + o = msgp.AppendBool(o, z.Canceled) + // string "bkts" + o = append(o, 0xa4, 0x62, 0x6b, 0x74, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.QueuedBuckets))) + for za0001 := range z.QueuedBuckets { + o = msgp.AppendString(o, z.QueuedBuckets[za0001]) + } + // string "dbkts" + o = append(o, 0xa5, 0x64, 0x62, 0x6b, 0x74, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.DecommissionedBuckets))) + for za0002 := range z.DecommissionedBuckets { + o = msgp.AppendString(o, z.DecommissionedBuckets[za0002]) + } + // string "bkt" + o = append(o, 0xa3, 0x62, 0x6b, 0x74) + o = msgp.AppendString(o, z.Bucket) + // string "obj" + o = append(o, 0xa3, 0x6f, 0x62, 0x6a) + o = msgp.AppendString(o, z.Object) + // string "id" + o = append(o, 0xa2, 0x69, 0x64) + o = msgp.AppendUint64(o, z.ItemsDecommissioned) + // string "idf" + o = append(o, 0xa3, 0x69, 0x64, 0x66) + o = msgp.AppendUint64(o, z.ItemsDecommissionFailed) + // string "bd" + o = append(o, 0xa2, 0x62, 0x64) + o = msgp.AppendUint64(o, z.BytesDone) + // string "bf" + o = append(o, 0xa2, 0x62, 0x66) + o = msgp.AppendUint64(o, z.BytesFailed) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *PoolDecommissionInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "st": + z.StartTime, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + case "ss": + z.StartSize, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "StartSize") + return + } + case "ts": + z.TotalSize, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "TotalSize") + return + } + case "cs": + z.CurrentSize, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "CurrentSize") + return + } + case "cmp": + z.Complete, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Complete") + return + } + case "fl": + z.Failed, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Failed") + return + } + case "cnl": + z.Canceled, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Canceled") + return + } + case "bkts": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets") + return + } + if cap(z.QueuedBuckets) >= int(zb0002) { + z.QueuedBuckets = (z.QueuedBuckets)[:zb0002] + } else { + z.QueuedBuckets = make([]string, zb0002) + } + for za0001 := range z.QueuedBuckets { + z.QueuedBuckets[za0001], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "QueuedBuckets", za0001) + return + } + } + case "dbkts": + var zb0003 uint32 + zb0003, bts, err = 
msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "DecommissionedBuckets") + return + } + if cap(z.DecommissionedBuckets) >= int(zb0003) { + z.DecommissionedBuckets = (z.DecommissionedBuckets)[:zb0003] + } else { + z.DecommissionedBuckets = make([]string, zb0003) + } + for za0002 := range z.DecommissionedBuckets { + z.DecommissionedBuckets[za0002], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "DecommissionedBuckets", za0002) + return + } + } + case "bkt": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "obj": + z.Object, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + case "id": + z.ItemsDecommissioned, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ItemsDecommissioned") + return + } + case "idf": + z.ItemsDecommissionFailed, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ItemsDecommissionFailed") + return + } + case "bd": + z.BytesDone, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "BytesDone") + return + } + case "bf": + z.BytesFailed, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "BytesFailed") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *PoolDecommissionInfo) Msgsize() (s int) { + s = 1 + 3 + msgp.TimeSize + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.BoolSize + 3 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.ArrayHeaderSize + for za0001 := range z.QueuedBuckets { + s += msgp.StringPrefixSize + len(z.QueuedBuckets[za0001]) + } + s += 6 + msgp.ArrayHeaderSize + for za0002 := range z.DecommissionedBuckets { + s += msgp.StringPrefixSize + len(z.DecommissionedBuckets[za0002]) + } + s += 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Uint64Size + 4 + msgp.Uint64Size + 3 + msgp.Uint64Size + 3 + msgp.Uint64Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *PoolStatus) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "id": + z.ID, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + case "cl": + z.CmdLine, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "CmdLine") + return + } + case "lu": + z.LastUpdate, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } + case "dec": + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "Decommission") + return + } + z.Decommission = nil + } else { + if z.Decommission == nil { + z.Decommission = new(PoolDecommissionInfo) + } + err = z.Decommission.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Decommission") + return + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func 
(z *PoolStatus) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 4 + // write "id" + err = en.Append(0x84, 0xa2, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteInt(z.ID) + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + // write "cl" + err = en.Append(0xa2, 0x63, 0x6c) + if err != nil { + return + } + err = en.WriteString(z.CmdLine) + if err != nil { + err = msgp.WrapError(err, "CmdLine") + return + } + // write "lu" + err = en.Append(0xa2, 0x6c, 0x75) + if err != nil { + return + } + err = en.WriteTime(z.LastUpdate) + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } + // write "dec" + err = en.Append(0xa3, 0x64, 0x65, 0x63) + if err != nil { + return + } + if z.Decommission == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = z.Decommission.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Decommission") + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *PoolStatus) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 4 + // string "id" + o = append(o, 0x84, 0xa2, 0x69, 0x64) + o = msgp.AppendInt(o, z.ID) + // string "cl" + o = append(o, 0xa2, 0x63, 0x6c) + o = msgp.AppendString(o, z.CmdLine) + // string "lu" + o = append(o, 0xa2, 0x6c, 0x75) + o = msgp.AppendTime(o, z.LastUpdate) + // string "dec" + o = append(o, 0xa3, 0x64, 0x65, 0x63) + if z.Decommission == nil { + o = msgp.AppendNil(o) + } else { + o, err = z.Decommission.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Decommission") + return + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *PoolStatus) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "id": + z.ID, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + case "cl": + z.CmdLine, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "CmdLine") + return + } + case "lu": + z.LastUpdate, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } + case "dec": + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.Decommission = nil + } else { + if z.Decommission == nil { + z.Decommission = new(PoolDecommissionInfo) + } + bts, err = z.Decommission.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Decommission") + return + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *PoolStatus) Msgsize() (s int) { + s = 1 + 3 + msgp.IntSize + 3 + msgp.StringPrefixSize + len(z.CmdLine) + 3 + msgp.TimeSize + 4 + if z.Decommission == nil { + s += msgp.NilSize + } else { + s += z.Decommission.Msgsize() + } + return +} + +// DecodeMsg implements msgp.Decodable +func (z *poolMeta) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 
> 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "v": + z.Version, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "Version") + return + } + case "pls": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Pools") + return + } + if cap(z.Pools) >= int(zb0002) { + z.Pools = (z.Pools)[:zb0002] + } else { + z.Pools = make([]PoolStatus, zb0002) + } + for za0001 := range z.Pools { + err = z.Pools[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Pools", za0001) + return + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *poolMeta) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "v" + err = en.Append(0x82, 0xa1, 0x76) + if err != nil { + return + } + err = en.WriteInt(z.Version) + if err != nil { + err = msgp.WrapError(err, "Version") + return + } + // write "pls" + err = en.Append(0xa3, 0x70, 0x6c, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Pools))) + if err != nil { + err = msgp.WrapError(err, "Pools") + return + } + for za0001 := range z.Pools { + err = z.Pools[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Pools", za0001) + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *poolMeta) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 2 + // string "v" + o = append(o, 0x82, 0xa1, 0x76) + o = msgp.AppendInt(o, z.Version) + // string "pls" + o = append(o, 0xa3, 0x70, 0x6c, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Pools))) + for za0001 := range z.Pools { + o, err = z.Pools[za0001].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Pools", za0001) + return + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *poolMeta) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "v": + z.Version, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Version") + return + } + case "pls": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Pools") + return + } + if cap(z.Pools) >= int(zb0002) { + z.Pools = (z.Pools)[:zb0002] + } else { + z.Pools = make([]PoolStatus, zb0002) + } + for za0001 := range z.Pools { + bts, err = z.Pools[za0001].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Pools", za0001) + return + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *poolMeta) Msgsize() (s int) { + s = 1 + 2 + msgp.IntSize + 4 + msgp.ArrayHeaderSize + for za0001 := range z.Pools { + s += z.Pools[za0001].Msgsize() + } + return +} diff --git a/cmd/erasure-server-pool-decom_gen_test.go b/cmd/erasure-server-pool-decom_gen_test.go new file mode 100644 index 000000000..b479ceb1e --- 
/dev/null +++ b/cmd/erasure-server-pool-decom_gen_test.go @@ -0,0 +1,349 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalPoolDecommissionInfo(t *testing.T) { + v := PoolDecommissionInfo{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgPoolDecommissionInfo(b *testing.B) { + v := PoolDecommissionInfo{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgPoolDecommissionInfo(b *testing.B) { + v := PoolDecommissionInfo{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalPoolDecommissionInfo(b *testing.B) { + v := PoolDecommissionInfo{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodePoolDecommissionInfo(t *testing.T) { + v := PoolDecommissionInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodePoolDecommissionInfo Msgsize() is inaccurate") + } + + vn := PoolDecommissionInfo{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodePoolDecommissionInfo(b *testing.B) { + v := PoolDecommissionInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodePoolDecommissionInfo(b *testing.B) { + v := PoolDecommissionInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalPoolStatus(t *testing.T) { + v := PoolStatus{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgPoolStatus(b *testing.B) { + v := PoolStatus{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgPoolStatus(b *testing.B) { + v := PoolStatus{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + 
bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalPoolStatus(b *testing.B) { + v := PoolStatus{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodePoolStatus(t *testing.T) { + v := PoolStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodePoolStatus Msgsize() is inaccurate") + } + + vn := PoolStatus{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodePoolStatus(b *testing.B) { + v := PoolStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodePoolStatus(b *testing.B) { + v := PoolStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalpoolMeta(t *testing.T) { + v := poolMeta{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgpoolMeta(b *testing.B) { + v := poolMeta{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgpoolMeta(b *testing.B) { + v := poolMeta{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalpoolMeta(b *testing.B) { + v := poolMeta{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodepoolMeta(t *testing.T) { + v := poolMeta{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodepoolMeta Msgsize() is inaccurate") + } + + vn := poolMeta{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodepoolMeta(b *testing.B) { + v := poolMeta{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodepoolMeta(b *testing.B) { + v := poolMeta{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() 
+ for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index b0c51b76a..21ad5cc1b 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -42,10 +42,15 @@ import ( type erasureServerPools struct { GatewayUnsupported - serverPools []*erasureSets + poolMetaMutex sync.RWMutex + poolMeta poolMeta + serverPools []*erasureSets // Shut down async operations shutdown context.CancelFunc + + // Active decommission canceler + decommissionCancelers []context.CancelFunc } func (z *erasureServerPools) SinglePool() bool { @@ -62,7 +67,9 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ formats = make([]*formatErasureV3, len(endpointServerPools)) storageDisks = make([][]StorageAPI, len(endpointServerPools)) - z = &erasureServerPools{serverPools: make([]*erasureSets, len(endpointServerPools))} + z = &erasureServerPools{ + serverPools: make([]*erasureSets, len(endpointServerPools)), + } ) var localDrives []string @@ -108,11 +115,26 @@ func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServ return nil, fmt.Errorf("All serverPools should have same deployment ID expected %s, got %s", deploymentID, formats[i].ID) } - z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i], commonParityDrives, i) + z.serverPools[i], err = newErasureSets(ctx, ep, storageDisks[i], formats[i], commonParityDrives, i) if err != nil { return nil, err } } + + z.decommissionCancelers = make([]context.CancelFunc, len(z.serverPools)) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for { + err := z.Init(ctx) + if err != nil { + if !configRetriableErrors(err) { + logger.Fatal(err, "Unable to initialize backend") + } + time.Sleep(time.Duration(r.Float64() * float64(5*time.Second))) + continue + } + break + } + ctx, z.shutdown = context.WithCancel(ctx) go intDataUpdateTracker.start(ctx, localDrives...) return z, nil @@ -249,6 +271,10 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b g := errgroup.WithNErrs(len(z.serverPools)) for index := range z.serverPools { index := index + // skip suspended pools for any new I/O. + if z.poolMeta.IsSuspended(index) { + continue + } g.Go(func() error { // Get the set where it would be placed. storageInfos[index] = getDiskInfos(ctx, z.serverPools[index].getHashedSet(object).getDisks()) @@ -292,19 +318,24 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc } poolObjInfos := make([]poolObjInfo, len(z.serverPools)) + poolOpts := make([]ObjectOptions, len(z.serverPools)) + for i := range z.serverPools { + poolOpts[i] = opts + } var wg sync.WaitGroup for i, pool := range z.serverPools { wg.Add(1) - go func(i int, pool *erasureSets) { + go func(i int, pool *erasureSets, opts ObjectOptions) { defer wg.Done() // remember the pool index, we may sort the slice original index might be lost. pinfo := poolObjInfo{ PoolIndex: i, } + opts.VersionID = "" // no need to check for specific versionId pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, opts) poolObjInfos[i] = pinfo - }(i, pool) + }(i, pool, poolOpts[i]) } wg.Wait() @@ -322,6 +353,12 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) { return -1, pinfo.Err } + + // skip all objects from suspended pools for mutating calls. 
+ if z.poolMeta.IsSuspended(pinfo.PoolIndex) && opts.Mutate { + continue + } + if isErrObjectNotFound(pinfo.Err) { // No object exists or its a delete marker, // check objInfo to confirm. @@ -333,23 +370,23 @@ func (z *erasureServerPools) getPoolIdxExistingWithOpts(ctx context.Context, buc // exist proceed to next pool. continue } + return pinfo.PoolIndex, nil } return -1, toObjectErr(errFileNotFound, bucket, object) } -func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucket, object string) (idx int, err error) { - return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{NoLock: true}) -} - -// getPoolIdxExisting returns the (first) found object pool index containing an object. +// getPoolIdxExistingNoLock returns the (first) found object pool index containing an object. // If the object exists, but the latest version is a delete marker, the index with it is still returned. // If the object does not exist ObjectNotFound error is returned. // If any other error is found, it is returned. // The check is skipped if there is only one zone, and 0, nil is always returned in that case. -func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, object string) (idx int, err error) { - return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{}) +func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucket, object string) (idx int, err error) { + return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{ + NoLock: true, + Mutate: true, + }) } func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, object string, size int64) (idx int, err error) { @@ -369,9 +406,10 @@ func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, objec } // getPoolIdx returns the found previous object and its corresponding pool idx, -// if none are found falls back to most available space pool. +// if none are found falls back to most available space pool, this function is +// designed to be only used by PutObject, CopyObject (newObject creation) and NewMultipartUpload. func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) { - idx, err = z.getPoolIdxExisting(ctx, bucket, object) + idx, err = z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{Mutate: true}) if err != nil && !isErrObjectNotFound(err) { return idx, err } @@ -847,7 +885,11 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec } func (z *erasureServerPools) deletePrefix(ctx context.Context, bucket string, prefix string) error { - for _, zone := range z.serverPools { + for idx, zone := range z.serverPools { + if z.poolMeta.IsSuspended(idx) { + logger.LogIf(ctx, fmt.Errorf("pool %d is suspended, all writes are suspended", idx+1)) + continue + } _, err := zone.DeleteObject(ctx, bucket, prefix, ObjectOptions{DeletePrefix: true}) if err != nil { return err @@ -871,7 +913,8 @@ func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, ob return z.serverPools[0].DeleteObject(ctx, bucket, object, opts) } - idx, err := z.getPoolIdxExisting(ctx, bucket, object) + opts.Mutate = true + idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) if err != nil { return objInfo, err } @@ -2006,7 +2049,8 @@ func (z *erasureServerPools) PutObjectMetadata(ctx context.Context, bucket, obje } // We don't know the size here set 1GiB atleast. 
- idx, err := z.getPoolIdxExisting(ctx, bucket, object) + opts.Mutate = true + idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) if err != nil { return ObjectInfo{}, err } @@ -2022,7 +2066,8 @@ func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object s } // We don't know the size here set 1GiB atleast. - idx, err := z.getPoolIdxExisting(ctx, bucket, object) + opts.Mutate = true + idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) if err != nil { return ObjectInfo{}, err } @@ -2037,7 +2082,8 @@ func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, objec return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts) } - idx, err := z.getPoolIdxExisting(ctx, bucket, object) + opts.Mutate = true + idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) if err != nil { return ObjectInfo{}, err } @@ -2052,7 +2098,8 @@ func (z *erasureServerPools) GetObjectTags(ctx context.Context, bucket, object s return z.serverPools[0].GetObjectTags(ctx, bucket, object, opts) } - idx, err := z.getPoolIdxExisting(ctx, bucket, object) + opts.Mutate = false + idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) if err != nil { return nil, err } @@ -2067,7 +2114,8 @@ func (z *erasureServerPools) TransitionObject(ctx context.Context, bucket, objec return z.serverPools[0].TransitionObject(ctx, bucket, object, opts) } - idx, err := z.getPoolIdxExisting(ctx, bucket, object) + opts.Mutate = true + idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) if err != nil { return err } @@ -2082,7 +2130,8 @@ func (z *erasureServerPools) RestoreTransitionedObject(ctx context.Context, buck return z.serverPools[0].RestoreTransitionedObject(ctx, bucket, object, opts) } - idx, err := z.getPoolIdxExisting(ctx, bucket, object) + opts.Mutate = true + idx, err := z.getPoolIdxExistingWithOpts(ctx, bucket, object, opts) if err != nil { return err } diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go index fb848a75a..edc2e1cff 100644 --- a/cmd/erasure-sets.go +++ b/cmd/erasure-sets.go @@ -71,7 +71,7 @@ type erasureSets struct { erasureLockOwner string // List of endpoints provided on the command line. - endpoints Endpoints + endpoints PoolEndpoints // String version of all the endpoints, an optimization // to avoid url.String() conversion taking CPU on @@ -208,7 +208,7 @@ func (s *erasureSets) connectDisks() { var wg sync.WaitGroup setsJustConnected := make([]bool, s.setCount) diskMap := s.getDiskMap() - for _, endpoint := range s.endpoints { + for _, endpoint := range s.endpoints.Endpoints { if isEndpointConnectionStable(diskMap, endpoint, s.lastConnectDisksOpTime) { continue } @@ -328,7 +328,7 @@ func (s *erasureSets) GetEndpoints(setIndex int) func() []Endpoint { eps := make([]Endpoint, s.setDriveCount) for i := 0; i < s.setDriveCount; i++ { - eps[i] = s.endpoints[setIndex*s.setDriveCount+i] + eps[i] = s.endpoints.Endpoints[setIndex*s.setDriveCount+i] } return eps } @@ -350,12 +350,12 @@ func (s *erasureSets) GetDisks(setIndex int) func() []StorageAPI { const defaultMonitorConnectEndpointInterval = defaultMonitorNewDiskInterval + time.Second*5 // Initialize new set of erasure coded sets. 
-func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []StorageAPI, format *formatErasureV3, defaultParityCount, poolIdx int) (*erasureSets, error) { +func newErasureSets(ctx context.Context, endpoints PoolEndpoints, storageDisks []StorageAPI, format *formatErasureV3, defaultParityCount, poolIdx int) (*erasureSets, error) { setCount := len(format.Erasure.Sets) setDriveCount := len(format.Erasure.Sets[0]) - endpointStrings := make([]string, len(endpoints)) - for i, endpoint := range endpoints { + endpointStrings := make([]string, len(endpoints.Endpoints)) + for i, endpoint := range endpoints.Endpoints { endpointStrings[i] = endpoint.String() } @@ -399,7 +399,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto } erasureLockers := map[string]dsync.NetLocker{} - for _, endpoint := range endpoints { + for _, endpoint := range endpoints.Endpoints { if _, ok := erasureLockers[endpoint.Host]; !ok { erasureLockers[endpoint.Host] = newLockAPI(endpoint) } @@ -408,8 +408,8 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto for i := 0; i < setCount; i++ { lockerEpSet := set.NewStringSet() for j := 0; j < setDriveCount; j++ { - endpoint := endpoints[i*setDriveCount+j] - // Only add lockers per endpoint. + endpoint := endpoints.Endpoints[i*setDriveCount+j] + // Only add lockers only one per endpoint and per erasure set. if locker, ok := erasureLockers[endpoint.Host]; ok && !lockerEpSet.Contains(endpoint.Host) { lockerEpSet.Add(endpoint.Host) s.erasureLockers[i] = append(s.erasureLockers[i], locker) @@ -1234,7 +1234,7 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) { // HealFormat - heals missing `format.json` on fresh unformatted disks. func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) { - storageDisks, _ := initStorageDisksWithErrorsWithoutHealthCheck(s.endpoints) + storageDisks, _ := initStorageDisksWithErrorsWithoutHealthCheck(s.endpoints.Endpoints) defer func(storageDisks []StorageAPI) { if err != nil { @@ -1264,7 +1264,7 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H } // Fetch all the drive info status. 
- beforeDrives := formatsToDrivesInfo(s.endpoints, formats, sErrs) + beforeDrives := formatsToDrivesInfo(s.endpoints.Endpoints, formats, sErrs) res.After.Drives = make([]madmin.HealDriveInfo, len(beforeDrives)) res.Before.Drives = make([]madmin.HealDriveInfo, len(beforeDrives)) diff --git a/cmd/erasure-sets_test.go b/cmd/erasure-sets_test.go index 84ee2a738..57b826d93 100644 --- a/cmd/erasure-sets_test.go +++ b/cmd/erasure-sets_test.go @@ -190,7 +190,8 @@ func TestNewErasureSets(t *testing.T) { t.Fatalf("Unable to format disks for erasure, %s", err) } - if _, err := newErasureSets(ctx, endpoints, storageDisks, format, ecDrivesNoConfig(16), 0); err != nil { + ep := PoolEndpoints{Endpoints: endpoints} + if _, err := newErasureSets(ctx, ep, storageDisks, format, ecDrivesNoConfig(16), 0); err != nil { t.Fatalf("Unable to initialize erasure") } } diff --git a/cmd/notification.go b/cmd/notification.go index a5c6af8ba..8ca7ef365 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -616,6 +616,26 @@ func (sys *NotificationSys) GetClusterBucketStats(ctx context.Context, bucketNam return bucketStats } +// ReloadPoolMeta reloads on disk updates on pool metadata +func (sys *NotificationSys) ReloadPoolMeta(ctx context.Context) { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } + client := client + ng.Go(ctx, func() error { + return client.ReloadPoolMeta(ctx) + }, idx, *client.host) + } + for _, nErr := range ng.Wait() { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String()) + if nErr.Err != nil { + logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err) + } + } +} + // LoadTransitionTierConfig notifies remote peers to load their remote tier // configs from config store. func (sys *NotificationSys) LoadTransitionTierConfig(ctx context.Context) { diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 6bf4bfd45..84c759527 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -46,6 +46,16 @@ const ( // StorageInfo - represents total capacity of underlying storage. type StorageInfo = madmin.StorageInfo +// TotalUsableCapacity - total usable capacity +func TotalUsableCapacity(s StorageInfo) int64 { + return int64(GetTotalUsableCapacity(s.Disks, s)) +} + +// TotalUsableCapacityFree - total usable capacity free +func TotalUsableCapacityFree(s StorageInfo) int64 { + return int64(GetTotalUsableCapacityFree(s.Disks, s)) +} + // objectHistogramInterval is an interval that will be // used to report the histogram of objects data sizes type objectHistogramInterval struct { diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 576d924d4..fa7714148 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -71,6 +71,9 @@ type ObjectOptions struct { // Use the maximum parity (N/2), used when saving server configuration files MaxParity bool + + // Mutate set to 'true' if the call is namespace mutation call + Mutate bool } // ExpirationOptions represents object options for object expiration at objectLayer. 
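
The `Mutate` option introduced in the hunk above is what lets pool lookup treat reads and namespace mutations differently while a pool is draining: GET-style calls may still resolve an object that lives on a suspended pool, while PUT/DELETE/tagging calls skip that pool so no new data lands on it. The stand-alone sketch below only illustrates that gating rule with simplified, hypothetical stand-in types (`poolMeta`, `objectOptions`, `foundIn`); it is not the `erasureServerPools` code from this patch.

```
package main

import (
	"errors"
	"fmt"
)

// poolMeta is a trimmed-down stand-in for the patch's poolMeta type: here it
// only tracks which pool indexes are currently draining (decommissioning).
type poolMeta struct {
	suspended map[int]bool
}

func (p poolMeta) IsSuspended(idx int) bool { return p.suspended[idx] }

// objectOptions mirrors just the Mutate flag added to ObjectOptions above.
type objectOptions struct {
	Mutate bool
}

var errObjectNotFound = errors.New("object not found")

// getPoolIdx sketches the selection rule: reads may resolve an object on a
// draining pool, but mutating calls skip draining pools entirely so no new
// writes or deletes land on data that is being moved out.
func getPoolIdx(meta poolMeta, foundIn []int, opts objectOptions) (int, error) {
	for _, idx := range foundIn {
		if opts.Mutate && meta.IsSuspended(idx) {
			continue // draining pool: invisible to namespace mutations
		}
		return idx, nil
	}
	return -1, errObjectNotFound
}

func main() {
	meta := poolMeta{suspended: map[int]bool{0: true}} // pool 0 is draining
	foundIn := []int{0, 1}                             // object exists on pools 0 and 1

	readIdx, _ := getPoolIdx(meta, foundIn, objectOptions{Mutate: false})
	writeIdx, _ := getPoolIdx(meta, foundIn, objectOptions{Mutate: true})
	fmt.Println("read from pool:", readIdx, "write to pool:", writeIdx) // 0 and 1
}
```

Keeping this decision inside the shared `getPoolIdxExistingWithOpts` path means each mutating handler in the hunks above only has to set `opts.Mutate = true` instead of re-implementing the suspended-pool check.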
diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 6b9571b0f..896ae5799 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -767,6 +767,16 @@ func (client *peerRESTClient) UpdateMetacacheListing(ctx context.Context, m meta return resp, msgp.Decode(respBody, &resp) } +func (client *peerRESTClient) ReloadPoolMeta(ctx context.Context) error { + respBody, err := client.callWithContext(ctx, peerRESTMethodReloadPoolMeta, nil, nil, 0) + if err != nil { + logger.LogIf(ctx, err) + return err + } + defer http.DrainBody(respBody) + return nil +} + func (client *peerRESTClient) LoadTransitionTierConfig(ctx context.Context) error { respBody, err := client.callWithContext(ctx, peerRESTMethodLoadTransitionTierConfig, nil, nil, 0) if err != nil { diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index e86e201ab..b6efdd3bf 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -18,7 +18,7 @@ package cmd const ( - peerRESTVersion = "v17" // Add "storage-class" option for SpeedTest + peerRESTVersion = "v18" // Add LoadPoolMeta peerRESTVersionPrefix = SlashSeparator + peerRESTVersion peerRESTPrefix = minioReservedBucketPath + "/peer" peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix @@ -67,6 +67,7 @@ const ( peerRESTMethodLoadTransitionTierConfig = "/loadtransitiontierconfig" peerRESTMethodSpeedtest = "/speedtest" peerRESTMethodReloadSiteReplicationConfig = "/reloadsitereplicationconfig" + peerRESTMethodReloadPoolMeta = "/reloadpoolmeta" ) const ( diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 70d17e5ea..a5815924c 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -1023,6 +1023,27 @@ func (s *peerRESTServer) BackgroundHealStatusHandler(w http.ResponseWriter, r *h logger.LogIf(ctx, gob.NewEncoder(w).Encode(state)) } +func (s *peerRESTServer) ReloadPoolMetaHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("invalid request")) + return + } + objAPI := newObjectLayerFn() + if objAPI == nil { + s.writeErrorResponse(w, errServerNotInitialized) + return + } + + pools, ok := objAPI.(*erasureServerPools) + if !ok { + return + } + if err := pools.ReloadPoolMeta(r.Context()); err != nil { + s.writeErrorResponse(w, err) + return + } +} + func (s *peerRESTServer) LoadTransitionTierConfigHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { s.writeErrorResponse(w, errors.New("invalid request")) @@ -1353,4 +1374,5 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadTransitionTierConfig).HandlerFunc(httpTraceHdrs(server.LoadTransitionTierConfigHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadPoolMeta).HandlerFunc(httpTraceHdrs(server.ReloadPoolMetaHandler)) } diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index af85153d4..e571a118f 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -72,11 +72,12 @@ type FilesInfo struct { IsTruncated bool } -// FilesInfoVersions represents a list of file versions, -// additionally indicates if the list is last. 
-type FilesInfoVersions struct { - FilesVersions []FileInfoVersions - IsTruncated bool +// Size returns size of all versions for the object 'Name' +func (f FileInfoVersions) Size() (size int64) { + for _, v := range f.Versions { + size += v.Size + } + return size } // FileInfoVersions represent a list of versions for a given file. @@ -235,7 +236,9 @@ func (fi *FileInfo) SetInlineData() { } // VersionPurgeStatusKey denotes purge status in metadata -const VersionPurgeStatusKey = "purgestatus" +const ( + VersionPurgeStatusKey = ReservedMetadataPrefixLower + "purgestatus" +) // newFileInfo - initializes new FileInfo, allocates a fresh erasure info. func newFileInfo(object string, dataBlocks, parityBlocks int) (fi FileInfo) { diff --git a/cmd/storage-datatypes_gen.go b/cmd/storage-datatypes_gen.go index 317b2c1d0..5d40784b3 100644 --- a/cmd/storage-datatypes_gen.go +++ b/cmd/storage-datatypes_gen.go @@ -1530,178 +1530,6 @@ func (z *FilesInfo) Msgsize() (s int) { return } -// DecodeMsg implements msgp.Decodable -func (z *FilesInfoVersions) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "FilesVersions": - var zb0002 uint32 - zb0002, err = dc.ReadArrayHeader() - if err != nil { - err = msgp.WrapError(err, "FilesVersions") - return - } - if cap(z.FilesVersions) >= int(zb0002) { - z.FilesVersions = (z.FilesVersions)[:zb0002] - } else { - z.FilesVersions = make([]FileInfoVersions, zb0002) - } - for za0001 := range z.FilesVersions { - err = z.FilesVersions[za0001].DecodeMsg(dc) - if err != nil { - err = msgp.WrapError(err, "FilesVersions", za0001) - return - } - } - case "IsTruncated": - z.IsTruncated, err = dc.ReadBool() - if err != nil { - err = msgp.WrapError(err, "IsTruncated") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z *FilesInfoVersions) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 - // write "FilesVersions" - err = en.Append(0x82, 0xad, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73) - if err != nil { - return - } - err = en.WriteArrayHeader(uint32(len(z.FilesVersions))) - if err != nil { - err = msgp.WrapError(err, "FilesVersions") - return - } - for za0001 := range z.FilesVersions { - err = z.FilesVersions[za0001].EncodeMsg(en) - if err != nil { - err = msgp.WrapError(err, "FilesVersions", za0001) - return - } - } - // write "IsTruncated" - err = en.Append(0xab, 0x49, 0x73, 0x54, 0x72, 0x75, 0x6e, 0x63, 0x61, 0x74, 0x65, 0x64) - if err != nil { - return - } - err = en.WriteBool(z.IsTruncated) - if err != nil { - err = msgp.WrapError(err, "IsTruncated") - return - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z *FilesInfoVersions) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - // map header, size 2 - // string "FilesVersions" - o = append(o, 0x82, 0xad, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73) - o = msgp.AppendArrayHeader(o, uint32(len(z.FilesVersions))) - for za0001 := range z.FilesVersions { - o, err = z.FilesVersions[za0001].MarshalMsg(o) - if err != nil { - err = msgp.WrapError(err, "FilesVersions", za0001) - return - } - } - 
// string "IsTruncated" - o = append(o, 0xab, 0x49, 0x73, 0x54, 0x72, 0x75, 0x6e, 0x63, 0x61, 0x74, 0x65, 0x64) - o = msgp.AppendBool(o, z.IsTruncated) - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *FilesInfoVersions) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "FilesVersions": - var zb0002 uint32 - zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err, "FilesVersions") - return - } - if cap(z.FilesVersions) >= int(zb0002) { - z.FilesVersions = (z.FilesVersions)[:zb0002] - } else { - z.FilesVersions = make([]FileInfoVersions, zb0002) - } - for za0001 := range z.FilesVersions { - bts, err = z.FilesVersions[za0001].UnmarshalMsg(bts) - if err != nil { - err = msgp.WrapError(err, "FilesVersions", za0001) - return - } - } - case "IsTruncated": - z.IsTruncated, bts, err = msgp.ReadBoolBytes(bts) - if err != nil { - err = msgp.WrapError(err, "IsTruncated") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z *FilesInfoVersions) Msgsize() (s int) { - s = 1 + 14 + msgp.ArrayHeaderSize - for za0001 := range z.FilesVersions { - s += z.FilesVersions[za0001].Msgsize() - } - s += 12 + msgp.BoolSize - return -} - // DecodeMsg implements msgp.Decodable func (z *VolInfo) DecodeMsg(dc *msgp.Reader) (err error) { var zb0001 uint32 diff --git a/cmd/storage-datatypes_gen_test.go b/cmd/storage-datatypes_gen_test.go index 3bbe8555b..1495d113b 100644 --- a/cmd/storage-datatypes_gen_test.go +++ b/cmd/storage-datatypes_gen_test.go @@ -574,119 +574,6 @@ func BenchmarkDecodeFilesInfo(b *testing.B) { } } -func TestMarshalUnmarshalFilesInfoVersions(t *testing.T) { - v := FilesInfoVersions{} - bts, err := v.MarshalMsg(nil) - if err != nil { - t.Fatal(err) - } - left, err := v.UnmarshalMsg(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) - } - - left, err = msgp.Skip(bts) - if err != nil { - t.Fatal(err) - } - if len(left) > 0 { - t.Errorf("%d bytes left over after Skip(): %q", len(left), left) - } -} - -func BenchmarkMarshalMsgFilesInfoVersions(b *testing.B) { - v := FilesInfoVersions{} - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.MarshalMsg(nil) - } -} - -func BenchmarkAppendMsgFilesInfoVersions(b *testing.B) { - v := FilesInfoVersions{} - bts := make([]byte, 0, v.Msgsize()) - bts, _ = v.MarshalMsg(bts[0:0]) - b.SetBytes(int64(len(bts))) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - bts, _ = v.MarshalMsg(bts[0:0]) - } -} - -func BenchmarkUnmarshalFilesInfoVersions(b *testing.B) { - v := FilesInfoVersions{} - bts, _ := v.MarshalMsg(nil) - b.ReportAllocs() - b.SetBytes(int64(len(bts))) - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := v.UnmarshalMsg(bts) - if err != nil { - b.Fatal(err) - } - } -} - -func TestEncodeDecodeFilesInfoVersions(t *testing.T) { - v := FilesInfoVersions{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - - m := v.Msgsize() - if buf.Len() > m { - t.Log("WARNING: 
TestEncodeDecodeFilesInfoVersions Msgsize() is inaccurate") - } - - vn := FilesInfoVersions{} - err := msgp.Decode(&buf, &vn) - if err != nil { - t.Error(err) - } - - buf.Reset() - msgp.Encode(&buf, &v) - err = msgp.NewReader(&buf).Skip() - if err != nil { - t.Error(err) - } -} - -func BenchmarkEncodeFilesInfoVersions(b *testing.B) { - v := FilesInfoVersions{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - en := msgp.NewWriter(msgp.Nowhere) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - v.EncodeMsg(en) - } - en.Flush() -} - -func BenchmarkDecodeFilesInfoVersions(b *testing.B) { - v := FilesInfoVersions{} - var buf bytes.Buffer - msgp.Encode(&buf, &v) - b.SetBytes(int64(buf.Len())) - rd := msgp.NewEndlessReader(buf.Bytes(), b) - dc := msgp.NewReader(rd) - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - err := v.DecodeMsg(dc) - if err != nil { - b.Fatal(err) - } - } -} - func TestMarshalUnmarshalVolInfo(t *testing.T) { v := VolInfo{} bts, err := v.MarshalMsg(nil) diff --git a/docs/distributed/DECOMMISSION.md b/docs/distributed/DECOMMISSION.md new file mode 100644 index 000000000..61c619ad0 --- /dev/null +++ b/docs/distributed/DECOMMISSION.md @@ -0,0 +1,73 @@ +## Decommissioning + +### How to decommission a pool? +``` +λ mc admin decommission start alias/ http://minio{1...2}/data{1...4} +``` + +### Status decommissioning a pool + +#### Decommissioning without args lists all pools +``` +λ mc admin decommission status alias/ +┌─────┬─────────────────────────────────┬──────────────────────────────────┬────────┐ +│ ID │ Pools │ Capacity │ Status │ +│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Active │ +│ 2nd │ http://minio{3...4}/data{1...4} │ 329 GiB (used) / 421 GiB (total) │ Active │ +└─────┴─────────────────────────────────┴──────────────────────────────────┴────────┘ +``` + +#### Decommissioning status +``` +λ mc admin decommission status alias/ http://minio{1...2}/data{1...4} +Progress: ===================> [1GiB/sec] [15%] [4TiB/50TiB] +Time Remaining: 4 hours (started 3 hours ago) +``` + +#### A pool not under decommissioning will throw an error +``` +λ mc admin decommission status alias/ http://minio{1...2}/data{1...4} +ERROR: This pool is not scheduled for decommissioning currently. +``` + +### Canceling a decommission? +Stop an on-going decommission in progress, mainly used in situations when the load may be +too high and you may want to schedule the decommission at a later point in time. + +`mc admin decommission cancel` without an argument, lists out any on-going decommission in progress. + +``` +λ mc admin decommission cancel alias/ +┌─────┬─────────────────────────────────┬──────────────────────────────────┬──────────┐ +│ ID │ Pools │ Capacity │ Status │ +│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Draining │ +└─────┴─────────────────────────────────┴──────────────────────────────────┴──────────┘ +``` + +> NOTE: Canceled decommission will not make the pool active again, since we might have +> Potentially partial duplicate content on the other pools, to avoid this scenario be +> absolutely sure to start decommissioning as a planned activity. 
+ +``` +λ mc admin decommission cancel alias/ http://minio{1...2}/data{1...4} +┌─────┬─────────────────────────────────┬──────────────────────────────────┬────────────────────┐ +│ ID │ Pools │ Capacity │ Status │ +│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Draining(Canceled) │ +└─────┴─────────────────────────────────┴──────────────────────────────────┴────────────────────┘ +``` + +If for some reason decommission fails in between, the `status` will indicate decommission as failed instead. +``` +λ mc admin decommission status alias/ +┌─────┬─────────────────────────────────┬──────────────────────────────────┬──────────────────┐ +│ ID │ Pools │ Capacity │ Status │ +│ 1st │ http://minio{1...2}/data{1...4} │ 439 GiB (used) / 561 GiB (total) │ Draining(Failed) │ +│ 2nd │ http://minio{3...4}/data{1...4} │ 329 GiB (used) / 421 GiB (total) │ Active │ +└─────┴─────────────────────────────────┴──────────────────────────────────┴──────────────────┘ +``` + +### Restart a canceled or failed decommission? + +``` +λ mc admin decommission start alias/ http://minio{1...2}/data{1...4} +``` diff --git a/internal/bucket/lifecycle/lifecycle.go b/internal/bucket/lifecycle/lifecycle.go index 570fa5a94..594726415 100644 --- a/internal/bucket/lifecycle/lifecycle.go +++ b/internal/bucket/lifecycle/lifecycle.go @@ -74,6 +74,16 @@ type Lifecycle struct { Rules []Rule `xml:"Rule"` } +// HasTransition returns 'true' if lifecycle document has Transition enabled. +func (lc Lifecycle) HasTransition() bool { + for _, rule := range lc.Rules { + if rule.Transition.IsEnabled() { + return true + } + } + return false +} + // UnmarshalXML - decodes XML data. func (lc *Lifecycle) UnmarshalXML(d *xml.Decoder, start xml.StartElement) (err error) { switch start.Name.Local { diff --git a/internal/bucket/lifecycle/transition.go b/internal/bucket/lifecycle/transition.go index ed54bd129..948510d01 100644 --- a/internal/bucket/lifecycle/transition.go +++ b/internal/bucket/lifecycle/transition.go @@ -106,6 +106,11 @@ type Transition struct { set bool } +// IsEnabled returns if transition is enabled. +func (t Transition) IsEnabled() bool { + return t.set +} + // MarshalXML encodes transition field into an XML form. func (t Transition) MarshalXML(enc *xml.Encoder, start xml.StartElement) error { if !t.set {
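
The two small lifecycle helpers at the end of the patch, `Lifecycle.HasTransition` and `Transition.IsEnabled`, give callers a cheap way to ask whether a bucket's lifecycle configuration contains any enabled transition rule, which lets the decommission path treat buckets that tier objects to remote storage differently from plain ones. The sketch below restates that helper pattern with simplified, hypothetical stand-in types so it compiles on its own; in the real package the `set` flag is unexported and populated while the lifecycle XML is parsed.

```
package main

import "fmt"

// transition and rule are simplified stand-ins for the lifecycle types
// touched above; the real Transition records whether it appeared in the XML.
type transition struct{ set bool }

func (t transition) IsEnabled() bool { return t.set }

type rule struct{ Transition transition }

type lifecycle struct{ Rules []rule }

// HasTransition reports whether any rule in the configuration has an enabled
// transition action, mirroring the helper added in this patch.
func (lc lifecycle) HasTransition() bool {
	for _, r := range lc.Rules {
		if r.Transition.IsEnabled() {
			return true
		}
	}
	return false
}

func main() {
	lc := lifecycle{Rules: []rule{{}, {Transition: transition{set: true}}}}
	fmt.Println("has transition rule:", lc.HasTransition()) // true
}
```

Exposing `IsEnabled()` rather than reading the flag directly keeps the parsing state encapsulated inside the `lifecycle` package while still letting object-layer code branch on it.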