mirror of
https://github.com/minio/minio.git
synced 2024-12-23 21:55:53 -05:00
fix: support existing folders in single drive mode (#13254)
This PR however also proceeds to simplify the loading of various subsystems such as - globalNotificationSys - globalTargetSys converge them directly into single bucket metadata sys loader, once that is loaded automatically every other target should be loaded and configured properly. fixes #13252
This commit is contained in:
parent
a0d0c8e4af
commit
4d84f0f6f0
@ -1380,6 +1380,12 @@ func (api objectAPIHandlers) PutBucketTaggingHandler(w http.ResponseWriter, r *h
|
||||
return
|
||||
}
|
||||
|
||||
// Check if bucket exists.
|
||||
if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
if s3Error := checkRequestAuthType(ctx, r, policy.PutBucketTaggingAction, bucket, ""); s3Error != ErrNone {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
|
||||
return
|
||||
|
@ -80,46 +80,8 @@ func (sys *BucketMetadataSys) Update(bucket string, configFile string, configDat
|
||||
return errServerNotInitialized
|
||||
}
|
||||
|
||||
if globalIsGateway {
|
||||
// This code is needed only for gateway implementations.
|
||||
switch configFile {
|
||||
case bucketSSEConfig:
|
||||
if globalGatewayName == NASBackendGateway {
|
||||
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
meta.EncryptionConfigXML = configData
|
||||
return meta.Save(GlobalContext, objAPI)
|
||||
}
|
||||
case bucketLifecycleConfig:
|
||||
if globalGatewayName == NASBackendGateway {
|
||||
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
meta.LifecycleConfigXML = configData
|
||||
return meta.Save(GlobalContext, objAPI)
|
||||
}
|
||||
case bucketTaggingConfig:
|
||||
if globalGatewayName == NASBackendGateway {
|
||||
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
meta.TaggingConfigXML = configData
|
||||
return meta.Save(GlobalContext, objAPI)
|
||||
}
|
||||
case bucketNotificationConfig:
|
||||
if globalGatewayName == NASBackendGateway {
|
||||
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
meta.NotificationConfigXML = configData
|
||||
return meta.Save(GlobalContext, objAPI)
|
||||
}
|
||||
case bucketPolicyConfig:
|
||||
if globalIsGateway && globalGatewayName != NASBackendGateway {
|
||||
if configFile == bucketPolicyConfig {
|
||||
if configData == nil {
|
||||
return objAPI.DeleteBucketPolicy(GlobalContext, bucket)
|
||||
}
|
||||
@ -138,7 +100,12 @@ func (sys *BucketMetadataSys) Update(bucket string, configFile string, configDat
|
||||
|
||||
meta, err := loadBucketMetadata(GlobalContext, objAPI, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
if !globalIsErasure && !globalIsDistErasure && errors.Is(err, errVolumeNotFound) {
|
||||
// Only single drive mode needs this fallback.
|
||||
meta = newBucketMetadata(bucket)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
switch configFile {
|
||||
@ -447,9 +414,9 @@ func (sys *BucketMetadataSys) Init(ctx context.Context, buckets []BucketInfo, ob
|
||||
return errServerNotInitialized
|
||||
}
|
||||
|
||||
// In gateway mode, we don't need to load the policies
|
||||
// from the backend.
|
||||
if globalIsGateway {
|
||||
// In gateway mode, we don't need to load bucket metadata except
|
||||
// NAS gateway backend.
|
||||
if globalIsGateway && !objAPI.IsNotificationSupported() {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -470,11 +437,20 @@ func (sys *BucketMetadataSys) concurrentLoad(ctx context.Context, buckets []Buck
|
||||
})
|
||||
meta, err := loadBucketMetadata(ctx, objAPI, buckets[index].Name)
|
||||
if err != nil {
|
||||
return err
|
||||
if !globalIsErasure && !globalIsDistErasure && errors.Is(err, errVolumeNotFound) {
|
||||
meta = newBucketMetadata(buckets[index].Name)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
sys.Lock()
|
||||
sys.metadataMap[buckets[index].Name] = meta
|
||||
sys.Unlock()
|
||||
|
||||
globalNotificationSys.set(buckets[index], meta) // set notification targets
|
||||
|
||||
globalBucketTargetSys.set(buckets[index], meta) // set remote replication targets
|
||||
|
||||
return nil
|
||||
}, index)
|
||||
}
|
||||
|
@ -272,22 +272,6 @@ func NewBucketTargetSys() *BucketTargetSys {
|
||||
}
|
||||
}
|
||||
|
||||
// Init initializes the bucket targets subsystem for buckets which have targets configured.
|
||||
func (sys *BucketTargetSys) Init(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error {
|
||||
if objAPI == nil {
|
||||
return errServerNotInitialized
|
||||
}
|
||||
|
||||
// In gateway mode, bucket targets is not supported.
|
||||
if globalIsGateway {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load bucket targets once during boot in background.
|
||||
go sys.load(ctx, buckets, objAPI)
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateAllTargets updates target to reflect metadata updates
|
||||
func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketTargets) {
|
||||
if sys == nil {
|
||||
@ -325,30 +309,26 @@ func (sys *BucketTargetSys) UpdateAllTargets(bucket string, tgts *madmin.BucketT
|
||||
}
|
||||
|
||||
// create minio-go clients for buckets having remote targets
|
||||
func (sys *BucketTargetSys) load(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) {
|
||||
for _, bucket := range buckets {
|
||||
cfg, err := globalBucketMetadataSys.GetBucketTargetsConfig(bucket.Name)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
if cfg == nil || cfg.Empty() {
|
||||
continue
|
||||
}
|
||||
if len(cfg.Targets) > 0 {
|
||||
sys.targetsMap[bucket.Name] = cfg.Targets
|
||||
}
|
||||
for _, tgt := range cfg.Targets {
|
||||
tgtClient, err := sys.getRemoteTargetClient(&tgt)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
sys.arnRemotesMap[tgt.Arn] = tgtClient
|
||||
sys.updateBandwidthLimit(bucket.Name, tgt.BandwidthLimit)
|
||||
}
|
||||
func (sys *BucketTargetSys) set(bucket BucketInfo, meta BucketMetadata) {
|
||||
cfg := meta.bucketTargetConfig
|
||||
if cfg == nil || cfg.Empty() {
|
||||
return
|
||||
}
|
||||
sys.Lock()
|
||||
defer sys.Unlock()
|
||||
if len(cfg.Targets) > 0 {
|
||||
sys.targetsMap[bucket.Name] = cfg.Targets
|
||||
}
|
||||
for _, tgt := range cfg.Targets {
|
||||
tgtClient, err := sys.getRemoteTargetClient(&tgt)
|
||||
if err != nil {
|
||||
logger.LogIf(GlobalContext, err)
|
||||
continue
|
||||
}
|
||||
sys.arnRemotesMap[tgt.Arn] = tgtClient
|
||||
sys.updateBandwidthLimit(bucket.Name, tgt.BandwidthLimit)
|
||||
}
|
||||
sys.targetsMap[bucket.Name] = cfg.Targets
|
||||
}
|
||||
|
||||
// getRemoteTargetInstanceTransport contains a singleton roundtripper.
|
||||
|
@ -536,6 +536,10 @@ func (sys *NotificationSys) GetLocks(ctx context.Context, r *http.Request) []*Pe
|
||||
|
||||
// LoadBucketMetadata - calls LoadBucketMetadata call on all peers
|
||||
func (sys *NotificationSys) LoadBucketMetadata(ctx context.Context, bucketName string) {
|
||||
if globalIsGateway {
|
||||
return
|
||||
}
|
||||
|
||||
ng := WithNPeers(len(sys.peerClients))
|
||||
for idx, client := range sys.peerClients {
|
||||
if client == nil {
|
||||
@ -634,23 +638,18 @@ func (sys *NotificationSys) LoadTransitionTierConfig(ctx context.Context) {
|
||||
}
|
||||
|
||||
// Loads notification policies for all buckets into NotificationSys.
|
||||
func (sys *NotificationSys) load(buckets []BucketInfo) {
|
||||
for _, bucket := range buckets {
|
||||
ctx := logger.SetReqInfo(GlobalContext, &logger.ReqInfo{BucketName: bucket.Name})
|
||||
config, err := globalBucketMetadataSys.GetNotificationConfig(bucket.Name)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
continue
|
||||
}
|
||||
config.SetRegion(globalServerRegion)
|
||||
if err = config.Validate(globalServerRegion, globalNotificationSys.targetList); err != nil {
|
||||
if _, ok := err.(*event.ErrARNNotFound); !ok {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
sys.AddRulesMap(bucket.Name, config.ToRulesMap())
|
||||
func (sys *NotificationSys) set(bucket BucketInfo, meta BucketMetadata) {
|
||||
config := meta.notificationConfig
|
||||
if config == nil {
|
||||
return
|
||||
}
|
||||
config.SetRegion(globalServerRegion)
|
||||
if err := config.Validate(globalServerRegion, globalNotificationSys.targetList); err != nil {
|
||||
if _, ok := err.(*event.ErrARNNotFound); !ok {
|
||||
logger.LogIf(GlobalContext, err)
|
||||
}
|
||||
}
|
||||
sys.AddRulesMap(bucket.Name, config.ToRulesMap())
|
||||
}
|
||||
|
||||
// Init - initializes notification system from notification.xml and listenxl.meta of all buckets.
|
||||
@ -676,7 +675,6 @@ func (sys *NotificationSys) Init(ctx context.Context, buckets []BucketInfo, objA
|
||||
}
|
||||
}()
|
||||
|
||||
go sys.load(buckets)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -411,16 +411,9 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
|
||||
// Initialize bucket metadata sub-system.
|
||||
globalBucketMetadataSys.Init(ctx, buckets, newObject)
|
||||
|
||||
// Initialize notification system.
|
||||
globalNotificationSys.Init(ctx, buckets, newObject)
|
||||
|
||||
// Initialize bucket targets sub-system.
|
||||
globalBucketTargetSys.Init(ctx, buckets, newObject)
|
||||
|
||||
if globalIsErasure {
|
||||
// Initialize transition tier configuration manager
|
||||
err = globalTierConfigMgr.Init(ctx, newObject)
|
||||
if err != nil {
|
||||
if err = globalTierConfigMgr.Init(ctx, newObject); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user