From 88ad742da0ac25527dcbe75cbed2f611564e3f81 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 14 Dec 2021 14:09:57 -0800 Subject: [PATCH] fix: error handling cases in site-replication (#13901) - Allow proper SRError to be propagated to handlers and converted appropriately. - Make sure to enable object locking on buckets when requested in MakeBucketHook. - When DNSConfig is enabled attempt to delete it first before deleting buckets locally. --- cmd/admin-handler-utils.go | 2 + cmd/admin-handlers-site-replication.go | 61 +++++----- cmd/bucket-handlers.go | 2 +- cmd/site-replication.go | 149 ++++++++++++++---------- docs/site-replication/run-multi-site.sh | 47 ++++++-- 5 files changed, 156 insertions(+), 105 deletions(-) diff --git a/cmd/admin-handler-utils.go b/cmd/admin-handler-utils.go index 6430787e0..2f7938c43 100644 --- a/cmd/admin-handler-utils.go +++ b/cmd/admin-handler-utils.go @@ -86,6 +86,8 @@ func toAdminAPIErr(ctx context.Context, err error) APIError { Description: e.Message, HTTPStatusCode: e.StatusCode, } + case SRError: + apiErr = errorCodes.ToAPIErrWithErr(e.Code, e.Cause) default: switch { case errors.Is(err, errConfigNotFound): diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go index 0c49d13f0..76a4b5724 100644 --- a/cmd/admin-handlers-site-replication.go +++ b/cmd/admin-handlers-site-replication.go @@ -45,16 +45,16 @@ func (a adminAPIHandlers) SiteReplicationAdd(w http.ResponseWriter, r *http.Requ } var sites []madmin.PeerSite - errCode := readJSONBody(ctx, r.Body, &sites, cred.SecretKey) - if errCode != ErrNone { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL) + err := parseJSONBody(ctx, r.Body, &sites, cred.SecretKey) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } - status, errInfo := globalSiteReplicationSys.AddPeerClusters(ctx, sites) - if errInfo.Code != ErrNone { - logger.LogIf(ctx, errInfo) - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(errInfo.Code, errInfo.Cause), r.URL) + status, err := globalSiteReplicationSys.AddPeerClusters(ctx, sites) + if err != nil { + logger.LogIf(ctx, err) + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } @@ -82,16 +82,14 @@ func (a adminAPIHandlers) SRInternalJoin(w http.ResponseWriter, r *http.Request) } var joinArg madmin.SRInternalJoinReq - errCode := readJSONBody(ctx, r.Body, &joinArg, cred.SecretKey) - if errCode != ErrNone { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL) + if err := parseJSONBody(ctx, r.Body, &joinArg, cred.SecretKey); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } - errInfo := globalSiteReplicationSys.InternalJoinReq(ctx, joinArg) - if errInfo.Code != ErrNone { - logger.LogIf(ctx, errInfo) - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(errInfo.Code, errInfo.Cause), r.URL) + if err := globalSiteReplicationSys.InternalJoinReq(ctx, joinArg); err != nil { + logger.LogIf(ctx, err) + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } } @@ -114,7 +112,7 @@ func (a adminAPIHandlers) SRInternalBucketOps(w http.ResponseWriter, r *http.Req var err error switch operation { default: - err = errInvalidArgument + err = errSRInvalidRequest(errInvalidArgument) case madmin.MakeWithVersioningBktOp: _, isLockEnabled := r.Form["lockEnabled"] _, isVersioningEnabled := r.Form["versioningEnabled"] @@ -151,16 +149,15 @@ func (a adminAPIHandlers) SRInternalReplicateIAMItem(w http.ResponseWriter, r *h } var item madmin.SRIAMItem - errCode := readJSONBody(ctx, r.Body, &item, "") - if errCode != ErrNone { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL) + if err := parseJSONBody(ctx, r.Body, &item, ""); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } var err error switch item.Type { default: - err = errInvalidArgument + err = errSRInvalidRequest(errInvalidArgument) case madmin.SRIAMItemPolicy: if item.Policy == nil { err = globalSiteReplicationSys.PeerAddPolicyHandler(ctx, item.Name, nil) @@ -202,16 +199,15 @@ func (a adminAPIHandlers) SRInternalReplicateBucketItem(w http.ResponseWriter, r } var item madmin.SRBucketMeta - errCode := readJSONBody(ctx, r.Body, &item, "") - if errCode != ErrNone { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(errCode), r.URL) + if err := parseJSONBody(ctx, r.Body, &item, ""); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) return } var err error switch item.Type { default: - err = errInvalidArgument + err = errSRInvalidRequest(errInvalidArgument) case madmin.SRBucketMetaTypePolicy: if item.Policy == nil { err = globalSiteReplicationSys.PeerBucketPolicyHandler(ctx, item.Bucket, nil) @@ -293,24 +289,25 @@ func (a adminAPIHandlers) SRInternalGetIDPSettings(w http.ResponseWriter, r *htt } } -func readJSONBody(ctx context.Context, body io.Reader, v interface{}, encryptionKey string) APIErrorCode { +func parseJSONBody(ctx context.Context, body io.Reader, v interface{}, encryptionKey string) error { data, err := ioutil.ReadAll(body) if err != nil { - return ErrInvalidRequest + return SRError{ + Cause: err, + Code: ErrSiteReplicationInvalidRequest, + } } if encryptionKey != "" { data, err = madmin.DecryptData(encryptionKey, bytes.NewReader(data)) if err != nil { logger.LogIf(ctx, err) - return ErrInvalidRequest + return SRError{ + Cause: err, + Code: ErrSiteReplicationInvalidRequest, + } } } - err = json.Unmarshal(data, v) - if err != nil { - return ErrAdminConfigBadJSON - } - - return ErrNone + return json.Unmarshal(data, v) } diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index fb5ec3f25..42ea5a71d 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -1280,7 +1280,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. } if globalDNSConfig != nil { if err2 := globalDNSConfig.Put(bucket); err2 != nil { - logger.LogIf(ctx, fmt.Errorf("Unable to restore bucket DNS entry %w, pl1ease fix it manually", err2)) + logger.LogIf(ctx, fmt.Errorf("Unable to restore bucket DNS entry %w, please fix it manually", err2)) } } writeErrorResponse(ctx, w, apiErr, r.URL) diff --git a/cmd/site-replication.go b/cmd/site-replication.go index e46f30e04..669b1d9a3 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -40,7 +40,6 @@ import ( "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/internal/auth" sreplication "github.com/minio/minio/internal/bucket/replication" - "github.com/minio/minio/internal/bucket/versioning" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/bucket/policy" iampolicy "github.com/minio/pkg/iam/policy" @@ -56,11 +55,26 @@ const ( ) var ( - errSRCannotJoin = errors.New("this site is already configured for site-replication") - errSRDuplicateSites = errors.New("duplicate sites provided for site-replication") - errSRSelfNotFound = errors.New("none of the given sites correspond to the current one") - errSRPeerNotFound = errors.New("peer not found") - errSRNotEnabled = errors.New("site replication is not enabled") + errSRCannotJoin = SRError{ + Cause: errors.New("this site is already configured for site-replication"), + Code: ErrSiteReplicationInvalidRequest, + } + errSRDuplicateSites = SRError{ + Cause: errors.New("duplicate sites provided for site-replication"), + Code: ErrSiteReplicationInvalidRequest, + } + errSRSelfNotFound = SRError{ + Cause: errors.New("none of the given sites correspond to the current one"), + Code: ErrSiteReplicationInvalidRequest, + } + errSRPeerNotFound = SRError{ + Cause: errors.New("peer not found"), + Code: ErrSiteReplicationInvalidRequest, + } + errSRNotEnabled = SRError{ + Cause: errors.New("site replication is not enabled"), + Code: ErrSiteReplicationInvalidRequest, + } ) func errSRInvalidRequest(err error) SRError { @@ -309,7 +323,7 @@ func (c *SiteReplicationSys) getSiteStatuses(ctx context.Context, sites []madmin } // AddPeerClusters - add cluster sites for replication configuration. -func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmin.PeerSite) (madmin.ReplicateAddStatus, SRError) { +func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmin.PeerSite) (madmin.ReplicateAddStatus, error) { sites, serr := c.getSiteStatuses(ctx, psites) if serr.Cause != nil { return madmin.ReplicateAddStatus{}, serr @@ -335,7 +349,7 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi for i, v := range sites { // deploymentIDs must be unique if deploymentIDsSet.Contains(v.DeploymentID) { - return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errSRDuplicateSites) + return madmin.ReplicateAddStatus{}, errSRDuplicateSites } deploymentIDsSet.Add(v.DeploymentID) @@ -351,32 +365,32 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi if c.enabled { // If current cluster is already SR enabled and no new site being added ,fail. if currDeploymentIDsSet.Equals(deploymentIDsSet) { - return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errSRCannotJoin) + return madmin.ReplicateAddStatus{}, errSRCannotJoin } if len(currDeploymentIDsSet.Intersection(deploymentIDsSet)) != len(currDeploymentIDsSet) { diffSlc := getMissingSiteNames(currDeploymentIDsSet, deploymentIDsSet, currSites.Sites) - return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("All existing replicated sites must be specified - missing %s", strings.Join(diffSlc, " "))) + return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("all existing replicated sites must be specified - missing %s", strings.Join(diffSlc, " "))) } } // For this `add` API, either all clusters must be empty or the local // cluster must be the only one having some buckets. if localHasBuckets && nonLocalPeerWithBuckets != "" { - return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errors.New("Only one cluster may have data when configuring site replication")) + return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errors.New("only one cluster may have data when configuring site replication")) } if !localHasBuckets && nonLocalPeerWithBuckets != "" { - return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("Please send your request to the cluster containing data/buckets: %s", nonLocalPeerWithBuckets)) + return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("please send your request to the cluster containing data/buckets: %s", nonLocalPeerWithBuckets)) } // validate that all clusters are using the same (LDAP based) // external IDP. - pass, verr := c.validateIDPSettings(ctx, sites) - if verr.Cause != nil { - return madmin.ReplicateAddStatus{}, verr + pass, err := c.validateIDPSettings(ctx, sites) + if err != nil { + return madmin.ReplicateAddStatus{}, err } if !pass { - return madmin.ReplicateAddStatus{}, errSRInvalidRequest(fmt.Errorf("All cluster sites must have the same (LDAP) IDP settings.")) + return madmin.ReplicateAddStatus{}, errSRInvalidRequest(errors.New("all cluster sites must have the same (LDAP) IDP settings")) } // FIXME: Ideally, we also need to check if there are any global IAM @@ -429,7 +443,7 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi addedCount := 0 var ( - peerAddErr SRError + peerAddErr error admClient *madmin.AdminClient ) @@ -456,7 +470,7 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi addedCount++ } - if peerAddErr.Cause != nil { + if peerAddErr != nil { if addedCount == 0 { return madmin.ReplicateAddStatus{}, peerAddErr } @@ -469,8 +483,9 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi ErrDetail: peerAddErr.Error(), } - return partial, SRError{} + return partial, nil } + // Other than handling existing buckets, we can now save the cluster // replication configuration state. state := srState{ @@ -478,29 +493,29 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi Peers: joinReq.Peers, ServiceAccountAccessKey: svcCred.AccessKey, } - err = c.saveToDisk(ctx, state) - if err != nil { + + if err = c.saveToDisk(ctx, state); err != nil { return madmin.ReplicateAddStatus{ Status: madmin.ReplicateAddStatusPartial, ErrDetail: fmt.Sprintf("unable to save cluster-replication state on local: %v", err), - }, SRError{} + }, nil } result := madmin.ReplicateAddStatus{ Success: true, Status: madmin.ReplicateAddStatusSuccess, } - initialSyncErr := c.syncLocalToPeers(ctx) - if initialSyncErr.Code != ErrNone { - result.InitialSyncErrorMessage = initialSyncErr.Error() + + if err := c.syncLocalToPeers(ctx); err != nil { + result.InitialSyncErrorMessage = err.Error() } - return result, SRError{} + return result, nil } // InternalJoinReq - internal API handler to respond to a peer cluster's request // to join. -func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRInternalJoinReq) SRError { +func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRInternalJoinReq) error { var ourName string for d, p := range arg.Peers { if d == globalDeploymentID { @@ -509,7 +524,7 @@ func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRI } } if ourName == "" { - return errSRInvalidRequest(errSRSelfNotFound) + return errSRSelfNotFound } _, _, err := globalIAMSys.GetServiceAccount(ctx, arg.SvcAcctAccessKey) @@ -528,11 +543,10 @@ func (c *SiteReplicationSys) InternalJoinReq(ctx context.Context, arg madmin.SRI Peers: arg.Peers, ServiceAccountAccessKey: arg.SvcAcctAccessKey, } - err = c.saveToDisk(ctx, state) - if err != nil { + if err = c.saveToDisk(ctx, state); err != nil { return errSRBackendIssue(fmt.Errorf("unable to save cluster-replication state to disk on %s: %v", ourName, err)) } - return SRError{} + return nil } // GetIDPSettings returns info about the configured identity provider. It is @@ -547,7 +561,7 @@ func (c *SiteReplicationSys) GetIDPSettings(ctx context.Context) madmin.IDPSetti } } -func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []PeerSiteInfo) (bool, SRError) { +func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []PeerSiteInfo) (bool, error) { s := make([]madmin.IDPSettings, 0, len(peers)) for _, v := range peers { if v.self { @@ -569,15 +583,15 @@ func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []Pe for _, v := range s { if !v.IsLDAPEnabled { - return false, SRError{} + return false, nil } } for i := 1; i < len(s); i++ { if s[i] != s[0] { - return false, SRError{} + return false, nil } } - return true, SRError{} + return true, nil } // GetClusterInfo - returns site replication information. @@ -620,7 +634,11 @@ func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string, optsMap["location"] = opts.Location } if opts.LockEnabled { - optsMap["lockEnabled"] = "" + optsMap["lockEnabled"] = "true" + optsMap["versioningEnabled"] = "true" + } + if opts.VersioningEnabled { + optsMap["versioningEnabled"] = "true" } // Create bucket and enable versioning on all peers. @@ -714,6 +732,7 @@ func (c *SiteReplicationSys) PeerBucketMakeWithVersioningHandler(ctx context.Con if objAPI == nil { return errServerNotInitialized } + err := objAPI.MakeBucketWithLocation(ctx, bucket, opts) if err != nil { // Check if this is a bucket exists error. @@ -729,27 +748,25 @@ func (c *SiteReplicationSys) PeerBucketMakeWithVersioningHandler(ctx context.Con globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket) } - // Enable versioning on the bucket. - config, err := globalBucketVersioningSys.Get(bucket) + meta, err := globalBucketMetadataSys.Get(bucket) if err != nil { + logger.LogIf(ctx, c.annotateErr("MakeBucketErr on peer call", err)) return wrapSRErr(err) } - if !config.Enabled() { - verConf := versioning.Versioning{ - Status: versioning.Enabled, - } - // FIXME: need to confirm if skipping object lock and - // versioning-suspended state checks are valid here. - cfgData, err := xml.Marshal(verConf) - if err != nil { - return wrapSRErr(err) - } - err = globalBucketMetadataSys.Update(bucket, bucketVersioningConfig, cfgData) - if err != nil { - logger.LogIf(ctx, c.annotateErr("Versioning enabling error on peer call", err)) - return wrapSRErr(err) - } + + meta.VersioningConfigXML = enabledBucketVersioningConfig + if opts.LockEnabled { + meta.ObjectLockConfigXML = enabledBucketObjectLockConfig } + + if err := meta.Save(context.Background(), objAPI); err != nil { + return wrapSRErr(err) + } + + globalBucketMetadataSys.Set(bucket, meta) + + // Load updated bucket metadata into memory as new metadata updated. + globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket) return nil } @@ -931,14 +948,24 @@ func (c *SiteReplicationSys) PeerBucketDeleteHandler(ctx context.Context, bucket return errSRNotEnabled } - // FIXME: need to handle cases where globalDNSConfig is set. - objAPI := newObjectLayerFn() if objAPI == nil { return errServerNotInitialized } + + if globalDNSConfig != nil { + if err := globalDNSConfig.Delete(bucket); err != nil { + return err + } + } + err := objAPI.DeleteBucket(ctx, bucket, DeleteBucketOptions{Force: forceDelete}) if err != nil { + if globalDNSConfig != nil { + if err2 := globalDNSConfig.Put(bucket); err2 != nil { + logger.LogIf(ctx, fmt.Errorf("Unable to restore bucket DNS entry %w, please fix it manually", err2)) + } + } return err } @@ -1008,7 +1035,7 @@ func (c *SiteReplicationSys) PeerAddPolicyHandler(ctx context.Context, policyNam // PeerSvcAccChangeHandler - copies service-account change to local. func (c *SiteReplicationSys) PeerSvcAccChangeHandler(ctx context.Context, change *madmin.SRSvcAccChange) error { if change == nil { - return errInvalidArgument + return errSRInvalidRequest(errInvalidArgument) } switch { case change.Create != nil: @@ -1073,7 +1100,7 @@ func (c *SiteReplicationSys) PeerSvcAccChangeHandler(ctx context.Context, change // PeerPolicyMappingHandler - copies policy mapping to local. func (c *SiteReplicationSys) PeerPolicyMappingHandler(ctx context.Context, mapping *madmin.SRPolicyMapping) error { if mapping == nil { - return errInvalidArgument + return errSRInvalidRequest(errInvalidArgument) } err := globalIAMSys.PolicyDBSet(ctx, mapping.UserOrGroup, mapping.Policy, mapping.IsGroup) if err != nil { @@ -1085,7 +1112,7 @@ func (c *SiteReplicationSys) PeerPolicyMappingHandler(ctx context.Context, mappi // PeerSTSAccHandler - replicates STS credential locally. func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred *madmin.SRSTSCredential) error { if stsCred == nil { - return errInvalidArgument + return errSRInvalidRequest(errInvalidArgument) } // Verify the session token of the stsCred @@ -1261,7 +1288,7 @@ func (c *SiteReplicationSys) getAdminClient(ctx context.Context, deploymentID st func (c *SiteReplicationSys) getPeerCreds() (*auth.Credentials, error) { creds, ok := globalIAMSys.store.GetUser(c.state.ServiceAccountAccessKey) if !ok { - return nil, errors.New("site replication service account not found!") + return nil, errors.New("site replication service account not found") } return &creds, nil } @@ -1269,7 +1296,7 @@ func (c *SiteReplicationSys) getPeerCreds() (*auth.Credentials, error) { // syncLocalToPeers is used when initially configuring site replication, to // copy existing buckets, their settings, service accounts and policies to all // new peers. -func (c *SiteReplicationSys) syncLocalToPeers(ctx context.Context) SRError { +func (c *SiteReplicationSys) syncLocalToPeers(ctx context.Context) error { // If local has buckets, enable versioning on them, create them on peers // and setup replication rules. objAPI := newObjectLayerFn() @@ -1518,7 +1545,7 @@ func (c *SiteReplicationSys) syncLocalToPeers(ctx context.Context) SRError { } } - return SRError{} + return nil } // Concurrency helpers diff --git a/docs/site-replication/run-multi-site.sh b/docs/site-replication/run-multi-site.sh index f51eed743..54899b394 100755 --- a/docs/site-replication/run-multi-site.sh +++ b/docs/site-replication/run-multi-site.sh @@ -79,26 +79,26 @@ if [ $? -eq 0 ]; then fi ./mc admin user info minio1 "uid=dillon,ou=people,ou=swengg,dc=min,dc=io" -if [ $? -eq 1 ]; then +if [ $? -ne 0 ]; then echo "policy mapping missing, exiting.." exit_1; fi ./mc admin user info minio2 "uid=dillon,ou=people,ou=swengg,dc=min,dc=io" -if [ $? -eq 1 ]; then +if [ $? -ne 0 ]; then echo "policy mapping missing, exiting.." exit_1; fi ./mc admin user info minio3 "uid=dillon,ou=people,ou=swengg,dc=min,dc=io" -if [ $? -eq 1 ]; then +if [ $? -ne 0 ]; then echo "policy mapping missing, exiting.." exit_1; fi # LDAP simple user ./mc admin user svcacct add minio2 dillon --access-key testsvc --secret-key testsvc123 -if [ $? -eq 1 ]; then +if [ $? -ne 0 ]; then echo "adding svc account failed, exiting.." exit_1; fi @@ -106,19 +106,19 @@ fi sleep 10 ./mc admin user svcacct info minio1 testsvc -if [ $? -eq 1 ]; then +if [ $? -ne 0 ]; then echo "svc account not mirrored, exiting.." exit_1; fi ./mc admin user svcacct info minio2 testsvc -if [ $? -eq 1 ]; then +if [ $? -ne 0 ]; then echo "svc account not mirrored, exiting.." exit_1; fi ./mc admin user svcacct rm minio1 testsvc -if [ $? -eq 1 ]; then +if [ $? -ne 0 ]; then echo "removing svc account failed, exiting.." exit_1; fi @@ -140,13 +140,13 @@ fi sleep 5 ./mc stat minio2/newbucket -if [ $? -eq 1 ]; then +if [ $? -ne 0 ]; then echo "expecting bucket to be present. exiting.." exit_1; fi ./mc stat minio3/newbucket -if [ $? -eq 1 ]; then +if [ $? -ne 0 ]; then echo "expecting bucket to be present. exiting.." exit_1; fi @@ -155,13 +155,13 @@ fi sleep 5 ./mc stat minio1/newbucket/README.md -if [ $? -eq 1 ]; then +if [ $? -ne 0 ]; then echo "expecting object to be present. exiting.." exit_1; fi ./mc stat minio3/newbucket/README.md -if [ $? -eq 1 ]; then +if [ $? -ne 0 ]; then echo "expecting object to be present. exiting.." exit_1; fi @@ -180,3 +180,28 @@ if [ $? -eq 0 ]; then echo "expected file to be deleted, exiting.." exit_1; fi + +./mc mb --with-lock minio3/newbucket-olock +sleep 5 + +enabled_minio2=$(./mc stat --json minio2/newbucket-olock| jq -r .metadata.ObjectLock.enabled) +if [ $? -ne 0 ]; then + echo "expected bucket to be mirrored with object-lock but not present, exiting..." + exit_1; +fi + +if [ "${enabled_minio2}" != "Enabled" ]; then + echo "expected bucket to be mirrored with object-lock enabled, exiting..." + exit_1; +fi + +enabled_minio1=$(./mc stat --json minio1/newbucket-olock| jq -r .metadata.ObjectLock.enabled) +if [ $? -ne 0 ]; then + echo "expected bucket to be mirrored with object-lock but not present, exiting..." + exit_1; +fi + +if [ "${enabled_minio1}" != "Enabled" ]; then + echo "expected bucket to be mirrored with object-lock enabled, exiting..." + exit_1; +fi