fix: ListObjectVersions should return ordered Version & DeleteMarker (#9959)

The S3 specification says that versions are ordered in the response of
list object versions.

mc snapshot needs this to know which version comes first especially when
two versions have the same exact last-modified field.
This commit is contained in:
Anis Elleuch 2020-07-03 17:15:44 +01:00 committed by GitHub
parent 810a4f0723
commit 21a37e3393
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 22 additions and 72 deletions

View File

@ -81,7 +81,6 @@ type ListVersionsResponse struct {
CommonPrefixes []CommonPrefix CommonPrefixes []CommonPrefix
Versions []ObjectVersion Versions []ObjectVersion
DeleteMarkers []DeletedVersion
// Encoding type used to encode object keys in the response. // Encoding type used to encode object keys in the response.
EncodingType string `xml:"EncodingType,omitempty"` EncodingType string `xml:"EncodingType,omitempty"`
@ -236,24 +235,23 @@ type Bucket struct {
// ObjectVersion container for object version metadata // ObjectVersion container for object version metadata
type ObjectVersion struct { type ObjectVersion struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Version" json:"-"`
Object Object
IsLatest bool IsLatest bool
VersionID string `xml:"VersionId"` VersionID string `xml:"VersionId"`
isDeleteMarker bool
} }
// DeletedVersion container for the delete object version metadata. // MarshalXML - marshal ObjectVersion
type DeletedVersion struct { func (o ObjectVersion) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DeleteMarker" json:"-"` if o.isDeleteMarker {
start.Name.Local = "DeleteMarker"
} else {
start.Name.Local = "Version"
}
IsLatest bool type objectVersionWrapper ObjectVersion
Key string return e.EncodeElement(objectVersionWrapper(o), start)
LastModified string // time string of format "2006-01-02T15:04:05.000Z"
// Owner of the object.
Owner Owner
VersionID string `xml:"VersionId"`
} }
// StringMap is a map[string]string. // StringMap is a map[string]string.
@ -431,7 +429,6 @@ func generateListBucketsResponse(buckets []BucketInfo) ListBucketsResponse {
// generates an ListBucketVersions response for the said bucket with other enumerated options. // generates an ListBucketVersions response for the said bucket with other enumerated options.
func generateListVersionsResponse(bucket, prefix, marker, versionIDMarker, delimiter, encodingType string, maxKeys int, resp ListObjectVersionsInfo) ListVersionsResponse { func generateListVersionsResponse(bucket, prefix, marker, versionIDMarker, delimiter, encodingType string, maxKeys int, resp ListObjectVersionsInfo) ListVersionsResponse {
var versions []ObjectVersion var versions []ObjectVersion
var deletedVersions []DeletedVersion
var prefixes []CommonPrefix var prefixes []CommonPrefix
var owner = Owner{} var owner = Owner{}
var data = ListVersionsResponse{} var data = ListVersionsResponse{}
@ -459,23 +456,12 @@ func generateListVersionsResponse(bucket, prefix, marker, versionIDMarker, delim
content.VersionID = nullVersionID content.VersionID = nullVersionID
} }
content.IsLatest = object.IsLatest content.IsLatest = object.IsLatest
content.isDeleteMarker = object.DeleteMarker
versions = append(versions, content) versions = append(versions, content)
} }
for _, deleted := range resp.DeleteObjects {
var dv = DeletedVersion{
Key: s3EncodeName(deleted.Name, encodingType),
Owner: owner,
LastModified: deleted.ModTime.UTC().Format(iso8601TimeFormat),
VersionID: deleted.VersionID,
IsLatest: deleted.IsLatest,
}
deletedVersions = append(deletedVersions, dv)
}
data.Name = bucket data.Name = bucket
data.Versions = versions data.Versions = versions
data.DeleteMarkers = deletedVersions
data.EncodingType = encodingType data.EncodingType = encodingType
data.Prefix = s3EncodeName(prefix, encodingType) data.Prefix = s3EncodeName(prefix, encodingType)
data.KeyMarker = s3EncodeName(marker, encodingType) data.KeyMarker = s3EncodeName(marker, encodingType)

View File

@ -1534,9 +1534,6 @@ func (s *erasureSets) Walk(ctx context.Context, bucket, prefix string, results c
for _, version := range entry.Versions { for _, version := range entry.Versions {
results <- version.ToObjectInfo(bucket, version.Name) results <- version.ToObjectInfo(bucket, version.Name)
} }
for _, deleted := range entry.Deleted {
results <- deleted.ToObjectInfo(bucket, deleted.Name)
}
} }
// skip entries which do not have quorum // skip entries which do not have quorum
} }

View File

