fix: inconsistent replication delete marker timestamps (#15956)

This commit is contained in:
Klaus Post 2022-10-27 18:46:52 +02:00 committed by GitHub
parent 136d41775f
commit 0f0e154315
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 131 additions and 38 deletions

View File

@ -626,7 +626,7 @@ func isPutRetentionAllowed(bucketName, objectName string, retDays int, retDate t
conditions := getConditionValues(r, "", cred.AccessKey, cred.Claims)
conditions["object-lock-mode"] = []string{string(retMode)}
conditions["object-lock-retain-until-date"] = []string{retDate.Format(time.RFC3339)}
conditions["object-lock-retain-until-date"] = []string{retDate.UTC().Format(time.RFC3339)}
if retDays > 0 {
conditions["object-lock-remaining-retention-days"] = []string{strconv.Itoa(retDays)}
}

View File

@ -653,7 +653,7 @@ func getCopyObjMetadata(oi ObjectInfo, sc string) map[string]string {
}
meta[xhttp.MinIOSourceETag] = oi.ETag
meta[xhttp.MinIOSourceMTime] = oi.ModTime.Format(time.RFC3339Nano)
meta[xhttp.MinIOSourceMTime] = oi.ModTime.UTC().Format(time.RFC3339Nano)
meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String()
return meta
}

View File

@ -1405,7 +1405,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
ondiskTimestamp, err := time.Parse(lastTaggingTimestamp, time.RFC3339Nano)
// update tagging metadata only if replica timestamp is newer than what's on disk
if err != nil || (err == nil && ondiskTimestamp.Before(srcTimestamp)) {
srcInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp] = srcTimestamp.Format(time.RFC3339Nano)
srcInfo.UserDefined[ReservedMetadataPrefixLower+TaggingTimestamp] = srcTimestamp.UTC().Format(time.RFC3339Nano)
srcInfo.UserDefined[xhttp.AmzObjectTagging] = objTags
}
}
@ -1437,7 +1437,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
if err != nil || (err == nil && ondiskTimestamp.Before(srcTimestamp)) {
srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
srcInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retentionDate.UTC().Format(iso8601TimeFormat)
srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = srcTimestamp.Format(time.RFC3339Nano)
srcInfo.UserDefined[ReservedMetadataPrefixLower+ObjectLockRetentionTimestamp] = srcTimestamp.UTC().Format(time.RFC3339Nano)
}
}
} else {

View File

@ -676,7 +676,7 @@ func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string,
optsMap["forceCreate"] = "true"
}
createdAt, _ := globalBucketMetadataSys.CreatedAt(bucket)
optsMap["createdAt"] = createdAt.Format(time.RFC3339Nano)
optsMap["createdAt"] = createdAt.UTC().Format(time.RFC3339Nano)
opts.CreatedAt = createdAt
// Create bucket and enable versioning on all peers.
@ -4253,7 +4253,7 @@ func (c *SiteReplicationSys) healBucket(ctx context.Context, objAPI ObjectLayer,
optsMap["versioningEnabled"] = "true"
opts.VersioningEnabled = true
opts.CreatedAt = bStatus.CreatedAt
optsMap["createdAt"] = bStatus.CreatedAt.Format(time.RFC3339Nano)
optsMap["createdAt"] = bStatus.CreatedAt.UTC().Format(time.RFC3339Nano)
if bStatus.ObjectLockConfig != nil {
config, err := base64.StdEncoding.DecodeString(*bStatus.ObjectLockConfig)

View File

@ -19,6 +19,7 @@ package cmd
import (
"fmt"
"time"
"github.com/tinylib/msgp/msgp"
)
@ -80,29 +81,45 @@ func (x *xlMetaV2VersionHeader) unmarshalV1(bts []byte) (o []byte, err error) {
// unmarshalV unmarshals with a specific metadata version.
func (j *xlMetaV2Version) unmarshalV(v uint8, bts []byte) (o []byte, err error) {
switch v {
// We accept un-set as latest version.
case 0, xlMetaVersion:
// Clear omitempty fields:
if j.ObjectV2 != nil && len(j.ObjectV2.PartIndices) > 0 {
j.ObjectV2.PartIndices = j.ObjectV2.PartIndices[:0]
}
o, err = j.UnmarshalMsg(bts)
// Clean up PartEtags on v1
if j.ObjectV2 != nil {
allEmpty := true
for _, tag := range j.ObjectV2.PartETags {
if len(tag) != 0 {
allEmpty = false
break
}
}
if allEmpty {
j.ObjectV2.PartETags = nil
}
}
return o, err
if v > xlMetaVersion {
return bts, fmt.Errorf("unknown xlMetaVersion: %d", v)
}
return bts, fmt.Errorf("unknown xlMetaVersion: %d", v)
// Clear omitempty fields:
if j.ObjectV2 != nil && len(j.ObjectV2.PartIndices) > 0 {
j.ObjectV2.PartIndices = j.ObjectV2.PartIndices[:0]
}
o, err = j.UnmarshalMsg(bts)
// Fix inconsistent x-minio-internal-replication-timestamp by converting to UTC.
// Fixed in version 2 or later
if err == nil && j.Type == DeleteType && v < 2 {
if val, ok := j.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationTimestamp]; ok {
tm, err := time.Parse(time.RFC3339Nano, string(val))
if err == nil {
j.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationTimestamp] = []byte(tm.UTC().Format(time.RFC3339Nano))
}
}
if val, ok := j.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaTimestamp]; ok {
tm, err := time.Parse(time.RFC3339Nano, string(val))
if err == nil {
j.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaTimestamp] = []byte(tm.UTC().Format(time.RFC3339Nano))
}
}
}
// Clean up PartEtags on v1
if j.ObjectV2 != nil {
allEmpty := true
for _, tag := range j.ObjectV2.PartETags {
if len(tag) != 0 {
allEmpty = false
break
}
}
if allEmpty {
j.ObjectV2.PartETags = nil
}
}
return o, err
}

