fix: delete marker compatibility behavior for suspended bucket (#10395)

- delete-marker should be created on a suspended bucket as `null`
- delete-marker should delete any pre-existing `null` versioned
  object and create an entry `null`
This commit is contained in:
Harshavardhana 2020-09-02 00:19:03 -07:00 committed by GitHub
parent 2acb530ccd
commit 37da0c647e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 93 additions and 62 deletions

View File

@ -449,14 +449,15 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
deleteList := toNames(objectsToDelete) deleteList := toNames(objectsToDelete)
dObjects, errs := deleteObjectsFn(ctx, bucket, deleteList, ObjectOptions{ dObjects, errs := deleteObjectsFn(ctx, bucket, deleteList, ObjectOptions{
Versioned: globalBucketVersioningSys.Enabled(bucket), Versioned: globalBucketVersioningSys.Enabled(bucket),
VersionSuspended: globalBucketVersioningSys.Suspended(bucket),
}) })
deletedObjects := make([]DeletedObject, len(deleteObjects.Objects)) deletedObjects := make([]DeletedObject, len(deleteObjects.Objects))
for i := range errs { for i := range errs {
dindex := objectsToDelete[deleteList[i]] dindex := objectsToDelete[deleteList[i]]
apiErr := toAPIError(ctx, errs[i]) apiErr := toAPIError(ctx, errs[i])
if apiErr.Code == "" || apiErr.Code == "NoSuchKey" { if apiErr.Code == "" || apiErr.Code == "NoSuchKey" || apiErr.Code == "InvalidArgument" {
deletedObjects[dindex] = dObjects[i] deletedObjects[dindex] = dObjects[i]
continue continue
} }

View File

@ -19,6 +19,7 @@ package cmd
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -860,14 +861,22 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
versions := make([]FileInfo, len(objects)) versions := make([]FileInfo, len(objects))
for i := range objects { for i := range objects {
if objects[i].VersionID == "" { if objects[i].VersionID == "" {
if opts.Versioned && !HasSuffix(objects[i].ObjectName, SlashSeparator) { if opts.Versioned || opts.VersionSuspended {
versions[i] = FileInfo{ if !HasSuffix(objects[i].ObjectName, SlashSeparator) {
Name: objects[i].ObjectName, fi := FileInfo{
VersionID: mustGetUUID(), Name: objects[i].ObjectName,
ModTime: UTCNow(), ModTime: UTCNow(),
Deleted: true, // delete marker Deleted: true, // delete marker
}
if opts.Versioned {
fi.VersionID = mustGetUUID()
}
// versioning suspended means we add `null`
// version as delete marker
versions[i] = fi
continue
} }
continue
} }
} }
versions[i] = FileInfo{ versions[i] = FileInfo{
@ -907,9 +916,11 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
for i := range delObjErrs { for i := range delObjErrs {
// delObjErrs[i] is not nil when disks[i] is also not nil // delObjErrs[i] is not nil when disks[i] is also not nil
if delObjErrs[i] != nil { if delObjErrs[i] != nil {
if delObjErrs[i][objIndex] != errFileNotFound { if errors.Is(delObjErrs[i][objIndex], errFileNotFound) ||
diskErrs[i] = delObjErrs[i][objIndex] errors.Is(delObjErrs[i][objIndex], errFileVersionNotFound) {
continue
} }
diskErrs[i] = delObjErrs[i][objIndex]
} }
} }
errs[objIndex] = reduceWriteQuorumErrs(ctx, diskErrs, objectOpIgnoredErrs, writeQuorums[objIndex]) errs[objIndex] = reduceWriteQuorumErrs(ctx, diskErrs, objectOpIgnoredErrs, writeQuorums[objIndex])
@ -961,18 +972,27 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
writeQuorum := len(storageDisks)/2 + 1 writeQuorum := len(storageDisks)/2 + 1
if opts.VersionID == "" { if opts.VersionID == "" {
if opts.Versioned && !HasSuffix(object, SlashSeparator) { if opts.Versioned || opts.VersionSuspended {
fi := FileInfo{ if !HasSuffix(object, SlashSeparator) {
Name: object, fi := FileInfo{
VersionID: mustGetUUID(), Name: object,
Deleted: true, Deleted: true,
ModTime: UTCNow(), ModTime: UTCNow(),
}
if opts.Versioned {
fi.VersionID = mustGetUUID()
}
// versioning suspended means we add `null`
// version as delete marker
// Add delete marker, since we don't have any version specified explicitly.
if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi); err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
return fi.ToObjectInfo(bucket, object), nil
} }
// Add delete marker, since we don't have any version specified explicitly.
if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi); err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
return fi.ToObjectInfo(bucket, object), nil
} }
} }

View File

@ -37,6 +37,7 @@ type GetObjectInfoFn func(ctx context.Context, bucket, object string, opts Objec
// ObjectOptions represents object options for ObjectLayer object operations // ObjectOptions represents object options for ObjectLayer object operations
type ObjectOptions struct { type ObjectOptions struct {
ServerSideEncryption encrypt.ServerSide ServerSideEncryption encrypt.ServerSide
VersionSuspended bool // indicates if the bucket was previously versioned but is currently suspended.
Versioned bool // indicates if the bucket is versioned Versioned bool // indicates if the bucket is versioned
WalkVersions bool // indicates if the we are interested in walking versions WalkVersions bool // indicates if the we are interested in walking versions
VersionID string // Specifies the versionID which needs to be overwritten or read VersionID string // Specifies the versionID which needs to be overwritten or read

View File

@ -133,6 +133,7 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts
return opts, err return opts, err
} }
opts.Versioned = versioned opts.Versioned = versioned
opts.VersionSuspended = globalBucketVersioningSys.Suspended(bucket)
return opts, nil return opts, nil
} }

View File

@ -679,7 +679,8 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs,
} }
opts := ObjectOptions{ opts := ObjectOptions{
Versioned: globalBucketVersioningSys.Enabled(args.BucketName), Versioned: globalBucketVersioningSys.Enabled(args.BucketName),
VersionSuspended: globalBucketVersioningSys.Suspended(args.BucketName),
} }
var err error var err error
next: next:

View File

@ -227,29 +227,10 @@ func (z *xlMetaV2) Load(buf []byte) error {
// AddVersion adds a new version // AddVersion adds a new version
func (z *xlMetaV2) AddVersion(fi FileInfo) error { func (z *xlMetaV2) AddVersion(fi FileInfo) error {
if fi.Deleted {
uv, err := uuid.Parse(fi.VersionID)
if err != nil {
return err
}
v := xlMetaV2Version{
Type: DeleteType,
DeleteMarker: &xlMetaV2DeleteMarker{
VersionID: uv,
ModTime: fi.ModTime.UnixNano(),
},
}
if !v.Valid() {
return errors.New("internal error: invalid version entry generated")
}
z.Versions = append(z.Versions, v)
return nil
}
if fi.VersionID == "" { if fi.VersionID == "" {
// this means versioning is not yet // this means versioning is not yet
// enabled i.e all versions are basically // enabled or suspend i.e all versions
// default value i.e "null" // are basically default value i.e "null"
fi.VersionID = nullVersionID fi.VersionID = nullVersionID
} }
@ -354,11 +335,17 @@ func newXLMetaV2(fi FileInfo) (xlMetaV2, error) {
} }
func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error) { func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error) {
versionID := ""
var uv uuid.UUID
// check if the version is not "null"
if !bytes.Equal(j.VersionID[:], uv[:]) {
versionID = uuid.UUID(j.VersionID).String()
}
fi := FileInfo{ fi := FileInfo{
Volume: volume, Volume: volume,
Name: path, Name: path,
ModTime: time.Unix(0, j.ModTime).UTC(), ModTime: time.Unix(0, j.ModTime).UTC(),
VersionID: uuid.UUID(j.VersionID).String(), VersionID: versionID,
Deleted: true, Deleted: true,
} }
return fi, nil return fi, nil
@ -434,10 +421,26 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
if fi.VersionID == nullVersionID { if fi.VersionID == nullVersionID {
fi.VersionID = "" fi.VersionID = ""
} }
var uv uuid.UUID var uv uuid.UUID
if fi.VersionID != "" { if fi.VersionID != "" {
uv, _ = uuid.Parse(fi.VersionID) uv, _ = uuid.Parse(fi.VersionID)
} }
var ventry xlMetaV2Version
if fi.Deleted {
ventry = xlMetaV2Version{
Type: DeleteType,
DeleteMarker: &xlMetaV2DeleteMarker{
VersionID: uv,
ModTime: fi.ModTime.UnixNano(),
},
}
if !ventry.Valid() {
return "", false, errors.New("internal error: invalid version entry generated")
}
}
for i, version := range z.Versions { for i, version := range z.Versions {
if !version.Valid() { if !version.Valid() {
return "", false, errFileCorrupt return "", false, errFileCorrupt
@ -446,11 +449,17 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
case LegacyType: case LegacyType:
if version.ObjectV1.VersionID == fi.VersionID { if version.ObjectV1.VersionID == fi.VersionID {
z.Versions = append(z.Versions[:i], z.Versions[i+1:]...) z.Versions = append(z.Versions[:i], z.Versions[i+1:]...)
if fi.Deleted {
z.Versions = append(z.Versions, ventry)
}
return version.ObjectV1.DataDir, len(z.Versions) == 0, nil return version.ObjectV1.DataDir, len(z.Versions) == 0, nil
} }
case DeleteType: case DeleteType:
if bytes.Equal(version.DeleteMarker.VersionID[:], uv[:]) { if bytes.Equal(version.DeleteMarker.VersionID[:], uv[:]) {
z.Versions = append(z.Versions[:i], z.Versions[i+1:]...) z.Versions = append(z.Versions[:i], z.Versions[i+1:]...)
if fi.Deleted {
z.Versions = append(z.Versions, ventry)
}
return "", len(z.Versions) == 0, nil return "", len(z.Versions) == 0, nil
} }
} }
@ -478,15 +487,26 @@ func (z *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) { if bytes.Equal(version.ObjectV2.VersionID[:], uv[:]) {
z.Versions = append(z.Versions[:i], z.Versions[i+1:]...) z.Versions = append(z.Versions[:i], z.Versions[i+1:]...)
if findDataDir(version.ObjectV2.DataDir, z.Versions) > 0 { if findDataDir(version.ObjectV2.DataDir, z.Versions) > 0 {
if fi.Deleted {
z.Versions = append(z.Versions, ventry)
}
// Found that another version references the same dataDir // Found that another version references the same dataDir
// we shouldn't remove it, and only remove the version instead // we shouldn't remove it, and only remove the version instead
return "", len(z.Versions) == 0, nil return "", len(z.Versions) == 0, nil
} }
if fi.Deleted {
z.Versions = append(z.Versions, ventry)
}
return uuid.UUID(version.ObjectV2.DataDir).String(), len(z.Versions) == 0, nil return uuid.UUID(version.ObjectV2.DataDir).String(), len(z.Versions) == 0, nil
} }
} }
} }
if fi.Deleted {
z.Versions = append(z.Versions, ventry)
return "", false, nil
}
return "", false, errFileVersionNotFound return "", false, errFileVersionNotFound
} }

