add delete-marker proactively in DeleteObject() (#13795)

single object delete was not working properly
on a bucket when versioning was suspended,
current version 'null' object was never removed.

added unit tests to cover the behavior

fixes #13783
This commit is contained in:
Harshavardhana 2021-11-30 18:30:06 -08:00 committed by GitHub
parent 906548d0ba
commit b280a37c4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 99 additions and 18 deletions

View File

@ -1365,6 +1365,11 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
fvID := mustGetUUID() fvID := mustGetUUID()
if markDelete { if markDelete {
if opts.Versioned || opts.VersionSuspended { if opts.Versioned || opts.VersionSuspended {
if !deleteMarker {
// versioning suspended means we add `null` version as
// delete marker, if its not decided already.
deleteMarker = opts.VersionSuspended && opts.VersionID == ""
}
fi := FileInfo{ fi := FileInfo{
Name: object, Name: object,
Deleted: deleteMarker, Deleted: deleteMarker,
@ -1381,10 +1386,10 @@ func (er erasureObjects) DeleteObject(ctx context.Context, bucket, object string
fi.VersionID = opts.VersionID fi.VersionID = opts.VersionID
} }
} }
// versioning suspended means we add `null` // versioning suspended means we add `null` version as
// version as delete marker // delete marker. Add delete marker, since we don't have
// Add delete marker, since we don't have any version specified explicitly. // any version specified explicitly. Or if a particular
// Or if a particular version id needs to be replicated. // version id needs to be replicated.
if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil { if err = er.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil {
return objInfo, toObjectErr(err, bucket, object) return objInfo, toObjectErr(err, bucket, object)
} }

View File

@ -647,6 +647,10 @@ func (z *erasureServerPools) MakeBucketWithLocation(ctx context.Context, bucket
meta.ObjectLockConfigXML = enabledBucketObjectLockConfig meta.ObjectLockConfigXML = enabledBucketObjectLockConfig
} }
if opts.VersioningEnabled {
meta.VersioningConfigXML = enabledBucketVersioningConfig
}
if err := meta.Save(context.Background(), z); err != nil { if err := meta.Save(context.Background(), z); err != nil {
return toObjectErr(err, bucket) return toObjectErr(err, bucket)
} }

View File

@ -726,6 +726,85 @@ func objInfoNames(o []ObjectInfo) []string {
return res return res
} }
func TestDeleteObjectVersionMarker(t *testing.T) {
ExecObjectLayerTest(t, testDeleteObjectVersion)
}
func testDeleteObjectVersion(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
if instanceType == FSTestStr {
return
}
t, _ := t1.(*testing.T)
testBuckets := []string{
"bucket-suspended-version",
"bucket-suspended-version-id",
}
for _, bucket := range testBuckets {
err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{
VersioningEnabled: true,
})
if err != nil {
t.Fatalf("%s : %s", instanceType, err)
}
meta, err := loadBucketMetadata(context.Background(), obj, bucket)
if err != nil {
t.Fatalf("%s : %s", instanceType, err)
}
meta.VersioningConfigXML = []byte(`<VersioningConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Status>Suspended</Status></VersioningConfiguration>`)
if err := meta.Save(context.Background(), obj); err != nil {
t.Fatalf("%s : %s", instanceType, err)
}
globalBucketMetadataSys.Set(bucket, meta)
globalNotificationSys.LoadBucketMetadata(context.Background(), bucket)
}
testObjects := []struct {
parentBucket string
name string
content string
meta map[string]string
versionID string
expectDelMarker bool
}{
{testBuckets[0], "delete-file", "contentstring", nil, "", true},
{testBuckets[1], "delete-file", "contentstring", nil, "null", false},
}
for _, object := range testObjects {
md5Bytes := md5.Sum([]byte(object.content))
_, err := obj.PutObject(context.Background(), object.parentBucket, object.name,
mustGetPutObjReader(t, bytes.NewBufferString(object.content),
int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{
Versioned: globalBucketVersioningSys.Enabled(object.parentBucket),
VersionSuspended: globalBucketVersioningSys.Suspended(object.parentBucket),
UserDefined: object.meta,
})
if err != nil {
t.Fatalf("%s : %s", instanceType, err)
}
obj, err := obj.DeleteObject(context.Background(), object.parentBucket, object.name, ObjectOptions{
Versioned: globalBucketVersioningSys.Enabled(object.parentBucket),
VersionSuspended: globalBucketVersioningSys.Suspended(object.parentBucket),
VersionID: object.versionID,
})
if err != nil {
if object.versionID != "" {
if !isErrVersionNotFound(err) {
t.Fatalf("%s : %s", instanceType, err)
}
} else {
if !isErrObjectNotFound(err) {
t.Fatalf("%s : %s", instanceType, err)
}
}
}
if obj.DeleteMarker != object.expectDelMarker {
t.Fatalf("%s : expected deleted marker %t, found %t", instanceType, object.expectDelMarker, obj.DeleteMarker)
}
}
}
// Wrapper for calling ListObjectVersions tests for both Erasure multiple disks and single node setup. // Wrapper for calling ListObjectVersions tests for both Erasure multiple disks and single node setup.
func TestListObjectVersions(t *testing.T) { func TestListObjectVersions(t *testing.T) {
ExecObjectLayerTest(t, testListObjectVersions) ExecObjectLayerTest(t, testListObjectVersions)
@ -733,6 +812,10 @@ func TestListObjectVersions(t *testing.T) {
// Unit test for ListObjectVersions // Unit test for ListObjectVersions
func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHandler) { func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
if instanceType == FSTestStr {
return
}
t, _ := t1.(*testing.T) t, _ := t1.(*testing.T)
testBuckets := []string{ testBuckets := []string{
// This bucket is used for testing ListObject operations. // This bucket is used for testing ListObject operations.
@ -752,10 +835,6 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand
for _, bucket := range testBuckets { for _, bucket := range testBuckets {
err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{VersioningEnabled: true}) err := obj.MakeBucketWithLocation(context.Background(), bucket, BucketOptions{VersioningEnabled: true})
if err != nil { if err != nil {
if _, ok := err.(NotImplemented); ok {
// Skip test for FS mode.
continue
}
t.Fatalf("%s : %s", instanceType, err.Error()) t.Fatalf("%s : %s", instanceType, err.Error())
} }
} }
@ -791,10 +870,6 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand
_, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content), _, err = obj.PutObject(context.Background(), object.parentBucket, object.name, mustGetPutObjReader(t, bytes.NewBufferString(object.content),
int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{UserDefined: object.meta}) int64(len(object.content)), hex.EncodeToString(md5Bytes[:]), ""), ObjectOptions{UserDefined: object.meta})
if err != nil { if err != nil {
if _, ok := err.(BucketNotFound); ok {
// Skip test failure for FS mode.
continue
}
t.Fatalf("%s : %s", instanceType, err.Error()) t.Fatalf("%s : %s", instanceType, err.Error())
} }
@ -1313,10 +1388,6 @@ func testListObjectVersions(obj ObjectLayer, instanceType string, t1 TestErrHand
t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) { t.Run(fmt.Sprintf("%s-Test%d", instanceType, i+1), func(t *testing.T) {
result, err := obj.ListObjectVersions(context.Background(), testCase.bucketName, result, err := obj.ListObjectVersions(context.Background(), testCase.bucketName,
testCase.prefix, testCase.marker, "", testCase.delimiter, int(testCase.maxKeys)) testCase.prefix, testCase.marker, "", testCase.delimiter, int(testCase.maxKeys))
if _, ok := err.(NotImplemented); ok {
// Not implemented should be skipped
t.Skip()
}
if err != nil && testCase.shouldPass { if err != nil && testCase.shouldPass {
t.Errorf("%s: Expected to pass, but failed with: <ERROR> %s", instanceType, err.Error()) t.Errorf("%s: Expected to pass, but failed with: <ERROR> %s", instanceType, err.Error())
} }

View File

@ -1190,7 +1190,7 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, bool, error) {
} }
return "", len(x.versions) == 0, err return "", len(x.versions) == 0, err
case ObjectType: case ObjectType:
if updateVersion { if updateVersion && !fi.Deleted {
ver, err := x.getIdx(i) ver, err := x.getIdx(i)
if err != nil { if err != nil {
return "", false, err return "", false, err

View File

@ -954,11 +954,11 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
if err != errFileNotFound { if err != errFileNotFound {
return err return err
} }
metaDataPoolPut(buf) // Never used, return it
if fi.Deleted && forceDelMarker { if fi.Deleted && forceDelMarker {
// Create a new xl.meta with a delete marker in it // Create a new xl.meta with a delete marker in it
return s.WriteMetadata(ctx, volume, path, fi) return s.WriteMetadata(ctx, volume, path, fi)
} }
metaDataPoolPut(buf) // Never used, return it
buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1)) buf, err = s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFileV1))
if err != nil { if err != nil {
@ -1016,6 +1016,7 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
} }
} }
} }
if !lastVersion { if !lastVersion {
buf, err = xlMeta.AppendTo(metaDataPoolGet()) buf, err = xlMeta.AppendTo(metaDataPoolGet())
defer metaDataPoolPut(buf) defer metaDataPoolPut(buf)