View File

@ -431,7 +431,7 @@ func (j *xlMetaV2Version) ToFileInfo(volume, path string) (fi FileInfo, err erro
const (
xlHeaderVersion = 2
xlMetaVersion = 1
xlMetaVersion = 2
)
func (j xlMetaV2DeleteMarker) ToFileInfo(volume, path string) (FileInfo, error) {
@ -912,7 +912,6 @@ func (x *xlMetaV2) loadIndexed(buf xlMetaBuf, data xlMetaInlineData) error {
x.data.repair()
logger.LogIf(GlobalContext, fmt.Errorf("xlMetaV2.loadIndexed: data validation failed: %v. %d entries after repair", err, x.data.entries()))
}
return decodeVersions(buf, versions, func(i int, hdr, meta []byte) error {
ver := &x.versions[i]
_, err = ver.header.unmarshalV(headerV, hdr)
@ -920,6 +919,25 @@ func (x *xlMetaV2) loadIndexed(buf xlMetaBuf, data xlMetaInlineData) error {
return err
}
ver.meta = meta
// Fix inconsistent x-minio-internal-replication-timestamp by loading and reindexing.
if metaV < 2 && ver.header.Type == DeleteType {
// load (and convert) version.
version, err := x.getIdx(i)
if err == nil {
// Only reindex if set.
_, ok1 := version.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationTimestamp]
_, ok2 := version.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaTimestamp]
if ok1 || ok2 {
meta, err := version.MarshalMsg(make([]byte, 0, len(ver.meta)+10))
if err == nil {
// Override both if fine.
ver.header = version.header()
ver.meta = meta
}
}
}
}
return nil
})
}
@ -1233,11 +1251,11 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, error) {
if !fi.DeleteMarkerReplicationStatus().Empty() {
switch fi.DeleteMarkerReplicationStatus() {
case replication.Replica:
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaStatus] = []byte(string(fi.ReplicationState.ReplicaStatus))
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaTimestamp] = []byte(fi.ReplicationState.ReplicaTimeStamp.Format(time.RFC3339Nano))
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaStatus] = []byte(fi.ReplicationState.ReplicaStatus)
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaTimestamp] = []byte(fi.ReplicationState.ReplicaTimeStamp.UTC().Format(time.RFC3339Nano))
default:
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationStatus] = []byte(fi.ReplicationState.ReplicationStatusInternal)
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationTimestamp] = []byte(fi.ReplicationState.ReplicationTimeStamp.Format(time.RFC3339Nano))
ventry.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationTimestamp] = []byte(fi.ReplicationState.ReplicationTimeStamp.UTC().Format(time.RFC3339Nano))
}
}
if !fi.VersionPurgeStatus().Empty() {
@ -1275,11 +1293,11 @@ func (x *xlMetaV2) DeleteVersion(fi FileInfo) (string, error) {
if !fi.DeleteMarkerReplicationStatus().Empty() {
switch fi.DeleteMarkerReplicationStatus() {
case replication.Replica:
ver.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaStatus] = []byte(string(fi.ReplicationState.ReplicaStatus))
ver.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaTimestamp] = []byte(fi.ReplicationState.ReplicaTimeStamp.Format(http.TimeFormat))
ver.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaStatus] = []byte(fi.ReplicationState.ReplicaStatus)
ver.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicaTimestamp] = []byte(fi.ReplicationState.ReplicaTimeStamp.UTC().Format(http.TimeFormat))
default:
ver.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationStatus] = []byte(fi.ReplicationState.ReplicationStatusInternal)
ver.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationTimestamp] = []byte(fi.ReplicationState.ReplicationTimeStamp.Format(http.TimeFormat))
ver.DeleteMarker.MetaSys[ReservedMetadataPrefixLower+ReplicationTimestamp] = []byte(fi.ReplicationState.ReplicationTimeStamp.UTC().Format(http.TimeFormat))
}
}
if !fi.VersionPurgeStatus().Empty() {

File diff suppressed because one or more lines are too long