mirror of
https://github.com/minio/minio.git
synced 2025-12-08 16:53:11 -05:00
Refactor replication target management. (#10154)
Generalize replication target management so that remote targets for a bucket can be managed with ARNs. `mc admin bucket remote` command will be used to manage targets.
This commit is contained in:
@@ -18,35 +18,22 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
miniogo "github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
"github.com/minio/minio-go/v7/pkg/encrypt"
|
||||
"github.com/minio/minio-go/v7/pkg/tags"
|
||||
"github.com/minio/minio/cmd/crypto"
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/bucket/replication"
|
||||
"github.com/minio/minio/pkg/bucket/versioning"
|
||||
"github.com/minio/minio/pkg/event"
|
||||
iampolicy "github.com/minio/minio/pkg/iam/policy"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
)
|
||||
|
||||
// BucketReplicationSys represents replication subsystem
|
||||
type BucketReplicationSys struct {
|
||||
sync.RWMutex
|
||||
targetsMap map[string]*miniogo.Core
|
||||
targetsARNMap map[string]string
|
||||
}
|
||||
|
||||
// GetConfig - gets replication config associated to a given bucket name.
|
||||
func (sys *BucketReplicationSys) GetConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
|
||||
// gets replication config associated to a given bucket name.
|
||||
func getReplicationConfig(ctx context.Context, bucketName string) (rc *replication.Config, err error) {
|
||||
if globalIsGateway {
|
||||
objAPI := newObjectLayerWithoutSafeModeFn()
|
||||
if objAPI == nil {
|
||||
@@ -59,153 +46,29 @@ func (sys *BucketReplicationSys) GetConfig(ctx context.Context, bucketName strin
|
||||
return globalBucketMetadataSys.GetReplicationConfig(ctx, bucketName)
|
||||
}
|
||||
|
||||
// SetTarget - sets a new minio-go client replication target for this bucket.
|
||||
func (sys *BucketReplicationSys) SetTarget(ctx context.Context, bucket string, tgt *madmin.BucketTarget) error {
|
||||
if globalIsGateway {
|
||||
return nil
|
||||
}
|
||||
// delete replication targets that were removed
|
||||
if tgt.Empty() {
|
||||
sys.Lock()
|
||||
if currTgt, ok := sys.targetsMap[bucket]; ok {
|
||||
delete(sys.targetsARNMap, currTgt.EndpointURL().String())
|
||||
}
|
||||
delete(sys.targetsMap, bucket)
|
||||
sys.Unlock()
|
||||
return nil
|
||||
}
|
||||
clnt, err := getReplicationTargetClient(tgt)
|
||||
if err != nil {
|
||||
return BucketReplicationTargetNotFound{Bucket: tgt.TargetBucket}
|
||||
}
|
||||
ok, err := clnt.BucketExists(ctx, tgt.TargetBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
return BucketReplicationDestinationNotFound{Bucket: tgt.TargetBucket}
|
||||
}
|
||||
vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket)
|
||||
if err != nil || vcfg.Status != string(versioning.Enabled) {
|
||||
return BucketReplicationTargetNotVersioned{Bucket: tgt.TargetBucket}
|
||||
}
|
||||
sys.Lock()
|
||||
sys.targetsMap[bucket] = clnt
|
||||
sys.targetsARNMap[tgt.URL()] = tgt.Arn
|
||||
sys.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetTargetClient returns minio-go client for target instance
|
||||
func (sys *BucketReplicationSys) GetTargetClient(ctx context.Context, bucket string) *miniogo.Core {
|
||||
var clnt *miniogo.Core
|
||||
sys.RLock()
|
||||
if c, ok := sys.targetsMap[bucket]; ok {
|
||||
clnt = c
|
||||
}
|
||||
sys.RUnlock()
|
||||
return clnt
|
||||
}
|
||||
|
||||
// validateDestination returns error if replication destination bucket missing or not configured
|
||||
// validateReplicationDestination returns error if replication destination bucket missing or not configured
|
||||
// It also returns true if replication destination is same as this server.
|
||||
func (sys *BucketReplicationSys) validateDestination(ctx context.Context, bucket string, rCfg *replication.Config) (bool, error) {
|
||||
clnt := sys.GetTargetClient(ctx, bucket)
|
||||
func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config) (bool, error) {
|
||||
clnt := globalBucketTargetSys.GetReplicationTargetClient(ctx, rCfg.ReplicationArn)
|
||||
if clnt == nil {
|
||||
return false, BucketReplicationTargetNotFound{Bucket: bucket}
|
||||
return false, BucketRemoteTargetNotFound{Bucket: bucket}
|
||||
}
|
||||
if found, _ := clnt.BucketExists(ctx, rCfg.GetDestination().Bucket); !found {
|
||||
return false, BucketReplicationDestinationNotFound{Bucket: rCfg.GetDestination().Bucket}
|
||||
}
|
||||
// validate replication ARN against target endpoint
|
||||
for k, v := range sys.targetsARNMap {
|
||||
if v == rCfg.ReplicationArn {
|
||||
if k == clnt.EndpointURL().String() {
|
||||
sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
|
||||
return sameTarget, nil
|
||||
}
|
||||
c, ok := globalBucketTargetSys.arnRemotesMap[rCfg.ReplicationArn]
|
||||
if ok {
|
||||
if c.EndpointURL().String() == clnt.EndpointURL().String() {
|
||||
sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
|
||||
return sameTarget, nil
|
||||
}
|
||||
}
|
||||
return false, BucketReplicationTargetNotFound{Bucket: bucket}
|
||||
}
|
||||
|
||||
// NewBucketReplicationSys - creates new replication system.
|
||||
func NewBucketReplicationSys() *BucketReplicationSys {
|
||||
return &BucketReplicationSys{
|
||||
targetsMap: make(map[string]*miniogo.Core),
|
||||
targetsARNMap: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
// Init initializes the bucket replication subsystem for buckets with replication config
|
||||
func (sys *BucketReplicationSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
|
||||
if objAPI == nil {
|
||||
return errServerNotInitialized
|
||||
}
|
||||
|
||||
// In gateway mode, replication is not supported.
|
||||
if globalIsGateway {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load bucket replication targets once during boot.
|
||||
sys.load(ctx, buckets, objAPI)
|
||||
return nil
|
||||
}
|
||||
|
||||
// create minio-go clients for buckets having replication targets
|
||||
func (sys *BucketReplicationSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
|
||||
for _, bucket := range buckets {
|
||||
tgt, err := globalBucketMetadataSys.GetReplicationTargetConfig(bucket.Name)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if tgt == nil || tgt.Empty() {
|
||||
continue
|
||||
}
|
||||
tgtClient, err := getReplicationTargetClient(tgt)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
sys.Lock()
|
||||
sys.targetsMap[bucket.Name] = tgtClient
|
||||
sys.targetsARNMap[tgt.URL()] = tgt.Arn
|
||||
sys.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// GetARN returns the ARN associated with replication target URL
|
||||
func (sys *BucketReplicationSys) getARN(endpoint string) string {
|
||||
return sys.targetsARNMap[endpoint]
|
||||
}
|
||||
|
||||
// getReplicationTargetInstanceTransport contains a singleton roundtripper.
|
||||
var getReplicationTargetInstanceTransport http.RoundTripper
|
||||
var getReplicationTargetInstanceTransportOnce sync.Once
|
||||
|
||||
// Returns a minio-go Client configured to access remote host described in replication target config.
|
||||
var getReplicationTargetClient = func(tcfg *madmin.BucketTarget) (*miniogo.Core, error) {
|
||||
config := tcfg.Credentials
|
||||
// if Signature version '4' use NewV4 directly.
|
||||
creds := credentials.NewStaticV4(config.AccessKey, config.SecretKey, "")
|
||||
// if Signature version '2' use NewV2 directly.
|
||||
if strings.ToUpper(tcfg.API) == "S3V2" {
|
||||
creds = credentials.NewStaticV2(config.AccessKey, config.SecretKey, "")
|
||||
}
|
||||
|
||||
getReplicationTargetInstanceTransportOnce.Do(func() {
|
||||
getReplicationTargetInstanceTransport = NewGatewayHTTPTransport()
|
||||
})
|
||||
core, err := miniogo.NewCore(tcfg.Endpoint, &miniogo.Options{
|
||||
Creds: creds,
|
||||
Secure: tcfg.Secure,
|
||||
Transport: getReplicationTargetInstanceTransport,
|
||||
})
|
||||
return core, err
|
||||
return false, BucketRemoteTargetNotFound{Bucket: bucket}
|
||||
}
|
||||
|
||||
// mustReplicate returns true if object meets replication criteria.
|
||||
func (sys *BucketReplicationSys) mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool {
|
||||
func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string) bool {
|
||||
if globalIsGateway {
|
||||
return false
|
||||
}
|
||||
@@ -218,7 +81,7 @@ func (sys *BucketReplicationSys) mustReplicate(ctx context.Context, r *http.Requ
|
||||
if s3Err := isPutActionAllowed(getRequestAuthType(r), bucket, object, r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone {
|
||||
return false
|
||||
}
|
||||
cfg, err := globalBucketReplicationSys.GetConfig(ctx, bucket)
|
||||
cfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
@@ -279,12 +142,12 @@ func putReplicationOpts(dest replication.Destination, objInfo ObjectInfo) (putOp
|
||||
// replicateObject replicates the specified version of the object to destination bucket
|
||||
// The source object is then updated to reflect the replication status.
|
||||
func replicateObject(ctx context.Context, bucket, object, versionID string, objectAPI ObjectLayer, eventArg *eventArgs, healPending bool) {
|
||||
cfg, err := globalBucketReplicationSys.GetConfig(ctx, bucket)
|
||||
cfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return
|
||||
}
|
||||
tgt := globalBucketReplicationSys.GetTargetClient(ctx, bucket)
|
||||
tgt := globalBucketTargetSys.GetReplicationTargetClient(ctx, cfg.ReplicationArn)
|
||||
if tgt == nil {
|
||||
return
|
||||
}
|
||||
@@ -344,12 +207,3 @@ func replicateObject(ctx context.Context, bucket, object, versionID string, obje
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
}
|
||||
|
||||
// getReplicationARN gets existing ARN for an endpoint or generates a new one.
|
||||
func (sys *BucketReplicationSys) getReplicationARN(endpoint string) string {
|
||||
arn, ok := sys.targetsARNMap[endpoint]
|
||||
if ok {
|
||||
return arn
|
||||
}
|
||||
return fmt.Sprintf("arn:minio:s3::%s:*", mustGetUUID())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user