fix: avoid sending errors on missing objects on locked buckets (#10994)

make sure multi-object delete returned errors that are AWS S3 compatible
This commit is contained in:
Harshavardhana
2020-11-28 21:15:45 -08:00
committed by GitHub
parent e6fa410778
commit bdd094bc39
16 changed files with 114 additions and 61 deletions

View File

@@ -18,7 +18,6 @@ package cmd
import (
"context"
"errors"
"fmt"
"io"
"net/http"
@@ -857,19 +856,21 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
}
// Initialize list of errors.
var opErrs = make([]error, len(storageDisks))
var delObjErrs = make([][]error, len(storageDisks))
var wg sync.WaitGroup
// Remove versions in bulk for each disk
for index, disk := range storageDisks {
if disk == nil {
opErrs[index] = errDiskNotFound
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
if disk == nil {
delObjErrs[index] = make([]error, len(versions))
for i := range versions {
delObjErrs[index][i] = errDiskNotFound
}
return
}
delObjErrs[index] = disk.DeleteVersions(ctx, bucket, versions)
}(index, disk)
}
@@ -878,43 +879,43 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
// Reduce errors for each object
for objIndex := range objects {
if errs[objIndex] != nil {
continue
}
diskErrs := make([]error, len(storageDisks))
// Iterate over disks to fetch the error
// of deleting of the current object
for i := range delObjErrs {
// delObjErrs[i] is not nil when disks[i] is also not nil
if delObjErrs[i] != nil {
if errors.Is(delObjErrs[i][objIndex], errFileNotFound) ||
errors.Is(delObjErrs[i][objIndex], errFileVersionNotFound) {
continue
}
diskErrs[i] = delObjErrs[i][objIndex]
}
}
errs[objIndex] = reduceWriteQuorumErrs(ctx, diskErrs, objectOpIgnoredErrs, writeQuorums[objIndex])
err := reduceWriteQuorumErrs(ctx, diskErrs, objectOpIgnoredErrs, writeQuorums[objIndex])
if objects[objIndex].VersionID != "" {
errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName, objects[objIndex].VersionID)
} else {
errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName)
}
if errs[objIndex] == nil {
ObjectPathUpdated(pathJoin(bucket, objects[objIndex].ObjectName))
if versions[objIndex].Deleted {
dobjects[objIndex] = DeletedObject{
DeleteMarker: versions[objIndex].Deleted,
DeleteMarkerVersionID: versions[objIndex].VersionID,
DeleteMarkerMTime: versions[objIndex].ModTime,
DeleteMarkerReplicationStatus: versions[objIndex].DeleteMarkerReplicationStatus,
ObjectName: versions[objIndex].Name,
VersionPurgeStatus: versions[objIndex].VersionPurgeStatus,
PurgeTransitioned: objects[objIndex].PurgeTransitioned,
}
} else {
dobjects[objIndex] = DeletedObject{
ObjectName: versions[objIndex].Name,
VersionID: versions[objIndex].VersionID,
VersionPurgeStatus: versions[objIndex].VersionPurgeStatus,
DeleteMarkerReplicationStatus: versions[objIndex].DeleteMarkerReplicationStatus,
PurgeTransitioned: objects[objIndex].PurgeTransitioned,
}
}
if versions[objIndex].Deleted {
dobjects[objIndex] = DeletedObject{
DeleteMarker: versions[objIndex].Deleted,
DeleteMarkerVersionID: versions[objIndex].VersionID,
DeleteMarkerMTime: DeleteMarkerMTime{versions[objIndex].ModTime},
DeleteMarkerReplicationStatus: versions[objIndex].DeleteMarkerReplicationStatus,
ObjectName: versions[objIndex].Name,
VersionPurgeStatus: versions[objIndex].VersionPurgeStatus,
PurgeTransitioned: objects[objIndex].PurgeTransitioned,
}
} else {
dobjects[objIndex] = DeletedObject{
ObjectName: versions[objIndex].Name,
VersionID: versions[objIndex].VersionID,
VersionPurgeStatus: versions[objIndex].VersionPurgeStatus,
DeleteMarkerReplicationStatus: versions[objIndex].DeleteMarkerReplicationStatus,
PurgeTransitioned: objects[objIndex].PurgeTransitioned,
}
}
}