Allow synchronous replication if enabled. (#11165)

Synchronous replication can be enabled by setting the --sync
flag while adding a remote replication target.

This PR also adds proxying on GET/HEAD to another node in a
active-active replication setup in the event of a 404 on the current node.
This commit is contained in:
Poorna Krishnamoorthy
2021-01-11 22:36:51 -08:00
committed by GitHub
parent 317305d5f9
commit 7824e19d20
14 changed files with 347 additions and 100 deletions

View File

@@ -405,9 +405,9 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
getObjectInfoFn = api.CacheAPI().GetObjectInfo
}
var (
hasLockEnabled, hasLifecycleConfig bool
goi ObjectInfo
gerr error
hasLockEnabled, hasLifecycleConfig, replicateSync bool
goi ObjectInfo
gerr error
)
replicateDeletes := hasReplicationRules(ctx, bucket, deleteObjects.Objects)
if rcfg, _ := globalBucketObjectLockSys.Get(bucket); rcfg.LockEnabled {
@@ -455,10 +455,11 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
object.PurgeTransitioned = goi.TransitionStatus
}
if replicateDeletes {
delMarker, replicate := checkReplicateDelete(ctx, bucket, ObjectToDelete{
delMarker, replicate, repsync := checkReplicateDelete(ctx, bucket, ObjectToDelete{
ObjectName: object.ObjectName,
VersionID: object.VersionID,
}, goi, gerr)
replicateSync = repsync
if replicate {
if object.VersionID != "" {
object.VersionPurgeStatus = Pending
@@ -549,10 +550,11 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter,
for _, dobj := range deletedObjects {
if replicateDeletes {
if dobj.DeleteMarkerReplicationStatus == string(replication.Pending) || dobj.VersionPurgeStatus == Pending {
globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{
dv := DeletedObjectVersionInfo{
DeletedObject: dobj,
Bucket: bucket,
})
}
scheduleReplicationDelete(ctx, dv, objectAPI, replicateSync)
}
}

View File

@@ -346,7 +346,7 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, objInfo Object
}
putOpts := putTransitionOpts(oi)
if _, err = tgt.PutObject(ctx, arn.Bucket, oi.Name, gr, oi.Size, "", "", putOpts); err != nil {
if _, err = tgt.PutObject(ctx, arn.Bucket, oi.Name, gr, oi.Size, putOpts); err != nil {
gr.Close()
return err
}
@@ -421,7 +421,7 @@ func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs
}
}
reader, _, _, err := tgt.GetObject(ctx, arn.Bucket, object, gopts)
reader, err := tgt.GetObject(ctx, arn.Bucket, object, gopts)
if err != nil {
return nil, err
}

View File

@@ -88,35 +88,37 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re
return false, BucketRemoteTargetNotFound{Bucket: bucket}
}
func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, permErr APIErrorCode) bool {
func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, permErr APIErrorCode) (replicate bool, sync bool) {
if permErr != ErrNone {
return false
return
}
return mustReplicater(ctx, bucket, object, meta, replStatus)
}
// mustReplicate returns true if object meets replication criteria.
func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool {
// mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
// a synchronous manner.
func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) (replicate bool, sync bool) {
if s3Err := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, "", r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone {
return false
return
}
return mustReplicater(ctx, bucket, object, meta, replStatus)
}
// mustReplicater returns true if object meets replication criteria.
func mustReplicater(ctx context.Context, bucket, object string, meta map[string]string, replStatus string) bool {
// mustReplicater returns 2 booleans - true if object meets replication criteria and true if replication is to be done in
// a synchronous manner.
func mustReplicater(ctx context.Context, bucket, object string, meta map[string]string, replStatus string) (replicate bool, sync bool) {
if globalIsGateway {
return false
return replicate, sync
}
if rs, ok := meta[xhttp.AmzBucketReplicationStatus]; ok {
replStatus = rs
}
if replication.StatusType(replStatus) == replication.Replica {
return false
return replicate, sync
}
cfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
return false
return replicate, sync
}
opts := replication.ObjectOpts{
Name: object,
@@ -126,7 +128,20 @@ func mustReplicater(ctx context.Context, bucket, object string, meta map[string]
if ok {
opts.UserTags = tagStr
}
return cfg.Replicate(opts)
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
if tgt == nil || tgt.isOffline() {
return cfg.Replicate(opts), false
}
return cfg.Replicate(opts), tgt.replicateSync
}
// Standard headers that needs to be extracted from User metadata.
var standardHeaders = []string{
"content-type",
"content-encoding",
xhttp.AmzStorageClass,
xhttp.AmzObjectTagging,
xhttp.AmzBucketReplicationStatus,
}
// returns true if any of the objects being deleted qualifies for replication.
@@ -143,11 +158,22 @@ func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToD
return false
}
// isStandardHeader returns true if header is a supported header and not a custom header
func isStandardHeader(headerKey string) bool {
key := strings.ToLower(headerKey)
for _, header := range standardHeaders {
if strings.ToLower(header) == key {
return true
}
}
return false
}
// returns whether object version is a deletemarker and if object qualifies for replication
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (dm, replicate bool) {
func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelete, oi ObjectInfo, gerr error) (dm, replicate, sync bool) {
rcfg, err := getReplicationConfig(ctx, bucket)
if err != nil || rcfg == nil {
return false, false
return false, false, sync
}
// when incoming delete is removal of a delete marker( a.k.a versioned delete),
// GetObjectInfo returns extra information even though it returns errFileNotFound
@@ -158,9 +184,13 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
validReplStatus = true
}
if oi.DeleteMarker && validReplStatus {
return oi.DeleteMarker, true
return oi.DeleteMarker, true, sync
}
return oi.DeleteMarker, false
return oi.DeleteMarker, false, sync
}
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
if tgt == nil || tgt.isOffline() {
return oi.DeleteMarker, false, false
}
opts := replication.ObjectOpts{
Name: dobj.ObjectName,
@@ -169,7 +199,7 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet
DeleteMarker: oi.DeleteMarker,
VersionID: dobj.VersionID,
}
return oi.DeleteMarker, rcfg.Replicate(opts)
return oi.DeleteMarker, rcfg.Replicate(opts), tgt.replicateSync
}
// replicate deletes to the designated replication target if replication configuration
@@ -296,10 +326,10 @@ func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string]
func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) {
meta := make(map[string]string)
for k, v := range objInfo.UserDefined {
if k == xhttp.AmzBucketReplicationStatus {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
continue
}
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
if isStandardHeader(k) {
continue
}
meta[k] = v
@@ -413,6 +443,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
VersionID: objInfo.VersionID,
})
if err != nil {
logger.LogIf(ctx, err)
return
}
objInfo = gr.ObjInfo
@@ -440,14 +471,14 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
return
}
}
replicationStatus := replication.Complete
if rtype != replicateAll {
gr.Close()
// replicate metadata for object tagging/copy with metadata replacement
dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}}
_, err = tgt.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts)
c := &miniogo.Core{Client: tgt.Client}
_, err = c.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts)
if err != nil {
replicationStatus = replication.Failed
}
@@ -474,7 +505,7 @@ func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLa
// r takes over closing gr.
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit)
_, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts)
_, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, putOpts)
if err != nil {
replicationStatus = replication.Failed
}
@@ -623,3 +654,140 @@ func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
globalReplicationState.addWorker(ctx, objectAPI)
}
}
// get Reader from replication target if active-active replication is in place and
// this node returns a 404
func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, proxy bool) {
tgt, oi, proxy, err := proxyHeadToRepTarget(ctx, bucket, object, opts)
if !proxy || err != nil {
return nil, false
}
fn, off, length, err := NewGetObjectReader(rs, oi, opts)
if err != nil {
return nil, false
}
gopts := miniogo.GetObjectOptions{
VersionID: opts.VersionID,
ServerSideEncryption: opts.ServerSideEncryption,
Internal: miniogo.AdvancedGetOptions{
ReplicationProxyRequest: true,
},
}
// get correct offsets for encrypted object
if off >= 0 && length >= 0 {
if err := gopts.SetRange(off, off+length-1); err != nil {
return nil, false
}
}
c := miniogo.Core{Client: tgt.Client}
obj, _, _, err := c.GetObject(ctx, bucket, object, gopts)
if err != nil {
return nil, false
}
closeReader := func() { obj.Close() }
reader, err := fn(obj, h, opts.CheckPrecondFn, closeReader)
if err != nil {
return nil, false
}
return reader, true
}
// isProxyable returns true if replication config found for this bucket
func isProxyable(ctx context.Context, bucket string) bool {
cfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
return false
}
dest := cfg.GetDestination()
return dest.Bucket == bucket
}
func proxyHeadToRepTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (tgt *TargetClient, oi ObjectInfo, proxy bool, err error) {
// this option is set when active-active replication is in place between site A -> B,
// and site B does not have the object yet.
if opts.ProxyRequest { // true only when site B sets MinIOSourceProxyRequest header
return nil, oi, false, nil
}
cfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
return nil, oi, false, err
}
dest := cfg.GetDestination()
if dest.Bucket != bucket { // not active-active
return nil, oi, false, err
}
ssec := false
if opts.ServerSideEncryption != nil {
ssec = opts.ServerSideEncryption.Type() == encrypt.SSEC
}
ropts := replication.ObjectOpts{
Name: object,
SSEC: ssec,
}
if !cfg.Replicate(ropts) { // no matching rule for object prefix
return nil, oi, false, nil
}
tgt = globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
if tgt == nil || tgt.isOffline() {
return nil, oi, false, fmt.Errorf("missing target")
}
gopts := miniogo.GetObjectOptions{
VersionID: opts.VersionID,
ServerSideEncryption: opts.ServerSideEncryption,
Internal: miniogo.AdvancedGetOptions{
ReplicationProxyRequest: true,
},
}
objInfo, err := tgt.StatObject(ctx, dest.Bucket, object, gopts)
if err != nil {
return nil, oi, false, err
}
tags, _ := tags.MapToObjectTags(objInfo.UserTags)
oi = ObjectInfo{
Bucket: bucket,
Name: object,
ModTime: objInfo.LastModified,
Size: objInfo.Size,
ETag: objInfo.ETag,
VersionID: objInfo.VersionID,
IsLatest: objInfo.IsLatest,
DeleteMarker: objInfo.IsDeleteMarker,
ContentType: objInfo.ContentType,
Expires: objInfo.Expires,
StorageClass: objInfo.StorageClass,
ReplicationStatus: replication.StatusType(objInfo.ReplicationStatus),
UserDefined: cloneMSS(objInfo.UserMetadata),
UserTags: tags.String(),
}
if ce, ok := oi.UserDefined[xhttp.ContentEncoding]; ok {
oi.ContentEncoding = ce
delete(oi.UserDefined, xhttp.ContentEncoding)
}
return tgt, oi, true, nil
}
// get object info from replication target if active-active replication is in place and
// this node returns a 404
func proxyHeadToReplicationTarget(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, proxy bool, err error) {
_, oi, proxy, err = proxyHeadToRepTarget(ctx, bucket, object, opts)
return oi, proxy, err
}
func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, sync bool) {
if sync {
replicateObject(ctx, objInfo, o)
} else {
globalReplicationState.queueReplicaTask(objInfo)
}
}
func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo, o ObjectLayer, sync bool) {
if sync {
replicateDelete(ctx, dv, o)
} else {
globalReplicationState.queueReplicaDeleteTask(dv)
}
}

