validate bucket before attempting batch replication (#15861)

This commit is contained in:
Harshavardhana 2022-10-15 11:58:31 -07:00 committed by GitHub
parent c79bcc8838
commit c68910005b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 2 deletions

View File

@ -2228,6 +2228,12 @@ func toAPIError(ctx context.Context, err error) APIError {
// their internal error types. This code is only
// useful with gateway implementations.
switch e := err.(type) {
case batchReplicationJobError:
apiErr = APIError{
Code: e.Code,
Description: e.Description,
HTTPStatusCode: e.HTTPStatusCode,
}
case InvalidArgument:
apiErr = APIError{
Code: "InvalidArgument",

View File

@ -217,6 +217,8 @@ type BatchJobReplicateV1 struct {
Flags BatchJobReplicateFlags `yaml:"flags" json:"flags"`
Target BatchJobReplicateTarget `yaml:"target" json:"target"`
Source BatchJobReplicateSource `yaml:"source" json:"source"`
clnt *miniogo.Core `msg:"-"`
}
// BatchJobRequest this is an internal data structure not for external consumption.
@ -640,6 +642,17 @@ func (r *BatchJobReplicateV1) Start(ctx context.Context, api ObjectLayer, job Ba
return nil
}
//msgp:ignore batchReplicationJobError
type batchReplicationJobError struct {
Code string
Description string
HTTPStatusCode int
}
func (e batchReplicationJobError) Error() string {
return e.Description
}
// Validate validates the job definition input
func (r *BatchJobReplicateV1) Validate(ctx context.Context, o ObjectLayer) error {
if r == nil {
@ -654,8 +667,15 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, o ObjectLayer) error
return errInvalidArgument
}
_, err := o.GetBucketInfo(ctx, r.Source.Bucket, BucketOptions{})
info, err := o.GetBucketInfo(ctx, r.Source.Bucket, BucketOptions{})
if err != nil {
if isErrBucketNotFound(err) {
return batchReplicationJobError{
Code: "NoSuchSourceBucket",
Description: "The specified source bucket does not exist",
HTTPStatusCode: http.StatusNotFound,
}
}
return err
}
@ -695,6 +715,44 @@ func (r *BatchJobReplicateV1) Validate(ctx context.Context, o ObjectLayer) error
return err
}
u, err := url.Parse(r.Target.Endpoint)
if err != nil {
return err
}
cred := r.Target.Creds
c, err := miniogo.NewCore(u.Host, &miniogo.Options{
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: u.Scheme == "https",
Transport: getRemoteInstanceTransport,
})
if err != nil {
return err
}
vcfg, err := c.GetBucketVersioning(ctx, r.Target.Bucket)
if err != nil {
if miniogo.ToErrorResponse(err).Code == "NoSuchBucket" {
return batchReplicationJobError{
Code: "NoSuchTargetBucket",
Description: "The specified target bucket does not exist",
HTTPStatusCode: http.StatusNotFound,
}
}
return err
}
if info.Versioning && !vcfg.Enabled() {
return batchReplicationJobError{
Code: "InvalidBucketState",
Description: fmt.Sprintf("The source '%s' has versioning enabled, target '%s' must have versioning enabled",
r.Source.Bucket, r.Target.Bucket),
HTTPStatusCode: http.StatusBadRequest,
}
}
r.clnt = c
return nil
}

View File

@ -1588,6 +1588,8 @@ func (z *erasureServerPools) GetBucketInfo(ctx context.Context, bucket string, o
meta, err := globalBucketMetadataSys.Get(bucket)
if err == nil {
bucketInfo.Created = meta.Created
bucketInfo.Versioning = meta.LockEnabled || globalBucketVersioningSys.Enabled(bucket)
bucketInfo.ObjectLocking = meta.LockEnabled
}
return bucketInfo, nil
}
@ -1602,6 +1604,8 @@ func (z *erasureServerPools) GetBucketInfo(ctx context.Context, bucket string, o
meta, err := globalBucketMetadataSys.Get(bucket)
if err == nil {
bucketInfo.Created = meta.Created
bucketInfo.Versioning = meta.LockEnabled || globalBucketVersioningSys.Enabled(bucket)
bucketInfo.ObjectLocking = meta.LockEnabled
}
return bucketInfo, nil
}

View File

@ -344,7 +344,14 @@ func (es *erasureSingle) GetBucketInfo(ctx context.Context, bucket string, opts
}
return bi, toObjectErr(err, bucket)
}
return BucketInfo{Name: volInfo.Name, Created: volInfo.Created}, nil
bi = BucketInfo{Name: volInfo.Name, Created: volInfo.Created}
meta, err := globalBucketMetadataSys.Get(bucket)
if err == nil {
bi.Created = meta.Created
bi.Versioning = meta.LockEnabled || globalBucketVersioningSys.Enabled(bucket)
bi.ObjectLocking = meta.LockEnabled
}
return bi, nil
}
// DeleteBucket - deletes a bucket.

View File

@ -79,6 +79,9 @@ type BucketInfo struct {
// Date and time when the bucket was created.
Created time.Time
Deleted time.Time
// Bucket features enabled
Versioning, ObjectLocking bool
}
// ObjectInfo - represents object metadata.