Add missing validation for replication API conditions (#10114)

This commit is contained in:
poornas 2020-07-22 17:39:40 -07:00 committed by GitHub
parent 73890f31af
commit b9be841fd2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 33 additions and 4 deletions

View File

@ -135,6 +135,10 @@ func (a adminAPIHandlers) SetBucketReplicationTargetHandler(w http.ResponseWrite
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return return
} }
if !globalIsErasure {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
// Turn off replication if disk crawl is unavailable. // Turn off replication if disk crawl is unavailable.
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff { if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketReplicationDisabledError), r.URL) writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketReplicationDisabledError), r.URL)
@ -214,8 +218,7 @@ func (a adminAPIHandlers) GetBucketReplicationTargetsHandler(w http.ResponseWrit
if !target.Empty() { if !target.Empty() {
var creds auth.Credentials var creds auth.Credentials
creds.AccessKey = target.Credentials.AccessKey creds.AccessKey = target.Credentials.AccessKey
tgt = madmin.BucketReplicationTarget{Endpoint: target.Endpoint, TargetBucket: target.TargetBucket, Credentials: &creds} tgt = madmin.BucketReplicationTarget{Endpoint: target.Endpoint, TargetBucket: target.TargetBucket, Credentials: &creds, Arn: target.Arn}
} }
data, err := json.Marshal(tgt) data, err := json.Marshal(tgt)
if err != nil { if err != nil {

View File

@ -108,7 +108,7 @@ const (
ErrReplicationConfigurationNotFoundError ErrReplicationConfigurationNotFoundError
ErrReplicationDestinationNotFoundError ErrReplicationDestinationNotFoundError
ErrReplicationTargetNotFoundError ErrReplicationTargetNotFoundError
ErrReplicationTargetNotVersionedError
ErrReplicationNeedsVersioningError ErrReplicationNeedsVersioningError
ErrReplicationBucketNeedsVersioningError ErrReplicationBucketNeedsVersioningError
ErrBucketReplicationDisabledError ErrBucketReplicationDisabledError
@ -830,6 +830,11 @@ var errorCodes = errorCodeMap{
Description: "The replication target does not exist", Description: "The replication target does not exist",
HTTPStatusCode: http.StatusNotFound, HTTPStatusCode: http.StatusNotFound,
}, },
ErrReplicationTargetNotVersionedError: {
Code: "ReplicationTargetNotVersionedError",
Description: "The replication target does not have versioning enabled",
HTTPStatusCode: http.StatusNotFound,
},
ErrReplicationNeedsVersioningError: { ErrReplicationNeedsVersioningError: {
Code: "InvalidRequest", Code: "InvalidRequest",
Description: "Versioning must be 'Enabled' on the bucket to apply a replication configuration", Description: "Versioning must be 'Enabled' on the bucket to apply a replication configuration",
@ -1876,6 +1881,8 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
apiErr = ErrReplicationDestinationNotFoundError apiErr = ErrReplicationDestinationNotFoundError
case BucketReplicationTargetNotFound: case BucketReplicationTargetNotFound:
apiErr = ErrReplicationTargetNotFoundError apiErr = ErrReplicationTargetNotFoundError
case BucketReplicationTargetNotVersioned:
apiErr = ErrReplicationTargetNotVersionedError
case BucketQuotaExceeded: case BucketQuotaExceeded:
apiErr = ErrAdminBucketQuotaExceeded apiErr = ErrAdminBucketQuotaExceeded
case *event.ErrInvalidEventName: case *event.ErrInvalidEventName:

View File

@ -1036,7 +1036,10 @@ func (api objectAPIHandlers) PutBucketObjectLockConfigHandler(w http.ResponseWri
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return return
} }
if !globalIsErasure {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
if s3Error := checkRequestAuthType(ctx, r, policy.PutBucketObjectLockConfigurationAction, bucket, ""); s3Error != ErrNone { if s3Error := checkRequestAuthType(ctx, r, policy.PutBucketObjectLockConfigurationAction, bucket, ""); s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
return return
@ -1238,6 +1241,10 @@ func (api objectAPIHandlers) PutBucketReplicationConfigHandler(w http.ResponseWr
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return return
} }
if !globalIsErasure {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
return
}
// Turn off replication if disk crawl is unavailable. // Turn off replication if disk crawl is unavailable.
if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff { if env.Get(envDataUsageCrawlConf, config.EnableOn) == config.EnableOff {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketReplicationDisabledError), r.URL) writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrBucketReplicationDisabledError), r.URL)

View File

@ -32,6 +32,7 @@ import (
xhttp "github.com/minio/minio/cmd/http" xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bucket/replication" "github.com/minio/minio/pkg/bucket/replication"
"github.com/minio/minio/pkg/bucket/versioning"
"github.com/minio/minio/pkg/event" "github.com/minio/minio/pkg/event"
iampolicy "github.com/minio/minio/pkg/iam/policy" iampolicy "github.com/minio/minio/pkg/iam/policy"
"github.com/minio/minio/pkg/madmin" "github.com/minio/minio/pkg/madmin"
@ -84,6 +85,10 @@ func (sys *BucketReplicationSys) SetTarget(ctx context.Context, bucket string, t
if !ok { if !ok {
return BucketReplicationDestinationNotFound{Bucket: tgt.TargetBucket} return BucketReplicationDestinationNotFound{Bucket: tgt.TargetBucket}
} }
vcfg, err := clnt.GetBucketVersioning(ctx, tgt.TargetBucket)
if err != nil || vcfg.Status != string(versioning.Enabled) {
return BucketReplicationTargetNotVersioned{Bucket: tgt.TargetBucket}
}
sys.Lock() sys.Lock()
sys.targetsMap[bucket] = clnt sys.targetsMap[bucket] = clnt
sys.targetsARNMap[tgt.URL()] = tgt.Arn sys.targetsARNMap[tgt.URL()] = tgt.Arn

View File

@ -369,6 +369,13 @@ func (e BucketReplicationTargetNotFound) Error() string {
return "Replication target not found: " + e.Bucket return "Replication target not found: " + e.Bucket
} }
// BucketReplicationTargetNotVersioned replication target does not have versioning enabled.
type BucketReplicationTargetNotVersioned GenericError
func (e BucketReplicationTargetNotVersioned) Error() string {
return "Replication target does not have versioning enabled: " + e.Bucket
}
/// Bucket related errors. /// Bucket related errors.
// BucketNameInvalid - bucketname provided is invalid. // BucketNameInvalid - bucketname provided is invalid.