fix: support decommissioning directory objects (#14822)

improvements in this PR include:

- decommission objects that carry the `__XLDIR__` suffix
  (directory objects).
- decommission objects that have a `null` version on
  a versioned bucket.
- look for any "decom" failures so that we do not wrongly
  conclude the decommission as complete before all files
  have been copied over.
- break out eagerly upon the first error for objects with
  multiple versions, leaving the object as-is to support
  debugging and analysis (a sketch of this pattern follows
  below).

Harshavardhana 2022-04-26 20:06:41 -07:00 committed by GitHub
parent df50eda811
commit c56a139fdc
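
The third and fourth bullets boil down to a simple accounting pattern: copy versions one at a time, stop at the first failure, and only treat the object (and, later, the whole pool) as decommissioned when nothing failed. Below is a minimal, self-contained sketch of that pattern; the `Version` type and the `copyVersion` callback are illustrative stand-ins, not MinIO's actual internals.

```go
package main

import (
	"errors"
	"fmt"
)

// Version is a stand-in for a single object version to be migrated.
type Version struct {
	Name string
}

// decommissionVersions copies versions one by one and stops at the first
// failure, leaving the partially migrated object in place for analysis.
// It reports how many versions were migrated and whether anything failed.
func decommissionVersions(versions []Version, copyVersion func(Version) error) (decommissioned int, failed bool) {
	for _, v := range versions {
		if err := copyVersion(v); err != nil {
			failed = true
			break // break out eagerly on the first error
		}
		decommissioned++
	}
	return decommissioned, failed
}

func main() {
	versions := []Version{{Name: "v1"}, {Name: "v2"}, {Name: "v3"}}

	done, failed := decommissionVersions(versions, func(v Version) error {
		if v.Name == "v2" {
			return errors.New("simulated copy failure")
		}
		return nil
	})

	// The source object may only be deleted, and the decommission marked
	// complete, when every version was copied and nothing failed.
	if done == len(versions) && !failed {
		fmt.Println("all versions decommissioned; safe to delete the source object")
	} else {
		fmt.Println("decommission incomplete; leaving the source object for debugging")
	}
}
```

In the diff below this shows up as the `failure` flag and `decommissionedCount` counter inside the version loop, and as the final `ItemsDecommissionFailed > 0` check that decides between `DecommissionFailed` and `CompleteDecommission`.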

@@ -605,6 +605,7 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 				<-parallelWorkers
 				wg.Done()
 			}()
 			if entry.isDir() {
 				return
 			}
@@ -618,15 +619,18 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 			// to create the appropriate stack.
 			versionsSorter(fivs.Versions).reverse()
+			var decommissionedCount int
 			for _, version := range fivs.Versions {
 				// TODO: Skip transitioned objects for now.
 				if version.IsRemote() {
+					logger.LogIf(ctx, fmt.Errorf("found %s/%s transitioned object, transitioned object won't be decommissioned", bName, version.Name))
 					continue
 				}
 				// We will skip decommissioning delete markers
 				// with single version, its as good as there
 				// is no data associated with the object.
 				if version.Deleted && len(fivs.Versions) == 1 {
+					logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no content left", bName, version.Name))
 					continue
 				}
 				if version.Deleted {
@@ -639,27 +643,24 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 							MTime: version.ModTime,
 							DeleteReplication: version.ReplicationState,
 						})
+					var failure bool
 					if err != nil {
 						logger.LogIf(ctx, err)
-						z.poolMetaMutex.Lock()
-						z.poolMeta.CountItem(idx, 0, true)
-						z.poolMetaMutex.Unlock()
-					} else {
-						set.DeleteObject(ctx,
-							bName,
-							version.Name,
-							ObjectOptions{
-								VersionID: version.VersionID,
-							})
-						z.poolMetaMutex.Lock()
-						z.poolMeta.CountItem(idx, 0, false)
-						z.poolMetaMutex.Unlock()
+						failure = true
 					}
+					z.poolMetaMutex.Lock()
+					z.poolMeta.CountItem(idx, 0, failure)
+					z.poolMetaMutex.Unlock()
+					if failure {
+						break // break out on first error
+					}
+					decommissionedCount++
 					continue
 				}
 				gr, err := set.GetObjectNInfo(ctx,
 					bName,
-					version.Name,
+					encodeDirObject(version.Name),
 					nil,
 					http.Header{},
 					noLock, // all mutations are blocked reads are safe without locks.
@@ -671,25 +672,32 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
 					z.poolMetaMutex.Lock()
 					z.poolMeta.CountItem(idx, version.Size, true)
 					z.poolMetaMutex.Unlock()
-					continue
+					break // break out on first error
 				}
+				var failure bool
 				// gr.Close() is ensured by decommissionObject().
 				if err = z.decommissionObject(ctx, bName, gr); err != nil {
 					logger.LogIf(ctx, err)
-					z.poolMetaMutex.Lock()
-					z.poolMeta.CountItem(idx, version.Size, true)
-					z.poolMetaMutex.Unlock()
-					continue
+					failure = true
 				}
+				z.poolMetaMutex.Lock()
+				z.poolMeta.CountItem(idx, version.Size, failure)
+				z.poolMetaMutex.Unlock()
+				if failure {
+					break // break out on first error
+				}
+				decommissionedCount++
+			}
+			// if all versions were decommissioned, then we can delete the object versions.
+			if decommissionedCount == len(fivs.Versions) {
 				set.DeleteObject(ctx,
 					bName,
-					version.Name,
+					entry.name,
 					ObjectOptions{
-						VersionID: version.VersionID,
-					})
-				z.poolMetaMutex.Lock()
-				z.poolMeta.CountItem(idx, version.Size, false)
-				z.poolMetaMutex.Unlock()
+						DeletePrefix: true, // use prefix delete to delete all versions at once.
+					},
+				)
 			}
 			z.poolMetaMutex.Lock()
 			z.poolMeta.TrackCurrentBucketObject(idx, bName, entry.name)
@@ -779,8 +787,18 @@ func (z *erasureServerPools) doDecommissionInRoutine(ctx context.Context, idx in
 		logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
 		return
 	}
-	// Complete the decommission..
-	logger.LogIf(GlobalContext, z.CompleteDecommission(dctx, idx))
+	z.poolMetaMutex.Lock()
+	failed := z.poolMeta.Pools[idx].Decommission.ItemsDecommissionFailed > 0
+	z.poolMetaMutex.Unlock()
+	if failed {
+		// Decommission failed indicate as such.
+		logger.LogIf(GlobalContext, z.DecommissionFailed(dctx, idx))
+	} else {
+		// Complete the decommission..
+		logger.LogIf(GlobalContext, z.CompleteDecommission(dctx, idx))
+	}
 }
 func (z *erasureServerPools) IsSuspended(idx int) bool {
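
A note on the directory-object handling above: keys ending in "/" (zero-byte "directory" objects) are stored under a `__XLDIR__`-suffixed name on disk, which is presumably why the decommission read path now goes through `encodeDirObject(version.Name)` before calling into the erasure set directly. The sketch below illustrates the general idea of such an encoding; it is an assumption-laden stand-in, not a copy of MinIO's helpers, so exact details may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// xlDirSuffix marks directory objects in on-disk storage; assumed here to
// mirror the "__XLDIR__" convention mentioned in the commit message.
const xlDirSuffix = "__XLDIR__"

// encodeDirObject maps a user-visible directory object key ("photos/") to
// an on-disk name carrying the directory suffix; other keys pass through.
func encodeDirObject(object string) string {
	if strings.HasSuffix(object, "/") {
		return strings.TrimSuffix(object, "/") + xlDirSuffix
	}
	return object
}

// decodeDirObject reverses the mapping so listings show the original key.
func decodeDirObject(object string) string {
	if strings.HasSuffix(object, xlDirSuffix) {
		return strings.TrimSuffix(object, xlDirSuffix) + "/"
	}
	return object
}

func main() {
	fmt.Println(encodeDirObject("photos/"))         // photos__XLDIR__
	fmt.Println(decodeDirObject("photos__XLDIR__")) // photos/
	fmt.Println(encodeDirObject("photos/cat.png"))  // unchanged
}
```

Without the encoding step, a read by the plain name would likely miss the on-disk `__XLDIR__` entry, which is consistent with the commit's claim that directory objects were previously not decommissioned.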