mirror of
https://github.com/minio/minio.git
synced 2025-05-21 09:33:50 -04:00
reject resync start on misconfigured replication rules (#15041)
we expect resync to start on buckets with replication rule ExistingObjects enabled, if not we reject such calls.
This commit is contained in:
parent
fd02492cb7
commit
48e367ff7d
1
Makefile
1
Makefile
@ -58,6 +58,7 @@ test-iam: build ## verify IAM (external IDP, etcd backends)
|
|||||||
test-replication: install ## verify multi site replication
|
test-replication: install ## verify multi site replication
|
||||||
@echo "Running tests for replicating three sites"
|
@echo "Running tests for replicating three sites"
|
||||||
@(env bash $(PWD)/docs/bucket/replication/setup_3site_replication.sh)
|
@(env bash $(PWD)/docs/bucket/replication/setup_3site_replication.sh)
|
||||||
|
@(env bash $(PWD)/docs/bucket/replication/setup_2site_existing_replication.sh)
|
||||||
|
|
||||||
test-site-replication-ldap: install ## verify automatic site replication
|
test-site-replication-ldap: install ## verify automatic site replication
|
||||||
@echo "Running tests for automatic site replication of IAM (with LDAP)"
|
@echo "Running tests for automatic site replication of IAM (with LDAP)"
|
||||||
|
@ -131,7 +131,7 @@ const (
|
|||||||
ErrReplicationNeedsVersioningError
|
ErrReplicationNeedsVersioningError
|
||||||
ErrReplicationBucketNeedsVersioningError
|
ErrReplicationBucketNeedsVersioningError
|
||||||
ErrReplicationDenyEditError
|
ErrReplicationDenyEditError
|
||||||
ErrReplicationNoMatchingRuleError
|
ErrReplicationNoExistingObjects
|
||||||
ErrObjectRestoreAlreadyInProgress
|
ErrObjectRestoreAlreadyInProgress
|
||||||
ErrNoSuchKey
|
ErrNoSuchKey
|
||||||
ErrNoSuchUpload
|
ErrNoSuchUpload
|
||||||
@ -893,9 +893,9 @@ var errorCodes = errorCodeMap{
|
|||||||
Description: "Bandwidth limit for remote target must be atleast 100MBps",
|
Description: "Bandwidth limit for remote target must be atleast 100MBps",
|
||||||
HTTPStatusCode: http.StatusBadRequest,
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
},
|
},
|
||||||
ErrReplicationNoMatchingRuleError: {
|
ErrReplicationNoExistingObjects: {
|
||||||
Code: "XMinioReplicationNoMatchingRule",
|
Code: "XMinioReplicationNoExistingObjects",
|
||||||
Description: "No matching replication rule found for this object prefix",
|
Description: "No matching ExistingsObjects rule enabled",
|
||||||
HTTPStatusCode: http.StatusBadRequest,
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
},
|
},
|
||||||
ErrReplicationDenyEditError: {
|
ErrReplicationDenyEditError: {
|
||||||
|
File diff suppressed because one or more lines are too long
@ -21,7 +21,6 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -33,7 +32,6 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
@ -1596,372 +1594,3 @@ func (api objectAPIHandlers) DeleteBucketTaggingHandler(w http.ResponseWriter, r
|
|||||||
// Write success response.
|
// Write success response.
|
||||||
writeSuccessResponseHeadersOnly(w)
|
writeSuccessResponseHeadersOnly(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutBucketReplicationConfigHandler - PUT Bucket replication configuration.
|
|
||||||
// ----------
|
|
||||||
// Add a replication configuration on the specified bucket as specified in https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketReplication.html
|
|
||||||
func (api objectAPIHandlers) PutBucketReplicationConfigHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
ctx := newContext(r, w, "PutBucketReplicationConfig")
|
|
||||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
|
||||||
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
bucket := vars["bucket"]
|
|
||||||
objectAPI := api.ObjectAPI()
|
|
||||||
if objectAPI == nil {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if globalIsGateway {
|
|
||||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s3Error := checkRequestAuthType(ctx, r, policy.PutReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Check if bucket exists.
|
|
||||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if versioned := globalBucketVersioningSys.Enabled(bucket); !versioned {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationNeedsVersioningError), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
replicationConfig, err := replication.ParseConfig(io.LimitReader(r.Body, r.ContentLength))
|
|
||||||
if err != nil {
|
|
||||||
apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
|
|
||||||
apiErr.Description = err.Error()
|
|
||||||
writeErrorResponse(ctx, w, apiErr, r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sameTarget, apiErr := validateReplicationDestination(ctx, bucket, replicationConfig, true)
|
|
||||||
if apiErr != noError {
|
|
||||||
writeErrorResponse(ctx, w, apiErr, r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Validate the received bucket replication config
|
|
||||||
if err = replicationConfig.Validate(bucket, sameTarget); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
configData, err := xml.Marshal(replicationConfig)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, configData); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write success response.
|
|
||||||
writeSuccessResponseHeadersOnly(w)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetBucketReplicationConfigHandler - GET Bucket replication configuration.
|
|
||||||
// ----------
|
|
||||||
// Gets the replication configuration for a bucket.
|
|
||||||
func (api objectAPIHandlers) GetBucketReplicationConfigHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
ctx := newContext(r, w, "GetBucketReplicationConfig")
|
|
||||||
|
|
||||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
|
||||||
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
bucket := vars["bucket"]
|
|
||||||
|
|
||||||
objectAPI := api.ObjectAPI()
|
|
||||||
if objectAPI == nil {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if user has permissions to perform this operation
|
|
||||||
if s3Error := checkRequestAuthType(ctx, r, policy.GetReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Check if bucket exists.
|
|
||||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
config, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
configData, err := xml.Marshal(config)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write success response.
|
|
||||||
writeSuccessResponseXML(w, configData)
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteBucketReplicationConfigHandler - DELETE Bucket replication config.
|
|
||||||
// ----------
|
|
||||||
func (api objectAPIHandlers) DeleteBucketReplicationConfigHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
ctx := newContext(r, w, "DeleteBucketReplicationConfig")
|
|
||||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
bucket := vars["bucket"]
|
|
||||||
|
|
||||||
objectAPI := api.ObjectAPI()
|
|
||||||
if objectAPI == nil {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if s3Error := checkRequestAuthType(ctx, r, policy.PutReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Check if bucket exists.
|
|
||||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if globalSiteReplicationSys.isEnabled() {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationDenyEditError), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, nil); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write success response.
|
|
||||||
writeSuccessResponseHeadersOnly(w)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetBucketReplicationMetricsHandler - GET Bucket replication metrics.
|
|
||||||
// ----------
|
|
||||||
// Gets the replication metrics for a bucket.
|
|
||||||
func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
ctx := newContext(r, w, "GetBucketReplicationMetrics")
|
|
||||||
|
|
||||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
|
||||||
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
bucket := vars["bucket"]
|
|
||||||
|
|
||||||
objectAPI := api.ObjectAPI()
|
|
||||||
if objectAPI == nil {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if user has permissions to perform this operation
|
|
||||||
if s3Error := checkRequestAuthType(ctx, r, policy.GetReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if bucket exists.
|
|
||||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var usageInfo BucketUsageInfo
|
|
||||||
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
|
|
||||||
if err == nil && !dataUsageInfo.LastUpdate.IsZero() {
|
|
||||||
usageInfo = dataUsageInfo.BucketsUsage[bucket]
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Header().Set(xhttp.ContentType, string(mimeJSON))
|
|
||||||
|
|
||||||
enc := json.NewEncoder(w)
|
|
||||||
if err = enc.Encode(getLatestReplicationStats(bucket, usageInfo)); err != nil {
|
|
||||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResetBucketReplicationStartHandler - starts a replication reset for all objects in a bucket which
|
|
||||||
// qualify for replication and re-sync the object(s) to target, provided ExistingObjectReplication is
|
|
||||||
// enabled for the qualifying rule. This API is a MinIO only extension provided for situations where
|
|
||||||
// remote target is entirely lost,and previously replicated objects need to be re-synced. If resync is
|
|
||||||
// already in progress it returns an error
|
|
||||||
func (api objectAPIHandlers) ResetBucketReplicationStartHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
ctx := newContext(r, w, "ResetBucketReplicationStart")
|
|
||||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
|
||||||
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
bucket := vars["bucket"]
|
|
||||||
durationStr := r.URL.Query().Get("older-than")
|
|
||||||
arn := r.URL.Query().Get("arn")
|
|
||||||
resetID := r.URL.Query().Get("reset-id")
|
|
||||||
if resetID == "" {
|
|
||||||
resetID = mustGetUUID()
|
|
||||||
}
|
|
||||||
var (
|
|
||||||
days time.Duration
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
if durationStr != "" {
|
|
||||||
days, err = time.ParseDuration(durationStr)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, InvalidArgument{
|
|
||||||
Bucket: bucket,
|
|
||||||
Err: fmt.Errorf("invalid query parameter older-than %s for %s : %w", durationStr, bucket, err),
|
|
||||||
}), r.URL)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
resetBeforeDate := UTCNow().AddDate(0, 0, -1*int(days/24))
|
|
||||||
|
|
||||||
objectAPI := api.ObjectAPI()
|
|
||||||
if objectAPI == nil {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if s3Error := checkRequestAuthType(ctx, r, policy.ResetBucketReplicationStateAction, bucket, ""); s3Error != ErrNone {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if bucket exists.
|
|
||||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
config, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !config.HasActiveRules("", true) {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationNoMatchingRuleError), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
tgtArns := config.FilterTargetArns(
|
|
||||||
replication.ObjectOpts{
|
|
||||||
OpType: replication.ResyncReplicationType,
|
|
||||||
TargetArn: arn,
|
|
||||||
})
|
|
||||||
|
|
||||||
if len(tgtArns) == 0 {
|
|
||||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{
|
|
||||||
Bucket: bucket,
|
|
||||||
Err: fmt.Errorf("Remote target ARN %s missing or ineligible for replication resync", arn),
|
|
||||||
}), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(tgtArns) > 1 && arn == "" {
|
|
||||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{
|
|
||||||
Bucket: bucket,
|
|
||||||
Err: fmt.Errorf("ARN should be specified for replication reset"),
|
|
||||||
}), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var rinfo ResyncTargetsInfo
|
|
||||||
target := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, tgtArns[0])
|
|
||||||
target.ResetBeforeDate = UTCNow().AddDate(0, 0, -1*int(days/24))
|
|
||||||
target.ResetID = resetID
|
|
||||||
rinfo.Targets = append(rinfo.Targets, ResyncTarget{Arn: tgtArns[0], ResetID: target.ResetID})
|
|
||||||
if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, true); err != nil {
|
|
||||||
switch err.(type) {
|
|
||||||
case BucketRemoteConnectionErr:
|
|
||||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationRemoteConnectionError, err), r.URL)
|
|
||||||
default:
|
|
||||||
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := startReplicationResync(ctx, bucket, arn, resetID, resetBeforeDate, objectAPI); err != nil {
|
|
||||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{
|
|
||||||
Bucket: bucket,
|
|
||||||
Err: err,
|
|
||||||
}), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := json.Marshal(rinfo)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Write success response.
|
|
||||||
writeSuccessResponseJSON(w, data)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResetBucketReplicationStatusHandler - returns the status of replication reset.
|
|
||||||
// This API is a MinIO only extension
|
|
||||||
func (api objectAPIHandlers) ResetBucketReplicationStatusHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
ctx := newContext(r, w, "ResetBucketReplicationStatus")
|
|
||||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
|
||||||
|
|
||||||
vars := mux.Vars(r)
|
|
||||||
bucket := vars["bucket"]
|
|
||||||
arn := r.URL.Query().Get("arn")
|
|
||||||
var err error
|
|
||||||
|
|
||||||
objectAPI := api.ObjectAPI()
|
|
||||||
if objectAPI == nil {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if s3Error := checkRequestAuthType(ctx, r, policy.ResetBucketReplicationStateAction, bucket, ""); s3Error != ErrNone {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if bucket exists.
|
|
||||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket); err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
globalReplicationPool.resyncState.RLock()
|
|
||||||
brs, ok := globalReplicationPool.resyncState.statusMap[bucket]
|
|
||||||
if !ok {
|
|
||||||
brs, err = loadBucketResyncMetadata(ctx, bucket, objectAPI)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{
|
|
||||||
Bucket: bucket,
|
|
||||||
Err: fmt.Errorf("No replication resync status available for %s", arn),
|
|
||||||
}), r.URL)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var rinfo ResyncTargetsInfo
|
|
||||||
for tarn, st := range brs.TargetsMap {
|
|
||||||
if arn != "" && tarn != arn {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
rinfo.Targets = append(rinfo.Targets, ResyncTarget{
|
|
||||||
Arn: tarn,
|
|
||||||
ResetID: st.ResyncID,
|
|
||||||
StartTime: st.StartTime,
|
|
||||||
EndTime: st.EndTime,
|
|
||||||
ResyncStatus: st.ResyncStatus.String(),
|
|
||||||
ReplicatedSize: st.ReplicatedSize,
|
|
||||||
ReplicatedCount: st.ReplicatedCount,
|
|
||||||
FailedSize: st.FailedSize,
|
|
||||||
FailedCount: st.FailedCount,
|
|
||||||
Bucket: st.Bucket,
|
|
||||||
Object: st.Object,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
globalReplicationPool.resyncState.RUnlock()
|
|
||||||
data, err := json.Marshal(rinfo)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Write success response.
|
|
||||||
writeSuccessResponseJSON(w, data)
|
|
||||||
}
|
|
||||||
|
404
cmd/bucket-replication-handlers.go
Normal file
404
cmd/bucket-replication-handlers.go
Normal file
@ -0,0 +1,404 @@
|
|||||||
|
// Copyright (c) 2015-2022 MinIO, Inc.
|
||||||
|
//
|
||||||
|
// This file is part of MinIO Object Storage stack
|
||||||
|
//
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// This program is distributed in the hope that it will be useful
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"encoding/xml"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/minio/minio/internal/bucket/replication"
|
||||||
|
xhttp "github.com/minio/minio/internal/http"
|
||||||
|
"github.com/minio/minio/internal/logger"
|
||||||
|
"github.com/minio/pkg/bucket/policy"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PutBucketReplicationConfigHandler - PUT Bucket replication configuration.
|
||||||
|
// ----------
|
||||||
|
// Add a replication configuration on the specified bucket as specified in https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketReplication.html
|
||||||
|
func (api objectAPIHandlers) PutBucketReplicationConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := newContext(r, w, "PutBucketReplicationConfig")
|
||||||
|
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||||
|
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
bucket := vars["bucket"]
|
||||||
|
objectAPI := api.ObjectAPI()
|
||||||
|
if objectAPI == nil {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if globalIsGateway {
|
||||||
|
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if s3Error := checkRequestAuthType(ctx, r, policy.PutReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Check if bucket exists.
|
||||||
|
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if versioned := globalBucketVersioningSys.Enabled(bucket); !versioned {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationNeedsVersioningError), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
replicationConfig, err := replication.ParseConfig(io.LimitReader(r.Body, r.ContentLength))
|
||||||
|
if err != nil {
|
||||||
|
apiErr := errorCodes.ToAPIErr(ErrMalformedXML)
|
||||||
|
apiErr.Description = err.Error()
|
||||||
|
writeErrorResponse(ctx, w, apiErr, r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sameTarget, apiErr := validateReplicationDestination(ctx, bucket, replicationConfig, true)
|
||||||
|
if apiErr != noError {
|
||||||
|
writeErrorResponse(ctx, w, apiErr, r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Validate the received bucket replication config
|
||||||
|
if err = replicationConfig.Validate(bucket, sameTarget); err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
configData, err := xml.Marshal(replicationConfig)
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, configData); err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write success response.
|
||||||
|
writeSuccessResponseHeadersOnly(w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetBucketReplicationConfigHandler - GET Bucket replication configuration.
|
||||||
|
// ----------
|
||||||
|
// Gets the replication configuration for a bucket.
|
||||||
|
func (api objectAPIHandlers) GetBucketReplicationConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := newContext(r, w, "GetBucketReplicationConfig")
|
||||||
|
|
||||||
|
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||||
|
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
bucket := vars["bucket"]
|
||||||
|
|
||||||
|
objectAPI := api.ObjectAPI()
|
||||||
|
if objectAPI == nil {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if user has permissions to perform this operation
|
||||||
|
if s3Error := checkRequestAuthType(ctx, r, policy.GetReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Check if bucket exists.
|
||||||
|
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
config, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
configData, err := xml.Marshal(config)
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write success response.
|
||||||
|
writeSuccessResponseXML(w, configData)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteBucketReplicationConfigHandler - DELETE Bucket replication config.
|
||||||
|
// ----------
|
||||||
|
func (api objectAPIHandlers) DeleteBucketReplicationConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := newContext(r, w, "DeleteBucketReplicationConfig")
|
||||||
|
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
bucket := vars["bucket"]
|
||||||
|
|
||||||
|
objectAPI := api.ObjectAPI()
|
||||||
|
if objectAPI == nil {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if s3Error := checkRequestAuthType(ctx, r, policy.PutReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Check if bucket exists.
|
||||||
|
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if globalSiteReplicationSys.isEnabled() {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationDenyEditError), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, nil); err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write success response.
|
||||||
|
writeSuccessResponseHeadersOnly(w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetBucketReplicationMetricsHandler - GET Bucket replication metrics.
|
||||||
|
// ----------
|
||||||
|
// Gets the replication metrics for a bucket.
|
||||||
|
func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := newContext(r, w, "GetBucketReplicationMetrics")
|
||||||
|
|
||||||
|
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||||
|
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
bucket := vars["bucket"]
|
||||||
|
|
||||||
|
objectAPI := api.ObjectAPI()
|
||||||
|
if objectAPI == nil {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if user has permissions to perform this operation
|
||||||
|
if s3Error := checkRequestAuthType(ctx, r, policy.GetReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if bucket exists.
|
||||||
|
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var usageInfo BucketUsageInfo
|
||||||
|
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
|
||||||
|
if err == nil && !dataUsageInfo.LastUpdate.IsZero() {
|
||||||
|
usageInfo = dataUsageInfo.BucketsUsage[bucket]
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set(xhttp.ContentType, string(mimeJSON))
|
||||||
|
|
||||||
|
enc := json.NewEncoder(w)
|
||||||
|
if err = enc.Encode(getLatestReplicationStats(bucket, usageInfo)); err != nil {
|
||||||
|
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResetBucketReplicationStartHandler - starts a replication reset for all objects in a bucket which
|
||||||
|
// qualify for replication and re-sync the object(s) to target, provided ExistingObjectReplication is
|
||||||
|
// enabled for the qualifying rule. This API is a MinIO only extension provided for situations where
|
||||||
|
// remote target is entirely lost,and previously replicated objects need to be re-synced. If resync is
|
||||||
|
// already in progress it returns an error
|
||||||
|
func (api objectAPIHandlers) ResetBucketReplicationStartHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := newContext(r, w, "ResetBucketReplicationStart")
|
||||||
|
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||||
|
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
bucket := vars["bucket"]
|
||||||
|
durationStr := r.URL.Query().Get("older-than")
|
||||||
|
arn := r.URL.Query().Get("arn")
|
||||||
|
resetID := r.URL.Query().Get("reset-id")
|
||||||
|
if resetID == "" {
|
||||||
|
resetID = mustGetUUID()
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
days time.Duration
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if durationStr != "" {
|
||||||
|
days, err = time.ParseDuration(durationStr)
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, InvalidArgument{
|
||||||
|
Bucket: bucket,
|
||||||
|
Err: fmt.Errorf("invalid query parameter older-than %s for %s : %w", durationStr, bucket, err),
|
||||||
|
}), r.URL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resetBeforeDate := UTCNow().AddDate(0, 0, -1*int(days/24))
|
||||||
|
|
||||||
|
objectAPI := api.ObjectAPI()
|
||||||
|
if objectAPI == nil {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if s3Error := checkRequestAuthType(ctx, r, policy.ResetBucketReplicationStateAction, bucket, ""); s3Error != ErrNone {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if bucket exists.
|
||||||
|
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
config, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !config.HasExistingObjectReplication(arn) {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationNoExistingObjects), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
tgtArns := config.FilterTargetArns(
|
||||||
|
replication.ObjectOpts{
|
||||||
|
OpType: replication.ResyncReplicationType,
|
||||||
|
TargetArn: arn,
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(tgtArns) == 0 {
|
||||||
|
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{
|
||||||
|
Bucket: bucket,
|
||||||
|
Err: fmt.Errorf("Remote target ARN %s missing or ineligible for replication resync", arn),
|
||||||
|
}), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(tgtArns) > 1 && arn == "" {
|
||||||
|
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{
|
||||||
|
Bucket: bucket,
|
||||||
|
Err: fmt.Errorf("ARN should be specified for replication reset"),
|
||||||
|
}), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var rinfo ResyncTargetsInfo
|
||||||
|
target := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, tgtArns[0])
|
||||||
|
target.ResetBeforeDate = UTCNow().AddDate(0, 0, -1*int(days/24))
|
||||||
|
target.ResetID = resetID
|
||||||
|
rinfo.Targets = append(rinfo.Targets, ResyncTarget{Arn: tgtArns[0], ResetID: target.ResetID})
|
||||||
|
if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, true); err != nil {
|
||||||
|
switch err.(type) {
|
||||||
|
case BucketRemoteConnectionErr:
|
||||||
|
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationRemoteConnectionError, err), r.URL)
|
||||||
|
default:
|
||||||
|
writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := startReplicationResync(ctx, bucket, arn, resetID, resetBeforeDate, objectAPI); err != nil {
|
||||||
|
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{
|
||||||
|
Bucket: bucket,
|
||||||
|
Err: err,
|
||||||
|
}), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := json.Marshal(rinfo)
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Write success response.
|
||||||
|
writeSuccessResponseJSON(w, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResetBucketReplicationStatusHandler - returns the status of replication reset.
|
||||||
|
// This API is a MinIO only extension
|
||||||
|
func (api objectAPIHandlers) ResetBucketReplicationStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := newContext(r, w, "ResetBucketReplicationStatus")
|
||||||
|
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||||
|
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
bucket := vars["bucket"]
|
||||||
|
arn := r.URL.Query().Get("arn")
|
||||||
|
var err error
|
||||||
|
|
||||||
|
objectAPI := api.ObjectAPI()
|
||||||
|
if objectAPI == nil {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if s3Error := checkRequestAuthType(ctx, r, policy.ResetBucketReplicationStateAction, bucket, ""); s3Error != ErrNone {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if bucket exists.
|
||||||
|
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket); err != nil {
|
||||||
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
globalReplicationPool.resyncState.RLock()
|
||||||
|
brs, ok := globalReplicationPool.resyncState.statusMap[bucket]
|
||||||
|
globalReplicationPool.resyncState.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
brs, err = loadBucketResyncMetadata(ctx, bucket, objectAPI)
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{
|
||||||
|
Bucket: bucket,
|
||||||
|
Err: fmt.Errorf("No replication resync status available for %s", arn),
|
||||||
|
}), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var rinfo ResyncTargetsInfo
|
||||||
|
for tarn, st := range brs.TargetsMap {
|
||||||
|
if arn != "" && tarn != arn {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rinfo.Targets = append(rinfo.Targets, ResyncTarget{
|
||||||
|
Arn: tarn,
|
||||||
|
ResetID: st.ResyncID,
|
||||||
|
StartTime: st.StartTime,
|
||||||
|
EndTime: st.EndTime,
|
||||||
|
ResyncStatus: st.ResyncStatus.String(),
|
||||||
|
ReplicatedSize: st.ReplicatedSize,
|
||||||
|
ReplicatedCount: st.ReplicatedCount,
|
||||||
|
FailedSize: st.FailedSize,
|
||||||
|
FailedCount: st.FailedCount,
|
||||||
|
Bucket: st.Bucket,
|
||||||
|
Object: st.Object,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
data, err := json.Marshal(rinfo)
|
||||||
|
if err != nil {
|
||||||
|
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write success response.
|
||||||
|
writeSuccessResponseJSON(w, data)
|
||||||
|
}
|
@ -1757,10 +1757,6 @@ func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo, dsc *Repli
|
|||||||
if c.Empty() {
|
if c.Empty() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// existing object replication does not apply to un-versioned objects
|
|
||||||
if oi.VersionID == "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now overlay existing object replication choices for target
|
// Now overlay existing object replication choices for target
|
||||||
if oi.DeleteMarker {
|
if oi.DeleteMarker {
|
||||||
@ -2006,6 +2002,7 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI
|
|||||||
st.EndTime = UTCNow()
|
st.EndTime = UTCNow()
|
||||||
st.ResyncStatus = resyncStatus
|
st.ResyncStatus = resyncStatus
|
||||||
m.TargetsMap[arn] = st
|
m.TargetsMap[arn] = st
|
||||||
|
globalReplicationPool.resyncState.statusMap[bucket] = m
|
||||||
globalReplicationPool.resyncState.Unlock()
|
globalReplicationPool.resyncState.Unlock()
|
||||||
}()
|
}()
|
||||||
// Allocate new results channel to receive ObjectInfo.
|
// Allocate new results channel to receive ObjectInfo.
|
||||||
@ -2066,7 +2063,6 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI
|
|||||||
}
|
}
|
||||||
|
|
||||||
if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
|
if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
|
||||||
|
|
||||||
versionID := ""
|
versionID := ""
|
||||||
dmVersionID := ""
|
dmVersionID := ""
|
||||||
if roi.VersionPurgeStatus.Empty() {
|
if roi.VersionPurgeStatus.Empty() {
|
||||||
@ -2113,6 +2109,7 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI
|
|||||||
st.ReplicatedSize += roi.Size
|
st.ReplicatedSize += roi.Size
|
||||||
}
|
}
|
||||||
m.TargetsMap[arn] = st
|
m.TargetsMap[arn] = st
|
||||||
|
globalReplicationPool.resyncState.statusMap[bucket] = m
|
||||||
globalReplicationPool.resyncState.Unlock()
|
globalReplicationPool.resyncState.Unlock()
|
||||||
}
|
}
|
||||||
resyncStatus = ResyncCompleted
|
resyncStatus = ResyncCompleted
|
||||||
|
93
docs/bucket/replication/setup_2site_existing_replication.sh
Executable file
93
docs/bucket/replication/setup_2site_existing_replication.sh
Executable file
@ -0,0 +1,93 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
trap 'catch $LINENO' ERR
|
||||||
|
|
||||||
|
# shellcheck disable=SC2120
|
||||||
|
catch() {
|
||||||
|
if [ $# -ne 0 ]; then
|
||||||
|
echo "error on line $1"
|
||||||
|
for site in sitea siteb; do
|
||||||
|
echo "$site server logs ========="
|
||||||
|
cat "/tmp/${site}_1.log"
|
||||||
|
echo "==========================="
|
||||||
|
cat "/tmp/${site}_2.log"
|
||||||
|
done
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "Cleaning up instances of MinIO"
|
||||||
|
pkill minio
|
||||||
|
pkill -9 minio
|
||||||
|
rm -rf /tmp/multisitea
|
||||||
|
rm -rf /tmp/multisiteb
|
||||||
|
rm -rf /tmp/data
|
||||||
|
}
|
||||||
|
|
||||||
|
catch
|
||||||
|
|
||||||
|
set -e
|
||||||
|
export MINIO_CI_CD=1
|
||||||
|
export MINIO_BROWSER=off
|
||||||
|
export MINIO_ROOT_USER="minio"
|
||||||
|
export MINIO_ROOT_PASSWORD="minio123"
|
||||||
|
export MINIO_KMS_AUTO_ENCRYPTION=off
|
||||||
|
export MINIO_PROMETHEUS_AUTH_TYPE=public
|
||||||
|
export MINIO_KMS_SECRET_KEY=my-minio-key:OSMM+vkKUTCvQs9YL/CVMIMt43HFhkUpqJxTmGl6rYw=
|
||||||
|
unset MINIO_KMS_KES_CERT_FILE
|
||||||
|
unset MINIO_KMS_KES_KEY_FILE
|
||||||
|
unset MINIO_KMS_KES_ENDPOINT
|
||||||
|
unset MINIO_KMS_KES_KEY_NAME
|
||||||
|
|
||||||
|
wget -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc \
|
||||||
|
&& chmod +x mc
|
||||||
|
|
||||||
|
minio server --address 127.0.0.1:9001 "http://127.0.0.1:9001/tmp/multisitea/data/disterasure/xl{1...4}" \
|
||||||
|
"http://127.0.0.1:9002/tmp/multisitea/data/disterasure/xl{5...8}" >/tmp/sitea_1.log 2>&1 &
|
||||||
|
minio server --address 127.0.0.1:9002 "http://127.0.0.1:9001/tmp/multisitea/data/disterasure/xl{1...4}" \
|
||||||
|
"http://127.0.0.1:9002/tmp/multisitea/data/disterasure/xl{5...8}" >/tmp/sitea_2.log 2>&1 &
|
||||||
|
|
||||||
|
minio server --address 127.0.0.1:9003 "http://127.0.0.1:9003/tmp/multisiteb/data/disterasure/xl{1...4}" \
|
||||||
|
"http://127.0.0.1:9004/tmp/multisiteb/data/disterasure/xl{5...8}" >/tmp/siteb_1.log 2>&1 &
|
||||||
|
minio server --address 127.0.0.1:9004 "http://127.0.0.1:9003/tmp/multisiteb/data/disterasure/xl{1...4}" \
|
||||||
|
"http://127.0.0.1:9004/tmp/multisiteb/data/disterasure/xl{5...8}" >/tmp/siteb_2.log 2>&1 &
|
||||||
|
|
||||||
|
sleep 10s
|
||||||
|
|
||||||
|
export MC_HOST_sitea=http://minio:minio123@127.0.0.1:9001
|
||||||
|
export MC_HOST_siteb=http://minio:minio123@127.0.0.1:9004
|
||||||
|
|
||||||
|
./mc mb sitea/bucket
|
||||||
|
|
||||||
|
## Create 100 files
|
||||||
|
mkdir -p /tmp/data
|
||||||
|
for i in $(seq 1 10); do
|
||||||
|
echo "T" > /tmp/data/file_${i}.txt
|
||||||
|
done
|
||||||
|
|
||||||
|
./mc mirror /tmp/data sitea/bucket/
|
||||||
|
./mc version enable sitea/bucket
|
||||||
|
|
||||||
|
./mc mb siteb/bucket/
|
||||||
|
./mc version enable siteb/bucket/
|
||||||
|
|
||||||
|
echo "adding replication config for site a -> site b"
|
||||||
|
remote_arn=$(./mc admin bucket remote add sitea/bucket/ \
|
||||||
|
http://minio:minio123@127.0.0.1:9004/bucket \
|
||||||
|
--service "replication" --json | jq -r ".RemoteARN")
|
||||||
|
echo "adding replication rule for a -> b : ${remote_arn}"
|
||||||
|
|
||||||
|
./mc replicate add sitea/bucket/ \
|
||||||
|
--remote-bucket "${remote_arn}"
|
||||||
|
sleep 1
|
||||||
|
|
||||||
|
./mc replicate resync start sitea/bucket/ --remote-bucket "${remote_arn}"
|
||||||
|
sleep 10s ## sleep for 10s idea is that we give 100ms per object.
|
||||||
|
|
||||||
|
count=$(./mc replicate resync status sitea/bucket --remote-bucket "${remote_arn}" --json | jq .resyncInfo.target[].replicationCount)
|
||||||
|
./mc ls --versions sitea/bucket
|
||||||
|
./mc ls --versions siteb/bucket
|
||||||
|
if [ $count -ne 10 ]; then
|
||||||
|
echo "resync not complete after 100s unexpected failure"
|
||||||
|
./mc diff sitea/bucket siteb/bucket
|
||||||
|
fi
|
||||||
|
|
||||||
|
catch
|
@ -146,6 +146,18 @@ type ObjectOpts struct {
|
|||||||
TargetArn string
|
TargetArn string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasExistingObjectReplication returns true if any of the rule returns 'ExistingObjects' replication.
|
||||||
|
func (c Config) HasExistingObjectReplication(arn string) bool {
|
||||||
|
for _, rule := range c.Rules {
|
||||||
|
if rule.Destination.ARN == arn || c.RoleArn == arn {
|
||||||
|
if rule.ExistingObjectReplication.Status == Enabled {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// FilterActionableRules returns the rules actions that need to be executed
|
// FilterActionableRules returns the rules actions that need to be executed
|
||||||
// after evaluating prefix/tag filtering
|
// after evaluating prefix/tag filtering
|
||||||
func (c Config) FilterActionableRules(obj ObjectOpts) []Rule {
|
func (c Config) FilterActionableRules(obj ObjectOpts) []Rule {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user