@ -1322,16 +1322,6 @@ func (z *erasureZones) listObjectVersions(ctx context.Context, bucket, prefix, m
} }
loi.Objects = append(loi.Objects, objInfo) loi.Objects = append(loi.Objects, objInfo)
} }
for _, deleted := range entry.Deleted {
loi.DeleteObjects = append(loi.DeleteObjects, DeletedObjectInfo{
Bucket: bucket,
Name: entry.Name,
VersionID: deleted.VersionID,
ModTime: deleted.ModTime,
IsLatest: deleted.IsLatest,
})
}
} }
if loi.IsTruncated { if loi.IsTruncated {
for i, zone := range z.zones { for i, zone := range z.zones {
@ -1848,9 +1838,6 @@ func (z *erasureZones) Walk(ctx context.Context, bucket, prefix string, results
for _, version := range entry.Versions { for _, version := range entry.Versions {
results <- version.ToObjectInfo(bucket, version.Name) results <- version.ToObjectInfo(bucket, version.Name)
} }
for _, deleted := range entry.Deleted {
results <- deleted.ToObjectInfo(bucket, deleted.Name)
}
} }
// skip entries which do not have quorum // skip entries which do not have quorum

View File

@ -368,9 +368,6 @@ type ListObjectVersionsInfo struct {
// List of objects info for this request. // List of objects info for this request.
Objects []ObjectInfo Objects []ObjectInfo
// List of deleted objects for this request.
DeleteObjects []DeletedObjectInfo
// List of prefixes for this request. // List of prefixes for this request.
Prefixes []string Prefixes []string
} }

View File

@ -57,7 +57,6 @@ type FileInfoVersions struct {
LatestModTime time.Time LatestModTime time.Time
Versions []FileInfo Versions []FileInfo
Deleted []FileInfo
} }
// FileInfo - represents file stat information. // FileInfo - represents file stat information.

View File

@ -26,7 +26,7 @@ func getFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersion
if err := xlMeta.Load(xlMetaBuf); err != nil { if err := xlMeta.Load(xlMetaBuf); err != nil {
return FileInfoVersions{}, err return FileInfoVersions{}, err
} }
versions, deletedVersions, latestModTime, err := xlMeta.ListVersions(volume, path) versions, latestModTime, err := xlMeta.ListVersions(volume, path)
if err != nil { if err != nil {
return FileInfoVersions{}, err return FileInfoVersions{}, err
} }
@ -34,7 +34,6 @@ func getFileInfoVersions(xlMetaBuf []byte, volume, path string) (FileInfoVersion
Volume: volume, Volume: volume,
Name: path, Name: path,
Versions: versions, Versions: versions,
Deleted: deletedVersions,
LatestModTime: latestModTime, LatestModTime: latestModTime,
}, nil }, nil
} }

View File

@ -473,12 +473,12 @@ func (z xlMetaV2) TotalSize() int64 {
// ListVersions lists current versions, and current deleted // ListVersions lists current versions, and current deleted
// versions returns error for unexpected entries. // versions returns error for unexpected entries.
func (z xlMetaV2) ListVersions(volume, path string) (versions []FileInfo, deleted []FileInfo, modTime time.Time, err error) { func (z xlMetaV2) ListVersions(volume, path string) (versions []FileInfo, modTime time.Time, err error) {
var latestModTime time.Time var latestModTime time.Time
var latestVersionID string var latestVersionID string
for _, version := range z.Versions { for _, version := range z.Versions {
if !version.Valid() { if !version.Valid() {
return nil, nil, latestModTime, errFileCorrupt return nil, latestModTime, errFileCorrupt
} }
var fi FileInfo var fi FileInfo
switch version.Type { switch version.Type {
@ -492,7 +492,7 @@ func (z xlMetaV2) ListVersions(volume, path string) (versions []FileInfo, delete
continue continue
} }
if err != nil { if err != nil {
return nil, nil, latestModTime, err return nil, latestModTime, err
} }
if fi.ModTime.After(latestModTime) { if fi.ModTime.After(latestModTime) {
latestModTime = fi.ModTime latestModTime = fi.ModTime
@ -501,24 +501,11 @@ func (z xlMetaV2) ListVersions(volume, path string) (versions []FileInfo, delete
switch version.Type { switch version.Type {
case LegacyType: case LegacyType:
fallthrough fallthrough
case ObjectType: case ObjectType, DeleteType:
versions = append(versions, fi) versions = append(versions, fi)
case DeleteType:
deleted = append(deleted, fi)
} }
} }
// Since we can never have duplicate versions the versionID
// if it matches first with deleted markers then we are sure
// that actual versions wouldn't be latest, so we can return
// early if we find the version in delete markers.
for i := range deleted {
if deleted[i].VersionID == latestVersionID {
deleted[i].IsLatest = true
return versions, deleted, latestModTime, nil
}
}
// We didn't find the version in delete markers so latest version // We didn't find the version in delete markers so latest version
// is indeed one of the actual version of the object with data. // is indeed one of the actual version of the object with data.
for i := range versions { for i := range versions {
@ -528,7 +515,8 @@ func (z xlMetaV2) ListVersions(volume, path string) (versions []FileInfo, delete
versions[i].IsLatest = true versions[i].IsLatest = true
break break
} }
return versions, deleted, latestModTime, nil
return versions, latestModTime, nil
} }
// ToFileInfo converts xlMetaV2 into a common FileInfo datastructure // ToFileInfo converts xlMetaV2 into a common FileInfo datastructure

View File

@ -409,12 +409,9 @@ func (s *xlStorage) CrawlAndGetDataUsage(ctx context.Context, cache dataUsageCac
var totalSize int64 var totalSize int64
for _, version := range fivs.Versions { for _, version := range fivs.Versions {
size := item.applyActions(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())}) size := item.applyActions(ctx, objAPI, actionMeta{oi: version.ToObjectInfo(item.bucket, item.objectPath())})
totalSize += size if !version.Deleted {
} totalSize += size
}
// Delete markers have no size, nothing to do here.
for _, deleted := range fivs.Deleted {
item.applyActions(ctx, objAPI, actionMeta{oi: deleted.ToObjectInfo(item.bucket, item.objectPath())})
} }
return totalSize, nil return totalSize, nil