decom IAM, Bucket metadata properly (#15220)

Current code incorrectly passed the
config asset object name while decommissioning,
make sure that we pass the right object name
to be hashed on the newer set of pools.

This PR fixes situations after a successful
decommission, the users and policies might go
missing due to wrong hashed set.
This commit is contained in:
Harshavardhana
2022-07-04 14:02:54 -07:00
committed by GitHub
parent ce667ddae0
commit b311abed31
6 changed files with 231 additions and 45 deletions

View File

@@ -53,6 +53,9 @@ type PoolDecommissionInfo struct {
// Last bucket/object decommissioned.
Bucket string `json:"-" msg:"bkt"`
// Captures prefix that is currently being
// decommissioned inside the 'Bucket'
Prefix string `json:"-" msg:"pfx"`
Object string `json:"-" msg:"obj"`
// Verbose information
@@ -73,6 +76,7 @@ func (pd *PoolDecommissionInfo) bucketPop(bucket string) {
// Clear tracker info.
if pd.Bucket == bucket {
pd.Bucket = "" // empty this out for next bucket
pd.Prefix = "" // empty this out for the next bucket
pd.Object = "" // empty this out for next object
}
return
@@ -80,12 +84,6 @@ func (pd *PoolDecommissionInfo) bucketPop(bucket string) {
}
}
func (pd *PoolDecommissionInfo) bucketsToDecommission() []string {
queuedBuckets := make([]string, len(pd.QueuedBuckets))
copy(queuedBuckets, pd.QueuedBuckets)
return queuedBuckets
}
func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool {
for _, b := range pd.DecommissionedBuckets {
if b == bucket {
@@ -95,17 +93,18 @@ func (pd *PoolDecommissionInfo) isBucketDecommissioned(bucket string) bool {
return false
}
func (pd *PoolDecommissionInfo) bucketPush(bucket string) {
func (pd *PoolDecommissionInfo) bucketPush(bucket decomBucketInfo) {
for _, b := range pd.QueuedBuckets {
if pd.isBucketDecommissioned(b) {
return
}
if b == bucket {
if b == bucket.String() {
return
}
}
pd.QueuedBuckets = append(pd.QueuedBuckets, bucket)
pd.Bucket = bucket
pd.QueuedBuckets = append(pd.QueuedBuckets, bucket.String())
pd.Bucket = bucket.Name
pd.Prefix = bucket.Prefix
}
// PoolStatus captures current pool status
@@ -183,12 +182,12 @@ func (p poolMeta) isBucketDecommissioned(idx int, bucket string) bool {
return p.Pools[idx].Decommission.isBucketDecommissioned(bucket)
}
func (p *poolMeta) BucketDone(idx int, bucket string) {
func (p *poolMeta) BucketDone(idx int, bucket decomBucketInfo) {
if p.Pools[idx].Decommission == nil {
// Decommission not in progress.
return
}
p.Pools[idx].Decommission.bucketPop(bucket)
p.Pools[idx].Decommission.bucketPop(bucket.String())
}
func (p poolMeta) ResumeBucketObject(idx int) (bucket, object string) {
@@ -208,19 +207,38 @@ func (p *poolMeta) TrackCurrentBucketObject(idx int, bucket string, object strin
p.Pools[idx].Decommission.Object = object
}
func (p *poolMeta) PendingBuckets(idx int) []string {
func (p *poolMeta) PendingBuckets(idx int) []decomBucketInfo {
if p.Pools[idx].Decommission == nil {
// Decommission not in progress.
return nil
}
return p.Pools[idx].Decommission.bucketsToDecommission()
decomBuckets := make([]decomBucketInfo, len(p.Pools[idx].Decommission.QueuedBuckets))
for i := range decomBuckets {
bucket, prefix := path2BucketObject(p.Pools[idx].Decommission.QueuedBuckets[i])
decomBuckets[i] = decomBucketInfo{
Name: bucket,
Prefix: prefix,
}
}
return decomBuckets
}
func (p *poolMeta) QueueBuckets(idx int, buckets []BucketInfo) {
//msgp:ignore decomBucketInfo
type decomBucketInfo struct {
Name string
Prefix string
}
func (db decomBucketInfo) String() string {
return pathJoin(db.Name, db.Prefix)
}
func (p *poolMeta) QueueBuckets(idx int, buckets []decomBucketInfo) {
// add new queued buckets
for _, bucket := range buckets {
p.Pools[idx].Decommission.bucketPush(bucket.Name)
p.Pools[idx].Decommission.bucketPush(bucket)
}
}
@@ -607,7 +625,7 @@ func (v versionsSorter) reverse() {
})
}
func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bName string) error {
func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool *erasureSets, bi decomBucketInfo) error {
ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{})
var wg sync.WaitGroup
@@ -628,7 +646,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
continue
}
vc, _ := globalBucketVersioningSys.Get(bName)
vc, _ := globalBucketVersioningSys.Get(bi.Name)
decommissionEntry := func(entry metaCacheEntry) {
defer func() {
<-parallelWorkers
@@ -639,7 +657,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
return
}
fivs, err := entry.fileInfoVersions(bName)
fivs, err := entry.fileInfoVersions(bi.Name)
if err != nil {
return
}
@@ -652,19 +670,19 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
for _, version := range fivs.Versions {
// TODO: Skip transitioned objects for now.
if version.IsRemote() {
logger.LogIf(ctx, fmt.Errorf("found %s/%s transitioned object, transitioned object won't be decommissioned", bName, version.Name))
logger.LogIf(ctx, fmt.Errorf("found %s/%s transitioned object, transitioned object won't be decommissioned", bi.Name, version.Name))
continue
}
// We will skip decommissioning delete markers
// with single version, its as good as there
// is no data associated with the object.
if version.Deleted && len(fivs.Versions) == 1 {
logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no content left", bName, version.Name))
logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no content left", bi.Name, version.Name))
continue
}
if version.Deleted {
_, err := z.DeleteObject(ctx,
bName,
bi.Name,
version.Name,
ObjectOptions{
Versioned: vc.PrefixEnabled(version.Name),
@@ -691,7 +709,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
// gr.Close() is ensured by decommissionObject().
for try := 0; try < 3; try++ {
gr, err := set.GetObjectNInfo(ctx,
bName,
bi.Name,
encodeDirObject(version.Name),
nil,
http.Header{},
@@ -708,7 +726,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
logger.LogIf(ctx, err)
continue
}
if err = z.decommissionObject(ctx, bName, gr); err != nil {
if err = z.decommissionObject(ctx, bi.Name, gr); err != nil {
failure = true
logger.LogIf(ctx, err)
continue
@@ -728,19 +746,19 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
// if all versions were decommissioned, then we can delete the object versions.
if decommissionedCount == len(fivs.Versions) {
_, err := set.DeleteObject(ctx,
bName,
bi.Name,
encodeDirObject(entry.name),
ObjectOptions{
DeletePrefix: true, // use prefix delete to delete all versions at once.
},
)
auditLogDecom(ctx, "DecomDeleteObject", bName, entry.name, "", err)
auditLogDecom(ctx, "DecomDeleteObject", bi.Name, entry.name, "", err)
if err != nil {
logger.LogIf(ctx, err)
}
}
z.poolMetaMutex.Lock()
z.poolMeta.TrackCurrentBucketObject(idx, bName, entry.name)
z.poolMeta.TrackCurrentBucketObject(idx, bi.Name, entry.name)
ok, err := z.poolMeta.updateAfter(ctx, idx, z.serverPools, 30*time.Second)
logger.LogIf(ctx, err)
if ok {
@@ -753,7 +771,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
resolver := metadataResolutionParams{
dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios
objQuorum: len(disks) / 2, // make sure to capture all quorum ratios
bucket: bName,
bucket: bi.Name,
}
wg.Add(1)
@@ -761,7 +779,8 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
defer wg.Done()
err := listPathRaw(ctx, listPathRawOptions{
disks: disks,
bucket: bName,
bucket: bi.Name,
path: bi.Prefix,
recursive: true,
forwardTo: "",
minDisks: len(disks) / 2, // to capture all quorum ratios
@@ -791,7 +810,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx int) error {
pool := z.serverPools[idx]
for _, bucket := range z.poolMeta.PendingBuckets(idx) {
if z.poolMeta.isBucketDecommissioned(idx, bucket) {
if z.poolMeta.isBucketDecommissioned(idx, bucket.String()) {
if serverDebugLog {
console.Debugln("decommission: already done, moving on", bucket)
}
@@ -803,7 +822,7 @@ func (z *erasureServerPools) decommissionInBackground(ctx context.Context, idx i
continue
}
if serverDebugLog {
console.Debugln("decommission: currently on bucket", bucket)
console.Debugln("decommission: currently on bucket", bucket.Name)
}
if err := z.decommissionPool(ctx, idx, pool, bucket); err != nil {
return err
@@ -1031,8 +1050,15 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
return err
}
decomBuckets := make([]decomBucketInfo, len(buckets))
for i := range buckets {
decomBuckets[i] = decomBucketInfo{
Name: buckets[i].Name,
}
}
// TODO: Support decommissioning transition tiers.
for _, bucket := range buckets {
for _, bucket := range decomBuckets {
if lc, err := globalLifecycleSys.Get(bucket.Name); err == nil {
if lc.HasTransition() {
return decomError{
@@ -1059,11 +1085,13 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
// Buckets data are dispersed in multiple zones/sets, make
// sure to decommission the necessary metadata.
buckets = append(buckets, BucketInfo{
Name: pathJoin(minioMetaBucket, minioConfigPrefix),
decomBuckets = append(decomBuckets, decomBucketInfo{
Name: minioMetaBucket,
Prefix: minioConfigPrefix,
})
buckets = append(buckets, BucketInfo{
Name: pathJoin(minioMetaBucket, bucketMetaPrefix),
decomBuckets = append(decomBuckets, decomBucketInfo{
Name: minioMetaBucket,
Prefix: bucketMetaPrefix,
})
var pool *erasureSets
@@ -1089,7 +1117,7 @@ func (z *erasureServerPools) StartDecommission(ctx context.Context, idx int) (er
if err = z.poolMeta.Decommission(idx, pi); err != nil {
return err
}
z.poolMeta.QueueBuckets(idx, buckets)
z.poolMeta.QueueBuckets(idx, decomBuckets)
if err = z.poolMeta.save(ctx, z.serverPools); err != nil {
return err
}

View File

@@ -110,6 +110,12 @@ func (z *PoolDecommissionInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "Bucket")
return
}
case "pfx":
z.Prefix, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Prefix")
return
}
case "obj":
z.Object, err = dc.ReadString()
if err != nil {
@@ -153,9 +159,9 @@ func (z *PoolDecommissionInfo) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 15
// map header, size 16
// write "st"
err = en.Append(0x8f, 0xa2, 0x73, 0x74)
err = en.Append(0xde, 0x0, 0x10, 0xa2, 0x73, 0x74)
if err != nil {
return
}
@@ -268,6 +274,16 @@ func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Bucket")
return
}
// write "pfx"
err = en.Append(0xa3, 0x70, 0x66, 0x78)
if err != nil {
return
}
err = en.WriteString(z.Prefix)
if err != nil {
err = msgp.WrapError(err, "Prefix")
return
}
// write "obj"
err = en.Append(0xa3, 0x6f, 0x62, 0x6a)
if err != nil {
@@ -324,9 +340,9 @@ func (z *PoolDecommissionInfo) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z *PoolDecommissionInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 15
// map header, size 16
// string "st"
o = append(o, 0x8f, 0xa2, 0x73, 0x74)
o = append(o, 0xde, 0x0, 0x10, 0xa2, 0x73, 0x74)
o = msgp.AppendTime(o, z.StartTime)
// string "ss"
o = append(o, 0xa2, 0x73, 0x73)
@@ -361,6 +377,9 @@ func (z *PoolDecommissionInfo) MarshalMsg(b []byte) (o []byte, err error) {
// string "bkt"
o = append(o, 0xa3, 0x62, 0x6b, 0x74)
o = msgp.AppendString(o, z.Bucket)
// string "pfx"
o = append(o, 0xa3, 0x70, 0x66, 0x78)
o = msgp.AppendString(o, z.Prefix)
// string "obj"
o = append(o, 0xa3, 0x6f, 0x62, 0x6a)
o = msgp.AppendString(o, z.Object)
@@ -483,6 +502,12 @@ func (z *PoolDecommissionInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "Bucket")
return
}
case "pfx":
z.Prefix, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Prefix")
return
}
case "obj":
z.Object, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
@@ -527,7 +552,7 @@ func (z *PoolDecommissionInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *PoolDecommissionInfo) Msgsize() (s int) {
s = 1 + 3 + msgp.TimeSize + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.BoolSize + 3 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.ArrayHeaderSize
s = 3 + 3 + msgp.TimeSize + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.BoolSize + 3 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.ArrayHeaderSize
for za0001 := range z.QueuedBuckets {
s += msgp.StringPrefixSize + len(z.QueuedBuckets[za0001])
}
@@ -535,7 +560,7 @@ func (z *PoolDecommissionInfo) Msgsize() (s int) {
for za0002 := range z.DecommissionedBuckets {
s += msgp.StringPrefixSize + len(z.DecommissionedBuckets[za0002])
}
s += 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size
s += 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Prefix) + 4 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size
return
}