Fix handling of object versions pending purge (#14555)

- GetObject() with vid should return 405
- GetObject() without vid should return 404
- ListObjects() should ignore this object if this is the "latest" version of the object
- ListObjectVersions() should list this object as "DELETE marker"
- Remove data parts before sync'ing the version pending purge
This commit is contained in:
Krishnan Parthasarathi 2022-03-16 16:59:43 -07:00 committed by GitHub
parent ff811f594b
commit 7b81967a3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 4 additions and 8 deletions

View File

@ -425,7 +425,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
wg.Add(1) wg.Add(1)
go func(index int, tgt *TargetClient) { go func(index int, tgt *TargetClient) {
defer wg.Done() defer wg.Done()
rinfo := replicateDeleteToTarget(ctx, dobj, objectAPI, tgt) rinfo := replicateDeleteToTarget(ctx, dobj, tgt)
rinfos.Targets[index] = rinfo rinfos.Targets[index] = rinfo
}(idx, tgt) }(idx, tgt)
} }
@ -482,7 +482,7 @@ func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, obj
} }
} }
func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer, tgt *TargetClient) (rinfo replicatedTargetInfo) { func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationInfo, tgt *TargetClient) (rinfo replicatedTargetInfo) {
versionID := dobj.DeleteMarkerVersionID versionID := dobj.DeleteMarkerVersionID
if versionID == "" { if versionID == "" {
versionID = dobj.VersionID versionID = dobj.VersionID

View File

@ -493,11 +493,6 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin
return objInfo, toObjectErr(err, bucket, object) return objInfo, toObjectErr(err, bucket, object)
} }
objInfo = fi.ToObjectInfo(bucket, object) objInfo = fi.ToObjectInfo(bucket, object)
if opts.VersionID != "" && !fi.VersionPurgeStatus().Empty() {
// Make sure to return object info to provide extra information.
return objInfo, toObjectErr(errMethodNotAllowed, bucket, object)
}
if fi.Deleted { if fi.Deleted {
if opts.VersionID == "" || opts.DeleteMarker { if opts.VersionID == "" || opts.DeleteMarker {
return objInfo, toObjectErr(errFileNotFound, bucket, object) return objInfo, toObjectErr(errFileNotFound, bucket, object)

View File

@ -595,6 +595,7 @@ func (j xlMetaV2Object) ToFileInfo(volume, path string) (FileInfo, error) {
} }
} }
fi.ReplicationState = getInternalReplicationState(fi.Metadata) fi.ReplicationState = getInternalReplicationState(fi.Metadata)
fi.Deleted = !fi.VersionPurgeStatus().Empty()
replStatus := fi.ReplicationState.CompositeReplicationStatus() replStatus := fi.ReplicationState.CompositeReplicationStatus()
if replStatus != "" { if replStatus != "" {
fi.Metadata[xhttp.AmzBucketReplicationStatus] = string(replStatus) fi.Metadata[xhttp.AmzBucketReplicationStatus] = string(replStatus)
@ -1281,7 +1282,7 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, error) {
ver.ObjectV2.MetaSys[k] = []byte(v) ver.ObjectV2.MetaSys[k] = []byte(v)
} }
err = x.setIdx(i, *ver) err = x.setIdx(i, *ver)
return "", err return uuid.UUID(ver.ObjectV2.DataDir).String(), err
} }
} }
} }