From dbea8d2ee07cd0ae6df42db13300c9bfdfa1bd9a Mon Sep 17 00:00:00 2001 From: Poorna Krishnamoorthy Date: Tue, 1 Jun 2021 19:59:11 -0700 Subject: [PATCH] Add support for existing object replication. (#12109) Also adding an API to allow resyncing replication when existing object replication is enabled and the remote target is entirely lost. With the `mc replicate reset` command, the objects that are eligible for replication as per the replication config will be resynced to target if existing object replication is enabled on the rule. --- cmd/api-errors.go | 6 + cmd/api-router.go | 3 + cmd/apierrorcode_string.go | 447 +++++++++++---------- cmd/bucket-handlers.go | 90 ++++- cmd/bucket-replication.go | 244 +++++++++-- cmd/bucket-replication_test.go | 209 ++++++++++ cmd/data-scanner.go | 72 +++- cmd/data-usage-cache.go | 3 +- cmd/object-api-datatypes.go | 1 + cmd/object-handlers.go | 43 +- cmd/web-handlers.go | 6 +- cmd/xl-storage.go | 14 +- docs/bucket/replication/DESIGN.md | 6 +- docs/bucket/replication/README.md | 15 + go.mod | 2 +- go.sum | 2 + internal/bucket/replication/replication.go | 29 +- internal/bucket/replication/rule.go | 67 ++- internal/http/headers.go | 3 + 19 files changed, 928 insertions(+), 334 deletions(-) create mode 100644 cmd/bucket-replication_test.go diff --git a/cmd/api-errors.go b/cmd/api-errors.go index 03e2442ed..2cc9d3cdb 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -125,6 +125,7 @@ const ( ErrReplicationSourceNotVersionedError ErrReplicationNeedsVersioningError ErrReplicationBucketNeedsVersioningError + ErrReplicationNoMatchingRuleError ErrObjectRestoreAlreadyInProgress ErrNoSuchKey ErrNoSuchUpload @@ -859,6 +860,11 @@ var errorCodes = errorCodeMap{ Description: "Remote service connection error - please check remote service credentials and target bucket", HTTPStatusCode: http.StatusNotFound, }, + ErrReplicationNoMatchingRuleError: { + Code: "XMinioReplicationNoMatchingRule", + Description: "No matching replication rule found for this object prefix", + HTTPStatusCode: http.StatusBadRequest, + }, ErrBucketRemoteIdenticalToSource: { Code: "XMinioAdminRemoteIdenticalToSource", Description: "The remote target cannot be identical to source", diff --git a/cmd/api-router.go b/cmd/api-router.go index bdaab8d39..f6f24c79d 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -394,6 +394,9 @@ func registerAPIRouter(router *mux.Router) { // PutBucketNotification router.Methods(http.MethodPut).HandlerFunc( collectAPIStats("putbucketnotification", maxClients(httpTraceAll(api.PutBucketNotificationHandler)))).Queries("notification", "") + // ResetBucketReplicationState - MinIO extension API + router.Methods(http.MethodPut).HandlerFunc( + collectAPIStats("resetbucketreplicationstate", maxClients(httpTraceAll(api.ResetBucketReplicationStateHandler)))).Queries("replication-reset", "") // PutBucket router.Methods(http.MethodPut).HandlerFunc( collectAPIStats("putbucket", maxClients(httpTraceAll(api.PutBucketHandler)))) diff --git a/cmd/apierrorcode_string.go b/cmd/apierrorcode_string.go index 1023da1f9..0b50cf5a5 100644 --- a/cmd/apierrorcode_string.go +++ b/cmd/apierrorcode_string.go @@ -62,232 +62,233 @@ func _() { _ = x[ErrReplicationSourceNotVersionedError-51] _ = x[ErrReplicationNeedsVersioningError-52] _ = x[ErrReplicationBucketNeedsVersioningError-53] - _ = x[ErrObjectRestoreAlreadyInProgress-54] - _ = x[ErrNoSuchKey-55] - _ = x[ErrNoSuchUpload-56] - _ = x[ErrInvalidVersionID-57] - _ = x[ErrNoSuchVersion-58] - _ = x[ErrNotImplemented-59] - _ = x[ErrPreconditionFailed-60] - _ = x[ErrRequestTimeTooSkewed-61] - _ = x[ErrSignatureDoesNotMatch-62] - _ = x[ErrMethodNotAllowed-63] - _ = x[ErrInvalidPart-64] - _ = x[ErrInvalidPartOrder-65] - _ = x[ErrAuthorizationHeaderMalformed-66] - _ = x[ErrMalformedPOSTRequest-67] - _ = x[ErrPOSTFileRequired-68] - _ = x[ErrSignatureVersionNotSupported-69] - _ = x[ErrBucketNotEmpty-70] - _ = x[ErrAllAccessDisabled-71] - _ = x[ErrMalformedPolicy-72] - _ = x[ErrMissingFields-73] - _ = x[ErrMissingCredTag-74] - _ = x[ErrCredMalformed-75] - _ = x[ErrInvalidRegion-76] - _ = x[ErrInvalidServiceS3-77] - _ = x[ErrInvalidServiceSTS-78] - _ = x[ErrInvalidRequestVersion-79] - _ = x[ErrMissingSignTag-80] - _ = x[ErrMissingSignHeadersTag-81] - _ = x[ErrMalformedDate-82] - _ = x[ErrMalformedPresignedDate-83] - _ = x[ErrMalformedCredentialDate-84] - _ = x[ErrMalformedCredentialRegion-85] - _ = x[ErrMalformedExpires-86] - _ = x[ErrNegativeExpires-87] - _ = x[ErrAuthHeaderEmpty-88] - _ = x[ErrExpiredPresignRequest-89] - _ = x[ErrRequestNotReadyYet-90] - _ = x[ErrUnsignedHeaders-91] - _ = x[ErrMissingDateHeader-92] - _ = x[ErrInvalidQuerySignatureAlgo-93] - _ = x[ErrInvalidQueryParams-94] - _ = x[ErrBucketAlreadyOwnedByYou-95] - _ = x[ErrInvalidDuration-96] - _ = x[ErrBucketAlreadyExists-97] - _ = x[ErrMetadataTooLarge-98] - _ = x[ErrUnsupportedMetadata-99] - _ = x[ErrMaximumExpires-100] - _ = x[ErrSlowDown-101] - _ = x[ErrInvalidPrefixMarker-102] - _ = x[ErrBadRequest-103] - _ = x[ErrKeyTooLongError-104] - _ = x[ErrInvalidBucketObjectLockConfiguration-105] - _ = x[ErrObjectLockConfigurationNotFound-106] - _ = x[ErrObjectLockConfigurationNotAllowed-107] - _ = x[ErrNoSuchObjectLockConfiguration-108] - _ = x[ErrObjectLocked-109] - _ = x[ErrInvalidRetentionDate-110] - _ = x[ErrPastObjectLockRetainDate-111] - _ = x[ErrUnknownWORMModeDirective-112] - _ = x[ErrBucketTaggingNotFound-113] - _ = x[ErrObjectLockInvalidHeaders-114] - _ = x[ErrInvalidTagDirective-115] - _ = x[ErrInvalidEncryptionMethod-116] - _ = x[ErrInsecureSSECustomerRequest-117] - _ = x[ErrSSEMultipartEncrypted-118] - _ = x[ErrSSEEncryptedObject-119] - _ = x[ErrInvalidEncryptionParameters-120] - _ = x[ErrInvalidSSECustomerAlgorithm-121] - _ = x[ErrInvalidSSECustomerKey-122] - _ = x[ErrMissingSSECustomerKey-123] - _ = x[ErrMissingSSECustomerKeyMD5-124] - _ = x[ErrSSECustomerKeyMD5Mismatch-125] - _ = x[ErrInvalidSSECustomerParameters-126] - _ = x[ErrIncompatibleEncryptionMethod-127] - _ = x[ErrKMSNotConfigured-128] - _ = x[ErrNoAccessKey-129] - _ = x[ErrInvalidToken-130] - _ = x[ErrEventNotification-131] - _ = x[ErrARNNotification-132] - _ = x[ErrRegionNotification-133] - _ = x[ErrOverlappingFilterNotification-134] - _ = x[ErrFilterNameInvalid-135] - _ = x[ErrFilterNamePrefix-136] - _ = x[ErrFilterNameSuffix-137] - _ = x[ErrFilterValueInvalid-138] - _ = x[ErrOverlappingConfigs-139] - _ = x[ErrUnsupportedNotification-140] - _ = x[ErrContentSHA256Mismatch-141] - _ = x[ErrReadQuorum-142] - _ = x[ErrWriteQuorum-143] - _ = x[ErrParentIsObject-144] - _ = x[ErrStorageFull-145] - _ = x[ErrRequestBodyParse-146] - _ = x[ErrObjectExistsAsDirectory-147] - _ = x[ErrInvalidObjectName-148] - _ = x[ErrInvalidObjectNamePrefixSlash-149] - _ = x[ErrInvalidResourceName-150] - _ = x[ErrServerNotInitialized-151] - _ = x[ErrOperationTimedOut-152] - _ = x[ErrClientDisconnected-153] - _ = x[ErrOperationMaxedOut-154] - _ = x[ErrInvalidRequest-155] - _ = x[ErrTransitionStorageClassNotFoundError-156] - _ = x[ErrInvalidStorageClass-157] - _ = x[ErrBackendDown-158] - _ = x[ErrMalformedJSON-159] - _ = x[ErrAdminNoSuchUser-160] - _ = x[ErrAdminNoSuchGroup-161] - _ = x[ErrAdminGroupNotEmpty-162] - _ = x[ErrAdminNoSuchPolicy-163] - _ = x[ErrAdminInvalidArgument-164] - _ = x[ErrAdminInvalidAccessKey-165] - _ = x[ErrAdminInvalidSecretKey-166] - _ = x[ErrAdminConfigNoQuorum-167] - _ = x[ErrAdminConfigTooLarge-168] - _ = x[ErrAdminConfigBadJSON-169] - _ = x[ErrAdminConfigDuplicateKeys-170] - _ = x[ErrAdminCredentialsMismatch-171] - _ = x[ErrInsecureClientRequest-172] - _ = x[ErrObjectTampered-173] - _ = x[ErrAdminBucketQuotaExceeded-174] - _ = x[ErrAdminNoSuchQuotaConfiguration-175] - _ = x[ErrHealNotImplemented-176] - _ = x[ErrHealNoSuchProcess-177] - _ = x[ErrHealInvalidClientToken-178] - _ = x[ErrHealMissingBucket-179] - _ = x[ErrHealAlreadyRunning-180] - _ = x[ErrHealOverlappingPaths-181] - _ = x[ErrIncorrectContinuationToken-182] - _ = x[ErrEmptyRequestBody-183] - _ = x[ErrUnsupportedFunction-184] - _ = x[ErrInvalidExpressionType-185] - _ = x[ErrBusy-186] - _ = x[ErrUnauthorizedAccess-187] - _ = x[ErrExpressionTooLong-188] - _ = x[ErrIllegalSQLFunctionArgument-189] - _ = x[ErrInvalidKeyPath-190] - _ = x[ErrInvalidCompressionFormat-191] - _ = x[ErrInvalidFileHeaderInfo-192] - _ = x[ErrInvalidJSONType-193] - _ = x[ErrInvalidQuoteFields-194] - _ = x[ErrInvalidRequestParameter-195] - _ = x[ErrInvalidDataType-196] - _ = x[ErrInvalidTextEncoding-197] - _ = x[ErrInvalidDataSource-198] - _ = x[ErrInvalidTableAlias-199] - _ = x[ErrMissingRequiredParameter-200] - _ = x[ErrObjectSerializationConflict-201] - _ = x[ErrUnsupportedSQLOperation-202] - _ = x[ErrUnsupportedSQLStructure-203] - _ = x[ErrUnsupportedSyntax-204] - _ = x[ErrUnsupportedRangeHeader-205] - _ = x[ErrLexerInvalidChar-206] - _ = x[ErrLexerInvalidOperator-207] - _ = x[ErrLexerInvalidLiteral-208] - _ = x[ErrLexerInvalidIONLiteral-209] - _ = x[ErrParseExpectedDatePart-210] - _ = x[ErrParseExpectedKeyword-211] - _ = x[ErrParseExpectedTokenType-212] - _ = x[ErrParseExpected2TokenTypes-213] - _ = x[ErrParseExpectedNumber-214] - _ = x[ErrParseExpectedRightParenBuiltinFunctionCall-215] - _ = x[ErrParseExpectedTypeName-216] - _ = x[ErrParseExpectedWhenClause-217] - _ = x[ErrParseUnsupportedToken-218] - _ = x[ErrParseUnsupportedLiteralsGroupBy-219] - _ = x[ErrParseExpectedMember-220] - _ = x[ErrParseUnsupportedSelect-221] - _ = x[ErrParseUnsupportedCase-222] - _ = x[ErrParseUnsupportedCaseClause-223] - _ = x[ErrParseUnsupportedAlias-224] - _ = x[ErrParseUnsupportedSyntax-225] - _ = x[ErrParseUnknownOperator-226] - _ = x[ErrParseMissingIdentAfterAt-227] - _ = x[ErrParseUnexpectedOperator-228] - _ = x[ErrParseUnexpectedTerm-229] - _ = x[ErrParseUnexpectedToken-230] - _ = x[ErrParseUnexpectedKeyword-231] - _ = x[ErrParseExpectedExpression-232] - _ = x[ErrParseExpectedLeftParenAfterCast-233] - _ = x[ErrParseExpectedLeftParenValueConstructor-234] - _ = x[ErrParseExpectedLeftParenBuiltinFunctionCall-235] - _ = x[ErrParseExpectedArgumentDelimiter-236] - _ = x[ErrParseCastArity-237] - _ = x[ErrParseInvalidTypeParam-238] - _ = x[ErrParseEmptySelect-239] - _ = x[ErrParseSelectMissingFrom-240] - _ = x[ErrParseExpectedIdentForGroupName-241] - _ = x[ErrParseExpectedIdentForAlias-242] - _ = x[ErrParseUnsupportedCallWithStar-243] - _ = x[ErrParseNonUnaryAgregateFunctionCall-244] - _ = x[ErrParseMalformedJoin-245] - _ = x[ErrParseExpectedIdentForAt-246] - _ = x[ErrParseAsteriskIsNotAloneInSelectList-247] - _ = x[ErrParseCannotMixSqbAndWildcardInSelectList-248] - _ = x[ErrParseInvalidContextForWildcardInSelectList-249] - _ = x[ErrIncorrectSQLFunctionArgumentType-250] - _ = x[ErrValueParseFailure-251] - _ = x[ErrEvaluatorInvalidArguments-252] - _ = x[ErrIntegerOverflow-253] - _ = x[ErrLikeInvalidInputs-254] - _ = x[ErrCastFailed-255] - _ = x[ErrInvalidCast-256] - _ = x[ErrEvaluatorInvalidTimestampFormatPattern-257] - _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbolForParsing-258] - _ = x[ErrEvaluatorTimestampFormatPatternDuplicateFields-259] - _ = x[ErrEvaluatorTimestampFormatPatternHourClockAmPmMismatch-260] - _ = x[ErrEvaluatorUnterminatedTimestampFormatPatternToken-261] - _ = x[ErrEvaluatorInvalidTimestampFormatPatternToken-262] - _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbol-263] - _ = x[ErrEvaluatorBindingDoesNotExist-264] - _ = x[ErrMissingHeaders-265] - _ = x[ErrInvalidColumnIndex-266] - _ = x[ErrAdminConfigNotificationTargetsFailed-267] - _ = x[ErrAdminProfilerNotEnabled-268] - _ = x[ErrInvalidDecompressedSize-269] - _ = x[ErrAddUserInvalidArgument-270] - _ = x[ErrAdminAccountNotEligible-271] - _ = x[ErrAccountNotEligible-272] - _ = x[ErrAdminServiceAccountNotFound-273] - _ = x[ErrPostPolicyConditionInvalidFormat-274] + _ = x[ErrReplicationNoMatchingRuleError-54] + _ = x[ErrObjectRestoreAlreadyInProgress-55] + _ = x[ErrNoSuchKey-56] + _ = x[ErrNoSuchUpload-57] + _ = x[ErrInvalidVersionID-58] + _ = x[ErrNoSuchVersion-59] + _ = x[ErrNotImplemented-60] + _ = x[ErrPreconditionFailed-61] + _ = x[ErrRequestTimeTooSkewed-62] + _ = x[ErrSignatureDoesNotMatch-63] + _ = x[ErrMethodNotAllowed-64] + _ = x[ErrInvalidPart-65] + _ = x[ErrInvalidPartOrder-66] + _ = x[ErrAuthorizationHeaderMalformed-67] + _ = x[ErrMalformedPOSTRequest-68] + _ = x[ErrPOSTFileRequired-69] + _ = x[ErrSignatureVersionNotSupported-70] + _ = x[ErrBucketNotEmpty-71] + _ = x[ErrAllAccessDisabled-72] + _ = x[ErrMalformedPolicy-73] + _ = x[ErrMissingFields-74] + _ = x[ErrMissingCredTag-75] + _ = x[ErrCredMalformed-76] + _ = x[ErrInvalidRegion-77] + _ = x[ErrInvalidServiceS3-78] + _ = x[ErrInvalidServiceSTS-79] + _ = x[ErrInvalidRequestVersion-80] + _ = x[ErrMissingSignTag-81] + _ = x[ErrMissingSignHeadersTag-82] + _ = x[ErrMalformedDate-83] + _ = x[ErrMalformedPresignedDate-84] + _ = x[ErrMalformedCredentialDate-85] + _ = x[ErrMalformedCredentialRegion-86] + _ = x[ErrMalformedExpires-87] + _ = x[ErrNegativeExpires-88] + _ = x[ErrAuthHeaderEmpty-89] + _ = x[ErrExpiredPresignRequest-90] + _ = x[ErrRequestNotReadyYet-91] + _ = x[ErrUnsignedHeaders-92] + _ = x[ErrMissingDateHeader-93] + _ = x[ErrInvalidQuerySignatureAlgo-94] + _ = x[ErrInvalidQueryParams-95] + _ = x[ErrBucketAlreadyOwnedByYou-96] + _ = x[ErrInvalidDuration-97] + _ = x[ErrBucketAlreadyExists-98] + _ = x[ErrMetadataTooLarge-99] + _ = x[ErrUnsupportedMetadata-100] + _ = x[ErrMaximumExpires-101] + _ = x[ErrSlowDown-102] + _ = x[ErrInvalidPrefixMarker-103] + _ = x[ErrBadRequest-104] + _ = x[ErrKeyTooLongError-105] + _ = x[ErrInvalidBucketObjectLockConfiguration-106] + _ = x[ErrObjectLockConfigurationNotFound-107] + _ = x[ErrObjectLockConfigurationNotAllowed-108] + _ = x[ErrNoSuchObjectLockConfiguration-109] + _ = x[ErrObjectLocked-110] + _ = x[ErrInvalidRetentionDate-111] + _ = x[ErrPastObjectLockRetainDate-112] + _ = x[ErrUnknownWORMModeDirective-113] + _ = x[ErrBucketTaggingNotFound-114] + _ = x[ErrObjectLockInvalidHeaders-115] + _ = x[ErrInvalidTagDirective-116] + _ = x[ErrInvalidEncryptionMethod-117] + _ = x[ErrInsecureSSECustomerRequest-118] + _ = x[ErrSSEMultipartEncrypted-119] + _ = x[ErrSSEEncryptedObject-120] + _ = x[ErrInvalidEncryptionParameters-121] + _ = x[ErrInvalidSSECustomerAlgorithm-122] + _ = x[ErrInvalidSSECustomerKey-123] + _ = x[ErrMissingSSECustomerKey-124] + _ = x[ErrMissingSSECustomerKeyMD5-125] + _ = x[ErrSSECustomerKeyMD5Mismatch-126] + _ = x[ErrInvalidSSECustomerParameters-127] + _ = x[ErrIncompatibleEncryptionMethod-128] + _ = x[ErrKMSNotConfigured-129] + _ = x[ErrNoAccessKey-130] + _ = x[ErrInvalidToken-131] + _ = x[ErrEventNotification-132] + _ = x[ErrARNNotification-133] + _ = x[ErrRegionNotification-134] + _ = x[ErrOverlappingFilterNotification-135] + _ = x[ErrFilterNameInvalid-136] + _ = x[ErrFilterNamePrefix-137] + _ = x[ErrFilterNameSuffix-138] + _ = x[ErrFilterValueInvalid-139] + _ = x[ErrOverlappingConfigs-140] + _ = x[ErrUnsupportedNotification-141] + _ = x[ErrContentSHA256Mismatch-142] + _ = x[ErrReadQuorum-143] + _ = x[ErrWriteQuorum-144] + _ = x[ErrParentIsObject-145] + _ = x[ErrStorageFull-146] + _ = x[ErrRequestBodyParse-147] + _ = x[ErrObjectExistsAsDirectory-148] + _ = x[ErrInvalidObjectName-149] + _ = x[ErrInvalidObjectNamePrefixSlash-150] + _ = x[ErrInvalidResourceName-151] + _ = x[ErrServerNotInitialized-152] + _ = x[ErrOperationTimedOut-153] + _ = x[ErrClientDisconnected-154] + _ = x[ErrOperationMaxedOut-155] + _ = x[ErrInvalidRequest-156] + _ = x[ErrTransitionStorageClassNotFoundError-157] + _ = x[ErrInvalidStorageClass-158] + _ = x[ErrBackendDown-159] + _ = x[ErrMalformedJSON-160] + _ = x[ErrAdminNoSuchUser-161] + _ = x[ErrAdminNoSuchGroup-162] + _ = x[ErrAdminGroupNotEmpty-163] + _ = x[ErrAdminNoSuchPolicy-164] + _ = x[ErrAdminInvalidArgument-165] + _ = x[ErrAdminInvalidAccessKey-166] + _ = x[ErrAdminInvalidSecretKey-167] + _ = x[ErrAdminConfigNoQuorum-168] + _ = x[ErrAdminConfigTooLarge-169] + _ = x[ErrAdminConfigBadJSON-170] + _ = x[ErrAdminConfigDuplicateKeys-171] + _ = x[ErrAdminCredentialsMismatch-172] + _ = x[ErrInsecureClientRequest-173] + _ = x[ErrObjectTampered-174] + _ = x[ErrAdminBucketQuotaExceeded-175] + _ = x[ErrAdminNoSuchQuotaConfiguration-176] + _ = x[ErrHealNotImplemented-177] + _ = x[ErrHealNoSuchProcess-178] + _ = x[ErrHealInvalidClientToken-179] + _ = x[ErrHealMissingBucket-180] + _ = x[ErrHealAlreadyRunning-181] + _ = x[ErrHealOverlappingPaths-182] + _ = x[ErrIncorrectContinuationToken-183] + _ = x[ErrEmptyRequestBody-184] + _ = x[ErrUnsupportedFunction-185] + _ = x[ErrInvalidExpressionType-186] + _ = x[ErrBusy-187] + _ = x[ErrUnauthorizedAccess-188] + _ = x[ErrExpressionTooLong-189] + _ = x[ErrIllegalSQLFunctionArgument-190] + _ = x[ErrInvalidKeyPath-191] + _ = x[ErrInvalidCompressionFormat-192] + _ = x[ErrInvalidFileHeaderInfo-193] + _ = x[ErrInvalidJSONType-194] + _ = x[ErrInvalidQuoteFields-195] + _ = x[ErrInvalidRequestParameter-196] + _ = x[ErrInvalidDataType-197] + _ = x[ErrInvalidTextEncoding-198] + _ = x[ErrInvalidDataSource-199] + _ = x[ErrInvalidTableAlias-200] + _ = x[ErrMissingRequiredParameter-201] + _ = x[ErrObjectSerializationConflict-202] + _ = x[ErrUnsupportedSQLOperation-203] + _ = x[ErrUnsupportedSQLStructure-204] + _ = x[ErrUnsupportedSyntax-205] + _ = x[ErrUnsupportedRangeHeader-206] + _ = x[ErrLexerInvalidChar-207] + _ = x[ErrLexerInvalidOperator-208] + _ = x[ErrLexerInvalidLiteral-209] + _ = x[ErrLexerInvalidIONLiteral-210] + _ = x[ErrParseExpectedDatePart-211] + _ = x[ErrParseExpectedKeyword-212] + _ = x[ErrParseExpectedTokenType-213] + _ = x[ErrParseExpected2TokenTypes-214] + _ = x[ErrParseExpectedNumber-215] + _ = x[ErrParseExpectedRightParenBuiltinFunctionCall-216] + _ = x[ErrParseExpectedTypeName-217] + _ = x[ErrParseExpectedWhenClause-218] + _ = x[ErrParseUnsupportedToken-219] + _ = x[ErrParseUnsupportedLiteralsGroupBy-220] + _ = x[ErrParseExpectedMember-221] + _ = x[ErrParseUnsupportedSelect-222] + _ = x[ErrParseUnsupportedCase-223] + _ = x[ErrParseUnsupportedCaseClause-224] + _ = x[ErrParseUnsupportedAlias-225] + _ = x[ErrParseUnsupportedSyntax-226] + _ = x[ErrParseUnknownOperator-227] + _ = x[ErrParseMissingIdentAfterAt-228] + _ = x[ErrParseUnexpectedOperator-229] + _ = x[ErrParseUnexpectedTerm-230] + _ = x[ErrParseUnexpectedToken-231] + _ = x[ErrParseUnexpectedKeyword-232] + _ = x[ErrParseExpectedExpression-233] + _ = x[ErrParseExpectedLeftParenAfterCast-234] + _ = x[ErrParseExpectedLeftParenValueConstructor-235] + _ = x[ErrParseExpectedLeftParenBuiltinFunctionCall-236] + _ = x[ErrParseExpectedArgumentDelimiter-237] + _ = x[ErrParseCastArity-238] + _ = x[ErrParseInvalidTypeParam-239] + _ = x[ErrParseEmptySelect-240] + _ = x[ErrParseSelectMissingFrom-241] + _ = x[ErrParseExpectedIdentForGroupName-242] + _ = x[ErrParseExpectedIdentForAlias-243] + _ = x[ErrParseUnsupportedCallWithStar-244] + _ = x[ErrParseNonUnaryAgregateFunctionCall-245] + _ = x[ErrParseMalformedJoin-246] + _ = x[ErrParseExpectedIdentForAt-247] + _ = x[ErrParseAsteriskIsNotAloneInSelectList-248] + _ = x[ErrParseCannotMixSqbAndWildcardInSelectList-249] + _ = x[ErrParseInvalidContextForWildcardInSelectList-250] + _ = x[ErrIncorrectSQLFunctionArgumentType-251] + _ = x[ErrValueParseFailure-252] + _ = x[ErrEvaluatorInvalidArguments-253] + _ = x[ErrIntegerOverflow-254] + _ = x[ErrLikeInvalidInputs-255] + _ = x[ErrCastFailed-256] + _ = x[ErrInvalidCast-257] + _ = x[ErrEvaluatorInvalidTimestampFormatPattern-258] + _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbolForParsing-259] + _ = x[ErrEvaluatorTimestampFormatPatternDuplicateFields-260] + _ = x[ErrEvaluatorTimestampFormatPatternHourClockAmPmMismatch-261] + _ = x[ErrEvaluatorUnterminatedTimestampFormatPatternToken-262] + _ = x[ErrEvaluatorInvalidTimestampFormatPatternToken-263] + _ = x[ErrEvaluatorInvalidTimestampFormatPatternSymbol-264] + _ = x[ErrEvaluatorBindingDoesNotExist-265] + _ = x[ErrMissingHeaders-266] + _ = x[ErrInvalidColumnIndex-267] + _ = x[ErrAdminConfigNotificationTargetsFailed-268] + _ = x[ErrAdminProfilerNotEnabled-269] + _ = x[ErrInvalidDecompressedSize-270] + _ = x[ErrAddUserInvalidArgument-271] + _ = x[ErrAdminAccountNotEligible-272] + _ = x[ErrAccountNotEligible-273] + _ = x[ErrAdminServiceAccountNotFound-274] + _ = x[ErrPostPolicyConditionInvalidFormat-275] } -const _APIErrorCode_name = "NoneAccessDeniedBadDigestEntityTooSmallEntityTooLargePolicyTooLargeIncompleteBodyInternalErrorInvalidAccessKeyIDInvalidBucketNameInvalidDigestInvalidRangeInvalidRangePartNumberInvalidCopyPartRangeInvalidCopyPartRangeSourceInvalidMaxKeysInvalidEncodingMethodInvalidMaxUploadsInvalidMaxPartsInvalidPartNumberMarkerInvalidPartNumberInvalidRequestBodyInvalidCopySourceInvalidMetadataDirectiveInvalidCopyDestInvalidPolicyDocumentInvalidObjectStateMalformedXMLMissingContentLengthMissingContentMD5MissingRequestBodyErrorMissingSecurityHeaderNoSuchBucketNoSuchBucketPolicyNoSuchBucketLifecycleNoSuchLifecycleConfigurationNoSuchBucketSSEConfigNoSuchCORSConfigurationNoSuchWebsiteConfigurationReplicationConfigurationNotFoundErrorRemoteDestinationNotFoundErrorReplicationDestinationMissingLockRemoteTargetNotFoundErrorReplicationRemoteConnectionErrorBucketRemoteIdenticalToSourceBucketRemoteAlreadyExistsBucketRemoteLabelInUseBucketRemoteArnTypeInvalidBucketRemoteArnInvalidBucketRemoteRemoveDisallowedRemoteTargetNotVersionedErrorReplicationSourceNotVersionedErrorReplicationNeedsVersioningErrorReplicationBucketNeedsVersioningErrorObjectRestoreAlreadyInProgressNoSuchKeyNoSuchUploadInvalidVersionIDNoSuchVersionNotImplementedPreconditionFailedRequestTimeTooSkewedSignatureDoesNotMatchMethodNotAllowedInvalidPartInvalidPartOrderAuthorizationHeaderMalformedMalformedPOSTRequestPOSTFileRequiredSignatureVersionNotSupportedBucketNotEmptyAllAccessDisabledMalformedPolicyMissingFieldsMissingCredTagCredMalformedInvalidRegionInvalidServiceS3InvalidServiceSTSInvalidRequestVersionMissingSignTagMissingSignHeadersTagMalformedDateMalformedPresignedDateMalformedCredentialDateMalformedCredentialRegionMalformedExpiresNegativeExpiresAuthHeaderEmptyExpiredPresignRequestRequestNotReadyYetUnsignedHeadersMissingDateHeaderInvalidQuerySignatureAlgoInvalidQueryParamsBucketAlreadyOwnedByYouInvalidDurationBucketAlreadyExistsMetadataTooLargeUnsupportedMetadataMaximumExpiresSlowDownInvalidPrefixMarkerBadRequestKeyTooLongErrorInvalidBucketObjectLockConfigurationObjectLockConfigurationNotFoundObjectLockConfigurationNotAllowedNoSuchObjectLockConfigurationObjectLockedInvalidRetentionDatePastObjectLockRetainDateUnknownWORMModeDirectiveBucketTaggingNotFoundObjectLockInvalidHeadersInvalidTagDirectiveInvalidEncryptionMethodInsecureSSECustomerRequestSSEMultipartEncryptedSSEEncryptedObjectInvalidEncryptionParametersInvalidSSECustomerAlgorithmInvalidSSECustomerKeyMissingSSECustomerKeyMissingSSECustomerKeyMD5SSECustomerKeyMD5MismatchInvalidSSECustomerParametersIncompatibleEncryptionMethodKMSNotConfiguredNoAccessKeyInvalidTokenEventNotificationARNNotificationRegionNotificationOverlappingFilterNotificationFilterNameInvalidFilterNamePrefixFilterNameSuffixFilterValueInvalidOverlappingConfigsUnsupportedNotificationContentSHA256MismatchReadQuorumWriteQuorumParentIsObjectStorageFullRequestBodyParseObjectExistsAsDirectoryInvalidObjectNameInvalidObjectNamePrefixSlashInvalidResourceNameServerNotInitializedOperationTimedOutClientDisconnectedOperationMaxedOutInvalidRequestTransitionStorageClassNotFoundErrorInvalidStorageClassBackendDownMalformedJSONAdminNoSuchUserAdminNoSuchGroupAdminGroupNotEmptyAdminNoSuchPolicyAdminInvalidArgumentAdminInvalidAccessKeyAdminInvalidSecretKeyAdminConfigNoQuorumAdminConfigTooLargeAdminConfigBadJSONAdminConfigDuplicateKeysAdminCredentialsMismatchInsecureClientRequestObjectTamperedAdminBucketQuotaExceededAdminNoSuchQuotaConfigurationHealNotImplementedHealNoSuchProcessHealInvalidClientTokenHealMissingBucketHealAlreadyRunningHealOverlappingPathsIncorrectContinuationTokenEmptyRequestBodyUnsupportedFunctionInvalidExpressionTypeBusyUnauthorizedAccessExpressionTooLongIllegalSQLFunctionArgumentInvalidKeyPathInvalidCompressionFormatInvalidFileHeaderInfoInvalidJSONTypeInvalidQuoteFieldsInvalidRequestParameterInvalidDataTypeInvalidTextEncodingInvalidDataSourceInvalidTableAliasMissingRequiredParameterObjectSerializationConflictUnsupportedSQLOperationUnsupportedSQLStructureUnsupportedSyntaxUnsupportedRangeHeaderLexerInvalidCharLexerInvalidOperatorLexerInvalidLiteralLexerInvalidIONLiteralParseExpectedDatePartParseExpectedKeywordParseExpectedTokenTypeParseExpected2TokenTypesParseExpectedNumberParseExpectedRightParenBuiltinFunctionCallParseExpectedTypeNameParseExpectedWhenClauseParseUnsupportedTokenParseUnsupportedLiteralsGroupByParseExpectedMemberParseUnsupportedSelectParseUnsupportedCaseParseUnsupportedCaseClauseParseUnsupportedAliasParseUnsupportedSyntaxParseUnknownOperatorParseMissingIdentAfterAtParseUnexpectedOperatorParseUnexpectedTermParseUnexpectedTokenParseUnexpectedKeywordParseExpectedExpressionParseExpectedLeftParenAfterCastParseExpectedLeftParenValueConstructorParseExpectedLeftParenBuiltinFunctionCallParseExpectedArgumentDelimiterParseCastArityParseInvalidTypeParamParseEmptySelectParseSelectMissingFromParseExpectedIdentForGroupNameParseExpectedIdentForAliasParseUnsupportedCallWithStarParseNonUnaryAgregateFunctionCallParseMalformedJoinParseExpectedIdentForAtParseAsteriskIsNotAloneInSelectListParseCannotMixSqbAndWildcardInSelectListParseInvalidContextForWildcardInSelectListIncorrectSQLFunctionArgumentTypeValueParseFailureEvaluatorInvalidArgumentsIntegerOverflowLikeInvalidInputsCastFailedInvalidCastEvaluatorInvalidTimestampFormatPatternEvaluatorInvalidTimestampFormatPatternSymbolForParsingEvaluatorTimestampFormatPatternDuplicateFieldsEvaluatorTimestampFormatPatternHourClockAmPmMismatchEvaluatorUnterminatedTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternSymbolEvaluatorBindingDoesNotExistMissingHeadersInvalidColumnIndexAdminConfigNotificationTargetsFailedAdminProfilerNotEnabledInvalidDecompressedSizeAddUserInvalidArgumentAdminAccountNotEligibleAccountNotEligibleAdminServiceAccountNotFoundPostPolicyConditionInvalidFormat" +const _APIErrorCode_name = "NoneAccessDeniedBadDigestEntityTooSmallEntityTooLargePolicyTooLargeIncompleteBodyInternalErrorInvalidAccessKeyIDInvalidBucketNameInvalidDigestInvalidRangeInvalidRangePartNumberInvalidCopyPartRangeInvalidCopyPartRangeSourceInvalidMaxKeysInvalidEncodingMethodInvalidMaxUploadsInvalidMaxPartsInvalidPartNumberMarkerInvalidPartNumberInvalidRequestBodyInvalidCopySourceInvalidMetadataDirectiveInvalidCopyDestInvalidPolicyDocumentInvalidObjectStateMalformedXMLMissingContentLengthMissingContentMD5MissingRequestBodyErrorMissingSecurityHeaderNoSuchBucketNoSuchBucketPolicyNoSuchBucketLifecycleNoSuchLifecycleConfigurationNoSuchBucketSSEConfigNoSuchCORSConfigurationNoSuchWebsiteConfigurationReplicationConfigurationNotFoundErrorRemoteDestinationNotFoundErrorReplicationDestinationMissingLockRemoteTargetNotFoundErrorReplicationRemoteConnectionErrorBucketRemoteIdenticalToSourceBucketRemoteAlreadyExistsBucketRemoteLabelInUseBucketRemoteArnTypeInvalidBucketRemoteArnInvalidBucketRemoteRemoveDisallowedRemoteTargetNotVersionedErrorReplicationSourceNotVersionedErrorReplicationNeedsVersioningErrorReplicationBucketNeedsVersioningErrorReplicationNoMatchingRuleErrorObjectRestoreAlreadyInProgressNoSuchKeyNoSuchUploadInvalidVersionIDNoSuchVersionNotImplementedPreconditionFailedRequestTimeTooSkewedSignatureDoesNotMatchMethodNotAllowedInvalidPartInvalidPartOrderAuthorizationHeaderMalformedMalformedPOSTRequestPOSTFileRequiredSignatureVersionNotSupportedBucketNotEmptyAllAccessDisabledMalformedPolicyMissingFieldsMissingCredTagCredMalformedInvalidRegionInvalidServiceS3InvalidServiceSTSInvalidRequestVersionMissingSignTagMissingSignHeadersTagMalformedDateMalformedPresignedDateMalformedCredentialDateMalformedCredentialRegionMalformedExpiresNegativeExpiresAuthHeaderEmptyExpiredPresignRequestRequestNotReadyYetUnsignedHeadersMissingDateHeaderInvalidQuerySignatureAlgoInvalidQueryParamsBucketAlreadyOwnedByYouInvalidDurationBucketAlreadyExistsMetadataTooLargeUnsupportedMetadataMaximumExpiresSlowDownInvalidPrefixMarkerBadRequestKeyTooLongErrorInvalidBucketObjectLockConfigurationObjectLockConfigurationNotFoundObjectLockConfigurationNotAllowedNoSuchObjectLockConfigurationObjectLockedInvalidRetentionDatePastObjectLockRetainDateUnknownWORMModeDirectiveBucketTaggingNotFoundObjectLockInvalidHeadersInvalidTagDirectiveInvalidEncryptionMethodInsecureSSECustomerRequestSSEMultipartEncryptedSSEEncryptedObjectInvalidEncryptionParametersInvalidSSECustomerAlgorithmInvalidSSECustomerKeyMissingSSECustomerKeyMissingSSECustomerKeyMD5SSECustomerKeyMD5MismatchInvalidSSECustomerParametersIncompatibleEncryptionMethodKMSNotConfiguredNoAccessKeyInvalidTokenEventNotificationARNNotificationRegionNotificationOverlappingFilterNotificationFilterNameInvalidFilterNamePrefixFilterNameSuffixFilterValueInvalidOverlappingConfigsUnsupportedNotificationContentSHA256MismatchReadQuorumWriteQuorumParentIsObjectStorageFullRequestBodyParseObjectExistsAsDirectoryInvalidObjectNameInvalidObjectNamePrefixSlashInvalidResourceNameServerNotInitializedOperationTimedOutClientDisconnectedOperationMaxedOutInvalidRequestTransitionStorageClassNotFoundErrorInvalidStorageClassBackendDownMalformedJSONAdminNoSuchUserAdminNoSuchGroupAdminGroupNotEmptyAdminNoSuchPolicyAdminInvalidArgumentAdminInvalidAccessKeyAdminInvalidSecretKeyAdminConfigNoQuorumAdminConfigTooLargeAdminConfigBadJSONAdminConfigDuplicateKeysAdminCredentialsMismatchInsecureClientRequestObjectTamperedAdminBucketQuotaExceededAdminNoSuchQuotaConfigurationHealNotImplementedHealNoSuchProcessHealInvalidClientTokenHealMissingBucketHealAlreadyRunningHealOverlappingPathsIncorrectContinuationTokenEmptyRequestBodyUnsupportedFunctionInvalidExpressionTypeBusyUnauthorizedAccessExpressionTooLongIllegalSQLFunctionArgumentInvalidKeyPathInvalidCompressionFormatInvalidFileHeaderInfoInvalidJSONTypeInvalidQuoteFieldsInvalidRequestParameterInvalidDataTypeInvalidTextEncodingInvalidDataSourceInvalidTableAliasMissingRequiredParameterObjectSerializationConflictUnsupportedSQLOperationUnsupportedSQLStructureUnsupportedSyntaxUnsupportedRangeHeaderLexerInvalidCharLexerInvalidOperatorLexerInvalidLiteralLexerInvalidIONLiteralParseExpectedDatePartParseExpectedKeywordParseExpectedTokenTypeParseExpected2TokenTypesParseExpectedNumberParseExpectedRightParenBuiltinFunctionCallParseExpectedTypeNameParseExpectedWhenClauseParseUnsupportedTokenParseUnsupportedLiteralsGroupByParseExpectedMemberParseUnsupportedSelectParseUnsupportedCaseParseUnsupportedCaseClauseParseUnsupportedAliasParseUnsupportedSyntaxParseUnknownOperatorParseMissingIdentAfterAtParseUnexpectedOperatorParseUnexpectedTermParseUnexpectedTokenParseUnexpectedKeywordParseExpectedExpressionParseExpectedLeftParenAfterCastParseExpectedLeftParenValueConstructorParseExpectedLeftParenBuiltinFunctionCallParseExpectedArgumentDelimiterParseCastArityParseInvalidTypeParamParseEmptySelectParseSelectMissingFromParseExpectedIdentForGroupNameParseExpectedIdentForAliasParseUnsupportedCallWithStarParseNonUnaryAgregateFunctionCallParseMalformedJoinParseExpectedIdentForAtParseAsteriskIsNotAloneInSelectListParseCannotMixSqbAndWildcardInSelectListParseInvalidContextForWildcardInSelectListIncorrectSQLFunctionArgumentTypeValueParseFailureEvaluatorInvalidArgumentsIntegerOverflowLikeInvalidInputsCastFailedInvalidCastEvaluatorInvalidTimestampFormatPatternEvaluatorInvalidTimestampFormatPatternSymbolForParsingEvaluatorTimestampFormatPatternDuplicateFieldsEvaluatorTimestampFormatPatternHourClockAmPmMismatchEvaluatorUnterminatedTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternTokenEvaluatorInvalidTimestampFormatPatternSymbolEvaluatorBindingDoesNotExistMissingHeadersInvalidColumnIndexAdminConfigNotificationTargetsFailedAdminProfilerNotEnabledInvalidDecompressedSizeAddUserInvalidArgumentAdminAccountNotEligibleAccountNotEligibleAdminServiceAccountNotFoundPostPolicyConditionInvalidFormat" -var _APIErrorCode_index = [...]uint16{0, 4, 16, 25, 39, 53, 67, 81, 94, 112, 129, 142, 154, 176, 196, 222, 236, 257, 274, 289, 312, 329, 347, 364, 388, 403, 424, 442, 454, 474, 491, 514, 535, 547, 565, 586, 614, 635, 658, 684, 721, 751, 784, 809, 841, 870, 895, 917, 943, 965, 993, 1022, 1056, 1087, 1124, 1154, 1163, 1175, 1191, 1204, 1218, 1236, 1256, 1277, 1293, 1304, 1320, 1348, 1368, 1384, 1412, 1426, 1443, 1458, 1471, 1485, 1498, 1511, 1527, 1544, 1565, 1579, 1600, 1613, 1635, 1658, 1683, 1699, 1714, 1729, 1750, 1768, 1783, 1800, 1825, 1843, 1866, 1881, 1900, 1916, 1935, 1949, 1957, 1976, 1986, 2001, 2037, 2068, 2101, 2130, 2142, 2162, 2186, 2210, 2231, 2255, 2274, 2297, 2323, 2344, 2362, 2389, 2416, 2437, 2458, 2482, 2507, 2535, 2563, 2579, 2590, 2602, 2619, 2634, 2652, 2681, 2698, 2714, 2730, 2748, 2766, 2789, 2810, 2820, 2831, 2845, 2856, 2872, 2895, 2912, 2940, 2959, 2979, 2996, 3014, 3031, 3045, 3080, 3099, 3110, 3123, 3138, 3154, 3172, 3189, 3209, 3230, 3251, 3270, 3289, 3307, 3331, 3355, 3376, 3390, 3414, 3443, 3461, 3478, 3500, 3517, 3535, 3555, 3581, 3597, 3616, 3637, 3641, 3659, 3676, 3702, 3716, 3740, 3761, 3776, 3794, 3817, 3832, 3851, 3868, 3885, 3909, 3936, 3959, 3982, 3999, 4021, 4037, 4057, 4076, 4098, 4119, 4139, 4161, 4185, 4204, 4246, 4267, 4290, 4311, 4342, 4361, 4383, 4403, 4429, 4450, 4472, 4492, 4516, 4539, 4558, 4578, 4600, 4623, 4654, 4692, 4733, 4763, 4777, 4798, 4814, 4836, 4866, 4892, 4920, 4953, 4971, 4994, 5029, 5069, 5111, 5143, 5160, 5185, 5200, 5217, 5227, 5238, 5276, 5330, 5376, 5428, 5476, 5519, 5563, 5591, 5605, 5623, 5659, 5682, 5705, 5727, 5750, 5768, 5795, 5827} +var _APIErrorCode_index = [...]uint16{0, 4, 16, 25, 39, 53, 67, 81, 94, 112, 129, 142, 154, 176, 196, 222, 236, 257, 274, 289, 312, 329, 347, 364, 388, 403, 424, 442, 454, 474, 491, 514, 535, 547, 565, 586, 614, 635, 658, 684, 721, 751, 784, 809, 841, 870, 895, 917, 943, 965, 993, 1022, 1056, 1087, 1124, 1154, 1184, 1193, 1205, 1221, 1234, 1248, 1266, 1286, 1307, 1323, 1334, 1350, 1378, 1398, 1414, 1442, 1456, 1473, 1488, 1501, 1515, 1528, 1541, 1557, 1574, 1595, 1609, 1630, 1643, 1665, 1688, 1713, 1729, 1744, 1759, 1780, 1798, 1813, 1830, 1855, 1873, 1896, 1911, 1930, 1946, 1965, 1979, 1987, 2006, 2016, 2031, 2067, 2098, 2131, 2160, 2172, 2192, 2216, 2240, 2261, 2285, 2304, 2327, 2353, 2374, 2392, 2419, 2446, 2467, 2488, 2512, 2537, 2565, 2593, 2609, 2620, 2632, 2649, 2664, 2682, 2711, 2728, 2744, 2760, 2778, 2796, 2819, 2840, 2850, 2861, 2875, 2886, 2902, 2925, 2942, 2970, 2989, 3009, 3026, 3044, 3061, 3075, 3110, 3129, 3140, 3153, 3168, 3184, 3202, 3219, 3239, 3260, 3281, 3300, 3319, 3337, 3361, 3385, 3406, 3420, 3444, 3473, 3491, 3508, 3530, 3547, 3565, 3585, 3611, 3627, 3646, 3667, 3671, 3689, 3706, 3732, 3746, 3770, 3791, 3806, 3824, 3847, 3862, 3881, 3898, 3915, 3939, 3966, 3989, 4012, 4029, 4051, 4067, 4087, 4106, 4128, 4149, 4169, 4191, 4215, 4234, 4276, 4297, 4320, 4341, 4372, 4391, 4413, 4433, 4459, 4480, 4502, 4522, 4546, 4569, 4588, 4608, 4630, 4653, 4684, 4722, 4763, 4793, 4807, 4828, 4844, 4866, 4896, 4922, 4950, 4983, 5001, 5024, 5059, 5099, 5141, 5173, 5190, 5215, 5230, 5247, 5257, 5268, 5306, 5360, 5406, 5458, 5506, 5549, 5593, 5621, 5635, 5653, 5689, 5712, 5735, 5757, 5780, 5798, 5825, 5857} func (i APIErrorCode) String() string { if i < 0 || i >= APIErrorCode(len(_APIErrorCode_index)-1) { diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 6db466dfb..948cc0ee3 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -33,6 +33,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/google/uuid" "github.com/gorilla/mux" @@ -611,7 +612,7 @@ func (api objectAPIHandlers) DeleteMultipleObjectsHandler(w http.ResponseWriter, if replicateDeletes { if dobj.DeleteMarkerReplicationStatus == string(replication.Pending) || dobj.VersionPurgeStatus == Pending { - dv := DeletedObjectVersionInfo{ + dv := DeletedObjectReplicationInfo{ DeletedObject: dobj, Bucket: bucket, } @@ -1686,3 +1687,90 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW } w.(http.Flusher).Flush() } + +// ResetBucketReplicationStateHandler - starts a replication reset for all objects in a bucket which +// qualify for replication and re-sync the object(s) to target, provided ExistingObjectReplication is +// enabled for the qualifying rule. This API is a MinIO only extension provided for situations where +// remote target is entirely lost,and previously replicated objects need to be re-synced. +func (api objectAPIHandlers) ResetBucketReplicationStateHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ResetBucketReplicationState") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + durationStr := r.URL.Query().Get("older-than") + var ( + days time.Duration + err error + ) + if durationStr != "" { + days, err = time.ParseDuration(durationStr) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, InvalidArgument{ + Bucket: bucket, + Err: fmt.Errorf("invalid query parameter older-than %s for %s : %w", durationStr, bucket, err), + }), r.URL, guessIsBrowserReq(r)) + } + } + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + if s3Error := checkRequestAuthType(ctx, r, policy.ResetBucketReplicationStateAction, bucket, ""); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r)) + return + } + + // Check if bucket exists. + if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + config, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + if !config.HasActiveRules("", true) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrReplicationNoMatchingRuleError), r.URL, guessIsBrowserReq(r)) + return + } + target := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, config.RoleArn) + target.ResetBeforeDate = UTCNow().AddDate(0, 0, -1*int(days/24)) + target.ResetID = mustGetUUID() + if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, true); err != nil { + switch err.(type) { + case BucketRemoteConnectionErr: + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrReplicationRemoteConnectionError, err), r.URL) + default: + writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) + } + return + } + targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + tgtBytes, err := json.Marshal(&targets) + if err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL) + return + } + if err = globalBucketMetadataSys.Update(bucket, bucketTargetsFile, tgtBytes); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + data, err := json.Marshal(target.ResetID) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + // Write success response. + writeSuccessResponseJSON(w, data) +} diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index b3d72fb0c..d90a7bf10 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -91,32 +91,70 @@ func validateReplicationDestination(ctx context.Context, bucket string, rCfg *re return false, BucketRemoteTargetNotFound{Bucket: bucket} } -func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, permErr APIErrorCode) (replicate bool, sync bool) { +type mustReplicateOptions struct { + meta map[string]string + status replication.StatusType + opType replication.Type +} + +func (o mustReplicateOptions) ReplicationStatus() (s replication.StatusType) { + if rs, ok := o.meta[xhttp.AmzBucketReplicationStatus]; ok { + return replication.StatusType(rs) + } + return s +} +func (o mustReplicateOptions) isExistingObjectReplication() bool { + return o.opType == replication.ExistingObjectReplicationType +} + +func (o mustReplicateOptions) isMetadataReplication() bool { + return o.opType == replication.MetadataReplicationType +} +func getMustReplicateOptions(o ObjectInfo, op replication.Type) mustReplicateOptions { + if !op.Valid() { + op = replication.ObjectReplicationType + if o.metadataOnly { + op = replication.MetadataReplicationType + } + } + meta := cloneMSS(o.UserDefined) + if o.UserTags != "" { + meta[xhttp.AmzObjectTagging] = o.UserTags + } + return mustReplicateOptions{ + meta: meta, + status: o.ReplicationStatus, + opType: op, + } +} +func mustReplicateWeb(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus replication.StatusType, permErr APIErrorCode) (replicate bool, sync bool) { if permErr != ErrNone { return } - return mustReplicater(ctx, bucket, object, meta, replStatus, false) + return mustReplicater(ctx, bucket, object, mustReplicateOptions{ + meta: meta, + status: replStatus, + opType: replication.ObjectReplicationType, + }) } // mustReplicate returns 2 booleans - true if object meets replication criteria and true if replication is to be done in // a synchronous manner. -func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, meta map[string]string, replStatus string, metadataOnly bool) (replicate bool, sync bool) { +func mustReplicate(ctx context.Context, r *http.Request, bucket, object string, opts mustReplicateOptions) (replicate bool, sync bool) { if s3Err := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, "", r, iampolicy.GetReplicationConfigurationAction); s3Err != ErrNone { return } - return mustReplicater(ctx, bucket, object, meta, replStatus, metadataOnly) + return mustReplicater(ctx, bucket, object, opts) } // mustReplicater returns 2 booleans - true if object meets replication criteria and true if replication is to be done in // a synchronous manner. -func mustReplicater(ctx context.Context, bucket, object string, meta map[string]string, replStatus string, metadataOnly bool) (replicate bool, sync bool) { +func mustReplicater(ctx context.Context, bucket, object string, mopts mustReplicateOptions) (replicate bool, sync bool) { if globalIsGateway { return replicate, sync } - if rs, ok := meta[xhttp.AmzBucketReplicationStatus]; ok { - replStatus = rs - } - if replication.StatusType(replStatus) == replication.Replica && !metadataOnly { + replStatus := mopts.ReplicationStatus() + if replStatus == replication.Replica && !mopts.isMetadataReplication() { return replicate, sync } cfg, err := getReplicationConfig(ctx, bucket) @@ -124,11 +162,12 @@ func mustReplicater(ctx context.Context, bucket, object string, meta map[string] return replicate, sync } opts := replication.ObjectOpts{ - Name: object, - SSEC: crypto.SSEC.IsEncrypted(meta), - Replica: replication.StatusType(replStatus) == replication.Replica, + Name: object, + SSEC: crypto.SSEC.IsEncrypted(mopts.meta), + Replica: replStatus == replication.Replica, + ExistingObject: mopts.isExistingObjectReplication(), } - tagStr, ok := meta[xhttp.AmzObjectTagging] + tagStr, ok := mopts.meta[xhttp.AmzObjectTagging] if ok { opts.UserTags = tagStr } @@ -226,7 +265,7 @@ func checkReplicateDelete(ctx context.Context, bucket string, dobj ObjectToDelet // target cluster, the object version is marked deleted on the source and hidden from listing. It is permanently // deleted from the source when the VersionPurgeStatus changes to "Complete", i.e after replication succeeds // on target. -func replicateDelete(ctx context.Context, dobj DeletedObjectVersionInfo, objectAPI ObjectLayer) { +func replicateDelete(ctx context.Context, dobj DeletedObjectReplicationInfo, objectAPI ObjectLayer) { bucket := dobj.Bucket versionID := dobj.DeleteMarkerVersionID if versionID == "" { @@ -760,7 +799,9 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje if objInfo.UserTags != "" { objInfo.UserDefined[xhttp.AmzObjectTagging] = objInfo.UserTags } - + if ri.OpType == replication.ExistingObjectReplicationType { + objInfo.UserDefined[xhttp.MinIOReplicationResetStatus] = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), ri.ResetID) + } // FIXME: add support for missing replication events // - event.ObjectReplicationMissedThreshold // - event.ObjectReplicationReplicatedAfterThreshold @@ -774,7 +815,8 @@ func replicateObject(ctx context.Context, ri ReplicateObjectInfo, objectAPI Obje return } // Leave metadata in `PENDING` state if inline replication fails to save iops - if ri.OpType == replication.HealReplicationType || replicationStatus == replication.Completed { + if ri.OpType == replication.HealReplicationType || + replicationStatus == replication.Completed { // This lower level implementation is necessary to avoid write locks from CopyObject. poolIdx, err := z.getPoolIdx(ctx, bucket, object, objInfo.Size) if err != nil { @@ -834,10 +876,12 @@ func filterReplicationStatusMetadata(metadata map[string]string) map[string]stri return dst } -// DeletedObjectVersionInfo has info on deleted object -type DeletedObjectVersionInfo struct { +// DeletedObjectReplicationInfo has info on deleted object +type DeletedObjectReplicationInfo struct { DeletedObject - Bucket string + Bucket string + OpType replication.Type + ResetID string } var ( @@ -847,35 +891,40 @@ var ( // ReplicationPool describes replication pool type ReplicationPool struct { - objLayer ObjectLayer - ctx context.Context - mrfWorkerKillCh chan struct{} - workerKillCh chan struct{} - replicaCh chan ReplicateObjectInfo - replicaDeleteCh chan DeletedObjectVersionInfo - mrfReplicaCh chan ReplicateObjectInfo - workerSize int - mrfWorkerSize int - workerWg sync.WaitGroup - mrfWorkerWg sync.WaitGroup - once sync.Once - mu sync.Mutex + objLayer ObjectLayer + ctx context.Context + mrfWorkerKillCh chan struct{} + workerKillCh chan struct{} + replicaCh chan ReplicateObjectInfo + replicaDeleteCh chan DeletedObjectReplicationInfo + mrfReplicaCh chan ReplicateObjectInfo + existingReplicaCh chan ReplicateObjectInfo + existingReplicaDeleteCh chan DeletedObjectReplicationInfo + workerSize int + mrfWorkerSize int + workerWg sync.WaitGroup + mrfWorkerWg sync.WaitGroup + once sync.Once + mu sync.Mutex } // NewReplicationPool creates a pool of replication workers of specified size func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPoolOpts) *ReplicationPool { pool := &ReplicationPool{ - replicaCh: make(chan ReplicateObjectInfo, 100000), - replicaDeleteCh: make(chan DeletedObjectVersionInfo, 100000), - mrfReplicaCh: make(chan ReplicateObjectInfo, 100000), - workerKillCh: make(chan struct{}, opts.Workers), - mrfWorkerKillCh: make(chan struct{}, opts.FailedWorkers), - ctx: ctx, - objLayer: o, + replicaCh: make(chan ReplicateObjectInfo, 100000), + replicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000), + mrfReplicaCh: make(chan ReplicateObjectInfo, 100000), + workerKillCh: make(chan struct{}, opts.Workers), + mrfWorkerKillCh: make(chan struct{}, opts.FailedWorkers), + existingReplicaCh: make(chan ReplicateObjectInfo, 100000), + existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000), + ctx: ctx, + objLayer: o, } pool.ResizeWorkers(opts.Workers) pool.ResizeFailedWorkers(opts.FailedWorkers) + go pool.AddExistingObjectReplicateWorker() return pool } @@ -921,6 +970,26 @@ func (p *ReplicationPool) AddWorker() { } +// AddExistingObjectReplicateWorker adds a worker to queue existing objects that need to be sync'd +func (p *ReplicationPool) AddExistingObjectReplicateWorker() { + for { + select { + case <-p.ctx.Done(): + return + case oi, ok := <-p.existingReplicaCh: + if !ok { + return + } + replicateObject(p.ctx, oi, p.objLayer) + case doi, ok := <-p.existingReplicaDeleteCh: + if !ok { + return + } + replicateDelete(p.ctx, doi, p.objLayer) + } + } +} + // ResizeWorkers sets replication workers pool to new size func (p *ReplicationPool) ResizeWorkers(n int) { p.mu.Lock() @@ -962,6 +1031,7 @@ func (p *ReplicationPool) queueReplicaFailedTask(ri ReplicateObjectInfo) { p.once.Do(func() { close(p.replicaCh) close(p.mrfReplicaCh) + close(p.existingReplicaCh) }) case p.mrfReplicaCh <- ri: default: @@ -972,27 +1042,44 @@ func (p *ReplicationPool) queueReplicaTask(ri ReplicateObjectInfo) { if p == nil { return } + var ch chan ReplicateObjectInfo + switch ri.OpType { + case replication.ExistingObjectReplicationType: + ch = p.existingReplicaCh + default: + ch = p.replicaCh + } select { case <-GlobalContext.Done(): p.once.Do(func() { close(p.replicaCh) close(p.mrfReplicaCh) + close(p.existingReplicaCh) }) - case p.replicaCh <- ri: + case ch <- ri: default: } } -func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectVersionInfo) { +func (p *ReplicationPool) queueReplicaDeleteTask(doi DeletedObjectReplicationInfo) { if p == nil { return } + var ch chan DeletedObjectReplicationInfo + switch doi.OpType { + case replication.ExistingObjectReplicationType: + ch = p.existingReplicaDeleteCh + default: + ch = p.replicaDeleteCh + } + select { case <-GlobalContext.Done(): p.once.Do(func() { close(p.replicaDeleteCh) + close(p.existingReplicaDeleteCh) }) - case p.replicaDeleteCh <- doi: + case ch <- doi: default: } } @@ -1157,7 +1244,78 @@ func scheduleReplication(ctx context.Context, objInfo ObjectInfo, o ObjectLayer, } } -func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectVersionInfo, o ObjectLayer, sync bool) { +func scheduleReplicationDelete(ctx context.Context, dv DeletedObjectReplicationInfo, o ObjectLayer, sync bool) { globalReplicationPool.queueReplicaDeleteTask(dv) globalReplicationStats.Update(dv.Bucket, 0, replication.Pending, replication.StatusType(""), replication.DeleteReplicationType) } + +type replicationConfig struct { + Config *replication.Config + ResetID string + ResetBeforeDate time.Time +} + +func (c replicationConfig) Empty() bool { + return c.Config == nil +} +func (c replicationConfig) Replicate(opts replication.ObjectOpts) bool { + return c.Config.Replicate(opts) +} + +// Resync returns true if replication reset is requested +func (c replicationConfig) Resync(ctx context.Context, oi ObjectInfo) bool { + if c.Empty() { + return false + } + // existing object replication does not apply to un-versioned objects + if oi.VersionID == "" || oi.VersionID == nullVersionID { + return false + } + + var replicate bool + if oi.DeleteMarker { + if c.Replicate(replication.ObjectOpts{ + Name: oi.Name, + SSEC: crypto.SSEC.IsEncrypted(oi.UserDefined), + UserTags: oi.UserTags, + DeleteMarker: oi.DeleteMarker, + VersionID: oi.VersionID, + OpType: replication.DeleteReplicationType, + ExistingObject: true}) { + replicate = true + } + } else { + // Ignore previous replication status when deciding if object can be re-replicated + objInfo := oi.Clone() + objInfo.ReplicationStatus = replication.StatusType("") + replicate, _ = mustReplicater(ctx, oi.Bucket, oi.Name, getMustReplicateOptions(objInfo, replication.ExistingObjectReplicationType)) + } + return c.resync(oi, replicate) +} + +// wrapper function for testability. Returns true if a new reset is requested on +// already replicated objects OR object qualifies for existing object replication +// and no reset requested. +func (c replicationConfig) resync(oi ObjectInfo, replicate bool) bool { + if !replicate { + return false + } + rs, ok := oi.UserDefined[xhttp.MinIOReplicationResetStatus] + if !ok { // existing object replication is enabled and object version is unreplicated so far. + if c.ResetID != "" && oi.ModTime.Before(c.ResetBeforeDate) { // trigger replication if `mc replicate reset` requested + return true + } + return oi.ReplicationStatus != replication.Completed + } + if c.ResetID == "" || c.ResetBeforeDate.Equal(timeSentinel) { // no reset in progress + return false + } + // if already replicated, return true if a new reset was requested. + splits := strings.SplitN(rs, ";", 2) + newReset := splits[1] != c.ResetID + if !newReset && oi.ReplicationStatus == replication.Completed { + // already replicated and no reset requested + return false + } + return newReset && oi.ModTime.Before(c.ResetBeforeDate) +} diff --git a/cmd/bucket-replication_test.go b/cmd/bucket-replication_test.go new file mode 100644 index 000000000..f307c8389 --- /dev/null +++ b/cmd/bucket-replication_test.go @@ -0,0 +1,209 @@ +// Copyright (c) 2015-2021 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/minio/minio/internal/bucket/replication" + xhttp "github.com/minio/minio/internal/http" +) + +var configs = []replication.Config{ + { // Config0 - Replication config has no filters, existing object replication enabled + Rules: []replication.Rule{ + { + Status: replication.Enabled, + Priority: 1, + DeleteMarkerReplication: replication.DeleteMarkerReplication{Status: replication.Enabled}, + DeleteReplication: replication.DeleteReplication{Status: replication.Enabled}, + Filter: replication.Filter{}, + ExistingObjectReplication: replication.ExistingObjectReplication{Status: replication.Enabled}, + SourceSelectionCriteria: replication.SourceSelectionCriteria{ + ReplicaModifications: replication.ReplicaModifications{Status: replication.Enabled}, + }, + }, + }, + }, +} + +var replicationConfigTests = []struct { + info ObjectInfo + name string + rcfg replicationConfig + expectedSync bool +}{ + { //1. no replication config + name: "no replication config", + info: ObjectInfo{Size: 100}, + rcfg: replicationConfig{Config: nil}, + expectedSync: false, + }, + { //2. existing object replication config enabled, no versioning + name: "existing object replication config enabled, no versioning", + info: ObjectInfo{Size: 100}, + rcfg: replicationConfig{Config: &configs[0]}, + expectedSync: false, + }, + { //3. existing object replication config enabled, versioning suspended + name: "existing object replication config enabled, versioning suspended", + info: ObjectInfo{Size: 100, VersionID: nullVersionID}, + rcfg: replicationConfig{Config: &configs[0]}, + expectedSync: false, + }, + { //4. existing object replication enabled, versioning enabled; no reset in progress + name: "existing object replication enabled, versioning enabled; no reset in progress", + info: ObjectInfo{Size: 100, + ReplicationStatus: replication.Completed, + VersionID: "a3348c34-c352-4498-82f0-1098e8b34df9", + }, + rcfg: replicationConfig{Config: &configs[0]}, + expectedSync: false, + }, +} + +func TestReplicationResync(t *testing.T) { + ctx := context.Background() + for i, test := range replicationConfigTests { + if sync := test.rcfg.Resync(ctx, test.info); sync != test.expectedSync { + t.Errorf("Test%d (%s): Resync got %t , want %t", i+1, test.name, sync, test.expectedSync) + } + } +} + +var start = UTCNow().AddDate(0, 0, -1) +var replicationConfigTests2 = []struct { + info ObjectInfo + name string + replicate bool + rcfg replicationConfig + expectedSync bool +}{ + { // Cases 1-4: existing object replication enabled, versioning enabled, no reset - replication status varies + // 1: Pending replication + name: "existing object replication on object in Pending replication status", + info: ObjectInfo{Size: 100, + ReplicationStatus: replication.Pending, + VersionID: "a3348c34-c352-4498-82f0-1098e8b34df9", + }, + replicate: true, + expectedSync: true, + }, + { // 2. replication status Failed + name: "existing object replication on object in Failed replication status", + info: ObjectInfo{Size: 100, + ReplicationStatus: replication.Failed, + VersionID: "a3348c34-c352-4498-82f0-1098e8b34df9", + }, + replicate: true, + expectedSync: true, + }, + { //3. replication status unset + name: "existing object replication on pre-existing unreplicated object", + info: ObjectInfo{Size: 100, + ReplicationStatus: replication.StatusType(""), + VersionID: "a3348c34-c352-4498-82f0-1098e8b34df9", + }, + replicate: true, + expectedSync: true, + }, + { //4. replication status Complete + name: "existing object replication on object in Completed replication status", + info: ObjectInfo{Size: 100, + ReplicationStatus: replication.Completed, + VersionID: "a3348c34-c352-4498-82f0-1098e8b34df9", + }, + replicate: true, + expectedSync: false, + }, + { //5. existing object replication enabled, versioning enabled, replication status Pending & reset ID present + name: "existing object replication with reset in progress and object in Pending status", + info: ObjectInfo{Size: 100, + ReplicationStatus: replication.Pending, + VersionID: "a3348c34-c352-4498-82f0-1098e8b34df9", + }, + replicate: true, + expectedSync: true, + rcfg: replicationConfig{ResetID: "xyz", ResetBeforeDate: UTCNow()}, + }, + { //6. existing object replication enabled, versioning enabled, replication status Failed & reset ID present + name: "existing object replication with reset in progress and object in Failed status", + info: ObjectInfo{Size: 100, + ReplicationStatus: replication.Failed, + VersionID: "a3348c34-c352-4498-82f0-1098e8b34df9", + }, + replicate: true, + expectedSync: true, + rcfg: replicationConfig{ResetID: "xyz", ResetBeforeDate: UTCNow()}, + }, + { //7. existing object replication enabled, versioning enabled, replication status unset & reset ID present + name: "existing object replication with reset in progress and object never replicated before", + info: ObjectInfo{Size: 100, + ReplicationStatus: replication.StatusType(""), + VersionID: "a3348c34-c352-4498-82f0-1098e8b34df9", + }, + replicate: true, + expectedSync: true, + rcfg: replicationConfig{ResetID: "xyz", ResetBeforeDate: UTCNow()}, + }, + { //8. existing object replication enabled, versioning enabled, replication status Complete & reset ID present + name: "existing object replication enabled - reset in progress for an object in Completed status", + info: ObjectInfo{Size: 100, + ReplicationStatus: replication.Completed, + VersionID: "a3348c34-c352-4498-82f0-1098e8b34df8", + }, + replicate: true, + expectedSync: true, + rcfg: replicationConfig{ResetID: "xyz", ResetBeforeDate: UTCNow()}, + }, + { //9. existing object replication enabled, versioning enabled, replication status Pending & reset ID different + name: "existing object replication enabled, newer reset in progress on object in Pending replication status", + info: ObjectInfo{Size: 100, + ReplicationStatus: replication.Pending, + VersionID: "a3348c34-c352-4498-82f0-1098e8b34df9", + UserDefined: map[string]string{xhttp.MinIOReplicationResetStatus: fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), "xyz")}, + ModTime: UTCNow().AddDate(0, 0, -1), + }, + replicate: true, + expectedSync: true, + rcfg: replicationConfig{ResetID: "abc", ResetBeforeDate: UTCNow()}, + }, + { //10. existing object replication enabled, versioning enabled, replication status Complete & reset done + name: "reset done on object in Completed Status - ineligbile for re-replication", + info: ObjectInfo{Size: 100, + ReplicationStatus: replication.Completed, + VersionID: "a3348c34-c352-4498-82f0-1098e8b34df9", + UserDefined: map[string]string{xhttp.MinIOReplicationResetStatus: fmt.Sprintf("%s;%s", start.Format(http.TimeFormat), "xyz")}, + }, + replicate: true, + expectedSync: false, + rcfg: replicationConfig{ResetID: "xyz", ResetBeforeDate: UTCNow()}, + }, +} + +func TestReplicationResyncwrapper(t *testing.T) { + for i, test := range replicationConfigTests2 { + if sync := test.rcfg.resync(test.info, test.replicate); sync != test.expectedSync { + t.Errorf("%s (%s): Replicationresync got %t , want %t", fmt.Sprintf("Test%d - %s", i+1, time.Now().Format(http.TimeFormat)), test.name, sync, test.expectedSync) + } + } +} diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go index 3157eb1ac..7bd21b9ba 100644 --- a/cmd/data-scanner.go +++ b/cmd/data-scanner.go @@ -375,7 +375,12 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int activeLifeCycle = f.oldCache.Info.lifeCycle filter = nil } - + // If there are replication rules for the prefix, remove the filter. + var replicationCfg replicationConfig + if !f.oldCache.Info.replication.Empty() && f.oldCache.Info.replication.Config.HasActiveRules(prefix, true) { + replicationCfg = f.oldCache.Info.replication + filter = nil + } // Check if we can skip it due to bloom filter... if filter != nil && ok && existing.Compacted { // If folder isn't in filter and we have data, skip it completely. @@ -449,16 +454,16 @@ func (f *folderScanner) scanFolder(ctx context.Context, folder cachedFolder, int // Get file size, ignore errors. item := scannerItem{ - Path: path.Join(f.root, entName), - Typ: typ, - bucket: bucket, - prefix: path.Dir(prefix), - objectName: path.Base(entName), - debug: f.dataUsageScannerDebug, - lifeCycle: activeLifeCycle, - heal: thisHash.mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure, + Path: path.Join(f.root, entName), + Typ: typ, + bucket: bucket, + prefix: path.Dir(prefix), + objectName: path.Base(entName), + debug: f.dataUsageScannerDebug, + lifeCycle: activeLifeCycle, + replication: replicationCfg, + heal: thisHash.mod(f.oldCache.Info.NextCycle, f.healObjectSelect/folder.objectHealProbDiv) && globalIsErasure, } - // if the drive belongs to an erasure set // that is already being healed, skip the // healing attempt on this drive. @@ -808,12 +813,13 @@ type scannerItem struct { Path string Typ os.FileMode - bucket string // Bucket. - prefix string // Only the prefix if any, does not have final object name. - objectName string // Only the object name without prefixes. - lifeCycle *lifecycle.Lifecycle - heal bool // Has the object been selected for heal check? - debug bool + bucket string // Bucket. + prefix string // Only the prefix if any, does not have final object name. + objectName string // Only the object name without prefixes. + lifeCycle *lifecycle.Lifecycle + replication replicationConfig + heal bool // Has the object been selected for heal check? + debug bool } type sizeSummary struct { @@ -1140,33 +1146,50 @@ func (i *scannerItem) objectPath() string { // healReplication will heal a scanned item that has failed replication. func (i *scannerItem) healReplication(ctx context.Context, o ObjectLayer, oi ObjectInfo, sizeS *sizeSummary) { + existingObjResync := i.replication.Resync(ctx, oi) if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() { // heal delete marker replication failure or versioned delete replication failure if oi.ReplicationStatus == replication.Pending || oi.ReplicationStatus == replication.Failed || oi.VersionPurgeStatus == Failed || oi.VersionPurgeStatus == Pending { - i.healReplicationDeletes(ctx, o, oi) + i.healReplicationDeletes(ctx, o, oi, existingObjResync) return } + // if replication status is Complete on DeleteMarker and existing object resync required + if existingObjResync && oi.ReplicationStatus == replication.Completed { + i.healReplicationDeletes(ctx, o, oi, existingObjResync) + return + } + return + } + roi := ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType} + if existingObjResync { + roi.OpType = replication.ExistingObjectReplicationType + roi.ResetID = i.replication.ResetID } switch oi.ReplicationStatus { case replication.Pending: sizeS.pendingCount++ sizeS.pendingSize += oi.Size - globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType}) + globalReplicationPool.queueReplicaTask(roi) + return case replication.Failed: sizeS.failedSize += oi.Size sizeS.failedCount++ - globalReplicationPool.queueReplicaTask(ReplicateObjectInfo{ObjectInfo: oi, OpType: replication.HealReplicationType}) + globalReplicationPool.queueReplicaTask(roi) + return case replication.Completed, "COMPLETE": sizeS.replicatedSize += oi.Size case replication.Replica: sizeS.replicaSize += oi.Size } + if existingObjResync { + globalReplicationPool.queueReplicaTask(roi) + } } // healReplicationDeletes will heal a scanned deleted item that failed to replicate deletes. -func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, oi ObjectInfo) { +func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, oi ObjectInfo, existingObject bool) { // handle soft delete and permanent delete failures here. if oi.DeleteMarker || !oi.VersionPurgeStatus.Empty() { versionID := "" @@ -1176,7 +1199,7 @@ func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, } else { versionID = oi.VersionID } - globalReplicationPool.queueReplicaDeleteTask(DeletedObjectVersionInfo{ + doi := DeletedObjectReplicationInfo{ DeletedObject: DeletedObject{ ObjectName: oi.Name, DeleteMarkerVersionID: dmVersionID, @@ -1187,7 +1210,12 @@ func (i *scannerItem) healReplicationDeletes(ctx context.Context, o ObjectLayer, VersionPurgeStatus: oi.VersionPurgeStatus, }, Bucket: oi.Bucket, - }) + } + if existingObject { + doi.OpType = replication.ExistingObjectReplicationType + doi.ResetID = i.replication.ResetID + } + globalReplicationPool.queueReplicaDeleteTask(doi) } } diff --git a/cmd/data-usage-cache.go b/cmd/data-usage-cache.go index b6ade0e6c..12893d252 100644 --- a/cmd/data-usage-cache.go +++ b/cmd/data-usage-cache.go @@ -161,7 +161,8 @@ type dataUsageCacheInfo struct { // optional updates channel. // If set updates will be sent regularly to this channel. // Will not be closed when returned. - updates chan<- dataUsageEntry `msg:"-"` + updates chan<- dataUsageEntry `msg:"-"` + replication replicationConfig `msg:"-"` } func (e *dataUsageEntry) addSizes(summary sizeSummary) { diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 6a5b6d9c5..3538da6bd 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -230,6 +230,7 @@ type ReplicateObjectInfo struct { ObjectInfo OpType replication.Type RetryCount uint32 + ResetID string } // MultipartInfo captures metadata information about the uploadId diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index e9c07ae1b..fee5903b8 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1291,7 +1291,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re if rs := r.Header.Get(xhttp.AmzBucketReplicationStatus); rs != "" { srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = rs } - if ok, _ := mustReplicate(ctx, r, dstBucket, dstObject, srcInfo.UserDefined, srcInfo.ReplicationStatus.String(), srcInfo.metadataOnly); ok { + if ok, _ := mustReplicate(ctx, r, dstBucket, dstObject, getMustReplicateOptions(srcInfo, replication.UnsetReplicationType)); ok { srcInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } // Store the preserved compression metadata. @@ -1393,7 +1393,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re objInfo.ETag = getDecryptedETag(r.Header, objInfo, false) response := generateCopyObjectResponse(objInfo.ETag, objInfo.ModTime) encodedSuccessResponse := encodeResponse(response) - if replicate, sync := mustReplicate(ctx, r, dstBucket, dstObject, objInfo.UserDefined, objInfo.ReplicationStatus.String(), objInfo.metadataOnly); replicate { + + if replicate, sync := mustReplicate(ctx, r, dstBucket, dstObject, getMustReplicateOptions(objInfo, replication.UnsetReplicationType)); replicate { scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } @@ -1637,7 +1638,9 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) return } - if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, "", false); ok { + if ok, _ := mustReplicate(ctx, r, bucket, object, getMustReplicateOptions(ObjectInfo{ + UserDefined: metadata, + }, replication.ObjectReplicationType)); ok { metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() { @@ -1720,7 +1723,9 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } } } - if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, "", false); replicate { + if replicate, sync := mustReplicate(ctx, r, bucket, object, getMustReplicateOptions(ObjectInfo{ + UserDefined: metadata, + }, replication.ObjectReplicationType)); replicate { scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } @@ -1956,7 +1961,9 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h return } - if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, "", false); ok { + if ok, _ := mustReplicate(ctx, r, bucket, object, getMustReplicateOptions(ObjectInfo{ + UserDefined: metadata, + }, replication.ObjectReplicationType)); ok { metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } @@ -2012,7 +2019,9 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h return } - if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, "", false); replicate { + if replicate, sync := mustReplicate(ctx, r, bucket, object, getMustReplicateOptions(ObjectInfo{ + UserDefined: metadata, + }, replication.ObjectReplicationType)); replicate { scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } @@ -2124,7 +2133,9 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) return } - if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, "", false); ok { + if ok, _ := mustReplicate(ctx, r, bucket, object, getMustReplicateOptions(ObjectInfo{ + UserDefined: metadata, + }, replication.ObjectReplicationType)); ok { metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } // We need to preserve the encryption headers set in EncryptRequest, @@ -3114,7 +3125,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite } setPutObjHeaders(w, objInfo, false) - if replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String(), false); replicate { + if replicate, sync := mustReplicate(ctx, r, bucket, object, getMustReplicateOptions(objInfo, replication.ObjectReplicationType)); replicate { scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync, replication.ObjectReplicationType) } @@ -3288,7 +3299,7 @@ func (api objectAPIHandlers) DeleteObjectHandler(w http.ResponseWriter, r *http. } else { versionID = objInfo.VersionID } - dobj := DeletedObjectVersionInfo{ + dobj := DeletedObjectReplicationInfo{ DeletedObject: DeletedObject{ ObjectName: object, VersionID: versionID, @@ -3375,7 +3386,7 @@ func (api objectAPIHandlers) PutObjectLegalHoldHandler(w http.ResponseWriter, r return } objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = strings.ToUpper(string(legalHold.Status)) - replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String(), true) + replicate, sync := mustReplicate(ctx, r, bucket, object, getMustReplicateOptions(objInfo, replication.MetadataReplicationType)) if replicate { objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } @@ -3554,7 +3565,7 @@ func (api objectAPIHandlers) PutObjectRetentionHandler(w http.ResponseWriter, r objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockMode)] = "" objInfo.UserDefined[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = "" } - replicate, sync := mustReplicate(ctx, r, bucket, object, objInfo.UserDefined, objInfo.ReplicationStatus.String(), true) + replicate, sync := mustReplicate(ctx, r, bucket, object, getMustReplicateOptions(objInfo, replication.MetadataReplicationType)) if replicate { objInfo.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } @@ -3751,14 +3762,16 @@ func (api objectAPIHandlers) PutObjectTaggingHandler(w http.ResponseWriter, r *h writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: tags.String()}, objInfo.ReplicationStatus.String(), true) + tagsStr := tags.String() + + oi := objInfo.Clone() + oi.UserTags = tagsStr + replicate, sync := mustReplicate(ctx, r, bucket, object, getMustReplicateOptions(oi, replication.MetadataReplicationType)) if replicate { opts.UserDefined = make(map[string]string) opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() } - tagsStr := tags.String() - // Put object tags objInfo, err = objAPI.PutObjectTags(ctx, bucket, object, tagsStr, opts) if err != nil { @@ -3828,7 +3841,7 @@ func (api objectAPIHandlers) DeleteObjectTaggingHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) return } - replicate, sync := mustReplicate(ctx, r, bucket, object, map[string]string{xhttp.AmzObjectTagging: oi.UserTags}, oi.ReplicationStatus.String(), true) + replicate, sync := mustReplicate(ctx, r, bucket, object, getMustReplicateOptions(oi, replication.MetadataReplicationType)) if replicate { opts.UserDefined = make(map[string]string) opts.UserDefined[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 12ea6980a..dd031315d 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -814,7 +814,7 @@ next: }) if replicateDel { - dobj := DeletedObjectVersionInfo{ + dobj := DeletedObjectReplicationInfo{ DeletedObject: DeletedObject{ ObjectName: objectName, DeleteMarkerVersionID: oi.VersionID, @@ -948,7 +948,7 @@ next: Host: sourceIP, }) if dobj.DeleteMarkerReplicationStatus == string(replication.Pending) || dobj.VersionPurgeStatus == Pending { - dv := DeletedObjectVersionInfo{ + dv := DeletedObjectReplicationInfo{ DeletedObject: dobj, Bucket: args.BucketName, } @@ -1256,7 +1256,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { } } - mustReplicate, sync := mustReplicateWeb(ctx, r, bucket, object, metadata, "", replPerms) + mustReplicate, sync := mustReplicateWeb(ctx, r, bucket, object, metadata, replication.StatusType(""), replPerms) if mustReplicate { metadata[xhttp.AmzBucketReplicationStatus] = string(replication.Pending) } diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index e5df9c425..3c55b4d45 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -414,6 +414,19 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates } } + // Check if the current bucket has replication configuration + if rcfg, err := globalBucketMetadataSys.GetReplicationConfig(ctx, cache.Info.Name); err == nil { + if rcfg.HasActiveRules("", true) { + tgt := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, cache.Info.Name, rcfg.RoleArn) + cache.Info.replication = replicationConfig{ + Config: rcfg, + ResetID: tgt.ResetID, + ResetBeforeDate: tgt.ResetBeforeDate} + if intDataUpdateTracker.debug { + console.Debugln(color.Green("scannerDisk:") + " replication: Active rules found") + } + } + } // return initialized object layer objAPI := newObjectLayerFn() // object layer not initialized, return. @@ -452,7 +465,6 @@ func (s *xlStorage) NSScanner(ctx context.Context, cache dataUsageCache, updates } return sizeSummary{}, errSkipFile } - sizeS := sizeSummary{} for _, version := range fivs.Versions { oi := version.ToObjectInfo(item.bucket, item.objectPath()) diff --git a/docs/bucket/replication/DESIGN.md b/docs/bucket/replication/DESIGN.md index 9a3ca7297..0fa9b3d1b 100644 --- a/docs/bucket/replication/DESIGN.md +++ b/docs/bucket/replication/DESIGN.md @@ -33,7 +33,11 @@ An additional header `X-Minio-Replication-Delete-Status` is returned which would Note that synchronous replication, i.e. when remote target is configured with --sync mode in `mc admin bucket remote add` does not apply to `DELETE` operations. The version being deleted on the source cluster needs to maintain state and ensure that the operation is mirrored to the target cluster prior to completing on the source object version. Since this needs to account for the target cluster availability and the need to serialize concurrent DELETE operations on different versions of the same object during multi DELETE operations, the current implementation queues the `DELETE` operations in both sync and async modes. -Existing object replication, replica modification sync for 2-way replication and multi site replication are currently not supported. +### Existing object replication +Existing object replication works similar to regular replication. Objects qualifying for existing object replication are detected when scanner runs, and will be replicated if existing object replication is enabled and applicable replication rules are satisfied. Because replication depends on the immutability of versions, only pre-existing objects created while versioning was enabled can be replicated. Even if replication rules are disabled and re-enabled later, the objects created during the interim will be synced as the scanner queues them. For saving iops, objects qualifying for +existing object replication are not marked as `PENDING` prior to replication. + +If the remote site is fully lost and objects previously replicated need to be re-synced, the `mc replicate reset` command with optional flag of `--older-than` needs to be used to trigger re-syncing of previously replicated objects. This command generates a ResetID which is a unique UUID saved to the remote target config along with the applicable date(defaults to time of initiating the reset). All objects created prior to this date are eligible for re-replication if existing object replication is enabled for the replication rule the object satisifes. At the time of completion of replication, `X-Minio-Replication-Reset-Status` is set in the metadata with the timestamp of replication and ResetID. For saving iops, the objects which are re-replicated are not first set to `PENDING` state. ### Internal metadata for replication diff --git a/docs/bucket/replication/README.md b/docs/bucket/replication/README.md index 1962297cc..c9edeb419 100644 --- a/docs/bucket/replication/README.md +++ b/docs/bucket/replication/README.md @@ -202,6 +202,21 @@ remote replication target using the `mc admin bucket remote add` command mc admin bucket remote add myminio/srcbucket https://accessKey:secretKey@replica-endpoint:9000/destbucket --service replication --region us-east-1 --sync --healthcheck-seconds 100 ``` +### Existing object replication +Existing object replication as detailed [here](https://aws.amazon.com/blogs/storage/replicating-existing-objects-between-s3-buckets/) can be enabled by passing `existing-objects` as a value to `--replicate` flag while adding or editing a replication rule. + +Once existing object replication is enabled, all objects or object prefixes that satisfy the replication rules and were created prior to adding replication configuration OR while replication rules were disabled will be synced to the target cluster. Depending on the number of previously existing objects, the existing objects that are now eligible to be replicated will eventually be synced to the target cluster as the scanner schedules them. This may be slower depending on the load on the cluster, latency and size of the namespace. + +Note that existing object replication requires that the objects being replicated were created while versioning was enabled. Objects with null version (i.e. those created when versioning is not enabled or versioning is suspended) will not be replicated. + +In the rare event that target DR site is entirely lost and previously replicated objects to the DR cluster need to be re-replicated, `mc replicate reset alias/bucket` can be used to initiate a reset. This would initiate a re-sync between the two clusters on a lower priority as the scanner picks up these objects to re-sync. + +This is an expensive operation and should be initiated only once - progress of the syncing can be monitored by looking at Prometheus metrics. If object version has been re-replicated, `mc stat --vid --debug` on this version shows an additional header `X-Minio-Replication-Reset-Status` with the replication timestamp and ResetID generated at the time of issuing the `mc replicate reset` command. + +Note that ExistingObjectReplication needs to be enabled in the config via `mc replicate [add|edit]` by passing `existing-objects` as one of the values to `--replicate` flag. Only those objects meeting replication rules and having existing object replication enabled will be re-synced. + +Multi site replication is currently not supported. + ## Explore Further - [MinIO Bucket Replication Design](https://github.com/minio/minio/blob/master/docs/bucket/replication/DESIGN.md) - [MinIO Bucket Versioning Implementation](https://docs.minio.io/docs/minio-bucket-versioning-guide.html) diff --git a/go.mod b/go.mod index 4d71d5250..3d945df89 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/minio/madmin-go v1.0.9 github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78 github.com/minio/parquet-go v1.0.0 - github.com/minio/pkg v1.0.3 + github.com/minio/pkg v1.0.4 github.com/minio/rpc v1.0.0 github.com/minio/selfupdate v0.3.1 github.com/minio/sha256-simd v1.0.0 diff --git a/go.sum b/go.sum index 4a8f18c8a..39e0d4c6f 100644 --- a/go.sum +++ b/go.sum @@ -494,6 +494,8 @@ github.com/minio/parquet-go v1.0.0 h1:fcWsEvub04Nsl/4hiRBDWlbqd6jhacQieV07a+nhiI github.com/minio/parquet-go v1.0.0/go.mod h1:aQlkSOfOq2AtQKkuou3mosNVMwNokd+faTacxxk/oHA= github.com/minio/pkg v1.0.3 h1:tUhM6lG/BdNB0+5f2RbE4ifCAYwMs6cRJnZ/AY0WIeQ= github.com/minio/pkg v1.0.3/go.mod h1:obU54TZ9QlMv0TRaDgQ/JTzf11ZSXxnSfLrm4tMtBP8= +github.com/minio/pkg v1.0.4 h1:+BmaCENP6BaMm9PsGK6L1L5MKulWDxl4qobvJYf6m/E= +github.com/minio/pkg v1.0.4/go.mod h1:obU54TZ9QlMv0TRaDgQ/JTzf11ZSXxnSfLrm4tMtBP8= github.com/minio/rpc v1.0.0 h1:tJCHyLfQF6k6HlMQFpKy2FO/7lc2WP8gLDGMZp18E70= github.com/minio/rpc v1.0.0/go.mod h1:b9xqF7J0xeMXr0cM4pnBlP7Te7PDsG5JrRxl5dG6Ldk= github.com/minio/selfupdate v0.3.1 h1:BWEFSNnrZVMUWXbXIgLDNDjbejkmpAmZvy/nCz1HlEs= diff --git a/internal/bucket/replication/replication.go b/internal/bucket/replication/replication.go index 1f1187221..e221e7de1 100644 --- a/internal/bucket/replication/replication.go +++ b/internal/bucket/replication/replication.go @@ -131,23 +131,31 @@ type Type int // Types of replication const ( - ObjectReplicationType Type = 1 + iota + UnsetReplicationType Type = 0 + iota + ObjectReplicationType DeleteReplicationType MetadataReplicationType HealReplicationType + ExistingObjectReplicationType ) +// Valid returns true if replication type is set +func (t Type) Valid() bool { + return t > 0 +} + // ObjectOpts provides information to deduce whether replication // can be triggered on the resultant object. type ObjectOpts struct { - Name string - UserTags string - VersionID string - IsLatest bool - DeleteMarker bool - SSEC bool - OpType Type - Replica bool + Name string + UserTags string + VersionID string + IsLatest bool + DeleteMarker bool + SSEC bool + OpType Type + Replica bool + ExistingObject bool } // FilterActionableRules returns the rules actions that need to be executed @@ -191,6 +199,9 @@ func (c Config) Replicate(obj ObjectOpts) bool { if rule.Status == Disabled { continue } + if obj.ExistingObject && rule.ExistingObjectReplication.Status == Disabled { + return false + } if obj.OpType == DeleteReplicationType { switch { case obj.VersionID != "": diff --git a/internal/bucket/replication/rule.go b/internal/bucket/replication/rule.go index bdc1adeed..094448f28 100644 --- a/internal/bucket/replication/rule.go +++ b/internal/bucket/replication/rule.go @@ -90,6 +90,43 @@ func (d *DeleteReplication) UnmarshalXML(dec *xml.Decoder, start xml.StartElemen return nil } +// ExistingObjectReplication - whether existing object replication is enabled +type ExistingObjectReplication struct { + Status Status `xml:"Status"` // should be set to "Disabled" by default +} + +// IsEmpty returns true if ExistingObjectReplication is not set +func (e ExistingObjectReplication) IsEmpty() bool { + return len(e.Status) == 0 +} + +// Validate validates whether the status is disabled. +func (e ExistingObjectReplication) Validate() error { + if e.IsEmpty() { + return nil + } + if e.Status != Disabled && e.Status != Enabled { + return errInvalidExistingObjectReplicationStatus + } + return nil +} + +// UnmarshalXML - decodes XML data. Default to Disabled unless specified +func (e *ExistingObjectReplication) UnmarshalXML(dec *xml.Decoder, start xml.StartElement) (err error) { + // Make subtype to avoid recursive UnmarshalXML(). + type existingObjectReplication ExistingObjectReplication + erep := existingObjectReplication{} + + if err := dec.DecodeElement(&erep, &start); err != nil { + return err + } + if len(erep.Status) == 0 { + erep.Status = Disabled + } + e.Status = erep.Status + return nil +} + // Rule - a rule for replication configuration. type Rule struct { XMLName xml.Name `xml:"Rule" json:"Rule"` @@ -98,22 +135,24 @@ type Rule struct { Priority int `xml:"Priority" json:"Priority"` DeleteMarkerReplication DeleteMarkerReplication `xml:"DeleteMarkerReplication" json:"DeleteMarkerReplication"` // MinIO extension to replicate versioned deletes - DeleteReplication DeleteReplication `xml:"DeleteReplication" json:"DeleteReplication"` - Destination Destination `xml:"Destination" json:"Destination"` - SourceSelectionCriteria SourceSelectionCriteria `xml:"SourceSelectionCriteria" json:"SourceSelectionCriteria"` - Filter Filter `xml:"Filter" json:"Filter"` + DeleteReplication DeleteReplication `xml:"DeleteReplication" json:"DeleteReplication"` + Destination Destination `xml:"Destination" json:"Destination"` + SourceSelectionCriteria SourceSelectionCriteria `xml:"SourceSelectionCriteria" json:"SourceSelectionCriteria"` + Filter Filter `xml:"Filter" json:"Filter"` + ExistingObjectReplication ExistingObjectReplication `xml:"ExistingObjectReplication,omitempty" json:"ExistingObjectReplication,omitempty"` } var ( - errInvalidRuleID = Errorf("ID must be less than 255 characters") - errEmptyRuleStatus = Errorf("Status should not be empty") - errInvalidRuleStatus = Errorf("Status must be set to either Enabled or Disabled") - errDeleteMarkerReplicationMissing = Errorf("DeleteMarkerReplication must be specified") - errPriorityMissing = Errorf("Priority must be specified") - errInvalidDeleteMarkerReplicationStatus = Errorf("Delete marker replication status is invalid") - errDestinationSourceIdentical = Errorf("Destination bucket cannot be the same as the source bucket.") - errDeleteReplicationMissing = Errorf("Delete replication must be specified") - errInvalidDeleteReplicationStatus = Errorf("Delete replication is either enable|disable") + errInvalidRuleID = Errorf("ID must be less than 255 characters") + errEmptyRuleStatus = Errorf("Status should not be empty") + errInvalidRuleStatus = Errorf("Status must be set to either Enabled or Disabled") + errDeleteMarkerReplicationMissing = Errorf("DeleteMarkerReplication must be specified") + errPriorityMissing = Errorf("Priority must be specified") + errInvalidDeleteMarkerReplicationStatus = Errorf("Delete marker replication status is invalid") + errDestinationSourceIdentical = Errorf("Destination bucket cannot be the same as the source bucket.") + errDeleteReplicationMissing = Errorf("Delete replication must be specified") + errInvalidDeleteReplicationStatus = Errorf("Delete replication is either enable|disable") + errInvalidExistingObjectReplicationStatus = Errorf("Existing object replication status is invalid") ) // validateID - checks if ID is valid or not. @@ -200,7 +239,7 @@ func (r Rule) Validate(bucket string, sameTarget bool) error { if r.Destination.Bucket == bucket && sameTarget { return errDestinationSourceIdentical } - return nil + return r.ExistingObjectReplication.Validate() } // MetadataReplicate returns true if object is not a replica or in the case of replicas, diff --git a/internal/http/headers.go b/internal/http/headers.go index 14f0a5f6e..e5780134a 100644 --- a/internal/http/headers.go +++ b/internal/http/headers.go @@ -170,6 +170,9 @@ const ( MinIOSourceProxyRequest = "X-Minio-Source-Proxy-Request" // Header indicates that this request is a replication request to create a REPLICA MinIOSourceReplicationRequest = "X-Minio-Source-Replication-Request" + // Header indicates replication reset status. + MinIOReplicationResetStatus = "X-Minio-Replication-Reset-Status" + // predicted date/time of transition MinIOTransition = "X-Minio-Transition" )