View File

@ -381,17 +381,15 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
var totalSize int64 var totalSize int64
for _, version := range fivs.Versions { for _, version := range fivs.Versions {
oi := version.ToObjectInfo(item.bucket, item.objectPath())
size := item.applyActions(ctx, objAPI, actionMeta{ size := item.applyActions(ctx, objAPI, actionMeta{
numVersions: len(fivs.Versions), numVersions: len(fivs.Versions),
oi: version.ToObjectInfo(item.bucket, item.objectPath()), oi: oi,
}) })
if !version.Deleted { if !version.Deleted {
totalSize += size totalSize += size
} }
} item.healReplication(ctx, objAPI, actionMeta{oi: oi})
for _, version := range fivs.Versions {
item.healReplication(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())})
} }
return totalSize, nil return totalSize, nil
}) })
@ -1131,17 +1129,6 @@ func (s *xlStorage) DeleteVersion(volume, path string, fi FileInfo) error {
return err return err
} }
if fi.Deleted {
if err = xlMeta.AddVersion(fi); err != nil {
return err
}
buf, err = xlMeta.MarshalMsg(append(xlHeader[:], xlVersionV1[:]...))
if err != nil {
return err
}
return s.WriteAll(volume, pathJoin(path, xlStorageFormatFile), bytes.NewReader(buf))
}
dataDir, lastVersion, err := xlMeta.DeleteVersion(fi) dataDir, lastVersion, err := xlMeta.DeleteVersion(fi)
if err != nil { if err != nil {
return err return err
@ -2167,7 +2154,7 @@ func (s *xlStorage) RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath s
if fi.VersionID == "" { if fi.VersionID == "" {
// return the latest "null" versionId info // return the latest "null" versionId info
ofi, err := xlMeta.ToFileInfo(dstVolume, dstPath, nullVersionID) ofi, err := xlMeta.ToFileInfo(dstVolume, dstPath, nullVersionID)
if err == nil { if err == nil && !ofi.Deleted {
// Purge the destination path as we are not preserving anything // Purge the destination path as we are not preserving anything
// versioned object was not requested. // versioned object was not requested.
oldDstDataPath = pathJoin(dstVolumeDir, dstPath, ofi.DataDir) oldDstDataPath = pathJoin(dstVolumeDir, dstPath, ofi.DataDir)