From ed3418c04665675ca047f692a47e1d83d7ab0888 Mon Sep 17 00:00:00 2001 From: Poorna Date: Thu, 10 Feb 2022 10:16:52 -0800 Subject: [PATCH] Refactor replication resync to be an active process (#14266) When resync is triggered, walk the bucket namespace and resync objects that are unreplicated. This PR also adds an API to report resync progress. --- cmd/api-router.go | 8 +- cmd/bucket-handlers.go | 110 ++- cmd/bucket-metadata.go | 1 + cmd/bucket-replication-utils.go | 100 ++- cmd/bucket-replication-utils_gen.go | 1003 ++++++++++++++++++++-- cmd/bucket-replication-utils_gen_test.go | 226 +++++ cmd/bucket-replication.go | 360 +++++++- cmd/erasure-server-pool.go | 8 +- cmd/object-api-interface.go | 3 +- cmd/server-main.go | 2 + docs/bucket/replication/DESIGN.md | 31 +- docs/bucket/replication/README.md | 4 +- 12 files changed, 1717 insertions(+), 139 deletions(-) diff --git a/cmd/api-router.go b/cmd/api-router.go index 58c5ef363..19baacbd3 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -343,6 +343,9 @@ func registerAPIRouter(router *mux.Router) { // ListenNotification router.Methods(http.MethodGet).HandlerFunc( collectAPIStats("listennotification", maxClients(gz(httpTraceAll(api.ListenNotificationHandler))))).Queries("events", "{events:.*}") + // ResetBucketReplicationStatus - MinIO extension API + router.Methods(http.MethodGet).HandlerFunc( + collectAPIStats("resetbucketreplicationstatus", maxClients(gz(httpTraceAll(api.ResetBucketReplicationStatusHandler))))).Queries("replication-reset-status", "") // Dummy Bucket Calls // GetBucketACL -- this is a dummy call. @@ -417,9 +420,10 @@ func registerAPIRouter(router *mux.Router) { // PutBucketNotification router.Methods(http.MethodPut).HandlerFunc( collectAPIStats("putbucketnotification", maxClients(gz(httpTraceAll(api.PutBucketNotificationHandler))))).Queries("notification", "") - // ResetBucketReplicationState - MinIO extension API + // ResetBucketReplicationStart - MinIO extension API router.Methods(http.MethodPut).HandlerFunc( - collectAPIStats("resetbucketreplicationstate", maxClients(gz(httpTraceAll(api.ResetBucketReplicationStateHandler))))).Queries("replication-reset", "") + collectAPIStats("resetbucketreplicationstart", maxClients(gz(httpTraceAll(api.ResetBucketReplicationStartHandler))))).Queries("replication-reset", "") + // PutBucket router.Methods(http.MethodPut).HandlerFunc( collectAPIStats("putbucket", maxClients(gz(httpTraceAll(api.PutBucketHandler))))) diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index fa92f50c0..235d93cde 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -1301,7 +1301,7 @@ func (api objectAPIHandlers) DeleteBucketHandler(w http.ResponseWriter, r *http. } globalNotificationSys.DeleteBucketMetadata(ctx, bucket) - + globalReplicationPool.deleteResyncMetadata(ctx, bucket) // Call site replication hook. if err := globalSiteReplicationSys.DeleteBucketHook(ctx, bucket, forceDelete); err != nil { writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) @@ -1760,12 +1760,13 @@ func (api objectAPIHandlers) GetBucketReplicationMetricsHandler(w http.ResponseW } } -// ResetBucketReplicationStateHandler - starts a replication reset for all objects in a bucket which +// ResetBucketReplicationStartHandler - starts a replication reset for all objects in a bucket which // qualify for replication and re-sync the object(s) to target, provided ExistingObjectReplication is // enabled for the qualifying rule. 
This API is a MinIO only extension provided for situations where -// remote target is entirely lost,and previously replicated objects need to be re-synced. -func (api objectAPIHandlers) ResetBucketReplicationStateHandler(w http.ResponseWriter, r *http.Request) { - ctx := newContext(r, w, "ResetBucketReplicationState") +// remote target is entirely lost,and previously replicated objects need to be re-synced. If resync is +// already in progress it returns an error +func (api objectAPIHandlers) ResetBucketReplicationStartHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ResetBucketReplicationStart") defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) vars := mux.Vars(r) @@ -1789,6 +1790,7 @@ func (api objectAPIHandlers) ResetBucketReplicationStateHandler(w http.ResponseW }), r.URL) } } + resetBeforeDate := UTCNow().AddDate(0, 0, -1*int(days/24)) objectAPI := api.ObjectAPI() if objectAPI == nil { @@ -1825,7 +1827,7 @@ func (api objectAPIHandlers) ResetBucketReplicationStateHandler(w http.ResponseW if len(tgtArns) == 0 { writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{ Bucket: bucket, - Err: fmt.Errorf("Remote target ARN %s missing/not eligible for replication resync", arn), + Err: fmt.Errorf("Remote target ARN %s missing or ineligible for replication resync", arn), }), r.URL) return } @@ -1849,22 +1851,90 @@ func (api objectAPIHandlers) ResetBucketReplicationStateHandler(w http.ResponseW default: writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) } + } + if err := startReplicationResync(ctx, bucket, arn, resetID, resetBeforeDate, objectAPI); err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{ + Bucket: bucket, + Err: err, + }), r.URL) return } - targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) - if err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } - tgtBytes, err := json.Marshal(&targets) - if err != nil { - writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErrWithErr(ErrAdminConfigBadJSON, err), r.URL) - return - } - if err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { - writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) - return - } + + data, err := json.Marshal(rinfo) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + // Write success response. + writeSuccessResponseJSON(w, data) +} + +// ResetBucketReplicationStatusHandler - returns the status of replication reset. +// This API is a MinIO only extension +func (api objectAPIHandlers) ResetBucketReplicationStatusHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "ResetBucketReplicationStatus") + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + vars := mux.Vars(r) + bucket := vars["bucket"] + arn := r.URL.Query().Get("arn") + var err error + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL) + return + } + + if s3Error := checkRequestAuthType(ctx, r, policy.ResetBucketReplicationStateAction, bucket, ""); s3Error != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL) + return + } + + // Check if bucket exists. 
+ if _, err := objectAPI.GetBucketInfo(ctx, bucket); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + if _, err := globalBucketMetadataSys.GetReplicationConfig(ctx, bucket); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + globalReplicationPool.resyncState.RLock() + brs, ok := globalReplicationPool.resyncState.statusMap[bucket] + if !ok { + brs, err = loadBucketResyncMetadata(ctx, bucket, objectAPI) + if err != nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{ + Bucket: bucket, + Err: fmt.Errorf("No replication resync status available for %s", arn), + }), r.URL) + } + return + } + + var rinfo ResyncTargetsInfo + for tarn, st := range brs.TargetsMap { + if arn != "" && tarn != arn { + continue + } + rinfo.Targets = append(rinfo.Targets, ResyncTarget{ + Arn: tarn, + ResetID: st.ResyncID, + StartTime: st.StartTime, + EndTime: st.EndTime, + ResyncStatus: st.ResyncStatus.String(), + ReplicatedSize: st.ReplicatedSize, + ReplicatedCount: st.ReplicatedCount, + FailedSize: st.FailedSize, + FailedCount: st.FailedCount, + Bucket: st.Bucket, + Object: st.Object, + }) + } + globalReplicationPool.resyncState.RUnlock() data, err := json.Marshal(rinfo) if err != nil { writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) diff --git a/cmd/bucket-metadata.go b/cmd/bucket-metadata.go index 11dd924de..f90c8183d 100644 --- a/cmd/bucket-metadata.go +++ b/cmd/bucket-metadata.go @@ -375,6 +375,7 @@ func deleteBucketMetadata(ctx context.Context, obj objectDeleter, bucket string) metadataFiles := []string{ dataUsageCacheName, bucketMetadataFile, + path.Join(replicationDir, resyncFileName), } for _, metaFile := range metadataFiles { configFile := path.Join(bucketMetaPrefix, bucket, metaFile) diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index 0c7de5a8a..6bd747b9b 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -23,6 +23,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/minio/minio/internal/bucket/replication" @@ -576,8 +577,23 @@ type ResyncTargetsInfo struct { // ResyncTarget is a struct representing the Target reset ID where target is identified by its Arn type ResyncTarget struct { - Arn string `json:"arn"` - ResetID string `json:"resetid"` + Arn string `json:"arn"` + ResetID string `json:"resetid"` + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime"` + // Status of resync operation + ResyncStatus string `json:"resyncStatus,omitempty"` + // Completed size in bytes + ReplicatedSize int64 `json:"completedReplicationSize"` + // Failed size in bytes + FailedSize int64 `json:"failedReplicationSize"` + // Total number of failed operations + FailedCount int64 `json:"failedReplicationCount"` + // Total number of failed operations + ReplicatedCount int64 `json:"replicationCount"` + // Last bucket/object replicated. 
+ Bucket string `json:"bucket,omitempty"` + Object string `json:"object,omitempty"` } // VersionPurgeStatusType represents status of a versioned delete or permanent delete w.r.t bucket replication @@ -603,3 +619,83 @@ func (v VersionPurgeStatusType) Empty() bool { func (v VersionPurgeStatusType) Pending() bool { return v == Pending || v == Failed } + +type replicationResyncState struct { + // map of bucket to their resync status + statusMap map[string]BucketReplicationResyncStatus + sync.RWMutex +} + +const ( + replicationDir = "replication" + resyncFileName = "resync.bin" + resyncMetaFormat = 1 + resyncMetaVersionV1 = 1 + resyncMetaVersion = resyncMetaVersionV1 +) + +// ResyncStatusType status of resync operation +type ResyncStatusType int + +const ( + // NoResync - no resync in progress + NoResync ResyncStatusType = iota + // ResyncStarted - resync in progress + ResyncStarted + // ResyncCompleted - resync finished + ResyncCompleted + // ResyncFailed - resync failed + ResyncFailed +) + +func (rt ResyncStatusType) String() string { + switch rt { + case ResyncStarted: + return "Ongoing" + case ResyncCompleted: + return "Completed" + case ResyncFailed: + return "Failed" + default: + return "" + } +} + +// TargetReplicationResyncStatus status of resync of bucket for a specific target +type TargetReplicationResyncStatus struct { + StartTime time.Time `json:"startTime" msg:"st"` + EndTime time.Time `json:"endTime" msg:"et"` + // Resync ID assigned to this reset + ResyncID string `json:"resyncID" msg:"id"` + // ResyncBeforeDate - resync all objects created prior to this date + ResyncBeforeDate time.Time `json:"resyncBeforeDate" msg:"rdt"` + // Status of resync operation + ResyncStatus ResyncStatusType `json:"resyncStatus" msg:"rst"` + // Failed size in bytes + FailedSize int64 `json:"failedReplicationSize" msg:"fs"` + // Total number of failed operations + FailedCount int64 `json:"failedReplicationCount" msg:"frc"` + // Completed size in bytes + ReplicatedSize int64 `json:"completedReplicationSize" msg:"rs"` + // Total number of failed operations + ReplicatedCount int64 `json:"replicationCount" msg:"rrc"` + // Last bucket/object replicated. 
+ Bucket string `json:"-" msg:"bkt"` + Object string `json:"-" msg:"obj"` +} + +// BucketReplicationResyncStatus captures current replication resync status +type BucketReplicationResyncStatus struct { + Version int `json:"version" msg:"v"` + // map of remote arn to their resync status for a bucket + TargetsMap map[string]TargetReplicationResyncStatus `json:"resyncMap,omitempty" msg:"brs"` + ID int `json:"id" msg:"id"` + LastUpdate time.Time `json:"lastUpdate" msg:"lu"` +} + +func newBucketResyncStatus(bucket string) BucketReplicationResyncStatus { + return BucketReplicationResyncStatus{ + TargetsMap: make(map[string]TargetReplicationResyncStatus), + Version: resyncMetaVersion, + } +} diff --git a/cmd/bucket-replication-utils_gen.go b/cmd/bucket-replication-utils_gen.go index c91b75772..7d2aa7dda 100644 --- a/cmd/bucket-replication-utils_gen.go +++ b/cmd/bucket-replication-utils_gen.go @@ -7,6 +7,259 @@ import ( "github.com/tinylib/msgp/msgp" ) +// DecodeMsg implements msgp.Decodable +func (z *BucketReplicationResyncStatus) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "v": + z.Version, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "Version") + return + } + case "brs": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "TargetsMap") + return + } + if z.TargetsMap == nil { + z.TargetsMap = make(map[string]TargetReplicationResyncStatus, zb0002) + } else if len(z.TargetsMap) > 0 { + for key := range z.TargetsMap { + delete(z.TargetsMap, key) + } + } + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 TargetReplicationResyncStatus + za0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "TargetsMap") + return + } + err = za0002.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "TargetsMap", za0001) + return + } + z.TargetsMap[za0001] = za0002 + } + case "id": + z.ID, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + case "lu": + z.LastUpdate, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *BucketReplicationResyncStatus) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 4 + // write "v" + err = en.Append(0x84, 0xa1, 0x76) + if err != nil { + return + } + err = en.WriteInt(z.Version) + if err != nil { + err = msgp.WrapError(err, "Version") + return + } + // write "brs" + err = en.Append(0xa3, 0x62, 0x72, 0x73) + if err != nil { + return + } + err = en.WriteMapHeader(uint32(len(z.TargetsMap))) + if err != nil { + err = msgp.WrapError(err, "TargetsMap") + return + } + for za0001, za0002 := range z.TargetsMap { + err = en.WriteString(za0001) + if err != nil { + err = msgp.WrapError(err, "TargetsMap") + return + } + err = za0002.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "TargetsMap", za0001) + return + } + } + // write "id" + err = en.Append(0xa2, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteInt(z.ID) + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + // write "lu" + err = en.Append(0xa2, 0x6c, 
0x75) + if err != nil { + return + } + err = en.WriteTime(z.LastUpdate) + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *BucketReplicationResyncStatus) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 4 + // string "v" + o = append(o, 0x84, 0xa1, 0x76) + o = msgp.AppendInt(o, z.Version) + // string "brs" + o = append(o, 0xa3, 0x62, 0x72, 0x73) + o = msgp.AppendMapHeader(o, uint32(len(z.TargetsMap))) + for za0001, za0002 := range z.TargetsMap { + o = msgp.AppendString(o, za0001) + o, err = za0002.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "TargetsMap", za0001) + return + } + } + // string "id" + o = append(o, 0xa2, 0x69, 0x64) + o = msgp.AppendInt(o, z.ID) + // string "lu" + o = append(o, 0xa2, 0x6c, 0x75) + o = msgp.AppendTime(o, z.LastUpdate) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *BucketReplicationResyncStatus) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "v": + z.Version, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Version") + return + } + case "brs": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "TargetsMap") + return + } + if z.TargetsMap == nil { + z.TargetsMap = make(map[string]TargetReplicationResyncStatus, zb0002) + } else if len(z.TargetsMap) > 0 { + for key := range z.TargetsMap { + delete(z.TargetsMap, key) + } + } + for zb0002 > 0 { + var za0001 string + var za0002 TargetReplicationResyncStatus + zb0002-- + za0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "TargetsMap") + return + } + bts, err = za0002.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "TargetsMap", za0001) + return + } + z.TargetsMap[za0001] = za0002 + } + case "id": + z.ID, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ID") + return + } + case "lu": + z.LastUpdate, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "LastUpdate") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *BucketReplicationResyncStatus) Msgsize() (s int) { + s = 1 + 2 + msgp.IntSize + 4 + msgp.MapHeaderSize + if z.TargetsMap != nil { + for za0001, za0002 := range z.TargetsMap { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize() + } + } + s += 3 + msgp.IntSize + 3 + msgp.TimeSize + return +} + // DecodeMsg implements msgp.Decodable func (z *ReplicateDecision) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte @@ -729,6 +982,58 @@ func (z ResyncDecision) Msgsize() (s int) { return } +// DecodeMsg implements msgp.Decodable +func (z *ResyncStatusType) DecodeMsg(dc *msgp.Reader) (err error) { + { + var zb0001 int + zb0001, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = ResyncStatusType(zb0001) + } + return +} + +// EncodeMsg 
implements msgp.Encodable +func (z ResyncStatusType) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteInt(int(z)) + if err != nil { + err = msgp.WrapError(err) + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z ResyncStatusType) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendInt(o, int(z)) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *ResyncStatusType) UnmarshalMsg(bts []byte) (o []byte, err error) { + { + var zb0001 int + zb0001, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = ResyncStatusType(zb0001) + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z ResyncStatusType) Msgsize() (s int) { + s = msgp.IntSize + return +} + // DecodeMsg implements msgp.Decodable func (z *ResyncTarget) DecodeMsg(dc *msgp.Reader) (err error) { var field []byte @@ -759,6 +1064,60 @@ func (z *ResyncTarget) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "ResetID") return } + case "StartTime": + z.StartTime, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + case "EndTime": + z.EndTime, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "EndTime") + return + } + case "ResyncStatus": + z.ResyncStatus, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "ResyncStatus") + return + } + case "ReplicatedSize": + z.ReplicatedSize, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + case "FailedSize": + z.FailedSize, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + case "FailedCount": + z.FailedCount, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + case "ReplicatedCount": + z.ReplicatedCount, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "ReplicatedCount") + return + } + case "Bucket": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Object": + z.Object, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Object") + return + } default: err = dc.Skip() if err != nil { @@ -771,10 +1130,10 @@ func (z *ResyncTarget) DecodeMsg(dc *msgp.Reader) (err error) { } // EncodeMsg implements msgp.Encodable -func (z ResyncTarget) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 2 +func (z *ResyncTarget) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 11 // write "Arn" - err = en.Append(0x82, 0xa3, 0x41, 0x72, 0x6e) + err = en.Append(0x8b, 0xa3, 0x41, 0x72, 0x6e) if err != nil { return } @@ -793,19 +1152,136 @@ func (z ResyncTarget) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "ResetID") return } + // write "StartTime" + err = en.Append(0xa9, 0x53, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteTime(z.StartTime) + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + // write "EndTime" + err = en.Append(0xa7, 0x45, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteTime(z.EndTime) + if err != nil { + err = msgp.WrapError(err, "EndTime") + return + } + // write "ResyncStatus" + err = en.Append(0xac, 0x52, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73) + if err != nil { + return + } + err = 
en.WriteString(z.ResyncStatus) + if err != nil { + err = msgp.WrapError(err, "ResyncStatus") + return + } + // write "ReplicatedSize" + err = en.Append(0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65) + if err != nil { + return + } + err = en.WriteInt64(z.ReplicatedSize) + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + // write "FailedSize" + err = en.Append(0xaa, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65) + if err != nil { + return + } + err = en.WriteInt64(z.FailedSize) + if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + // write "FailedCount" + err = en.Append(0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74) + if err != nil { + return + } + err = en.WriteInt64(z.FailedCount) + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + // write "ReplicatedCount" + err = en.Append(0xaf, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74) + if err != nil { + return + } + err = en.WriteInt64(z.ReplicatedCount) + if err != nil { + err = msgp.WrapError(err, "ReplicatedCount") + return + } + // write "Bucket" + err = en.Append(0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "Object" + err = en.Append(0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Object) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } return } // MarshalMsg implements msgp.Marshaler -func (z ResyncTarget) MarshalMsg(b []byte) (o []byte, err error) { +func (z *ResyncTarget) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 2 + // map header, size 11 // string "Arn" - o = append(o, 0x82, 0xa3, 0x41, 0x72, 0x6e) + o = append(o, 0x8b, 0xa3, 0x41, 0x72, 0x6e) o = msgp.AppendString(o, z.Arn) // string "ResetID" o = append(o, 0xa7, 0x52, 0x65, 0x73, 0x65, 0x74, 0x49, 0x44) o = msgp.AppendString(o, z.ResetID) + // string "StartTime" + o = append(o, 0xa9, 0x53, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65) + o = msgp.AppendTime(o, z.StartTime) + // string "EndTime" + o = append(o, 0xa7, 0x45, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65) + o = msgp.AppendTime(o, z.EndTime) + // string "ResyncStatus" + o = append(o, 0xac, 0x52, 0x65, 0x73, 0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73) + o = msgp.AppendString(o, z.ResyncStatus) + // string "ReplicatedSize" + o = append(o, 0xae, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65) + o = msgp.AppendInt64(o, z.ReplicatedSize) + // string "FailedSize" + o = append(o, 0xaa, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x53, 0x69, 0x7a, 0x65) + o = msgp.AppendInt64(o, z.FailedSize) + // string "FailedCount" + o = append(o, 0xab, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74) + o = msgp.AppendInt64(o, z.FailedCount) + // string "ReplicatedCount" + o = append(o, 0xaf, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74) + o = msgp.AppendInt64(o, z.ReplicatedCount) + // string "Bucket" + o = append(o, 0xa6, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74) + o = msgp.AppendString(o, z.Bucket) + // string "Object" + o = append(o, 0xa6, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74) + o = msgp.AppendString(o, z.Object) return } @@ -839,6 +1315,60 @@ func (z *ResyncTarget) UnmarshalMsg(bts []byte) (o []byte, 
err error) { err = msgp.WrapError(err, "ResetID") return } + case "StartTime": + z.StartTime, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + case "EndTime": + z.EndTime, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "EndTime") + return + } + case "ResyncStatus": + z.ResyncStatus, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResyncStatus") + return + } + case "ReplicatedSize": + z.ReplicatedSize, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + case "FailedSize": + z.FailedSize, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + case "FailedCount": + z.FailedCount, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + case "ReplicatedCount": + z.ReplicatedCount, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicatedCount") + return + } + case "Bucket": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "Object": + z.Object, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } default: bts, err = msgp.Skip(bts) if err != nil { @@ -852,8 +1382,8 @@ func (z *ResyncTarget) UnmarshalMsg(bts []byte) (o []byte, err error) { } // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z ResyncTarget) Msgsize() (s int) { - s = 1 + 4 + msgp.StringPrefixSize + len(z.Arn) + 8 + msgp.StringPrefixSize + len(z.ResetID) +func (z *ResyncTarget) Msgsize() (s int) { + s = 1 + 4 + msgp.StringPrefixSize + len(z.Arn) + 8 + msgp.StringPrefixSize + len(z.ResetID) + 10 + msgp.TimeSize + 8 + msgp.TimeSize + 13 + msgp.StringPrefixSize + len(z.ResyncStatus) + 15 + msgp.Int64Size + 11 + msgp.Int64Size + 12 + msgp.Int64Size + 16 + msgp.Int64Size + 7 + msgp.StringPrefixSize + len(z.Bucket) + 7 + msgp.StringPrefixSize + len(z.Object) return } @@ -1041,40 +1571,11 @@ func (z *ResyncTargetsInfo) DecodeMsg(dc *msgp.Reader) (err error) { z.Targets = make([]ResyncTarget, zb0002) } for za0001 := range z.Targets { - var zb0003 uint32 - zb0003, err = dc.ReadMapHeader() + err = z.Targets[za0001].DecodeMsg(dc) if err != nil { err = msgp.WrapError(err, "Targets", za0001) return } - for zb0003 > 0 { - zb0003-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err, "Targets", za0001) - return - } - switch msgp.UnsafeString(field) { - case "Arn": - z.Targets[za0001].Arn, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Targets", za0001, "Arn") - return - } - case "ResetID": - z.Targets[za0001].ResetID, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Targets", za0001, "ResetID") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err, "Targets", za0001) - return - } - } - } } default: err = dc.Skip() @@ -1101,25 +1602,9 @@ func (z *ResyncTargetsInfo) EncodeMsg(en *msgp.Writer) (err error) { return } for za0001 := range z.Targets { - // map header, size 2 - // write "Arn" - err = en.Append(0x82, 0xa3, 0x41, 0x72, 0x6e) + err = z.Targets[za0001].EncodeMsg(en) if err != nil { - return - } - err = en.WriteString(z.Targets[za0001].Arn) - if err != nil { - err = msgp.WrapError(err, "Targets", za0001, "Arn") - return - } - 
// write "ResetID" - err = en.Append(0xa7, 0x52, 0x65, 0x73, 0x65, 0x74, 0x49, 0x44) - if err != nil { - return - } - err = en.WriteString(z.Targets[za0001].ResetID) - if err != nil { - err = msgp.WrapError(err, "Targets", za0001, "ResetID") + err = msgp.WrapError(err, "Targets", za0001) return } } @@ -1134,13 +1619,11 @@ func (z *ResyncTargetsInfo) MarshalMsg(b []byte) (o []byte, err error) { o = append(o, 0x81, 0xa7, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x73) o = msgp.AppendArrayHeader(o, uint32(len(z.Targets))) for za0001 := range z.Targets { - // map header, size 2 - // string "Arn" - o = append(o, 0x82, 0xa3, 0x41, 0x72, 0x6e) - o = msgp.AppendString(o, z.Targets[za0001].Arn) - // string "ResetID" - o = append(o, 0xa7, 0x52, 0x65, 0x73, 0x65, 0x74, 0x49, 0x44) - o = msgp.AppendString(o, z.Targets[za0001].ResetID) + o, err = z.Targets[za0001].MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Targets", za0001) + return + } } return } @@ -1176,40 +1659,11 @@ func (z *ResyncTargetsInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { z.Targets = make([]ResyncTarget, zb0002) } for za0001 := range z.Targets { - var zb0003 uint32 - zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + bts, err = z.Targets[za0001].UnmarshalMsg(bts) if err != nil { err = msgp.WrapError(err, "Targets", za0001) return } - for zb0003 > 0 { - zb0003-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err, "Targets", za0001) - return - } - switch msgp.UnsafeString(field) { - case "Arn": - z.Targets[za0001].Arn, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Targets", za0001, "Arn") - return - } - case "ResetID": - z.Targets[za0001].ResetID, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Targets", za0001, "ResetID") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err, "Targets", za0001) - return - } - } - } } default: bts, err = msgp.Skip(bts) @@ -1227,11 +1681,372 @@ func (z *ResyncTargetsInfo) UnmarshalMsg(bts []byte) (o []byte, err error) { func (z *ResyncTargetsInfo) Msgsize() (s int) { s = 1 + 8 + msgp.ArrayHeaderSize for za0001 := range z.Targets { - s += 1 + 4 + msgp.StringPrefixSize + len(z.Targets[za0001].Arn) + 8 + msgp.StringPrefixSize + len(z.Targets[za0001].ResetID) + s += z.Targets[za0001].Msgsize() } return } +// DecodeMsg implements msgp.Decodable +func (z *TargetReplicationResyncStatus) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "st": + z.StartTime, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + case "et": + z.EndTime, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "EndTime") + return + } + case "id": + z.ResyncID, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "ResyncID") + return + } + case "rdt": + z.ResyncBeforeDate, err = dc.ReadTime() + if err != nil { + err = msgp.WrapError(err, "ResyncBeforeDate") + return + } + case "rst": + { + var zb0002 int + zb0002, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "ResyncStatus") + return + } + z.ResyncStatus = ResyncStatusType(zb0002) + } + case "fs": + z.FailedSize, err = dc.ReadInt64() + 
if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + case "frc": + z.FailedCount, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + case "rs": + z.ReplicatedSize, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + case "rrc": + z.ReplicatedCount, err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "ReplicatedCount") + return + } + case "bkt": + z.Bucket, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "obj": + z.Object, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *TargetReplicationResyncStatus) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 11 + // write "st" + err = en.Append(0x8b, 0xa2, 0x73, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.StartTime) + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + // write "et" + err = en.Append(0xa2, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.EndTime) + if err != nil { + err = msgp.WrapError(err, "EndTime") + return + } + // write "id" + err = en.Append(0xa2, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteString(z.ResyncID) + if err != nil { + err = msgp.WrapError(err, "ResyncID") + return + } + // write "rdt" + err = en.Append(0xa3, 0x72, 0x64, 0x74) + if err != nil { + return + } + err = en.WriteTime(z.ResyncBeforeDate) + if err != nil { + err = msgp.WrapError(err, "ResyncBeforeDate") + return + } + // write "rst" + err = en.Append(0xa3, 0x72, 0x73, 0x74) + if err != nil { + return + } + err = en.WriteInt(int(z.ResyncStatus)) + if err != nil { + err = msgp.WrapError(err, "ResyncStatus") + return + } + // write "fs" + err = en.Append(0xa2, 0x66, 0x73) + if err != nil { + return + } + err = en.WriteInt64(z.FailedSize) + if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + // write "frc" + err = en.Append(0xa3, 0x66, 0x72, 0x63) + if err != nil { + return + } + err = en.WriteInt64(z.FailedCount) + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + // write "rs" + err = en.Append(0xa2, 0x72, 0x73) + if err != nil { + return + } + err = en.WriteInt64(z.ReplicatedSize) + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + // write "rrc" + err = en.Append(0xa3, 0x72, 0x72, 0x63) + if err != nil { + return + } + err = en.WriteInt64(z.ReplicatedCount) + if err != nil { + err = msgp.WrapError(err, "ReplicatedCount") + return + } + // write "bkt" + err = en.Append(0xa3, 0x62, 0x6b, 0x74) + if err != nil { + return + } + err = en.WriteString(z.Bucket) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + // write "obj" + err = en.Append(0xa3, 0x6f, 0x62, 0x6a) + if err != nil { + return + } + err = en.WriteString(z.Object) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *TargetReplicationResyncStatus) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 11 + // string "st" + o = append(o, 0x8b, 0xa2, 0x73, 0x74) + o = msgp.AppendTime(o, z.StartTime) + // string "et" + o = append(o, 0xa2, 0x65, 0x74) + o = msgp.AppendTime(o, z.EndTime) + // string "id" + o = append(o, 0xa2, 0x69, 
0x64) + o = msgp.AppendString(o, z.ResyncID) + // string "rdt" + o = append(o, 0xa3, 0x72, 0x64, 0x74) + o = msgp.AppendTime(o, z.ResyncBeforeDate) + // string "rst" + o = append(o, 0xa3, 0x72, 0x73, 0x74) + o = msgp.AppendInt(o, int(z.ResyncStatus)) + // string "fs" + o = append(o, 0xa2, 0x66, 0x73) + o = msgp.AppendInt64(o, z.FailedSize) + // string "frc" + o = append(o, 0xa3, 0x66, 0x72, 0x63) + o = msgp.AppendInt64(o, z.FailedCount) + // string "rs" + o = append(o, 0xa2, 0x72, 0x73) + o = msgp.AppendInt64(o, z.ReplicatedSize) + // string "rrc" + o = append(o, 0xa3, 0x72, 0x72, 0x63) + o = msgp.AppendInt64(o, z.ReplicatedCount) + // string "bkt" + o = append(o, 0xa3, 0x62, 0x6b, 0x74) + o = msgp.AppendString(o, z.Bucket) + // string "obj" + o = append(o, 0xa3, 0x6f, 0x62, 0x6a) + o = msgp.AppendString(o, z.Object) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *TargetReplicationResyncStatus) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "st": + z.StartTime, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "StartTime") + return + } + case "et": + z.EndTime, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "EndTime") + return + } + case "id": + z.ResyncID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResyncID") + return + } + case "rdt": + z.ResyncBeforeDate, bts, err = msgp.ReadTimeBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResyncBeforeDate") + return + } + case "rst": + { + var zb0002 int + zb0002, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "ResyncStatus") + return + } + z.ResyncStatus = ResyncStatusType(zb0002) + } + case "fs": + z.FailedSize, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "FailedSize") + return + } + case "frc": + z.FailedCount, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "FailedCount") + return + } + case "rs": + z.ReplicatedSize, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicatedSize") + return + } + case "rrc": + z.ReplicatedCount, bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "ReplicatedCount") + return + } + case "bkt": + z.Bucket, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Bucket") + return + } + case "obj": + z.Object, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Object") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *TargetReplicationResyncStatus) Msgsize() (s int) { + s = 1 + 3 + msgp.TimeSize + 3 + msgp.TimeSize + 3 + msgp.StringPrefixSize + len(z.ResyncID) + 4 + msgp.TimeSize + 4 + msgp.IntSize + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Object) + return +} + // DecodeMsg implements msgp.Decodable func 
(z *VersionPurgeStatusType) DecodeMsg(dc *msgp.Reader) (err error) { { diff --git a/cmd/bucket-replication-utils_gen_test.go b/cmd/bucket-replication-utils_gen_test.go index 51390211f..7ed7f1fef 100644 --- a/cmd/bucket-replication-utils_gen_test.go +++ b/cmd/bucket-replication-utils_gen_test.go @@ -9,6 +9,119 @@ import ( "github.com/tinylib/msgp/msgp" ) +func TestMarshalUnmarshalBucketReplicationResyncStatus(t *testing.T) { + v := BucketReplicationResyncStatus{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgBucketReplicationResyncStatus(b *testing.B) { + v := BucketReplicationResyncStatus{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgBucketReplicationResyncStatus(b *testing.B) { + v := BucketReplicationResyncStatus{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalBucketReplicationResyncStatus(b *testing.B) { + v := BucketReplicationResyncStatus{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeBucketReplicationResyncStatus(t *testing.T) { + v := BucketReplicationResyncStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeBucketReplicationResyncStatus Msgsize() is inaccurate") + } + + vn := BucketReplicationResyncStatus{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeBucketReplicationResyncStatus(b *testing.B) { + v := BucketReplicationResyncStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeBucketReplicationResyncStatus(b *testing.B) { + v := BucketReplicationResyncStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} + func TestMarshalUnmarshalReplicateDecision(t *testing.T) { v := ReplicateDecision{} bts, err := v.MarshalMsg(nil) @@ -686,3 +799,116 @@ func BenchmarkDecodeResyncTargetsInfo(b *testing.B) { } } } + +func TestMarshalUnmarshalTargetReplicationResyncStatus(t *testing.T) { + v := TargetReplicationResyncStatus{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + 
t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgTargetReplicationResyncStatus(b *testing.B) { + v := TargetReplicationResyncStatus{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgTargetReplicationResyncStatus(b *testing.B) { + v := TargetReplicationResyncStatus{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalTargetReplicationResyncStatus(b *testing.B) { + v := TargetReplicationResyncStatus{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeTargetReplicationResyncStatus(t *testing.T) { + v := TargetReplicationResyncStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeTargetReplicationResyncStatus Msgsize() is inaccurate") + } + + vn := TargetReplicationResyncStatus{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeTargetReplicationResyncStatus(b *testing.B) { + v := TargetReplicationResyncStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeTargetReplicationResyncStatus(b *testing.B) { + v := TargetReplicationResyncStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 313e50562..ce31a22fe 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -19,10 +19,13 @@ package cmd import ( "context" + "encoding/binary" + "errors" "fmt" "io" "math" "net/http" + "path" "reflect" "strings" "sync" @@ -998,6 +1001,7 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object OpType: ri.OpType, ReplicationAction: rAction, } + if ri.ObjectInfo.TargetReplicationStatus(tgt.ARN) == replication.Completed && !ri.ExistingObjResync.Empty() && !ri.ExistingObjResync.mustResyncTarget(tgt.ARN) { rinfo.ReplicationStatus = replication.Completed rinfo.ReplicationResynced = true @@ -1057,6 +1061,13 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object }) return rinfo } + defer func() { + if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" { + rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID) + rinfo.ReplicationResynced = true + } + rinfo.Duration = time.Since(startTime) + }() rAction = replicateAll oi, cerr := tgt.StatObject(ctx, tgt.Bucket, object, miniogo.StatObjectOptions{ @@ -1089,13 +1100,8 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object // 
Note: Replication Stats would have been updated despite metadata update failure. gr.Close() closeOnDefer = false - return replicatedTargetInfo{ - ReplicationStatus: replication.Completed, - Size: sz, - Arn: tgt.ARN, - ReplicationAction: rAction, - PrevReplicationStatus: objInfo.TargetReplicationStatus(tgt.ARN), - } + rinfo.ReplicationAction = rAction + rinfo.ReplicationStatus = replication.Completed } return } @@ -1103,13 +1109,6 @@ func replicateObjectToTarget(ctx context.Context, ri ReplicateObjectInfo, object rinfo.ReplicationStatus = replication.Completed rinfo.Size = size rinfo.ReplicationAction = rAction - defer func() { - if rinfo.ReplicationStatus == replication.Completed && ri.OpType == replication.ExistingObjectReplicationType && tgt.ResetID != "" { - rinfo.ResyncTimestamp = fmt.Sprintf("%s;%s", UTCNow().Format(http.TimeFormat), tgt.ResetID) - rinfo.ReplicationResynced = true - } - rinfo.Duration = time.Since(startTime) - }() // use core client to avoid doing multipart on PUT c := &miniogo.Core{Client: tgt.Client} if rAction != replicateAll { @@ -1308,6 +1307,7 @@ type ReplicationPool struct { existingReplicaDeleteCh chan DeletedObjectReplicationInfo workerSize int mrfWorkerSize int + resyncState replicationResyncState workerWg sync.WaitGroup mrfWorkerWg sync.WaitGroup once sync.Once @@ -1324,6 +1324,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool mrfWorkerKillCh: make(chan struct{}, opts.FailedWorkers), existingReplicaCh: make(chan ReplicateObjectInfo, 100000), existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000), + resyncState: replicationResyncState{statusMap: make(map[string]BucketReplicationResyncStatus)}, ctx: ctx, objLayer: o, } @@ -1331,6 +1332,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool pool.ResizeWorkers(opts.Workers) pool.ResizeFailedWorkers(opts.FailedWorkers) go pool.AddExistingObjectReplicateWorker() + go pool.periodicResyncMetaSave(ctx, o) return pool } @@ -1870,3 +1872,333 @@ func getLatestReplicationStats(bucket string, u BucketUsageInfo) (s BucketReplic s.ReplicatedSize = int64(math.Max(float64(s.ReplicatedSize), float64(latestTotReplicatedSize))) return s } + +const resyncTimeInterval = time.Minute * 10 + +// periodicResyncMetaSave saves in-memory resync meta stats to disk in periodic intervals +func (p *ReplicationPool) periodicResyncMetaSave(ctx context.Context, objectAPI ObjectLayer) { + resyncTimer := time.NewTimer(resyncTimeInterval) + defer resyncTimer.Stop() + + for { + select { + case <-resyncTimer.C: + resyncTimer.Reset(resyncTimeInterval) + now := UTCNow() + p.resyncState.RLock() + for bucket, brs := range p.resyncState.statusMap { + var updt bool + for _, st := range brs.TargetsMap { + // if resync in progress or just ended, needs to save to disk + if st.EndTime.Equal(timeSentinel) || now.Sub(st.EndTime) <= resyncTimeInterval { + updt = true + break + } + } + if updt { + brs.LastUpdate = now + if err := saveResyncStatus(ctx, bucket, brs, objectAPI); err != nil { + logger.LogIf(ctx, fmt.Errorf("Could not save resync metadata to disk for %s - %w", bucket, err)) + continue + } + } + } + p.resyncState.RUnlock() + case <-ctx.Done(): + // server could be restarting - need + // to exit immediately + return + } + } +} + +// resyncBucket resyncs all qualifying objects as per replication rules for the target +// ARN +func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI ObjectLayer) { + resyncStatus := ResyncFailed + defer func() { + 
globalReplicationPool.resyncState.Lock() + m := globalReplicationPool.resyncState.statusMap[bucket] + st := m.TargetsMap[arn] + st.EndTime = UTCNow() + st.ResyncStatus = resyncStatus + m.TargetsMap[arn] = st + globalReplicationPool.resyncState.Unlock() + }() + // Allocate new results channel to receive ObjectInfo. + objInfoCh := make(chan ObjectInfo) + cfg, err := getReplicationConfig(ctx, bucket) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed with %w", bucket, arn, err)) + return + } + tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) + if err != nil { + logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed %w", bucket, arn, err)) + return + } + rcfg := replicationConfig{ + Config: cfg, + remotes: tgts, + } + tgtArns := cfg.FilterTargetArns( + replication.ObjectOpts{ + OpType: replication.ResyncReplicationType, + TargetArn: arn, + }) + if len(tgtArns) != 1 { + logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - arn specified %s is missing in the replication config", bucket, arn)) + return + } + tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, arn) + if tgt == nil { + logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - target could not be created for arn %s", bucket, arn)) + return + } + + // Walk through all object versions - note ascending order of walk needed to ensure delete marker replicated to + // target after object version is first created. + if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{WalkAscending: true}); err != nil { + logger.LogIf(ctx, err) + return + } + + globalReplicationPool.resyncState.RLock() + m := globalReplicationPool.resyncState.statusMap[bucket] + st := m.TargetsMap[arn] + globalReplicationPool.resyncState.RUnlock() + var lastCheckpoint string + if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed { + lastCheckpoint = st.Object + } + for obj := range objInfoCh { + if heal && lastCheckpoint != "" && lastCheckpoint != obj.Name { + continue + } + lastCheckpoint = "" + + roi := getHealReplicateObjectInfo(obj, rcfg) + if !roi.ExistingObjResync.mustResync() { + continue + } + + if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() { + + versionID := "" + dmVersionID := "" + if roi.VersionPurgeStatus.Empty() { + dmVersionID = roi.VersionID + } else { + versionID = roi.VersionID + } + + doi := DeletedObjectReplicationInfo{ + DeletedObject: DeletedObject{ + ObjectName: roi.Name, + DeleteMarkerVersionID: dmVersionID, + VersionID: versionID, + ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true), + DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime}, + DeleteMarker: roi.DeleteMarker, + }, + Bucket: roi.Bucket, + OpType: replication.ExistingObjectReplicationType, + } + replicateDelete(ctx, doi, objectAPI, ReplicateDelete) + } else { + roi.OpType = replication.ExistingObjectReplicationType + replicateObject(ctx, roi, objectAPI, ReplicateExisting) + } + _, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, miniogo.StatObjectOptions{ + VersionID: roi.VersionID, + Internal: miniogo.AdvancedGetOptions{ + ReplicationProxyRequest: "false", + }, + }) + globalReplicationPool.resyncState.Lock() + m = globalReplicationPool.resyncState.statusMap[bucket] + st = m.TargetsMap[arn] + st.Object = roi.Name + if err != nil { + if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, bucket, roi.Name)) { + st.ReplicatedCount++ + } else { + st.FailedCount++ + } + } else { + st.ReplicatedCount++ + 
st.ReplicatedSize += roi.Size + } + m.TargetsMap[arn] = st + globalReplicationPool.resyncState.Unlock() + } + resyncStatus = ResyncCompleted +} + +// start replication resync for the remote target ARN specified +func startReplicationResync(ctx context.Context, bucket, arn, resyncID string, resyncBeforeDate time.Time, objAPI ObjectLayer) error { + if bucket == "" { + return fmt.Errorf("bucket name is empty") + } + if arn == "" { + return fmt.Errorf("target ARN specified for resync is empty") + } + // Check if the current bucket has quota restrictions, if not skip it + cfg, err := getReplicationConfig(ctx, bucket) + if err != nil { + return err + } + tgtArns := cfg.FilterTargetArns( + replication.ObjectOpts{ + OpType: replication.ResyncReplicationType, + TargetArn: arn, + }) + + if len(tgtArns) == 0 { + return fmt.Errorf("arn %s specified for resync not found in replication config", arn) + } + + data, err := loadBucketResyncMetadata(ctx, bucket, objAPI) + if err != nil { + return err + } + // validate if resync is in progress for this arn + for tArn, st := range data.TargetsMap { + if arn == tArn && st.ResyncStatus == ResyncStarted { + return fmt.Errorf("Resync of bucket %s is already in progress for remote bucket %s", bucket, arn) + } + } + + status := TargetReplicationResyncStatus{ + ResyncID: resyncID, + ResyncBeforeDate: resyncBeforeDate, + StartTime: UTCNow(), + ResyncStatus: ResyncStarted, + Bucket: bucket, + } + data.TargetsMap[arn] = status + if err = saveResyncStatus(ctx, bucket, data, objAPI); err != nil { + return err + } + globalReplicationPool.resyncState.Lock() + defer globalReplicationPool.resyncState.Unlock() + brs, ok := globalReplicationPool.resyncState.statusMap[bucket] + if !ok { + brs = BucketReplicationResyncStatus{ + Version: resyncMetaVersion, + TargetsMap: make(map[string]TargetReplicationResyncStatus), + } + } + brs.TargetsMap[arn] = status + globalReplicationPool.resyncState.statusMap[bucket] = brs + go resyncBucket(GlobalContext, bucket, arn, false, objAPI) + return nil +} + +// delete resync metadata from replication resync state in memory +func (p *ReplicationPool) deleteResyncMetadata(ctx context.Context, bucket string) { + if p == nil { + return + } + p.resyncState.Lock() + delete(p.resyncState.statusMap, bucket) + defer p.resyncState.Unlock() +} + +// initResync - initializes bucket replication resync for all buckets. +func (p *ReplicationPool) initResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { + if objAPI == nil { + return errServerNotInitialized + } + // replication applies only to erasure coded setups + if !globalIsErasure { + return nil + } + // Load bucket metadata sys in background + go p.loadResync(ctx, buckets, objAPI) + return nil +} + +// Loads bucket replication resync statuses into memory. 
+func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) { + for index := range buckets { + meta, err := loadBucketResyncMetadata(ctx, buckets[index].Name, objAPI) + if err != nil { + if errors.Is(err, errVolumeNotFound) { + meta = newBucketResyncStatus(buckets[index].Name) + } else { + logger.LogIf(ctx, err) + continue + } + } + p.resyncState.statusMap[buckets[index].Name] = meta + } + for index := range buckets { + bucket := buckets[index].Name + m, ok := p.resyncState.statusMap[bucket] + if ok { + for arn, st := range m.TargetsMap { + if st.ResyncStatus == ResyncFailed || st.ResyncStatus == ResyncStarted { + go resyncBucket(ctx, bucket, arn, true, objAPI) + } + } + } + } +} + +// load bucket resync metadata from disk +func loadBucketResyncMetadata(ctx context.Context, bucket string, objAPI ObjectLayer) (brs BucketReplicationResyncStatus, e error) { + brs = newBucketResyncStatus(bucket) + + resyncDirPath := path.Join(bucketMetaPrefix, bucket, replicationDir) + data, err := readConfig(GlobalContext, objAPI, pathJoin(resyncDirPath, resyncFileName)) + if err != nil && err != errConfigNotFound { + return brs, err + } + if len(data) == 0 { + // Seems to be empty. + return brs, nil + } + if len(data) <= 4 { + return brs, fmt.Errorf("replication resync: no data") + } + // Read resync meta header + switch binary.LittleEndian.Uint16(data[0:2]) { + case resyncMetaFormat: + default: + return brs, fmt.Errorf("resyncMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2])) + } + switch binary.LittleEndian.Uint16(data[2:4]) { + case resyncMetaVersion: + default: + return brs, fmt.Errorf("resyncMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4])) + } + // OK, parse data. + if _, err = brs.UnmarshalMsg(data[4:]); err != nil { + return brs, err + } + + switch brs.Version { + case resyncMetaVersionV1: + default: + return brs, fmt.Errorf("unexpected resync meta version: %d", brs.Version) + } + return brs, nil +} + +// save resync status to resync.bin +func saveResyncStatus(ctx context.Context, bucket string, brs BucketReplicationResyncStatus, objectAPI ObjectLayer) error { + data := make([]byte, 4, brs.Msgsize()+4) + + // Initialize the resync meta header. 
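+ // Layout: bytes [0:2] hold resyncMetaFormat and bytes [2:4] hold resyncMetaVersion
+ // (both little-endian uint16); the msgpack-encoded status is appended after this header.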
+ binary.LittleEndian.PutUint16(data[0:2], resyncMetaFormat) + binary.LittleEndian.PutUint16(data[2:4], resyncMetaVersion) + + buf, err := brs.MarshalMsg(data) + if err != nil { + return err + } + + configFile := path.Join(bucketMetaPrefix, bucket, replicationDir, resyncFileName) + return saveConfig(ctx, objectAPI, configFile, buf) +} diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index 9bd0af472..0c4f14d58 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -1668,7 +1668,13 @@ func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, re cancel() return } - + if opts.WalkAscending { + for i := len(fivs.Versions) - 1; i >= 0; i-- { + version := fivs.Versions[i] + results <- version.ToObjectInfo(bucket, version.Name) + } + return + } for _, version := range fivs.Versions { results <- version.ToObjectInfo(bucket, version.Name) } diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index a70a873ff..2fffc6c06 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -72,7 +72,8 @@ type ObjectOptions struct { MaxParity bool // Mutate set to 'true' if the call is namespace mutation call - Mutate bool + Mutate bool + WalkAscending bool // return Walk results in ascending order of versions } // ExpirationOptions represents object options for object expiration at objectLayer. diff --git a/cmd/server-main.go b/cmd/server-main.go index bea75f459..00a09109c 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -577,6 +577,8 @@ func serverMain(ctx *cli.Context) { if err != nil { logger.LogIf(GlobalContext, fmt.Errorf("Unable to list buckets to heal: %w", err)) } + // initialize replication resync state. + go globalReplicationPool.initResync(GlobalContext, buckets, newObject) // Populate existing buckets to the etcd backend if globalDNSConfig != nil { diff --git a/docs/bucket/replication/DESIGN.md b/docs/bucket/replication/DESIGN.md index b2f0ffd65..22583c38d 100644 --- a/docs/bucket/replication/DESIGN.md +++ b/docs/bucket/replication/DESIGN.md @@ -41,7 +41,9 @@ existing object replication are not marked as `PENDING` prior to replication. Note that objects with `null` versions, i.e. objects created prior to enabling versioning break the immutability guarantees provided by versioning. When existing object replication is enabled, these objects will be replicated as `null` versions to the remote targets provided they are not present on the target or if `null` version of object on source is newer than the `null` version of object on target. -If the remote site is fully lost and objects previously replicated need to be re-synced, the `mc replicate resync` command with optional flag of `--older-than` needs to be used to trigger re-syncing of previously replicated objects. This command generates a ResetID which is a unique UUID saved to the remote target config along with the applicable date(defaults to time of initiating the reset). All objects created prior to this date are eligible for re-replication if existing object replication is enabled for the replication rule the object satisfies. At the time of completion of replication, `X-Minio-Replication-Reset-Status` is set in the metadata with the timestamp of replication and ResetID. For saving iops, the objects which are re-replicated are not first set to `PENDING` state. 
+If the remote site is fully lost and objects previously replicated need to be re-synced, the `mc replicate resync start` command, with the optional `--older-than` flag, is used to trigger re-syncing of previously replicated objects. This command generates a ResetID, a unique UUID that is saved to the remote target config along with the applicable date (which defaults to the time the reset is initiated). All objects created prior to this date are eligible for re-replication if existing object replication is enabled for the replication rule the object satisfies. When replication completes, a metadata entry prefixed with `x-minio-internal-replication-reset-arn:` is set on the object with the timestamp of replication and the ResetID. To save IOPS, objects that are re-replicated are not first set to `PENDING` state.
+
+This is a slower operation that does not use the replication queues; it walks the bucket namespace and replicates objects one at a time so as not to add significant load to the server. Ideally, resync should not be initiated for multiple buckets simultaneously. Progress can be monitored with `mc replicate resync status alias/bucket --remote-bucket `. If the resync operation fails to replicate some versions, they will be picked up by the healing mechanism built into the scanner. If the resync operation reports a failed status, or the cluster restarts while a resync is in progress, a fresh `resync start` can be issued - this will replicate previously unsynced content at the cost of some additional metadata updates.
 
 ### Multi destination replication
 
 The replication design for multiple sites works in a similar manner as described above for two site scenario. However there are some
@@ -90,8 +92,10 @@ If 3 or more targets are participating in active-active replication, the replica
 },
 ...
 ```
-### Additional replication metadata for DeleteMarker
+### Additional replication metadata for DeleteMarker
+```
+...
 {
   "DelObj": {
     "ID": "u8H5pYQFRMKgkIgkpSKIkQ==",
@@ -122,6 +126,29 @@ If 3 or more targets are participating in active-active replication, the replica
 }
 ```
+### Additional Metadata for object replication resync - on source
+
+```
+...
+  "MetaSys": {
+  ...
+  "x-minio-internal-replication-reset-arn:minio:replication::af470089-d354-4473-934c-9e1f52f6da89:bucket": "TW9uLCAwNyBGZWIgMjAyMiAyMDowMzo0MCBHTVQ7ZGMxMWQzNDgtMTAwMS00ODA3LWFhNjEtOGY2MmFiNWQ5ZjU2",
+  ...
+  },
+...
+```
+
+### Additional Metadata for resync replication of delete-markers - on source
+
+```
+...
+  "MetaSys": {
+  "x-minio-internal-replication-reset-arn:minio:replication::af470089-d354-4473-934c-9e1f52f6da89:bucket": "TW9uLCAwNyBGZWIgMjAyMiAyMDowMzo0MCBHTVQ7ZGMxMWQzNDgtMTAwMS00ODA3LWFhNjEtOGY2MmFiNWQ5ZjU2",
+  ...
+  }
+...
+```
+
 ## Explore Further
 - [MinIO Bucket Versioning Implementation](https://docs.minio.io/docs/minio-bucket-versioning-guide.html)
 - [MinIO Client Quickstart Guide](https://docs.minio.io/docs/minio-client-quickstart-guide.html)
diff --git a/docs/bucket/replication/README.md b/docs/bucket/replication/README.md
index 0ea5cdefe..a270e2ab0 100644
--- a/docs/bucket/replication/README.md
+++ b/docs/bucket/replication/README.md
@@ -215,9 +215,7 @@ Existing object replication as detailed [here](https://aws.amazon.com/blogs/stor
 Once existing object replication is enabled, all objects or object prefixes that satisfy the replication rules and were created prior to adding replication configuration OR while replication rules were disabled will be synced to the target cluster. Depending on the number of previously existing objects, the existing objects that are now eligible to be replicated will eventually be synced to the target cluster as the scanner schedules them. This may be slower depending on the load on the cluster, latency and size of the namespace.
-In the rare event that target DR site is entirely lost and previously replicated objects to the DR cluster need to be re-replicated, `mc replicate resync alias/bucket` can be used to initiate a reset. This would initiate a re-sync between the two clusters on a lower priority as the scanner picks up these objects to re-sync.
-
-This is an expensive operation and should be initiated only once - progress of the syncing can be monitored by looking at Prometheus metrics. If object version has been re-replicated, `mc stat --vid --debug` on this version shows an additional header `X-Minio-Replication-Reset-Status` with the replication timestamp and ResetID generated at the time of issuing the `mc replicate resync` command.
+In the rare event that the target DR site is entirely lost and objects previously replicated to the DR cluster need to be re-replicated, `mc replicate resync start alias/bucket --remote-bucket ` can be used to initiate a reset. This initiates a re-sync between the two clusters by walking the bucket namespace and replicating eligible objects that satisfy the existing objects replication rule specified in the replication config. The status of the resync operation can be viewed with `mc replicate resync status alias/bucket --remote-bucket `. Note that ExistingObjectReplication needs to be enabled in the config via `mc replicate [add|edit]` by passing `existing-objects` as one of the values to the `--replicate` flag. Only objects that meet the replication rules and have existing object replication enabled will be re-synced.
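+
+For example, assuming an alias `myminio` and a bucket `mybucket` that already have a remote replication target configured (the alias, bucket and ARN placeholder below are illustrative values only), a reset could be started and monitored as follows:
+
+```
+mc replicate resync start myminio/mybucket --remote-bucket "<remote-target-ARN>"
+mc replicate resync status myminio/mybucket --remote-bucket "<remote-target-ARN>"
+```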