Make WalkDir return errors (#19677)

If used, `opts.Marker` will cause many missed entries since results are returned 
unsorted, and pools are serialized.

Switch to fully concurrent listing and merging across pools to return sorted entries.
This commit is contained in:
Klaus Post 2024-05-06 13:27:52 -07:00 committed by GitHub
parent 9a9a49aa84
commit 847ee5ac45
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 169 additions and 75 deletions

View File

@ -472,17 +472,17 @@ func batchObjsForDelete(ctx context.Context, r *BatchJobExpire, ri *batchJobInfo
stopFn(toDelCopy[i], err) stopFn(toDelCopy[i], err)
batchLogIf(ctx, fmt.Errorf("Failed to expire %s/%s versionID=%s due to %v (attempts=%d)", ri.Bucket, toDelCopy[i].ObjectName, toDelCopy[i].VersionID, err, attempts)) batchLogIf(ctx, fmt.Errorf("Failed to expire %s/%s versionID=%s due to %v (attempts=%d)", ri.Bucket, toDelCopy[i].ObjectName, toDelCopy[i].VersionID, err, attempts))
failed++ failed++
if attempts == retryAttempts { // all retry attempts failed, record failure if oi, ok := oiCache.Get(toDelCopy[i]); ok {
if oi, ok := oiCache.Get(toDelCopy[i]); ok { ri.trackCurrentBucketObject(r.Bucket, *oi, false, attempts)
ri.trackCurrentBucketObject(r.Bucket, *oi, false) }
} if attempts != retryAttempts {
} else { // retry
toDel = append(toDel, toDelCopy[i]) toDel = append(toDel, toDelCopy[i])
} }
} else { } else {
stopFn(toDelCopy[i], nil) stopFn(toDelCopy[i], nil)
if oi, ok := oiCache.Get(toDelCopy[i]); ok { if oi, ok := oiCache.Get(toDelCopy[i]); ok {
ri.trackCurrentBucketObject(r.Bucket, *oi, true) ri.trackCurrentBucketObject(r.Bucket, *oi, true, attempts)
} }
} }
} }
@ -537,7 +537,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
results := make(chan ObjectInfo, workerSize) results := make(chan itemOrErr[ObjectInfo], workerSize)
if err := api.Walk(ctx, r.Bucket, r.Prefix, results, WalkOptions{ if err := api.Walk(ctx, r.Bucket, r.Prefix, results, WalkOptions{
Marker: lastObject, Marker: lastObject,
LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions LatestOnly: false, // we need to visit all versions of the object to implement purge: retainVersions
@ -584,11 +584,18 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
versionsCount int versionsCount int
toDel []expireObjInfo toDel []expireObjInfo
) )
failed := true
for result := range results { for result := range results {
if result.Err != nil {
failed = true
batchLogIf(ctx, result.Err)
continue
}
// Apply filter to find the matching rule to apply expiry // Apply filter to find the matching rule to apply expiry
// actions accordingly. // actions accordingly.
// nolint:gocritic // nolint:gocritic
if result.IsLatest { if result.Item.IsLatest {
// send down filtered entries to be deleted using // send down filtered entries to be deleted using
// DeleteObjects method // DeleteObjects method
if len(toDel) > 10 { // batch up to 10 objects/versions to be expired simultaneously. if len(toDel) > 10 { // batch up to 10 objects/versions to be expired simultaneously.
@ -609,7 +616,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
var match BatchJobExpireFilter var match BatchJobExpireFilter
var found bool var found bool
for _, rule := range r.Rules { for _, rule := range r.Rules {
if rule.Matches(result, now) { if rule.Matches(result.Item, now) {
match = rule match = rule
found = true found = true
break break
@ -619,18 +626,18 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
continue continue
} }
prevObj = result prevObj = result.Item
matchedFilter = match matchedFilter = match
versionsCount = 1 versionsCount = 1
// Include the latest version // Include the latest version
if matchedFilter.Purge.RetainVersions == 0 { if matchedFilter.Purge.RetainVersions == 0 {
toDel = append(toDel, expireObjInfo{ toDel = append(toDel, expireObjInfo{
ObjectInfo: result, ObjectInfo: result.Item,
ExpireAll: true, ExpireAll: true,
}) })
continue continue
} }
} else if prevObj.Name == result.Name { } else if prevObj.Name == result.Item.Name {
if matchedFilter.Purge.RetainVersions == 0 { if matchedFilter.Purge.RetainVersions == 0 {
continue // including latest version in toDel suffices, skipping other versions continue // including latest version in toDel suffices, skipping other versions
} }
@ -643,7 +650,7 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
continue // retain versions continue // retain versions
} }
toDel = append(toDel, expireObjInfo{ toDel = append(toDel, expireObjInfo{
ObjectInfo: result, ObjectInfo: result.Item,
}) })
} }
// Send any remaining objects downstream // Send any remaining objects downstream
@ -658,8 +665,8 @@ func (r *BatchJobExpire) Start(ctx context.Context, api ObjectLayer, job BatchJo
<-expireDoneCh // waits for the expire goroutine to complete <-expireDoneCh // waits for the expire goroutine to complete
wk.Wait() // waits for all expire workers to retire wk.Wait() // waits for all expire workers to retire
ri.Complete = ri.ObjectsFailed == 0 ri.Complete = !failed && ri.ObjectsFailed == 0
ri.Failed = ri.ObjectsFailed > 0 ri.Failed = failed || ri.ObjectsFailed > 0
globalBatchJobsMetrics.save(job.ID, ri) globalBatchJobsMetrics.save(job.ID, ri)
// Close the saverQuitCh - this also triggers saving in-memory state // Close the saverQuitCh - this also triggers saving in-memory state

View File

@ -447,7 +447,7 @@ func (r *BatchJobReplicateV1) StartFromSource(ctx context.Context, api ObjectLay
} else { } else {
stopFn(oi, nil) stopFn(oi, nil)
} }
ri.trackCurrentBucketObject(r.Target.Bucket, oi, success) ri.trackCurrentBucketObject(r.Target.Bucket, oi, success, attempts)
globalBatchJobsMetrics.save(job.ID, ri) globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs. // persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
@ -690,6 +690,7 @@ type batchJobInfo struct {
StartTime time.Time `json:"startTime" msg:"st"` StartTime time.Time `json:"startTime" msg:"st"`
LastUpdate time.Time `json:"lastUpdate" msg:"lu"` LastUpdate time.Time `json:"lastUpdate" msg:"lu"`
RetryAttempts int `json:"retryAttempts" msg:"ra"` RetryAttempts int `json:"retryAttempts" msg:"ra"`
Attempts int `json:"attempts" msg:"at"`
Complete bool `json:"complete" msg:"cmp"` Complete bool `json:"complete" msg:"cmp"`
Failed bool `json:"failed" msg:"fld"` Failed bool `json:"failed" msg:"fld"`
@ -833,13 +834,15 @@ func (ri *batchJobInfo) clone() *batchJobInfo {
ObjectsFailed: ri.ObjectsFailed, ObjectsFailed: ri.ObjectsFailed,
BytesTransferred: ri.BytesTransferred, BytesTransferred: ri.BytesTransferred,
BytesFailed: ri.BytesFailed, BytesFailed: ri.BytesFailed,
Attempts: ri.Attempts,
} }
} }
func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) { func (ri *batchJobInfo) countItem(size int64, dmarker, success bool, attempt int) {
if ri == nil { if ri == nil {
return return
} }
ri.Attempts++
if success { if success {
if dmarker { if dmarker {
ri.DeleteMarkers++ ri.DeleteMarkers++
@ -847,7 +850,19 @@ func (ri *batchJobInfo) countItem(size int64, dmarker, success bool) {
ri.Objects++ ri.Objects++
ri.BytesTransferred += size ri.BytesTransferred += size
} }
if attempt > 1 {
if dmarker {
ri.DeleteMarkersFailed--
} else {
ri.ObjectsFailed--
ri.BytesFailed += size
}
}
} else { } else {
if attempt > 1 {
// Only count first attempt
return
}
if dmarker { if dmarker {
ri.DeleteMarkersFailed++ ri.DeleteMarkersFailed++
} else { } else {
@ -921,7 +936,7 @@ func (ri *batchJobInfo) trackMultipleObjectVersions(bucket string, info ObjectIn
} }
} }
func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, success bool) { func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo, success bool, attempt int) {
if ri == nil { if ri == nil {
return return
} }
@ -931,7 +946,7 @@ func (ri *batchJobInfo) trackCurrentBucketObject(bucket string, info ObjectInfo,
ri.Bucket = bucket ri.Bucket = bucket
ri.Object = info.Name ri.Object = info.Name
ri.countItem(info.Size, info.DeleteMarker, success) ri.countItem(info.Size, info.DeleteMarker, success, attempt)
} }
func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInfo) { func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInfo) {
@ -945,7 +960,7 @@ func (ri *batchJobInfo) trackCurrentBucketBatch(bucket string, batch []ObjectInf
ri.Bucket = bucket ri.Bucket = bucket
for i := range batch { for i := range batch {
ri.Object = batch[i].Name ri.Object = batch[i].Name
ri.countItem(batch[i].Size, batch[i].DeleteMarker, true) ri.countItem(batch[i].Size, batch[i].DeleteMarker, true, 1)
} }
} }
@ -1057,8 +1072,8 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID) c.SetAppInfo("minio-"+batchJobPrefix, r.APIVersion+" "+job.ID)
var ( var (
walkCh = make(chan ObjectInfo, 100) walkCh = make(chan itemOrErr[ObjectInfo], 100)
slowCh = make(chan ObjectInfo, 100) slowCh = make(chan itemOrErr[ObjectInfo], 100)
) )
if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() { if !*r.Source.Snowball.Disable && r.Source.Type.isMinio() && r.Target.Type.isMinio() {
@ -1084,7 +1099,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
if err := r.writeAsArchive(ctx, api, cl, batch); err != nil { if err := r.writeAsArchive(ctx, api, cl, batch); err != nil {
batchLogIf(ctx, err) batchLogIf(ctx, err)
for _, b := range batch { for _, b := range batch {
slowCh <- b slowCh <- itemOrErr[ObjectInfo]{Item: b}
} }
} else { } else {
ri.trackCurrentBucketBatch(r.Source.Bucket, batch) ri.trackCurrentBucketBatch(r.Source.Bucket, batch)
@ -1095,12 +1110,12 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
} }
} }
for obj := range walkCh { for obj := range walkCh {
if obj.DeleteMarker || !obj.VersionPurgeStatus.Empty() || obj.Size >= int64(smallerThan) { if obj.Item.DeleteMarker || !obj.Item.VersionPurgeStatus.Empty() || obj.Item.Size >= int64(smallerThan) {
slowCh <- obj slowCh <- obj
continue continue
} }
batch = append(batch, obj) batch = append(batch, obj.Item)
if len(batch) < *r.Source.Snowball.Batch { if len(batch) < *r.Source.Snowball.Batch {
continue continue
@ -1153,8 +1168,13 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
prevObj := "" prevObj := ""
skipReplicate := false skipReplicate := false
for result := range slowCh { for res := range slowCh {
result := result if res.Err != nil {
ri.Failed = true
batchLogIf(ctx, res.Err)
continue
}
result := res.Item
if result.Name != prevObj { if result.Name != prevObj {
prevObj = result.Name prevObj = result.Name
skipReplicate = result.DeleteMarker && s3Type skipReplicate = result.DeleteMarker && s3Type
@ -1183,7 +1203,7 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
} else { } else {
stopFn(result, nil) stopFn(result, nil)
} }
ri.trackCurrentBucketObject(r.Source.Bucket, result, success) ri.trackCurrentBucketObject(r.Source.Bucket, result, success, attempts)
globalBatchJobsMetrics.save(job.ID, ri) globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs. // persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
@ -1484,7 +1504,7 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request)
jobType = string(madmin.BatchJobReplicate) jobType = string(madmin.BatchJobReplicate)
} }
resultCh := make(chan ObjectInfo) resultCh := make(chan itemOrErr[ObjectInfo])
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
@ -1496,8 +1516,12 @@ func (a adminAPIHandlers) ListBatchJobs(w http.ResponseWriter, r *http.Request)
listResult := madmin.ListBatchJobsResult{} listResult := madmin.ListBatchJobsResult{}
for result := range resultCh { for result := range resultCh {
if result.Err != nil {
writeErrorResponseJSON(ctx, w, toAPIError(ctx, result.Err), r.URL)
return
}
req := &BatchJobRequest{} req := &BatchJobRequest{}
if err := req.load(ctx, objectAPI, result.Name); err != nil { if err := req.load(ctx, objectAPI, result.Item.Name); err != nil {
if !errors.Is(err, errNoSuchJob) { if !errors.Is(err, errNoSuchJob) {
batchLogIf(ctx, err) batchLogIf(ctx, err)
} }
@ -1702,7 +1726,7 @@ func newBatchJobPool(ctx context.Context, o ObjectLayer, workers int) *BatchJobP
} }
func (j *BatchJobPool) resume() { func (j *BatchJobPool) resume() {
results := make(chan ObjectInfo, 100) results := make(chan itemOrErr[ObjectInfo], 100)
ctx, cancel := context.WithCancel(j.ctx) ctx, cancel := context.WithCancel(j.ctx)
defer cancel() defer cancel()
if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobPrefix, results, WalkOptions{}); err != nil { if err := j.objLayer.Walk(ctx, minioMetaBucket, batchJobPrefix, results, WalkOptions{}); err != nil {
@ -1710,12 +1734,16 @@ func (j *BatchJobPool) resume() {
return return
} }
for result := range results { for result := range results {
if result.Err != nil {
batchLogIf(j.ctx, result.Err)
continue
}
// ignore batch-replicate.bin and batch-rotate.bin entries // ignore batch-replicate.bin and batch-rotate.bin entries
if strings.HasSuffix(result.Name, slashSeparator) { if strings.HasSuffix(result.Item.Name, slashSeparator) {
continue continue
} }
req := &BatchJobRequest{} req := &BatchJobRequest{}
if err := req.load(ctx, j.objLayer, result.Name); err != nil { if err := req.load(ctx, j.objLayer, result.Item.Name); err != nil {
batchLogIf(ctx, err) batchLogIf(ctx, err)
continue continue
} }

View File

@ -419,6 +419,12 @@ func (z *batchJobInfo) DecodeMsg(dc *msgp.Reader) (err error) {
err = msgp.WrapError(err, "RetryAttempts") err = msgp.WrapError(err, "RetryAttempts")
return return
} }
case "at":
z.Attempts, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "Attempts")
return
}
case "cmp": case "cmp":
z.Complete, err = dc.ReadBool() z.Complete, err = dc.ReadBool()
if err != nil { if err != nil {
@ -492,9 +498,9 @@ func (z *batchJobInfo) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable // EncodeMsg implements msgp.Encodable
func (z *batchJobInfo) EncodeMsg(en *msgp.Writer) (err error) { func (z *batchJobInfo) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 16 // map header, size 17
// write "v" // write "v"
err = en.Append(0xde, 0x0, 0x10, 0xa1, 0x76) err = en.Append(0xde, 0x0, 0x11, 0xa1, 0x76)
if err != nil { if err != nil {
return return
} }
@ -553,6 +559,16 @@ func (z *batchJobInfo) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "RetryAttempts") err = msgp.WrapError(err, "RetryAttempts")
return return
} }
// write "at"
err = en.Append(0xa2, 0x61, 0x74)
if err != nil {
return
}
err = en.WriteInt(z.Attempts)
if err != nil {
err = msgp.WrapError(err, "Attempts")
return
}
// write "cmp" // write "cmp"
err = en.Append(0xa3, 0x63, 0x6d, 0x70) err = en.Append(0xa3, 0x63, 0x6d, 0x70)
if err != nil { if err != nil {
@ -659,9 +675,9 @@ func (z *batchJobInfo) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler // MarshalMsg implements msgp.Marshaler
func (z *batchJobInfo) MarshalMsg(b []byte) (o []byte, err error) { func (z *batchJobInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize()) o = msgp.Require(b, z.Msgsize())
// map header, size 16 // map header, size 17
// string "v" // string "v"
o = append(o, 0xde, 0x0, 0x10, 0xa1, 0x76) o = append(o, 0xde, 0x0, 0x11, 0xa1, 0x76)
o = msgp.AppendInt(o, z.Version) o = msgp.AppendInt(o, z.Version)
// string "jid" // string "jid"
o = append(o, 0xa3, 0x6a, 0x69, 0x64) o = append(o, 0xa3, 0x6a, 0x69, 0x64)
@ -678,6 +694,9 @@ func (z *batchJobInfo) MarshalMsg(b []byte) (o []byte, err error) {
// string "ra" // string "ra"
o = append(o, 0xa2, 0x72, 0x61) o = append(o, 0xa2, 0x72, 0x61)
o = msgp.AppendInt(o, z.RetryAttempts) o = msgp.AppendInt(o, z.RetryAttempts)
// string "at"
o = append(o, 0xa2, 0x61, 0x74)
o = msgp.AppendInt(o, z.Attempts)
// string "cmp" // string "cmp"
o = append(o, 0xa3, 0x63, 0x6d, 0x70) o = append(o, 0xa3, 0x63, 0x6d, 0x70)
o = msgp.AppendBool(o, z.Complete) o = msgp.AppendBool(o, z.Complete)
@ -765,6 +784,12 @@ func (z *batchJobInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
err = msgp.WrapError(err, "RetryAttempts") err = msgp.WrapError(err, "RetryAttempts")
return return
} }
case "at":
z.Attempts, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Attempts")
return
}
case "cmp": case "cmp":
z.Complete, bts, err = msgp.ReadBoolBytes(bts) z.Complete, bts, err = msgp.ReadBoolBytes(bts)
if err != nil { if err != nil {
@ -839,6 +864,6 @@ func (z *batchJobInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *batchJobInfo) Msgsize() (s int) { func (z *batchJobInfo) Msgsize() (s int) {
s = 3 + 2 + msgp.IntSize + 4 + msgp.StringPrefixSize + len(z.JobID) + 3 + msgp.StringPrefixSize + len(z.JobType) + 3 + msgp.TimeSize + 3 + msgp.TimeSize + 3 + msgp.IntSize + 4 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.StringPrefixSize + len(z.Bucket) + 5 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size s = 3 + 2 + msgp.IntSize + 4 + msgp.StringPrefixSize + len(z.JobID) + 3 + msgp.StringPrefixSize + len(z.JobType) + 3 + msgp.TimeSize + 3 + msgp.TimeSize + 3 + msgp.IntSize + 3 + msgp.IntSize + 4 + msgp.BoolSize + 4 + msgp.BoolSize + 5 + msgp.StringPrefixSize + len(z.Bucket) + 5 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 3 + msgp.Int64Size
return return
} }

View File

@ -356,7 +356,7 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
retryAttempts := ri.RetryAttempts retryAttempts := ri.RetryAttempts
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
results := make(chan ObjectInfo, 100) results := make(chan itemOrErr[ObjectInfo], 100)
if err := api.Walk(ctx, r.Bucket, r.Prefix, results, WalkOptions{ if err := api.Walk(ctx, r.Bucket, r.Prefix, results, WalkOptions{
Marker: lastObject, Marker: lastObject,
Filter: selectObj, Filter: selectObj,
@ -365,9 +365,14 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
// Do not need to retry if we can't list objects on source. // Do not need to retry if we can't list objects on source.
return err return err
} }
failed := false
for result := range results { for res := range results {
result := result if res.Err != nil {
failed = true
batchLogIf(ctx, res.Err)
break
}
result := res.Item
sseKMS := crypto.S3KMS.IsEncrypted(result.UserDefined) sseKMS := crypto.S3KMS.IsEncrypted(result.UserDefined)
sseS3 := crypto.S3.IsEncrypted(result.UserDefined) sseS3 := crypto.S3.IsEncrypted(result.UserDefined)
if !sseKMS && !sseS3 { // neither sse-s3 nor sse-kms disallowed if !sseKMS && !sseS3 { // neither sse-s3 nor sse-kms disallowed
@ -377,7 +382,6 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
go func() { go func() {
defer wk.Give() defer wk.Give()
for attempts := 1; attempts <= retryAttempts; attempts++ { for attempts := 1; attempts <= retryAttempts; attempts++ {
attempts := attempts
stopFn := globalBatchJobsMetrics.trace(batchJobMetricKeyRotation, job.ID, attempts) stopFn := globalBatchJobsMetrics.trace(batchJobMetricKeyRotation, job.ID, attempts)
success := true success := true
if err := r.KeyRotate(ctx, api, result); err != nil { if err := r.KeyRotate(ctx, api, result); err != nil {
@ -387,8 +391,7 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
} else { } else {
stopFn(result, nil) stopFn(result, nil)
} }
ri.trackCurrentBucketObject(r.Bucket, result, success) ri.trackCurrentBucketObject(r.Bucket, result, success, attempts)
ri.RetryAttempts = attempts
globalBatchJobsMetrics.save(job.ID, ri) globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk after every 10secs. // persist in-memory state to disk after every 10secs.
batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job)) batchLogIf(ctx, ri.updateAfter(ctx, api, 10*time.Second, job))
@ -407,8 +410,8 @@ func (r *BatchJobKeyRotateV1) Start(ctx context.Context, api ObjectLayer, job Ba
} }
wk.Wait() wk.Wait()
ri.Complete = ri.ObjectsFailed == 0 ri.Complete = !failed && ri.ObjectsFailed == 0
ri.Failed = ri.ObjectsFailed > 0 ri.Failed = failed || ri.ObjectsFailed > 0
globalBatchJobsMetrics.save(job.ID, ri) globalBatchJobsMetrics.save(job.ID, ri)
// persist in-memory state to disk. // persist in-memory state to disk.
batchLogIf(ctx, ri.updateAfter(ctx, api, 0, job)) batchLogIf(ctx, ri.updateAfter(ctx, api, 0, job))

View File

@ -2732,7 +2732,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
s.workerCh <- struct{}{} s.workerCh <- struct{}{}
}() }()
// Allocate new results channel to receive ObjectInfo. // Allocate new results channel to receive ObjectInfo.
objInfoCh := make(chan ObjectInfo) objInfoCh := make(chan itemOrErr[ObjectInfo])
cfg, err := getReplicationConfig(ctx, opts.bucket) cfg, err := getReplicationConfig(ctx, opts.bucket)
if err != nil { if err != nil {
replLogIf(ctx, fmt.Errorf("replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err)) replLogIf(ctx, fmt.Errorf("replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err))
@ -2867,7 +2867,12 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
} }
}(ctx, i) }(ctx, i)
} }
for obj := range objInfoCh { for res := range objInfoCh {
if res.Err != nil {
resyncStatus = ResyncFailed
replLogIf(ctx, res.Err)
return
}
select { select {
case <-s.resyncCancelCh: case <-s.resyncCancelCh:
resyncStatus = ResyncCanceled resyncStatus = ResyncCanceled
@ -2876,11 +2881,11 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
return return
default: default:
} }
if heal && lastCheckpoint != "" && lastCheckpoint != obj.Name { if heal && lastCheckpoint != "" && lastCheckpoint != res.Item.Name {
continue continue
} }
lastCheckpoint = "" lastCheckpoint = ""
roi := getHealReplicateObjectInfo(obj, rcfg) roi := getHealReplicateObjectInfo(res.Item, rcfg)
if !roi.ExistingObjResync.mustResync() { if !roi.ExistingObjResync.mustResync() {
continue continue
} }
@ -3140,7 +3145,7 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string,
return nil, err return nil, err
} }
objInfoCh := make(chan ObjectInfo, 10) objInfoCh := make(chan itemOrErr[ObjectInfo], 10)
if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, WalkOptions{}); err != nil { if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, WalkOptions{}); err != nil {
replLogIf(ctx, err) replLogIf(ctx, err)
return nil, err return nil, err
@ -3152,11 +3157,17 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string,
diffCh := make(chan madmin.DiffInfo, 4000) diffCh := make(chan madmin.DiffInfo, 4000)
go func() { go func() {
defer xioutil.SafeClose(diffCh) defer xioutil.SafeClose(diffCh)
for obj := range objInfoCh { for res := range objInfoCh {
if res.Err != nil {
diffCh <- madmin.DiffInfo{Err: res.Err}
return
}
if contextCanceled(ctx) { if contextCanceled(ctx) {
// Just consume input... // Just consume input...
continue continue
} }
obj := res.Item
// Ignore object prefixes which are excluded // Ignore object prefixes which are excluded
// from versioning via the MinIO bucket versioning extension. // from versioning via the MinIO bucket versioning extension.
if globalBucketVersioningSys.PrefixSuspended(bucket, obj.Name) { if globalBucketVersioningSys.PrefixSuspended(bucket, obj.Name) {

View File

@ -2045,13 +2045,13 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
// Walk a bucket, optionally prefix recursively, until we have returned // Walk a bucket, optionally prefix recursively, until we have returned
// all the contents of the provided bucket+prefix. // all the contents of the provided bucket+prefix.
// TODO: Note that most errors will result in a truncated listing. // TODO: Note that most errors will result in a truncated listing.
func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error { func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- itemOrErr[ObjectInfo], opts WalkOptions) error {
if err := checkListObjsArgs(ctx, bucket, prefix, ""); err != nil { if err := checkListObjsArgs(ctx, bucket, prefix, ""); err != nil {
xioutil.SafeClose(results) xioutil.SafeClose(results)
return err return err
} }
parentCtx := ctx
ctx, cancel := context.WithCancel(ctx) ctx, cancelCause := context.WithCancelCause(ctx)
var entries []chan metaCacheEntry var entries []chan metaCacheEntry
for poolIdx, erasureSet := range z.serverPools { for poolIdx, erasureSet := range z.serverPools {
@ -2062,8 +2062,9 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
disks, infos, _ := set.getOnlineDisksWithHealingAndInfo(true) disks, infos, _ := set.getOnlineDisksWithHealingAndInfo(true)
if len(disks) == 0 { if len(disks) == 0 {
xioutil.SafeClose(results) xioutil.SafeClose(results)
cancel() err := fmt.Errorf("Walk: no online disks found in pool %d, set %d", setIdx, poolIdx)
return fmt.Errorf("Walk: no online disks found in pool %d, set %d", setIdx, poolIdx) cancelCause(err)
return err
} }
go func() { go func() {
defer xioutil.SafeClose(listOut) defer xioutil.SafeClose(listOut)
@ -2150,8 +2151,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
} }
if err := listPathRaw(ctx, lopts); err != nil { if err := listPathRaw(ctx, lopts); err != nil {
storageLogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)) cancelCause(fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
cancel()
return return
} }
}() }()
@ -2162,13 +2162,24 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
merged := make(chan metaCacheEntry, 100) merged := make(chan metaCacheEntry, 100)
vcfg, _ := globalBucketVersioningSys.Get(bucket) vcfg, _ := globalBucketVersioningSys.Get(bucket)
go func() { go func() {
defer cancel() defer cancelCause(nil)
defer xioutil.SafeClose(results) defer xioutil.SafeClose(results)
sentErr := false
sendErr := func(err error) {
if !sentErr {
select {
case results <- itemOrErr[ObjectInfo]{Err: err}:
sentErr = true
case <-parentCtx.Done():
}
}
}
send := func(oi ObjectInfo) bool { send := func(oi ObjectInfo) bool {
select { select {
case results <- oi: case results <- itemOrErr[ObjectInfo]{Item: oi}:
return true return true
case <-ctx.Done(): case <-ctx.Done():
sendErr(context.Cause(ctx))
return false return false
} }
} }
@ -2176,6 +2187,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
if opts.LatestOnly { if opts.LatestOnly {
fi, err := entry.fileInfo(bucket) fi, err := entry.fileInfo(bucket)
if err != nil { if err != nil {
sendErr(err)
return return
} }
if opts.Filter != nil { if opts.Filter != nil {
@ -2193,6 +2205,7 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
} }
fivs, err := entry.fileInfoVersions(bucket) fivs, err := entry.fileInfoVersions(bucket)
if err != nil { if err != nil {
sendErr(err)
return return
} }

View File

@ -645,36 +645,37 @@ func (iamOS *IAMObjectStore) deleteGroupInfo(ctx context.Context, name string) e
return err return err
} }
// helper type for listIAMConfigItems
type itemOrErr struct {
Item string
Err error
}
// Lists objects in the minioMetaBucket at the given path prefix. All returned // Lists objects in the minioMetaBucket at the given path prefix. All returned
// items have the pathPrefix removed from their names. // items have the pathPrefix removed from their names.
func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix string) <-chan itemOrErr { func listIAMConfigItems(ctx context.Context, objAPI ObjectLayer, pathPrefix string) <-chan itemOrErr[string] {
ch := make(chan itemOrErr) ch := make(chan itemOrErr[string])
go func() { go func() {
defer xioutil.SafeClose(ch) defer xioutil.SafeClose(ch)
// Allocate new results channel to receive ObjectInfo. // Allocate new results channel to receive ObjectInfo.
objInfoCh := make(chan ObjectInfo) objInfoCh := make(chan itemOrErr[ObjectInfo])
if err := objAPI.Walk(ctx, minioMetaBucket, pathPrefix, objInfoCh, WalkOptions{}); err != nil { if err := objAPI.Walk(ctx, minioMetaBucket, pathPrefix, objInfoCh, WalkOptions{}); err != nil {
select { select {
case ch <- itemOrErr{Err: err}: case ch <- itemOrErr[string]{Err: err}:
case <-ctx.Done(): case <-ctx.Done():
} }
return return
} }
for obj := range objInfoCh { for obj := range objInfoCh {
item := strings.TrimPrefix(obj.Name, pathPrefix) if obj.Err != nil {
select {
case ch <- itemOrErr[string]{Err: obj.Err}:
case <-ctx.Done():
return
}
}
item := strings.TrimPrefix(obj.Item.Name, pathPrefix)
item = strings.TrimSuffix(item, SlashSeparator) item = strings.TrimSuffix(item, SlashSeparator)
select { select {
case ch <- itemOrErr{Item: item}: case ch <- itemOrErr[string]{Item: item}:
case <-ctx.Done(): case <-ctx.Done():
return return
} }

View File

@ -257,7 +257,7 @@ type ObjectLayer interface {
ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (result ListObjectVersionsInfo, err error) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (result ListObjectVersionsInfo, err error)
// Walk lists all objects including versions, delete markers. // Walk lists all objects including versions, delete markers.
Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts WalkOptions) error Walk(ctx context.Context, bucket, prefix string, results chan<- itemOrErr[ObjectInfo], opts WalkOptions) error
// Object operations. // Object operations.

View File

@ -1136,3 +1136,9 @@ func sleepContext(ctx context.Context, d time.Duration) error {
} }
return nil return nil
} }
// itemOrErr is a generic helper carrying either a successfully produced
// value or an error on the same channel, so consumers of Walk-style
// listing APIs receive failures in-band instead of missing them silently.
type itemOrErr[V any] struct {
	Item V // the produced value; only meaningful when Err is nil
	Err error // non-nil if producing the item failed; Item is the zero value then
}