mirror of
https://github.com/minio/minio.git
synced 2025-01-25 13:43:17 -05:00
cleanup site replication error handling (#15113)
Site replication errors were logged repeatedly at various scattered locations. This PR attempts to remove the duplicate logging and capture all of these errors in one common place. It also enhances the code to report partial successes as well as errors.
This commit is contained in:
parent
e83e947ca3
commit
2bb6a3f4d0
@ -96,6 +96,12 @@ func toAdminAPIErr(ctx context.Context, err error) APIError {
|
|||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
switch {
|
switch {
|
||||||
|
case errors.Is(err, errTooManyPolicies):
|
||||||
|
apiErr = APIError{
|
||||||
|
Code: "XMinioAdminInvalidRequest",
|
||||||
|
Description: err.Error(),
|
||||||
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
|
}
|
||||||
case errors.Is(err, errDecommissionAlreadyRunning):
|
case errors.Is(err, errDecommissionAlreadyRunning):
|
||||||
apiErr = APIError{
|
apiErr = APIError{
|
||||||
Code: "XMinioDecommissionNotAllowed",
|
Code: "XMinioDecommissionNotAllowed",
|
||||||
|
@ -2231,7 +2231,7 @@ func toAPIError(ctx context.Context, err error) APIError {
|
|||||||
}
|
}
|
||||||
case crypto.Error:
|
case crypto.Error:
|
||||||
apiErr = APIError{
|
apiErr = APIError{
|
||||||
Code: "XMinIOEncryptionError",
|
Code: "XMinioEncryptionError",
|
||||||
Description: e.Error(),
|
Description: e.Error(),
|
||||||
HTTPStatusCode: http.StatusBadRequest,
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
}
|
}
|
||||||
|
@ -806,19 +806,14 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Proceed to creating a bucket.
|
// Proceed to creating a bucket.
|
||||||
err := objectAPI.MakeBucketWithLocation(ctx, bucket, opts)
|
if err := objectAPI.MakeBucketWithLocation(ctx, bucket, opts); err != nil {
|
||||||
if _, ok := err.(BucketExists); ok {
|
if _, ok := err.(BucketExists); ok {
|
||||||
// Though bucket exists locally, we send the site-replication
|
// Though bucket exists locally, we send the site-replication
|
||||||
// hook to ensure all sites have this bucket. If the hook
|
// hook to ensure all sites have this bucket. If the hook
|
||||||
// succeeds, the client will still receive a bucket exists
|
// succeeds, the client will still receive a bucket exists
|
||||||
// message.
|
// message.
|
||||||
err2 := globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts)
|
globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts)
|
||||||
if err2 != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -827,8 +822,7 @@ func (api objectAPIHandlers) PutBucketHandler(w http.ResponseWriter, r *http.Req
|
|||||||
globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
|
globalNotificationSys.LoadBucketMetadata(GlobalContext, bucket)
|
||||||
|
|
||||||
// Call site replication hook
|
// Call site replication hook
|
||||||
err = globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts)
|
if err := globalSiteReplicationSys.MakeBucketHook(ctx, bucket, opts); err != nil {
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -149,7 +149,14 @@ type SRError struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c SRError) Error() string {
|
func (c SRError) Error() string {
|
||||||
return c.Cause.Error()
|
if c.Cause != nil {
|
||||||
|
return c.Cause.Error()
|
||||||
|
}
|
||||||
|
return "<nil>"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c SRError) Unwrap() error {
|
||||||
|
return c.Cause
|
||||||
}
|
}
|
||||||
|
|
||||||
func wrapSRErr(err error) SRError {
|
func wrapSRErr(err error) SRError {
|
||||||
@ -249,13 +256,13 @@ func (c *SiteReplicationSys) saveToDisk(ctx context.Context, state srState) erro
|
|||||||
if objAPI == nil {
|
if objAPI == nil {
|
||||||
return errServerNotInitialized
|
return errServerNotInitialized
|
||||||
}
|
}
|
||||||
err = saveConfig(ctx, objAPI, getSRStateFilePath(), buf)
|
|
||||||
if err != nil {
|
if err = saveConfig(ctx, objAPI, getSRStateFilePath(), buf); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, e := range globalNotificationSys.ReloadSiteReplicationConfig(ctx) {
|
for _, err := range globalNotificationSys.ReloadSiteReplicationConfig(ctx) {
|
||||||
logger.LogIf(ctx, e)
|
logger.LogIf(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Lock()
|
c.Lock()
|
||||||
@ -606,6 +613,14 @@ func (c *SiteReplicationSys) GetClusterInfo(ctx context.Context) (info madmin.Si
|
|||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
makeBucketWithVersion = "MakeBucketWithVersioning"
|
||||||
|
configureReplication = "ConfigureReplication"
|
||||||
|
deleteBucket = "DeleteBucket"
|
||||||
|
replicateIAMItem = "SRPeerReplicateIAMItem"
|
||||||
|
replicateBucketMetadata = "SRPeerReplicateBucketMeta"
|
||||||
|
)
|
||||||
|
|
||||||
// MakeBucketHook - called during a regular make bucket call when cluster
|
// MakeBucketHook - called during a regular make bucket call when cluster
|
||||||
// replication is enabled. It is responsible for the creation of the same bucket
|
// replication is enabled. It is responsible for the creation of the same bucket
|
||||||
// on remote clusters, and creating replication rules on local and peer
|
// on remote clusters, and creating replication rules on local and peer
|
||||||
@ -637,9 +652,7 @@ func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string,
|
|||||||
// Create bucket and enable versioning on all peers.
|
// Create bucket and enable versioning on all peers.
|
||||||
makeBucketConcErr := c.concDo(
|
makeBucketConcErr := c.concDo(
|
||||||
func() error {
|
func() error {
|
||||||
err := c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts)
|
return c.annotateErr(makeBucketWithVersion, c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts))
|
||||||
logger.LogIf(ctx, c.annotateErr("MakeWithVersioning", err))
|
|
||||||
return err
|
|
||||||
},
|
},
|
||||||
func(deploymentID string, p madmin.PeerInfo) error {
|
func(deploymentID string, p madmin.PeerInfo) error {
|
||||||
admClient, err := c.getAdminClient(ctx, deploymentID)
|
admClient, err := c.getAdminClient(ctx, deploymentID)
|
||||||
@ -647,27 +660,15 @@ func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap)
|
return c.annotatePeerErr(p.Name, makeBucketWithVersion, admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap))
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(p.Name, "MakeWithVersioning", err))
|
|
||||||
return err
|
|
||||||
},
|
},
|
||||||
|
makeBucketWithVersion,
|
||||||
)
|
)
|
||||||
// If all make-bucket-and-enable-versioning operations failed, nothing
|
|
||||||
// more to do.
|
|
||||||
if makeBucketConcErr.allFailed() {
|
|
||||||
return makeBucketConcErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log any errors in make-bucket operations.
|
// Create bucket remotes and add replication rules for the bucket on self and peers.
|
||||||
logger.LogIf(ctx, makeBucketConcErr.summaryErr)
|
|
||||||
|
|
||||||
// Create bucket remotes and add replication rules for the bucket on
|
|
||||||
// self and peers.
|
|
||||||
makeRemotesConcErr := c.concDo(
|
makeRemotesConcErr := c.concDo(
|
||||||
func() error {
|
func() error {
|
||||||
err := c.PeerBucketConfigureReplHandler(ctx, bucket)
|
return c.annotateErr(configureReplication, c.PeerBucketConfigureReplHandler(ctx, bucket))
|
||||||
logger.LogIf(ctx, c.annotateErr("ConfigureRepl", err))
|
|
||||||
return err
|
|
||||||
},
|
},
|
||||||
func(deploymentID string, p madmin.PeerInfo) error {
|
func(deploymentID string, p madmin.PeerInfo) error {
|
||||||
admClient, err := c.getAdminClient(ctx, deploymentID)
|
admClient, err := c.getAdminClient(ctx, deploymentID)
|
||||||
@ -675,13 +676,16 @@ func (c *SiteReplicationSys) MakeBucketHook(ctx context.Context, bucket string,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil)
|
return c.annotatePeerErr(p.Name, configureReplication, admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil))
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(p.Name, "ConfigureRepl", err))
|
|
||||||
return err
|
|
||||||
},
|
},
|
||||||
|
configureReplication,
|
||||||
)
|
)
|
||||||
err := makeRemotesConcErr.summaryErr
|
|
||||||
if err != nil {
|
if err := errors.Unwrap(makeBucketConcErr); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := errors.Unwrap(makeRemotesConcErr); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -706,17 +710,17 @@ func (c *SiteReplicationSys) DeleteBucketHook(ctx context.Context, bucket string
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send bucket delete to other clusters.
|
// Send bucket delete to other clusters.
|
||||||
cErr := c.concDo(nil, func(deploymentID string, p madmin.PeerInfo) error {
|
cerr := c.concDo(nil, func(deploymentID string, p madmin.PeerInfo) error {
|
||||||
admClient, err := c.getAdminClient(ctx, deploymentID)
|
admClient, err := c.getAdminClient(ctx, deploymentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return wrapSRErr(err)
|
return wrapSRErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = admClient.SRPeerBucketOps(ctx, bucket, op, nil)
|
return c.annotatePeerErr(p.Name, deleteBucket, admClient.SRPeerBucketOps(ctx, bucket, op, nil))
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(p.Name, "DeleteBucket", err))
|
},
|
||||||
return err
|
deleteBucket,
|
||||||
})
|
)
|
||||||
return cErr.summaryErr
|
return errors.Unwrap(cerr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeerBucketMakeWithVersioningHandler - creates bucket and enables versioning.
|
// PeerBucketMakeWithVersioningHandler - creates bucket and enables versioning.
|
||||||
@ -732,8 +736,7 @@ func (c *SiteReplicationSys) PeerBucketMakeWithVersioningHandler(ctx context.Con
|
|||||||
_, ok1 := err.(BucketExists)
|
_, ok1 := err.(BucketExists)
|
||||||
_, ok2 := err.(BucketAlreadyExists)
|
_, ok2 := err.(BucketAlreadyExists)
|
||||||
if !ok1 && !ok2 {
|
if !ok1 && !ok2 {
|
||||||
logger.LogIf(ctx, c.annotateErr("MakeBucketErr on peer call", err))
|
return wrapSRErr(c.annotateErr(makeBucketWithVersion, err))
|
||||||
return wrapSRErr(err)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Load updated bucket metadata into memory as new
|
// Load updated bucket metadata into memory as new
|
||||||
@ -743,8 +746,7 @@ func (c *SiteReplicationSys) PeerBucketMakeWithVersioningHandler(ctx context.Con
|
|||||||
|
|
||||||
meta, err := globalBucketMetadataSys.Get(bucket)
|
meta, err := globalBucketMetadataSys.Get(bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, c.annotateErr("MakeBucketErr on peer call", err))
|
return wrapSRErr(c.annotateErr(makeBucketWithVersion, err))
|
||||||
return wrapSRErr(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
meta.VersioningConfigXML = enabledBucketVersioningConfig
|
meta.VersioningConfigXML = enabledBucketVersioningConfig
|
||||||
@ -805,8 +807,7 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
|
|||||||
bucketTarget.Arn = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget)
|
bucketTarget.Arn = globalBucketTargetSys.getRemoteARN(bucket, &bucketTarget)
|
||||||
err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false)
|
err := globalBucketTargetSys.SetTarget(ctx, bucket, &bucketTarget, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Bucket target creation error", err))
|
return c.annotatePeerErr(peer.Name, "Bucket target creation error", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
|
targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -901,11 +902,10 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
|
|||||||
default:
|
default:
|
||||||
err = replicationConfig.AddRule(opts)
|
err = replicationConfig.AddRule(opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Error adding bucket replication rule", err))
|
return c.annotatePeerErr(peer.Name, "Error adding bucket replication rule", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now convert the configuration back to server's type so we can
|
// Now convert the configuration back to server's type so we can
|
||||||
// do some validation.
|
// do some validation.
|
||||||
newReplCfgBytes, err := xml.Marshal(replicationConfig)
|
newReplCfgBytes, err := xml.Marshal(replicationConfig)
|
||||||
@ -929,9 +929,9 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, replCfgData)
|
err = globalBucketMetadataSys.Update(ctx, bucket, bucketReplicationConfig, replCfgData)
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(peer.Name, "Error updating replication configuration", err))
|
return c.annotatePeerErr(peer.Name, "Error updating replication configuration", err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.RLock()
|
c.RLock()
|
||||||
@ -941,11 +941,9 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
|
|||||||
if d == globalDeploymentID {
|
if d == globalDeploymentID {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := configurePeerFn(d, peer); err != nil {
|
errMap[d] = configurePeerFn(d, peer)
|
||||||
errMap[d] = err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return c.toErrorFromErrMap(errMap)
|
return c.toErrorFromErrMap(errMap, configureReplication)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeerBucketDeleteHandler - deletes bucket on local in response to a delete
|
// PeerBucketDeleteHandler - deletes bucket on local in response to a delete
|
||||||
@ -1009,17 +1007,17 @@ func (c *SiteReplicationSys) IAMChangeHook(ctx context.Context, item madmin.SRIA
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
cErr := c.concDo(nil, func(d string, p madmin.PeerInfo) error {
|
cerr := c.concDo(nil, func(d string, p madmin.PeerInfo) error {
|
||||||
admClient, err := c.getAdminClient(ctx, d)
|
admClient, err := c.getAdminClient(ctx, d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return wrapSRErr(err)
|
return wrapSRErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = admClient.SRPeerReplicateIAMItem(ctx, item)
|
return c.annotatePeerErr(p.Name, replicateIAMItem, admClient.SRPeerReplicateIAMItem(ctx, item))
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(p.Name, "SRPeerReplicateIAMItem", err))
|
},
|
||||||
return err
|
replicateIAMItem,
|
||||||
})
|
)
|
||||||
return cErr.summaryErr
|
return errors.Unwrap(cerr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeerAddPolicyHandler - copies IAM policy to local. A nil policy argument,
|
// PeerAddPolicyHandler - copies IAM policy to local. A nil policy argument,
|
||||||
@ -1165,14 +1163,13 @@ func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred *mad
|
|||||||
// Verify the session token of the stsCred
|
// Verify the session token of the stsCred
|
||||||
claims, err := auth.ExtractClaims(stsCred.SessionToken, globalActiveCred.SecretKey)
|
claims, err := auth.ExtractClaims(stsCred.SessionToken, globalActiveCred.SecretKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, err)
|
return fmt.Errorf("STS credential could not be verified: %w", err)
|
||||||
return fmt.Errorf("STS credential could not be verified")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mapClaims := claims.Map()
|
mapClaims := claims.Map()
|
||||||
expiry, err := auth.ExpToInt64(mapClaims["exp"])
|
expiry, err := auth.ExpToInt64(mapClaims["exp"])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Expiry claim was not found: %v", mapClaims)
|
return fmt.Errorf("Expiry claim was not found: %v: %w", mapClaims, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cred := auth.Credentials{
|
cred := auth.Credentials{
|
||||||
@ -1191,7 +1188,7 @@ func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred *mad
|
|||||||
// Need to lookup the groups from LDAP.
|
// Need to lookup the groups from LDAP.
|
||||||
_, ldapGroups, err := globalLDAPConfig.LookupUserDN(ldapUser)
|
_, ldapGroups, err := globalLDAPConfig.LookupUserDN(ldapUser)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to query LDAP server for %s: %v", ldapUser, err)
|
return fmt.Errorf("unable to query LDAP server for %s: %w", ldapUser, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cred.Groups = ldapGroups
|
cred.Groups = ldapGroups
|
||||||
@ -1199,7 +1196,7 @@ func (c *SiteReplicationSys) PeerSTSAccHandler(ctx context.Context, stsCred *mad
|
|||||||
|
|
||||||
// Set these credentials to IAM.
|
// Set these credentials to IAM.
|
||||||
if err := globalIAMSys.SetTempUser(ctx, cred.AccessKey, cred, stsCred.ParentPolicyMapping); err != nil {
|
if err := globalIAMSys.SetTempUser(ctx, cred.AccessKey, cred, stsCred.ParentPolicyMapping); err != nil {
|
||||||
return fmt.Errorf("unable to save STS credential and/or parent policy mapping: %v", err)
|
return fmt.Errorf("unable to save STS credential and/or parent policy mapping: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -1217,17 +1214,17 @@ func (c *SiteReplicationSys) BucketMetaHook(ctx context.Context, item madmin.SRB
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
cErr := c.concDo(nil, func(d string, p madmin.PeerInfo) error {
|
cerr := c.concDo(nil, func(d string, p madmin.PeerInfo) error {
|
||||||
admClient, err := c.getAdminClient(ctx, d)
|
admClient, err := c.getAdminClient(ctx, d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return wrapSRErr(err)
|
return wrapSRErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = admClient.SRPeerReplicateBucketMeta(ctx, item)
|
return c.annotatePeerErr(p.Name, replicateBucketMetadata, admClient.SRPeerReplicateBucketMeta(ctx, item))
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(p.Name, "SRPeerReplicateBucketMeta", err))
|
},
|
||||||
return err
|
replicateBucketMetadata,
|
||||||
})
|
)
|
||||||
return cErr.summaryErr
|
return errors.Unwrap(cerr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeerBucketVersioningHandler - updates versioning config to local cluster.
|
// PeerBucketVersioningHandler - updates versioning config to local cluster.
|
||||||
@ -1795,52 +1792,63 @@ func (c *SiteReplicationSys) syncToAllPeers(ctx context.Context) error {
|
|||||||
// Concurrency helpers
|
// Concurrency helpers
|
||||||
|
|
||||||
type concErr struct {
|
type concErr struct {
|
||||||
numActions int
|
|
||||||
errMap map[string]error
|
errMap map[string]error
|
||||||
summaryErr error
|
summaryErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c concErr) Error() string {
|
func (c concErr) Error() string {
|
||||||
return c.summaryErr.Error()
|
if c.summaryErr != nil {
|
||||||
|
return c.summaryErr.Error()
|
||||||
|
}
|
||||||
|
return "<nil>"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c concErr) allFailed() bool {
|
func (c concErr) Unwrap() error {
|
||||||
return len(c.errMap) == c.numActions
|
return c.summaryErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SiteReplicationSys) toErrorFromErrMap(errMap map[string]error) error {
|
func (c *SiteReplicationSys) toErrorFromErrMap(errMap map[string]error, actionName string) error {
|
||||||
if len(errMap) == 0 {
|
if len(errMap) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var success int
|
||||||
msgs := []string{}
|
msgs := []string{}
|
||||||
for d, err := range errMap {
|
for d, err := range errMap {
|
||||||
name := c.state.Peers[d].Name
|
name := c.state.Peers[d].Name
|
||||||
msgs = append(msgs, fmt.Sprintf("Site %s (%s): %v", name, d, err))
|
if err == nil {
|
||||||
|
msgs = append(msgs, fmt.Sprintf("'%s' on site %s (%s): succeeded", actionName, name, d))
|
||||||
|
success++
|
||||||
|
} else {
|
||||||
|
msgs = append(msgs, fmt.Sprintf("'%s' on site %s (%s): failed(%v)", actionName, name, d, err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return fmt.Errorf("Site replication error(s): %s", strings.Join(msgs, "; "))
|
if success == len(errMap) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("Site replication error(s): \n%s", strings.Join(msgs, "\n"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SiteReplicationSys) newConcErr(numActions int, errMap map[string]error) concErr {
|
func (c *SiteReplicationSys) newConcErr(errMap map[string]error, actionName string) error {
|
||||||
return concErr{
|
return concErr{
|
||||||
numActions: numActions,
|
|
||||||
errMap: errMap,
|
errMap: errMap,
|
||||||
summaryErr: c.toErrorFromErrMap(errMap),
|
summaryErr: c.toErrorFromErrMap(errMap, actionName),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// concDo calls actions concurrently. selfActionFn is run for the current
|
// concDo calls actions concurrently. selfActionFn is run for the current
|
||||||
// cluster and peerActionFn is run for each peer replication cluster.
|
// cluster and peerActionFn is run for each peer replication cluster.
|
||||||
func (c *SiteReplicationSys) concDo(selfActionFn func() error, peerActionFn func(deploymentID string, p madmin.PeerInfo) error) concErr {
|
func (c *SiteReplicationSys) concDo(selfActionFn func() error, peerActionFn func(deploymentID string, p madmin.PeerInfo) error, actionName string) error {
|
||||||
depIDs := make([]string, 0, len(c.state.Peers))
|
depIDs := make([]string, 0, len(c.state.Peers))
|
||||||
for d := range c.state.Peers {
|
for d := range c.state.Peers {
|
||||||
depIDs = append(depIDs, d)
|
depIDs = append(depIDs, d)
|
||||||
}
|
}
|
||||||
errs := make([]error, len(c.state.Peers))
|
errs := make([]error, len(c.state.Peers))
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(depIDs))
|
||||||
for i := range depIDs {
|
for i := range depIDs {
|
||||||
wg.Add(1)
|
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
if depIDs[i] == globalDeploymentID {
|
if depIDs[i] == globalDeploymentID {
|
||||||
if selfActionFn != nil {
|
if selfActionFn != nil {
|
||||||
errs[i] = selfActionFn()
|
errs[i] = selfActionFn()
|
||||||
@ -1848,35 +1856,28 @@ func (c *SiteReplicationSys) concDo(selfActionFn func() error, peerActionFn func
|
|||||||
} else {
|
} else {
|
||||||
errs[i] = peerActionFn(depIDs[i], c.state.Peers[depIDs[i]])
|
errs[i] = peerActionFn(depIDs[i], c.state.Peers[depIDs[i]])
|
||||||
}
|
}
|
||||||
wg.Done()
|
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
errMap := make(map[string]error, len(c.state.Peers))
|
errMap := make(map[string]error, len(c.state.Peers))
|
||||||
for i, depID := range depIDs {
|
for i, depID := range depIDs {
|
||||||
if errs[i] != nil {
|
errMap[depID] = errs[i]
|
||||||
errMap[depID] = errs[i]
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
numActions := len(c.state.Peers) - 1
|
return c.newConcErr(errMap, actionName)
|
||||||
if selfActionFn != nil {
|
|
||||||
numActions++
|
|
||||||
}
|
|
||||||
return c.newConcErr(numActions, errMap)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SiteReplicationSys) annotateErr(annotation string, err error) error {
|
func (c *SiteReplicationSys) annotateErr(annotation string, err error) error {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("%s: %s: %v", c.state.Name, annotation, err)
|
return fmt.Errorf("%s: %s: %w", c.state.Name, annotation, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *SiteReplicationSys) annotatePeerErr(dstPeer string, annotation string, err error) error {
|
func (c *SiteReplicationSys) annotatePeerErr(dstPeer string, annotation string, err error) error {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("%s->%s: %s: %v", c.state.Name, dstPeer, annotation, err)
|
return fmt.Errorf("%s->%s: %s: %w", c.state.Name, dstPeer, annotation, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// isEnabled returns true if site replication is enabled
|
// isEnabled returns true if site replication is enabled
|
||||||
@ -2230,6 +2231,10 @@ func (c *SiteReplicationSys) SiteReplicationStatus(ctx context.Context, objAPI O
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
replicationStatus = "ReplicationStatus"
|
||||||
|
)
|
||||||
|
|
||||||
// siteReplicationStatus returns the site replication status across clusters participating in site replication.
|
// siteReplicationStatus returns the site replication status across clusters participating in site replication.
|
||||||
func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI ObjectLayer, opts madmin.SRStatusOptions) (info srStatusInfo, err error) {
|
func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI ObjectLayer, opts madmin.SRStatusOptions) (info srStatusInfo, err error) {
|
||||||
c.RLock()
|
c.RLock()
|
||||||
@ -2269,10 +2274,11 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O
|
|||||||
sris[depIdx[deploymentID]] = srInfo
|
sris[depIdx[deploymentID]] = srInfo
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
replicationStatus,
|
||||||
)
|
)
|
||||||
|
|
||||||
if metaInfoConcErr.summaryErr != nil {
|
if err := errors.Unwrap(metaInfoConcErr); err != nil {
|
||||||
return info, errSRBackendIssue(metaInfoConcErr.summaryErr)
|
return info, errSRBackendIssue(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
info.Enabled = true
|
info.Enabled = true
|
||||||
@ -3520,7 +3526,8 @@ func (c *SiteReplicationSys) healTagMetadata(ctx context.Context, objAPI ObjectL
|
|||||||
Tags: latestTaggingConfig,
|
Tags: latestTaggingConfig,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing tagging metadata for peer %s from peer %s : %w", peerName, latestPeerName, err)))
|
logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata,
|
||||||
|
fmt.Errorf("Error healing tagging metadata for peer %s from peer %s : %w", peerName, latestPeerName, err)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -3582,8 +3589,9 @@ func (c *SiteReplicationSys) healBucketPolicies(ctx context.Context, objAPI Obje
|
|||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
Policy: latestIAMPolicy,
|
Policy: latestIAMPolicy,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing bucket policy metadata for peer %s from peer %s : %w",
|
logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata,
|
||||||
peerName, latestPeerName, err)))
|
fmt.Errorf("Error healing bucket policy metadata for peer %s from peer %s : %w",
|
||||||
|
peerName, latestPeerName, err)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -3640,7 +3648,7 @@ func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI O
|
|||||||
}
|
}
|
||||||
if dID == globalDeploymentID {
|
if dID == globalDeploymentID {
|
||||||
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, latestQuotaConfigBytes); err != nil {
|
if err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, latestQuotaConfigBytes); err != nil {
|
||||||
logger.LogIf(ctx, fmt.Errorf("Error healing quota metadata from peer site %s : %s", latestPeerName, err.Error()))
|
logger.LogIf(ctx, fmt.Errorf("Error healing quota metadata from peer site %s : %w", latestPeerName, err))
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -3656,8 +3664,9 @@ func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI O
|
|||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
Quota: latestQuotaConfigBytes,
|
Quota: latestQuotaConfigBytes,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing quota config metadata for peer %s from peer %s : %s",
|
logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata,
|
||||||
peerName, latestPeerName, err.Error())))
|
fmt.Errorf("Error healing quota config metadata for peer %s from peer %s : %w",
|
||||||
|
peerName, latestPeerName, err)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -3729,8 +3738,9 @@ func (c *SiteReplicationSys) healVersioningMetadata(ctx context.Context, objAPI
|
|||||||
Versioning: latestVersioningConfig,
|
Versioning: latestVersioningConfig,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing versioning config metadata for peer %s from peer %s : %s",
|
logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata,
|
||||||
peerName, latestPeerName, err.Error())))
|
fmt.Errorf("Error healing versioning config metadata for peer %s from peer %s : %w",
|
||||||
|
peerName, latestPeerName, err)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -3802,8 +3812,9 @@ func (c *SiteReplicationSys) healSSEMetadata(ctx context.Context, objAPI ObjectL
|
|||||||
SSEConfig: latestSSEConfig,
|
SSEConfig: latestSSEConfig,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing SSE config metadata for peer %s from peer %s : %s",
|
logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata,
|
||||||
peerName, latestPeerName, err.Error())))
|
fmt.Errorf("Error healing SSE config metadata for peer %s from peer %s : %w",
|
||||||
|
peerName, latestPeerName, err)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -3875,8 +3886,9 @@ func (c *SiteReplicationSys) healOLockConfigMetadata(ctx context.Context, objAPI
|
|||||||
Tags: latestObjLockConfig,
|
Tags: latestObjLockConfig,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(peerName, "SRPeerReplicateBucketMeta", fmt.Errorf("Error healing object lock config metadata for peer %s from peer %s : %w",
|
logger.LogIf(ctx, c.annotatePeerErr(peerName, replicateBucketMetadata,
|
||||||
peerName, latestPeerName, err)))
|
fmt.Errorf("Error healing object lock config metadata for peer %s from peer %s : %w",
|
||||||
|
peerName, latestPeerName, err)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -3939,25 +3951,19 @@ func (c *SiteReplicationSys) healCreateMissingBucket(ctx context.Context, objAPI
|
|||||||
if dID == globalDeploymentID {
|
if dID == globalDeploymentID {
|
||||||
err := c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts)
|
err := c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, c.annotateErr("MakeWithVersioning", fmt.Errorf("error healing bucket for site replication %w from %s -> %s",
|
return c.annotateErr(makeBucketWithVersion, fmt.Errorf("error healing bucket for site replication %w from %s -> %s",
|
||||||
err, latestPeerName, peerName)))
|
err, latestPeerName, peerName))
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
admClient, err := c.getAdminClient(ctx, dID)
|
admClient, err := c.getAdminClient(ctx, dID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, c.annotateErr("ConfigureRepl", fmt.Errorf("admin client not found: %w", err)))
|
return c.annotateErr(configureReplication, fmt.Errorf("unable to use admin client for %s: %w", dID, err))
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
err = admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap)
|
if err = admClient.SRPeerBucketOps(ctx, bucket, madmin.MakeWithVersioningBktOp, optsMap); err != nil {
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(peerName, "MakeWithVersioning", err))
|
return c.annotatePeerErr(peerName, makeBucketWithVersion, err)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
err = admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil)
|
if err = admClient.SRPeerBucketOps(ctx, bucket, madmin.ConfigureReplBktOp, nil); err != nil {
|
||||||
logger.LogIf(ctx, c.annotatePeerErr(peerName, "ConfigureRepl", err))
|
return c.annotatePeerErr(peerName, configureReplication, err)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3965,8 +3971,7 @@ func (c *SiteReplicationSys) healCreateMissingBucket(ctx context.Context, objAPI
|
|||||||
// configure replication from current cluster to other clusters
|
// configure replication from current cluster to other clusters
|
||||||
err := c.PeerBucketConfigureReplHandler(ctx, bucket)
|
err := c.PeerBucketConfigureReplHandler(ctx, bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, c.annotateErr("ConfigureRepl", err))
|
return c.annotateErr(configureReplication, err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4007,10 +4012,7 @@ func (c *SiteReplicationSys) healBucketReplicationConfig(ctx context.Context, ob
|
|||||||
}
|
}
|
||||||
|
|
||||||
if replMismatch {
|
if replMismatch {
|
||||||
err := c.PeerBucketConfigureReplHandler(ctx, bucket)
|
logger.LogIf(ctx, c.annotateErr(configureReplication, c.PeerBucketConfigureReplHandler(ctx, bucket)))
|
||||||
if err != nil {
|
|
||||||
logger.LogIf(ctx, c.annotateErr("ConfigureRepl", err))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user