diff --git a/cmd/batch-expire.go b/cmd/batch-expire.go index ced17bcfd..fab592664 100644 --- a/cmd/batch-expire.go +++ b/cmd/batch-expire.go @@ -424,12 +424,12 @@ func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo go func(toExpire []expireObjInfo) { defer wk.Give() - toExpireAll := make([]ObjectInfo, 0, len(toExpire)) + toExpireAll := make([]expireObjInfo, 0, len(toExpire)) toDel := make([]ObjectToDelete, 0, len(toExpire)) oiCache := newObjInfoCache() for _, exp := range toExpire { if exp.ExpireAll { - toExpireAll = append(toExpireAll, exp.ObjectInfo) + toExpireAll = append(toExpireAll, exp) continue } // Cache ObjectInfo value via pointers for @@ -527,7 +527,8 @@ func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo type expireObjInfo struct { ObjectInfo - ExpireAll bool + ExpireAll bool + DeleteMarkerCount int64 } // Start the batch expiration job, resumes if there was a pending job via "job.ID" @@ -624,80 +625,115 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo matchedFilter BatchJobExpireFilter versionsCount int toDel []expireObjInfo + failed bool + done bool ) - failed := false - for result := range results { - if result.Err != nil { - failed = true - batchLogIf(ctx, result.Err) - continue + deleteMarkerCountMap := map[string]int64{} + pushToExpire := func() { + // set preObject deleteMarkerCount + if len(toDel) > 0 { + lastDelIndex := len(toDel) - 1 + lastDel := toDel[lastDelIndex] + if lastDel.ExpireAll { + toDel[lastDelIndex].DeleteMarkerCount = deleteMarkerCountMap[lastDel.Name] + // delete the key + delete(deleteMarkerCountMap, lastDel.Name) + } } - - // Apply filter to find the matching rule to apply expiry - // actions accordingly. - // nolint:gocritic - if result.Item.IsLatest { - // send down filtered entries to be deleted using - // DeleteObjects method - if len(toDel) > 10 { // batch up to 10 objects/versions to be expired simultaneously. - xfer := make([]expireObjInfo, len(toDel)) - copy(xfer, toDel) - - var done bool - select { - case <-ctx.Done(): - done = true - case expireCh <- xfer: - toDel = toDel[:0] // resetting toDel - } - if done { - break - } + // send down filtered entries to be deleted using + // DeleteObjects method + if len(toDel) > 10 { // batch up to 10 objects/versions to be expired simultaneously. + xfer := make([]expireObjInfo, len(toDel)) + copy(xfer, toDel) + select { + case expireCh <- xfer: + toDel = toDel[:0] // resetting toDel + case <-ctx.Done(): + done = true } - var match BatchJobExpireFilter - var found bool - for _, rule := range r.Rules { - if rule.Matches(result.Item, now) { - match = rule - found = true - break - } - } - if !found { - continue - } - - prevObj = result.Item - matchedFilter = match - versionsCount = 1 - // Include the latest version - if matchedFilter.Purge.RetainVersions == 0 { - toDel = append(toDel, expireObjInfo{ - ObjectInfo: result.Item, - ExpireAll: true, - }) - continue - } - } else if prevObj.Name == result.Item.Name { - if matchedFilter.Purge.RetainVersions == 0 { - continue // including latest version in toDel suffices, skipping other versions - } - versionsCount++ - } else { - continue } - - if versionsCount <= matchedFilter.Purge.RetainVersions { - continue // retain versions - } - toDel = append(toDel, expireObjInfo{ - ObjectInfo: result.Item, - }) } + for { + select { + case result, ok := <-results: + if !ok { + done = true + break + } + if result.Err != nil { + failed = true + batchLogIf(ctx, result.Err) + continue + } + if result.Item.DeleteMarker { + deleteMarkerCountMap[result.Item.Name]++ + } + // Apply filter to find the matching rule to apply expiry + // actions accordingly. + // nolint:gocritic + if result.Item.IsLatest { + var match BatchJobExpireFilter + var found bool + for _, rule := range r.Rules { + if rule.Matches(result.Item, now) { + match = rule + found = true + break + } + } + if !found { + continue + } + + if prevObj.Name != result.Item.Name { + // switch the object + pushToExpire() + } + + prevObj = result.Item + matchedFilter = match + versionsCount = 1 + // Include the latest version + if matchedFilter.Purge.RetainVersions == 0 { + toDel = append(toDel, expireObjInfo{ + ObjectInfo: result.Item, + ExpireAll: true, + }) + continue + } + } else if prevObj.Name == result.Item.Name { + if matchedFilter.Purge.RetainVersions == 0 { + continue // including latest version in toDel suffices, skipping other versions + } + versionsCount++ + } else { + // switch the object + pushToExpire() + // a file switched with no LatestVersion, logging it + batchLogIf(ctx, fmt.Errorf("skipping object %s, no latest version found", result.Item.Name)) + continue + } + + if versionsCount <= matchedFilter.Purge.RetainVersions { + continue // retain versions + } + toDel = append(toDel, expireObjInfo{ + ObjectInfo: result.Item, + }) + pushToExpire() + case <-ctx.Done(): + done = true + } + if done { + break + } + } + if context.Cause(ctx) != nil { xioutil.SafeClose(expireCh) return context.Cause(ctx) } + pushToExpire() // Send any remaining objects downstream if len(toDel) > 0 { select { diff --git a/cmd/batch-handlers.go b/cmd/batch-handlers.go index 59c82e50f..096c00377 100644 --- a/cmd/batch-handlers.go +++ b/cmd/batch-handlers.go @@ -881,21 +881,23 @@ func (ri *batchJobInfo) clone() *batchJobInfo { defer ri.mu.RUnlock() return &batchJobInfo{ - Version: ri.Version, - JobID: ri.JobID, - JobType: ri.JobType, - RetryAttempts: ri.RetryAttempts, - Complete: ri.Complete, - Failed: ri.Failed, - StartTime: ri.StartTime, - LastUpdate: ri.LastUpdate, - Bucket: ri.Bucket, - Object: ri.Object, - Objects: ri.Objects, - ObjectsFailed: ri.ObjectsFailed, - BytesTransferred: ri.BytesTransferred, - BytesFailed: ri.BytesFailed, - Attempts: ri.Attempts, + Version: ri.Version, + JobID: ri.JobID, + JobType: ri.JobType, + RetryAttempts: ri.RetryAttempts, + Complete: ri.Complete, + Failed: ri.Failed, + StartTime: ri.StartTime, + LastUpdate: ri.LastUpdate, + Bucket: ri.Bucket, + Object: ri.Object, + Objects: ri.Objects, + ObjectsFailed: ri.ObjectsFailed, + DeleteMarkers: ri.DeleteMarkers, + DeleteMarkersFailed: ri.DeleteMarkersFailed, + BytesTransferred: ri.BytesTransferred, + BytesFailed: ri.BytesFailed, + Attempts: ri.Attempts, } } @@ -994,11 +996,13 @@ func (ri *batchJobInfo) updateAfter(ctx context.Context, api ObjectLayer, durati // Note: to be used only with batch jobs that affect multiple versions through // a single action. e.g batch-expire has an option to expire all versions of an // object which matches the given filters. -func (ri *batchJobInfo) trackMultipleObjectVersions(info ObjectInfo, success bool) { +func (ri *batchJobInfo) trackMultipleObjectVersions(info expireObjInfo, success bool) { if success { - ri.Objects += int64(info.NumVersions) + ri.Objects += int64(info.NumVersions) - info.DeleteMarkerCount + ri.DeleteMarkers += info.DeleteMarkerCount } else { - ri.ObjectsFailed += int64(info.NumVersions) + ri.ObjectsFailed += int64(info.NumVersions) - info.DeleteMarkerCount + ri.DeleteMarkersFailed += info.DeleteMarkerCount } } @@ -2134,12 +2138,14 @@ func (ri *batchJobInfo) metric() madmin.JobMetric { switch ri.JobType { case string(madmin.BatchJobReplicate): m.Replicate = &madmin.ReplicateInfo{ - Bucket: ri.Bucket, - Object: ri.Object, - Objects: ri.Objects, - ObjectsFailed: ri.ObjectsFailed, - BytesTransferred: ri.BytesTransferred, - BytesFailed: ri.BytesFailed, + Bucket: ri.Bucket, + Object: ri.Object, + Objects: ri.Objects, + DeleteMarkers: ri.DeleteMarkers, + ObjectsFailed: ri.ObjectsFailed, + DeleteMarkersFailed: ri.DeleteMarkersFailed, + BytesTransferred: ri.BytesTransferred, + BytesFailed: ri.BytesFailed, } case string(madmin.BatchJobKeyRotate): m.KeyRotate = &madmin.KeyRotationInfo{ @@ -2150,10 +2156,12 @@ func (ri *batchJobInfo) metric() madmin.JobMetric { } case string(madmin.BatchJobExpire): m.Expired = &madmin.ExpirationInfo{ - Bucket: ri.Bucket, - Object: ri.Object, - Objects: ri.Objects, - ObjectsFailed: ri.ObjectsFailed, + Bucket: ri.Bucket, + Object: ri.Object, + Objects: ri.Objects, + DeleteMarkers: ri.DeleteMarkers, + ObjectsFailed: ri.ObjectsFailed, + DeleteMarkersFailed: ri.DeleteMarkersFailed, } } diff --git a/docs/debugging/s3-verify/go.mod b/docs/debugging/s3-verify/go.mod index 5b376d6fb..f089dc67f 100644 --- a/docs/debugging/s3-verify/go.mod +++ b/docs/debugging/s3-verify/go.mod @@ -1,7 +1,8 @@ module github.com/minio/minio/docs/debugging/s3-verify -go 1.23 -toolchain go1.24.1 +go 1.23.0 + +toolchain go1.24.2 require github.com/minio/minio-go/v7 v7.0.83