mirror of
https://github.com/minio/minio.git
synced 2025-11-07 12:52:58 -05:00
Fix globalDeploymentID race (#18275)
globalDeploymentID was being read while it was being set.
Fixes race:
```
WARNING: DATA RACE
Write at 0x0000079605a0 by main goroutine:
github.com/minio/minio/cmd.connectLoadInitFormats()
github.com/minio/minio/cmd/prepare-storage.go:269 +0x14f0
github.com/minio/minio/cmd.waitForFormatErasure()
github.com/minio/minio/cmd/prepare-storage.go:294 +0x21d
...
Previous read at 0x0000079605a0 by goroutine 105:
github.com/minio/minio/cmd.newContext()
github.com/minio/minio/cmd/utils.go:817 +0x31e
github.com/minio/minio/cmd.adminMiddleware.func1()
github.com/minio/minio/cmd/admin-router.go:110 +0x96
net/http.HandlerFunc.ServeHTTP()
net/http/server.go:2136 +0x47
github.com/minio/minio/cmd.setBucketForwardingMiddleware.func1()
github.com/minio/minio/cmd/generic-handlers.go:460 +0xb1a
net/http.HandlerFunc.ServeHTTP()
net/http/server.go:2136 +0x47
...
```
This commit is contained in:
@@ -365,7 +365,7 @@ func (c *SiteReplicationSys) getSiteStatuses(ctx context.Context, sites ...madmi
|
||||
PeerSite: v,
|
||||
DeploymentID: info.DeploymentID,
|
||||
Empty: len(buckets) == 0,
|
||||
self: info.DeploymentID == globalDeploymentID,
|
||||
self: info.DeploymentID == globalDeploymentID(),
|
||||
})
|
||||
}
|
||||
return
|
||||
@@ -410,7 +410,7 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi
|
||||
}
|
||||
}
|
||||
if selfIdx == -1 {
|
||||
return madmin.ReplicateAddStatus{}, errSRBackendIssue(fmt.Errorf("global deployment ID %s mismatch, expected one of %s", globalDeploymentID, deploymentIDsSet))
|
||||
return madmin.ReplicateAddStatus{}, errSRBackendIssue(fmt.Errorf("global deployment ID %s mismatch, expected one of %s", globalDeploymentID(), deploymentIDsSet))
|
||||
}
|
||||
if !currDeploymentIDsSet.IsEmpty() {
|
||||
// If current cluster is already SR enabled and no new site being added ,fail.
|
||||
@@ -569,7 +569,7 @@ func (c *SiteReplicationSys) AddPeerClusters(ctx context.Context, psites []madmi
|
||||
func (c *SiteReplicationSys) PeerJoinReq(ctx context.Context, arg madmin.SRPeerJoinReq) error {
|
||||
var ourName string
|
||||
for d, p := range arg.Peers {
|
||||
if d == globalDeploymentID {
|
||||
if d == globalDeploymentID() {
|
||||
ourName = p.Name
|
||||
break
|
||||
}
|
||||
@@ -658,7 +658,7 @@ func (c *SiteReplicationSys) Netperf(ctx context.Context, duration time.Duration
|
||||
for _, info := range infos.Sites {
|
||||
info := info
|
||||
// will call siteNetperf, means call others's adminAPISiteReplicationDevNull
|
||||
if globalDeploymentID == info.DeploymentID {
|
||||
if globalDeploymentID() == info.DeploymentID {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
@@ -1090,7 +1090,7 @@ func (c *SiteReplicationSys) PeerBucketConfigureReplHandler(ctx context.Context,
|
||||
defer c.RUnlock()
|
||||
errMap := make(map[string]error, len(c.state.Peers))
|
||||
for d, peer := range c.state.Peers {
|
||||
if d == globalDeploymentID {
|
||||
if d == globalDeploymentID() {
|
||||
continue
|
||||
}
|
||||
errMap[d] = configurePeerFn(d, peer)
|
||||
@@ -2112,7 +2112,7 @@ func (c *SiteReplicationSys) concDo(selfActionFn func() error, peerActionFn func
|
||||
for i := range depIDs {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
if depIDs[i] == globalDeploymentID {
|
||||
if depIDs[i] == globalDeploymentID() {
|
||||
if selfActionFn != nil {
|
||||
errs[i] = selfActionFn()
|
||||
}
|
||||
@@ -2193,11 +2193,11 @@ func (c *SiteReplicationSys) RemovePeerCluster(ctx context.Context, objectAPI Ob
|
||||
|
||||
for _, v := range info.Sites {
|
||||
wg.Add(1)
|
||||
if v.DeploymentID == globalDeploymentID {
|
||||
if v.DeploymentID == globalDeploymentID() {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err := c.RemoveRemoteTargetsForEndpoint(ctx, objectAPI, rmvEndpoints, false)
|
||||
errs[globalDeploymentID] = err
|
||||
errs[globalDeploymentID()] = err
|
||||
}()
|
||||
continue
|
||||
}
|
||||
@@ -2209,7 +2209,7 @@ func (c *SiteReplicationSys) RemovePeerCluster(ctx context.Context, objectAPI Ob
|
||||
return
|
||||
}
|
||||
// set the requesting site's deploymentID for verification of peer request
|
||||
rreq.RequestingDepID = globalDeploymentID
|
||||
rreq.RequestingDepID = globalDeploymentID()
|
||||
if _, err = admClient.SRPeerRemove(ctx, rreq); err != nil {
|
||||
if errors.Is(err, errMissingSRConfig) {
|
||||
// ignore if peer is already removed.
|
||||
@@ -2223,7 +2223,7 @@ func (c *SiteReplicationSys) RemovePeerCluster(ctx context.Context, objectAPI Ob
|
||||
wg.Wait()
|
||||
|
||||
errdID := ""
|
||||
selfTgtsDeleted := errs[globalDeploymentID] == nil // true if all remote targets and replication config cleared successfully on local cluster
|
||||
selfTgtsDeleted := errs[globalDeploymentID()] == nil // true if all remote targets and replication config cleared successfully on local cluster
|
||||
|
||||
for dID, err := range errs {
|
||||
if err != nil {
|
||||
@@ -2311,7 +2311,7 @@ func (c *SiteReplicationSys) InternalRemoveReq(ctx context.Context, objectAPI Ob
|
||||
|
||||
for _, p := range c.state.Peers {
|
||||
peerMap[p.Name] = p
|
||||
if p.DeploymentID == globalDeploymentID {
|
||||
if p.DeploymentID == globalDeploymentID() {
|
||||
ourName = p.Name
|
||||
}
|
||||
updatedPeers[p.DeploymentID] = p
|
||||
@@ -2327,7 +2327,7 @@ func (c *SiteReplicationSys) InternalRemoveReq(ctx context.Context, objectAPI Ob
|
||||
if !ok {
|
||||
return errMissingSRConfig
|
||||
}
|
||||
if info.DeploymentID == globalDeploymentID {
|
||||
if info.DeploymentID == globalDeploymentID() {
|
||||
unlinkSelf = true
|
||||
continue
|
||||
}
|
||||
@@ -2608,7 +2608,7 @@ func (c *SiteReplicationSys) siteReplicationStatus(ctx context.Context, objAPI O
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sris[depIdx[globalDeploymentID]] = srInfo
|
||||
sris[depIdx[globalDeploymentID()]] = srInfo
|
||||
return nil
|
||||
},
|
||||
func(deploymentID string, p madmin.PeerInfo) error {
|
||||
@@ -3354,7 +3354,7 @@ func (c *SiteReplicationSys) SiteReplicationMetaInfo(ctx context.Context, objAPI
|
||||
if !c.enabled {
|
||||
return info, nil
|
||||
}
|
||||
info.DeploymentID = globalDeploymentID
|
||||
info.DeploymentID = globalDeploymentID()
|
||||
if opts.Buckets || opts.Entity == madmin.SRBucketEntity {
|
||||
var (
|
||||
buckets []BucketInfo
|
||||
@@ -3611,7 +3611,7 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe
|
||||
admClient *madmin.AdminClient
|
||||
)
|
||||
|
||||
if globalDeploymentID == peer.DeploymentID && !peer.SyncState.Empty() && !peer.DefaultBandwidth.IsSet {
|
||||
if globalDeploymentID() == peer.DeploymentID && !peer.SyncState.Empty() && !peer.DefaultBandwidth.IsSet {
|
||||
return madmin.ReplicateEditStatus{}, errSRInvalidRequest(fmt.Errorf("a peer cluster, rather than the local cluster (endpoint=%s, deployment-id=%s) needs to be specified while setting a 'sync' replication mode", peer.Endpoint, peer.DeploymentID))
|
||||
}
|
||||
|
||||
@@ -3665,7 +3665,7 @@ func (c *SiteReplicationSys) EditPeerCluster(ctx context.Context, peer madmin.Pe
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for dID, v := range state.Peers {
|
||||
if v.DeploymentID == globalDeploymentID {
|
||||
if v.DeploymentID == globalDeploymentID() {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
@@ -3792,7 +3792,7 @@ func (c *SiteReplicationSys) PeerEditReq(ctx context.Context, arg madmin.PeerInf
|
||||
}
|
||||
c.state.Peers[arg.DeploymentID] = p
|
||||
}
|
||||
if p.DeploymentID == globalDeploymentID {
|
||||
if p.DeploymentID == globalDeploymentID() {
|
||||
ourName = p.Name
|
||||
}
|
||||
}
|
||||
@@ -3993,7 +3993,7 @@ func (c *SiteReplicationSys) healTagMetadata(ctx context.Context, objAPI ObjectL
|
||||
if isBucketMetadataEqual(latestTaggingConfig, bStatus.meta.Tags) {
|
||||
continue
|
||||
}
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketTaggingConfig, latestTaggingConfigBytes); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to heal tagging metadata from peer site %s : %w", latestPeerName, err))
|
||||
}
|
||||
@@ -4057,7 +4057,7 @@ func (c *SiteReplicationSys) healBucketPolicies(ctx context.Context, objAPI Obje
|
||||
if strings.EqualFold(string(latestIAMPolicy), string(bStatus.meta.Policy)) {
|
||||
continue
|
||||
}
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketPolicyConfig, latestIAMPolicy); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to heal bucket policy metadata from peer site %s : %w", latestPeerName, err))
|
||||
}
|
||||
@@ -4132,7 +4132,7 @@ func (c *SiteReplicationSys) healBucketQuotaConfig(ctx context.Context, objAPI O
|
||||
if isBucketMetadataEqual(latestQuotaConfig, bStatus.meta.QuotaConfig) {
|
||||
continue
|
||||
}
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketQuotaConfigFile, latestQuotaConfigBytes); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to heal quota metadata from peer site %s : %w", latestPeerName, err))
|
||||
}
|
||||
@@ -4207,7 +4207,7 @@ func (c *SiteReplicationSys) healVersioningMetadata(ctx context.Context, objAPI
|
||||
if isBucketMetadataEqual(latestVersioningConfig, bStatus.meta.Versioning) {
|
||||
continue
|
||||
}
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketVersioningConfig, latestVersioningConfigBytes); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to heal versioning metadata from peer site %s : %w", latestPeerName, err))
|
||||
}
|
||||
@@ -4282,7 +4282,7 @@ func (c *SiteReplicationSys) healSSEMetadata(ctx context.Context, objAPI ObjectL
|
||||
if isBucketMetadataEqual(latestSSEConfig, bStatus.meta.SSEConfig) {
|
||||
continue
|
||||
}
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
if _, err := globalBucketMetadataSys.Update(ctx, bucket, bucketSSEConfig, latestSSEConfigBytes); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to heal sse metadata from peer site %s : %w", latestPeerName, err))
|
||||
}
|
||||
@@ -4357,7 +4357,7 @@ func (c *SiteReplicationSys) healOLockConfigMetadata(ctx context.Context, objAPI
|
||||
if isBucketMetadataEqual(latestObjLockConfig, bStatus.meta.ObjectLockConfig) {
|
||||
continue
|
||||
}
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
if _, err := globalBucketMetadataSys.Update(ctx, bucket, objectLockConfig, latestObjLockConfigBytes); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable to heal objectlock config metadata from peer site %s : %w", latestPeerName, err))
|
||||
}
|
||||
@@ -4445,7 +4445,7 @@ func (c *SiteReplicationSys) healBucket(ctx context.Context, objAPI ObjectLayer,
|
||||
bStatus := info.BucketStats[bucket][latestID].meta
|
||||
isMakeBucket := len(missingB) > 0
|
||||
deleteOp := NoOp
|
||||
if latestID != globalDeploymentID {
|
||||
if latestID != globalDeploymentID() {
|
||||
return nil
|
||||
}
|
||||
if lastUpdate.Equal(bStatus.DeletedAt) {
|
||||
@@ -4479,7 +4479,7 @@ func (c *SiteReplicationSys) healBucket(ctx context.Context, objAPI ObjectLayer,
|
||||
}
|
||||
for _, dID := range missingB {
|
||||
peerName := info.Sites[dID].Name
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
err := c.PeerBucketMakeWithVersioningHandler(ctx, bucket, opts)
|
||||
if err != nil {
|
||||
return c.annotateErr(makeBucketWithVersion, fmt.Errorf("error healing bucket for site replication %w from %s -> %s",
|
||||
@@ -4512,7 +4512,7 @@ func (c *SiteReplicationSys) healBucket(ctx context.Context, objAPI ObjectLayer,
|
||||
if deleteOp == Purge {
|
||||
for _, dID := range missingB {
|
||||
peerName := info.Sites[dID].Name
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
c.purgeDeletedBucket(ctx, objAPI, bucket)
|
||||
} else {
|
||||
admClient, err := c.getAdminClient(ctx, dID)
|
||||
@@ -4529,7 +4529,7 @@ func (c *SiteReplicationSys) healBucket(ctx context.Context, objAPI ObjectLayer,
|
||||
if deleteOp == MarkDelete {
|
||||
for _, dID := range withB {
|
||||
peerName := info.Sites[dID].Name
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
err := c.PeerBucketDeleteHandler(ctx, bucket, DeleteBucketOptions{
|
||||
Force: true,
|
||||
})
|
||||
@@ -4685,14 +4685,14 @@ func (c *SiteReplicationSys) healPolicies(ctx context.Context, objAPI ObjectLaye
|
||||
latestPolicyStat = ss
|
||||
}
|
||||
}
|
||||
if latestID != globalDeploymentID {
|
||||
if latestID != globalDeploymentID() {
|
||||
// heal only from the site with latest info.
|
||||
return nil
|
||||
}
|
||||
latestPeerName = info.Sites[latestID].Name
|
||||
// heal policy of peers if peer does not have it.
|
||||
for dID, pStatus := range ps {
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
continue
|
||||
}
|
||||
if !pStatus.PolicyMismatch && pStatus.HasPolicy {
|
||||
@@ -4739,14 +4739,14 @@ func (c *SiteReplicationSys) healUserPolicies(ctx context.Context, objAPI Object
|
||||
latestUserStat = ss
|
||||
}
|
||||
}
|
||||
if latestID != globalDeploymentID {
|
||||
if latestID != globalDeploymentID() {
|
||||
// heal only from the site with latest info.
|
||||
return nil
|
||||
}
|
||||
latestPeerName = info.Sites[latestID].Name
|
||||
// heal policy of peers if peer does not have it.
|
||||
for dID, pStatus := range us {
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
continue
|
||||
}
|
||||
if !pStatus.PolicyMismatch && pStatus.HasPolicyMapping {
|
||||
@@ -4800,14 +4800,14 @@ func (c *SiteReplicationSys) healGroupPolicies(ctx context.Context, objAPI Objec
|
||||
latestGroupStat = ss
|
||||
}
|
||||
}
|
||||
if latestID != globalDeploymentID {
|
||||
if latestID != globalDeploymentID() {
|
||||
// heal only from the site with latest info.
|
||||
return nil
|
||||
}
|
||||
latestPeerName = info.Sites[latestID].Name
|
||||
// heal policy of peers if peer does not have it.
|
||||
for dID, pStatus := range gs {
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
continue
|
||||
}
|
||||
if !pStatus.PolicyMismatch && pStatus.HasPolicyMapping {
|
||||
@@ -4862,13 +4862,13 @@ func (c *SiteReplicationSys) healUsers(ctx context.Context, objAPI ObjectLayer,
|
||||
latestUserStat = ss
|
||||
}
|
||||
}
|
||||
if latestID != globalDeploymentID {
|
||||
if latestID != globalDeploymentID() {
|
||||
// heal only from the site with latest info.
|
||||
return nil
|
||||
}
|
||||
latestPeerName = info.Sites[latestID].Name
|
||||
for dID, uStatus := range us {
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
continue
|
||||
}
|
||||
if !uStatus.UserInfoMismatch {
|
||||
@@ -5008,13 +5008,13 @@ func (c *SiteReplicationSys) healGroups(ctx context.Context, objAPI ObjectLayer,
|
||||
latestGroupStat = ss
|
||||
}
|
||||
}
|
||||
if latestID != globalDeploymentID {
|
||||
if latestID != globalDeploymentID() {
|
||||
// heal only from the site with latest info.
|
||||
return nil
|
||||
}
|
||||
latestPeerName = info.Sites[latestID].Name
|
||||
for dID, gStatus := range gs {
|
||||
if dID == globalDeploymentID {
|
||||
if dID == globalDeploymentID() {
|
||||
continue
|
||||
}
|
||||
if !gStatus.GroupDescMismatch {
|
||||
@@ -5113,7 +5113,7 @@ func (c *SiteReplicationSys) getPeerForUpload(deplID string) (pi srPeerInfo, loc
|
||||
PeerInfo: site,
|
||||
EndpointURL: ep,
|
||||
}
|
||||
return pi, site.DeploymentID == globalDeploymentID
|
||||
return pi, site.DeploymentID == globalDeploymentID()
|
||||
}
|
||||
}
|
||||
return pi, true
|
||||
@@ -5130,7 +5130,7 @@ func (c *SiteReplicationSys) startResync(ctx context.Context, objAPI ObjectLayer
|
||||
return res, errSRObjectLayerNotReady
|
||||
}
|
||||
|
||||
if peer.DeploymentID == globalDeploymentID {
|
||||
if peer.DeploymentID == globalDeploymentID() {
|
||||
return res, errSRResyncToSelf
|
||||
}
|
||||
if _, ok := c.state.Peers[peer.DeploymentID]; !ok {
|
||||
@@ -5244,7 +5244,7 @@ func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLaye
|
||||
if objAPI == nil {
|
||||
return res, errSRObjectLayerNotReady
|
||||
}
|
||||
if peer.DeploymentID == globalDeploymentID {
|
||||
if peer.DeploymentID == globalDeploymentID() {
|
||||
return res, errSRResyncToSelf
|
||||
}
|
||||
if _, ok := c.state.Peers[peer.DeploymentID]; !ok {
|
||||
|
||||
Reference in New Issue
Block a user