replication: add validation API (#17520)

To check if replication is set up properly on a bucket.
This commit is contained in:
Poorna
2023-07-10 23:09:20 -04:00
committed by GitHub
parent 85f5700e4e
commit fb49aead9b
10 changed files with 455 additions and 259 deletions

View File

@@ -18,13 +18,18 @@
package cmd
import (
"bytes"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"net/http"
"path"
"time"
"github.com/minio/minio-go/v7"
objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
@@ -466,3 +471,148 @@ func (api objectAPIHandlers) ResetBucketReplicationStatusHandler(w http.Response
// Write success response.
writeSuccessResponseJSON(w, data)
}
// ValidateBucketReplicationCredsHandler - validate replication credentials for a bucket.
// ----------
func (api objectAPIHandlers) ValidateBucketReplicationCredsHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "ValidateBucketReplicationCreds")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
vars := mux.Vars(r)
bucket := vars["bucket"]
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
if s3Error := checkRequestAuthType(ctx, r, policy.GetReplicationConfigurationAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
// Check if bucket exists.
if _, err := objectAPI.GetBucketInfo(ctx, bucket, BucketOptions{}); err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationValidationError, err), r.URL)
return
}
if versioned := globalBucketVersioningSys.Enabled(bucket); !versioned {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationNeedsVersioningError), r.URL)
return
}
replicationConfig, _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket)
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationConfigurationNotFoundError, err), r.URL)
return
}
lockEnabled := false
lcfg, _, err := globalBucketMetadataSys.GetObjectLockConfig(bucket)
if err != nil {
if !errors.Is(err, BucketObjectLockConfigNotFound{Bucket: bucket}) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationValidationError, err), r.URL)
return
}
}
if lcfg != nil {
lockEnabled = lcfg.Enabled()
}
sameTarget, apiErr := validateReplicationDestination(ctx, bucket, replicationConfig, true)
if apiErr != noError {
writeErrorResponse(ctx, w, apiErr, r.URL)
return
}
// Validate the bucket replication config
if err = replicationConfig.Validate(bucket, sameTarget); err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationValidationError, err), r.URL)
return
}
buf := bytes.Repeat([]byte("a"), 8)
for _, rule := range replicationConfig.Rules {
if rule.Status == replication.Disabled {
continue
}
clnt := globalBucketTargetSys.GetRemoteTargetClient(ctx, rule.Destination.Bucket)
if clnt == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrRemoteTargetNotFoundError, fmt.Errorf("replication config with rule ID %s has a stale target", rule.ID)), r.URL)
return
}
if lockEnabled {
lock, _, _, _, err := clnt.GetObjectLockConfig(ctx, clnt.Bucket)
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationValidationError, err), r.URL)
return
}
if lock != objectlock.Enabled {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationDestinationMissingLock, fmt.Errorf("target bucket %s is not object lock enabled", clnt.Bucket)), r.URL)
return
}
}
vcfg, err := clnt.GetBucketVersioning(ctx, clnt.Bucket)
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationValidationError, err), r.URL)
return
}
if !vcfg.Enabled() {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrRemoteTargetNotVersionedError, fmt.Errorf("target bucket %s is not versioned", clnt.Bucket)), r.URL)
return
}
if sameTarget && bucket == clnt.Bucket {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketRemoteIdenticalToSource), r.URL)
return
}
reader := bytes.NewReader(buf)
// fake a PutObject and RemoveObject call to validate permissions
c := &minio.Core{Client: clnt.Client}
putOpts := minio.PutObjectOptions{
Internal: minio.AdvancedPutOptions{
SourceVersionID: mustGetUUID(),
ReplicationStatus: minio.ReplicationStatusReplica,
SourceMTime: time.Now(),
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
ReplicationValidityCheck: true, // set this to validate the replication config
},
}
obj := path.Join(minioReservedBucket, globalLocalNodeNameHex, "deleteme")
ui, err := c.PutObject(ctx, clnt.Bucket, obj, reader, int64(len(buf)), "", "", putOpts)
if err != nil && !isReplicationPermissionCheck(ErrorRespToObjectError(err, bucket, obj)) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationValidationError, fmt.Errorf("s3:ReplicateObject permissions missing for replication user: %w", err)), r.URL)
return
}
err = c.RemoveObject(ctx, clnt.Bucket, obj, minio.RemoveObjectOptions{
VersionID: ui.VersionID,
Internal: minio.AdvancedRemoveOptions{
ReplicationDeleteMarker: true,
ReplicationMTime: time.Now(),
ReplicationStatus: minio.ReplicationStatusReplica,
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
ReplicationValidityCheck: true, // set this to validate the replication config
},
})
if err != nil && !isReplicationPermissionCheck(ErrorRespToObjectError(err, bucket, obj)) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationValidationError, fmt.Errorf("s3:ReplicateDelete permissions missing for replication user: %w", err)), r.URL)
return
}
// fake a versioned delete - to ensure deny policies are not in place
err = c.RemoveObject(ctx, clnt.Bucket, obj, minio.RemoveObjectOptions{
VersionID: ui.VersionID,
Internal: minio.AdvancedRemoveOptions{
ReplicationDeleteMarker: false,
ReplicationMTime: time.Now(),
ReplicationStatus: minio.ReplicationStatusReplica,
ReplicationRequest: true, // always set this to distinguish between `mc mirror` replication and serverside
ReplicationValidityCheck: true, // set this to validate the replication config
},
})
if err != nil && !isReplicationPermissionCheck(ErrorRespToObjectError(err, bucket, obj)) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationValidationError, fmt.Errorf("s3:ReplicateDelete/s3:DeleteObject permissions missing for replication user: %w", err)), r.URL)
return
}
}
// Write success response.
writeSuccessResponseHeadersOnly(w)
}