fix: batch expiry job doesn't report delete marker in batch-status (#21183)

This commit is contained in:
jiuker 2025-04-22 19:16:32 +08:00 committed by GitHub
parent 0379d6a37f
commit 864f80e226
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 143 additions and 98 deletions

View File

@ -424,12 +424,12 @@ func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo
go func(toExpire []expireObjInfo) {
defer wk.Give()
toExpireAll := make([]ObjectInfo, 0, len(toExpire))
toExpireAll := make([]expireObjInfo, 0, len(toExpire))
toDel := make([]ObjectToDelete, 0, len(toExpire))
oiCache := newObjInfoCache()
for _, exp := range toExpire {
if exp.ExpireAll {
toExpireAll = append(toExpireAll, exp.ObjectInfo)
toExpireAll = append(toExpireAll, exp)
continue
}
// Cache ObjectInfo value via pointers for
@ -527,7 +527,8 @@ func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo
type expireObjInfo struct {
ObjectInfo
ExpireAll bool
ExpireAll bool
DeleteMarkerCount int64
}
// Start the batch expiration job, resumes if there was a pending job via "job.ID"
@ -624,80 +625,115 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
matchedFilter BatchJobExpireFilter
versionsCount int
toDel []expireObjInfo
failed bool
done bool
)
failed := false
for result := range results {
if result.Err != nil {
failed = true
batchLogIf(ctx, result.Err)
continue
deleteMarkerCountMap := map[string]int64{}
pushToExpire := func() {
// set preObject deleteMarkerCount
if len(toDel) > 0 {
lastDelIndex := len(toDel) - 1
lastDel := toDel[lastDelIndex]
if lastDel.ExpireAll {
toDel[lastDelIndex].DeleteMarkerCount = deleteMarkerCountMap[lastDel.Name]
// delete the key
delete(deleteMarkerCountMap, lastDel.Name)
}
}
// Apply filter to find the matching rule to apply expiry
// actions accordingly.
// nolint:gocritic
if result.Item.IsLatest {
// send down filtered entries to be deleted using
// DeleteObjects method
if len(toDel) > 10 { // batch up to 10 objects/versions to be expired simultaneously.
xfer := make([]expireObjInfo, len(toDel))
copy(xfer, toDel)
var done bool
select {
case <-ctx.Done():
done = true
case expireCh <- xfer:
toDel = toDel[:0] // resetting toDel
}
if done {
break
}
// send down filtered entries to be deleted using
// DeleteObjects method
if len(toDel) > 10 { // batch up to 10 objects/versions to be expired simultaneously.
xfer := make([]expireObjInfo, len(toDel))
copy(xfer, toDel)
select {
case expireCh <- xfer:
toDel = toDel[:0] // resetting toDel
case <-ctx.Done():
done = true
}
var match BatchJobExpireFilter
var found bool
for _, rule := range r.Rules {
if rule.Matches(result.Item, now) {
match = rule
found = true
break
}
}
if !found {
continue
}
prevObj = result.Item
matchedFilter = match
versionsCount = 1
// Include the latest version
if matchedFilter.Purge.RetainVersions == 0 {
toDel = append(toDel, expireObjInfo{
ObjectInfo: result.Item,
ExpireAll: true,
})
continue
}
} else if prevObj.Name == result.Item.Name {
if matchedFilter.Purge.RetainVersions == 0 {
continue // including latest version in toDel suffices, skipping other versions
}
versionsCount++
} else {
continue
}
if versionsCount <= matchedFilter.Purge.RetainVersions {
continue // retain versions
}
toDel = append(toDel, expireObjInfo{
ObjectInfo: result.Item,
})
}
for {
select {
case result, ok := <-results:
if !ok {
done = true
break
}
if result.Err != nil {
failed = true
batchLogIf(ctx, result.Err)
continue
}
if result.Item.DeleteMarker {
deleteMarkerCountMap[result.Item.Name]++
}
// Apply filter to find the matching rule to apply expiry
// actions accordingly.
// nolint:gocritic
if result.Item.IsLatest {
var match BatchJobExpireFilter
var found bool
for _, rule := range r.Rules {
if rule.Matches(result.Item, now) {
match = rule
found = true
break
}
}
if !found {
continue
}
if prevObj.Name != result.Item.Name {
// switch the object
pushToExpire()
}
prevObj = result.Item
matchedFilter = match
versionsCount = 1
// Include the latest version
if matchedFilter.Purge.RetainVersions == 0 {
toDel = append(toDel, expireObjInfo{
ObjectInfo: result.Item,
ExpireAll: true,
})
continue
}
} else if prevObj.Name == result.Item.Name {
if matchedFilter.Purge.RetainVersions == 0 {
continue // including latest version in toDel suffices, skipping other versions
}
versionsCount++
} else {
// switch the object
pushToExpire()
// a file switched with no LatestVersion, logging it
batchLogIf(ctx, fmt.Errorf("skipping object %s, no latest version found", result.Item.Name))
continue
}
if versionsCount <= matchedFilter.Purge.RetainVersions {
continue // retain versions
}
toDel = append(toDel, expireObjInfo{
ObjectInfo: result.Item,
})
pushToExpire()
case <-ctx.Done():
done = true
}
if done {
break
}
}
if context.Cause(ctx) != nil {
xioutil.SafeClose(expireCh)
return context.Cause(ctx)
}
pushToExpire()
// Send any remaining objects downstream
if len(toDel) > 0 {
select {

View File

@ -881,21 +881,23 @@ func (ri *batchJobInfo) clone() *batchJobInfo {
defer ri.mu.RUnlock()
return &batchJobInfo{
Version: ri.Version,
JobID: ri.JobID,
JobType: ri.JobType,
RetryAttempts: ri.RetryAttempts,
Complete: ri.Complete,
Failed: ri.Failed,
StartTime: ri.StartTime,
LastUpdate: ri.LastUpdate,
Bucket: ri.Bucket,
Object: ri.Object,
Objects: ri.Objects,
ObjectsFailed: ri.ObjectsFailed,
BytesTransferred: ri.BytesTransferred,
BytesFailed: ri.BytesFailed,
Attempts: ri.Attempts,
Version: ri.Version,
JobID: ri.JobID,
JobType: ri.JobType,
RetryAttempts: ri.RetryAttempts,
Complete: ri.Complete,
Failed: ri.Failed,
StartTime: ri.StartTime,
LastUpdate: ri.LastUpdate,
Bucket: ri.Bucket,
Object: ri.Object,
Objects: ri.Objects,
ObjectsFailed: ri.ObjectsFailed,
DeleteMarkers: ri.DeleteMarkers,
DeleteMarkersFailed: ri.DeleteMarkersFailed,
BytesTransferred: ri.BytesTransferred,
BytesFailed: ri.BytesFailed,
Attempts: ri.Attempts,
}
}
@ -994,11 +996,13 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati
// Note: to be used only with batch jobs that affect multiple versions through
// a single action. e.g batch-expire has an option to expire all versions of an
// object which matches the given filters.
func (ri *batchJobInfo) trackMultipleObjectVersions(info ObjectInfo, success bool) {
func (ri *batchJobInfo) trackMultipleObjectVersions(info expireObjInfo, success bool) {
if success {
ri.Objects += int64(info.NumVersions)
ri.Objects += int64(info.NumVersions) - info.DeleteMarkerCount
ri.DeleteMarkers += info.DeleteMarkerCount
} else {
ri.ObjectsFailed += int64(info.NumVersions)
ri.ObjectsFailed += int64(info.NumVersions) - info.DeleteMarkerCount
ri.DeleteMarkersFailed += info.DeleteMarkerCount
}
}
@ -2134,12 +2138,14 @@ func (ri *batchJobInfo) metric() madmin.JobMetric {
switch ri.JobType {
case string(madmin.BatchJobReplicate):
m.Replicate = &madmin.ReplicateInfo{
Bucket: ri.Bucket,
Object: ri.Object,
Objects: ri.Objects,
ObjectsFailed: ri.ObjectsFailed,
BytesTransferred: ri.BytesTransferred,
BytesFailed: ri.BytesFailed,
Bucket: ri.Bucket,
Object: ri.Object,
Objects: ri.Objects,
DeleteMarkers: ri.DeleteMarkers,
ObjectsFailed: ri.ObjectsFailed,
DeleteMarkersFailed: ri.DeleteMarkersFailed,
BytesTransferred: ri.BytesTransferred,
BytesFailed: ri.BytesFailed,
}
case string(madmin.BatchJobKeyRotate):
m.KeyRotate = &madmin.KeyRotationInfo{
@ -2150,10 +2156,12 @@ func (ri *batchJobInfo) metric() madmin.JobMetric {
}
case string(madmin.BatchJobExpire):
m.Expired = &madmin.ExpirationInfo{
Bucket: ri.Bucket,
Object: ri.Object,
Objects: ri.Objects,
ObjectsFailed: ri.ObjectsFailed,
Bucket: ri.Bucket,
Object: ri.Object,
Objects: ri.Objects,
DeleteMarkers: ri.DeleteMarkers,
ObjectsFailed: ri.ObjectsFailed,
DeleteMarkersFailed: ri.DeleteMarkersFailed,
}
}

View File

@ -1,7 +1,8 @@
module github.com/minio/minio/docs/debugging/s3-verify
go 1.23
toolchain go1.24.1
go 1.23.0
toolchain go1.24.2
require github.com/minio/minio-go/v7 v7.0.83