mirror of
				https://github.com/minio/minio.git
				synced 2025-10-29 15:55:00 -04:00 
			
		
		
		
	fix: unexpected logging with bucket metadata conversions (#9519)
This commit is contained in:
		
							parent
							
								
									7b58dcb28c
								
							
						
					
					
						commit
						b768645fde
					
				| @ -715,7 +715,6 @@ func (h *healSequence) healItemsFromSourceCh() error { | ||||
| 
 | ||||
| func (h *healSequence) healFromSourceCh() { | ||||
| 	h.healItemsFromSourceCh() | ||||
| 	close(h.traverseAndHealDoneCh) | ||||
| } | ||||
| 
 | ||||
| func (h *healSequence) healItems(bucketsOnly bool) error { | ||||
|  | ||||
| @ -293,10 +293,12 @@ func (api objectAPIHandlers) ListBucketsHandler(w http.ResponseWriter, r *http.R | ||||
| 
 | ||||
| 		for i := range bucketsInfo { | ||||
| 			meta, err := loadBucketMetadata(ctx, objectAPI, bucketsInfo[i].Name) | ||||
| 			logger.LogIf(ctx, err) | ||||
| 			if err == nil { | ||||
| 				bucketsInfo[i].Created = meta.Created | ||||
| 			} | ||||
| 			if err != errMetaDataConverted { | ||||
| 				logger.LogIf(ctx, err) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| @ -1103,7 +1105,7 @@ func (api objectAPIHandlers) GetBucketObjectLockConfigHandler(w http.ResponseWri | ||||
| 	} | ||||
| 
 | ||||
| 	meta, err := loadBucketMetadata(ctx, objectAPI, bucket) | ||||
| 	if err != nil { | ||||
| 	if err != nil && err != errMetaDataConverted { | ||||
| 		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @ -115,13 +115,15 @@ func (b *bucketMetadata) convertLegacyLockconfig(ctx context.Context, objectAPI | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		logger.LogIf(ctx, deleteConfig(ctx, objectAPI, configFile)) | ||||
| 		if err := deleteConfig(ctx, objectAPI, configFile); err != nil && !errors.Is(err, errConfigNotFound) { | ||||
| 			logger.LogIf(ctx, err) | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	configData, err := readConfig(ctx, objectAPI, configFile) | ||||
| 	if err != nil { | ||||
| 		if err != errConfigNotFound { | ||||
| 		if !errors.Is(err, errConfigNotFound) { | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
|  | ||||
| @ -77,7 +77,7 @@ func (sys *BucketQuotaSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error | ||||
| 		return errServerNotInitialized | ||||
| 	} | ||||
| 
 | ||||
| 	// In gateway mode, we always fetch the bucket lifecycle configuration from the gateway backend. | ||||
| 	// In gateway mode, we do not support bucket quota. | ||||
| 	// So, this is a no-op for gateway servers. | ||||
| 	if globalIsGateway { | ||||
| 		return nil | ||||
|  | ||||
| @ -36,42 +36,10 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error { | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	// If its server mode or nas gateway, migrate the backend. | ||||
| 	doneCh := make(chan struct{}) | ||||
| 
 | ||||
| 	var encrypted bool | ||||
| 	var err error | ||||
| 
 | ||||
| 	// Migrating Config backend needs a retry mechanism for | ||||
| 	// the following reasons: | ||||
| 	//  - Read quorum is lost just after the initialization | ||||
| 	//    of the object layer. | ||||
| 	retryTimerCh := newRetryTimerSimple(doneCh) | ||||
| 	var stop bool | ||||
| 
 | ||||
| 	rquorum := InsufficientReadQuorum{} | ||||
| 	wquorum := InsufficientWriteQuorum{} | ||||
| 
 | ||||
| 	for !stop { | ||||
| 		select { | ||||
| 		case <-retryTimerCh: | ||||
| 			if encrypted, err = checkBackendEncrypted(objAPI); err != nil { | ||||
| 				if errors.Is(err, errDiskNotFound) || | ||||
| 					errors.As(err, &rquorum) || | ||||
| 					isErrBucketNotFound(err) { | ||||
| 					logger.Info("Waiting for config backend to be encrypted..") | ||||
| 					continue | ||||
| 				} | ||||
| 				close(doneCh) | ||||
| 				return err | ||||
| 			} | ||||
| 			stop = true | ||||
| 		case <-globalOSSignalCh: | ||||
| 			close(doneCh) | ||||
| 			return fmt.Errorf("Config encryption process stopped gracefully") | ||||
| 		} | ||||
| 	encrypted, err := checkBackendEncrypted(objAPI) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("Unable to encrypt config %w", err) | ||||
| 	} | ||||
| 	close(doneCh) | ||||
| 
 | ||||
| 	if encrypted { | ||||
| 		// backend is encrypted, but credentials are not specified | ||||
| @ -91,34 +59,12 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error { | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	doneCh = make(chan struct{}) | ||||
| 	defer close(doneCh) | ||||
| 
 | ||||
| 	retryTimerCh = newRetryTimerSimple(doneCh) | ||||
| 
 | ||||
| 	// Migrating Config backend needs a retry mechanism for | ||||
| 	// the following reasons: | ||||
| 	//  - Read quorum is lost just after the initialization | ||||
| 	//    of the object layer. | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-retryTimerCh: | ||||
| 			// Migrate IAM configuration | ||||
| 			if err = migrateConfigPrefixToEncrypted(objAPI, globalOldCred, encrypted); err != nil { | ||||
| 				if errors.Is(err, errDiskNotFound) || | ||||
| 					errors.As(err, &rquorum) || | ||||
| 					errors.As(err, &wquorum) || | ||||
| 					isErrBucketNotFound(err) { | ||||
| 					logger.Info("Waiting for config backend to be encrypted..") | ||||
| 					continue | ||||
| 				} | ||||
| 				return err | ||||
| 			} | ||||
| 			return nil | ||||
| 		case <-globalOSSignalCh: | ||||
| 			return fmt.Errorf("Config encryption process stopped gracefully") | ||||
| 		} | ||||
| 	// Migrate IAM configuration | ||||
| 	if err = migrateConfigPrefixToEncrypted(objAPI, globalOldCred, encrypted); err != nil { | ||||
| 		return fmt.Errorf("Unable to migrate all config at .minio.sys/config/: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| const ( | ||||
|  | ||||
| @ -352,7 +352,9 @@ func (d *dataUpdateTracker) deserialize(src io.Reader, newerThan time.Time) erro | ||||
| 	// Version | ||||
| 	if _, err := io.ReadFull(src, tmp[:1]); err != nil { | ||||
| 		if d.debug { | ||||
| 			logger.LogIf(ctx, err) | ||||
| 			if err != io.EOF { | ||||
| 				logger.LogIf(ctx, err) | ||||
| 			} | ||||
| 		} | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| @ -190,7 +190,7 @@ func initFormatFS(ctx context.Context, fsPath string) (rlk *lock.RLockedFile, er | ||||
| 	fsFormatPath := pathJoin(fsPath, minioMetaBucket, formatConfigFile) | ||||
| 
 | ||||
| 	// Add a deployment ID, if it does not exist. | ||||
| 	if err := formatFSFixDeploymentID(fsFormatPath); err != nil { | ||||
| 	if err := formatFSFixDeploymentID(ctx, fsFormatPath); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| @ -288,7 +288,7 @@ func formatFSGetDeploymentID(rlk *lock.RLockedFile) (id string, err error) { | ||||
| } | ||||
| 
 | ||||
| // Generate a deployment ID if one does not exist already. | ||||
| func formatFSFixDeploymentID(fsFormatPath string) error { | ||||
| func formatFSFixDeploymentID(ctx context.Context, fsFormatPath string) error { | ||||
| 	rlk, err := lock.RLockedOpenFile(fsFormatPath) | ||||
| 	if err == nil { | ||||
| 		// format.json can be empty in a rare condition when another | ||||
| @ -339,11 +339,12 @@ func formatFSFixDeploymentID(fsFormatPath string) error { | ||||
| 		return time.Now().Round(time.Second).Sub(formatStartTime).String() | ||||
| 	} | ||||
| 
 | ||||
| 	doneCh := make(chan struct{}) | ||||
| 	defer close(doneCh) | ||||
| 	retryCtx, cancel := context.WithCancel(ctx) | ||||
| 	// Indicate to our routine to exit cleanly upon return. | ||||
| 	defer cancel() | ||||
| 
 | ||||
| 	var wlk *lock.LockedFile | ||||
| 	retryCh := newRetryTimerSimple(doneCh) | ||||
| 	retryCh := newRetryTimerSimple(retryCtx) | ||||
| 	var stop bool | ||||
| 	for !stop { | ||||
| 		select { | ||||
| @ -358,7 +359,7 @@ func formatFSFixDeploymentID(fsFormatPath string) error { | ||||
| 				return err | ||||
| 			} | ||||
| 			stop = true | ||||
| 		case <-globalOSSignalCh: | ||||
| 		case <-ctx.Done(): | ||||
| 			return fmt.Errorf("Initializing FS format stopped gracefully") | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
							
								
								
									
										11
									
								
								cmd/retry.go
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								cmd/retry.go
									
									
									
									
									
								
							| @ -17,6 +17,7 @@ | ||||
| package cmd | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"math/rand" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| @ -61,7 +62,7 @@ var globalRandomSource = rand.New(&lockedRandSource{ | ||||
| // until the maximum retry attempts are reached. - this function is a fully | ||||
| // configurable version, meant for only advanced use cases. For the most part | ||||
| // one should use newRetryTimerSimple and newRetryTimer. | ||||
| func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int { | ||||
| func newRetryTimerWithJitter(ctx context.Context, unit time.Duration, cap time.Duration, jitter float64) <-chan int { | ||||
| 	attemptCh := make(chan int) | ||||
| 
 | ||||
| 	// normalize jitter to the range [0, 1.0] | ||||
| @ -100,7 +101,7 @@ func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float | ||||
| 			select { // Attempts starts. | ||||
| 			case attemptCh <- nextBackoff: | ||||
| 				nextBackoff++ | ||||
| 			case <-doneCh: | ||||
| 			case <-ctx.Done(): | ||||
| 				// Stop the routine. | ||||
| 				return | ||||
| 			} | ||||
| @ -108,7 +109,7 @@ func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float | ||||
| 			// wait till next backoff time or till doneCh gets a message. | ||||
| 			select { | ||||
| 			case <-timer.C: | ||||
| 			case <-doneCh: | ||||
| 			case <-ctx.Done(): | ||||
| 				// stop the timer and return. | ||||
| 				timer.Stop() | ||||
| 				return | ||||
| @ -130,6 +131,6 @@ const ( | ||||
| // newRetryTimerSimple creates a timer with exponentially increasing delays | ||||
| // until the maximum retry attempts are reached. - this function is a | ||||
| // simpler version with all default values. | ||||
| func newRetryTimerSimple(doneCh chan struct{}) <-chan int { | ||||
| 	return newRetryTimerWithJitter(defaultRetryUnit, defaultRetryCap, MaxJitter, doneCh) | ||||
| func newRetryTimerSimple(ctx context.Context) <-chan int { | ||||
| 	return newRetryTimerWithJitter(ctx, defaultRetryUnit, defaultRetryCap, MaxJitter) | ||||
| } | ||||
|  | ||||
| @ -160,53 +160,45 @@ func newAllSubsystems() { | ||||
| 	globalBucketQuotaSys = NewBucketQuotaSys() | ||||
| } | ||||
| 
 | ||||
| func initSafeMode(buckets []BucketInfo) (err error) { | ||||
| func initSafeMode() (err error) { | ||||
| 	newObject := newObjectLayerWithoutSafeModeFn() | ||||
| 
 | ||||
| 	// Construct path to config/transaction.lock for locking | ||||
| 	transactionConfigPrefix := minioConfigPrefix + "/transaction.lock" | ||||
| 
 | ||||
| 	// Make sure to hold lock for entire migration to avoid | ||||
| 	// such that only one server should migrate the entire config | ||||
| 	// at a given time, this big transaction lock ensures this | ||||
| 	// appropriately. This is also true for rotation of encrypted | ||||
| 	// content. | ||||
| 	objLock := newObject.NewNSLock(GlobalContext, minioMetaBucket, transactionConfigPrefix) | ||||
| 	if err = objLock.GetLock(globalOperationTimeout); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	defer func(objLock RWLocker) { | ||||
| 		objLock.Unlock() | ||||
| 	txnLk := newObject.NewNSLock(GlobalContext, minioMetaBucket, minioConfigPrefix+"/transaction.lock") | ||||
| 	defer func(txnLk RWLocker) { | ||||
| 		txnLk.Unlock() | ||||
| 
 | ||||
| 		if err != nil { | ||||
| 			var cerr config.Err | ||||
| 			// For any config error, we don't need to drop into safe-mode | ||||
| 			// instead its a user error and should be fixed by user. | ||||
| 			if errors.As(err, &cerr) { | ||||
| 				return | ||||
| 			} | ||||
| 
 | ||||
| 			// Prints the formatted startup message in safe mode operation. | ||||
| 			// Drops-into safe mode where users need to now manually recover | ||||
| 			// the server. | ||||
| 			printStartupSafeModeMessage(getAPIEndpoints(), err) | ||||
| 
 | ||||
| 			// Initialization returned error reaching safe mode and | ||||
| 			// not proceeding waiting for admin action. | ||||
| 			handleSignals() | ||||
| 		} | ||||
| 	}(objLock) | ||||
| 
 | ||||
| 	// Migrate all backend configs to encrypted backend configs, optionally | ||||
| 	// handles rotating keys for encryption. | ||||
| 	if err = handleEncryptedConfigBackend(newObject, true); err != nil { | ||||
| 		return fmt.Errorf("Unable to handle encrypted backend for config, iam and policies: %w", err) | ||||
| 	} | ||||
| 	}(txnLk) | ||||
| 
 | ||||
| 	// ****  WARNING **** | ||||
| 	// Migrating to encrypted backend should happen before initialization of any | ||||
| 	// sub-systems, make sure that we do not move the above codeblock elsewhere. | ||||
| 	// Create cancel context to control 'newRetryTimer' go routine. | ||||
| 	retryCtx, cancel := context.WithCancel(GlobalContext) | ||||
| 
 | ||||
| 	// Validate and initialize all subsystems. | ||||
| 	doneCh := make(chan struct{}) | ||||
| 	defer close(doneCh) | ||||
| 	// Indicate to our routine to exit cleanly upon return. | ||||
| 	defer cancel() | ||||
| 
 | ||||
| 	// Initializing sub-systems needs a retry mechanism for | ||||
| 	// the following reasons: | ||||
| @ -214,43 +206,67 @@ func initSafeMode(buckets []BucketInfo) (err error) { | ||||
| 	//    of the object layer. | ||||
| 	//  - Write quorum not met when upgrading configuration | ||||
| 	//    version is needed, migration is needed etc. | ||||
| 	retryTimerCh := newRetryTimerSimple(doneCh) | ||||
| 	for { | ||||
| 		rquorum := InsufficientReadQuorum{} | ||||
| 		wquorum := InsufficientWriteQuorum{} | ||||
| 		bucketNotFound := BucketNotFound{} | ||||
| 		var err error | ||||
| 		select { | ||||
| 		case n := <-retryTimerCh: | ||||
| 			if err = initAllSubsystems(buckets, newObject); err != nil { | ||||
| 				if errors.Is(err, errDiskNotFound) || | ||||
| 					errors.As(err, &rquorum) || | ||||
| 					errors.As(err, &wquorum) || | ||||
| 					errors.As(err, &bucketNotFound) { | ||||
| 					if n < 5 { | ||||
| 						logger.Info("Waiting for all sub-systems to be initialized..") | ||||
| 					} else { | ||||
| 						logger.Info("Waiting for all sub-systems to be initialized.. %v", err) | ||||
| 					} | ||||
| 					continue | ||||
| 	rquorum := InsufficientReadQuorum{} | ||||
| 	wquorum := InsufficientWriteQuorum{} | ||||
| 	optimeout := OperationTimedOut{} | ||||
| 	for n := range newRetryTimerSimple(retryCtx) { | ||||
| 		// let one of the server acquire the lock, if not let them timeout. | ||||
| 		// which shall be retried again by this loop. | ||||
| 		if err = txnLk.GetLock(leaderLockTimeout); err == nil { | ||||
| 			// Migrate all backend configs to encrypted backend configs, optionally | ||||
| 			// handles rotating keys for encryption, if there is any retriable failure | ||||
| 			// that shall be retried if there is an error. | ||||
| 			if err = handleEncryptedConfigBackend(newObject, true); err == nil { | ||||
| 				// Upon success migrating the config, initialize all sub-systems | ||||
| 				// if all sub-systems initialized successfully return right away | ||||
| 				if err = initAllSubsystems(newObject); err == nil { | ||||
| 					return nil | ||||
| 				} | ||||
| 				return err | ||||
| 			} | ||||
| 			return nil | ||||
| 		case <-globalOSSignalCh: | ||||
| 			if err == nil { | ||||
| 				return errors.New("Initializing sub-systems stopped gracefully") | ||||
| 			} | ||||
| 			return fmt.Errorf("Unable to initialize sub-systems: %w", err) | ||||
| 		} | ||||
| 
 | ||||
| 		// One of these retriable errors shall be retried. | ||||
| 		if errors.Is(err, errDiskNotFound) || | ||||
| 			errors.Is(err, errConfigNotFound) || | ||||
| 			errors.Is(err, context.Canceled) || | ||||
| 			errors.Is(err, context.DeadlineExceeded) || | ||||
| 			errors.As(err, &optimeout) || | ||||
| 			errors.As(err, &rquorum) || | ||||
| 			errors.As(err, &wquorum) || | ||||
| 			isErrBucketNotFound(err) { | ||||
| 			if n < 5 { | ||||
| 				logger.Info("Waiting for all MinIO sub-systems to be initialized..") | ||||
| 			} else { | ||||
| 				logger.Info("Waiting for all MinIO sub-systems to be initialized.. possible cause (%v)", err) | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		// Any other unhandled return right here. | ||||
| 		return fmt.Errorf("Unable to initialize sub-systems: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	// Return an error when retry is canceled or deadlined | ||||
| 	if err = retryCtx.Err(); err != nil { | ||||
| 		return fmt.Errorf("Unable to initialize sub-systems: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	// Retry was canceled successfully. | ||||
| 	return errors.New("Initializing sub-systems stopped gracefully") | ||||
| } | ||||
| 
 | ||||
| func initAllSubsystems(buckets []BucketInfo, newObject ObjectLayer) (err error) { | ||||
| func initAllSubsystems(newObject ObjectLayer) (err error) { | ||||
| 	// List buckets to be re-used for loading configs. | ||||
| 	buckets, err := newObject.ListBuckets(GlobalContext) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("Unable to list buckets: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	// Initialize config system. | ||||
| 	if err = globalConfigSys.Init(newObject); err != nil { | ||||
| 		return fmt.Errorf("Unable to initialize config system: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	if globalEtcdClient != nil { | ||||
| 		// ****  WARNING **** | ||||
| 		// Migrating to encrypted backend on etcd should happen before initialization of | ||||
| @ -294,6 +310,11 @@ func initAllSubsystems(buckets []BucketInfo, newObject ObjectLayer) (err error) | ||||
| 		return fmt.Errorf("Unable to initialize bucket quota system: %w", err) | ||||
| 	} | ||||
| 
 | ||||
| 	// Populate existing buckets to the etcd backend | ||||
| 	if globalDNSConfig != nil { | ||||
| 		initFederatorBackend(buckets, newObject) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| @ -458,13 +479,7 @@ func serverMain(ctx *cli.Context) { | ||||
| 
 | ||||
| 	go startBackgroundOps(GlobalContext, newObject) | ||||
| 
 | ||||
| 	// Calls New() and initializes all sub-systems. | ||||
| 	buckets, err := newObject.ListBuckets(GlobalContext) | ||||
| 	if err != nil { | ||||
| 		logger.Fatal(err, "Unable to list buckets") | ||||
| 	} | ||||
| 
 | ||||
| 	logger.FatalIf(initSafeMode(buckets), "Unable to initialize server switching into safe-mode") | ||||
| 	logger.FatalIf(initSafeMode(), "Unable to initialize server switching into safe-mode") | ||||
| 
 | ||||
| 	if globalCacheConfig.Enabled { | ||||
| 		// initialize the new disk cache objects. | ||||
| @ -477,11 +492,6 @@ func serverMain(ctx *cli.Context) { | ||||
| 		globalObjLayerMutex.Unlock() | ||||
| 	} | ||||
| 
 | ||||
| 	// Populate existing buckets to the etcd backend | ||||
| 	if globalDNSConfig != nil { | ||||
| 		initFederatorBackend(buckets, newObject) | ||||
| 	} | ||||
| 
 | ||||
| 	// Disable safe mode operation, after all initialization is over. | ||||
| 	globalObjLayerMutex.Lock() | ||||
| 	globalSafeMode = false | ||||
|  | ||||
| @ -128,13 +128,16 @@ func (dm *DRWMutex) GetRLock(id, source string, timeout time.Duration) (locked b | ||||
| // algorithm until either the lock is acquired successfully or more | ||||
| // time has elapsed than the timeout value. | ||||
| func (dm *DRWMutex) lockBlocking(timeout time.Duration, id, source string, isReadLock bool) (locked bool) { | ||||
| 	doneCh, start := make(chan struct{}), time.Now().UTC() | ||||
| 	defer close(doneCh) | ||||
| 	start := time.Now().UTC() | ||||
| 
 | ||||
| 	restClnts := dm.clnt.GetLockersFn() | ||||
| 
 | ||||
| 	retryCtx, cancel := context.WithCancel(dm.ctx) | ||||
| 
 | ||||
| 	defer cancel() | ||||
| 
 | ||||
| 	// Use incremental back-off algorithm for repeated attempts to acquire the lock | ||||
| 	for range newRetryTimerSimple(doneCh) { | ||||
| 	for range newRetryTimerSimple(retryCtx) { | ||||
| 		select { | ||||
| 		case <-dm.ctx.Done(): | ||||
| 			return | ||||
|  | ||||
| @ -17,6 +17,7 @@ | ||||
| package dsync | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"math/rand" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| @ -61,7 +62,7 @@ var globalRandomSource = rand.New(&lockedRandSource{ | ||||
| // until the maximum retry attempts are reached. - this function is a fully | ||||
| // configurable version, meant for only advanced use cases. For the most part | ||||
| // one should use newRetryTimerSimple and newRetryTimer. | ||||
| func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float64, doneCh <-chan struct{}) <-chan int { | ||||
| func newRetryTimerWithJitter(ctx context.Context, unit time.Duration, cap time.Duration, jitter float64) <-chan int { | ||||
| 	attemptCh := make(chan int) | ||||
| 
 | ||||
| 	// normalize jitter to the range [0, 1.0] | ||||
| @ -100,7 +101,7 @@ func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float | ||||
| 			select { // Attempts starts. | ||||
| 			case attemptCh <- nextBackoff: | ||||
| 				nextBackoff++ | ||||
| 			case <-doneCh: | ||||
| 			case <-ctx.Done(): | ||||
| 				// Stop the routine. | ||||
| 				return | ||||
| 			} | ||||
| @ -108,7 +109,7 @@ func newRetryTimerWithJitter(unit time.Duration, cap time.Duration, jitter float | ||||
| 			// wait till next backoff time or till doneCh gets a message. | ||||
| 			select { | ||||
| 			case <-timer.C: | ||||
| 			case <-doneCh: | ||||
| 			case <-ctx.Done(): | ||||
| 				// stop the timer and return. | ||||
| 				timer.Stop() | ||||
| 				return | ||||
| @ -130,13 +131,13 @@ const ( | ||||
| // newRetryTimer creates a timer with exponentially increasing delays | ||||
| // until the maximum retry attempts are reached. - this function provides | ||||
| // resulting retry values to be of maximum jitter. | ||||
| func newRetryTimer(unit time.Duration, cap time.Duration, doneCh <-chan struct{}) <-chan int { | ||||
| 	return newRetryTimerWithJitter(unit, cap, MaxJitter, doneCh) | ||||
| func newRetryTimer(ctx context.Context, unit time.Duration, cap time.Duration) <-chan int { | ||||
| 	return newRetryTimerWithJitter(ctx, unit, cap, MaxJitter) | ||||
| } | ||||
| 
 | ||||
| // newRetryTimerSimple creates a timer with exponentially increasing delays | ||||
| // until the maximum retry attempts are reached. - this function is a | ||||
| // simpler version with all default values. | ||||
| func newRetryTimerSimple(doneCh <-chan struct{}) <-chan int { | ||||
| 	return newRetryTimerWithJitter(defaultRetryUnit, defaultRetryCap, MaxJitter, doneCh) | ||||
| func newRetryTimerSimple(ctx context.Context) <-chan int { | ||||
| 	return newRetryTimerWithJitter(ctx, defaultRetryUnit, defaultRetryCap, MaxJitter) | ||||
| } | ||||
|  | ||||
| @ -17,25 +17,26 @@ | ||||
| package dsync | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| // Tests for retry timer. | ||||
| func TestRetryTimerSimple(t *testing.T) { | ||||
| 	doneCh := make(chan struct{}) | ||||
| 	attemptCh := newRetryTimerSimple(doneCh) | ||||
| 	rctx, cancel := context.WithCancel(context.Background()) | ||||
| 	attemptCh := newRetryTimerSimple(rctx) | ||||
| 	i := <-attemptCh | ||||
| 	if i != 0 { | ||||
| 		close(doneCh) | ||||
| 		cancel() | ||||
| 		t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i) | ||||
| 	} | ||||
| 	i = <-attemptCh | ||||
| 	if i <= 0 { | ||||
| 		close(doneCh) | ||||
| 		cancel() | ||||
| 		t.Fatalf("Invalid attempt counter returned should be greater than 0, found %d instead", i) | ||||
| 	} | ||||
| 	close(doneCh) | ||||
| 	cancel() | ||||
| 	_, ok := <-attemptCh | ||||
| 	if ok { | ||||
| 		t.Fatal("Attempt counter should be closed") | ||||
| @ -44,18 +45,19 @@ func TestRetryTimerSimple(t *testing.T) { | ||||
| 
 | ||||
| // Test retry time with no jitter. | ||||
| func TestRetryTimerWithNoJitter(t *testing.T) { | ||||
| 	doneCh := make(chan struct{}) | ||||
| 	rctx, cancel := context.WithCancel(context.Background()) | ||||
| 
 | ||||
| 	// No jitter | ||||
| 	attemptCh := newRetryTimerWithJitter(time.Millisecond, 5*time.Millisecond, NoJitter, doneCh) | ||||
| 	attemptCh := newRetryTimerWithJitter(rctx, time.Millisecond, 5*time.Millisecond, NoJitter) | ||||
| 	i := <-attemptCh | ||||
| 	if i != 0 { | ||||
| 		close(doneCh) | ||||
| 		cancel() | ||||
| 		t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i) | ||||
| 	} | ||||
| 	// Loop through the maximum possible attempt. | ||||
| 	for i = range attemptCh { | ||||
| 		if i == 30 { | ||||
| 			close(doneCh) | ||||
| 			cancel() | ||||
| 		} | ||||
| 	} | ||||
| 	_, ok := <-attemptCh | ||||
| @ -66,15 +68,16 @@ func TestRetryTimerWithNoJitter(t *testing.T) { | ||||
| 
 | ||||
| // Test retry time with Jitter greater than MaxJitter. | ||||
| func TestRetryTimerWithJitter(t *testing.T) { | ||||
| 	doneCh := make(chan struct{}) | ||||
| 	rctx, cancel := context.WithCancel(context.Background()) | ||||
| 
 | ||||
| 	// Jitter will be set back to 1.0 | ||||
| 	attemptCh := newRetryTimerWithJitter(time.Second, 30*time.Second, 2.0, doneCh) | ||||
| 	attemptCh := newRetryTimerWithJitter(rctx, time.Second, 30*time.Second, 2.0) | ||||
| 	i := <-attemptCh | ||||
| 	if i != 0 { | ||||
| 		close(doneCh) | ||||
| 		cancel() | ||||
| 		t.Fatalf("Invalid attempt counter returned should be 0, found %d instead", i) | ||||
| 	} | ||||
| 	close(doneCh) | ||||
| 	cancel() | ||||
| 	_, ok := <-attemptCh | ||||
| 	if ok { | ||||
| 		t.Fatal("Attempt counter should be closed") | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user