View File

@@ -23,6 +23,7 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
minio "github.com/minio/minio-go/v7"
@@ -34,10 +35,14 @@ import (
sha256 "github.com/minio/sha256-simd"
)
const (
defaultHealthCheckDuration = 60 * time.Second
)
// BucketTargetSys represents bucket targets subsystem
type BucketTargetSys struct {
sync.RWMutex
arnRemotesMap map[string]*miniogo.Core
arnRemotesMap map[string]*TargetClient
targetsMap map[string][]madmin.BucketTarget
}
@@ -219,7 +224,7 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str
}
// GetRemoteTargetClient returns minio-go client for replication target instance
func (sys *BucketTargetSys) GetRemoteTargetClient(ctx context.Context, arn string) *miniogo.Core {
func (sys *BucketTargetSys) GetRemoteTargetClient(ctx context.Context, arn string) *TargetClient {
sys.RLock()
defer sys.RUnlock()
return sys.arnRemotesMap[arn]
@@ -266,7 +271,7 @@ func (sys *BucketTargetSys) GetRemoteLabelWithArn(ctx context.Context, bucket, a
// NewBucketTargetSys - creates new replication system.
func NewBucketTargetSys() *BucketTargetSys {
return &BucketTargetSys{
arnRemotesMap: make(map[string]*miniogo.Core),
arnRemotesMap: make(map[string]*TargetClient),
targetsMap: make(map[string][]madmin.BucketTarget),
}
}
@@ -347,20 +352,33 @@ var getRemoteTargetInstanceTransport http.RoundTripper
var getRemoteTargetInstanceTransportOnce sync.Once
// Returns a minio-go Client configured to access remote host described in replication target config.
func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*miniogo.Core, error) {
func (sys *BucketTargetSys) getRemoteTargetClient(tcfg *madmin.BucketTarget) (*TargetClient, error) {
config := tcfg.Credentials
creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, "")
getRemoteTargetInstanceTransportOnce.Do(func() {
getRemoteTargetInstanceTransport = newGatewayHTTPTransport(10 * time.Minute)
})
core, err := miniogo.NewCore(tcfg.URL().Host, &miniogo.Options{
api, err := minio.New(tcfg.Endpoint, &miniogo.Options{
Creds: creds,
Secure: tcfg.Secure,
Transport: getRemoteTargetInstanceTransport,
})
return core, err
if err != nil {
return nil, err
}
hcDuration := tcfg.HealthCheckDuration
if hcDuration < 1 { // require minimum health check duration of 1 sec.
hcDuration = defaultHealthCheckDuration
}
tc := &TargetClient{
Client: api,
healthCheckDuration: hcDuration,
bucket: tcfg.TargetBucket,
replicateSync: tcfg.ReplicationSync,
}
go tc.healthCheck()
return tc, nil
}
// getRemoteARN gets existing ARN for an endpoint or generates a new one.
@@ -424,3 +442,29 @@ func parseBucketTargetConfig(bucket string, cdata, cmetadata []byte) (*madmin.Bu
}
return &t, nil
}
// TargetClient is the struct for remote target client.
type TargetClient struct {
*miniogo.Client
up int32
healthCheckDuration time.Duration
bucket string // remote bucket target
replicateSync bool
}
func (tc *TargetClient) isOffline() bool {
return atomic.LoadInt32(&tc.up) == 0
}
func (tc *TargetClient) healthCheck() {
for {
_, err := tc.BucketExists(GlobalContext, tc.bucket)
if err != nil {
atomic.StoreInt32(&tc.up, 0)
time.Sleep(tc.healthCheckDuration)
continue
}
atomic.StoreInt32(&tc.up, 1)
time.Sleep(tc.healthCheckDuration)
}
}

View File

@@ -18,6 +18,7 @@ package cmd
import (
"context"
"errors"
"fmt"
"io"
"net/http"
@@ -159,6 +160,14 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
if err != nil {
if isProxyable(ctx, bucket) && (errors.Is(err, errFileNotFound) || errors.Is(err, errFileVersionNotFound)) {
// proxy to replication target if active-active replication is in place.
reader, proxy := proxyGetToReplicationTarget(ctx, bucket, object, rs, h, opts)
if reader == nil || !proxy {
return nil, toObjectErr(err, bucket, object)
}
return reader, nil
}
return nil, toObjectErr(err, bucket, object)
}
@@ -413,7 +422,15 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
fi, _, _, err := er.getObjectFileInfo(ctx, bucket, object, opts, false)
if err != nil {
// proxy HEAD to replication target if active-active replication configured on bucket
if isProxyable(ctx, bucket) && (errors.Is(err, errFileNotFound) || errors.Is(err, errFileVersionNotFound)) {
oi, proxy, err := proxyHeadToReplicationTarget(ctx, bucket, object, opts)
if proxy {
return oi, err
}
}
return objInfo, toObjectErr(err, bucket, object)
}
objInfo = fi.ToObjectInfo(bucket, object)
if objInfo.TransitionStatus == lifecycle.TransitionComplete {

View File

@@ -163,6 +163,8 @@ const (
MinIODeleteReplicationStatus = "X-Minio-Replication-Delete-Status"
// Header indicates delete-marker replication status.
MinIODeleteMarkerReplicationStatus = "X-Minio-Replication-DeleteMarker-Status"
// Header indicates if its a GET/HEAD proxy request for active-active replication
MinIOSourceProxyRequest = "x-minio-source-proxy-request"
)
// Common http query params S3 API

View File

@@ -52,6 +52,8 @@ type ObjectOptions struct {
VersionPurgeStatus VersionPurgeStatusType // Is only set in DELETE operations for delete marker version to be permanently deleted.
TransitionStatus string // status of the transition
NoLock bool // indicates to lower layers if the caller is expecting to hold locks.
ProxyRequest bool // only set for GET/HEAD in active-active replication scenario
}
// BucketOptions represents bucket options for ObjectLayer bucket operations

View File

@@ -66,6 +66,7 @@ func getDefaultOpts(header http.Header, copySource bool, metadata map[string]str
if crypto.S3.IsRequested(header) || (metadata != nil && crypto.S3.IsEncrypted(metadata)) {
opts.ServerSideEncryption = encrypt.NewSSE()
}
opts.ProxyRequest = header.Get(xhttp.MinIOSourceProxyRequest) == "true"
return
}

View File

@@ -1193,7 +1193,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
if rs := r.Header.Get(xhttp.AmzBucketReplicationStatus); rs != "" {
srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = rs
}
if mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String()) {
if ok, _ := mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String()); ok {
srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
// Store the preserved compression metadata.
@@ -1273,8 +1273,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
objInfo.ETag = getDecryptedETag(r.Header, objInfo, false)
response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime)
encodedSuccessResponse := encodeResponse(response)
if mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()) {
globalReplicationState.queueReplicaTask(objInfo)
if replicate, sync := mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate {
scheduleReplication(ctx, objInfo, objectAPI, sync)
}
setPutObjHeaders(w, objInfo, false)
@@ -1519,7 +1519,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
if mustReplicate(ctx, r, bucket, object, metadata, "") {
if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok {
metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
@@ -1587,8 +1587,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
objInfo.ETag = objInfo.ETag + "-1"
}
}
if mustReplicate(ctx, r, bucket, object, metadata, "") {
globalReplicationState.queueReplicaTask(objInfo)
if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate {
scheduleReplication(ctx, objInfo, objectAPI, sync)
}
setPutObjHeaders(w, objInfo, false)
@@ -1704,7 +1704,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
if mustReplicate(ctx, r, bucket, object, metadata, "") {
if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok {
metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
// We need to preserve the encryption headers set in EncryptRequest,
@@ -2666,10 +2666,9 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
}
setPutObjHeaders(w, objInfo, false)
if mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()) {
globalReplicationState.queueReplicaTask(objInfo)
if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String()); replicate {
scheduleReplication(ctx, objInfo, objectAPI, sync)
}
// Write success response.
writeSuccessResponseXML(w, encodedSuccessResponse)
@@ -2747,7 +2746,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
VersionID: opts.VersionID,
})
}
_, replicateDel := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr)
_, replicateDel, replicateSync := checkReplicateDelete(ctx, bucket, ObjectToDelete{ObjectName: object, VersionID: opts.VersionID}, goi, gerr)
if replicateDel {
if opts.VersionID != "" {
opts.VersionPurgeStatus = Pending
@@ -2808,7 +2807,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
} else {
versionID = objInfo.VersionID
}
globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{
dobj := DeletedObjectVersionInfo{
DeletedObject: DeletedObject{
ObjectName: object,
VersionID: versionID,
@@ -2819,7 +2818,8 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http.
VersionPurgeStatus: objInfo.VersionPurgeStatus,
},
Bucket: bucket,
})
}
scheduleReplicationDelete(ctx, dobj, objectAPI, replicateSync)
}
if goi.TransitionStatus == lifecycle.TransitionComplete { // clean up transitioned tier
@@ -2907,7 +2907,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r
if objInfo.UserTags != "" {
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
}
replicate := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "")
replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "")
if replicate {
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
@@ -2923,7 +2923,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r
return
}
if replicate {
globalReplicationState.queueReplicaTask(objInfo)
scheduleReplication(ctx, objInfo, objectAPI, sync)
}
writeSuccessResponseHeadersOnly(w)
// Notify object event.
@@ -3080,7 +3080,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
if objInfo.UserTags != "" {
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
}
replicate := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "")
replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, "")
if replicate {
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
@@ -3095,7 +3095,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r
return
}
if replicate {
globalReplicationState.queueReplicaTask(objInfo)
scheduleReplication(ctx, objInfo, objectAPI, sync)
}
writeSuccessNoContent(w)
@@ -3262,7 +3262,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
return
}
replicate := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: tags.String()}, "")
replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: tags.String()}, "")
if replicate {
opts.UserDefined = make(map[string]string)
opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
@@ -3277,7 +3277,7 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h
if replicate {
if objInfo, err := objAPI.GetObjectInfo(ctx, bucket, object, opts); err == nil {
globalReplicationState.queueReplicaTask(objInfo)
scheduleReplication(ctx, objInfo, objAPI, sync)
}
}
@@ -3327,7 +3327,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
replicate := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: oi.UserTags}, "")
replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: oi.UserTags}, "")
if replicate {
opts.UserDefined = make(map[string]string)
opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
@@ -3343,7 +3343,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r
}
if replicate {
globalReplicationState.queueReplicaTask(oi)
scheduleReplication(ctx, oi, objAPI, sync)
}
writeSuccessNoContent(w)

