Fix DeleteObject when only free versions remain (#16289)

This commit is contained in:
Krishnan Parthasarathi 2022-12-21 16:24:07 -08:00 committed by GitHub
parent 34167c51d5
commit 2fa35def2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 95 additions and 13 deletions

View File

@ -998,6 +998,7 @@ func (i *scannerItem) applyTierObjSweep(ctx context.Context, o ObjectLayer, oi O
// Remove this free version
_, err = o.DeleteObject(ctx, oi.Bucket, oi.Name, ObjectOptions{
VersionID: oi.VersionID,
InclFreeVersions: true,
})
if err == nil {
auditLogLifecycle(ctx, oi, ILMFreeVersionDelete)

View File

@ -93,7 +93,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
if srcOpts.VersionID != "" {
metaArr, errs = readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true)
} else {
metaArr, errs = readAllXL(ctx, storageDisks, srcBucket, srcObject, true)
metaArr, errs = readAllXL(ctx, storageDisks, srcBucket, srcObject, true, false)
}
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
@ -501,7 +501,7 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
return m, err
}
func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, readData bool) ([]FileInfo, []error) {
func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, readData, inclFreeVers bool) ([]FileInfo, []error) {
metadataArray := make([]*xlMetaV2, len(disks))
metaFileInfos := make([]FileInfo, len(metadataArray))
metadataShallowVersions := make([][]xlMetaV2ShallowVersion, len(disks))
@ -566,7 +566,7 @@ func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, r
readQuorum := (len(disks) + 1) / 2
meta := &xlMetaV2{versions: mergeXLV2Versions(readQuorum, false, 1, metadataShallowVersions...)}
lfi, err := meta.ToFileInfo(bucket, object, "")
lfi, err := meta.ToFileInfo(bucket, object, "", inclFreeVers)
if err != nil {
for i := range errs {
if errs[i] == nil {
@ -596,7 +596,7 @@ func readAllXL(ctx context.Context, disks []StorageAPI, bucket, object string, r
// make sure to preserve this for diskmtime based healing bugfix.
diskMTime := metaFileInfos[index].DiskMTime
metaFileInfos[index], errs[index] = metadataArray[index].ToFileInfo(bucket, object, versionID)
metaFileInfos[index], errs[index] = metadataArray[index].ToFileInfo(bucket, object, versionID, inclFreeVers)
if errs[index] != nil {
continue
}
@ -620,7 +620,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
if opts.VersionID != "" {
metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData)
} else {
metaArr, errs = readAllXL(ctx, disks, bucket, object, readData)
metaArr, errs = readAllXL(ctx, disks, bucket, object, readData, opts.InclFreeVersions)
}
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
@ -1714,7 +1714,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
if opts.VersionID != "" {
metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
} else {
metaArr, errs = readAllXL(ctx, disks, bucket, object, false)
metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false)
}
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)
@ -1787,7 +1787,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
if opts.VersionID != "" {
metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
} else {
metaArr, errs = readAllXL(ctx, disks, bucket, object, false)
metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false)
}
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, er.defaultParityCount)

View File

@ -211,7 +211,7 @@ func (e *metaCacheEntry) fileInfo(bucket string) (FileInfo, error) {
ModTime: timeSentinel1970,
}, nil
}
return e.cached.ToFileInfo(bucket, e.name, "")
return e.cached.ToFileInfo(bucket, e.name, "", false)
}
return getFileInfo(e.metadata, bucket, e.name, "", false)
}

View File

@ -96,6 +96,8 @@ type ObjectOptions struct {
// IndexCB will return any index created but the compression.
// Object must have been read at this point.
IndexCB func() []byte
InclFreeVersions bool
}
// ExpirationOptions represents object options for object expiration at objectLayer.

View File

