mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
fix: decommission bugfixes found during migration of .minio.sys/config (#14078)
This commit is contained in:
parent
3bd9636a5b
commit
737a3f0bad
@ -90,6 +90,18 @@ func toAdminAPIErr(ctx context.Context, err error) APIError {
|
|||||||
apiErr = errorCodes.ToAPIErrWithErr(e.Code, e.Cause)
|
apiErr = errorCodes.ToAPIErrWithErr(e.Code, e.Cause)
|
||||||
default:
|
default:
|
||||||
switch {
|
switch {
|
||||||
|
case errors.Is(err, errDecommissionAlreadyRunning):
|
||||||
|
apiErr = APIError{
|
||||||
|
Code: "XMinioDecommissionNotAllowed",
|
||||||
|
Description: err.Error(),
|
||||||
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
|
}
|
||||||
|
case errors.Is(err, errDecommissionComplete):
|
||||||
|
apiErr = APIError{
|
||||||
|
Code: "XMinioDecommissionNotAllowed",
|
||||||
|
Description: err.Error(),
|
||||||
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
|
}
|
||||||
case errors.Is(err, errConfigNotFound):
|
case errors.Is(err, errConfigNotFound):
|
||||||
apiErr = APIError{
|
apiErr = APIError{
|
||||||
Code: "XMinioConfigError",
|
Code: "XMinioConfigError",
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/minio/minio/internal/config/storageclass"
|
"github.com/minio/minio/internal/config/storageclass"
|
||||||
"github.com/minio/minio/internal/hash"
|
"github.com/minio/minio/internal/hash"
|
||||||
"github.com/minio/minio/internal/logger"
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/pkg/console"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PoolDecommissionInfo currently decommissioning information
|
// PoolDecommissionInfo currently decommissioning information
|
||||||
@ -67,12 +68,20 @@ func (pd *PoolDecommissionInfo) bucketPop(bucket string) {
|
|||||||
if b == bucket {
|
if b == bucket {
|
||||||
// Bucket is done.
|
// Bucket is done.
|
||||||
pd.QueuedBuckets = append(pd.QueuedBuckets[:i], pd.QueuedBuckets[i+1:]...)
|
pd.QueuedBuckets = append(pd.QueuedBuckets[:i], pd.QueuedBuckets[i+1:]...)
|
||||||
|
// Clear tracker info.
|
||||||
|
if pd.Bucket == bucket {
|
||||||
|
pd.Bucket = "" // empty this out for next bucket
|
||||||
|
pd.Object = "" // empty this out for next object
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pd *PoolDecommissionInfo) bucketsToDecommission() []string {
|
func (pd *PoolDecommissionInfo) bucketsToDecommission() []string {
|
||||||
return pd.QueuedBuckets
|
queuedBuckets := make([]string, len(pd.QueuedBuckets))
|
||||||
|
copy(queuedBuckets, pd.QueuedBuckets)
|
||||||
|
return queuedBuckets
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool {
|
func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool {
|
||||||
@ -94,6 +103,7 @@ func (pd *PoolDecommissionInfo) bucketPush(bucket string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
pd.QueuedBuckets = append(pd.QueuedBuckets, bucket)
|
pd.QueuedBuckets = append(pd.QueuedBuckets, bucket)
|
||||||
|
pd.Bucket = bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
// PoolStatus captures current pool status
|
// PoolStatus captures current pool status
|
||||||
@ -271,7 +281,7 @@ func (p poolMeta) IsSuspended(idx int) bool {
|
|||||||
return p.Pools[idx].Decommission != nil
|
return p.Pools[idx].Decommission != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *poolMeta) load(ctx context.Context, pool *erasureSets, pools []*erasureSets) (bool, error) {
|
func (p *poolMeta) load(ctx context.Context, pool *erasureSets, pools []*erasureSets, validate bool) (bool, error) {
|
||||||
data, err := readConfig(ctx, pool, poolMetaName)
|
data, err := readConfig(ctx, pool, poolMetaName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) {
|
if errors.Is(err, errConfigNotFound) || isErrObjectNotFound(err) {
|
||||||
@ -308,6 +318,11 @@ func (p *poolMeta) load(ctx context.Context, pool *erasureSets, pools []*erasure
|
|||||||
return false, fmt.Errorf("unexpected pool meta version: %d", p.Version)
|
return false, fmt.Errorf("unexpected pool meta version: %d", p.Version)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !validate {
|
||||||
|
// No further validation requested.
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
type poolInfo struct {
|
type poolInfo struct {
|
||||||
position int
|
position int
|
||||||
completed bool
|
completed bool
|
||||||
@ -423,7 +438,7 @@ const (
|
|||||||
func (z *erasureServerPools) Init(ctx context.Context) error {
|
func (z *erasureServerPools) Init(ctx context.Context) error {
|
||||||
meta := poolMeta{}
|
meta := poolMeta{}
|
||||||
|
|
||||||
update, err := meta.load(ctx, z.serverPools[0], z.serverPools)
|
update, err := meta.load(ctx, z.serverPools[0], z.serverPools, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -471,33 +486,32 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
|
|||||||
defer gr.Close()
|
defer gr.Close()
|
||||||
objInfo := gr.ObjInfo
|
objInfo := gr.ObjInfo
|
||||||
if objInfo.isMultipart() {
|
if objInfo.isMultipart() {
|
||||||
uploadID, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name,
|
uploadID, err := z.NewMultipartUpload(ctx, bucket, objInfo.Name, ObjectOptions{
|
||||||
ObjectOptions{
|
VersionID: objInfo.VersionID,
|
||||||
VersionID: objInfo.VersionID,
|
MTime: objInfo.ModTime,
|
||||||
MTime: objInfo.ModTime,
|
UserDefined: objInfo.UserDefined,
|
||||||
UserDefined: objInfo.UserDefined,
|
})
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, uploadID, ObjectOptions{})
|
defer z.AbortMultipartUpload(ctx, bucket, objInfo.Name, uploadID, ObjectOptions{})
|
||||||
parts := make([]CompletePart, 0, len(objInfo.Parts))
|
parts := make([]CompletePart, len(objInfo.Parts))
|
||||||
for _, part := range objInfo.Parts {
|
for i, part := range objInfo.Parts {
|
||||||
hr, err := hash.NewReader(gr, part.Size, "", "", part.Size)
|
hr, err := hash.NewReader(gr, part.Size, "", "", part.Size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = z.PutObjectPart(ctx, bucket, objInfo.Name, uploadID,
|
pi, err := z.PutObjectPart(ctx, bucket, objInfo.Name, uploadID,
|
||||||
part.Number,
|
part.Number,
|
||||||
NewPutObjReader(hr),
|
NewPutObjReader(hr),
|
||||||
ObjectOptions{})
|
ObjectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
parts = append(parts, CompletePart{
|
parts[i] = CompletePart{
|
||||||
PartNumber: part.Number,
|
ETag: pi.ETag,
|
||||||
ETag: part.ETag,
|
PartNumber: pi.PartNumber,
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
_, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, uploadID, parts, ObjectOptions{
|
_, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, uploadID, parts, ObjectOptions{
|
||||||
MTime: objInfo.ModTime,
|
MTime: objInfo.ModTime,
|
||||||
@ -534,10 +548,8 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
|||||||
var forwardTo string
|
var forwardTo string
|
||||||
// If we resume to the same bucket, forward to last known item.
|
// If we resume to the same bucket, forward to last known item.
|
||||||
rbucket, robject := z.poolMeta.ResumeBucketObject(idx)
|
rbucket, robject := z.poolMeta.ResumeBucketObject(idx)
|
||||||
if rbucket != "" {
|
if rbucket != "" && rbucket == bName {
|
||||||
if rbucket == bName {
|
forwardTo = robject
|
||||||
forwardTo = robject
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
versioned := globalBucketVersioningSys.Enabled(bName)
|
versioned := globalBucketVersioningSys.Enabled(bName)
|
||||||
@ -676,12 +688,19 @@ func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx i
|
|||||||
pool := z.serverPools[idx]
|
pool := z.serverPools[idx]
|
||||||
for _, bucket := range z.poolMeta.PendingBuckets(idx) {
|
for _, bucket := range z.poolMeta.PendingBuckets(idx) {
|
||||||
if z.poolMeta.isBucketDecommissioned(idx, bucket) {
|
if z.poolMeta.isBucketDecommissioned(idx, bucket) {
|
||||||
|
if serverDebugLog {
|
||||||
|
console.Debugln("decommission: already done, moving on", bucket)
|
||||||
|
}
|
||||||
|
|
||||||
z.poolMetaMutex.Lock()
|
z.poolMetaMutex.Lock()
|
||||||
z.poolMeta.BucketDone(idx, bucket) // remove from pendingBuckets and persist.
|
z.poolMeta.BucketDone(idx, bucket) // remove from pendingBuckets and persist.
|
||||||
z.poolMeta.save(ctx, z.serverPools)
|
z.poolMeta.save(ctx, z.serverPools)
|
||||||
z.poolMetaMutex.Unlock()
|
z.poolMetaMutex.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if serverDebugLog {
|
||||||
|
console.Debugln("decommission: currently on bucket", bucket)
|
||||||
|
}
|
||||||
if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil {
|
if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -756,14 +775,16 @@ func (z *erasureServerPools) Status(ctx context.Context, idx int) (PoolStatus, e
|
|||||||
info.Backend.RRSCParity = rrSCParity
|
info.Backend.RRSCParity = rrSCParity
|
||||||
|
|
||||||
currentSize := TotalUsableCapacityFree(info)
|
currentSize := TotalUsableCapacityFree(info)
|
||||||
|
totalSize := TotalUsableCapacity(info)
|
||||||
|
|
||||||
poolInfo := z.poolMeta.Pools[idx]
|
poolInfo := z.poolMeta.Pools[idx]
|
||||||
if poolInfo.Decommission != nil {
|
if poolInfo.Decommission != nil {
|
||||||
|
poolInfo.Decommission.TotalSize = totalSize
|
||||||
poolInfo.Decommission.CurrentSize = currentSize
|
poolInfo.Decommission.CurrentSize = currentSize
|
||||||
} else {
|
} else {
|
||||||
poolInfo.Decommission = &PoolDecommissionInfo{
|
poolInfo.Decommission = &PoolDecommissionInfo{
|
||||||
CurrentSize: currentSize,
|
CurrentSize: currentSize,
|
||||||
TotalSize: TotalUsableCapacity(info),
|
TotalSize: totalSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return poolInfo, nil
|
return poolInfo, nil
|
||||||
@ -772,7 +793,7 @@ func (z *erasureServerPools) Status(ctx context.Context, idx int) (PoolStatus, e
|
|||||||
func (z *erasureServerPools) ReloadPoolMeta(ctx context.Context) (err error) {
|
func (z *erasureServerPools) ReloadPoolMeta(ctx context.Context) (err error) {
|
||||||
meta := poolMeta{}
|
meta := poolMeta{}
|
||||||
|
|
||||||
if _, err = meta.load(ctx, z.serverPools[0], z.serverPools); err != nil {
|
if _, err = meta.load(ctx, z.serverPools[0], z.serverPools, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -875,7 +896,8 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
|
|||||||
// sure to decommission the necessary metadata.
|
// sure to decommission the necessary metadata.
|
||||||
buckets = append(buckets, BucketInfo{
|
buckets = append(buckets, BucketInfo{
|
||||||
Name: pathJoin(minioMetaBucket, minioConfigPrefix),
|
Name: pathJoin(minioMetaBucket, minioConfigPrefix),
|
||||||
}, BucketInfo{
|
})
|
||||||
|
buckets = append(buckets, BucketInfo{
|
||||||
Name: pathJoin(minioMetaBucket, bucketMetaPrefix),
|
Name: pathJoin(minioMetaBucket, bucketMetaPrefix),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -896,8 +896,17 @@ func makeFormatErasureMetaVolumes(disk StorageAPI) error {
|
|||||||
if disk == nil {
|
if disk == nil {
|
||||||
return errDiskNotFound
|
return errDiskNotFound
|
||||||
}
|
}
|
||||||
|
volumes := []string{
|
||||||
|
minioMetaBucket,
|
||||||
|
minioMetaTmpBucket,
|
||||||
|
minioMetaMultipartBucket,
|
||||||
|
minioMetaTmpDeletedBucket,
|
||||||
|
dataUsageBucket,
|
||||||
|
pathJoin(minioMetaBucket, minioConfigPrefix),
|
||||||
|
minioMetaTmpBucket + "-old",
|
||||||
|
}
|
||||||
// Attempt to create MinIO internal buckets.
|
// Attempt to create MinIO internal buckets.
|
||||||
return disk.MakeVolBulk(context.TODO(), minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, minioMetaTmpDeletedBucket, dataUsageBucket, minioMetaTmpBucket+"-old")
|
return disk.MakeVolBulk(context.TODO(), volumes...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize a new set of set formats which will be written to all disks.
|
// Initialize a new set of set formats which will be written to all disks.
|
||||||
|
@ -209,6 +209,9 @@ func (s *xlStorage) Sanitize() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create any missing paths.
|
||||||
|
makeFormatErasureMetaVolumes(s)
|
||||||
|
|
||||||
return formatErasureCleanupTmp(s.diskPath)
|
return formatErasureCleanupTmp(s.diskPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user