View File

@@ -709,7 +709,10 @@ func (web *webAPIHandlers) RemoveObject(r *http.Request, args *RemoveObjectArgs,
Versioned: globalBucketVersioningSys.Enabled(args.BucketName),
VersionSuspended: globalBucketVersioningSys.Suspended(args.BucketName),
}
var err error
var (
err error
replicateSync bool
)
next:
for _, objectName := range args.Objects {
// If not a directory, remove the object.
@@ -751,7 +754,7 @@ next:
}
if hasReplicationRules(ctx, args.BucketName, []ObjectToDelete{{ObjectName: objectName}}) || hasLifecycleConfig {
goi, gerr = getObjectInfoFn(ctx, args.BucketName, objectName, opts)
if _, replicateDel = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: objectName, VersionID: goi.VersionID}, goi, gerr); replicateDel {
if _, replicateDel, replicateSync = checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: objectName, VersionID: goi.VersionID}, goi, gerr); replicateDel {
opts.DeleteMarkerReplicationStatus = string(replication.Pending)
opts.DeleteMarker = true
}
@@ -759,7 +762,7 @@ next:
oi, err := deleteObject(ctx, objectAPI, web.CacheAPI(), args.BucketName, objectName, nil, r, opts)
if replicateDel && err == nil {
globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{
dobj := DeletedObjectVersionInfo{
DeletedObject: DeletedObject{
ObjectName: objectName,
DeleteMarkerVersionID: oi.VersionID,
@@ -769,7 +772,8 @@ next:
VersionPurgeStatus: oi.VersionPurgeStatus,
},
Bucket: args.BucketName,
})
}
scheduleReplicationDelete(ctx, dobj, objectAPI, replicateSync)
}
if goi.TransitionStatus == lifecycle.TransitionComplete && err == nil && goi.VersionID == "" {
action := lifecycle.DeleteAction
@@ -855,7 +859,7 @@ next:
}
}
}
_, replicateDel := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil)
_, replicateDel, _ := checkReplicateDelete(ctx, args.BucketName, ObjectToDelete{ObjectName: obj.Name, VersionID: obj.VersionID}, obj, nil)
// since versioned delete is not available on web browser, yet - this is a simple DeleteMarker replication
objToDel := ObjectToDelete{ObjectName: obj.Name}
if replicateDel {
@@ -904,10 +908,11 @@ next:
Host: handlers.GetSourceIP(r),
})
if dobj.DeleteMarkerReplicationStatus == string(replication.Pending) || dobj.VersionPurgeStatus == Pending {
globalReplicationState.queueReplicaDeleteTask(DeletedObjectVersionInfo{
dv := DeletedObjectVersionInfo{
DeletedObject: dobj,
Bucket: args.BucketName,
})
}
scheduleReplicationDelete(ctx, dv, objectAPI, replicateSync)
}
}
}
@@ -1228,7 +1233,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
}
}
mustReplicate := mustReplicateWeb(ctx, r, bucket, object, metadata, "", replPerms)
mustReplicate, sync := mustReplicateWeb(ctx, r, bucket, object, metadata, "", replPerms)
if mustReplicate {
metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending)
}
@@ -1298,7 +1303,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
}
}
if mustReplicate {
globalReplicationState.queueReplicaTask(objInfo)
scheduleReplication(ctx, objInfo, objectAPI, sync)
}
// Notify object created event.