fix: issues with handling delete markers in metacache (#11150)

Additional cases handled

- fix address situations where healing is not
  triggered on failed writes and deletes.

- consider object exists during listing when
  metadata can be successfully decoded.
This commit is contained in:
Harshavardhana
2020-12-22 09:16:43 -08:00
committed by GitHub
parent 274bbad5cb
commit 35fafb837b
6 changed files with 41 additions and 48 deletions

View File

@@ -727,20 +727,20 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
// Whether a disk was initially or becomes offline
// during this upload, send it to the MRF list.
for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] == nil || storageDisks[i] == nil {
er.addPartial(bucket, object, fi.VersionID)
break
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
continue
}
er.addPartial(bucket, object, fi.VersionID)
break
}
for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] == nil {
continue
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
// Object info is the same in all disks, so we can pick
// the first meta from online disk
fi = partsMetadata[i]
break
}
// Object info is the same in all disks, so we can pick
// the first meta from online disk
fi = partsMetadata[i]
break
}
return fi.ToObjectInfo(bucket, object), nil
@@ -922,16 +922,15 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
for _, version := range versions {
// Check if there is any offline disk and add it to the MRF list
for _, disk := range storageDisks {
if disk == nil {
// ignore delete markers for quorum
if version.Deleted {
continue
}
// all other direct versionId references we should
// ensure no dangling file is left over.
er.addPartial(bucket, version.Name, version.VersionID)
break
if disk != nil && disk.IsOnline() {
// Skip attempted heal on online disks.
continue
}
// all other direct versionId references we should
// ensure no dangling file is left over.
er.addPartial(bucket, version.Name, version.VersionID)
break
}
}
@@ -1042,10 +1041,11 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
}
for _, disk := range storageDisks {
if disk == nil {
er.addPartial(bucket, object, opts.VersionID)
break
if disk != nil && disk.IsOnline() {
continue
}
er.addPartial(bucket, object, opts.VersionID)
break
}
return ObjectInfo{