From 4523da65437819ab7698a954a05fb51726061019 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Tue, 25 Oct 2022 12:36:57 -0700 Subject: [PATCH] feat: introduce pool-level rebalance (#15483) --- cmd/admin-handler-utils.go | 12 + cmd/admin-handlers-pools.go | 145 ++ cmd/admin-router.go | 5 + cmd/api-errors.go | 14 + cmd/apierrorcode_string.go | 212 +-- cmd/config-common.go | 20 +- cmd/erasure-server-pool-decom.go | 22 + cmd/erasure-server-pool-rebalance.go | 876 ++++++++++++ cmd/erasure-server-pool-rebalance_gen.go | 1228 +++++++++++++++++ cmd/erasure-server-pool-rebalance_gen_test.go | 575 ++++++++ cmd/erasure-server-pool.go | 24 +- cmd/iam-object-store.go | 2 +- cmd/notification.go | 43 + cmd/object-api-interface.go | 3 + cmd/peer-rest-client.go | 22 + cmd/peer-rest-common.go | 44 +- cmd/peer-rest-server.go | 56 + cmd/rebalance-admin.go | 108 ++ cmd/rebalancemetric_string.go | 27 + cmd/rebalstatus_string.go | 27 + 20 files changed, 3328 insertions(+), 137 deletions(-) create mode 100644 cmd/erasure-server-pool-rebalance.go create mode 100644 cmd/erasure-server-pool-rebalance_gen.go create mode 100644 cmd/erasure-server-pool-rebalance_gen_test.go create mode 100644 cmd/rebalance-admin.go create mode 100644 cmd/rebalancemetric_string.go create mode 100644 cmd/rebalstatus_string.go diff --git a/cmd/admin-handler-utils.go b/cmd/admin-handler-utils.go index 3a918a5a7..b0f0d8aa3 100644 --- a/cmd/admin-handler-utils.go +++ b/cmd/admin-handler-utils.go @@ -124,6 +124,18 @@ func toAdminAPIErr(ctx context.Context, err error) APIError { Description: err.Error(), HTTPStatusCode: http.StatusBadRequest, } + case errors.Is(err, errDecommissionRebalanceAlreadyRunning): + apiErr = APIError{ + Code: "XMinioDecommissionNotAllowed", + Description: err.Error(), + HTTPStatusCode: http.StatusBadRequest, + } + case errors.Is(err, errRebalanceDecommissionAlreadyRunning): + apiErr = APIError{ + Code: "XMinioRebalanceNotAllowed", + Description: err.Error(), + HTTPStatusCode: http.StatusBadRequest, + } case errors.Is(err, errConfigNotFound): apiErr = APIError{ Code: "XMinioConfigError", diff --git a/cmd/admin-handlers-pools.go b/cmd/admin-handlers-pools.go index 50e6cdec4..29b21a50d 100644 --- a/cmd/admin-handlers-pools.go +++ b/cmd/admin-handlers-pools.go @@ -19,6 +19,7 @@ package cmd import ( "encoding/json" + "errors" "fmt" "net/http" @@ -27,6 +28,11 @@ import ( iampolicy "github.com/minio/pkg/iam/policy" ) +var ( + errRebalanceDecommissionAlreadyRunning = errors.New("Rebalance cannot be started, decommission is aleady in progress") + errDecommissionRebalanceAlreadyRunning = errors.New("Decommission cannot be started, rebalance is already in progress") +) + func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "StartDecommission") @@ -49,6 +55,11 @@ func (a adminAPIHandlers) StartDecommission(w http.ResponseWriter, r *http.Reque return } + if pools.IsRebalanceStarted() { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errDecommissionRebalanceAlreadyRunning), r.URL) + return + } + vars := mux.Vars(r) v := vars["pool"] @@ -200,3 +211,137 @@ func (a adminAPIHandlers) ListPools(w http.ResponseWriter, r *http.Request) { logger.LogIf(r.Context(), json.NewEncoder(w).Encode(poolsStatus)) } + +func (a adminAPIHandlers) RebalanceStart(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "RebalanceStart") + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.RebalanceAdminAction) + if objectAPI == nil { + return + } + + // NB rebalance-start admin API is always coordinated from first pool's + // first node. The following is required to serialize (the effects of) + // concurrent rebalance-start commands. + if ep := globalEndpoints[0].Endpoints[0]; !ep.IsLocal { + for nodeIdx, proxyEp := range globalProxyEndpoints { + if proxyEp.Endpoint.Host == ep.Host { + if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) { + return + } + } + } + } + + pools, ok := objectAPI.(*erasureServerPools) + if !ok || len(pools.serverPools) == 1 { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + if pools.IsDecommissionRunning() { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, errRebalanceDecommissionAlreadyRunning), r.URL) + return + } + + if pools.IsRebalanceStarted() { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminRebalanceAlreadyStarted), r.URL) + return + } + + bucketInfos, err := objectAPI.ListBuckets(ctx, BucketOptions{}) + if err != nil { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + buckets := make([]string, 0, len(bucketInfos)) + for _, bInfo := range bucketInfos { + buckets = append(buckets, bInfo.Name) + } + + var id string + if id, err = pools.initRebalanceMeta(ctx, buckets); err != nil { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + // Rebalance routine is run on the first node of any pool participating in rebalance. + pools.StartRebalance() + + b, err := json.Marshal(struct { + ID string `json:"id"` + }{ID: id}) + if err != nil { + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + writeSuccessResponseJSON(w, b) + // Notify peers to load rebalance.bin and start rebalance routine if they happen to be + // participating pool's leader node + globalNotificationSys.LoadRebalanceMeta(ctx, true) +} + +func (a adminAPIHandlers) RebalanceStatus(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "RebalanceStatus") + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.RebalanceAdminAction) + if objectAPI == nil { + return + } + + // Proxy rebalance-status to first pool first node, so that users see a + // consistent view of rebalance progress even though different rebalancing + // pools may temporarily have out of date info on the others. + if ep := globalEndpoints[0].Endpoints[0]; !ep.IsLocal { + for nodeIdx, proxyEp := range globalProxyEndpoints { + if proxyEp.Endpoint.Host == ep.Host { + if proxyRequestByNodeIndex(ctx, w, r, nodeIdx) { + return + } + } + } + } + + pools, ok := objectAPI.(*erasureServerPools) + if !ok { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + rs, err := rebalanceStatus(ctx, pools) + if err != nil { + if errors.Is(err, errRebalanceNotStarted) { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrAdminRebalanceNotStarted), r.URL) + return + } + logger.LogIf(ctx, fmt.Errorf("failed to fetch rebalance status: %w", err)) + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + logger.LogIf(r.Context(), json.NewEncoder(w).Encode(rs)) +} + +func (a adminAPIHandlers) RebalanceStop(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "RebalanceStop") + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.RebalanceAdminAction) + if objectAPI == nil { + return + } + + pools, ok := objectAPI.(*erasureServerPools) + if !ok { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL) + return + } + + // Cancel any ongoing rebalance operation + globalNotificationSys.StopRebalance(r.Context()) + writeSuccessResponseHeadersOnly(w) + logger.LogIf(ctx, pools.saveRebalanceStats(GlobalContext, 0, rebalSaveStoppedAt)) +} diff --git a/cmd/admin-router.go b/cmd/admin-router.go index ad28454cc..bd669cede 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -84,6 +84,11 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/decommission").HandlerFunc(gz(httpTraceAll(adminAPI.StartDecommission))).Queries("pool", "{pool:.*}") adminRouter.Methods(http.MethodPost).Path(adminVersion+"/pools/cancel").HandlerFunc(gz(httpTraceAll(adminAPI.CancelDecommission))).Queries("pool", "{pool:.*}") + + // Rebalance operations + adminRouter.Methods(http.MethodPost).Path(adminVersion + "/rebalance/start").HandlerFunc(gz(httpTraceAll(adminAPI.RebalanceStart))) + adminRouter.Methods(http.MethodGet).Path(adminVersion + "/rebalance/status").HandlerFunc(gz(httpTraceAll(adminAPI.RebalanceStatus))) + adminRouter.Methods(http.MethodPost).Path(adminVersion + "/rebalance/stop").HandlerFunc(gz(httpTraceAll(adminAPI.RebalanceStop))) } // Profiling operations - deprecated API diff --git a/cmd/api-errors.go b/cmd/api-errors.go index f7557e981..c24e72d6d 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -291,6 +291,10 @@ const ( ErrSiteReplicationIAMError ErrSiteReplicationConfigMissing + // Pool rebalance errors + ErrAdminRebalanceAlreadyStarted + ErrAdminRebalanceNotStarted + // Bucket Quota error codes ErrAdminBucketQuotaExceeded ErrAdminNoSuchQuotaConfiguration @@ -1397,6 +1401,16 @@ var errorCodes = errorCodeMap{ Description: "Site not found in site replication configuration", HTTPStatusCode: http.StatusBadRequest, }, + ErrAdminRebalanceAlreadyStarted: { + Code: "XMinioAdminRebalanceAlreadyStarted", + Description: "Pool rebalance is already started", + HTTPStatusCode: http.StatusConflict, + }, + ErrAdminRebalanceNotStarted: { + Code: "XMinioAdminRebalanceNotStarted", + Description: "Pool rebalance is not started", + HTTPStatusCode: http.StatusNotFound, + }, ErrMaximumExpires: { Code: "AuthorizationQueryParametersError", Description: "X-Amz-Expires must be less than a week (in seconds); that is, the given X-Amz-Expires must be less than 604800 seconds", diff --git a/cmd/apierrorcode_string.go b/cmd/apierrorcode_string.go index 1080f5c6a..6dff7fa8f 100644 --- a/cmd/apierrorcode_string.go +++ b/cmd/apierrorcode_string.go @@ -203,114 +203,116 @@ func _() { _ = x[ErrSiteReplicationBucketMetaError-192] _ = x[ErrSiteReplicationIAMError-193] _ = x[ErrSiteReplicationConfigMissing-194] - _ = x[ErrAdminBucketQuotaExceeded-195] - _ = x[ErrAdminNoSuchQuotaConfiguration-196] - _ = x[ErrHealNotImplemented-197] - _ = x[ErrHealNoSuchProcess-198] - _ = x[ErrHealInvalidClientToken-199] - _ = x[ErrHealMissingBucket-200] - _ = x[ErrHealAlreadyRunning-201] - _ = x[ErrHealOverlappingPaths-202] - _ = x[ErrIncorrectContinuationToken-203] - _ = x[ErrEmptyRequestBody-204] - _ = x[ErrUnsupportedFunction-205] - _ = x[ErrInvalidExpressionType-206] - _ = x[ErrBusy-207] - _ = x[ErrUnauthorizedAccess-208] - _ = x[ErrExpressionTooLong-209] - _ = x[ErrIllegalSQLFunctionArgument-210] - _ = x[ErrInvalidKeyPath-211] - _ = x[ErrInvalidCompressionFormat-212] - _ = x[ErrInvalidFileHeaderInfo-213] - _ = x[ErrInvalidJSONType-214] - _ = x[ErrInvalidQuoteFields-215] - _ = x[ErrInvalidRequestParameter-216] - _ = x[ErrInvalidDataType-217] - _ = x[ErrInvalidTextEncoding-218] - _ = x[ErrInvalidDataSource-219] - _ = x[ErrInvalidTableAlias-220] - _ = x[ErrMissingRequiredParameter-221] - _ = x[ErrObjectSerializationConflict-222] - _ = x[ErrUnsupportedSQLOperation-223] - _ = x[ErrUnsupportedSQLStructure-224] - _ = x[ErrUnsupportedSyntax-225] - _ = x[ErrUnsupportedRangeHeader-226] - _ = x[ErrLexerInvalidChar-227] - _ = x[ErrLexerInvalidOperator-228] - _ = x[ErrLexerInvalidLiteral-229] - _ = x[ErrLexerInvalidIONLiteral-230] - _ = x[ErrParseExpectedDatePart-231] - _ = x[ErrParseExpectedKeyword-232] - _ = x[ErrParseExpectedTokenType-233] - _ = x[ErrParseExpected2TokenTypes-234] - _ = x[ErrParseExpectedNumber-235] - _ = x[ErrParseExpectedRightParenBuiltinFunctionCall-236] - _ = x[ErrParseExpectedTypeName-237] - _ = x[ErrParseExpectedWhenClause-238] - _ = x[ErrParseUnsupportedToken-239] - _ = x[ErrParseUnsupportedLiteralsGroupBy-240] - _ = x[ErrParseExpectedMember-241] - _ = x[ErrParseUnsupportedSelect-242] - _ = x[ErrParseUnsupportedCase-243] - _ = x[ErrParseUnsupportedCaseClause-244] - _ = x[ErrParseUnsupportedAlias-245] - _ = x[ErrParseUnsupportedSyntax-246] - _ = x[ErrParseUnknownOperator-247] - _ = x[ErrParseMissingIdentAfterAt-248] - _ = x[ErrParseUnexpectedOperator-249] - _ = x[ErrParseUnexpectedTerm-250] - _ = x[ErrParseUnexpectedToken-251] - _ = x[ErrParseUnexpectedKeyword-252] - _ = x[ErrParseExpectedExpression-253] - _ = x[ErrParseExpectedLeftParenAfterCast-254] - _ = x[ErrParseExpectedLeftParenValueConstructor-255] - _ = x[ErrParseExpectedLeftParenBuiltinFunctionCall-256] - _ = x[ErrParseExpectedArgumentDelimiter-257] - _ = x[ErrParseCastArity-258] - _ = x[ErrParseInvalidTypeParam-259] - _ = x[ErrParseEmptySelect-260] - _ = x[ErrParseSelectMissingFrom-261] - _ = x[ErrParseExpectedIdentForGroupName-262] - _ = x[ErrParseExpectedIdentForAlias-263] - _ = x[ErrParseUnsupportedCallWithStar-264] - _ = x[ErrParseNonUnaryAgregateFunctionCall-265] - _ = x[ErrParseMalformedJoin-266] - _ = x[ErrParseExpectedIdentForAt-267] - _ = x[ErrParseAsteriskIsNotAloneInSelectList-268] - _ = x[ErrParseCannotMixSqbAndWildcardInSelectList-269] - _ = x[ErrParseInvalidContextForWildcardInSelectList-270] - _ = x[ErrIncorrectSQLFunctionArgumentType-271] - _ = x[ErrValueParseFailure-272] - _ = x[ErrEvaluatorInvalidArguments-273] - _ = x[ErrIntegerOverflow-274] - _ = x[ErrLikeInvalidInputs-275] - _ = x[ErrCastFailed-276] - _ = x[ErrInvalidCast-277] - _ = x[ErrEvaluatorInvalidTimestampFormatPattern-278] - _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbolForParsing-279] - _ = x[ErrEvaluatorTimestampFormatPatternDuplicateFields-280] - _ = x[ErrEvaluatorTimestampFormatPatternHourClockAmPmMismatch-281] - _ = x[ErrEvaluatorUnterminatedTimestampFormatPatternToken-282] - _ = x[ErrEvaluatorInvalidTimestampFormatPatternToken-283] - _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbol-284] - _ = x[ErrEvaluatorBindingDoesNotExist-285] - _ = x[ErrMissingHeaders-286] - _ = x[ErrInvalidColumnIndex-287] - _ = x[ErrAdminConfigNotificationTargetsFailed-288] - _ = x[ErrAdminProfilerNotEnabled-289] - _ = x[ErrInvalidDecompressedSize-290] - _ = x[ErrAddUserInvalidArgument-291] - _ = x[ErrAdminResourceInvalidArgument-292] - _ = x[ErrAdminAccountNotEligible-293] - _ = x[ErrAccountNotEligible-294] - _ = x[ErrAdminServiceAccountNotFound-295] - _ = x[ErrPostPolicyConditionInvalidFormat-296] - _ = x[ErrInvalidChecksum-297] + _ = x[ErrAdminRebalanceAlreadyStarted-195] + _ = x[ErrAdminRebalanceNotStarted-196] + _ = x[ErrAdminBucketQuotaExceeded-197] + _ = x[ErrAdminNoSuchQuotaConfiguration-198] + _ = x[ErrHealNotImplemented-199] + _ = x[ErrHealNoSuchProcess-200] + _ = x[ErrHealInvalidClientToken-201] + _ = x[ErrHealMissingBucket-202] + _ = x[ErrHealAlreadyRunning-203] + _ = x[ErrHealOverlappingPaths-204] + _ = x[ErrIncorrectContinuationToken-205] + _ = x[ErrEmptyRequestBody-206] + _ = x[ErrUnsupportedFunction-207] + _ = x[ErrInvalidExpressionType-208] + _ = x[ErrBusy-209] + _ = x[ErrUnauthorizedAccess-210] + _ = x[ErrExpressionTooLong-211] + _ = x[ErrIllegalSQLFunctionArgument-212] + _ = x[ErrInvalidKeyPath-213] + _ = x[ErrInvalidCompressionFormat-214] + _ = x[ErrInvalidFileHeaderInfo-215] + _ = x[ErrInvalidJSONType-216] + _ = x[ErrInvalidQuoteFields-217] + _ = x[ErrInvalidRequestParameter-218] + _ = x[ErrInvalidDataType-219] + _ = x[ErrInvalidTextEncoding-220] + _ = x[ErrInvalidDataSource-221] + _ = x[ErrInvalidTableAlias-222] + _ = x[ErrMissingRequiredParameter-223] + _ = x[ErrObjectSerializationConflict-224] + _ = x[ErrUnsupportedSQLOperation-225] + _ = x[ErrUnsupportedSQLStructure-226] + _ = x[ErrUnsupportedSyntax-227] + _ = x[ErrUnsupportedRangeHeader-228] + _ = x[ErrLexerInvalidChar-229] + _ = x[ErrLexerInvalidOperator-230] + _ = x[ErrLexerInvalidLiteral-231] + _ = x[ErrLexerInvalidIONLiteral-232] + _ = x[ErrParseExpectedDatePart-233] + _ = x[ErrParseExpectedKeyword-234] + _ = x[ErrParseExpectedTokenType-235] + _ = x[ErrParseExpected2TokenTypes-236] + _ = x[ErrParseExpectedNumber-237] + _ = x[ErrParseExpectedRightParenBuiltinFunctionCall-238] + _ = x[ErrParseExpectedTypeName-239] + _ = x[ErrParseExpectedWhenClause-240] + _ = x[ErrParseUnsupportedToken-241] + _ = x[ErrParseUnsupportedLiteralsGroupBy-242] + _ = x[ErrParseExpectedMember-243] + _ = x[ErrParseUnsupportedSelect-244] + _ = x[ErrParseUnsupportedCase-245] + _ = x[ErrParseUnsupportedCaseClause-246] + _ = x[ErrParseUnsupportedAlias-247] + _ = x[ErrParseUnsupportedSyntax-248] + _ = x[ErrParseUnknownOperator-249] + _ = x[ErrParseMissingIdentAfterAt-250] + _ = x[ErrParseUnexpectedOperator-251] + _ = x[ErrParseUnexpectedTerm-252] + _ = x[ErrParseUnexpectedToken-253] + _ = x[ErrParseUnexpectedKeyword-254] + _ = x[ErrParseExpectedExpression-255] + _ = x[ErrParseExpectedLeftParenAfterCast-256] + _ = x[ErrParseExpectedLeftParenValueConstructor-257] + _ = x[ErrParseExpectedLeftParenBuiltinFunctionCall-258] + _ = x[ErrParseExpectedArgumentDelimiter-259] + _ = x[ErrParseCastArity-260] + _ = x[ErrParseInvalidTypeParam-261] + _ = x[ErrParseEmptySelect-262] + _ = x[ErrParseSelectMissingFrom-263] + _ = x[ErrParseExpectedIdentForGroupName-264] + _ = x[ErrParseExpectedIdentForAlias-265] + _ = x[ErrParseUnsupportedCallWithStar-266] + _ = x[ErrParseNonUnaryAgregateFunctionCall-267] + _ = x[ErrParseMalformedJoin-268] + _ = x[ErrParseExpectedIdentForAt-269] + _ = x[ErrParseAsteriskIsNotAloneInSelectList-270] + _ = x[ErrParseCannotMixSqbAndWildcardInSelectList-271] + _ = x[ErrParseInvalidContextForWildcardInSelectList-272] + _ = x[ErrIncorrectSQLFunctionArgumentType-273] + _ = x[ErrValueParseFailure-274] + _ = x[ErrEvaluatorInvalidArguments-275] + _ = x[ErrIntegerOverflow-276] + _ = x[ErrLikeInvalidInputs-277] + _ = x[ErrCastFailed-278] + _ = x[ErrInvalidCast-279] + _ = x[ErrEvaluatorInvalidTimestampFormatPattern-280] + _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbolForParsing-281] + _ = x[ErrEvaluatorTimestampFormatPatternDuplicateFields-282] + _ = x[ErrEvaluatorTimestampFormatPatternHourClockAmPmMismatch-283] + _ = x[ErrEvaluatorUnterminatedTimestampFormatPatternToken-284] + _ = x[ErrEvaluatorInvalidTimestampFormatPatternToken-285] + _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbol-286] + _ = x[ErrEvaluatorBindingDoesNotExist-287] + _ = x[ErrMissingHeaders-288] + _ = x[ErrInvalidColumnIndex-289] + _ = x[ErrAdminConfigNotificationTargetsFailed-290] + _ = x[ErrAdminProfilerNotEnabled-291] + _ = x[ErrInvalidDecompressedSize-292] + _ = x[ErrAddUserInvalidArgument-293] + _ = x[ErrAdminResourceInvalidArgument-294] + _ = x[ErrAdminAccountNotEligible-295] + _ = x[ErrAccountNotEligible-296] + _ = x[ErrAdminServiceAccountNotFound-297] + _ = x[ErrPostPolicyConditionInvalidFormat-298] + _ = x[ErrInvalidChecksum-299] } -const _APIErrorCode_name = "NoneAccessDeniedBadDigestEntityTooSmallEntityTooLargePolicyTooLargeIncompleteBodyInternalErrorInvalidAccessKeyIDAccessKeyDisabledInvalidBucketNameInvalidDigestInvalidRangeInvalidRangePartNumberInvalidCopyPartRangeInvalidCopyPartRangeSourceInvalidMaxKeysInvalidEncodingMethodInvalidMaxUploadsInvalidMaxPartsInvalidPartNumberMarkerInvalidPartNumberInvalidRequestBodyInvalidCopySourceInvalidMetadataDirectiveInvalidCopyDestInvalidPolicyDocumentInvalidObjectStateMalformedXMLMissingContentLengthMissingContentMD5MissingRequestBodyErrorMissingSecurityHeaderNoSuchBucketNoSuchBucketPolicyNoSuchBucketLifecycleNoSuchLifecycleConfigurationInvalidLifecycleWithObjectLockNoSuchBucketSSEConfigNoSuchCORSConfigurationNoSuchWebsiteConfigurationReplicationConfigurationNotFoundErrorRemoteDestinationNotFoundErrorReplicationDestinationMissingLockRemoteTargetNotFoundErrorReplicationRemoteConnectionErrorReplicationBandwidthLimitErrorBucketRemoteIdenticalToSourceBucketRemoteAlreadyExistsBucketRemoteLabelInUseBucketRemoteArnTypeInvalidBucketRemoteArnInvalidBucketRemoteRemoveDisallowedRemoteTargetNotVersionedErrorReplicationSourceNotVersionedErrorReplicationNeedsVersioningErrorReplicationBucketNeedsVersioningErrorReplicationDenyEditErrorReplicationNoExistingObjectsObjectRestoreAlreadyInProgressNoSuchKeyNoSuchUploadInvalidVersionIDNoSuchVersionNotImplementedPreconditionFailedRequestTimeTooSkewedSignatureDoesNotMatchMethodNotAllowedInvalidPartInvalidPartOrderAuthorizationHeaderMalformedMalformedPOSTRequestPOSTFileRequiredSignatureVersionNotSupportedBucketNotEmptyAllAccessDisabledMalformedPolicyMissingFieldsMissingCredTagCredMalformedInvalidRegionInvalidServiceS3InvalidServiceSTSInvalidRequestVersionMissingSignTagMissingSignHeadersTagMalformedDateMalformedPresignedDateMalformedCredentialDateMalformedCredentialRegionMalformedExpiresNegativeExpiresAuthHeaderEmptyExpiredPresignRequestRequestNotReadyYetUnsignedHeadersMissingDateHeaderInvalidQuerySignatureAlgoInvalidQueryParamsBucketAlreadyOwnedByYouInvalidDurationBucketAlreadyExistsTooManyBucketsMetadataTooLargeUnsupportedMetadataMaximumExpiresSlowDownInvalidPrefixMarkerBadRequestKeyTooLongErrorInvalidBucketObjectLockConfigurationObjectLockConfigurationNotFoundObjectLockConfigurationNotAllowedNoSuchObjectLockConfigurationObjectLockedInvalidRetentionDatePastObjectLockRetainDateUnknownWORMModeDirectiveBucketTaggingNotFoundObjectLockInvalidHeadersInvalidTagDirectiveInvalidEncryptionMethodInvalidEncryptionKeyIDInsecureSSECustomerRequestSSEMultipartEncryptedSSEEncryptedObjectInvalidEncryptionParametersInvalidSSECustomerAlgorithmInvalidSSECustomerKeyMissingSSECustomerKeyMissingSSECustomerKeyMD5SSECustomerKeyMD5MismatchInvalidSSECustomerParametersIncompatibleEncryptionMethodKMSNotConfiguredKMSKeyNotFoundExceptionNoAccessKeyInvalidTokenEventNotificationARNNotificationRegionNotificationOverlappingFilterNotificationFilterNameInvalidFilterNamePrefixFilterNameSuffixFilterValueInvalidOverlappingConfigsUnsupportedNotificationContentSHA256MismatchContentChecksumMismatchReadQuorumWriteQuorumStorageFullRequestBodyParseObjectExistsAsDirectoryInvalidObjectNameInvalidObjectNamePrefixSlashInvalidResourceNameServerNotInitializedOperationTimedOutClientDisconnectedOperationMaxedOutInvalidRequestTransitionStorageClassNotFoundErrorInvalidStorageClassBackendDownMalformedJSONAdminNoSuchUserAdminNoSuchGroupAdminGroupNotEmptyAdminNoSuchJobAdminNoSuchPolicyAdminInvalidArgumentAdminInvalidAccessKeyAdminInvalidSecretKeyAdminConfigNoQuorumAdminConfigTooLargeAdminConfigBadJSONAdminNoSuchConfigTargetAdminConfigEnvOverriddenAdminConfigDuplicateKeysAdminConfigInvalidIDPTypeAdminConfigLDAPValidationAdminCredentialsMismatchInsecureClientRequestObjectTamperedSiteReplicationInvalidRequestSiteReplicationPeerRespSiteReplicationBackendIssueSiteReplicationServiceAccountErrorSiteReplicationBucketConfigErrorSiteReplicationBucketMetaErrorSiteReplicationIAMErrorSiteReplicationConfigMissingAdminBucketQuotaExceededAdminNoSuchQuotaConfigurationHealNotImplementedHealNoSuchProcessHealInvalidClientTokenHealMissingBucketHealAlreadyRunningHealOverlappingPathsIncorrectContinuationTokenEmptyRequestBodyUnsupportedFunctionInvalidExpressionTypeBusyUnauthorizedAccessExpressionTooLongIllegalSQLFunctionArgumentInvalidKeyPathInvalidCompressionFormatInvalidFileHeaderInfoInvalidJSONTypeInvalidQuoteFieldsInvalidRequestParameterInvalidDataTypeInvalidTextEncodingInvalidDataSourceInvalidTableAliasMissingRequiredParameterObjectSerializationConflictUnsupportedSQLOperationUnsupportedSQLStructureUnsupportedSyntaxUnsupportedRangeHeaderLexerInvalidCharLexerInvalidOperatorLexerInvalidLiteralLexerInvalidIONLiteralParseExpectedDatePartParseExpectedKeywordParseExpectedTokenTypeParseExpected2TokenTypesParseExpectedNumberParseExpectedRightParenBuiltinFunctionCallParseExpectedTypeNameParseExpectedWhenClauseParseUnsupportedTokenParseUnsupportedLiteralsGroupByParseExpectedMemberParseUnsupportedSelectParseUnsupportedCaseParseUnsupportedCaseClauseParseUnsupportedAliasParseUnsupportedSyntaxParseUnknownOperatorParseMissingIdentAfterAtParseUnexpectedOperatorParseUnexpectedTermParseUnexpectedTokenParseUnexpectedKeywordParseExpectedExpressionParseExpectedLeftParenAfterCastParseExpectedLeftParenValueConstructorParseExpectedLeftParenBuiltinFunctionCallParseExpectedArgumentDelimiterParseCastArityParseInvalidTypeParamParseEmptySelectParseSelectMissingFromParseExpectedIdentForGroupNameParseExpectedIdentForAliasParseUnsupportedCallWithStarParseNonUnaryAgregateFunctionCallParseMalformedJoinParseExpectedIdentForAtParseAsteriskIsNotAloneInSelectListParseCannotMixSqbAndWildcardInSelectListParseInvalidContextForWildcardInSelectListIncorrectSQLFunctionArgumentTypeValueParseFailureEvaluatorInvalidArgumentsIntegerOverflowLikeInvalidInputsCastFailedInvalidCastEvaluatorInvalidTimestampFormatPatternEvaluatorInvalidTimestampFormatPatternSymbolForParsingEvaluatorTimestampFormatPatternDuplicateFieldsEvaluatorTimestampFormatPatternHourClockAmPmMismatchEvaluatorUnterminatedTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternSymbolEvaluatorBindingDoesNotExistMissingHeadersInvalidColumnIndexAdminConfigNotificationTargetsFailedAdminProfilerNotEnabledInvalidDecompressedSizeAddUserInvalidArgumentAdminResourceInvalidArgumentAdminAccountNotEligibleAccountNotEligibleAdminServiceAccountNotFoundPostPolicyConditionInvalidFormatInvalidChecksum" +const _APIErrorCode_name = "NoneAccessDeniedBadDigestEntityTooSmallEntityTooLargePolicyTooLargeIncompleteBodyInternalErrorInvalidAccessKeyIDAccessKeyDisabledInvalidBucketNameInvalidDigestInvalidRangeInvalidRangePartNumberInvalidCopyPartRangeInvalidCopyPartRangeSourceInvalidMaxKeysInvalidEncodingMethodInvalidMaxUploadsInvalidMaxPartsInvalidPartNumberMarkerInvalidPartNumberInvalidRequestBodyInvalidCopySourceInvalidMetadataDirectiveInvalidCopyDestInvalidPolicyDocumentInvalidObjectStateMalformedXMLMissingContentLengthMissingContentMD5MissingRequestBodyErrorMissingSecurityHeaderNoSuchBucketNoSuchBucketPolicyNoSuchBucketLifecycleNoSuchLifecycleConfigurationInvalidLifecycleWithObjectLockNoSuchBucketSSEConfigNoSuchCORSConfigurationNoSuchWebsiteConfigurationReplicationConfigurationNotFoundErrorRemoteDestinationNotFoundErrorReplicationDestinationMissingLockRemoteTargetNotFoundErrorReplicationRemoteConnectionErrorReplicationBandwidthLimitErrorBucketRemoteIdenticalToSourceBucketRemoteAlreadyExistsBucketRemoteLabelInUseBucketRemoteArnTypeInvalidBucketRemoteArnInvalidBucketRemoteRemoveDisallowedRemoteTargetNotVersionedErrorReplicationSourceNotVersionedErrorReplicationNeedsVersioningErrorReplicationBucketNeedsVersioningErrorReplicationDenyEditErrorReplicationNoExistingObjectsObjectRestoreAlreadyInProgressNoSuchKeyNoSuchUploadInvalidVersionIDNoSuchVersionNotImplementedPreconditionFailedRequestTimeTooSkewedSignatureDoesNotMatchMethodNotAllowedInvalidPartInvalidPartOrderAuthorizationHeaderMalformedMalformedPOSTRequestPOSTFileRequiredSignatureVersionNotSupportedBucketNotEmptyAllAccessDisabledMalformedPolicyMissingFieldsMissingCredTagCredMalformedInvalidRegionInvalidServiceS3InvalidServiceSTSInvalidRequestVersionMissingSignTagMissingSignHeadersTagMalformedDateMalformedPresignedDateMalformedCredentialDateMalformedCredentialRegionMalformedExpiresNegativeExpiresAuthHeaderEmptyExpiredPresignRequestRequestNotReadyYetUnsignedHeadersMissingDateHeaderInvalidQuerySignatureAlgoInvalidQueryParamsBucketAlreadyOwnedByYouInvalidDurationBucketAlreadyExistsTooManyBucketsMetadataTooLargeUnsupportedMetadataMaximumExpiresSlowDownInvalidPrefixMarkerBadRequestKeyTooLongErrorInvalidBucketObjectLockConfigurationObjectLockConfigurationNotFoundObjectLockConfigurationNotAllowedNoSuchObjectLockConfigurationObjectLockedInvalidRetentionDatePastObjectLockRetainDateUnknownWORMModeDirectiveBucketTaggingNotFoundObjectLockInvalidHeadersInvalidTagDirectiveInvalidEncryptionMethodInvalidEncryptionKeyIDInsecureSSECustomerRequestSSEMultipartEncryptedSSEEncryptedObjectInvalidEncryptionParametersInvalidSSECustomerAlgorithmInvalidSSECustomerKeyMissingSSECustomerKeyMissingSSECustomerKeyMD5SSECustomerKeyMD5MismatchInvalidSSECustomerParametersIncompatibleEncryptionMethodKMSNotConfiguredKMSKeyNotFoundExceptionNoAccessKeyInvalidTokenEventNotificationARNNotificationRegionNotificationOverlappingFilterNotificationFilterNameInvalidFilterNamePrefixFilterNameSuffixFilterValueInvalidOverlappingConfigsUnsupportedNotificationContentSHA256MismatchContentChecksumMismatchReadQuorumWriteQuorumStorageFullRequestBodyParseObjectExistsAsDirectoryInvalidObjectNameInvalidObjectNamePrefixSlashInvalidResourceNameServerNotInitializedOperationTimedOutClientDisconnectedOperationMaxedOutInvalidRequestTransitionStorageClassNotFoundErrorInvalidStorageClassBackendDownMalformedJSONAdminNoSuchUserAdminNoSuchGroupAdminGroupNotEmptyAdminNoSuchJobAdminNoSuchPolicyAdminInvalidArgumentAdminInvalidAccessKeyAdminInvalidSecretKeyAdminConfigNoQuorumAdminConfigTooLargeAdminConfigBadJSONAdminNoSuchConfigTargetAdminConfigEnvOverriddenAdminConfigDuplicateKeysAdminConfigInvalidIDPTypeAdminConfigLDAPValidationAdminCredentialsMismatchInsecureClientRequestObjectTamperedSiteReplicationInvalidRequestSiteReplicationPeerRespSiteReplicationBackendIssueSiteReplicationServiceAccountErrorSiteReplicationBucketConfigErrorSiteReplicationBucketMetaErrorSiteReplicationIAMErrorSiteReplicationConfigMissingAdminRebalanceAlreadyStartedAdminRebalanceNotStartedAdminBucketQuotaExceededAdminNoSuchQuotaConfigurationHealNotImplementedHealNoSuchProcessHealInvalidClientTokenHealMissingBucketHealAlreadyRunningHealOverlappingPathsIncorrectContinuationTokenEmptyRequestBodyUnsupportedFunctionInvalidExpressionTypeBusyUnauthorizedAccessExpressionTooLongIllegalSQLFunctionArgumentInvalidKeyPathInvalidCompressionFormatInvalidFileHeaderInfoInvalidJSONTypeInvalidQuoteFieldsInvalidRequestParameterInvalidDataTypeInvalidTextEncodingInvalidDataSourceInvalidTableAliasMissingRequiredParameterObjectSerializationConflictUnsupportedSQLOperationUnsupportedSQLStructureUnsupportedSyntaxUnsupportedRangeHeaderLexerInvalidCharLexerInvalidOperatorLexerInvalidLiteralLexerInvalidIONLiteralParseExpectedDatePartParseExpectedKeywordParseExpectedTokenTypeParseExpected2TokenTypesParseExpectedNumberParseExpectedRightParenBuiltinFunctionCallParseExpectedTypeNameParseExpectedWhenClauseParseUnsupportedTokenParseUnsupportedLiteralsGroupByParseExpectedMemberParseUnsupportedSelectParseUnsupportedCaseParseUnsupportedCaseClauseParseUnsupportedAliasParseUnsupportedSyntaxParseUnknownOperatorParseMissingIdentAfterAtParseUnexpectedOperatorParseUnexpectedTermParseUnexpectedTokenParseUnexpectedKeywordParseExpectedExpressionParseExpectedLeftParenAfterCastParseExpectedLeftParenValueConstructorParseExpectedLeftParenBuiltinFunctionCallParseExpectedArgumentDelimiterParseCastArityParseInvalidTypeParamParseEmptySelectParseSelectMissingFromParseExpectedIdentForGroupNameParseExpectedIdentForAliasParseUnsupportedCallWithStarParseNonUnaryAgregateFunctionCallParseMalformedJoinParseExpectedIdentForAtParseAsteriskIsNotAloneInSelectListParseCannotMixSqbAndWildcardInSelectListParseInvalidContextForWildcardInSelectListIncorrectSQLFunctionArgumentTypeValueParseFailureEvaluatorInvalidArgumentsIntegerOverflowLikeInvalidInputsCastFailedInvalidCastEvaluatorInvalidTimestampFormatPatternEvaluatorInvalidTimestampFormatPatternSymbolForParsingEvaluatorTimestampFormatPatternDuplicateFieldsEvaluatorTimestampFormatPatternHourClockAmPmMismatchEvaluatorUnterminatedTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternSymbolEvaluatorBindingDoesNotExistMissingHeadersInvalidColumnIndexAdminConfigNotificationTargetsFailedAdminProfilerNotEnabledInvalidDecompressedSizeAddUserInvalidArgumentAdminResourceInvalidArgumentAdminAccountNotEligibleAccountNotEligibleAdminServiceAccountNotFoundPostPolicyConditionInvalidFormatInvalidChecksum" -var _APIErrorCode_index = [...]uint16{0, 4, 16, 25, 39, 53, 67, 81, 94, 112, 129, 146, 159, 171, 193, 213, 239, 253, 274, 291, 306, 329, 346, 364, 381, 405, 420, 441, 459, 471, 491, 508, 531, 552, 564, 582, 603, 631, 661, 682, 705, 731, 768, 798, 831, 856, 888, 918, 947, 972, 994, 1020, 1042, 1070, 1099, 1133, 1164, 1201, 1225, 1253, 1283, 1292, 1304, 1320, 1333, 1347, 1365, 1385, 1406, 1422, 1433, 1449, 1477, 1497, 1513, 1541, 1555, 1572, 1587, 1600, 1614, 1627, 1640, 1656, 1673, 1694, 1708, 1729, 1742, 1764, 1787, 1812, 1828, 1843, 1858, 1879, 1897, 1912, 1929, 1954, 1972, 1995, 2010, 2029, 2043, 2059, 2078, 2092, 2100, 2119, 2129, 2144, 2180, 2211, 2244, 2273, 2285, 2305, 2329, 2353, 2374, 2398, 2417, 2440, 2462, 2488, 2509, 2527, 2554, 2581, 2602, 2623, 2647, 2672, 2700, 2728, 2744, 2767, 2778, 2790, 2807, 2822, 2840, 2869, 2886, 2902, 2918, 2936, 2954, 2977, 2998, 3021, 3031, 3042, 3053, 3069, 3092, 3109, 3137, 3156, 3176, 3193, 3211, 3228, 3242, 3277, 3296, 3307, 3320, 3335, 3351, 3369, 3383, 3400, 3420, 3441, 3462, 3481, 3500, 3518, 3541, 3565, 3589, 3614, 3639, 3663, 3684, 3698, 3727, 3750, 3777, 3811, 3843, 3873, 3896, 3924, 3948, 3977, 3995, 4012, 4034, 4051, 4069, 4089, 4115, 4131, 4150, 4171, 4175, 4193, 4210, 4236, 4250, 4274, 4295, 4310, 4328, 4351, 4366, 4385, 4402, 4419, 4443, 4470, 4493, 4516, 4533, 4555, 4571, 4591, 4610, 4632, 4653, 4673, 4695, 4719, 4738, 4780, 4801, 4824, 4845, 4876, 4895, 4917, 4937, 4963, 4984, 5006, 5026, 5050, 5073, 5092, 5112, 5134, 5157, 5188, 5226, 5267, 5297, 5311, 5332, 5348, 5370, 5400, 5426, 5454, 5487, 5505, 5528, 5563, 5603, 5645, 5677, 5694, 5719, 5734, 5751, 5761, 5772, 5810, 5864, 5910, 5962, 6010, 6053, 6097, 6125, 6139, 6157, 6193, 6216, 6239, 6261, 6289, 6312, 6330, 6357, 6389, 6404} +var _APIErrorCode_index = [...]uint16{0, 4, 16, 25, 39, 53, 67, 81, 94, 112, 129, 146, 159, 171, 193, 213, 239, 253, 274, 291, 306, 329, 346, 364, 381, 405, 420, 441, 459, 471, 491, 508, 531, 552, 564, 582, 603, 631, 661, 682, 705, 731, 768, 798, 831, 856, 888, 918, 947, 972, 994, 1020, 1042, 1070, 1099, 1133, 1164, 1201, 1225, 1253, 1283, 1292, 1304, 1320, 1333, 1347, 1365, 1385, 1406, 1422, 1433, 1449, 1477, 1497, 1513, 1541, 1555, 1572, 1587, 1600, 1614, 1627, 1640, 1656, 1673, 1694, 1708, 1729, 1742, 1764, 1787, 1812, 1828, 1843, 1858, 1879, 1897, 1912, 1929, 1954, 1972, 1995, 2010, 2029, 2043, 2059, 2078, 2092, 2100, 2119, 2129, 2144, 2180, 2211, 2244, 2273, 2285, 2305, 2329, 2353, 2374, 2398, 2417, 2440, 2462, 2488, 2509, 2527, 2554, 2581, 2602, 2623, 2647, 2672, 2700, 2728, 2744, 2767, 2778, 2790, 2807, 2822, 2840, 2869, 2886, 2902, 2918, 2936, 2954, 2977, 2998, 3021, 3031, 3042, 3053, 3069, 3092, 3109, 3137, 3156, 3176, 3193, 3211, 3228, 3242, 3277, 3296, 3307, 3320, 3335, 3351, 3369, 3383, 3400, 3420, 3441, 3462, 3481, 3500, 3518, 3541, 3565, 3589, 3614, 3639, 3663, 3684, 3698, 3727, 3750, 3777, 3811, 3843, 3873, 3896, 3924, 3952, 3976, 4000, 4029, 4047, 4064, 4086, 4103, 4121, 4141, 4167, 4183, 4202, 4223, 4227, 4245, 4262, 4288, 4302, 4326, 4347, 4362, 4380, 4403, 4418, 4437, 4454, 4471, 4495, 4522, 4545, 4568, 4585, 4607, 4623, 4643, 4662, 4684, 4705, 4725, 4747, 4771, 4790, 4832, 4853, 4876, 4897, 4928, 4947, 4969, 4989, 5015, 5036, 5058, 5078, 5102, 5125, 5144, 5164, 5186, 5209, 5240, 5278, 5319, 5349, 5363, 5384, 5400, 5422, 5452, 5478, 5506, 5539, 5557, 5580, 5615, 5655, 5697, 5729, 5746, 5771, 5786, 5803, 5813, 5824, 5862, 5916, 5962, 6014, 6062, 6105, 6149, 6177, 6191, 6209, 6245, 6268, 6291, 6313, 6341, 6364, 6382, 6409, 6441, 6456} func (i APIErrorCode) String() string { if i < 0 || i >= APIErrorCode(len(_APIErrorCode_index)-1) { diff --git a/cmd/config-common.go b/cmd/config-common.go index 8bc629c77..57078b7f4 100644 --- a/cmd/config-common.go +++ b/cmd/config-common.go @@ -29,10 +29,14 @@ import ( var errConfigNotFound = errors.New("config file not found") -func readConfigWithMetadata(ctx context.Context, store objectIO, configFile string) ([]byte, ObjectInfo, error) { - r, err := store.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, readLock, ObjectOptions{}) +func readConfigWithMetadata(ctx context.Context, store objectIO, configFile string, opts ObjectOptions) ([]byte, ObjectInfo, error) { + lockType := readLock + if opts.NoLock { + lockType = noLock // erasureObjects.GetObjectNInfo honors lockType argument but not opts.NoLock. + } + + r, err := store.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, lockType, opts) if err != nil { - // Treat object not found as config not found. if isErrObjectNotFound(err) { return nil, ObjectInfo{}, errConfigNotFound } @@ -52,7 +56,7 @@ func readConfigWithMetadata(ctx context.Context, store objectIO, configFile stri } func readConfig(ctx context.Context, store objectIO, configFile string) ([]byte, error) { - buf, _, err := readConfigWithMetadata(ctx, store, configFile) + buf, _, err := readConfigWithMetadata(ctx, store, configFile, ObjectOptions{}) return buf, err } @@ -70,16 +74,20 @@ func deleteConfig(ctx context.Context, objAPI objectDeleter, configFile string) return err } -func saveConfig(ctx context.Context, store objectIO, configFile string, data []byte) error { +func saveConfigWithOpts(ctx context.Context, store objectIO, configFile string, data []byte, opts ObjectOptions) error { hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data), int64(len(data))) if err != nil { return err } - _, err = store.PutObject(ctx, minioMetaBucket, configFile, NewPutObjReader(hashReader), ObjectOptions{MaxParity: true}) + _, err = store.PutObject(ctx, minioMetaBucket, configFile, NewPutObjReader(hashReader), opts) return err } +func saveConfig(ctx context.Context, store objectIO, configFile string, data []byte) error { + return saveConfigWithOpts(ctx, store, configFile, data, ObjectOptions{MaxParity: true}) +} + func checkConfig(ctx context.Context, objAPI ObjectLayer, configFile string) error { if _, err := objAPI.GetObjectInfo(ctx, minioMetaBucket, configFile, ObjectOptions{}); err != nil { // Treat object not found as config not found. diff --git a/cmd/erasure-server-pool-decom.go b/cmd/erasure-server-pool-decom.go index 31cbe9609..2877c34c9 100644 --- a/cmd/erasure-server-pool-decom.go +++ b/cmd/erasure-server-pool-decom.go @@ -499,6 +499,15 @@ const ( // Init() initializes pools and saves additional information about them // in 'pool.bin', this is eventually used for decommissioning the pool. func (z *erasureServerPools) Init(ctx context.Context) error { + // Load rebalance metadata if present + err := z.loadRebalanceMeta(ctx) + if err != nil { + return fmt.Errorf("failed to load rebalance data: %w", err) + } + + // Start rebalance routine + z.StartRebalance() + meta := poolMeta{} if err := meta.load(ctx, z.serverPools[0], z.serverPools); err != nil { @@ -573,6 +582,19 @@ func (z *erasureServerPools) Init(ctx context.Context) error { return nil } +func (z *erasureServerPools) IsDecommissionRunning() bool { + z.poolMetaMutex.RLock() + defer z.poolMetaMutex.RUnlock() + meta := z.poolMeta + for _, pool := range meta.Pools { + if pool.Decommission != nil { + return true + } + } + + return false +} + func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) { objInfo := gr.ObjInfo diff --git a/cmd/erasure-server-pool-rebalance.go b/cmd/erasure-server-pool-rebalance.go new file mode 100644 index 000000000..aad8fa2b0 --- /dev/null +++ b/cmd/erasure-server-pool-rebalance.go @@ -0,0 +1,876 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "math" + "math/rand" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/lithammer/shortuuid/v4" + "github.com/minio/madmin-go" + "github.com/minio/minio/internal/bucket/lifecycle" + "github.com/minio/minio/internal/hash" + "github.com/minio/minio/internal/logger" + "github.com/minio/pkg/env" +) + +//go:generate msgp -file $GOFILE -unexported + +// rebalanceStats contains per-pool rebalance statistics like number of objects, +// versions and bytes rebalanced out of a pool +type rebalanceStats struct { + InitFreeSpace uint64 `json:"initFreeSpace" msg:"ifs"` // Pool free space at the start of rebalance + InitCapacity uint64 `json:"initCapacity" msg:"ic"` // Pool capacity at the start of rebalance + + Buckets []string `json:"buckets" msg:"bus"` // buckets being rebalanced or to be rebalanced + RebalancedBuckets []string `json:"rebalancedBuckets" msg:"rbs"` // buckets rebalanced + Bucket string `json:"bucket" msg:"bu"` // Last rebalanced bucket + Object string `json:"object" msg:"ob"` // Last rebalanced object + NumObjects uint64 `json:"numObjects" msg:"no"` // Number of objects rebalanced + NumVersions uint64 `json:"numVersions" msg:"nv"` // Number of versions rebalanced + Bytes uint64 `json:"bytes" msg:"bs"` // Number of bytes rebalanced + Participating bool `json:"participating" msg:"par"` + Info rebalanceInfo `json:"info" msg:"inf"` +} + +func (rs *rebalanceStats) update(bucket string, oi ObjectInfo) { + if oi.IsLatest { + rs.NumObjects++ + } + + rs.NumVersions++ + rs.Bytes += uint64(oi.Size) + rs.Bucket = bucket + rs.Object = oi.Name +} + +type rstats []*rebalanceStats + +//go:generate stringer -type=rebalStatus -trimprefix=rebal $GOFILE +type rebalStatus uint8 + +const ( + rebalNone rebalStatus = iota + rebalStarted + rebalCompleted + rebalStopped + rebalFailed +) + +type rebalanceInfo struct { + StartTime time.Time `msg:"startTs"` // Time at which rebalance-start was issued + EndTime time.Time `msg:"stopTs"` // Time at which rebalance operation completed or rebalance-stop was called + Status rebalStatus `msg:"status"` // Current state of rebalance operation. One of Started|Stopped|Completed|Failed. +} + +// rebalanceMeta contains information pertaining to an ongoing rebalance operation. +type rebalanceMeta struct { + cancel context.CancelFunc `msg:"-"` // to be invoked on rebalance-stop + lastRefreshedAt time.Time `msg:"-"` + StoppedAt time.Time `msg:"stopTs"` // Time when rebalance-stop was issued. + ID string `msg:"id"` // ID of the ongoing rebalance operation + PercentFreeGoal float64 `msg:"pf"` // Computed from total free space and capacity at the start of rebalance + PoolStats []*rebalanceStats `msg:"rss"` // Per-pool rebalance stats keyed by pool index +} + +var errRebalanceNotStarted = errors.New("rebalance not started") + +func (z *erasureServerPools) loadRebalanceMeta(ctx context.Context) error { + r := &rebalanceMeta{} + err := r.load(ctx, z.serverPools[0]) + if err != nil { + if errors.Is(err, errConfigNotFound) { + return nil + } + return err + } + + z.rebalMu.Lock() + z.rebalMeta = r + z.rebalMu.Unlock() + + return nil +} + +// initRebalanceMeta initializes rebalance metadata for a new rebalance +// operation and saves it in the object store. +func (z *erasureServerPools) initRebalanceMeta(ctx context.Context, buckets []string) (arn string, err error) { + r := &rebalanceMeta{ + ID: shortuuid.New(), + PoolStats: make([]*rebalanceStats, len(z.serverPools)), + } + + // Fetch disk capacity and available space. + si, _ := z.StorageInfo(ctx) + diskStats := make([]struct { + AvailableSpace uint64 + TotalSpace uint64 + }, len(z.serverPools)) + var totalCap, totalFree uint64 + for _, disk := range si.Disks { + totalCap += disk.TotalSpace + totalFree += disk.AvailableSpace + + diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace + diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace + } + r.PercentFreeGoal = float64(totalFree) / float64(totalCap) + + now := time.Now() + for idx := range z.serverPools { + r.PoolStats[idx] = &rebalanceStats{ + Buckets: make([]string, len(buckets)), + RebalancedBuckets: make([]string, 0, len(buckets)), + InitFreeSpace: diskStats[idx].AvailableSpace, + InitCapacity: diskStats[idx].TotalSpace, + } + copy(r.PoolStats[idx].Buckets, buckets) + + if pfi := float64(diskStats[idx].AvailableSpace) / float64(diskStats[idx].TotalSpace); pfi < r.PercentFreeGoal { + r.PoolStats[idx].Participating = true + r.PoolStats[idx].Info = rebalanceInfo{ + StartTime: now, + Status: rebalStarted, + } + } + } + + err = r.save(ctx, z.serverPools[0]) + if err != nil { + return arn, err + } + + z.rebalMeta = r + return r.ID, nil +} + +func (z *erasureServerPools) updatePoolStats(poolIdx int, bucket string, oi ObjectInfo) { + z.rebalMu.Lock() + defer z.rebalMu.Unlock() + + r := z.rebalMeta + if r == nil { + return + } + + r.PoolStats[poolIdx].update(bucket, oi) +} + +const ( + rebalMetaName = "rebalance.bin" + rebalMetaFmt = 1 + rebalMetaVer = 1 +) + +func (z *erasureServerPools) nextRebalBucket(poolIdx int) (string, bool) { + z.rebalMu.RLock() + defer z.rebalMu.RUnlock() + + r := z.rebalMeta + if r == nil { + return "", false + } + + ps := r.PoolStats[poolIdx] + if ps == nil { + return "", false + } + + if ps.Info.Status == rebalCompleted || !ps.Participating { + return "", false + } + + if len(ps.Buckets) == 0 { + return "", false + } + + return ps.Buckets[0], true +} + +func (z *erasureServerPools) bucketRebalanceDone(bucket string, poolIdx int) { + z.rebalMu.Lock() + defer z.rebalMu.Unlock() + + ps := z.rebalMeta.PoolStats[poolIdx] + if ps == nil { + return + } + + for i, b := range ps.Buckets { + if b == bucket { + ps.Buckets = append(ps.Buckets[:i], ps.Buckets[i+1:]...) + ps.RebalancedBuckets = append(ps.RebalancedBuckets, bucket) + break + } + } +} + +func (r *rebalanceMeta) load(ctx context.Context, store objectIO) error { + return r.loadWithOpts(ctx, store, ObjectOptions{}) +} + +func (r *rebalanceMeta) loadWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error { + data, _, err := readConfigWithMetadata(ctx, store, rebalMetaName, opts) + if err != nil { + return err + } + + if len(data) == 0 { + return nil + } + if len(data) <= 4 { + return fmt.Errorf("rebalanceMeta: no data") + } + + // Read header + switch binary.LittleEndian.Uint16(data[0:2]) { + case rebalMetaFmt: + default: + return fmt.Errorf("rebalanceMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2])) + } + switch binary.LittleEndian.Uint16(data[2:4]) { + case rebalMetaVer: + default: + return fmt.Errorf("rebalanceMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4])) + } + + // OK, parse data. + if _, err = r.UnmarshalMsg(data[4:]); err != nil { + return err + } + + r.lastRefreshedAt = time.Now() + + return nil +} + +func (r *rebalanceMeta) saveWithOpts(ctx context.Context, store objectIO, opts ObjectOptions) error { + data := make([]byte, 4, r.Msgsize()+4) + + // Initialize the header. + binary.LittleEndian.PutUint16(data[0:2], rebalMetaFmt) + binary.LittleEndian.PutUint16(data[2:4], rebalMetaVer) + + buf, err := r.MarshalMsg(data) + if err != nil { + return err + } + + return saveConfigWithOpts(ctx, store, rebalMetaName, buf, opts) +} + +func (r *rebalanceMeta) save(ctx context.Context, store objectIO) error { + return r.saveWithOpts(ctx, store, ObjectOptions{}) +} + +func (z *erasureServerPools) IsRebalanceStarted() bool { + z.rebalMu.RLock() + defer z.rebalMu.RUnlock() + + if r := z.rebalMeta; r != nil { + if r.StoppedAt.IsZero() { + return true + } + } + return false +} + +func (z *erasureServerPools) IsPoolRebalancing(poolIndex int) bool { + z.rebalMu.RLock() + defer z.rebalMu.RUnlock() + + if r := z.rebalMeta; r != nil { + if !r.StoppedAt.IsZero() { + return false + } + ps := z.rebalMeta.PoolStats[poolIndex] + return ps.Participating && ps.Info.Status == rebalStarted + } + return false +} + +func (z *erasureServerPools) rebalanceBuckets(ctx context.Context, poolIdx int) (err error) { + doneCh := make(chan struct{}) + defer close(doneCh) + + // Save rebalance.bin periodically. + go func() { + // Update rebalance.bin periodically once every 5-10s, chosen randomly + // to avoid multiple pool leaders herding to update around the same + // time. + r := rand.New(rand.NewSource(time.Now().UnixNano())) + randSleepFor := func() time.Duration { + return 5*time.Second + time.Duration(float64(5*time.Second)*r.Float64()) + } + + timer := time.NewTimer(randSleepFor()) + defer timer.Stop() + var rebalDone bool + var traceMsg string + + for { + select { + case <-doneCh: + // rebalance completed for poolIdx + now := time.Now() + z.rebalMu.Lock() + z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalCompleted + z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now + z.rebalMu.Unlock() + + rebalDone = true + traceMsg = fmt.Sprintf("completed at %s", now) + + case <-ctx.Done(): + + // rebalance stopped for poolIdx + now := time.Now() + z.rebalMu.Lock() + z.rebalMeta.PoolStats[poolIdx].Info.Status = rebalStopped + z.rebalMeta.PoolStats[poolIdx].Info.EndTime = now + z.rebalMu.Unlock() + + rebalDone = true + traceMsg = fmt.Sprintf("stopped at %s", now) + + case <-timer.C: + traceMsg = fmt.Sprintf("saved at %s", time.Now()) + } + + stopFn := globalRebalanceMetrics.log(rebalanceMetricSaveMetadata, poolIdx, traceMsg) + err := z.saveRebalanceStats(ctx, poolIdx, rebalSaveStats) + stopFn(err) + logger.LogIf(ctx, err) + timer.Reset(randSleepFor()) + + if rebalDone { + return + } + } + }() + + for { + select { + case <-ctx.Done(): + return + default: + } + + bucket, ok := z.nextRebalBucket(poolIdx) + if !ok { + // no more buckets to rebalance or target free_space/capacity reached + break + } + + stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBucket, poolIdx, bucket) + err = z.rebalanceBucket(ctx, bucket, poolIdx) + if err != nil { + stopFn(err) + logger.LogIf(ctx, err) + return + } + stopFn(nil) + z.bucketRebalanceDone(bucket, poolIdx) + } + + return err +} + +func (z *erasureServerPools) checkIfRebalanceDone(poolIdx int) bool { + z.rebalMu.Lock() + defer z.rebalMu.Unlock() + + // check if enough objects have been rebalanced + r := z.rebalMeta + poolStats := r.PoolStats[poolIdx] + if poolStats.Info.Status == rebalCompleted { + return true + } + + pfi := float64(poolStats.InitFreeSpace+poolStats.Bytes) / float64(poolStats.InitCapacity) + // Mark pool rebalance as done if within 5% from PercentFreeGoal. + if diff := math.Abs(pfi - r.PercentFreeGoal); diff <= 0.05 { + r.PoolStats[poolIdx].Info.Status = rebalCompleted + r.PoolStats[poolIdx].Info.EndTime = time.Now() + return true + } + + return false +} + +// rebalanceBucket rebalances objects under bucket in poolIdx pool +func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string, poolIdx int) error { + ctx = logger.SetReqInfo(ctx, &logger.ReqInfo{}) + vc, _ := globalBucketVersioningSys.Get(bucket) + // Check if the current bucket has a configured lifecycle policy + lc, _ := globalLifecycleSys.Get(bucket) + // Check if bucket is object locked. + lr, _ := globalBucketObjectLockSys.Get(bucket) + + pool := z.serverPools[poolIdx] + const envRebalanceWorkers = "_MINIO_REBALANCE_WORKERS" + wStr := env.Get(envRebalanceWorkers, strconv.Itoa(len(pool.sets))) + workerSize, err := strconv.Atoi(wStr) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("invalid %s value: %s err: %v, defaulting to %d", envRebalanceWorkers, wStr, err, len(pool.sets))) + workerSize = len(pool.sets) + } + workers := make(chan struct{}, workerSize) + var wg sync.WaitGroup + for _, set := range pool.sets { + set := set + disks := set.getOnlineDisks() + if len(disks) == 0 { + logger.LogIf(ctx, fmt.Errorf("no online disks found for set with endpoints %s", + set.getEndpoints())) + continue + } + + filterLifecycle := func(bucket, object string, fi FileInfo) bool { + if lc == nil { + return false + } + versioned := vc != nil && vc.Versioned(object) + objInfo := fi.ToObjectInfo(bucket, object, versioned) + event := evalActionFromLifecycle(ctx, *lc, lr, objInfo) + switch action := event.Action; action { + case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: + globalExpiryState.enqueueByDays(objInfo, false, action == lifecycle.DeleteVersionAction) + // Skip this entry. + return true + case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: + globalExpiryState.enqueueByDays(objInfo, true, action == lifecycle.DeleteRestoredVersionAction) + // Skip this entry. + return true + } + return false + } + + rebalanceEntry := func(entry metaCacheEntry) { + defer func() { + <-workers + wg.Done() + }() + + if entry.isDir() { + return + } + + // rebalance on poolIdx has reached its goal + if z.checkIfRebalanceDone(poolIdx) { + return + } + + fivs, err := entry.fileInfoVersions(bucket) + if err != nil { + return + } + + // We need a reversed order for rebalance, + // to create the appropriate stack. + versionsSorter(fivs.Versions).reverse() + + var rebalanced int + for _, version := range fivs.Versions { + // Skip transitioned objects for now. TBD + if version.IsRemote() { + continue + } + + // Apply lifecycle rules on the objects that are expired. + if filterLifecycle(bucket, version.Name, version) { + logger.LogIf(ctx, fmt.Errorf("found %s/%s (%s) expired object based on ILM rules, skipping and scheduled for deletion", bucket, version.Name, version.VersionID)) + continue + } + + // We will skip rebalancing delete markers + // with single version, its as good as there + // is no data associated with the object. + if version.Deleted && len(fivs.Versions) == 1 { + logger.LogIf(ctx, fmt.Errorf("found %s/%s delete marked object with no other versions, skipping since there is no data to be rebalanced", bucket, version.Name)) + continue + } + + if version.Deleted { + _, err := z.DeleteObject(ctx, + bucket, + version.Name, + ObjectOptions{ + Versioned: vc.PrefixEnabled(version.Name), + VersionID: version.VersionID, + MTime: version.ModTime, + DeleteReplication: version.ReplicationState, + DeleteMarker: true, // make sure we create a delete marker + SkipRebalancing: true, // make sure we skip the decommissioned pool + }) + var failure bool + if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { + logger.LogIf(ctx, err) + failure = true + } + + if !failure { + z.updatePoolStats(poolIdx, bucket, version.ToObjectInfo(bucket, version.Name, vc.PrefixEnabled(version.Name))) + rebalanced++ + } + continue + } + + var failure bool + var oi ObjectInfo + for try := 0; try < 3; try++ { + // GetObjectReader.Close is called by rebalanceObject + gr, err := set.GetObjectNInfo(ctx, + bucket, + encodeDirObject(version.Name), + nil, + http.Header{}, + noLock, // all mutations are blocked reads are safe without locks. + ObjectOptions{ + VersionID: version.VersionID, + NoDecryption: true, + }) + if isErrObjectNotFound(err) || isErrVersionNotFound(err) { + // object deleted by the application, nothing to do here we move on. + return + } + if err != nil { + failure = true + logger.LogIf(ctx, err) + continue + } + + stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceObject, poolIdx, bucket, version.Name) + if err = z.rebalanceObject(ctx, bucket, gr); err != nil { + stopFn(err) + failure = true + logger.LogIf(ctx, err) + continue + } + + stopFn(nil) + failure = false + oi = gr.ObjInfo + break + } + + if failure { + break // break out on first error + } + z.updatePoolStats(poolIdx, bucket, oi) + rebalanced++ + } + + // if all versions were rebalanced, we can delete the object versions. + if rebalanced == len(fivs.Versions) { + stopFn := globalRebalanceMetrics.log(rebalanceMetricRebalanceRemoveObject, poolIdx, bucket, entry.name) + _, err := set.DeleteObject(ctx, + bucket, + encodeDirObject(entry.name), + ObjectOptions{ + DeletePrefix: true, // use prefix delete to delete all versions at once. + }, + ) + stopFn(err) + auditLogRebalance(ctx, "Rebalance:DeleteObject", bucket, entry.name, "", err) + if err != nil { + logger.LogIf(ctx, err) + } + } + } + + wg.Add(1) + go func() { + defer wg.Done() + + // How to resolve partial results. + resolver := metadataResolutionParams{ + dirQuorum: len(disks) / 2, // make sure to capture all quorum ratios + objQuorum: len(disks) / 2, // make sure to capture all quorum ratios + bucket: bucket, + } + err := listPathRaw(ctx, listPathRawOptions{ + disks: disks, + bucket: bucket, + recursive: true, + forwardTo: "", + minDisks: len(disks) / 2, // to capture all quorum ratios + reportNotFound: false, + agreed: func(entry metaCacheEntry) { + workers <- struct{}{} + wg.Add(1) + go rebalanceEntry(entry) + }, + partial: func(entries metaCacheEntries, _ []error) { + entry, ok := entries.resolve(&resolver) + if ok { + workers <- struct{}{} + wg.Add(1) + go rebalanceEntry(*entry) + } + }, + finished: nil, + }) + logger.LogIf(ctx, err) + }() + } + wg.Wait() + return nil +} + +type rebalSaveOpts uint8 + +const ( + rebalSaveStats rebalSaveOpts = iota + rebalSaveStoppedAt +) + +func (z *erasureServerPools) saveRebalanceStats(ctx context.Context, poolIdx int, opts rebalSaveOpts) error { + lock := z.serverPools[0].NewNSLock(minioMetaBucket, rebalMetaName) + lkCtx, err := lock.GetLock(ctx, globalOperationTimeout) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("failed to acquire write lock on %s/%s: %w", minioMetaBucket, rebalMetaName, err)) + return err + } + defer lock.Unlock(lkCtx.Cancel) + + ctx = lkCtx.Context() + noLockOpts := ObjectOptions{NoLock: true} + r := &rebalanceMeta{} + if err := r.loadWithOpts(ctx, z.serverPools[0], noLockOpts); err != nil { + return err + } + + z.rebalMu.Lock() + defer z.rebalMu.Unlock() + + switch opts { + case rebalSaveStoppedAt: + r.StoppedAt = time.Now() + case rebalSaveStats: + r.PoolStats[poolIdx] = z.rebalMeta.PoolStats[poolIdx] + } + z.rebalMeta = r + + err = z.rebalMeta.saveWithOpts(ctx, z.serverPools[0], noLockOpts) + return err +} + +func auditLogRebalance(ctx context.Context, apiName, bucket, object, versionID string, err error) { + errStr := "" + if err != nil { + errStr = err.Error() + } + auditLogInternal(ctx, AuditLogOptions{ + Event: "rebalance", + APIName: apiName, + Bucket: bucket, + Object: object, + VersionID: versionID, + Error: errStr, + }) +} + +func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string, gr *GetObjectReader) (err error) { + oi := gr.ObjInfo + + defer func() { + gr.Close() + auditLogRebalance(ctx, "RebalanceCopyData", oi.Bucket, oi.Name, oi.VersionID, err) + }() + + actualSize, err := oi.GetActualSize() + if err != nil { + return err + } + + if oi.isMultipart() { + res, err := z.NewMultipartUpload(ctx, bucket, oi.Name, ObjectOptions{ + VersionID: oi.VersionID, + MTime: oi.ModTime, + UserDefined: oi.UserDefined, + }) + if err != nil { + return fmt.Errorf("rebalanceObject: NewMultipartUpload() %w", err) + } + defer z.AbortMultipartUpload(ctx, bucket, oi.Name, res.UploadID, ObjectOptions{}) + + parts := make([]CompletePart, len(oi.Parts)) + for i, part := range oi.Parts { + hr, err := hash.NewReader(gr, part.Size, "", "", part.ActualSize) + if err != nil { + return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err) + } + pi, err := z.PutObjectPart(ctx, bucket, oi.Name, res.UploadID, + part.Number, + NewPutObjReader(hr), + ObjectOptions{ + PreserveETag: part.ETag, // Preserve original ETag to ensure same metadata. + IndexCB: func() []byte { + return part.Index // Preserve part Index to ensure decompression works. + }, + }) + if err != nil { + return fmt.Errorf("rebalanceObject: PutObjectPart() %w", err) + } + parts[i] = CompletePart{ + ETag: pi.ETag, + PartNumber: pi.PartNumber, + } + } + _, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{ + MTime: oi.ModTime, + }) + if err != nil { + err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err) + } + return err + } + + hr, err := hash.NewReader(gr, oi.Size, "", "", actualSize) + if err != nil { + return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err) + } + _, err = z.PutObject(ctx, + bucket, + oi.Name, + NewPutObjReader(hr), + ObjectOptions{ + VersionID: oi.VersionID, + MTime: oi.ModTime, + UserDefined: oi.UserDefined, + PreserveETag: oi.ETag, // Preserve original ETag to ensure same metadata. + IndexCB: func() []byte { + return oi.Parts[0].Index // Preserve part Index to ensure decompression works. + }, + }) + if err != nil { + err = fmt.Errorf("rebalanceObject: PutObject() %w", err) + } + return err +} + +func (z *erasureServerPools) StartRebalance() { + z.rebalMu.Lock() + if z.rebalMeta == nil || !z.rebalMeta.StoppedAt.IsZero() { // rebalance not running, nothing to do + z.rebalMu.Unlock() + return + } + ctx, cancel := context.WithCancel(GlobalContext) + z.rebalMeta.cancel = cancel // to be used when rebalance-stop is called + z.rebalMu.Unlock() + + z.rebalMu.RLock() + participants := make([]bool, len(z.rebalMeta.PoolStats)) + for i, ps := range z.rebalMeta.PoolStats { + // skip pools which have completed rebalancing + if ps.Info.Status != rebalStarted { + continue + } + + participants[i] = ps.Participating + } + z.rebalMu.RUnlock() + + for poolIdx, doRebalance := range participants { + if !doRebalance { + continue + } + // nothing to do if this node is not pool's first node (i.e pool's rebalance 'leader'). + if !globalEndpoints[poolIdx].Endpoints[0].IsLocal { + continue + } + + go func(idx int) { + stopfn := globalRebalanceMetrics.log(rebalanceMetricRebalanceBuckets, idx) + err := z.rebalanceBuckets(ctx, idx) + stopfn(err) + }(poolIdx) + } + return +} + +// StopRebalance signals the rebalance goroutine running on this node (if any) +// to stop, using the context.CancelFunc(s) saved at the time ofStartRebalance. +func (z *erasureServerPools) StopRebalance() error { + z.rebalMu.Lock() + defer z.rebalMu.Unlock() + + r := z.rebalMeta + if r == nil { // rebalance not running in this node, nothing to do + return nil + } + + if cancel := r.cancel; cancel != nil { + // cancel != nil only on pool leaders + r.cancel = nil + cancel() + } + return nil +} + +// for rebalance trace support +type rebalanceMetrics struct{} + +var globalRebalanceMetrics rebalanceMetrics + +//go:generate stringer -type=rebalanceMetric -trimprefix=rebalanceMetric $GOFILE +type rebalanceMetric uint8 + +const ( + rebalanceMetricRebalanceBuckets rebalanceMetric = iota + rebalanceMetricRebalanceBucket + rebalanceMetricRebalanceObject + rebalanceMetricRebalanceRemoveObject + rebalanceMetricSaveMetadata +) + +func rebalanceTrace(r rebalanceMetric, poolIdx int, startTime time.Time, duration time.Duration, err error, path string) madmin.TraceInfo { + var errStr string + if err != nil { + errStr = err.Error() + } + return madmin.TraceInfo{ + TraceType: madmin.TraceRebalance, + Time: startTime, + NodeName: globalLocalNodeName, + FuncName: fmt.Sprintf("rebalance.%s (pool-id=%d)", r.String(), poolIdx), + Duration: duration, + Path: path, + Error: errStr, + } +} + +func (p *rebalanceMetrics) log(r rebalanceMetric, poolIdx int, paths ...string) func(err error) { + startTime := time.Now() + return func(err error) { + duration := time.Since(startTime) + if globalTrace.NumSubscribers(madmin.TraceRebalance) > 0 { + globalTrace.Publish(rebalanceTrace(r, poolIdx, startTime, duration, err, strings.Join(paths, " "))) + } + } +} diff --git a/cmd/erasure-server-pool-rebalance_gen.go b/cmd/erasure-server-pool-rebalance_gen.go new file mode 100644 index 000000000..a0cbd56f4 --- /dev/null +++ b/cmd/erasure-server-pool-rebalance_gen.go @@ -0,0 +1,1228 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *rebalSaveOpts) DecodeMsg(dc *msgp.Reader) (err error) { + { + var zb0001 uint8 + zb0001, err = dc.ReadUint8() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = rebalSaveOpts(zb0001) + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z rebalSaveOpts) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteUint8(uint8(z)) + if err != nil { + err = msgp.WrapError(err) + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z rebalSaveOpts) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendUint8(o, uint8(z)) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *rebalSaveOpts) UnmarshalMsg(bts []byte) (o []byte, err error) { + { + var zb0001 uint8 + zb0001, bts, err = msgp.ReadUint8Bytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = rebalSaveOpts(zb0001) + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z rebalSaveOpts) Msgsize() (s int) { + s = msgp.Uint8Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *rebalStatus) DecodeMsg(dc *msgp.Reader) (err error) { + { + var zb0001 uint8 + zb0001, err = dc.ReadUint8() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = rebalStatus(zb0001) + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z rebalStatus) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteUint8(uint8(z)) + if err != nil { + err = msgp.WrapError(err) + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z rebalStatus) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendUint8(o, uint8(z)) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *rebalStatus) UnmarshalMsg(bts []byte) (o []byte, err error) { + { + var zb0001 uint8 + zb0001, bts, err = msgp.ReadUint8Bytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = rebalStatus(zb0001) + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z rebalStatus) Msgsize() (s int) { + s = msgp.Uint8Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *rebalanceInfo) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "startTs": + z.StartTime, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + case "stopTs": + z.EndTime, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "EndTime") + return + } + case "status": + { + var zb0002 uint8 + zb0002, err = dc.ReadUint8() + if err != nil { + err = msgp.WrapError(err, "Status") + return + } + z.Status = rebalStatus(zb0002) + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z rebalanceInfo) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 3 + // write "startTs" + err = en.Append(0x83, 0xa7, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x73) + if err != nil { + return + } + err = en.WriteTime(z.StartTime) + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + // write "stopTs" + err = en.Append(0xa6, 0x73, 0x74, 0x6f, 0x70, 0x54, 0x73) + if err != nil { + return + } + err = en.WriteTime(z.EndTime) + if err != nil { + err = msgp.WrapError(err, "EndTime") + return + } + // write "status" + err = en.Append(0xa6, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73) + if err != nil { + return + } + err = en.WriteUint8(uint8(z.Status)) + if err != nil { + err = msgp.WrapError(err, "Status") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z rebalanceInfo) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 3 + // string "startTs" + o = append(o, 0x83, 0xa7, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x73) + o = msgp.AppendTime(o, z.StartTime) + // string "stopTs" + o = append(o, 0xa6, 0x73, 0x74, 0x6f, 0x70, 0x54, 0x73) + o = msgp.AppendTime(o, z.EndTime) + // string "status" + o = append(o, 0xa6, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73) + o = msgp.AppendUint8(o, uint8(z.Status)) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *rebalanceInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "startTs": + z.StartTime, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + case "stopTs": + z.EndTime, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "EndTime") + return + } + case "status": + { + var zb0002 uint8 + zb0002, bts, err = msgp.ReadUint8Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Status") + return + } + z.Status = rebalStatus(zb0002) + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z rebalanceInfo) Msgsize() (s int) { + s = 1 + 8 + msgp.TimeSize + 7 + msgp.TimeSize + 7 + msgp.Uint8Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *rebalanceMeta) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "stopTs": + z.StoppedAt, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "StoppedAt") + return + } + case "id": + z.ID, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + case "pf": + z.PercentFreeGoal, err = dc.ReadFloat64() + if err != nil { + err = msgp.WrapError(err, "PercentFreeGoal") + return + } + case "rss": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "PoolStats") + return + } + if cap(z.PoolStats) >= int(zb0002) { + z.PoolStats = (z.PoolStats)[:zb0002] + } else { + z.PoolStats = make([]*rebalanceStats, zb0002) + } + for za0001 := range z.PoolStats { + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, "PoolStats", za0001) + return + } + z.PoolStats[za0001] = nil + } else { + if z.PoolStats[za0001] == nil { + z.PoolStats[za0001] = new(rebalanceStats) + } + err = z.PoolStats[za0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "PoolStats", za0001) + return + } + } + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *rebalanceMeta) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 4 + // write "stopTs" + err = en.Append(0x84, 0xa6, 0x73, 0x74, 0x6f, 0x70, 0x54, 0x73) + if err != nil { + return + } + err = en.WriteTime(z.StoppedAt) + if err != nil { + err = msgp.WrapError(err, "StoppedAt") + return + } + // write "id" + err = en.Append(0xa2, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteString(z.ID) + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + // write "pf" + err = en.Append(0xa2, 0x70, 0x66) + if err != nil { + return + } + err = en.WriteFloat64(z.PercentFreeGoal) + if err != nil { + err = msgp.WrapError(err, "PercentFreeGoal") + return + } + // write "rss" + err = en.Append(0xa3, 0x72, 0x73, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.PoolStats))) + if err != nil { + err = msgp.WrapError(err, "PoolStats") + return + } + for za0001 := range z.PoolStats { + if z.PoolStats[za0001] == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = z.PoolStats[za0001].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "PoolStats", za0001) + return + } + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *rebalanceMeta) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 4 + // string "stopTs" + o = append(o, 0x84, 0xa6, 0x73, 0x74, 0x6f, 0x70, 0x54, 0x73) + o = msgp.AppendTime(o, z.StoppedAt) + // string "id" + o = append(o, 0xa2, 0x69, 0x64) + o = msgp.AppendString(o, z.ID) + // string "pf" + o = append(o, 0xa2, 0x70, 0x66) + o = msgp.AppendFloat64(o, z.PercentFreeGoal) + // string "rss" + o = append(o, 0xa3, 0x72, 0x73, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.PoolStats))) + for za0001 := range z.PoolStats { + if z.PoolStats[za0001] == nil { + o = msgp.AppendNil(o) + } else { + o, err = z.PoolStats[za0001].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "PoolStats", za0001) + return + } + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *rebalanceMeta) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "stopTs": + z.StoppedAt, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "StoppedAt") + return + } + case "id": + z.ID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + case "pf": + z.PercentFreeGoal, bts, err = msgp.ReadFloat64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "PercentFreeGoal") + return + } + case "rss": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "PoolStats") + return + } + if cap(z.PoolStats) >= int(zb0002) { + z.PoolStats = (z.PoolStats)[:zb0002] + } else { + z.PoolStats = make([]*rebalanceStats, zb0002) + } + for za0001 := range z.PoolStats { + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + z.PoolStats[za0001] = nil + } else { + if z.PoolStats[za0001] == nil { + z.PoolStats[za0001] = new(rebalanceStats) + } + bts, err = z.PoolStats[za0001].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "PoolStats", za0001) + return + } + } + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *rebalanceMeta) Msgsize() (s int) { + s = 1 + 7 + msgp.TimeSize + 3 + msgp.StringPrefixSize + len(z.ID) + 3 + msgp.Float64Size + 4 + msgp.ArrayHeaderSize + for za0001 := range z.PoolStats { + if z.PoolStats[za0001] == nil { + s += msgp.NilSize + } else { + s += z.PoolStats[za0001].Msgsize() + } + } + return +} + +// DecodeMsg implements msgp.Decodable +func (z *rebalanceMetric) DecodeMsg(dc *msgp.Reader) (err error) { + { + var zb0001 uint8 + zb0001, err = dc.ReadUint8() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = rebalanceMetric(zb0001) + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z rebalanceMetric) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteUint8(uint8(z)) + if err != nil { + err = msgp.WrapError(err) + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z rebalanceMetric) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendUint8(o, uint8(z)) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *rebalanceMetric) UnmarshalMsg(bts []byte) (o []byte, err error) { + { + var zb0001 uint8 + zb0001, bts, err = msgp.ReadUint8Bytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = rebalanceMetric(zb0001) + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z rebalanceMetric) Msgsize() (s int) { + s = msgp.Uint8Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *rebalanceMetrics) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z rebalanceMetrics) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 0 + err = en.Append(0x80) + if err != nil { + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z rebalanceMetrics) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 0 + o = append(o, 0x80) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *rebalanceMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z rebalanceMetrics) Msgsize() (s int) { + s = 1 + return +} + +// DecodeMsg implements msgp.Decodable +func (z *rebalanceStats) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ifs": + z.InitFreeSpace, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "InitFreeSpace") + return + } + case "ic": + z.InitCapacity, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "InitCapacity") + return + } + case "bus": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "Buckets") + return + } + if cap(z.Buckets) >= int(zb0002) { + z.Buckets = (z.Buckets)[:zb0002] + } else { + z.Buckets = make([]string, zb0002) + } + for za0001 := range z.Buckets { + z.Buckets[za0001], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Buckets", za0001) + return + } + } + case "rbs": + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "RebalancedBuckets") + return + } + if cap(z.RebalancedBuckets) >= int(zb0003) { + z.RebalancedBuckets = (z.RebalancedBuckets)[:zb0003] + } else { + z.RebalancedBuckets = make([]string, zb0003) + } + for za0002 := range z.RebalancedBuckets { + z.RebalancedBuckets[za0002], err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "RebalancedBuckets", za0002) + return + } + } + case "bu": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "ob": + z.Object, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + case "no": + z.NumObjects, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "NumObjects") + return + } + case "nv": + z.NumVersions, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "NumVersions") + return + } + case "bs": + z.Bytes, err = dc.ReadUint64() + if err != nil { + err = msgp.WrapError(err, "Bytes") + return + } + case "par": + z.Participating, err = dc.ReadBool() + if err != nil { + err = msgp.WrapError(err, "Participating") + return + } + case "inf": + err = z.Info.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *rebalanceStats) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 11 + // write "ifs" + err = en.Append(0x8b, 0xa3, 0x69, 0x66, 0x73) + if err != nil { + return + } + err = en.WriteUint64(z.InitFreeSpace) + if err != nil { + err = msgp.WrapError(err, "InitFreeSpace") + return + } + // write "ic" + err = en.Append(0xa2, 0x69, 0x63) + if err != nil { + return + } + err = en.WriteUint64(z.InitCapacity) + if err != nil { + err = msgp.WrapError(err, "InitCapacity") + return + } + // write "bus" + err = en.Append(0xa3, 0x62, 0x75, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.Buckets))) + if err != nil { + err = msgp.WrapError(err, "Buckets") + return + } + for za0001 := range z.Buckets { + err = en.WriteString(z.Buckets[za0001]) + if err != nil { + err = msgp.WrapError(err, "Buckets", za0001) + return + } + } + // write "rbs" + err = en.Append(0xa3, 0x72, 0x62, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.RebalancedBuckets))) + if err != nil { + err = msgp.WrapError(err, "RebalancedBuckets") + return + } + for za0002 := range z.RebalancedBuckets { + err = en.WriteString(z.RebalancedBuckets[za0002]) + if err != nil { + err = msgp.WrapError(err, "RebalancedBuckets", za0002) + return + } + } + // write "bu" + err = en.Append(0xa2, 0x62, 0x75) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "ob" + err = en.Append(0xa2, 0x6f, 0x62) + if err != nil { + return + } + err = en.WriteString(z.Object) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + // write "no" + err = en.Append(0xa2, 0x6e, 0x6f) + if err != nil { + return + } + err = en.WriteUint64(z.NumObjects) + if err != nil { + err = msgp.WrapError(err, "NumObjects") + return + } + // write "nv" + err = en.Append(0xa2, 0x6e, 0x76) + if err != nil { + return + } + err = en.WriteUint64(z.NumVersions) + if err != nil { + err = msgp.WrapError(err, "NumVersions") + return + } + // write "bs" + err = en.Append(0xa2, 0x62, 0x73) + if err != nil { + return + } + err = en.WriteUint64(z.Bytes) + if err != nil { + err = msgp.WrapError(err, "Bytes") + return + } + // write "par" + err = en.Append(0xa3, 0x70, 0x61, 0x72) + if err != nil { + return + } + err = en.WriteBool(z.Participating) + if err != nil { + err = msgp.WrapError(err, "Participating") + return + } + // write "inf" + err = en.Append(0xa3, 0x69, 0x6e, 0x66) + if err != nil { + return + } + err = z.Info.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *rebalanceStats) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 11 + // string "ifs" + o = append(o, 0x8b, 0xa3, 0x69, 0x66, 0x73) + o = msgp.AppendUint64(o, z.InitFreeSpace) + // string "ic" + o = append(o, 0xa2, 0x69, 0x63) + o = msgp.AppendUint64(o, z.InitCapacity) + // string "bus" + o = append(o, 0xa3, 0x62, 0x75, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.Buckets))) + for za0001 := range z.Buckets { + o = msgp.AppendString(o, z.Buckets[za0001]) + } + // string "rbs" + o = append(o, 0xa3, 0x72, 0x62, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.RebalancedBuckets))) + for za0002 := range z.RebalancedBuckets { + o = msgp.AppendString(o, z.RebalancedBuckets[za0002]) + } + // string "bu" + o = append(o, 0xa2, 0x62, 0x75) + o = msgp.AppendString(o, z.Bucket) + // string "ob" + o = append(o, 0xa2, 0x6f, 0x62) + o = msgp.AppendString(o, z.Object) + // string "no" + o = append(o, 0xa2, 0x6e, 0x6f) + o = msgp.AppendUint64(o, z.NumObjects) + // string "nv" + o = append(o, 0xa2, 0x6e, 0x76) + o = msgp.AppendUint64(o, z.NumVersions) + // string "bs" + o = append(o, 0xa2, 0x62, 0x73) + o = msgp.AppendUint64(o, z.Bytes) + // string "par" + o = append(o, 0xa3, 0x70, 0x61, 0x72) + o = msgp.AppendBool(o, z.Participating) + // string "inf" + o = append(o, 0xa3, 0x69, 0x6e, 0x66) + o, err = z.Info.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *rebalanceStats) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "ifs": + z.InitFreeSpace, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "InitFreeSpace") + return + } + case "ic": + z.InitCapacity, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "InitCapacity") + return + } + case "bus": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Buckets") + return + } + if cap(z.Buckets) >= int(zb0002) { + z.Buckets = (z.Buckets)[:zb0002] + } else { + z.Buckets = make([]string, zb0002) + } + for za0001 := range z.Buckets { + z.Buckets[za0001], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Buckets", za0001) + return + } + } + case "rbs": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "RebalancedBuckets") + return + } + if cap(z.RebalancedBuckets) >= int(zb0003) { + z.RebalancedBuckets = (z.RebalancedBuckets)[:zb0003] + } else { + z.RebalancedBuckets = make([]string, zb0003) + } + for za0002 := range z.RebalancedBuckets { + z.RebalancedBuckets[za0002], bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "RebalancedBuckets", za0002) + return + } + } + case "bu": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "ob": + z.Object, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + case "no": + z.NumObjects, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "NumObjects") + return + } + case "nv": + z.NumVersions, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "NumVersions") + return + } + case "bs": + z.Bytes, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bytes") + return + } + case "par": + z.Participating, bts, err = msgp.ReadBoolBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Participating") + return + } + case "inf": + bts, err = z.Info.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Info") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *rebalanceStats) Msgsize() (s int) { + s = 1 + 4 + msgp.Uint64Size + 3 + msgp.Uint64Size + 4 + msgp.ArrayHeaderSize + for za0001 := range z.Buckets { + s += msgp.StringPrefixSize + len(z.Buckets[za0001]) + } + s += 4 + msgp.ArrayHeaderSize + for za0002 := range z.RebalancedBuckets { + s += msgp.StringPrefixSize + len(z.RebalancedBuckets[za0002]) + } + s += 3 + msgp.StringPrefixSize + len(z.Bucket) + 3 + msgp.StringPrefixSize + len(z.Object) + 3 + msgp.Uint64Size + 3 + msgp.Uint64Size + 3 + msgp.Uint64Size + 4 + msgp.BoolSize + 4 + z.Info.Msgsize() + return +} + +// DecodeMsg implements msgp.Decodable +func (z *rstats) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + if cap((*z)) >= int(zb0002) { + (*z) = (*z)[:zb0002] + } else { + (*z) = make(rstats, zb0002) + } + for zb0001 := range *z { + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + (*z)[zb0001] = nil + } else { + if (*z)[zb0001] == nil { + (*z)[zb0001] = new(rebalanceStats) + } + err = (*z)[zb0001].DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z rstats) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteArrayHeader(uint32(len(z))) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0003 := range z { + if z[zb0003] == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = z[zb0003].EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, zb0003) + return + } + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z rstats) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendArrayHeader(o, uint32(len(z))) + for zb0003 := range z { + if z[zb0003] == nil { + o = msgp.AppendNil(o) + } else { + o, err = z[zb0003].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, zb0003) + return + } + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *rstats) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + if cap((*z)) >= int(zb0002) { + (*z) = (*z)[:zb0002] + } else { + (*z) = make(rstats, zb0002) + } + for zb0001 := range *z { + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + (*z)[zb0001] = nil + } else { + if (*z)[zb0001] == nil { + (*z)[zb0001] = new(rebalanceStats) + } + bts, err = (*z)[zb0001].UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, zb0001) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z rstats) Msgsize() (s int) { + s = msgp.ArrayHeaderSize + for zb0003 := range z { + if z[zb0003] == nil { + s += msgp.NilSize + } else { + s += z[zb0003].Msgsize() + } + } + return +} diff --git a/cmd/erasure-server-pool-rebalance_gen_test.go b/cmd/erasure-server-pool-rebalance_gen_test.go new file mode 100644 index 000000000..0f0b8f466 --- /dev/null +++ b/cmd/erasure-server-pool-rebalance_gen_test.go @@ -0,0 +1,575 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. + +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalrebalanceInfo(t *testing.T) { + v := rebalanceInfo{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgrebalanceInfo(b *testing.B) { + v := rebalanceInfo{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgrebalanceInfo(b *testing.B) { + v := rebalanceInfo{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalrebalanceInfo(b *testing.B) { + v := rebalanceInfo{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecoderebalanceInfo(t *testing.T) { + v := rebalanceInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecoderebalanceInfo Msgsize() is inaccurate") + } + + vn := rebalanceInfo{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncoderebalanceInfo(b *testing.B) { + v := rebalanceInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecoderebalanceInfo(b *testing.B) { + v := rebalanceInfo{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalrebalanceMeta(t *testing.T) { + v := rebalanceMeta{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgrebalanceMeta(b *testing.B) { + v := rebalanceMeta{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgrebalanceMeta(b *testing.B) { + v := rebalanceMeta{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalrebalanceMeta(b *testing.B) { + v := rebalanceMeta{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecoderebalanceMeta(t *testing.T) { + v := rebalanceMeta{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecoderebalanceMeta Msgsize() is inaccurate") + } + + vn := rebalanceMeta{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncoderebalanceMeta(b *testing.B) { + v := rebalanceMeta{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecoderebalanceMeta(b *testing.B) { + v := rebalanceMeta{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalrebalanceMetrics(t *testing.T) { + v := rebalanceMetrics{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgrebalanceMetrics(b *testing.B) { + v := rebalanceMetrics{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgrebalanceMetrics(b *testing.B) { + v := rebalanceMetrics{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalrebalanceMetrics(b *testing.B) { + v := rebalanceMetrics{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecoderebalanceMetrics(t *testing.T) { + v := rebalanceMetrics{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecoderebalanceMetrics Msgsize() is inaccurate") + } + + vn := rebalanceMetrics{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncoderebalanceMetrics(b *testing.B) { + v := rebalanceMetrics{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecoderebalanceMetrics(b *testing.B) { + v := rebalanceMetrics{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalrebalanceStats(t *testing.T) { + v := rebalanceStats{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgrebalanceStats(b *testing.B) { + v := rebalanceStats{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgrebalanceStats(b *testing.B) { + v := rebalanceStats{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalrebalanceStats(b *testing.B) { + v := rebalanceStats{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecoderebalanceStats(t *testing.T) { + v := rebalanceStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecoderebalanceStats Msgsize() is inaccurate") + } + + vn := rebalanceStats{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncoderebalanceStats(b *testing.B) { + v := rebalanceStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecoderebalanceStats(b *testing.B) { + v := rebalanceStats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + +func TestMarshalUnmarshalrstats(t *testing.T) { + v := rstats{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgrstats(b *testing.B) { + v := rstats{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgrstats(b *testing.B) { + v := rstats{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalrstats(b *testing.B) { + v := rstats{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecoderstats(t *testing.T) { + v := rstats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecoderstats Msgsize() is inaccurate") + } + + vn := rstats{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncoderstats(b *testing.B) { + v := rstats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecoderstats(b *testing.B) { + v := rstats{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index f57bda199..c4b33dc35 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -44,7 +44,11 @@ import ( type erasureServerPools struct { poolMetaMutex sync.RWMutex poolMeta poolMeta - serverPools []*erasureSets + + rebalMu sync.RWMutex + rebalMeta *rebalanceMeta + + serverPools []*erasureSets // Shut down async operations shutdown context.CancelFunc @@ -327,8 +331,9 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b g := errgroup.WithNErrs(len(z.serverPools)) for index := range z.serverPools { index := index - // skip suspended pools for any new I/O. - if z.IsSuspended(index) { + // Skip suspended pools or pools participating in rebalance for any new + // I/O. + if z.IsSuspended(index) || z.IsPoolRebalancing(index) { continue } pool := z.serverPools[index] @@ -426,6 +431,10 @@ func (z *erasureServerPools) getPoolInfoExistingWithOpts(ctx context.Context, bu if z.IsSuspended(pinfo.Index) && opts.SkipDecommissioned { continue } + // Skip object if it's from pools participating in a rebalance operation. + if opts.SkipRebalancing && z.IsPoolRebalancing(pinfo.Index) { + continue + } if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) { return pinfo, pinfo.Err @@ -466,6 +475,7 @@ func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucke return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{ NoLock: true, SkipDecommissioned: true, + SkipRebalancing: true, }) } @@ -489,7 +499,10 @@ func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, objec // if none are found falls back to most available space pool, this function is // designed to be only used by PutObject, CopyObject (newObject creation) and NewMultipartUpload. func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) { - idx, err = z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{SkipDecommissioned: true}) + idx, err = z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{ + SkipDecommissioned: true, + SkipRebalancing: true, + }) if err != nil && !isErrObjectNotFound(err) { return idx, err } @@ -1387,9 +1400,10 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj } for idx, pool := range z.serverPools { - if z.IsSuspended(idx) { + if z.IsSuspended(idx) || z.IsPoolRebalancing(idx) { continue } + result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList) if err != nil { return nil, err diff --git a/cmd/iam-object-store.go b/cmd/iam-object-store.go index 617a007a4..c4c6efee7 100644 --- a/cmd/iam-object-store.go +++ b/cmd/iam-object-store.go @@ -91,7 +91,7 @@ func (iamOS *IAMObjectStore) saveIAMConfig(ctx context.Context, item interface{} } func (iamOS *IAMObjectStore) loadIAMConfigBytesWithMetadata(ctx context.Context, objPath string) ([]byte, ObjectInfo, error) { - data, meta, err := readConfigWithMetadata(ctx, iamOS.objAPI, objPath) + data, meta, err := readConfigWithMetadata(ctx, iamOS.objAPI, objPath, ObjectOptions{}) if err != nil { return nil, meta, err } diff --git a/cmd/notification.go b/cmd/notification.go index cfbf2dc24..e15cb9c54 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -630,6 +630,49 @@ func (sys *NotificationSys) ReloadPoolMeta(ctx context.Context) { } } +// StopRebalance notifies all MinIO nodes to signal any ongoing rebalance +// goroutine to stop. +func (sys *NotificationSys) StopRebalance(ctx context.Context) { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } + client := client + ng.Go(ctx, func() error { + return client.StopRebalance(ctx) + }, idx, *client.host) + } + for _, nErr := range ng.Wait() { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String()) + if nErr.Err != nil { + logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err) + } + } +} + +// LoadRebalanceMeta notifies all peers to load rebalance.bin from object layer. +// Note: Only peers participating in rebalance operation, namely the first node +// in each pool will load rebalance.bin. +func (sys *NotificationSys) LoadRebalanceMeta(ctx context.Context, startRebalance bool) { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } + client := client + ng.Go(ctx, func() error { + return client.LoadRebalanceMeta(ctx, startRebalance) + }, idx, *client.host) + } + for _, nErr := range ng.Wait() { + reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", nErr.Host.String()) + if nErr.Err != nil { + logger.LogIf(logger.SetReqInfo(ctx, reqInfo), nErr.Err) + } + } +} + // LoadTransitionTierConfig notifies remote peers to load their remote tier // configs from config store. func (sys *NotificationSys) LoadTransitionTierConfig(ctx context.Context) { diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 9b3d6329c..0236e63fd 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -85,6 +85,9 @@ type ObjectOptions struct { // SkipDecommissioned set to 'true' if the call requires skipping the pool being decommissioned. // mainly set for certain WRITE operations. SkipDecommissioned bool + // SkipRebalancing should be set to 'true' if the call should skip pools + // participating in a rebalance operation. Typically set for 'write' operations. + SkipRebalancing bool WalkFilter func(info FileInfo) bool // return WalkFilter returns 'true/false' WalkMarker string // set to skip until this object diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 943bc7215..b2ed6961e 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -551,6 +551,28 @@ func (client *peerRESTClient) ReloadPoolMeta(ctx context.Context) error { return nil } +func (client *peerRESTClient) StopRebalance(ctx context.Context) error { + respBody, err := client.callWithContext(ctx, peerRESTMethodStopRebalance, nil, nil, 0) + if err != nil { + logger.LogIf(ctx, err) + return err + } + defer http.DrainBody(respBody) + return nil +} + +func (client *peerRESTClient) LoadRebalanceMeta(ctx context.Context, startRebalance bool) error { + values := url.Values{} + values.Set(peerRESTStartRebalance, strconv.FormatBool(startRebalance)) + respBody, err := client.callWithContext(ctx, peerRESTMethodLoadRebalanceMeta, values, nil, 0) + if err != nil { + logger.LogIf(ctx, err) + return err + } + defer http.DrainBody(respBody) + return nil +} + func (client *peerRESTClient) LoadTransitionTierConfig(ctx context.Context) error { respBody, err := client.callWithContext(ctx, peerRESTMethodLoadTransitionTierConfig, nil, nil, 0) if err != nil { diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 28901c4bc..11724c826 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -18,7 +18,8 @@ package cmd const ( - peerRESTVersion = "v27" // change in GetAllBucketStats response. + peerRESTVersion = "v28" // Added Rebalance peer APIs + peerRESTVersionPrefix = SlashSeparator + peerRESTVersion peerRESTPrefix = minioReservedBucketPath + "/peer" peerRESTPath = peerRESTPrefix + peerRESTVersionPrefix @@ -68,6 +69,8 @@ const ( peerRESTMethodDriveSpeedTest = "/drivespeedtest" peerRESTMethodReloadSiteReplicationConfig = "/reloadsitereplicationconfig" peerRESTMethodReloadPoolMeta = "/reloadpoolmeta" + peerRESTMethodLoadRebalanceMeta = "/loadrebalancemeta" + peerRESTMethodStopRebalance = "/stoprebalance" peerRESTMethodGetLastDayTierStats = "/getlastdaytierstats" peerRESTMethodDevNull = "/devnull" peerRESTMethodNetperf = "/netperf" @@ -75,25 +78,26 @@ const ( ) const ( - peerRESTBucket = "bucket" - peerRESTBuckets = "buckets" - peerRESTUser = "user" - peerRESTGroup = "group" - peerRESTUserTemp = "user-temp" - peerRESTPolicy = "policy" - peerRESTUserOrGroup = "user-or-group" - peerRESTUserType = "user-type" - peerRESTIsGroup = "is-group" - peerRESTSignal = "signal" - peerRESTSubSys = "sub-sys" - peerRESTProfiler = "profiler" - peerRESTSize = "size" - peerRESTConcurrent = "concurrent" - peerRESTDuration = "duration" - peerRESTStorageClass = "storage-class" - peerRESTMetricsTypes = "types" - peerRESTDisk = "disk" - peerRESTJobID = "job-id" + peerRESTBucket = "bucket" + peerRESTBuckets = "buckets" + peerRESTUser = "user" + peerRESTGroup = "group" + peerRESTUserTemp = "user-temp" + peerRESTPolicy = "policy" + peerRESTUserOrGroup = "user-or-group" + peerRESTUserType = "user-type" + peerRESTIsGroup = "is-group" + peerRESTSignal = "signal" + peerRESTSubSys = "sub-sys" + peerRESTProfiler = "profiler" + peerRESTSize = "size" + peerRESTConcurrent = "concurrent" + peerRESTDuration = "duration" + peerRESTStorageClass = "storage-class" + peerRESTMetricsTypes = "types" + peerRESTDisk = "disk" + peerRESTJobID = "job-id" + peerRESTStartRebalance = "start-rebalance" peerRESTListenBucket = "bucket" peerRESTListenPrefix = "prefix" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 0af707fcc..adc907663 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -1048,6 +1048,60 @@ func (s *peerRESTServer) ReloadPoolMetaHandler(w http.ResponseWriter, r *http.Re } } +func (s *peerRESTServer) StopRebalanceHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("invalid request")) + return + } + + objAPI := newObjectLayerFn() + if objAPI == nil { + s.writeErrorResponse(w, errServerNotInitialized) + return + } + pools, ok := objAPI.(*erasureServerPools) + if !ok { + s.writeErrorResponse(w, errors.New("not a multiple pools setup")) + return + } + + pools.StopRebalance() +} + +func (s *peerRESTServer) LoadRebalanceMetaHandler(w http.ResponseWriter, r *http.Request) { + if !s.IsValid(w, r) { + s.writeErrorResponse(w, errors.New("invalid request")) + return + } + + objAPI := newObjectLayerFn() + if objAPI == nil { + s.writeErrorResponse(w, errServerNotInitialized) + return + } + + pools, ok := objAPI.(*erasureServerPools) + if !ok { + s.writeErrorResponse(w, errors.New("not a multiple pools setup")) + return + } + + startRebalanceStr := r.Form.Get(peerRESTStartRebalance) + startRebalance, err := strconv.ParseBool(startRebalanceStr) + if err != nil { + s.writeErrorResponse(w, err) + return + } + + if err := pools.loadRebalanceMeta(r.Context()); err != nil { + s.writeErrorResponse(w, err) + return + } + if startRebalance { + go pools.StartRebalance() + } +} + func (s *peerRESTServer) LoadTransitionTierConfigHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { s.writeErrorResponse(w, errors.New("invalid request")) @@ -1352,5 +1406,7 @@ func registerPeerRESTHandlers(router *mux.Router) { subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodDevNull).HandlerFunc(httpTraceHdrs(server.DevNull)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadPoolMeta).HandlerFunc(httpTraceHdrs(server.ReloadPoolMetaHandler)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodLoadRebalanceMeta).HandlerFunc(httpTraceHdrs(server.LoadRebalanceMetaHandler)) + subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodStopRebalance).HandlerFunc(httpTraceHdrs(server.StopRebalanceHandler)) subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLastDayTierStats).HandlerFunc(httpTraceHdrs(server.GetLastDayTierStatsHandler)) } diff --git a/cmd/rebalance-admin.go b/cmd/rebalance-admin.go new file mode 100644 index 000000000..ff22abcae --- /dev/null +++ b/cmd/rebalance-admin.go @@ -0,0 +1,108 @@ +// Copyright (c) 2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "context" + "time" +) + +type rebalPoolProgress struct { + NumObjects uint64 `json:"objects"` + NumVersions uint64 `json:"versions"` + Bytes uint64 `json:"bytes"` + Bucket string `json:"bucket"` + Object string `json:"object"` + Elapsed time.Duration `json:"elapsed"` + ETA time.Duration `json:"eta"` +} + +type rebalancePoolStatus struct { + ID int `json:"id"` // Pool index (zero-based) + Status string `json:"status"` // Active if rebalance is running, empty otherwise + Used float64 `json:"used"` // Percentage used space + Progress rebalPoolProgress `json:"progress,omitempty"` // is empty when rebalance is not running +} + +// rebalanceAdminStatus holds rebalance status related information exported to mc, console, etc. +type rebalanceAdminStatus struct { + ID string // identifies the ongoing rebalance operation by a uuid + Pools []rebalancePoolStatus `json:"pools"` // contains all pools, including inactive + StoppedAt time.Time `json:"stoppedAt,omitempty"` +} + +func rebalanceStatus(ctx context.Context, z *erasureServerPools) (r rebalanceAdminStatus, err error) { + // Load latest rebalance status + meta := &rebalanceMeta{} + err = meta.load(ctx, z.serverPools[0]) + if err != nil { + return r, err + } + + // Compute disk usage percentage + si, _ := z.StorageInfo(ctx) + diskStats := make([]struct { + AvailableSpace uint64 + TotalSpace uint64 + }, len(z.serverPools)) + for _, disk := range si.Disks { + diskStats[disk.PoolIndex].AvailableSpace += disk.AvailableSpace + diskStats[disk.PoolIndex].TotalSpace += disk.TotalSpace + } + + stopTime := meta.StoppedAt + r = rebalanceAdminStatus{ + ID: meta.ID, + StoppedAt: meta.StoppedAt, + Pools: make([]rebalancePoolStatus, len(meta.PoolStats)), + } + for i, ps := range meta.PoolStats { + r.Pools[i] = rebalancePoolStatus{ + ID: i, + Status: ps.Info.Status.String(), + Used: float64(diskStats[i].TotalSpace-diskStats[i].AvailableSpace) / float64(diskStats[i].TotalSpace), + } + if !ps.Participating { + continue + } + // for participating pools, total bytes to be rebalanced by this pool is given by, + // pf_c = (f_i + x)/c_i, + // pf_c - percentage free space across pools, f_i - ith pool's free space, c_i - ith pool's capacity + // i.e. x = c_i*pfc -f_i + totalBytesToRebal := float64(ps.InitCapacity)*meta.PercentFreeGoal - float64(ps.InitFreeSpace) + elapsed := time.Since(ps.Info.StartTime) + eta := time.Duration(totalBytesToRebal * float64(elapsed) / float64(ps.Bytes)) + if !ps.Info.EndTime.IsZero() { + stopTime = ps.Info.EndTime + } + + if !stopTime.IsZero() { // rebalance is stopped or completed + elapsed = stopTime.Sub(ps.Info.StartTime) + eta = 0 + } + + r.Pools[i].Progress = rebalPoolProgress{ + NumObjects: ps.NumObjects, + NumVersions: ps.NumVersions, + Bytes: ps.Bytes, + Elapsed: elapsed, + ETA: eta, + } + } + return r, nil +} diff --git a/cmd/rebalancemetric_string.go b/cmd/rebalancemetric_string.go new file mode 100644 index 000000000..930e04341 --- /dev/null +++ b/cmd/rebalancemetric_string.go @@ -0,0 +1,27 @@ +// Code generated by "stringer -type=rebalanceMetric -trimprefix=rebalanceMetric erasure-server-pool-rebalance.go"; DO NOT EDIT. + +package cmd + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[rebalanceMetricRebalanceBuckets-0] + _ = x[rebalanceMetricRebalanceBucket-1] + _ = x[rebalanceMetricRebalanceObject-2] + _ = x[rebalanceMetricRebalanceRemoveObject-3] + _ = x[rebalanceMetricSaveMetadata-4] +} + +const _rebalanceMetric_name = "RebalanceBucketsRebalanceBucketRebalanceObjectRebalanceRemoveObjectSaveMetadata" + +var _rebalanceMetric_index = [...]uint8{0, 16, 31, 46, 67, 79} + +func (i rebalanceMetric) String() string { + if i >= rebalanceMetric(len(_rebalanceMetric_index)-1) { + return "rebalanceMetric(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _rebalanceMetric_name[_rebalanceMetric_index[i]:_rebalanceMetric_index[i+1]] +} diff --git a/cmd/rebalstatus_string.go b/cmd/rebalstatus_string.go new file mode 100644 index 000000000..0dc74b217 --- /dev/null +++ b/cmd/rebalstatus_string.go @@ -0,0 +1,27 @@ +// Code generated by "stringer -type=rebalStatus -trimprefix=rebal erasure-server-pool-rebalance.go"; DO NOT EDIT. + +package cmd + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[rebalNone-0] + _ = x[rebalStarted-1] + _ = x[rebalCompleted-2] + _ = x[rebalStopped-3] + _ = x[rebalFailed-4] +} + +const _rebalStatus_name = "NoneStartedCompletedStoppedFailed" + +var _rebalStatus_index = [...]uint8{0, 4, 11, 20, 27, 33} + +func (i rebalStatus) String() string { + if i >= rebalStatus(len(_rebalStatus_index)-1) { + return "rebalStatus(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _rebalStatus_name[_rebalStatus_index[i]:_rebalStatus_index[i+1]] +}