fix: error handling cases in site-replication (#13901)

- Allow proper SRError to be propagated to
  handlers and converted appropriately.

- Make sure to enable object locking on buckets
  when requested in MakeBucketHook.

- When DNSConfig is enabled attempt to delete it
  first before deleting buckets locally.
This commit is contained in:
Harshavardhana 2021-12-14 14:09:57 -08:00 committed by GitHub
parent a8d4042853
commit 88ad742da0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 156 additions and 105 deletions

View File

@ -86,6 +86,8 @@ func toAdminAPIErr(ctx context.Context, err error) APIError {
Description: e.Message,
HTTPStatusCode: e.StatusCode,
}
case SRError:
apiErr = errorCodes.ToAPIErrWithErr(e.Code, e.Cause)
default:
switch {
case errors.Is(err, errConfigNotFound):

View File

@ -45,16 +45,16 @@ func (a adminAPIHandlers) SiteReplicationAdd(w http.ResponseWriter, r *http.Requ
}
var sites []madmin.PeerSite
errCode := readJSONBody(ctx, r.Body, &sites, cred.SecretKey)
if errCode != ErrNone {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL)
err := parseJSONBody(ctx, r.Body, &sites, cred.SecretKey)
if err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
status, errInfo := globalSiteReplicationSys.AddPeerClusters(ctx, sites)
if errInfo.Code != ErrNone {
logger.LogIf(ctx, errInfo)
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(errInfo.Code, errInfo.Cause), r.URL)
status, err := globalSiteReplicationSys.AddPeerClusters(ctx, sites)
if err != nil {
logger.LogIf(ctx, err)
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
@ -82,16 +82,14 @@ func (a adminAPIHandlers) SRInternalJoin(w http.ResponseWriter, r *http.Request)
}
var joinArg madmin.SRInternalJoinReq
errCode := readJSONBody(ctx, r.Body, &joinArg, cred.SecretKey)
if errCode != ErrNone {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL)
if err := parseJSONBody(ctx, r.Body, &joinArg, cred.SecretKey); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
errInfo := globalSiteReplicationSys.InternalJoinReq(ctx, joinArg)
if errInfo.Code != ErrNone {
logger.LogIf(ctx, errInfo)
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(errInfo.Code, errInfo.Cause), r.URL)
if err := globalSiteReplicationSys.InternalJoinReq(ctx, joinArg); err != nil {
logger.LogIf(ctx, err)
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
}
@ -114,7 +112,7 @@ func (a adminAPIHandlers) SRInternalBucketOps(w http.ResponseWriter, r *http.Req
var err error
switch operation {
default:
err = errInvalidArgument
err = errSRInvalidRequest(errInvalidArgument)
case madmin.MakeWithVersioningBktOp:
_, isLockEnabled := r.Form["lockEnabled"]
_, isVersioningEnabled := r.Form["versioningEnabled"]
@ -151,16 +149,15 @@ func (a adminAPIHandlers) SRInternalReplicateIAMItem(w http.ResponseWriter, r *h
}
var item madmin.SRIAMItem
errCode := readJSONBody(ctx, r.Body, &item, "")
if errCode != ErrNone {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL)
if err := parseJSONBody(ctx, r.Body, &item, ""); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
var err error
switch item.Type {
default:
err = errInvalidArgument
err = errSRInvalidRequest(errInvalidArgument)
case madmin.SRIAMItemPolicy:
if item.Policy == nil {
err = globalSiteReplicationSys.PeerAddPolicyHandler(ctx, item.Name, nil)
@ -202,16 +199,15 @@ func (a adminAPIHandlers) SRInternalReplicateBucketItem(w http.ResponseWriter, r
}
var item madmin.SRBucketMeta
errCode := readJSONBody(ctx, r.Body, &item, "")
if errCode != ErrNone {
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL)
if err := parseJSONBody(ctx, r.Body, &item, ""); err != nil {
writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
return
}
var err error
switch item.Type {
default:
err = errInvalidArgument
err = errSRInvalidRequest(errInvalidArgument)
case madmin.SRBucketMetaTypePolicy:
if item.Policy == nil {
err = globalSiteReplicationSys.PeerBucketPolicyHandler(ctx, item.Bucket, nil)
@ -293,24 +289,25 @@ func (a adminAPIHandlers) SRInternalGetIDPSettings(w http.ResponseWriter, r *htt
}
}
func readJSONBody(ctx context.Context, body io.Reader, v interface{}, encryptionKey string) APIErrorCode {
func parseJSONBody(ctx context.Context, body io.Reader, v interface{}, encryptionKey string) error {
data, err := ioutil.ReadAll(body)
if err != nil {
return ErrInvalidRequest
return SRError{
Cause: err,
Code: ErrSiteReplicationInvalidRequest,
}
}
if encryptionKey != "" {
data, err = madmin.DecryptData(encryptionKey, bytes.NewReader(data))
if err != nil {
logger.LogIf(ctx, err)
return ErrInvalidRequest
return SRError{
Cause: err,
Code: ErrSiteReplicationInvalidRequest,
}
}
}
err = json.Unmarshal(data, v)
if err != nil {
return ErrAdminConfigBadJSON
}
return ErrNone
return json.Unmarshal(data, v)
}

View File

@ -1280,7 +1280,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http.
}
if globalDNSConfig != nil {
if err2 := globalDNSConfig.Put(bucket); err2 != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to restore bucket DNS entry %w, pl1ease fix it manually", err2))
logger.LogIf(ctx, fmt.Errorf("Unable to restore bucket DNS entry %w, please fix it manually", err2))
}
}
writeErrorResponse(ctx, w, apiErr, r.URL)

View File

@ -40,7 +40,6 @@ import (
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio/internal/auth"
sreplication "github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/bucket/versioning"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/bucket/policy"
iampolicy "github.com/minio/pkg/iam/policy"
@ -56,11 +55,26 @@ const (
)
var (
errSRCannotJoin = errors.New("this site is already configured for site-replication")
errSRDuplicateSites = errors.New("duplicate sites provided for site-replication")
errSRSelfNotFound = errors.New("none of the given sites correspond to the current one")
errSRPeerNotFound = errors.New("peer not found")
errSRNotEnabled = errors.New("site replication is not enabled")
errSRCannotJoin = SRError{
Cause: errors.New("this site is already configured for site-replication"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRDuplicateSites = SRError{
Cause: errors.New("duplicate sites provided for site-replication"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRSelfNotFound = SRError{
Cause: errors.New("none of the given sites correspond to the current one"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRPeerNotFound = SRError{
Cause: errors.New("peer not found"),
Code: ErrSiteReplicationInvalidRequest,
}
errSRNotEnabled = SRError{
Cause: errors.New("site replication is not enabled"),
Code: ErrSiteReplicationInvalidRequest,
}
)
func errSRInvalidRequest(err error) SRError {
@ -309,7 +323,7 @@ func (c *SiteReplicationSys) getSiteStatuses(ctx context.Context, sites []madmin
}
// AddPeerClusters - add cluster sites for replication configuration.
func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmin.PeerSite) (madmin.ReplicateAddStatus, SRError) {
func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmin.PeerSite) (madmin.ReplicateAddStatus, error) {
sites, serr := c.getSiteStatuses(ctx, psites)
if serr.Cause != nil {
return madmin.ReplicateAddStatus{}, serr
@ -335,7 +349,7 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi
for i, v := range sites {
// deploymentIDs must be unique
if deploymentIDsSet.Contains(v.DeploymentID) {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errSRDuplicateSites)
return madmin.ReplicateAddStatus{}, errSRDuplicateSites
}
deploymentIDsSet.Add(v.DeploymentID)
@ -351,32 +365,32 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi
if c.enabled {
// If current cluster is already SR enabled and no new site being added ,fail.
if currDeploymentIDsSet.Equals(deploymentIDsSet) {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errSRCannotJoin)
return madmin.ReplicateAddStatus{}, errSRCannotJoin
}
if len(currDeploymentIDsSet.Intersection(deploymentIDsSet)) != len(currDeploymentIDsSet) {
diffSlc := getMissingSiteNames(currDeploymentIDsSet, deploymentIDsSet, currSites.Sites)
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("All existing replicated sites must be specified - missing %s", strings.Join(diffSlc, " ")))
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("all existing replicated sites must be specified - missing %s", strings.Join(diffSlc, " ")))
}
}
// For this `add` API, either all clusters must be empty or the local
// cluster must be the only one having some buckets.
if localHasBuckets && nonLocalPeerWithBuckets != "" {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errors.New("Only one cluster may have data when configuring site replication"))
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errors.New("only one cluster may have data when configuring site replication"))
}
if !localHasBuckets && nonLocalPeerWithBuckets != "" {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("Please send your request to the cluster containing data/buckets: %s", nonLocalPeerWithBuckets))
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("please send your request to the cluster containing data/buckets: %s", nonLocalPeerWithBuckets))
}
// validate that all clusters are using the same (LDAP based)
// external IDP.
pass, verr := c.validateIDPSettings(ctx, sites)
if verr.Cause != nil {
return madmin.ReplicateAddStatus{}, verr
pass, err := c.validateIDPSettings(ctx, sites)
if err != nil {
return madmin.ReplicateAddStatus{}, err
}
if !pass {
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("All cluster sites must have the same (LDAP) IDP settings."))
return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errors.New("all cluster sites must have the same (LDAP) IDP settings"))
}
// FIXME: Ideally, we also need to check if there are any global IAM
@ -429,7 +443,7 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi
addedCount := 0
var (
peerAddErr SRError
peerAddErr error
admClient *madmin.AdminClient
)
@ -456,7 +470,7 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi
addedCount++
}
if peerAddErr.Cause != nil {
if peerAddErr != nil {
if addedCount == 0 {
return madmin.ReplicateAddStatus{}, peerAddErr
}
@ -469,8 +483,9 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi
ErrDetail: peerAddErr.Error(),
}
return partial, SRError{}
return partial, nil
}
// Other than handling existing buckets, we can now save the cluster
// replication configuration state.
state := srState{
@ -478,29 +493,29 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi
Peers: joinReq.Peers,
ServiceAccountAccessKey: svcCred.AccessKey,
}
err = c.saveToDisk(ctx, state)
if err != nil {
if err = c.saveToDisk(ctx, state); err != nil {
return madmin.ReplicateAddStatus{
Status: madmin.ReplicateAddStatusPartial,
ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err),
}, SRError{}
}, nil
}
result := madmin.ReplicateAddStatus{
Success: true,
Status: madmin.ReplicateAddStatusSuccess,
}
initialSyncErr := c.syncLocalToPeers(ctx)
if initialSyncErr.Code != ErrNone {
result.InitialSyncErrorMessage = initialSyncErr.Error()
if err := c.syncLocalToPeers(ctx); err != nil {
result.InitialSyncErrorMessage = err.Error()
}
return result, SRError{}
return result, nil
}
// InternalJoinReq - internal API handler to respond to a peer cluster's request
// to join.
func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRInternalJoinReq) SRError {
func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRInternalJoinReq) error {
var ourName string
for d, p := range arg.Peers {
if d == globalDeploymentID {
@ -509,7 +524,7 @@ func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRI
}
}
if ourName == "" {
return errSRInvalidRequest(errSRSelfNotFound)
return errSRSelfNotFound
}
_, _, err := globalIAMSys.GetServiceAccount(ctx, arg.SvcAcctAccessKey)
@ -528,11 +543,10 @@ func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRI
Peers: arg.Peers,
ServiceAccountAccessKey: arg.SvcAcctAccessKey,
}
err = c.saveToDisk(ctx, state)
if err != nil {
if err = c.saveToDisk(ctx, state); err != nil {
return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to disk on %s: %v", ourName, err))
}
return SRError{}
return nil
}
// GetIDPSettings returns info about the configured identity provider. It is
@ -547,7 +561,7 @@ func (c *SiteReplicationSys) GetIDPSettings(ctx context.Context) madmin.IDPSetti
}
}
func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []PeerSiteInfo) (bool, SRError) {
func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []PeerSiteInfo) (bool, error) {
s := make([]madmin.IDPSettings, 0, len(peers))
for _, v := range peers {
if v.self {
@ -569,15 +583,15 @@ func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []Pe
for _, v := range s {
if !v.IsLDAPEnabled {
return false, SRError{}
return false, nil
}
}
for i := 1; i < len(s); i++ {
if s[i] != s[0] {
return false, SRError{}
return false, nil
}
}
return true, SRError{}
return true, nil
}
// GetClusterInfo - returns site replication information.
@ -620,7 +634,11 @@ func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string,
optsMap["location"] = opts.Location
}
if opts.LockEnabled {
optsMap["lockEnabled"] = ""
optsMap["lockEnabled"] = "true"
optsMap["versioningEnabled"] = "true"
}
if opts.VersioningEnabled {
optsMap["versioningEnabled"] = "true"
}
// Create bucket and enable versioning on all peers.
@ -714,6 +732,7 @@ func (c *SiteReplicationSys) PeerBucketMakeWithVersioningHandler(ctx context.Con
if objAPI == nil {
return errServerNotInitialized
}
err := objAPI.MakeBucketWithLocation(ctx, bucket, opts)
if err != nil {
// Check if this is a bucket exists error.
@ -729,27 +748,25 @@ func (c *SiteReplicationSys) PeerBucketMakeWithVersioningHandler(ctx context.Con
globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
}
// Enable versioning on the bucket.
config, err := globalBucketVersioningSys.Get(bucket)
meta, err := globalBucketMetadataSys.Get(bucket)
if err != nil {
logger.LogIf(ctx, c.annotateErr("MakeBucketErr on peer call", err))
return wrapSRErr(err)
}
if !config.Enabled() {
verConf := versioning.Versioning{
Status: versioning.Enabled,
}
// FIXME: need to confirm if skipping object lock and
// versioning-suspended state checks are valid here.
cfgData, err := xml.Marshal(verConf)
if err != nil {
return wrapSRErr(err)
}
err = globalBucketMetadataSys.Update(bucket, bucketVersioningConfig, cfgData)
if err != nil {
logger.LogIf(ctx, c.annotateErr("Versioning enabling error on peer call", err))
return wrapSRErr(err)
}
meta.VersioningConfigXML = enabledBucketVersioningConfig
if opts.LockEnabled {
meta.ObjectLockConfigXML = enabledBucketObjectLockConfig
}
if err := meta.Save(context.Background(), objAPI); err != nil {
return wrapSRErr(err)
}
globalBucketMetadataSys.Set(bucket, meta)
// Load updated bucket metadata into memory as new metadata updated.
globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
return nil
}
@ -931,14 +948,24 @@ func (c *SiteReplicationSys) PeerBucketDeleteHandler(ctx context.Context, bucket
return errSRNotEnabled
}
// FIXME: need to handle cases where globalDNSConfig is set.
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}
if globalDNSConfig != nil {
if err := globalDNSConfig.Delete(bucket); err != nil {
return err
}
}
err := objAPI.DeleteBucket(ctx, bucket, DeleteBucketOptions{Force: forceDelete})
if err != nil {
if globalDNSConfig != nil {
if err2 := globalDNSConfig.Put(bucket); err2 != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to restore bucket DNS entry %w, please fix it manually", err2))
}
}
return err
}
@ -1008,7 +1035,7 @@ func (c *SiteReplicationSys) PeerAddPolicyHandler(ctx context.Context, policyNam
// PeerSvcAccChangeHandler - copies service-account change to local.
func (c *SiteReplicationSys) PeerSvcAccChangeHandler(ctx context.Context, change *madmin.SRSvcAccChange) error {
if change == nil {
return errInvalidArgument
return errSRInvalidRequest(errInvalidArgument)
}
switch {
case change.Create != nil:
@ -1073,7 +1100,7 @@ func (c *SiteReplicationSys) PeerSvcAccChangeHandler(ctx context.Context, change
// PeerPolicyMappingHandler - copies policy mapping to local.
func (c *SiteReplicationSys) PeerPolicyMappingHandler(ctx context.Context, mapping *madmin.SRPolicyMapping) error {
if mapping == nil {
return errInvalidArgument
return errSRInvalidRequest(errInvalidArgument)
}
err := globalIAMSys.PolicyDBSet(ctx, mapping.UserOrGroup, mapping.Policy, mapping.IsGroup)
if err != nil {
@ -1085,7 +1112,7 @@ func (c *SiteReplicationSys) PeerPolicyMappingHandler(ctx context.Context, mappi
// PeerSTSAccHandler - replicates STS credential locally.
func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred *madmin.SRSTSCredential) error {
if stsCred == nil {
return errInvalidArgument
return errSRInvalidRequest(errInvalidArgument)
}
// Verify the session token of the stsCred
@ -1261,7 +1288,7 @@ func (c *SiteReplicationSys) getAdminClient(ctx context.Context, deploymentID st
func (c *SiteReplicationSys) getPeerCreds() (*auth.Credentials, error) {
creds, ok := globalIAMSys.store.GetUser(c.state.ServiceAccountAccessKey)
if !ok {
return nil, errors.New("site replication service account not found!")
return nil, errors.New("site replication service account not found")
}
return &creds, nil
}
@ -1269,7 +1296,7 @@ func (c *SiteReplicationSys) getPeerCreds() (*auth.Credentials, error) {
// syncLocalToPeers is used when initially configuring site replication, to
// copy existing buckets, their settings, service accounts and policies to all
// new peers.
func (c *SiteReplicationSys) syncLocalToPeers(ctx context.Context) SRError {
func (c *SiteReplicationSys) syncLocalToPeers(ctx context.Context) error {
// If local has buckets, enable versioning on them, create them on peers
// and setup replication rules.
objAPI := newObjectLayerFn()
@ -1518,7 +1545,7 @@ func (c *SiteReplicationSys) syncLocalToPeers(ctx context.Context) SRError {
}
}
return SRError{}
return nil
}
// Concurrency helpers

View File

@ -79,26 +79,26 @@ if [ $? -eq 0 ]; then
fi
./mc admin user info minio1 "uid=dillon,ou=people,ou=swengg,dc=min,dc=io"
if [ $? -eq 1 ]; then
if [ $? -ne 0 ]; then
echo "policy mapping missing, exiting.."
exit_1;
fi
./mc admin user info minio2 "uid=dillon,ou=people,ou=swengg,dc=min,dc=io"
if [ $? -eq 1 ]; then
if [ $? -ne 0 ]; then
echo "policy mapping missing, exiting.."
exit_1;
fi
./mc admin user info minio3 "uid=dillon,ou=people,ou=swengg,dc=min,dc=io"
if [ $? -eq 1 ]; then
if [ $? -ne 0 ]; then
echo "policy mapping missing, exiting.."
exit_1;
fi
# LDAP simple user
./mc admin user svcacct add minio2 dillon --access-key testsvc --secret-key testsvc123
if [ $? -eq 1 ]; then
if [ $? -ne 0 ]; then
echo "adding svc account failed, exiting.."
exit_1;
fi
@ -106,19 +106,19 @@ fi
sleep 10
./mc admin user svcacct info minio1 testsvc
if [ $? -eq 1 ]; then
if [ $? -ne 0 ]; then
echo "svc account not mirrored, exiting.."
exit_1;
fi
./mc admin user svcacct info minio2 testsvc
if [ $? -eq 1 ]; then
if [ $? -ne 0 ]; then
echo "svc account not mirrored, exiting.."
exit_1;
fi
./mc admin user svcacct rm minio1 testsvc
if [ $? -eq 1 ]; then
if [ $? -ne 0 ]; then
echo "removing svc account failed, exiting.."
exit_1;
fi
@ -140,13 +140,13 @@ fi
sleep 5
./mc stat minio2/newbucket
if [ $? -eq 1 ]; then
if [ $? -ne 0 ]; then
echo "expecting bucket to be present. exiting.."
exit_1;
fi
./mc stat minio3/newbucket
if [ $? -eq 1 ]; then
if [ $? -ne 0 ]; then
echo "expecting bucket to be present. exiting.."
exit_1;
fi
@ -155,13 +155,13 @@ fi
sleep 5
./mc stat minio1/newbucket/README.md
if [ $? -eq 1 ]; then
if [ $? -ne 0 ]; then
echo "expecting object to be present. exiting.."
exit_1;
fi
./mc stat minio3/newbucket/README.md
if [ $? -eq 1 ]; then
if [ $? -ne 0 ]; then
echo "expecting object to be present. exiting.."
exit_1;
fi
@ -180,3 +180,28 @@ if [ $? -eq 0 ]; then
echo "expected file to be deleted, exiting.."
exit_1;
fi
./mc mb --with-lock minio3/newbucket-olock
sleep 5
enabled_minio2=$(./mc stat --json minio2/newbucket-olock| jq -r .metadata.ObjectLock.enabled)
if [ $? -ne 0 ]; then
echo "expected bucket to be mirrored with object-lock but not present, exiting..."
exit_1;
fi
if [ "${enabled_minio2}" != "Enabled" ]; then
echo "expected bucket to be mirrored with object-lock enabled, exiting..."
exit_1;
fi
enabled_minio1=$(./mc stat --json minio1/newbucket-olock| jq -r .metadata.ObjectLock.enabled)
if [ $? -ne 0 ]; then
echo "expected bucket to be mirrored with object-lock but not present, exiting..."
exit_1;
fi
if [ "${enabled_minio1}" != "Enabled" ]; then
echo "expected bucket to be mirrored with object-lock enabled, exiting..."
exit_1;
fi