diff --git a/cmd/admin-handler-utils.go b/cmd/admin-handler-utils.go index 2cdc5cfa2..6e7d4417c 100644 --- a/cmd/admin-handler-utils.go +++ b/cmd/admin-handler-utils.go @@ -96,6 +96,12 @@ func toAdminAPIErr(ctx context.Context, err error) APIError { } default: switch { + case errors.Is(err, errTooManyPolicies): + apiErr = APIError{ + Code: "XMinioAdminInvalidRequest", + Description: err.Error(), + HTTPStatusCode: http.StatusBadRequest, + } case errors.Is(err, errDecommissionAlreadyRunning): apiErr = APIError{ Code: "XMinioDecommissionNotAllowed", diff --git a/cmd/api-errors.go b/cmd/api-errors.go index 0104256fd..b02fa0b73 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -2231,7 +2231,7 @@ func toAPIError(ctx context.Context, err error) APIError { } case crypto.Error: apiErr = APIError{ - Code: "XMinIOEncryptionError", + Code: "XMinioEncryptionError", Description: e.Error(), HTTPStatusCode: http.StatusBadRequest, } diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 1ad4ffeb9..53b7ffbf4 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -806,19 +806,14 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req } // Proceed to creating a bucket. - err := objectAPI.MakeBucketWithLocation(ctx, bucket, opts) - if _, ok := err.(BucketExists); ok { - // Though bucket exists locally, we send the site-replication - // hook to ensure all sites have this bucket. If the hook - // succeeds, the client will still receive a bucket exists - // message. - err2 := globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts) - if err2 != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return + if err := objectAPI.MakeBucketWithLocation(ctx, bucket, opts); err != nil { + if _, ok := err.(BucketExists); ok { + // Though bucket exists locally, we send the site-replication + // hook to ensure all sites have this bucket. If the hook + // succeeds, the client will still receive a bucket exists + // message. + globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts) } - } - if err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return } @@ -827,8 +822,7 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket) // Call site replication hook - err = globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts) - if err != nil { + if err := globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return } diff --git a/cmd/site-replication.go b/cmd/site-replication.go index 878183525..5418ca64e 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -149,7 +149,14 @@ type SRError struct { } func (c SRError) Error() string { - return c.Cause.Error() + if c.Cause != nil { + return c.Cause.Error() + } + return "" +} + +func (c SRError) Unwrap() error { + return c.Cause } func wrapSRErr(err error) SRError { @@ -249,13 +256,13 @@ func (c *SiteReplicationSys) saveToDisk(ctx context.Context, state srState) erro if objAPI == nil { return errServerNotInitialized } - err = saveConfig(ctx, objAPI, getSRStateFilePath(), buf) - if err != nil { + + if err = saveConfig(ctx, objAPI, getSRStateFilePath(), buf); err != nil { return err } - for _, e := range globalNotificationSys.ReloadSiteReplicationConfig(ctx) { - logger.LogIf(ctx, e) + for _, err := range globalNotificationSys.ReloadSiteReplicationConfig(ctx) { + logger.LogIf(ctx, err) } c.Lock() @@ -606,6 +613,14 @@ func (c *SiteReplicationSys) GetClusterInfo(ctx context.Context) (info madmin.Si return info, nil } +const ( + makeBucketWithVersion = "MakeBucketWithVersioning" + configureReplication = "ConfigureReplication" + deleteBucket = "DeleteBucket" + replicateIAMItem = "SRPeerReplicateIAMItem" + replicateBucketMetadata = "SRPeerReplicateBucketMeta" +) + // MakeBucketHook - called during a regular make bucket call when cluster // replication is enabled. It is responsible for the creation of the same bucket // on remote clusters, and creating replication rules on local and peer @@ -637,9 +652,7 @@ func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string, // Create bucket and enable versioning on all peers. makeBucketConcErr := c.concDo( func() error { - err := c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts) - logger.LogIf(ctx, c.annotateErr("MakeWithVersioning", err)) - return err + return c.annotateErr(makeBucketWithVersion, c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts)) }, func(deploymentID string, p madmin.PeerInfo) error { admClient, err := c.getAdminClient(ctx, deploymentID) @@ -647,27 +660,15 @@ func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string, return err } - err = admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap) - logger.LogIf(ctx, c.annotatePeerErr(p.Name, "MakeWithVersioning", err)) - return err + return c.annotatePeerErr(p.Name, makeBucketWithVersion, admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap)) }, + makeBucketWithVersion, ) - // If all make-bucket-and-enable-versioning operations failed, nothing - // more to do. - if makeBucketConcErr.allFailed() { - return makeBucketConcErr - } - // Log any errors in make-bucket operations. - logger.LogIf(ctx, makeBucketConcErr.summaryErr) - - // Create bucket remotes and add replication rules for the bucket on - // self and peers. + // Create bucket remotes and add replication rules for the bucket on self and peers. makeRemotesConcErr := c.concDo( func() error { - err := c.PeerBucketConfigureReplHandler(ctx, bucket) - logger.LogIf(ctx, c.annotateErr("ConfigureRepl", err)) - return err + return c.annotateErr(configureReplication, c.PeerBucketConfigureReplHandler(ctx, bucket)) }, func(deploymentID string, p madmin.PeerInfo) error { admClient, err := c.getAdminClient(ctx, deploymentID) @@ -675,13 +676,16 @@ func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string, return err } - err = admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil) - logger.LogIf(ctx, c.annotatePeerErr(p.Name, "ConfigureRepl", err)) - return err + return c.annotatePeerErr(p.Name, configureReplication, admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil)) }, + configureReplication, ) - err := makeRemotesConcErr.summaryErr - if err != nil { + + if err := errors.Unwrap(makeBucketConcErr); err != nil { + return err + } + + if err := errors.Unwrap(makeRemotesConcErr); err != nil { return err } @@ -706,17 +710,17 @@ func (c *SiteReplicationSys) DeleteBucketHook(ctx context.Context, bucket string } // Send bucket delete to other clusters. - cErr := c.concDo(nil, func(deploymentID string, p madmin.PeerInfo) error { + cerr := c.concDo(nil, func(deploymentID string, p madmin.PeerInfo) error { admClient, err := c.getAdminClient(ctx, deploymentID) if err != nil { return wrapSRErr(err) } - err = admClient.SRPeerBucketOps(ctx, bucket, op, nil) - logger.LogIf(ctx, c.annotatePeerErr(p.Name, "DeleteBucket", err)) - return err - }) - return cErr.summaryErr + return c.annotatePeerErr(p.Name, deleteBucket, admClient.SRPeerBucketOps(ctx, bucket, op, nil)) + }, + deleteBucket, + ) + return errors.Unwrap(cerr) } // PeerBucketMakeWithVersioningHandler - creates bucket and enables versioning. @@ -732,8 +736,7 @@ func (c *SiteReplicationSys) PeerBucketMakeWithVersioningHandler(ctx context.Con _, ok1 := err.(BucketExists) _, ok2 := err.(BucketAlreadyExists) if !ok1 && !ok2 { - logger.LogIf(ctx, c.annotateErr("MakeBucketErr on peer call", err)) - return wrapSRErr(err) + return wrapSRErr(c.annotateErr(makeBucketWithVersion, err)) } } else { // Load updated bucket metadata into memory as new @@ -743,8 +746,7 @@ func (c *SiteReplicationSys) PeerBucketMakeWithVersioningHandler(ctx context.Con meta, err := globalBucketMetadataSys.Get(bucket) if err != nil { - logger.LogIf(ctx, c.annotateErr("MakeBucketErr on peer call", err)) - return wrapSRErr(err) + return wrapSRErr(c.annotateErr(makeBucketWithVersion, err)) } meta.VersioningConfigXML = enabledBucketVersioningConfig @@ -805,8 +807,7 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, bucketTarget.Arn = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget) err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false) if err != nil { - logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Bucket target creation error", err)) - return err + return c.annotatePeerErr(peer.Name, "Bucket target creation error", err) } targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) if err != nil { @@ -901,11 +902,10 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, default: err = replicationConfig.AddRule(opts) } - if err != nil { - logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Error adding bucket replication rule", err)) - return err + return c.annotatePeerErr(peer.Name, "Error adding bucket replication rule", err) } + // Now convert the configuration back to server's type so we can // do some validation. newReplCfgBytes, err := xml.Marshal(replicationConfig) @@ -929,9 +929,9 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, if err != nil { return err } + err = globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, replCfgData) - logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Error updating replication configuration", err)) - return err + return c.annotatePeerErr(peer.Name, "Error updating replication configuration", err) } c.RLock() @@ -941,11 +941,9 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context, if d == globalDeploymentID { continue } - if err := configurePeerFn(d, peer); err != nil { - errMap[d] = err - } + errMap[d] = configurePeerFn(d, peer) } - return c.toErrorFromErrMap(errMap) + return c.toErrorFromErrMap(errMap, configureReplication) } // PeerBucketDeleteHandler - deletes bucket on local in response to a delete @@ -1009,17 +1007,17 @@ func (c *SiteReplicationSys) IAMChangeHook(ctx context.Context, item madmin.SRIA return nil } - cErr := c.concDo(nil, func(d string, p madmin.PeerInfo) error { + cerr := c.concDo(nil, func(d string, p madmin.PeerInfo) error { admClient, err := c.getAdminClient(ctx, d) if err != nil { return wrapSRErr(err) } - err = admClient.SRPeerReplicateIAMItem(ctx, item) - logger.LogIf(ctx, c.annotatePeerErr(p.Name, "SRPeerReplicateIAMItem", err)) - return err - }) - return cErr.summaryErr + return c.annotatePeerErr(p.Name, replicateIAMItem, admClient.SRPeerReplicateIAMItem(ctx, item)) + }, + replicateIAMItem, + ) + return errors.Unwrap(cerr) } // PeerAddPolicyHandler - copies IAM policy to local. A nil policy argument, @@ -1165,14 +1163,13 @@ func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred *mad // Verify the session token of the stsCred claims, err := auth.ExtractClaims(stsCred.SessionToken, globalActiveCred.SecretKey) if err != nil { - logger.LogIf(ctx, err) - return fmt.Errorf("STS credential could not be verified") + return fmt.Errorf("STS credential could not be verified: %w", err) } mapClaims := claims.Map() expiry, err := auth.ExpToInt64(mapClaims["exp"]) if err != nil { - return fmt.Errorf("Expiry claim was not found: %v", mapClaims) + return fmt.Errorf("Expiry claim was not found: %v: %w", mapClaims, err) } cred := auth.Credentials{ @@ -1191,7 +1188,7 @@ func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred *mad // Need to lookup the groups from LDAP. _, ldapGroups, err := globalLDAPConfig.LookupUserDN(ldapUser) if err != nil { - return fmt.Errorf("unable to query LDAP server for %s: %v", ldapUser, err) + return fmt.Errorf("unable to query LDAP server for %s: %w", ldapUser, err) } cred.Groups = ldapGroups @@ -1199,7 +1196,7 @@ func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred *mad // Set these credentials to IAM. if err := globalIAMSys.SetTempUser(ctx, cred.AccessKey, cred, stsCred.ParentPolicyMapping); err != nil { - return fmt.Errorf("unable to save STS credential and/or parent policy mapping: %v", err) + return fmt.Errorf("unable to save STS credential and/or parent policy mapping: %w", err) } return nil @@ -1217,17 +1214,17 @@ func (c *SiteReplicationSys) BucketMetaHook(ctx context.Context, item madmin.SRB return nil } - cErr := c.concDo(nil, func(d string, p madmin.PeerInfo) error { + cerr := c.concDo(nil, func(d string, p madmin.PeerInfo) error { admClient, err := c.getAdminClient(ctx, d) if err != nil { return wrapSRErr(err) } - err = admClient.SRPeerReplicateBucketMeta(ctx, item) - logger.LogIf(ctx, c.annotatePeerErr(p.Name, "SRPeerReplicateBucketMeta", err)) - return err - }) - return cErr.summaryErr + return c.annotatePeerErr(p.Name, replicateBucketMetadata, admClient.SRPeerReplicateBucketMeta(ctx, item)) + }, + replicateBucketMetadata, + ) + return errors.Unwrap(cerr) } // PeerBucketVersioningHandler - updates versioning config to local cluster. @@ -1795,52 +1792,63 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error { // Concurrency helpers type concErr struct { - numActions int errMap map[string]error summaryErr error } func (c concErr) Error() string { - return c.summaryErr.Error() + if c.summaryErr != nil { + return c.summaryErr.Error() + } + return "" } -func (c concErr) allFailed() bool { - return len(c.errMap) == c.numActions +func (c concErr) Unwrap() error { + return c.summaryErr } -func (c *SiteReplicationSys) toErrorFromErrMap(errMap map[string]error) error { +func (c *SiteReplicationSys) toErrorFromErrMap(errMap map[string]error, actionName string) error { if len(errMap) == 0 { return nil } + var success int msgs := []string{} for d, err := range errMap { name := c.state.Peers[d].Name - msgs = append(msgs, fmt.Sprintf("Site %s (%s): %v", name, d, err)) + if err == nil { + msgs = append(msgs, fmt.Sprintf("'%s' on site %s (%s): succeeded", actionName, name, d)) + success++ + } else { + msgs = append(msgs, fmt.Sprintf("'%s' on site %s (%s): failed(%v)", actionName, name, d, err)) + } } - return fmt.Errorf("Site replication error(s): %s", strings.Join(msgs, "; ")) + if success == len(errMap) { + return nil + } + return fmt.Errorf("Site replication error(s): \n%s", strings.Join(msgs, "\n")) } -func (c *SiteReplicationSys) newConcErr(numActions int, errMap map[string]error) concErr { +func (c *SiteReplicationSys) newConcErr(errMap map[string]error, actionName string) error { return concErr{ - numActions: numActions, errMap: errMap, - summaryErr: c.toErrorFromErrMap(errMap), + summaryErr: c.toErrorFromErrMap(errMap, actionName), } } // concDo calls actions concurrently. selfActionFn is run for the current // cluster and peerActionFn is run for each peer replication cluster. -func (c *SiteReplicationSys) concDo(selfActionFn func() error, peerActionFn func(deploymentID string, p madmin.PeerInfo) error) concErr { +func (c *SiteReplicationSys) concDo(selfActionFn func() error, peerActionFn func(deploymentID string, p madmin.PeerInfo) error, actionName string) error { depIDs := make([]string, 0, len(c.state.Peers)) for d := range c.state.Peers { depIDs = append(depIDs, d) } errs := make([]error, len(c.state.Peers)) var wg sync.WaitGroup + wg.Add(len(depIDs)) for i := range depIDs { - wg.Add(1) go func(i int) { + defer wg.Done() if depIDs[i] == globalDeploymentID { if selfActionFn != nil { errs[i] = selfActionFn() @@ -1848,35 +1856,28 @@ func (c *SiteReplicationSys) concDo(selfActionFn func() error, peerActionFn func } else { errs[i] = peerActionFn(depIDs[i], c.state.Peers[depIDs[i]]) } - wg.Done() }(i) } wg.Wait() errMap := make(map[string]error, len(c.state.Peers)) for i, depID := range depIDs { - if errs[i] != nil { - errMap[depID] = errs[i] - } + errMap[depID] = errs[i] } - numActions := len(c.state.Peers) - 1 - if selfActionFn != nil { - numActions++ - } - return c.newConcErr(numActions, errMap) + return c.newConcErr(errMap, actionName) } func (c *SiteReplicationSys) annotateErr(annotation string, err error) error { if err == nil { return nil } - return fmt.Errorf("%s: %s: %v", c.state.Name, annotation, err) + return fmt.Errorf("%s: %s: %w", c.state.Name, annotation, err) } func (c *SiteReplicationSys) annotatePeerErr(dstPeer string, annotation string, err error) error { if err == nil { return nil } - return fmt.Errorf("%s->%s: %s: %v", c.state.Name, dstPeer, annotation, err) + return fmt.Errorf("%s->%s: %s: %w", c.state.Name, dstPeer, annotation, err) } // isEnabled returns true if site replication is enabled @@ -2230,6 +2231,10 @@ func (c *SiteReplicationSys) SiteReplicationStatus(ctx context.Context, objAPI O return } +const ( + replicationStatus = "ReplicationStatus" +) + // siteReplicationStatus returns the site replication status across clusters participating in site replication. func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI ObjectLayer, opts madmin.SRStatusOptions) (info srStatusInfo, err error) { c.RLock() @@ -2269,10 +2274,11 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O sris[depIdx[deploymentID]] = srInfo return nil }, + replicationStatus, ) - if metaInfoConcErr.summaryErr != nil { - return info, errSRBackendIssue(metaInfoConcErr.summaryErr) + if err := errors.Unwrap(metaInfoConcErr); err != nil { + return info, errSRBackendIssue(err) } info.Enabled = true @@ -3520,7 +3526,8 @@ func (c *SiteReplicationSys) healTagMetadata(ctx context.Context, objAPI ObjectL Tags: latestTaggingConfig, }) if err != nil { - logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing tagging metadata for peer %s from peer %s : %w", peerName, latestPeerName, err))) + logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, + fmt.Errorf("Error healing tagging metadata for peer %s from peer %s : %w", peerName, latestPeerName, err))) } } return nil @@ -3582,8 +3589,9 @@ func (c *SiteReplicationSys) healBucketPolicies(ctx context.Context, objAPI Obje Bucket: bucket, Policy: latestIAMPolicy, }); err != nil { - logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing bucket policy metadata for peer %s from peer %s : %w", - peerName, latestPeerName, err))) + logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, + fmt.Errorf("Error healing bucket policy metadata for peer %s from peer %s : %w", + peerName, latestPeerName, err))) } } return nil @@ -3640,7 +3648,7 @@ func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI O } if dID == globalDeploymentID { if err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, latestQuotaConfigBytes); err != nil { - logger.LogIf(ctx, fmt.Errorf("Error healing quota metadata from peer site %s : %s", latestPeerName, err.Error())) + logger.LogIf(ctx, fmt.Errorf("Error healing quota metadata from peer site %s : %w", latestPeerName, err)) } continue } @@ -3656,8 +3664,9 @@ func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI O Bucket: bucket, Quota: latestQuotaConfigBytes, }); err != nil { - logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing quota config metadata for peer %s from peer %s : %s", - peerName, latestPeerName, err.Error()))) + logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, + fmt.Errorf("Error healing quota config metadata for peer %s from peer %s : %w", + peerName, latestPeerName, err))) } } return nil @@ -3729,8 +3738,9 @@ func (c *SiteReplicationSys) healVersioningMetadata(ctx context.Context, objAPI Versioning: latestVersioningConfig, }) if err != nil { - logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing versioning config metadata for peer %s from peer %s : %s", - peerName, latestPeerName, err.Error()))) + logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, + fmt.Errorf("Error healing versioning config metadata for peer %s from peer %s : %w", + peerName, latestPeerName, err))) } } return nil @@ -3802,8 +3812,9 @@ func (c *SiteReplicationSys) healSSEMetadata(ctx context.Context, objAPI ObjectL SSEConfig: latestSSEConfig, }) if err != nil { - logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing SSE config metadata for peer %s from peer %s : %s", - peerName, latestPeerName, err.Error()))) + logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, + fmt.Errorf("Error healing SSE config metadata for peer %s from peer %s : %w", + peerName, latestPeerName, err))) } } return nil @@ -3875,8 +3886,9 @@ func (c *SiteReplicationSys) healOLockConfigMetadata(ctx context.Context, objAPI Tags: latestObjLockConfig, }) if err != nil { - logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing object lock config metadata for peer %s from peer %s : %w", - peerName, latestPeerName, err))) + logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata, + fmt.Errorf("Error healing object lock config metadata for peer %s from peer %s : %w", + peerName, latestPeerName, err))) } } return nil @@ -3939,25 +3951,19 @@ func (c *SiteReplicationSys) healCreateMissingBucket(ctx context.Context, objAPI if dID == globalDeploymentID { err := c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts) if err != nil { - logger.LogIf(ctx, c.annotateErr("MakeWithVersioning", fmt.Errorf("error healing bucket for site replication %w from %s -> %s", - err, latestPeerName, peerName))) - return err + return c.annotateErr(makeBucketWithVersion, fmt.Errorf("error healing bucket for site replication %w from %s -> %s", + err, latestPeerName, peerName)) } } else { admClient, err := c.getAdminClient(ctx, dID) if err != nil { - logger.LogIf(ctx, c.annotateErr("ConfigureRepl", fmt.Errorf("admin client not found: %w", err))) - return err + return c.annotateErr(configureReplication, fmt.Errorf("unable to use admin client for %s: %w", dID, err)) } - err = admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap) - logger.LogIf(ctx, c.annotatePeerErr(peerName, "MakeWithVersioning", err)) - if err != nil { - return err + if err = admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap); err != nil { + return c.annotatePeerErr(peerName, makeBucketWithVersion, err) } - err = admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil) - logger.LogIf(ctx, c.annotatePeerErr(peerName, "ConfigureRepl", err)) - if err != nil { - return err + if err = admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil); err != nil { + return c.annotatePeerErr(peerName, configureReplication, err) } } } @@ -3965,8 +3971,7 @@ func (c *SiteReplicationSys) healCreateMissingBucket(ctx context.Context, objAPI // configure replication from current cluster to other clusters err := c.PeerBucketConfigureReplHandler(ctx, bucket) if err != nil { - logger.LogIf(ctx, c.annotateErr("ConfigureRepl", err)) - return err + return c.annotateErr(configureReplication, err) } } @@ -4007,10 +4012,7 @@ func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, ob } if replMismatch { - err := c.PeerBucketConfigureReplHandler(ctx, bucket) - if err != nil { - logger.LogIf(ctx, c.annotateErr("ConfigureRepl", err)) - } + logger.LogIf(ctx, c.annotateErr(configureReplication, c.PeerBucketConfigureReplHandler(ctx, bucket))) } return nil }