fix: delete-marker replication check properly (#15923)

This commit is contained in:
Poorna
2022-10-21 14:45:06 -07:00
committed by GitHub
parent 58d776daa0
commit e4e90b53c1
7 changed files with 36 additions and 12 deletions

View File

@@ -580,8 +580,8 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
toi, err := tgt.StatObject(ctx, tgt.Bucket, dobj.ObjectName, miniogo.StatObjectOptions{
VersionID: versionID,
Internal: miniogo.AdvancedGetOptions{
ReplicationProxyRequest: "false",
ReplicationDeleteMarker: true,
ReplicationProxyRequest: "false",
IsReplicationReadyForDeleteMarker: true,
},
})
if isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)) {

View File

@@ -51,7 +51,9 @@ type ObjectOptions struct {
MTime time.Time // Is only set in POST/PUT operations
Expires time.Time // Is only used in POST/PUT operations
DeleteMarker bool // Is only set in DELETE operations for delete marker replication
DeleteMarker bool // Is only set in DELETE operations for delete marker replication
CheckDMReplicationReady bool // Is delete marker ready to be replicated - set only during HEAD
UserDefined map[string]string // only set in case of POST/PUT operations
PartNumber int // only useful in case of GetObject/HeadObject
CheckPrecondFn CheckPreconditionFn // only set during GetObject/HeadObject/CopyObjectPart preconditional valuation

View File

@@ -159,6 +159,23 @@ func getOpts(ctx context.Context, r *http.Request, bucket, object string) (Objec
}
}
}
replReadyCheck := strings.TrimSpace(r.Header.Get(xhttp.MinIOCheckDMReplicationReady))
if replReadyCheck != "" {
switch replReadyCheck {
case "true":
opts.CheckDMReplicationReady = true
case "false":
default:
err = fmt.Errorf("Unable to parse %s, failed with %w", xhttp.MinIOCheckDMReplicationReady, fmt.Errorf("should be true or false"))
logger.LogIf(ctx, err)
return opts, InvalidArgument{
Bucket: bucket,
Object: object,
Err: err,
}
}
}
opts.Versioned = globalBucketVersioningSys.PrefixEnabled(bucket, object)
opts.VersionSuspended = globalBucketVersioningSys.PrefixSuspended(bucket, object)
return opts, nil

View File

@@ -739,14 +739,15 @@ func (api objectAPIHandlers) headObjectHandler(ctx context.Context, objectAPI Ob
w.Header()[xhttp.AmzDeleteMarker] = []string{strconv.FormatBool(objInfo.DeleteMarker)}
}
QueueReplicationHeal(ctx, bucket, objInfo)
}
// do an additional verification whether object exists when opts.DeleteMarker is set by source
// cluster as part of delete marker replication
if opts.DeleteMarker && opts.ProxyHeaderSet {
opts.VersionID = ""
goi, gerr := getObjectInfo(ctx, bucket, object, opts)
if gerr == nil || goi.VersionID != "" { // object layer returned more info because object is deleted
w.Header().Set(xhttp.MinIOTargetReplicationReady, "true")
// do an additional verification whether object exists when object is deletemarker and request
// is from replication
if opts.CheckDMReplicationReady {
topts := opts
topts.VersionID = ""
goi, gerr := getObjectInfo(ctx, bucket, object, topts)
if gerr == nil || goi.VersionID != "" { // object layer returned more info because object is deleted
w.Header().Set(xhttp.MinIOTargetReplicationReady, "true")
}
}
}
writeErrorResponseHeadersOnly(w, toAPIError(ctx, err))