@ -122,7 +122,7 @@ func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data bool) (F
}, nil
}
inData = xlMeta.data
fi, err = xlMeta.ToFileInfo(volume, path, versionID)
fi, err = xlMeta.ToFileInfo(volume, path, versionID, false)
}
if !data || err != nil {
return fi, err

View File

@ -1700,7 +1700,7 @@ func (x *xlMetaV2) AddLegacy(m *xlMetaV1Object) error {
// ToFileInfo converts xlMetaV2 into a common FileInfo datastructure
// for consumption across callers.
func (x xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err error) {
func (x xlMetaV2) ToFileInfo(volume, path, versionID string, inclFreeVers bool) (fi FileInfo, err error) {
var uv uuid.UUID
if versionID != "" && versionID != nullVersionID {
uv, err = uuid.Parse(versionID)
@ -1712,12 +1712,29 @@ func (x xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err e
var succModTime int64
isLatest := true
nonFreeVersions := len(x.versions)
var (
freeFi FileInfo
freeFound bool
)
found := false
for _, ver := range x.versions {
header := &ver.header
// skip listing free-version unless explicitly requested via versionID
if header.FreeVersion() {
nonFreeVersions--
// remember the latest free version; will return this FileInfo if no non-free version remain
var freeVersion xlMetaV2Version
if inclFreeVers && !freeFound {
// ignore unmarshalling errors, will return errFileNotFound in that case
if _, err := freeVersion.unmarshalV(x.metaV, ver.meta); err == nil {
if freeFi, err = freeVersion.ToFileInfo(volume, path); err == nil {
freeFi.IsLatest = true // when this is returned, it would be the latest free version remaining.
freeFound = true
}
}
}
if header.VersionID != uv {
continue
}
@ -1749,6 +1766,11 @@ func (x xlMetaV2) ToFileInfo(volume, path, versionID string) (fi FileInfo, err e
}
if !found {
if versionID == "" {
if inclFreeVers && nonFreeVersions == 0 {
if freeFound {
return freeFi, nil
}
}
return FileInfo{}, errFileNotFound
}

View File

@ -485,7 +485,7 @@ func BenchmarkXlMetaV2Shallow(b *testing.B) {
b.Fatal(err)
}
// List...
_, err = xl.ToFileInfo("volume", "path", ids[rng.Intn(size)])
_, err = xl.ToFileInfo("volume", "path", ids[rng.Intn(size)], false)
if err != nil {
b.Fatal(err)
}

View File

@ -18,6 +18,7 @@
package cmd
import (
"errors"
"testing"
"time"
@ -128,6 +129,11 @@ func TestFreeVersion(t *testing.T) {
report()
fatalErr(err)
// At this point the version stack must look as below,
// v3 --> free version 00000000-0000-0000-0000-0000000000f2 (from removal of null version)
// v2 --> free version 00000000-0000-0000-0000-0000000000f1 (from overwriting of null version )
// v1 --> non-free version 00000000-0000-0000-0000-000000000001
// Check number of free-versions
freeVersions, err := xl.listFreeVersions(newtierfi.Volume, newtierfi.Name)
if err != nil {
@ -137,6 +143,57 @@ func TestFreeVersion(t *testing.T) {
t.Fatalf("Expected two free versions but got %d", len(freeVersions))
}
freeVersionsTests := []struct {
vol string
name string
inclFreeVers bool
afterFn func(fi FileInfo) (string, error)
expectedFree bool
expectedErr error
}{
// ToFileInfo with 'inclFreeVers = true' should return the latest
// non-free version if one is present
{
vol: newtierfi.Volume,
name: newtierfi.Name,
inclFreeVers: true,
afterFn: xl.DeleteVersion,
expectedFree: false,
},
// ToFileInfo with 'inclFreeVers = true' must return the latest free
// version when no non-free versions are present.
{
vol: newtierfi.Volume,
name: newtierfi.Name,
inclFreeVers: true,
expectedFree: true,
},
// ToFileInfo with 'inclFreeVers = false' must return errFileNotFound
// when no non-free version exist.
{
vol: newtierfi.Volume,
name: newtierfi.Name,
inclFreeVers: false,
expectedErr: errFileNotFound,
},
}
for _, ft := range freeVersionsTests {
fi, err := xl.ToFileInfo(ft.vol, ft.name, "", ft.inclFreeVers)
if err != nil && !errors.Is(err, ft.expectedErr) {
t.Fatalf("ToFileInfo failed due to %v", err)
}
if got := fi.TierFreeVersion(); got != ft.expectedFree {
t.Fatalf("Expected free-version=%v but got free-version=%v", ft.expectedFree, got)
}
if ft.afterFn != nil {
_, err = ft.afterFn(fi)
if err != nil {
t.Fatalf("ft.afterFn failed with err %v", err)
}
}
}
// Simulate scanner removing free-version
freefi := newtierfi
for _, fvID := range fvIDs {

View File

@ -2335,7 +2335,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
}
// Replace the data of null version or any other existing version-id
ofi, err := xlMeta.ToFileInfo(dstVolume, dstPath, reqVID)
ofi, err := xlMeta.ToFileInfo(dstVolume, dstPath, reqVID, false)
if err == nil && !ofi.Deleted {
if xlMeta.SharedDataDirCountStr(reqVID, ofi.DataDir) == 0 {
// Purge the destination path as we are not preserving anything