mirror of
https://github.com/minio/minio.git
synced 2024-12-25 22:55:54 -05:00
39f3d5493b
X-Minio-Replication-Delete-Status header shows the status of the replication of a permanent delete of a version. All GETs are disallowed and return 405 on this object version. In the case of replicating delete markers. X-Minio-Replication-DeleteMarker-Status shows the status of replication, and would similarly return 405. Additionally, this PR adds reporting of delete marker event completion and updates documentation
619 lines
19 KiB
Go
619 lines
19 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"runtime"
|
|
"strings"
|
|
"time"
|
|
|
|
minio "github.com/minio/minio-go/v7"
|
|
miniogo "github.com/minio/minio-go/v7"
|
|
"github.com/minio/minio-go/v7/pkg/encrypt"
|
|
"github.com/minio/minio-go/v7/pkg/tags"
|
|
"github.com/minio/minio/cmd/crypto"
|
|
xhttp "github.com/minio/minio/cmd/http"
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/pkg/bucket/bandwidth"
|
|
"github.com/minio/minio/pkg/bucket/replication"
|
|
"github.com/minio/minio/pkg/event"
|
|
iampolicy "github.com/minio/minio/pkg/iam/policy"
|
|
"github.com/minio/minio/pkg/madmin"
|
|
)
|
|
|
|
// gets replication config associated to a given bucket name.
|
|
func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
|
|
if globalIsGateway {
|
|
objAPI := newObjectLayerFn()
|
|
if objAPI == nil {
|
|
return nil, errServerNotInitialized
|
|
}
|
|
|
|
return nil, BucketReplicationConfigNotFound{Bucket: bucketName}
|
|
}
|
|
|
|
return globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
|
|
}
|
|
|
|
// validateReplicationDestination returns error if replication destination bucket missing or not configured
|
|
// It also returns true if replication destination is same as this server.
|
|
func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config) (bool, error) {
|
|
arn, err := madmin.ParseARN(rCfg.RoleArn)
|
|
if err != nil {
|
|
return false, BucketRemoteArnInvalid{}
|
|
}
|
|
if arn.Type != madmin.ReplicationService {
|
|
return false, BucketRemoteArnTypeInvalid{}
|
|
}
|
|
clnt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rCfg.RoleArn)
|
|
if clnt == nil {
|
|
return false, BucketRemoteTargetNotFound{Bucket: bucket}
|
|
}
|
|
if found, _ := clnt.BucketExists(ctx, rCfg.GetDestination().Bucket); !found {
|
|
return false, BucketRemoteDestinationNotFound{Bucket: rCfg.GetDestination().Bucket}
|
|
}
|
|
if ret, err := globalBucketObjectLockSys.Get(bucket); err == nil {
|
|
if ret.LockEnabled {
|
|
lock, _, _, _, err := clnt.GetObjectLockConfig(ctx, rCfg.GetDestination().Bucket)
|
|
if err != nil || lock != "Enabled" {
|
|
return false, BucketReplicationDestinationMissingLock{Bucket: rCfg.GetDestination().Bucket}
|
|
}
|
|
}
|
|
}
|
|
// validate replication ARN against target endpoint
|
|
c, ok := globalBucketTargetSys.arnRemotesMap[rCfg.RoleArn]
|
|
if ok {
|
|
if c.EndpointURL().String() == clnt.EndpointURL().String() {
|
|
sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
|
|
return sameTarget, nil
|
|
}
|
|
}
|
|
return false, BucketRemoteTargetNotFound{Bucket: bucket}
|
|
}
|
|
|
|
func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, permErr APIErrorCode) bool {
|
|
if permErr != ErrNone {
|
|
return false
|
|
}
|
|
return mustReplicater(ctx, bucket, object, meta, replStatus)
|
|
}
|
|
|
|
// mustReplicate returns true if object meets replication criteria.
|
|
func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool {
|
|
if s3Err := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, "", r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone {
|
|
return false
|
|
}
|
|
return mustReplicater(ctx, bucket, object, meta, replStatus)
|
|
}
|
|
|
|
// mustReplicater returns true if object meets replication criteria.
|
|
func mustReplicater(ctx context.Context, bucket, object string, meta map[string]string, replStatus string) bool {
|
|
if globalIsGateway {
|
|
return false
|
|
}
|
|
if rs, ok := meta[xhttp.AmzBucketReplicationStatus]; ok {
|
|
replStatus = rs
|
|
}
|
|
if replication.StatusType(replStatus) == replication.Replica {
|
|
return false
|
|
}
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
opts := replication.ObjectOpts{
|
|
Name: object,
|
|
SSEC: crypto.SSEC.IsEncrypted(meta),
|
|
}
|
|
tagStr, ok := meta[xhttp.AmzObjectTagging]
|
|
if ok {
|
|
opts.UserTags = tagStr
|
|
}
|
|
return cfg.Replicate(opts)
|
|
}
|
|
|
|
// returns true if any of the objects being deleted qualifies for replication.
|
|
func hasReplicationRules(ctx context.Context, bucket string, objects []ObjectToDelete) bool {
|
|
c, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil || c == nil {
|
|
return false
|
|
}
|
|
for _, obj := range objects {
|
|
if c.HasActiveRules(obj.ObjectName, true) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// returns whether object version is a deletemarker and if object qualifies for replication
|
|
func checkReplicateDelete(ctx context.Context, getObjectInfoFn GetObjectInfoFn, bucket string, dobj ObjectToDelete) (dm, replicate bool) {
|
|
rcfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil || rcfg == nil {
|
|
return false, false
|
|
}
|
|
oi, err := getObjectInfoFn(ctx, bucket, dobj.ObjectName, ObjectOptions{VersionID: dobj.VersionID})
|
|
// when incoming delete is removal of a delete marker( a.k.a versioned delete),
|
|
// GetObjectInfo returns extra information even though it returns errFileNotFound
|
|
if err != nil {
|
|
validReplStatus := false
|
|
switch oi.ReplicationStatus {
|
|
case replication.Pending, replication.Complete, replication.Failed:
|
|
validReplStatus = true
|
|
}
|
|
if oi.DeleteMarker && validReplStatus {
|
|
return oi.DeleteMarker, true
|
|
}
|
|
return oi.DeleteMarker, false
|
|
}
|
|
opts := replication.ObjectOpts{
|
|
Name: dobj.ObjectName,
|
|
SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined),
|
|
UserTags: oi.UserTags,
|
|
DeleteMarker: true,
|
|
VersionID: dobj.VersionID,
|
|
}
|
|
return oi.DeleteMarker, rcfg.Replicate(opts)
|
|
}
|
|
|
|
// replicate deletes to the designated replication target if replication configuration
|
|
// has delete marker replication or delete replication (MinIO extension to allow deletes where version id
|
|
// is specified) enabled.
|
|
// Similar to bucket replication for PUT operation, soft delete (a.k.a setting delete marker) and
|
|
// permanent deletes (by specifying a version ID in the delete operation) have three states "Pending", "Complete"
|
|
// and "Failed" to mark the status of the replication of "DELETE" operation. All failed operations can
|
|
// then be retried by healing. In the case of permanent deletes, until the replication is completed on the
|
|
// target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently
|
|
// deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds
|
|
// on target.
|
|
func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectAPI ObjectLayer) {
|
|
bucket := dobj.Bucket
|
|
rcfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil || rcfg == nil {
|
|
return
|
|
}
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rcfg.RoleArn)
|
|
if tgt == nil {
|
|
return
|
|
}
|
|
versionID := dobj.DeleteMarkerVersionID
|
|
if versionID == "" {
|
|
versionID = dobj.VersionID
|
|
}
|
|
rmErr := tgt.RemoveObject(ctx, rcfg.GetDestination().Bucket, dobj.ObjectName, miniogo.RemoveObjectOptions{
|
|
VersionID: versionID,
|
|
Internal: miniogo.AdvancedRemoveOptions{
|
|
ReplicationDeleteMarker: dobj.DeleteMarkerVersionID != "",
|
|
ReplicationMTime: dobj.DeleteMarkerMTime,
|
|
ReplicationStatus: miniogo.ReplicationStatusReplica,
|
|
},
|
|
})
|
|
|
|
replicationStatus := dobj.DeleteMarkerReplicationStatus
|
|
versionPurgeStatus := dobj.VersionPurgeStatus
|
|
|
|
if rmErr != nil {
|
|
if dobj.VersionID == "" {
|
|
replicationStatus = string(replication.Failed)
|
|
} else {
|
|
versionPurgeStatus = Failed
|
|
}
|
|
} else {
|
|
if dobj.VersionID == "" {
|
|
replicationStatus = string(replication.Complete)
|
|
} else {
|
|
versionPurgeStatus = Complete
|
|
}
|
|
}
|
|
var eventName = event.ObjectReplicationComplete
|
|
if replicationStatus == string(replication.Failed) || versionPurgeStatus == Failed {
|
|
eventName = event.ObjectReplicationFailed
|
|
}
|
|
objInfo := ObjectInfo{
|
|
Name: dobj.ObjectName,
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
VersionID: versionID,
|
|
ReplicationStatus: replication.StatusType(dobj.DeleteMarkerReplicationStatus),
|
|
VersionPurgeStatus: versionPurgeStatus,
|
|
}
|
|
|
|
eventArg := &eventArgs{
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
EventName: eventName,
|
|
}
|
|
sendEvent(*eventArg)
|
|
|
|
// Update metadata on the delete marker or purge permanent delete if replication success.
|
|
if _, err = objectAPI.DeleteObject(ctx, bucket, dobj.ObjectName, ObjectOptions{
|
|
VersionID: versionID,
|
|
DeleteMarker: dobj.DeleteMarker,
|
|
DeleteMarkerReplicationStatus: replicationStatus,
|
|
Versioned: globalBucketVersioningSys.Enabled(bucket),
|
|
VersionPurgeStatus: versionPurgeStatus,
|
|
VersionSuspended: globalBucketVersioningSys.Suspended(bucket),
|
|
}); err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s/%s %s: %w", bucket, dobj.ObjectName, dobj.VersionID, err))
|
|
}
|
|
}
|
|
|
|
func getCopyObjMetadata(oi ObjectInfo, dest replication.Destination) map[string]string {
|
|
meta := make(map[string]string, len(oi.UserDefined))
|
|
for k, v := range oi.UserDefined {
|
|
if k == xhttp.AmzBucketReplicationStatus {
|
|
continue
|
|
}
|
|
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
|
continue
|
|
}
|
|
meta[k] = v
|
|
}
|
|
if oi.ContentEncoding != "" {
|
|
meta[xhttp.ContentEncoding] = oi.ContentEncoding
|
|
}
|
|
if oi.ContentType != "" {
|
|
meta[xhttp.ContentType] = oi.ContentType
|
|
}
|
|
tag, err := tags.ParseObjectTags(oi.UserTags)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
if tag != nil {
|
|
meta[xhttp.AmzObjectTagging] = tag.String()
|
|
meta[xhttp.AmzTagDirective] = "REPLACE"
|
|
}
|
|
sc := dest.StorageClass
|
|
if sc == "" {
|
|
sc = oi.StorageClass
|
|
}
|
|
meta[xhttp.AmzStorageClass] = sc
|
|
if oi.UserTags != "" {
|
|
meta[xhttp.AmzObjectTagging] = oi.UserTags
|
|
}
|
|
meta[xhttp.MinIOSourceMTime] = oi.ModTime.Format(time.RFC3339)
|
|
meta[xhttp.MinIOSourceETag] = oi.ETag
|
|
meta[xhttp.AmzBucketReplicationStatus] = replication.Replica.String()
|
|
return meta
|
|
}
|
|
|
|
func putReplicationOpts(ctx context.Context, dest replication.Destination, objInfo ObjectInfo) (putOpts miniogo.PutObjectOptions) {
|
|
meta := make(map[string]string)
|
|
for k, v := range objInfo.UserDefined {
|
|
if k == xhttp.AmzBucketReplicationStatus {
|
|
continue
|
|
}
|
|
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
|
|
continue
|
|
}
|
|
meta[k] = v
|
|
}
|
|
tag, err := tags.ParseObjectTags(objInfo.UserTags)
|
|
if err != nil {
|
|
return
|
|
}
|
|
sc := dest.StorageClass
|
|
if sc == "" {
|
|
sc = objInfo.StorageClass
|
|
}
|
|
putOpts = miniogo.PutObjectOptions{
|
|
UserMetadata: meta,
|
|
UserTags: tag.ToMap(),
|
|
ContentType: objInfo.ContentType,
|
|
ContentEncoding: objInfo.ContentEncoding,
|
|
StorageClass: sc,
|
|
Internal: miniogo.AdvancedPutOptions{
|
|
SourceVersionID: objInfo.VersionID,
|
|
ReplicationStatus: miniogo.ReplicationStatusReplica,
|
|
SourceMTime: objInfo.ModTime,
|
|
SourceETag: objInfo.ETag,
|
|
},
|
|
}
|
|
if mode, ok := objInfo.UserDefined[xhttp.AmzObjectLockMode]; ok {
|
|
rmode := miniogo.RetentionMode(mode)
|
|
putOpts.Mode = rmode
|
|
}
|
|
if retainDateStr, ok := objInfo.UserDefined[xhttp.AmzObjectLockRetainUntilDate]; ok {
|
|
rdate, err := time.Parse(time.RFC3339, retainDateStr)
|
|
if err != nil {
|
|
return
|
|
}
|
|
putOpts.RetainUntilDate = rdate
|
|
}
|
|
if lhold, ok := objInfo.UserDefined[xhttp.AmzObjectLockLegalHold]; ok {
|
|
putOpts.LegalHold = miniogo.LegalHoldStatus(lhold)
|
|
}
|
|
if crypto.S3.IsEncrypted(objInfo.UserDefined) {
|
|
putOpts.ServerSideEncryption = encrypt.NewSSE()
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
type replicationAction string
|
|
|
|
const (
|
|
replicateMetadata replicationAction = "metadata"
|
|
replicateNone replicationAction = "none"
|
|
replicateAll replicationAction = "all"
|
|
)
|
|
|
|
// returns replicationAction by comparing metadata between source and target
|
|
func getReplicationAction(oi1 ObjectInfo, oi2 minio.ObjectInfo) replicationAction {
|
|
// needs full replication
|
|
if oi1.ETag != oi2.ETag ||
|
|
oi1.VersionID != oi2.VersionID ||
|
|
oi1.Size != oi2.Size ||
|
|
oi1.DeleteMarker != oi2.IsDeleteMarker {
|
|
return replicateAll
|
|
}
|
|
|
|
if !oi1.ModTime.Equal(oi2.LastModified) ||
|
|
oi1.ContentType != oi2.ContentType ||
|
|
oi1.StorageClass != oi2.StorageClass {
|
|
return replicateMetadata
|
|
}
|
|
if oi1.ContentEncoding != "" {
|
|
enc, ok := oi2.UserMetadata[xhttp.ContentEncoding]
|
|
if !ok || enc != oi1.ContentEncoding {
|
|
return replicateMetadata
|
|
}
|
|
}
|
|
for k, v := range oi2.UserMetadata {
|
|
oi2.Metadata[k] = []string{v}
|
|
}
|
|
if len(oi2.Metadata) != len(oi1.UserDefined) {
|
|
return replicateMetadata
|
|
}
|
|
for k1, v1 := range oi1.UserDefined {
|
|
if v2, ok := oi2.Metadata[k1]; !ok || v1 != strings.Join(v2, "") {
|
|
return replicateMetadata
|
|
}
|
|
}
|
|
t, _ := tags.MapToObjectTags(oi2.UserTags)
|
|
if t.String() != oi1.UserTags {
|
|
return replicateMetadata
|
|
}
|
|
return replicateNone
|
|
}
|
|
|
|
// replicateObject replicates the specified version of the object to destination bucket
|
|
// The source object is then updated to reflect the replication status.
|
|
func replicateObject(ctx context.Context, objInfo ObjectInfo, objectAPI ObjectLayer) {
|
|
bucket := objInfo.Bucket
|
|
object := objInfo.Name
|
|
|
|
cfg, err := getReplicationConfig(ctx, bucket)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return
|
|
}
|
|
tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, cfg.RoleArn)
|
|
if tgt == nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for bucket:%s arn:%s", bucket, cfg.RoleArn))
|
|
return
|
|
}
|
|
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
|
|
VersionID: objInfo.VersionID,
|
|
})
|
|
if err != nil {
|
|
return
|
|
}
|
|
objInfo = gr.ObjInfo
|
|
size, err := objInfo.GetActualSize()
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
gr.Close()
|
|
return
|
|
}
|
|
|
|
dest := cfg.GetDestination()
|
|
if dest.Bucket == "" {
|
|
gr.Close()
|
|
return
|
|
}
|
|
|
|
rtype := replicateAll
|
|
oi, err := tgt.StatObject(ctx, dest.Bucket, object, miniogo.StatObjectOptions{VersionID: objInfo.VersionID})
|
|
if err == nil {
|
|
rtype = getReplicationAction(objInfo, oi)
|
|
if rtype == replicateNone {
|
|
gr.Close()
|
|
// object with same VersionID already exists, replication kicked off by
|
|
// PutObject might have completed.
|
|
return
|
|
}
|
|
}
|
|
|
|
target, err := globalBucketMetadataSys.GetBucketTarget(bucket, cfg.RoleArn)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("failed to get target for replication bucket:%s cfg:%s err:%s", bucket, cfg.RoleArn, err))
|
|
return
|
|
}
|
|
putOpts := putReplicationOpts(ctx, dest, objInfo)
|
|
replicationStatus := replication.Complete
|
|
|
|
// Setup bandwidth throttling
|
|
peers, _ := globalEndpoints.peers()
|
|
totalNodesCount := len(peers)
|
|
if totalNodesCount == 0 {
|
|
totalNodesCount = 1 // For standalone erasure coding
|
|
}
|
|
b := target.BandwidthLimit / int64(totalNodesCount)
|
|
var headerSize int
|
|
for k, v := range putOpts.Header() {
|
|
headerSize += len(k) + len(v)
|
|
}
|
|
r := bandwidth.NewMonitoredReader(ctx, globalBucketMonitor, objInfo.Bucket, objInfo.Name, gr, headerSize, b, target.BandwidthLimit)
|
|
if rtype == replicateAll {
|
|
_, err = tgt.PutObject(ctx, dest.Bucket, object, r, size, "", "", putOpts)
|
|
} else {
|
|
// replicate metadata for object tagging/copy with metadata replacement
|
|
dstOpts := miniogo.PutObjectOptions{Internal: miniogo.AdvancedPutOptions{SourceVersionID: objInfo.VersionID}}
|
|
_, err = tgt.CopyObject(ctx, dest.Bucket, object, dest.Bucket, object, getCopyObjMetadata(objInfo, dest), dstOpts)
|
|
}
|
|
r.Close()
|
|
if err != nil {
|
|
replicationStatus = replication.Failed
|
|
}
|
|
objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replicationStatus.String()
|
|
if objInfo.UserTags != "" {
|
|
objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags
|
|
}
|
|
|
|
// FIXME: add support for missing replication events
|
|
// - event.ObjectReplicationNotTracked
|
|
// - event.ObjectReplicationMissedThreshold
|
|
// - event.ObjectReplicationReplicatedAfterThreshold
|
|
var eventName = event.ObjectReplicationComplete
|
|
if replicationStatus == replication.Failed {
|
|
eventName = event.ObjectReplicationFailed
|
|
}
|
|
sendEvent(eventArgs{
|
|
EventName: eventName,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [Replication]",
|
|
})
|
|
objInfo.metadataOnly = true // Perform only metadata updates.
|
|
if _, err = objectAPI.CopyObject(ctx, bucket, object, bucket, object, objInfo, ObjectOptions{
|
|
VersionID: objInfo.VersionID,
|
|
}, ObjectOptions{
|
|
VersionID: objInfo.VersionID,
|
|
}); err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update replication metadata for %s: %s", objInfo.VersionID, err))
|
|
}
|
|
}
|
|
|
|
// filterReplicationStatusMetadata filters replication status metadata for COPY
|
|
func filterReplicationStatusMetadata(metadata map[string]string) map[string]string {
|
|
// Copy on write
|
|
dst := metadata
|
|
var copied bool
|
|
delKey := func(key string) {
|
|
if _, ok := metadata[key]; !ok {
|
|
return
|
|
}
|
|
if !copied {
|
|
dst = make(map[string]string, len(metadata))
|
|
for k, v := range metadata {
|
|
dst[k] = v
|
|
}
|
|
copied = true
|
|
}
|
|
delete(dst, key)
|
|
}
|
|
|
|
delKey(xhttp.AmzBucketReplicationStatus)
|
|
return dst
|
|
}
|
|
|
|
// DeletedObjectVersionInfo has info on deleted object
|
|
type DeletedObjectVersionInfo struct {
|
|
DeletedObject
|
|
Bucket string
|
|
}
|
|
type replicationState struct {
|
|
// add future metrics here
|
|
replicaCh chan ObjectInfo
|
|
replicaDeleteCh chan DeletedObjectVersionInfo
|
|
}
|
|
|
|
func (r *replicationState) queueReplicaTask(oi ObjectInfo) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
select {
|
|
case r.replicaCh <- oi:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (r *replicationState) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
select {
|
|
case r.replicaDeleteCh <- doi:
|
|
default:
|
|
}
|
|
}
|
|
|
|
var (
|
|
globalReplicationState *replicationState
|
|
// TODO: currently keeping it conservative
|
|
// but eventually can be tuned in future,
|
|
// take only half the CPUs for replication
|
|
// conservatively.
|
|
globalReplicationConcurrent = runtime.GOMAXPROCS(0) / 2
|
|
)
|
|
|
|
func newReplicationState() *replicationState {
|
|
|
|
// fix minimum concurrent replication to 1 for single CPU setup
|
|
if globalReplicationConcurrent == 0 {
|
|
globalReplicationConcurrent = 1
|
|
}
|
|
rs := &replicationState{
|
|
replicaCh: make(chan ObjectInfo, 10000),
|
|
replicaDeleteCh: make(chan DeletedObjectVersionInfo, 10000),
|
|
}
|
|
go func() {
|
|
<-GlobalContext.Done()
|
|
close(rs.replicaCh)
|
|
close(rs.replicaDeleteCh)
|
|
}()
|
|
return rs
|
|
}
|
|
|
|
// addWorker creates a new worker to process tasks
|
|
func (r *replicationState) addWorker(ctx context.Context, objectAPI ObjectLayer) {
|
|
// Add a new worker.
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case oi, ok := <-r.replicaCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
replicateObject(ctx, oi, objectAPI)
|
|
case doi, ok := <-r.replicaDeleteCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
replicateDelete(ctx, doi, objectAPI)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func initBackgroundReplication(ctx context.Context, objectAPI ObjectLayer) {
|
|
if globalReplicationState == nil {
|
|
return
|
|
}
|
|
|
|
// Start with globalReplicationConcurrent.
|
|
for i := 0; i < globalReplicationConcurrent; i++ {
|
|
globalReplicationState.addWorker(ctx, objectAPI)
|
|
}
|
|
}
|