fix: resyncing 'null' version on pre-existing content (#15043)

PR #15041 fixed replicating 'null' version however
due to a regression from #14994 caused the target
versions for these 'null' versioned objects to have
different 'versions', this may cause confusion with
bi-directional replication and cause double replication.

This PR fixes this properly by making sure we replicate
the correct versions on the objects.
This commit is contained in:
Harshavardhana 2022-06-06 15:14:56 -07:00 committed by GitHub
parent 48e367ff7d
commit 31c4fdbf79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 20 additions and 7 deletions

View File

@ -1045,8 +1045,13 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object
return
}
versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)
gr, err = objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
VersionID: objInfo.VersionID,
VersionID: objInfo.VersionID,
Versioned: versioned,
VersionSuspended: versionSuspended,
})
if err != nil {
sendEvent(eventArgs{

View File

@ -1751,13 +1751,13 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
return err
}
vcfg, _ := globalBucketVersioningSys.Get(bucket)
ctx, cancel := context.WithCancel(ctx)
go func() {
defer cancel()
defer close(results)
versioned := opts.Versioned || opts.VersionSuspended
for _, erasureSet := range z.serverPools {
var wg sync.WaitGroup
for _, set := range erasureSet.sets {
@ -1785,11 +1785,14 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re
if opts.WalkAscending {
for i := len(fivs.Versions) - 1; i >= 0; i-- {
version := fivs.Versions[i]
versioned := vcfg != nil && vcfg.Versioned(version.Name)
results <- version.ToObjectInfo(bucket, version.Name, versioned)
}
return
}
for _, version := range fivs.Versions {
versioned := vcfg != nil && vcfg.Versioned(version.Name)
results <- version.ToObjectInfo(bucket, version.Name, versioned)
}
}

View File

@ -441,7 +441,6 @@ func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, aft
versions = make([]ObjectInfo, 0, m.len())
prevPrefix := ""
vcfg, _ := globalBucketVersioningSys.Get(bucket)
versioned := vcfg != nil && vcfg.Versioned(prefix)
for _, entry := range m.o {
if entry.isObject() {
@ -478,6 +477,7 @@ func (m *metaCacheEntriesSorted) fileInfoVersions(bucket, prefix, delimiter, aft
}
for _, version := range fiVersions {
versioned := vcfg != nil && vcfg.Versioned(entry.name)
versions = append(versions, version.ToObjectInfo(bucket, entry.name, versioned))
}
@ -516,7 +516,6 @@ func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (ob
prevPrefix := ""
vcfg, _ := globalBucketVersioningSys.Get(bucket)
versioned := vcfg != nil && vcfg.Versioned(prefix)
for _, entry := range m.o {
if entry.isObject() {
@ -540,6 +539,7 @@ func (m *metaCacheEntriesSorted) fileInfos(bucket, prefix, delimiter string) (ob
fi, err := entry.fileInfo(bucket)
if err == nil {
versioned := vcfg != nil && vcfg.Versioned(entry.name)
objects = append(objects, fi.ToObjectInfo(bucket, entry.name, versioned))
}
continue

View File

@ -157,6 +157,7 @@ func getOpts(ctx context.Context, r *http.Request, bucket, object string) (Objec
}
}
}
opts.Versioned = globalBucketVersioningSys.PrefixEnabled(bucket, object)
opts.VersionSuspended = globalBucketVersioningSys.PrefixSuspended(bucket, object)
return opts, nil
}
@ -167,7 +168,7 @@ func delOpts(ctx context.Context, r *http.Request, bucket, object string) (opts
return opts, err
}
opts.Versioned = globalBucketVersioningSys.PrefixEnabled(bucket, object)
opts.VersionSuspended = globalBucketVersioningSys.Suspended(bucket)
opts.VersionSuspended = globalBucketVersioningSys.PrefixSuspended(bucket, object)
delMarker := strings.TrimSpace(r.Header.Get(xhttp.MinIOSourceDeleteMarker))
if delMarker != "" {
switch delMarker {

View File

@ -1041,7 +1041,11 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
srcOpts.VersionID = vid
// convert copy src encryption options for GET calls
getOpts := ObjectOptions{VersionID: srcOpts.VersionID, Versioned: srcOpts.Versioned}
getOpts := ObjectOptions{
VersionID: srcOpts.VersionID,
Versioned: srcOpts.Versioned,
VersionSuspended: srcOpts.VersionSuspended,
}
getSSE := encrypt.SSE(srcOpts.ServerSideEncryption)
if getSSE != srcOpts.ServerSideEncryption {
getOpts.ServerSideEncryption = getSSE