mirror of
https://github.com/minio/minio.git
synced 2024-12-24 22:25:54 -05:00
fix: Change ListBucketTargets handler (#10217)
to list all targets across a tenant. Also fixing some validations.
This commit is contained in:
parent
ce129efa09
commit
adcaa6f9de
@ -26,7 +26,6 @@ import (
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/minio/minio/cmd/config"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
"github.com/minio/minio/pkg/env"
|
||||
iampolicy "github.com/minio/minio/pkg/iam/policy"
|
||||
"github.com/minio/minio/pkg/madmin"
|
||||
@ -122,8 +121,8 @@ func (a adminAPIHandlers) GetBucketQuotaConfigHandler(w http.ResponseWriter, r *
|
||||
writeSuccessResponseJSON(w, configData)
|
||||
}
|
||||
|
||||
// SetBucketTargetHandler - sets a remote target for bucket
|
||||
func (a adminAPIHandlers) SetBucketTargetHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// SetRemoteTargetHandler - sets a remote target for bucket
|
||||
func (a adminAPIHandlers) SetRemoteTargetHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "SetBucketTarget")
|
||||
|
||||
defer logger.AuditLog(w, r, "SetBucketTarget", mustGetClaimsFromToken(r))
|
||||
@ -174,6 +173,7 @@ func (a adminAPIHandlers) SetBucketTargetHandler(w http.ResponseWriter, r *http.
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketRemoteIdenticalToSource), r.URL)
|
||||
return
|
||||
}
|
||||
target.SourceBucket = bucket
|
||||
target.Arn = globalBucketTargetSys.getRemoteARN(bucket, &target)
|
||||
if target.Arn == "" {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL)
|
||||
@ -183,7 +183,7 @@ func (a adminAPIHandlers) SetBucketTargetHandler(w http.ResponseWriter, r *http.
|
||||
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
targets, err := globalBucketTargetSys.ListTargets(ctx, bucket)
|
||||
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
@ -209,9 +209,9 @@ func (a adminAPIHandlers) SetBucketTargetHandler(w http.ResponseWriter, r *http.
|
||||
writeSuccessResponseJSON(w, data)
|
||||
}
|
||||
|
||||
// ListBucketTargetsHandler - lists remote target(s) for a bucket or gets a target
|
||||
// ListRemoteTargetsHandler - lists remote target(s) for a bucket or gets a target
|
||||
// for a particular ARN type
|
||||
func (a adminAPIHandlers) ListBucketTargetsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
func (a adminAPIHandlers) ListRemoteTargetsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "ListBucketTargets")
|
||||
|
||||
defer logger.AuditLog(w, r, "ListBucketTargets", mustGetClaimsFromToken(r))
|
||||
@ -225,28 +225,13 @@ func (a adminAPIHandlers) ListBucketTargetsHandler(w http.ResponseWriter, r *htt
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
cfg, err := globalBucketMetadataSys.GetBucketTargetsConfig(bucket)
|
||||
if err != nil {
|
||||
if bucket != "" {
|
||||
if _, err := globalBucketMetadataSys.GetBucketTargetsConfig(bucket); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
var (
|
||||
targets []madmin.BucketTarget
|
||||
tgt, ct madmin.BucketTarget
|
||||
creds auth.Credentials
|
||||
)
|
||||
if cfg != nil && !cfg.Empty() {
|
||||
for idx, t := range cfg.Targets {
|
||||
if string(t.Type) == arnType || arnType == "" {
|
||||
ct = cfg.Targets[idx]
|
||||
// remove secretKey from creds
|
||||
creds.AccessKey = ct.Credentials.AccessKey
|
||||
tgt = madmin.BucketTarget{Endpoint: ct.Endpoint, Secure: ct.Secure, TargetBucket: ct.TargetBucket, Credentials: &creds, Arn: ct.Arn, Type: ct.Type}
|
||||
targets = append(targets, tgt)
|
||||
}
|
||||
}
|
||||
}
|
||||
targets := globalBucketTargetSys.ListTargets(ctx, bucket, arnType)
|
||||
data, err := json.Marshal(targets)
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||
@ -256,8 +241,8 @@ func (a adminAPIHandlers) ListBucketTargetsHandler(w http.ResponseWriter, r *htt
|
||||
writeSuccessResponseJSON(w, data)
|
||||
}
|
||||
|
||||
// RemoveBucketTargetHandler - removes a remote target for bucket with specified ARN
|
||||
func (a adminAPIHandlers) RemoveBucketTargetHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// RemoveRemoteTargetHandler - removes a remote target for bucket with specified ARN
|
||||
func (a adminAPIHandlers) RemoveRemoteTargetHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "RemoveBucketTarget")
|
||||
|
||||
defer logger.AuditLog(w, r, "RemoveBucketTarget", mustGetClaimsFromToken(r))
|
||||
@ -275,11 +260,18 @@ func (a adminAPIHandlers) RemoveBucketTargetHandler(w http.ResponseWriter, r *ht
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if bucket exists.
|
||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
if err := globalBucketTargetSys.RemoveTarget(ctx, bucket, arn); err != nil {
|
||||
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
targets, err := globalBucketTargetSys.ListTargets(ctx, bucket)
|
||||
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
|
@ -183,14 +183,14 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
|
||||
}
|
||||
// Bucket replication operations
|
||||
// GetBucketTargetHandler
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/list-bucket-targets").HandlerFunc(
|
||||
httpTraceHdrs(adminAPI.ListBucketTargetsHandler)).Queries("bucket", "{bucket:.*}", "type", "{type:.*}")
|
||||
// SetBucketTargetHandler
|
||||
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-bucket-target").HandlerFunc(
|
||||
httpTraceHdrs(adminAPI.SetBucketTargetHandler)).Queries("bucket", "{bucket:.*}")
|
||||
// SetBucketTargetHandler
|
||||
adminRouter.Methods(http.MethodDelete).Path(adminVersion+"/remove-bucket-target").HandlerFunc(
|
||||
httpTraceHdrs(adminAPI.RemoveBucketTargetHandler)).Queries("bucket", "{bucket:.*}", "arn", "{arn:.*}")
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/list-remote-targets").HandlerFunc(
|
||||
httpTraceHdrs(adminAPI.ListRemoteTargetsHandler)).Queries("bucket", "{bucket:.*}", "type", "{type:.*}")
|
||||
// SetRemoteTargetHandler
|
||||
adminRouter.Methods(http.MethodPut).Path(adminVersion+"/set-remote-target").HandlerFunc(
|
||||
httpTraceHdrs(adminAPI.SetRemoteTargetHandler)).Queries("bucket", "{bucket:.*}")
|
||||
// SetRemoteTargetHandler
|
||||
adminRouter.Methods(http.MethodDelete).Path(adminVersion+"/remove-remote-target").HandlerFunc(
|
||||
httpTraceHdrs(adminAPI.RemoveRemoteTargetHandler)).Queries("bucket", "{bucket:.*}", "arn", "{arn:.*}")
|
||||
}
|
||||
|
||||
// -- Top APIs --
|
||||
|
@ -115,6 +115,7 @@ const (
|
||||
ErrBucketRemoteArnInvalid
|
||||
ErrBucketRemoteRemoveDisallowed
|
||||
ErrReplicationTargetNotVersionedError
|
||||
ErrReplicationSourceNotVersionedError
|
||||
ErrReplicationNeedsVersioningError
|
||||
ErrReplicationBucketNeedsVersioningError
|
||||
ErrBucketReplicationDisabledError
|
||||
@ -871,6 +872,11 @@ var errorCodes = errorCodeMap{
|
||||
Description: "The replication target does not have versioning enabled",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrReplicationSourceNotVersionedError: {
|
||||
Code: "ReplicationSourceNotVersionedError",
|
||||
Description: "The replication source does not have versioning enabled",
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
},
|
||||
ErrReplicationNeedsVersioningError: {
|
||||
Code: "InvalidRequest",
|
||||
Description: "Versioning must be 'Enabled' on the bucket to apply a replication configuration",
|
||||
@ -1929,6 +1935,8 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
|
||||
apiErr = ErrBucketRemoteRemoveDisallowed
|
||||
case BucketReplicationTargetNotVersioned:
|
||||
apiErr = ErrReplicationTargetNotVersionedError
|
||||
case BucketReplicationSourceNotVersioned:
|
||||
apiErr = ErrReplicationSourceNotVersionedError
|
||||
case BucketQuotaExceeded:
|
||||
apiErr = ErrAdminBucketQuotaExceeded
|
||||
case *event.ErrInvalidEventName:
|
||||
|
@ -49,7 +49,7 @@ func getReplicationConfig(ctx context.Context, bucketName string) (rc *replicati
|
||||
// validateReplicationDestination returns error if replication destination bucket missing or not configured
|
||||
// It also returns true if replication destination is same as this server.
|
||||
func validateReplicationDestination(ctx context.Context, bucket string, rCfg *replication.Config) (bool, error) {
|
||||
clnt := globalBucketTargetSys.GetReplicationTargetClient(ctx, rCfg.ReplicationArn)
|
||||
clnt := globalBucketTargetSys.GetReplicationTargetClient(ctx, rCfg.RoleArn)
|
||||
if clnt == nil {
|
||||
return false, BucketRemoteTargetNotFound{Bucket: bucket}
|
||||
}
|
||||
@ -65,7 +65,7 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re
|
||||
}
|
||||
}
|
||||
// validate replication ARN against target endpoint
|
||||
c, ok := globalBucketTargetSys.arnRemotesMap[rCfg.ReplicationArn]
|
||||
c, ok := globalBucketTargetSys.arnRemotesMap[rCfg.RoleArn]
|
||||
if ok {
|
||||
if c.EndpointURL().String() == clnt.EndpointURL().String() {
|
||||
sameTarget, _ := isLocalHost(clnt.EndpointURL().Hostname(), clnt.EndpointURL().Port(), globalMinioPort)
|
||||
@ -159,7 +159,7 @@ func replicateObject(ctx context.Context, bucket, object, versionID string, obje
|
||||
logger.LogIf(ctx, err)
|
||||
return
|
||||
}
|
||||
tgt := globalBucketTargetSys.GetReplicationTargetClient(ctx, cfg.ReplicationArn)
|
||||
tgt := globalBucketTargetSys.GetReplicationTargetClient(ctx, cfg.RoleArn)
|
||||
if tgt == nil {
|
||||
return
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
@ -36,16 +35,42 @@ type BucketTargetSys struct {
|
||||
clientsCache map[string]*miniogo.Core
|
||||
}
|
||||
|
||||
// ListTargets - gets list of bucket targets for this bucket.
|
||||
func (sys *BucketTargetSys) ListTargets(ctx context.Context, bucket string) (*madmin.BucketTargets, error) {
|
||||
if globalIsGateway {
|
||||
return nil, nil
|
||||
// ListTargets lists bucket targets across tenant or for individual bucket, and returns
|
||||
// results filtered by arnType
|
||||
func (sys *BucketTargetSys) ListTargets(ctx context.Context, bucket, arnType string) (targets []madmin.BucketTarget) {
|
||||
if bucket != "" {
|
||||
if ts, err := sys.ListBucketTargets(ctx, bucket); err == nil {
|
||||
for _, t := range ts.Targets {
|
||||
if string(t.Type) == arnType || arnType == "" {
|
||||
targets = append(targets, t.Clone())
|
||||
}
|
||||
}
|
||||
}
|
||||
return targets
|
||||
}
|
||||
sys.RLock()
|
||||
defer sys.RUnlock()
|
||||
for _, tgts := range sys.targetsMap {
|
||||
for _, t := range tgts {
|
||||
if string(t.Type) == arnType || arnType == "" {
|
||||
targets = append(targets, t.Clone())
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ListBucketTargets - gets list of bucket targets for this bucket.
|
||||
func (sys *BucketTargetSys) ListBucketTargets(ctx context.Context, bucket string) (*madmin.BucketTargets, error) {
|
||||
|
||||
sys.RLock()
|
||||
defer sys.RUnlock()
|
||||
|
||||
tgts, ok := sys.targetsMap[bucket]
|
||||
if ok {
|
||||
return &madmin.BucketTargets{Targets: tgts}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("No remote targets exist for bucket %s", bucket)
|
||||
return nil, BucketRemoteTargetNotFound{Bucket: bucket}
|
||||
}
|
||||
|
||||
// SetTarget - sets a new minio-go client target for this bucket.
|
||||
@ -60,8 +85,10 @@ func (sys *BucketTargetSys) SetTarget(ctx context.Context, bucket string, tgt *m
|
||||
if err != nil {
|
||||
return BucketRemoteTargetNotFound{Bucket: tgt.TargetBucket}
|
||||
}
|
||||
|
||||
if tgt.Type == madmin.ReplicationArn {
|
||||
if tgt.Type == madmin.ReplicationService {
|
||||
if !globalBucketVersioningSys.Enabled(bucket) {
|
||||
return BucketReplicationSourceNotVersioned{Bucket: bucket}
|
||||
}
|
||||
vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket)
|
||||
if err != nil || vcfg.Status != string(versioning.Enabled) {
|
||||
if isErrBucketNotFound(err) {
|
||||
@ -112,10 +139,10 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str
|
||||
if err != nil {
|
||||
return BucketRemoteArnInvalid{Bucket: bucket}
|
||||
}
|
||||
if arn.Type == madmin.ReplicationArn {
|
||||
if arn.Type == madmin.ReplicationService {
|
||||
// reject removal of remote target if replication configuration is present
|
||||
rcfg, err := getReplicationConfig(ctx, bucket)
|
||||
if err == nil && rcfg.ReplicationArn == arnStr {
|
||||
if err == nil && rcfg.RoleArn == arnStr {
|
||||
if _, ok := sys.arnRemotesMap[arnStr]; ok {
|
||||
return BucketRemoteRemoveDisallowed{Bucket: bucket}
|
||||
}
|
||||
@ -125,11 +152,17 @@ func (sys *BucketTargetSys) RemoveTarget(ctx context.Context, bucket, arnStr str
|
||||
sys.Lock()
|
||||
defer sys.Unlock()
|
||||
targets := make([]madmin.BucketTarget, 0)
|
||||
found := false
|
||||
tgts := sys.targetsMap[bucket]
|
||||
for _, tgt := range tgts {
|
||||
if tgt.Arn != arnStr {
|
||||
targets = append(targets, tgt)
|
||||
continue
|
||||
}
|
||||
found = true
|
||||
}
|
||||
if !found {
|
||||
return BucketRemoteTargetNotFound{Bucket: bucket}
|
||||
}
|
||||
sys.targetsMap[bucket] = targets
|
||||
delete(sys.arnRemotesMap, arnStr)
|
||||
@ -168,6 +201,40 @@ func (sys *BucketTargetSys) Init(ctx context.Context, buckets []BucketInfo, objA
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateTarget updates target to reflect metadata updates
|
||||
func (sys *BucketTargetSys) UpdateTarget(bucket string, cfg *madmin.BucketTargets) {
|
||||
if sys == nil {
|
||||
return
|
||||
}
|
||||
sys.Lock()
|
||||
defer sys.Unlock()
|
||||
if cfg == nil || cfg.Empty() {
|
||||
// remove target and arn association
|
||||
if tgts, ok := sys.targetsMap[bucket]; ok {
|
||||
for _, t := range tgts {
|
||||
delete(sys.arnRemotesMap, t.Arn)
|
||||
}
|
||||
}
|
||||
delete(sys.targetsMap, bucket)
|
||||
return
|
||||
}
|
||||
|
||||
if len(cfg.Targets) > 0 {
|
||||
sys.targetsMap[bucket] = cfg.Targets
|
||||
}
|
||||
for _, tgt := range cfg.Targets {
|
||||
tgtClient, err := sys.getRemoteTargetClient(&tgt)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
sys.arnRemotesMap[tgt.Arn] = tgtClient
|
||||
if _, ok := sys.clientsCache[tgtClient.EndpointURL().String()]; !ok {
|
||||
sys.clientsCache[tgtClient.EndpointURL().String()] = tgtClient
|
||||
}
|
||||
}
|
||||
sys.targetsMap[bucket] = cfg.Targets
|
||||
}
|
||||
|
||||
// create minio-go clients for buckets having remote targets
|
||||
func (sys *BucketTargetSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
|
||||
for _, bucket := range buckets {
|
||||
@ -229,7 +296,7 @@ func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTar
|
||||
return tgt.Arn
|
||||
}
|
||||
}
|
||||
if !madmin.ArnType(target.Type).IsValid() {
|
||||
if !madmin.ServiceType(target.Type).IsValid() {
|
||||
return ""
|
||||
}
|
||||
arn := madmin.ARN{
|
||||
|
@ -411,6 +411,13 @@ func (e BucketReplicationTargetNotVersioned) Error() string {
|
||||
return "Replication target does not have versioning enabled: " + e.Bucket
|
||||
}
|
||||
|
||||
// BucketReplicationSourceNotVersioned replication source does not have versioning enabled.
|
||||
type BucketReplicationSourceNotVersioned GenericError
|
||||
|
||||
func (e BucketReplicationSourceNotVersioned) Error() string {
|
||||
return "Replication source does not have versioning enabled: " + e.Bucket
|
||||
}
|
||||
|
||||
/// Bucket related errors.
|
||||
|
||||
// BucketNameInvalid - bucketname provided is invalid.
|
||||
|
@ -610,6 +610,10 @@ func (s *peerRESTServer) LoadBucketMetadataHandler(w http.ResponseWriter, r *htt
|
||||
if meta.notificationConfig != nil {
|
||||
globalNotificationSys.AddRulesMap(bucketName, meta.notificationConfig.ToRulesMap())
|
||||
}
|
||||
|
||||
if meta.bucketTargetConfig != nil {
|
||||
globalBucketTargetSys.UpdateTarget(bucketName, meta.bucketTargetConfig)
|
||||
}
|
||||
}
|
||||
|
||||
// ReloadFormatHandler - Reload Format.
|
||||
|
@ -13,22 +13,22 @@ To replicate objects in a bucket to a destination bucket on a target site either
|
||||
Create a replication target on the source cluster as shown below:
|
||||
|
||||
```
|
||||
mc admin bucket remote set myminio/srcbucket https://accessKey:secretKey@replica-endpoint:9000/destbucket --service replication --region us-east-1
|
||||
Replication ARN = 'arn:minio:replication:us-east-1:c5be6b16-769d-432a-9ef1-4567081f3566:destbucket'
|
||||
mc admin bucket remote add myminio/srcbucket https://accessKey:secretKey@replica-endpoint:9000/destbucket --service replication --region us-east-1
|
||||
Role ARN = 'arn:minio:replication:us-east-1:c5be6b16-769d-432a-9ef1-4567081f3566:destbucket'
|
||||
```
|
||||
|
||||
Note that the admin needs *s3:GetReplicationConfigurationAction* permission on source cluster. The credential used at the destination requires *s3:ReplicateObject* permission. Once successfully created and authorized this generates a replication target ARN. The command below lists all the currently authorized replication targets:
|
||||
|
||||
```
|
||||
mc admin bucket remote list myminio/srcbucket --service "replication"
|
||||
Replication ARN = 'arn:minio:replication:us-east-1:c5be6b16-769d-432a-9ef1-4567081f3566:destbucket'
|
||||
mc admin bucket remote ls myminio/srcbucket --service "replication"
|
||||
Role ARN = 'arn:minio:replication:us-east-1:c5be6b16-769d-432a-9ef1-4567081f3566:destbucket'
|
||||
```
|
||||
|
||||
The replication configuration can now be added to the source bucket by applying the json file with replication configuration. The ReplicationArn is passed in as a json element in the configuration.
|
||||
The replication configuration can now be added to the source bucket by applying the json file with replication configuration. The Role ARN above is passed in as a json element in the configuration.
|
||||
|
||||
```json
|
||||
{
|
||||
"ReplicationArn" :"arn:minio:replication:us-east-1:c5be6b16-769d-432a-9ef1-4567081f3566:destbucket",
|
||||
"Role" :"arn:minio:replication:us-east-1:c5be6b16-769d-432a-9ef1-4567081f3566:destbucket",
|
||||
"Rules": [
|
||||
{
|
||||
"Status": "Enabled",
|
||||
@ -63,7 +63,7 @@ mc replicate add myminio/srcbucket/Tax --priority 1 --arn "arn:minio:replication
|
||||
Replication configuration applied successfully to myminio/srcbucket.
|
||||
```
|
||||
|
||||
Apart from *ReplicationArn* , rest of the configuration follows [AWS S3 Spec](https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html). Any objects uploaded to the source bucket that meet replication criteria will now be automatically replicated by the MinIO server to the remote destination bucket. Replication can be disabled at any time by disabling specific rules in the configuration or deleting the replication configuration entirely.
|
||||
The replication configuration follows [AWS S3 Spec](https://docs.aws.amazon.com/AmazonS3/latest/dev/replication-add-config.html). Any objects uploaded to the source bucket that meet replication criteria will now be automatically replicated by the MinIO server to the remote destination bucket. Replication can be disabled at any time by disabling specific rules in the configuration or deleting the replication configuration entirely.
|
||||
|
||||
When an object is deleted from the source bucket, the replica will not be deleted as per S3 spec.
|
||||
|
||||
|
@ -50,7 +50,7 @@ var (
|
||||
errReplicationNoRule = Errorf("Replication configuration should have at least one rule")
|
||||
errReplicationUniquePriority = Errorf("Replication configuration has duplicate priority")
|
||||
errReplicationDestinationMismatch = Errorf("The destination bucket must be same for all rules")
|
||||
errReplicationArnMissing = Errorf("Replication Arn missing")
|
||||
errRoleArnMissing = Errorf("Missing required parameter `Role` in ReplicationConfiguration")
|
||||
)
|
||||
|
||||
// Config - replication configuration specified in
|
||||
@ -58,8 +58,8 @@ var (
|
||||
type Config struct {
|
||||
XMLName xml.Name `xml:"ReplicationConfiguration" json:"-"`
|
||||
Rules []Rule `xml:"Rule" json:"Rules"`
|
||||
// ReplicationArn is a MinIO only extension and optional for AWS
|
||||
ReplicationArn string `xml:"ReplicationArn,omitempty" json:"ReplicationArn,omitempty"`
|
||||
// RoleArn is being reused for MinIO replication ARN
|
||||
RoleArn string `xml:"Role" json:"Role"`
|
||||
}
|
||||
|
||||
// Maximum 2MiB size per replication config.
|
||||
@ -84,8 +84,8 @@ func (c Config) Validate(bucket string, sameTarget bool) error {
|
||||
if len(c.Rules) == 0 {
|
||||
return errReplicationNoRule
|
||||
}
|
||||
if c.ReplicationArn == "" {
|
||||
return errReplicationArnMissing
|
||||
if c.RoleArn == "" {
|
||||
return errRoleArnMissing
|
||||
}
|
||||
// Validate all the rules in the replication config
|
||||
targetMap := make(map[string]struct{})
|
||||
|
@ -29,22 +29,22 @@ import (
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
)
|
||||
|
||||
// ArnType represents bucket ARN type
|
||||
type ArnType string
|
||||
// ServiceType represents service type
|
||||
type ServiceType string
|
||||
|
||||
const (
|
||||
// ReplicationArn specifies a ARN type of replication
|
||||
ReplicationArn ArnType = "replication"
|
||||
// ReplicationService specifies replication service
|
||||
ReplicationService ServiceType = "replication"
|
||||
)
|
||||
|
||||
// IsValid returns true if ARN type is replication
|
||||
func (t ArnType) IsValid() bool {
|
||||
return t == ReplicationArn
|
||||
// IsValid returns true if ARN type represents replication
|
||||
func (t ServiceType) IsValid() bool {
|
||||
return t == ReplicationService
|
||||
}
|
||||
|
||||
// ARN is a struct to define arn.
|
||||
type ARN struct {
|
||||
Type ArnType
|
||||
Type ServiceType
|
||||
ID string
|
||||
Region string
|
||||
Bucket string
|
||||
@ -75,7 +75,7 @@ func ParseARN(s string) (*ARN, error) {
|
||||
}
|
||||
|
||||
return &ARN{
|
||||
Type: ArnType(tokens[2]),
|
||||
Type: ServiceType(tokens[2]),
|
||||
Region: tokens[3],
|
||||
ID: tokens[4],
|
||||
Bucket: tokens[5],
|
||||
@ -84,6 +84,7 @@ func ParseARN(s string) (*ARN, error) {
|
||||
|
||||
// BucketTarget represents the target bucket and site association.
|
||||
type BucketTarget struct {
|
||||
SourceBucket string `json:"sourcebucket"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Credentials *auth.Credentials `json:"credentials"`
|
||||
TargetBucket string `json:"targetbucket"`
|
||||
@ -91,10 +92,26 @@ type BucketTarget struct {
|
||||
Path string `json:"path,omitempty"`
|
||||
API string `json:"api,omitempty"`
|
||||
Arn string `json:"arn,omitempty"`
|
||||
Type ArnType `json:"type"`
|
||||
Type ServiceType `json:"type"`
|
||||
Region string `json:"omitempty"`
|
||||
}
|
||||
|
||||
// Clone returns shallow clone of BucketTarget without secret key in credentials
|
||||
func (t *BucketTarget) Clone() BucketTarget {
|
||||
return BucketTarget{
|
||||
SourceBucket: t.SourceBucket,
|
||||
Endpoint: t.Endpoint,
|
||||
TargetBucket: t.TargetBucket,
|
||||
Credentials: &auth.Credentials{AccessKey: t.Credentials.AccessKey},
|
||||
Secure: t.Secure,
|
||||
Path: t.Path,
|
||||
API: t.Path,
|
||||
Arn: t.Arn,
|
||||
Type: t.Type,
|
||||
Region: t.Region,
|
||||
}
|
||||
}
|
||||
|
||||
// URL returns target url
|
||||
func (t BucketTarget) URL() string {
|
||||
scheme := "http"
|
||||
@ -132,18 +149,18 @@ func (t BucketTargets) Empty() bool {
|
||||
return empty
|
||||
}
|
||||
|
||||
// ListBucketTargets - gets target(s) for this bucket
|
||||
func (adm *AdminClient) ListBucketTargets(ctx context.Context, bucket, arnType string) (targets []BucketTarget, err error) {
|
||||
// ListRemoteTargets - gets target(s) for this bucket
|
||||
func (adm *AdminClient) ListRemoteTargets(ctx context.Context, bucket, arnType string) (targets []BucketTarget, err error) {
|
||||
queryValues := url.Values{}
|
||||
queryValues.Set("bucket", bucket)
|
||||
queryValues.Set("type", arnType)
|
||||
|
||||
reqData := requestData{
|
||||
relPath: adminAPIPrefix + "/list-bucket-targets",
|
||||
relPath: adminAPIPrefix + "/list-remote-targets",
|
||||
queryValues: queryValues,
|
||||
}
|
||||
|
||||
// Execute GET on /minio/admin/v3/list-bucket-targets
|
||||
// Execute GET on /minio/admin/v3/list-remote-targets
|
||||
resp, err := adm.executeMethod(ctx, http.MethodGet, reqData)
|
||||
|
||||
defer closeResponse(resp)
|
||||
@ -165,8 +182,8 @@ func (adm *AdminClient) ListBucketTargets(ctx context.Context, bucket, arnType s
|
||||
return targets, nil
|
||||
}
|
||||
|
||||
// SetBucketTarget sets up a remote target for this bucket
|
||||
func (adm *AdminClient) SetBucketTarget(ctx context.Context, bucket string, target *BucketTarget) (string, error) {
|
||||
// SetRemoteTarget sets up a remote target for this bucket
|
||||
func (adm *AdminClient) SetRemoteTarget(ctx context.Context, bucket string, target *BucketTarget) (string, error) {
|
||||
data, err := json.Marshal(target)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -179,12 +196,12 @@ func (adm *AdminClient) SetBucketTarget(ctx context.Context, bucket string, targ
|
||||
queryValues.Set("bucket", bucket)
|
||||
|
||||
reqData := requestData{
|
||||
relPath: adminAPIPrefix + "/set-bucket-target",
|
||||
relPath: adminAPIPrefix + "/set-remote-target",
|
||||
queryValues: queryValues,
|
||||
content: encData,
|
||||
}
|
||||
|
||||
// Execute PUT on /minio/admin/v3/set-bucket-target to set a target for this bucket of specific arn type.
|
||||
// Execute PUT on /minio/admin/v3/set-remote-target to set a target for this bucket of specific arn type.
|
||||
resp, err := adm.executeMethod(ctx, http.MethodPut, reqData)
|
||||
|
||||
defer closeResponse(resp)
|
||||
@ -206,18 +223,18 @@ func (adm *AdminClient) SetBucketTarget(ctx context.Context, bucket string, targ
|
||||
return arn, nil
|
||||
}
|
||||
|
||||
// RemoveBucketTarget removes a remote target associated with particular ARN for this bucket
|
||||
func (adm *AdminClient) RemoveBucketTarget(ctx context.Context, bucket, arn string) error {
|
||||
// RemoveRemoteTarget removes a remote target associated with particular ARN for this bucket
|
||||
func (adm *AdminClient) RemoveRemoteTarget(ctx context.Context, bucket, arn string) error {
|
||||
queryValues := url.Values{}
|
||||
queryValues.Set("bucket", bucket)
|
||||
queryValues.Set("arn", arn)
|
||||
|
||||
reqData := requestData{
|
||||
relPath: adminAPIPrefix + "/remove-bucket-target",
|
||||
relPath: adminAPIPrefix + "/remove-remote-target",
|
||||
queryValues: queryValues,
|
||||
}
|
||||
|
||||
// Execute PUT on /minio/admin/v3/remove-bucket-target to remove a target for this bucket
|
||||
// Execute PUT on /minio/admin/v3/remove-remote-target to remove a target for this bucket
|
||||
// with specific ARN
|
||||
resp, err := adm.executeMethod(ctx, http.MethodDelete, reqData)
|
||||
defer closeResponse(resp)
|
||||
|
Loading…
Reference in New Issue
Block a user