diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go index 56f96c465..24cff510b 100644 --- a/cmd/admin-handlers-site-replication.go +++ b/cmd/admin-handlers-site-replication.go @@ -515,3 +515,45 @@ func (a adminAPIHandlers) SRPeerRemove(w http.ResponseWriter, r *http.Request) { return } } + +// SiteReplicationResyncOp - PUT /minio/admin/v3/site-replication/resync/op +func (a adminAPIHandlers) SiteReplicationResyncOp(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SiteReplicationResyncOp") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.SiteReplicationResyncAction) + if objectAPI == nil { + return + } + + var peerSite madmin.PeerInfo + if err := parseJSONBody(ctx, r.Body, &peerSite, ""); err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + vars := mux.Vars(r) + op := madmin.SiteResyncOp(vars["operation"]) + var ( + status madmin.SRResyncOpStatus + err error + ) + switch op { + case madmin.SiteResyncStart: + status, err = globalSiteReplicationSys.startResync(ctx, objectAPI, peerSite) + case madmin.SiteResyncCancel: + status, err = globalSiteReplicationSys.cancelResync(ctx, objectAPI, peerSite) + default: + err = errSRInvalidRequest(errInvalidArgument) + } + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + body, err := json.Marshal(status) + if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return + } + writeSuccessResponseJSON(w, body) +} diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 24da24d93..4f96e0684 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -428,7 +428,7 @@ func (a adminAPIHandlers) MetricsHandler(w http.ResponseWriter, r *http.Request) } } } - + dID := r.Form.Get("by-depID") done := ctx.Done() ticker := time.NewTicker(interval) defer ticker.Stop() @@ -441,15 +441,16 @@ 
func (a adminAPIHandlers) MetricsHandler(w http.ResponseWriter, r *http.Request) hosts: hostMap, disks: diskMap, jobID: jobID, + depID: dID, }) m.Merge(&mLocal) - // Allow half the interval for collecting remote... cctx, cancel := context.WithTimeout(ctx, interval/2) mRemote := collectRemoteMetrics(cctx, types, collectMetricsOpts{ hosts: hostMap, disks: diskMap, jobID: jobID, + depID: dID, }) cancel() m.Merge(&mRemote) diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 74fc4ad02..7dba7266d 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -258,6 +258,7 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationEdit))) adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/edit").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerEdit))) adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/remove").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerRemove))) + adminRouter.Methods(http.MethodPut).Path(adminVersion+"/site-replication/resync/op").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationResyncOp))).Queries("operation", "{operation:.*}") if globalIsDistErasure { // Top locks diff --git a/cmd/bucket-replication-handlers.go b/cmd/bucket-replication-handlers.go index 7df242d30..ab5ef6c3a 100644 --- a/cmd/bucket-replication-handlers.go +++ b/cmd/bucket-replication-handlers.go @@ -321,7 +321,12 @@ func (api objectAPIHandlers) ResetBucketReplicationStartHandler(w http.ResponseW writeErrorResponseJSON(ctx, w, toAPIError(ctx, err), r.URL) } } - if err := startReplicationResync(ctx, bucket, arn, resetID, resetBeforeDate, objectAPI); err != nil { + if err := globalReplicationPool.resyncer.start(ctx, objectAPI, resyncOpts{ + bucket: bucket, + arn: arn, + resyncID: resetID, + resyncBefore: resetBeforeDate, + }); err != nil { writeErrorResponseJSON(ctx, w, 
errorCodes.ToAPIErrWithErr(ErrBadRequest, InvalidArgument{ Bucket: bucket, Err: err, @@ -370,10 +375,13 @@ func (api objectAPIHandlers) ResetBucketReplicationStatusHandler(w http.Response writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) return } - - globalReplicationPool.resyncState.RLock() - brs, ok := globalReplicationPool.resyncState.statusMap[bucket] - globalReplicationPool.resyncState.RUnlock() + var tgtStats map[string]TargetReplicationResyncStatus + globalReplicationPool.resyncer.RLock() + brs, ok := globalReplicationPool.resyncer.statusMap[bucket] + if ok { + tgtStats = brs.cloneTgtStats() + } + globalReplicationPool.resyncer.RUnlock() if !ok { brs, err = loadBucketResyncMetadata(ctx, bucket, objectAPI) if err != nil { @@ -383,10 +391,11 @@ func (api objectAPIHandlers) ResetBucketReplicationStatusHandler(w http.Response }), r.URL) return } + tgtStats = brs.cloneTgtStats() } var rinfo ResyncTargetsInfo - for tarn, st := range brs.TargetsMap { + for tarn, st := range tgtStats { if arn != "" && tarn != arn { continue } @@ -394,7 +403,7 @@ func (api objectAPIHandlers) ResetBucketReplicationStatusHandler(w http.Response Arn: tarn, ResetID: st.ResyncID, StartTime: st.StartTime, - EndTime: st.EndTime, + EndTime: st.LastUpdate, ResyncStatus: st.ResyncStatus.String(), ReplicatedSize: st.ReplicatedSize, ReplicatedCount: st.ReplicatedCount, diff --git a/cmd/bucket-replication-utils.go b/cmd/bucket-replication-utils.go index 105b7dfb2..788eaf996 100644 --- a/cmd/bucket-replication-utils.go +++ b/cmd/bucket-replication-utils.go @@ -628,9 +628,12 @@ func (v VersionPurgeStatusType) Pending() bool { return v == Pending || v == Failed } -type replicationResyncState struct { +type replicationResyncer struct { // map of bucket to their resync status - statusMap map[string]BucketReplicationResyncStatus + statusMap map[string]BucketReplicationResyncStatus + workerSize int + resyncCancelCh chan struct{} + workerCh chan struct{} sync.RWMutex } @@ -642,12 +645,23 @@ const ( 
resyncMetaVersion = resyncMetaVersionV1 ) +type resyncOpts struct { + bucket string + arn string + resyncID string + resyncBefore time.Time +} + // ResyncStatusType status of resync operation type ResyncStatusType int const ( // NoResync - no resync in progress NoResync ResyncStatusType = iota + // ResyncPending - resync pending + ResyncPending + // ResyncCanceled - resync canceled + ResyncCanceled // ResyncStarted - resync in progress ResyncStarted // ResyncCompleted - resync finished @@ -656,6 +670,10 @@ const ( ResyncFailed ) +func (rt ResyncStatusType) isValid() bool { + return rt != NoResync +} + func (rt ResyncStatusType) String() string { switch rt { case ResyncStarted: @@ -664,6 +682,10 @@ func (rt ResyncStatusType) String() string { return "Completed" case ResyncFailed: return "Failed" + case ResyncPending: + return "Pending" + case ResyncCanceled: + return "Canceled" default: return "" } @@ -671,8 +693,8 @@ func (rt ResyncStatusType) String() string { // TargetReplicationResyncStatus status of resync of bucket for a specific target type TargetReplicationResyncStatus struct { - StartTime time.Time `json:"startTime" msg:"st"` - EndTime time.Time `json:"endTime" msg:"et"` + StartTime time.Time `json:"startTime" msg:"st"` + LastUpdate time.Time `json:"lastUpdated" msg:"lst"` // Resync ID assigned to this reset ResyncID string `json:"resyncID" msg:"id"` // ResyncBeforeDate - resync all objects created prior to this date @@ -701,6 +723,14 @@ type BucketReplicationResyncStatus struct { LastUpdate time.Time `json:"lastUpdate" msg:"lu"` } +func (rs *BucketReplicationResyncStatus) cloneTgtStats() (m map[string]TargetReplicationResyncStatus) { + m = make(map[string]TargetReplicationResyncStatus) + for arn, st := range rs.TargetsMap { + m[arn] = st + } + return +} + func newBucketResyncStatus(bucket string) BucketReplicationResyncStatus { return BucketReplicationResyncStatus{ TargetsMap: make(map[string]TargetReplicationResyncStatus), diff --git 
a/cmd/bucket-replication-utils_gen.go b/cmd/bucket-replication-utils_gen.go index c8b22c572..d69039c44 100644 --- a/cmd/bucket-replication-utils_gen.go +++ b/cmd/bucket-replication-utils_gen.go @@ -2117,10 +2117,10 @@ func (z *TargetReplicationResyncStatus) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "StartTime") return } - case "et": - z.EndTime, err = dc.ReadTime() + case "lst": + z.LastUpdate, err = dc.ReadTime() if err != nil { - err = msgp.WrapError(err, "EndTime") + err = msgp.WrapError(err, "LastUpdate") return } case "id": @@ -2205,14 +2205,14 @@ func (z *TargetReplicationResyncStatus) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "StartTime") return } - // write "et" - err = en.Append(0xa2, 0x65, 0x74) + // write "lst" + err = en.Append(0xa3, 0x6c, 0x73, 0x74) if err != nil { return } - err = en.WriteTime(z.EndTime) + err = en.WriteTime(z.LastUpdate) if err != nil { - err = msgp.WrapError(err, "EndTime") + err = msgp.WrapError(err, "LastUpdate") return } // write "id" @@ -2315,9 +2315,9 @@ func (z *TargetReplicationResyncStatus) MarshalMsg(b []byte) (o []byte, err erro // string "st" o = append(o, 0x8b, 0xa2, 0x73, 0x74) o = msgp.AppendTime(o, z.StartTime) - // string "et" - o = append(o, 0xa2, 0x65, 0x74) - o = msgp.AppendTime(o, z.EndTime) + // string "lst" + o = append(o, 0xa3, 0x6c, 0x73, 0x74) + o = msgp.AppendTime(o, z.LastUpdate) // string "id" o = append(o, 0xa2, 0x69, 0x64) o = msgp.AppendString(o, z.ResyncID) @@ -2372,10 +2372,10 @@ func (z *TargetReplicationResyncStatus) UnmarshalMsg(bts []byte) (o []byte, err err = msgp.WrapError(err, "StartTime") return } - case "et": - z.EndTime, bts, err = msgp.ReadTimeBytes(bts) + case "lst": + z.LastUpdate, bts, err = msgp.ReadTimeBytes(bts) if err != nil { - err = msgp.WrapError(err, "EndTime") + err = msgp.WrapError(err, "LastUpdate") return } case "id": @@ -2450,7 +2450,7 @@ func (z *TargetReplicationResyncStatus) UnmarshalMsg(bts []byte) (o []byte, err // 
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *TargetReplicationResyncStatus) Msgsize() (s int) { - s = 1 + 3 + msgp.TimeSize + 3 + msgp.TimeSize + 3 + msgp.StringPrefixSize + len(z.ResyncID) + 4 + msgp.TimeSize + 4 + msgp.IntSize + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Object) + s = 1 + 3 + msgp.TimeSize + 4 + msgp.TimeSize + 3 + msgp.StringPrefixSize + len(z.ResyncID) + 4 + msgp.TimeSize + 4 + msgp.IntSize + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 3 + msgp.Int64Size + 4 + msgp.Int64Size + 4 + msgp.StringPrefixSize + len(z.Bucket) + 4 + msgp.StringPrefixSize + len(z.Object) return } diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 63b5104ad..80021ef1b 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -24,6 +24,7 @@ import ( "fmt" "io" "math" + "math/rand" "net/http" "path" "reflect" @@ -1519,17 +1520,16 @@ type ReplicationPool struct { existingReplicaDeleteCh chan DeletedObjectReplicationInfo mrfSaveCh chan MRFReplicateEntry saveStateCh chan struct{} - - workerSize int - mrfWorkerSize int - activeWorkers int32 - activeMRFWorkers int32 - priority string - resyncState replicationResyncState - workerWg sync.WaitGroup - mrfWorkerWg sync.WaitGroup - once sync.Once - mu sync.RWMutex + workerSize int + mrfWorkerSize int + activeWorkers int32 + activeMRFWorkers int32 + priority string + resyncer *replicationResyncer + workerWg sync.WaitGroup + mrfWorkerWg sync.WaitGroup + once sync.Once + mu sync.RWMutex } const ( @@ -1578,7 +1578,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool mrfWorkerKillCh: make(chan struct{}, failedWorkers), existingReplicaCh: make(chan ReplicateObjectInfo, 100000), existingReplicaDeleteCh: make(chan DeletedObjectReplicationInfo, 100000), - resyncState: 
replicationResyncState{statusMap: make(map[string]BucketReplicationResyncStatus)}, + resyncer: newresyncer(), mrfSaveCh: make(chan MRFReplicateEntry, 100000), saveStateCh: make(chan struct{}, 1), ctx: ctx, @@ -1589,7 +1589,7 @@ func NewReplicationPool(ctx context.Context, o ObjectLayer, opts replicationPool pool.ResizeWorkers(workers) pool.ResizeFailedWorkers(failedWorkers) go pool.AddExistingObjectReplicateWorker() - go pool.updateResyncStatus(ctx, o) + go pool.resyncer.PersistToDisk(ctx, o) go pool.processMRF() go pool.persistMRF() go pool.saveStatsToDisk() @@ -2147,8 +2147,8 @@ func resyncTarget(oi ObjectInfo, arn string, resetID string, resetBeforeDate tim const resyncTimeInterval = time.Minute * 1 -// updateResyncStatus persists in-memory resync metadata stats to disk at periodic intervals -func (p *ReplicationPool) updateResyncStatus(ctx context.Context, objectAPI ObjectLayer) { +// PersistToDisk persists in-memory resync metadata stats to disk at periodic intervals +func (s *replicationResyncer) PersistToDisk(ctx context.Context, objectAPI ObjectLayer) { resyncTimer := time.NewTimer(resyncTimeInterval) defer resyncTimer.Stop() @@ -2159,12 +2159,12 @@ func (p *ReplicationPool) updateResyncStatus(ctx context.Context, objectAPI Obje for { select { case <-resyncTimer.C: - p.resyncState.RLock() - for bucket, brs := range p.resyncState.statusMap { + s.RLock() + for bucket, brs := range s.statusMap { var updt bool // Save the replication status if one resync to any bucket target is still not finished for _, st := range brs.TargetsMap { - if st.EndTime.Equal(timeSentinel) { + if st.LastUpdate.Equal(timeSentinel) { updt = true break } @@ -2181,7 +2181,7 @@ func (p *ReplicationPool) updateResyncStatus(ctx context.Context, objectAPI Obje } } } - p.resyncState.RUnlock() + s.RUnlock() resyncTimer.Reset(resyncTimeInterval) case <-ctx.Done(): @@ -2192,31 +2192,54 @@ func (p *ReplicationPool) updateResyncStatus(ctx context.Context, objectAPI Obje } } +const resyncWorkerCnt 
= 50 // limit of number of bucket resyncs is progress at any given time + +func newresyncer() *replicationResyncer { + rs := replicationResyncer{ + statusMap: make(map[string]BucketReplicationResyncStatus), + workerSize: resyncWorkerCnt, + resyncCancelCh: make(chan struct{}, resyncWorkerCnt), + workerCh: make(chan struct{}, resyncWorkerCnt), + } + for i := 0; i < rs.workerSize; i++ { + rs.workerCh <- struct{}{} + } + return &rs +} + // resyncBucket resyncs all qualifying objects as per replication rules for the target // ARN -func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI ObjectLayer) { +func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI ObjectLayer, heal bool, opts resyncOpts) { + select { + case <-s.workerCh: // block till a worker is available + case <-ctx.Done(): + return + } + resyncStatus := ResyncFailed defer func() { - globalReplicationPool.resyncState.Lock() - m := globalReplicationPool.resyncState.statusMap[bucket] - st := m.TargetsMap[arn] - st.EndTime = UTCNow() + s.Lock() + m := s.statusMap[opts.bucket] + st := m.TargetsMap[opts.arn] + st.LastUpdate = UTCNow() st.ResyncStatus = resyncStatus - m.TargetsMap[arn] = st + m.TargetsMap[opts.arn] = st m.LastUpdate = UTCNow() - globalReplicationPool.resyncState.statusMap[bucket] = m - globalReplicationPool.resyncState.Unlock() + s.statusMap[opts.bucket] = m + s.Unlock() + globalSiteResyncMetrics.incBucket(opts, resyncStatus) + s.workerCh <- struct{}{} }() // Allocate new results channel to receive ObjectInfo. 
objInfoCh := make(chan ObjectInfo) - cfg, err := getReplicationConfig(ctx, bucket) + cfg, err := getReplicationConfig(ctx, opts.bucket) if err != nil { - logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed with %w", bucket, arn, err)) + logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed with %w", opts.bucket, opts.arn, err)) return } - tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) + tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, opts.bucket) if err != nil { - logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed %w", bucket, arn, err)) + logger.LogIf(ctx, fmt.Errorf("Replication resync of %s for arn %s failed %w", opts.bucket, opts.arn, err)) return } rcfg := replicationConfig{ @@ -2226,34 +2249,50 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI tgtArns := cfg.FilterTargetArns( replication.ObjectOpts{ OpType: replication.ResyncReplicationType, - TargetArn: arn, + TargetArn: opts.arn, }) if len(tgtArns) != 1 { - logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - arn specified %s is missing in the replication config", bucket, arn)) + logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - arn specified %s is missing in the replication config", opts.bucket, opts.arn)) return } - tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, arn) + tgt := globalBucketTargetSys.GetRemoteTargetClient(ctx, opts.arn) if tgt == nil { - logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - target could not be created for arn %s", bucket, arn)) + logger.LogIf(ctx, fmt.Errorf("Replication resync failed for %s - target could not be created for arn %s", opts.bucket, opts.arn)) return } - + // mark resync status as resync started + if !heal { + s.Lock() + m := s.statusMap[opts.bucket] + st := m.TargetsMap[opts.arn] + st.ResyncStatus = ResyncStarted + m.TargetsMap[opts.arn] = st + m.LastUpdate = UTCNow() + 
s.statusMap[opts.bucket] = m + s.Unlock() + } // Walk through all object versions - Walk() is always in ascending order needed to ensure // delete marker replicated to target after object version is first created. - if err := objectAPI.Walk(ctx, bucket, "", objInfoCh, ObjectOptions{}); err != nil { + if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, ObjectOptions{}); err != nil { logger.LogIf(ctx, err) return } - globalReplicationPool.resyncState.RLock() - m := globalReplicationPool.resyncState.statusMap[bucket] - st := m.TargetsMap[arn] - globalReplicationPool.resyncState.RUnlock() + s.RLock() + m := s.statusMap[opts.bucket] + st := m.TargetsMap[opts.arn] + s.RUnlock() var lastCheckpoint string if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed { lastCheckpoint = st.Object } for obj := range objInfoCh { + select { + case <-s.resyncCancelCh: + resyncStatus = ResyncCanceled + return + default: + } if heal && lastCheckpoint != "" && lastCheckpoint != obj.Name { continue } @@ -2263,7 +2302,7 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI if !roi.ExistingObjResync.mustResync() { continue } - + traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID)) if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() { versionID := "" dmVersionID := "" @@ -2298,96 +2337,134 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI ReplicationProxyRequest: "false", }, }) - globalReplicationPool.resyncState.Lock() - m = globalReplicationPool.resyncState.statusMap[bucket] - st = m.TargetsMap[arn] + s.Lock() + m = s.statusMap[opts.bucket] + st = m.TargetsMap[opts.arn] st.Object = roi.Name + success := true if err != nil { - if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, bucket, roi.Name)) { + if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) { st.ReplicatedCount++ } else { 
st.FailedCount++ + success = false } } else { st.ReplicatedCount++ st.ReplicatedSize += roi.Size } - m.TargetsMap[arn] = st + m.TargetsMap[opts.arn] = st m.LastUpdate = UTCNow() - globalReplicationPool.resyncState.statusMap[bucket] = m - globalReplicationPool.resyncState.Unlock() + s.statusMap[opts.bucket] = m + s.Unlock() + traceFn(err) + globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID) } resyncStatus = ResyncCompleted } // start replication resync for the remote target ARN specified -func startReplicationResync(ctx context.Context, bucket, arn, resyncID string, resyncBeforeDate time.Time, objAPI ObjectLayer) error { - if bucket == "" { +func (s *replicationResyncer) start(ctx context.Context, objAPI ObjectLayer, opts resyncOpts) error { + if opts.bucket == "" { return fmt.Errorf("bucket name is empty") } - if arn == "" { + if opts.arn == "" { return fmt.Errorf("target ARN specified for resync is empty") } // Check if the current bucket has quota restrictions, if not skip it - cfg, err := getReplicationConfig(ctx, bucket) + cfg, err := getReplicationConfig(ctx, opts.bucket) if err != nil { return err } tgtArns := cfg.FilterTargetArns( replication.ObjectOpts{ OpType: replication.ResyncReplicationType, - TargetArn: arn, + TargetArn: opts.arn, }) if len(tgtArns) == 0 { - return fmt.Errorf("arn %s specified for resync not found in replication config", arn) + return fmt.Errorf("arn %s specified for resync not found in replication config", opts.arn) } - - data, err := loadBucketResyncMetadata(ctx, bucket, objAPI) - if err != nil { - return err + globalReplicationPool.resyncer.RLock() + data, ok := globalReplicationPool.resyncer.statusMap[opts.bucket] + globalReplicationPool.resyncer.RUnlock() + if !ok { + data, err = loadBucketResyncMetadata(ctx, opts.bucket, objAPI) + if err != nil { + return err + } } // validate if resync is in progress for this arn for tArn, st := range data.TargetsMap { - if arn == tArn && st.ResyncStatus == ResyncStarted { - 
return fmt.Errorf("Resync of bucket %s is already in progress for remote bucket %s", bucket, arn) + if opts.arn == tArn && (st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncPending) { + return fmt.Errorf("Resync of bucket %s is already in progress for remote bucket %s", opts.bucket, opts.arn) } } status := TargetReplicationResyncStatus{ - ResyncID: resyncID, - ResyncBeforeDate: resyncBeforeDate, + ResyncID: opts.resyncID, + ResyncBeforeDate: opts.resyncBefore, StartTime: UTCNow(), - ResyncStatus: ResyncStarted, - Bucket: bucket, + ResyncStatus: ResyncPending, + Bucket: opts.bucket, } - data.TargetsMap[arn] = status - if err = saveResyncStatus(ctx, bucket, data, objAPI); err != nil { + data.TargetsMap[opts.arn] = status + if err = saveResyncStatus(ctx, opts.bucket, data, objAPI); err != nil { return err } - globalReplicationPool.resyncState.Lock() - defer globalReplicationPool.resyncState.Unlock() - brs, ok := globalReplicationPool.resyncState.statusMap[bucket] + + globalReplicationPool.resyncer.Lock() + defer globalReplicationPool.resyncer.Unlock() + brs, ok := globalReplicationPool.resyncer.statusMap[opts.bucket] if !ok { brs = BucketReplicationResyncStatus{ Version: resyncMetaVersion, TargetsMap: make(map[string]TargetReplicationResyncStatus), } } - brs.TargetsMap[arn] = status - globalReplicationPool.resyncState.statusMap[bucket] = brs - go resyncBucket(GlobalContext, bucket, arn, false, objAPI) + brs.TargetsMap[opts.arn] = status + globalReplicationPool.resyncer.statusMap[opts.bucket] = brs + go globalReplicationPool.resyncer.resyncBucket(GlobalContext, objAPI, false, opts) return nil } +func (s *replicationResyncer) trace(resyncID string, path string) func(err error) { + startTime := time.Now() + return func(err error) { + duration := time.Since(startTime) + if globalTrace.NumSubscribers(madmin.TraceReplicationResync) > 0 { + globalTrace.Publish(replicationResyncTrace(resyncID, startTime, duration, path, err)) + } + } +} + +func 
replicationResyncTrace(resyncID string, startTime time.Time, duration time.Duration, path string, err error) madmin.TraceInfo { + var errStr string + if err != nil { + errStr = err.Error() + } + funcName := fmt.Sprintf("replication.(resyncID=%s)", resyncID) + return madmin.TraceInfo{ + TraceType: madmin.TraceReplicationResync, + Time: startTime, + NodeName: globalLocalNodeName, + FuncName: funcName, + Duration: duration, + Path: path, + Error: errStr, + } +} + // delete resync metadata from replication resync state in memory func (p *ReplicationPool) deleteResyncMetadata(ctx context.Context, bucket string) { if p == nil { return } - p.resyncState.Lock() - delete(p.resyncState.statusMap, bucket) - defer p.resyncState.Unlock() + p.resyncer.Lock() + delete(p.resyncer.statusMap, bucket) + defer p.resyncer.Unlock() + + globalSiteResyncMetrics.deleteBucket(bucket) } // initResync - initializes bucket replication resync for all buckets. @@ -2396,12 +2473,44 @@ func (p *ReplicationPool) initResync(ctx context.Context, buckets []BucketInfo, return errServerNotInitialized } // Load bucket metadata sys in background - go p.loadResync(ctx, buckets, objAPI) + go p.startResyncRoutine(ctx, buckets, objAPI) return nil } +func (p *ReplicationPool) startResyncRoutine(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + // Run the replication resync in a loop + for { + if err := p.loadResync(ctx, buckets, objAPI); err == nil { + <-ctx.Done() + return + } + duration := time.Duration(r.Float64() * float64(time.Minute)) + if duration < time.Second { + // Make sure to sleep atleast a second to avoid high CPU ticks. + duration = time.Second + } + time.Sleep(duration) + } +} + +var replicationResyncLockTimeout = newDynamicTimeoutWithOpts(dynamicTimeoutOpts{ + timeout: 30 * time.Second, + minimum: 10 * time.Second, + retryInterval: time.Second, +}) + // Loads bucket replication resync statuses into memory. 
-func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) { +func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo, objAPI ObjectLayer) error { + // Make sure only one node running resync on the cluster. + locker := objAPI.NewNSLock(minioMetaBucket, "replication/resync.lock") + lkctx, err := locker.GetLock(ctx, replicationResyncLockTimeout) + if err != nil { + return err + } + ctx = lkctx.Context() + defer lkctx.Cancel() + // No unlock for "leader" lock. for index := range buckets { meta, err := loadBucketResyncMetadata(ctx, buckets[index].Name, objAPI) if err != nil { @@ -2410,30 +2519,38 @@ func (p *ReplicationPool) loadResync(ctx context.Context, buckets []BucketInfo, } continue } - p.resyncState.Lock() - p.resyncState.statusMap[buckets[index].Name] = meta - p.resyncState.Unlock() + + p.resyncer.Lock() + p.resyncer.statusMap[buckets[index].Name] = meta + p.resyncer.Unlock() } for index := range buckets { bucket := buckets[index].Name - p.resyncState.RLock() - m, ok := p.resyncState.statusMap[bucket] - p.resyncState.RUnlock() - + var tgts map[string]TargetReplicationResyncStatus + p.resyncer.RLock() + m, ok := p.resyncer.statusMap[bucket] if ok { - for arn, st := range m.TargetsMap { - if st.ResyncStatus == ResyncFailed || st.ResyncStatus == ResyncStarted { - go resyncBucket(ctx, bucket, arn, true, objAPI) - } + tgts = m.cloneTgtStats() + } + p.resyncer.RUnlock() + for arn, st := range tgts { + switch st.ResyncStatus { + case ResyncFailed, ResyncStarted, ResyncPending: + go p.resyncer.resyncBucket(ctx, objAPI, true, resyncOpts{ + bucket: bucket, + arn: arn, + resyncID: st.ResyncID, + resyncBefore: st.ResyncBeforeDate, + }) } } } + return nil } // load bucket resync metadata from disk func loadBucketResyncMetadata(ctx context.Context, bucket string, objAPI ObjectLayer) (brs BucketReplicationResyncStatus, e error) { brs = newBucketResyncStatus(bucket) - resyncDirPath := path.Join(bucketMetaPrefix, 
bucket, replicationDir) data, err := readConfig(GlobalContext, objAPI, pathJoin(resyncDirPath, resyncFileName)) if err != nil && err != errConfigNotFound { diff --git a/cmd/bucket-targets.go b/cmd/bucket-targets.go index 72dd1346d..7e3758df9 100644 --- a/cmd/bucket-targets.go +++ b/cmd/bucket-targets.go @@ -487,6 +487,26 @@ func (sys *BucketTargetSys) getRemoteARN(bucket string, target *madmin.BucketTar return generateARN(target) } +// getRemoteARNForPeer returns the ARN of the replication target of bucket that points at the given peer site, or "" when no target matches or the peer endpoint cannot be parsed. +func (sys *BucketTargetSys) getRemoteARNForPeer(bucket string, peer madmin.PeerInfo) string { + // Parse the peer endpoint once, up front: url.Parse returns a nil *url.URL on error, so dereferencing it unchecked would panic. + ep, err := url.Parse(peer.Endpoint) + if err != nil { + return "" + } + tgts := sys.targetsMap[bucket] + for _, target := range tgts { + if target.SourceBucket == bucket && + target.TargetBucket == bucket && + target.Endpoint == ep.Host && + target.Secure == (ep.Scheme == "https") && + target.Type == madmin.ReplicationService { + return target.Arn + } + } + return "" +} + // generate ARN that is unique to this target type func generateARN(t *madmin.BucketTarget) string { arn := madmin.ARN{ diff --git a/cmd/globals.go b/cmd/globals.go index 74fe21542..829b97388 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -281,6 +281,9 @@ var ( // Cluster replication manager. 
globalSiteReplicationSys SiteReplicationSys + // Cluster replication resync metrics + globalSiteResyncMetrics *siteResyncMetrics + // Is set to true when Bucket federation is requested // and is 'true' when etcdConfig.PathPrefix is empty globalBucketFederation bool diff --git a/cmd/metrics-realtime.go b/cmd/metrics-realtime.go index c43111c5e..e9e272a0d 100644 --- a/cmd/metrics-realtime.go +++ b/cmd/metrics-realtime.go @@ -29,6 +29,7 @@ type collectMetricsOpts struct { hosts map[string]struct{} disks map[string]struct{} jobID string + depID string } func collectLocalMetrics(types madmin.MetricType, opts collectMetricsOpts) (m madmin.RealtimeMetrics) { @@ -65,7 +66,9 @@ func collectLocalMetrics(types madmin.MetricType, opts collectMetricsOpts) (m ma if types.Contains(madmin.MetricsBatchJobs) { m.Aggregated.BatchJobs = globalBatchJobsMetrics.report(opts.jobID) } - + if types.Contains(madmin.MetricsSiteResync) { + m.Aggregated.SiteResync = globalSiteResyncMetrics.report(opts.depID) + } // Add types... // ByHost is a shallow reference, so careful about sharing. 
diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 3984e2964..88c0a2726 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -202,6 +202,8 @@ func (client *peerRESTClient) GetMetrics(ctx context.Context, t madmin.MetricTyp values.Set(peerRESTDisk, disk) } values.Set(peerRESTJobID, opts.jobID) + values.Set(peerRESTDepID, opts.depID) + respBody, err := client.callWithContext(ctx, peerRESTMethodMetrics, values, nil, -1) if err != nil { return diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index 11724c826..739ab9599 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -97,6 +97,7 @@ const ( peerRESTMetricsTypes = "types" peerRESTDisk = "disk" peerRESTJobID = "job-id" + peerRESTDepID = "depID" peerRESTStartRebalance = "start-rebalance" peerRESTListenBucket = "bucket" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index a381f6eca..4b0dbe014 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -437,6 +437,7 @@ func (s *peerRESTServer) GetMetricsHandler(w http.ResponseWriter, r *http.Reques diskMap[disk] = struct{}{} } jobID := r.Form.Get(peerRESTJobID) + depID := r.Form.Get(peerRESTDepID) ctx, cancel := context.WithCancel(r.Context()) defer cancel() @@ -444,8 +445,8 @@ func (s *peerRESTServer) GetMetricsHandler(w http.ResponseWriter, r *http.Reques info := collectLocalMetrics(types, collectMetricsOpts{ disks: diskMap, jobID: jobID, + depID: depID, }) - logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) } diff --git a/cmd/server-main.go b/cmd/server-main.go index 618de8978..77c3e92ac 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -330,6 +330,8 @@ func initAllSubsystems(ctx context.Context) { // Create new ILM tier configuration subsystem globalTierConfigMgr = NewTierConfigMgr() + + globalSiteResyncMetrics = newSiteResyncMetrics(GlobalContext) } func configRetriableErrors(err error) bool { diff --git a/cmd/site-replication-utils.go b/cmd/site-replication-utils.go 
new file mode 100644 index 000000000..b8af49aa0 --- /dev/null +++ b/cmd/site-replication-utils.go @@ -0,0 +1,334 @@ +// Copyright (c) 2015-2022 MinIO, Inc. +// +// This file is part of MinIO Object Storage stack +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "context" + "math/rand" + "sync" + "time" + + "github.com/minio/madmin-go" +) + +//go:generate msgp -file=$GOFILE + +// SiteResyncStatus captures current replication resync status for a target site +type SiteResyncStatus struct { + Version int `json:"version" msg:"v"` + // Overall site status + Status ResyncStatusType `json:"st" msg:"ss"` + DeplID string `json:"dId" msg:"did"` + BucketStatuses map[string]ResyncStatusType `json:"buckets" msg:"bkts"` + TotBuckets int `json:"totbuckets" msg:"tb"` + TargetReplicationResyncStatus `json:"currSt" msg:"cst"` +} + +func (s *SiteResyncStatus) clone() SiteResyncStatus { + if s == nil { + return SiteResyncStatus{} + } + o := *s + o.BucketStatuses = make(map[string]ResyncStatusType, len(s.BucketStatuses)) + for b, st := range s.BucketStatuses { + o.BucketStatuses[b] = st + } + return o +} + +const ( + siteResyncPrefix = bucketMetaPrefix + "/site-replication/resync" +) + +type resyncState struct { + resyncID string + LastSaved time.Time +} + +//msgp:ignore siteResyncMetrics +type siteResyncMetrics struct { + sync.RWMutex + // resyncStatus maps resync ID 
to resync status for peer + resyncStatus map[string]SiteResyncStatus + // map peer deployment ID to resync ID + peerResyncMap map[string]resyncState +} + +func newSiteResyncMetrics(ctx context.Context) *siteResyncMetrics { + s := siteResyncMetrics{ + resyncStatus: make(map[string]SiteResyncStatus), + peerResyncMap: make(map[string]resyncState), + } + go s.save(ctx) + go s.init(ctx) + return &s +} + +// init site resync metrics +func (sm *siteResyncMetrics) init(ctx context.Context) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + // Run the site resync metrics load in a loop + for { + if err := sm.load(ctx, newObjectLayerFn()); err == nil { + <-ctx.Done() + return + } + duration := time.Duration(r.Float64() * float64(time.Second*10)) + if duration < time.Second { + // Make sure to sleep atleast a second to avoid high CPU ticks. + duration = time.Second + } + time.Sleep(duration) + } +} + +// load resync metrics saved on disk into memory +func (sm *siteResyncMetrics) load(ctx context.Context, objAPI ObjectLayer) error { + if objAPI == nil { + return errServerNotInitialized + } + info, err := globalSiteReplicationSys.GetClusterInfo(ctx) + if err != nil { + return err + } + if !info.Enabled { + return nil + } + for _, peer := range info.Sites { + if peer.DeploymentID == globalDeploymentID { + continue + } + rs, err := loadSiteResyncMetadata(ctx, objAPI, peer.DeploymentID) + if err != nil { + return err + } + sm.Lock() + defer sm.Unlock() + if _, ok := sm.peerResyncMap[peer.DeploymentID]; !ok { + sm.peerResyncMap[peer.DeploymentID] = resyncState{resyncID: rs.ResyncID, LastSaved: time.Time{}} + sm.resyncStatus[rs.ResyncID] = rs + } + } + return nil +} + +func (sm *siteResyncMetrics) report(dID string) *madmin.SiteResyncMetrics { + sm.RLock() + defer sm.RUnlock() + rst, ok := sm.peerResyncMap[dID] + if !ok { + return nil + } + rs, ok := sm.resyncStatus[rst.resyncID] + if !ok { + return nil + } + m := madmin.SiteResyncMetrics{ + CollectedAt: rs.LastUpdate, + 
StartTime: rs.StartTime, + LastUpdate: rs.LastUpdate, + ResyncStatus: rs.Status.String(), + ResyncID: rst.resyncID, + DeplID: rs.DeplID, + ReplicatedSize: rs.ReplicatedSize, + ReplicatedCount: rs.ReplicatedCount, + FailedSize: rs.FailedSize, + FailedCount: rs.FailedCount, + Bucket: rs.Bucket, + Object: rs.Object, + NumBuckets: int64(rs.TotBuckets), + } + for b, st := range rs.BucketStatuses { + if st == ResyncFailed { + m.FailedBuckets = append(m.FailedBuckets, b) + } + } + return &m +} + +// save in-memory stats to disk +func (sm *siteResyncMetrics) save(ctx context.Context) { + sTimer := time.NewTimer(siteResyncSaveInterval) + defer sTimer.Stop() + for { + select { + case <-sTimer.C: + if globalSiteReplicationSys.isEnabled() { + sm.Lock() + for dID, rs := range sm.peerResyncMap { + st, ok := sm.resyncStatus[rs.resyncID] + if ok { + updt := st.Status.isValid() && st.LastUpdate.After(rs.LastSaved) + if !updt { + continue + } + rs.LastSaved = UTCNow() + sm.peerResyncMap[dID] = rs + go saveSiteResyncMetadata(ctx, st, newObjectLayerFn()) + } + } + sm.Unlock() + } + sTimer.Reset(siteResyncSaveInterval) + case <-ctx.Done(): + return + } + } +} + +// update overall site resync state +func (sm *siteResyncMetrics) updateState(s SiteResyncStatus) { + if !globalSiteReplicationSys.isEnabled() { + return + } + sm.Lock() + defer sm.Unlock() + switch s.Status { + case ResyncStarted: + sm.peerResyncMap[s.DeplID] = resyncState{resyncID: s.ResyncID, LastSaved: time.Time{}} + sm.resyncStatus[s.ResyncID] = s + case ResyncCompleted, ResyncCanceled, ResyncFailed: + st, ok := sm.resyncStatus[s.ResyncID] + if ok { + st.LastUpdate = s.LastUpdate + st.Status = s.Status + } + sm.resyncStatus[s.ResyncID] = st + } +} + +// increment SyncedBuckets count +func (sm *siteResyncMetrics) incBucket(o resyncOpts, bktStatus ResyncStatusType) { + if !globalSiteReplicationSys.isEnabled() { + return + } + sm.Lock() + defer sm.Unlock() + st, ok := sm.resyncStatus[o.resyncID] + if ok { + switch bktStatus { 
+ case ResyncCompleted: + st.BucketStatuses[o.bucket] = ResyncCompleted + st.Status = siteResyncStatus(st.Status, st.BucketStatuses) + st.LastUpdate = UTCNow() + sm.resyncStatus[o.resyncID] = st + case ResyncFailed: + st.BucketStatuses[o.bucket] = ResyncFailed + st.Status = siteResyncStatus(st.Status, st.BucketStatuses) + st.LastUpdate = UTCNow() + sm.resyncStatus[o.resyncID] = st + } + } +} + +// remove deleted bucket from active resync tracking +func (sm *siteResyncMetrics) deleteBucket(b string) { + if !globalSiteReplicationSys.isEnabled() { + return + } + sm.Lock() + defer sm.Unlock() + for _, rs := range sm.peerResyncMap { + st, ok := sm.resyncStatus[rs.resyncID] + if !ok { + return + } + switch st.Status { + case ResyncCompleted, ResyncFailed: + return + default: + delete(st.BucketStatuses, b) + } + } +} + +// returns overall resync status from individual bucket resync status map +func siteResyncStatus(currSt ResyncStatusType, m map[string]ResyncStatusType) ResyncStatusType { + // avoid overwriting canceled resync status + if currSt != ResyncStarted { + return currSt + } + totBuckets := len(m) + var cmpCount, failCount int + for _, st := range m { + switch st { + case ResyncCompleted: + cmpCount++ + case ResyncFailed: + failCount++ + } + } + if cmpCount == totBuckets { + return ResyncCompleted + } + if cmpCount+failCount == totBuckets { + return ResyncFailed + } + return ResyncStarted +} + +// update resync metrics per object +func (sm *siteResyncMetrics) updateMetric(roi ReplicateObjectInfo, success bool, resyncID string) { + if !globalSiteReplicationSys.isEnabled() { + return + } + sm.Lock() + defer sm.Unlock() + s := sm.resyncStatus[resyncID] + if success { + s.ReplicatedCount++ + s.ReplicatedSize += roi.Size + } else { + s.FailedCount++ + s.FailedSize += roi.Size + } + s.Bucket = roi.Bucket + s.Object = roi.Name + s.LastUpdate = UTCNow() + sm.resyncStatus[resyncID] = s +} + +// Status returns current in-memory resync status for this deployment +func (sm 
*siteResyncMetrics) status(dID string) (rs SiteResyncStatus, err error) { + sm.RLock() + defer sm.RUnlock() + if rst, ok1 := sm.peerResyncMap[dID]; ok1 { + if st, ok2 := sm.resyncStatus[rst.resyncID]; ok2 { + return st.clone(), nil + } + } + return rs, errSRNoResync +} + +// Status returns latest resync status for this deployment +func (sm *siteResyncMetrics) siteStatus(ctx context.Context, objAPI ObjectLayer, dID string) (rs SiteResyncStatus, err error) { + if !globalSiteReplicationSys.isEnabled() { + return rs, errSRNotEnabled + } + // check in-memory status + rs, err = sm.status(dID) + if err == nil { + return rs, nil + } + // check disk resync status + rs, err = loadSiteResyncMetadata(ctx, objAPI, dID) + if err != nil && err == errConfigNotFound { + return rs, nil + } + return rs, err +} diff --git a/cmd/site-replication-utils_gen.go b/cmd/site-replication-utils_gen.go new file mode 100644 index 000000000..81f0ff4e9 --- /dev/null +++ b/cmd/site-replication-utils_gen.go @@ -0,0 +1,318 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. 
+ +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *SiteResyncStatus) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "v": + z.Version, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "Version") + return + } + case "ss": + err = z.Status.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "Status") + return + } + case "did": + z.DeplID, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "DeplID") + return + } + case "bkts": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + err = msgp.WrapError(err, "BucketStatuses") + return + } + if z.BucketStatuses == nil { + z.BucketStatuses = make(map[string]ResyncStatusType, zb0002) + } else if len(z.BucketStatuses) > 0 { + for key := range z.BucketStatuses { + delete(z.BucketStatuses, key) + } + } + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 ResyncStatusType + za0001, err = dc.ReadString() + if err != nil { + err = msgp.WrapError(err, "BucketStatuses") + return + } + err = za0002.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "BucketStatuses", za0001) + return + } + z.BucketStatuses[za0001] = za0002 + } + case "tb": + z.TotBuckets, err = dc.ReadInt() + if err != nil { + err = msgp.WrapError(err, "TotBuckets") + return + } + case "cst": + err = z.TargetReplicationResyncStatus.DecodeMsg(dc) + if err != nil { + err = msgp.WrapError(err, "TargetReplicationResyncStatus") + return + } + default: + err = dc.Skip() + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *SiteResyncStatus) EncodeMsg(en *msgp.Writer) (err 
error) { + // map header, size 6 + // write "v" + err = en.Append(0x86, 0xa1, 0x76) + if err != nil { + return + } + err = en.WriteInt(z.Version) + if err != nil { + err = msgp.WrapError(err, "Version") + return + } + // write "ss" + err = en.Append(0xa2, 0x73, 0x73) + if err != nil { + return + } + err = z.Status.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "Status") + return + } + // write "did" + err = en.Append(0xa3, 0x64, 0x69, 0x64) + if err != nil { + return + } + err = en.WriteString(z.DeplID) + if err != nil { + err = msgp.WrapError(err, "DeplID") + return + } + // write "bkts" + err = en.Append(0xa4, 0x62, 0x6b, 0x74, 0x73) + if err != nil { + return + } + err = en.WriteMapHeader(uint32(len(z.BucketStatuses))) + if err != nil { + err = msgp.WrapError(err, "BucketStatuses") + return + } + for za0001, za0002 := range z.BucketStatuses { + err = en.WriteString(za0001) + if err != nil { + err = msgp.WrapError(err, "BucketStatuses") + return + } + err = za0002.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "BucketStatuses", za0001) + return + } + } + // write "tb" + err = en.Append(0xa2, 0x74, 0x62) + if err != nil { + return + } + err = en.WriteInt(z.TotBuckets) + if err != nil { + err = msgp.WrapError(err, "TotBuckets") + return + } + // write "cst" + err = en.Append(0xa3, 0x63, 0x73, 0x74) + if err != nil { + return + } + err = z.TargetReplicationResyncStatus.EncodeMsg(en) + if err != nil { + err = msgp.WrapError(err, "TargetReplicationResyncStatus") + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *SiteResyncStatus) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 6 + // string "v" + o = append(o, 0x86, 0xa1, 0x76) + o = msgp.AppendInt(o, z.Version) + // string "ss" + o = append(o, 0xa2, 0x73, 0x73) + o, err = z.Status.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "Status") + return + } + // string "did" + o = append(o, 0xa3, 0x64, 
0x69, 0x64) + o = msgp.AppendString(o, z.DeplID) + // string "bkts" + o = append(o, 0xa4, 0x62, 0x6b, 0x74, 0x73) + o = msgp.AppendMapHeader(o, uint32(len(z.BucketStatuses))) + for za0001, za0002 := range z.BucketStatuses { + o = msgp.AppendString(o, za0001) + o, err = za0002.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "BucketStatuses", za0001) + return + } + } + // string "tb" + o = append(o, 0xa2, 0x74, 0x62) + o = msgp.AppendInt(o, z.TotBuckets) + // string "cst" + o = append(o, 0xa3, 0x63, 0x73, 0x74) + o, err = z.TargetReplicationResyncStatus.MarshalMsg(o) + if err != nil { + err = msgp.WrapError(err, "TargetReplicationResyncStatus") + return + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *SiteResyncStatus) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + switch msgp.UnsafeString(field) { + case "v": + z.Version, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "Version") + return + } + case "ss": + bts, err = z.Status.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "Status") + return + } + case "did": + z.DeplID, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "DeplID") + return + } + case "bkts": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "BucketStatuses") + return + } + if z.BucketStatuses == nil { + z.BucketStatuses = make(map[string]ResyncStatusType, zb0002) + } else if len(z.BucketStatuses) > 0 { + for key := range z.BucketStatuses { + delete(z.BucketStatuses, key) + } + } + for zb0002 > 0 { + var za0001 string + var za0002 ResyncStatusType + zb0002-- + za0001, bts, err = 
msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err, "BucketStatuses") + return + } + bts, err = za0002.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "BucketStatuses", za0001) + return + } + z.BucketStatuses[za0001] = za0002 + } + case "tb": + z.TotBuckets, bts, err = msgp.ReadIntBytes(bts) + if err != nil { + err = msgp.WrapError(err, "TotBuckets") + return + } + case "cst": + bts, err = z.TargetReplicationResyncStatus.UnmarshalMsg(bts) + if err != nil { + err = msgp.WrapError(err, "TargetReplicationResyncStatus") + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *SiteResyncStatus) Msgsize() (s int) { + s = 1 + 2 + msgp.IntSize + 3 + z.Status.Msgsize() + 4 + msgp.StringPrefixSize + len(z.DeplID) + 5 + msgp.MapHeaderSize + if z.BucketStatuses != nil { + for za0001, za0002 := range z.BucketStatuses { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + za0002.Msgsize() + } + } + s += 3 + msgp.IntSize + 4 + z.TargetReplicationResyncStatus.Msgsize() + return +} diff --git a/cmd/site-replication-utils_gen_test.go b/cmd/site-replication-utils_gen_test.go new file mode 100644 index 000000000..77a68632c --- /dev/null +++ b/cmd/site-replication-utils_gen_test.go @@ -0,0 +1,123 @@ +package cmd + +// Code generated by github.com/tinylib/msgp DO NOT EDIT. 
+ +import ( + "bytes" + "testing" + + "github.com/tinylib/msgp/msgp" +) + +func TestMarshalUnmarshalSiteResyncStatus(t *testing.T) { + v := SiteResyncStatus{} + bts, err := v.MarshalMsg(nil) + if err != nil { + t.Fatal(err) + } + left, err := v.UnmarshalMsg(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after UnmarshalMsg(): %q", len(left), left) + } + + left, err = msgp.Skip(bts) + if err != nil { + t.Fatal(err) + } + if len(left) > 0 { + t.Errorf("%d bytes left over after Skip(): %q", len(left), left) + } +} + +func BenchmarkMarshalMsgSiteResyncStatus(b *testing.B) { + v := SiteResyncStatus{} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.MarshalMsg(nil) + } +} + +func BenchmarkAppendMsgSiteResyncStatus(b *testing.B) { + v := SiteResyncStatus{} + bts := make([]byte, 0, v.Msgsize()) + bts, _ = v.MarshalMsg(bts[0:0]) + b.SetBytes(int64(len(bts))) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bts, _ = v.MarshalMsg(bts[0:0]) + } +} + +func BenchmarkUnmarshalSiteResyncStatus(b *testing.B) { + v := SiteResyncStatus{} + bts, _ := v.MarshalMsg(nil) + b.ReportAllocs() + b.SetBytes(int64(len(bts))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := v.UnmarshalMsg(bts) + if err != nil { + b.Fatal(err) + } + } +} + +func TestEncodeDecodeSiteResyncStatus(t *testing.T) { + v := SiteResyncStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + + m := v.Msgsize() + if buf.Len() > m { + t.Log("WARNING: TestEncodeDecodeSiteResyncStatus Msgsize() is inaccurate") + } + + vn := SiteResyncStatus{} + err := msgp.Decode(&buf, &vn) + if err != nil { + t.Error(err) + } + + buf.Reset() + msgp.Encode(&buf, &v) + err = msgp.NewReader(&buf).Skip() + if err != nil { + t.Error(err) + } +} + +func BenchmarkEncodeSiteResyncStatus(b *testing.B) { + v := SiteResyncStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + en := msgp.NewWriter(msgp.Nowhere) + 
b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v.EncodeMsg(en) + } + en.Flush() +} + +func BenchmarkDecodeSiteResyncStatus(b *testing.B) { + v := SiteResyncStatus{} + var buf bytes.Buffer + msgp.Encode(&buf, &v) + b.SetBytes(int64(buf.Len())) + rd := msgp.NewEndlessReader(buf.Bytes(), b) + dc := msgp.NewReader(rd) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := v.DecodeMsg(dc) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/cmd/site-replication.go b/cmd/site-replication.go index b0c5ef84f..08d02ac14 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -1,4 +1,4 @@ -// Copyright (c) 2015-2021 MinIO, Inc. +// Copyright (c) 2015-2022 MinIO, Inc. // // This file is part of MinIO Object Storage stack // @@ -21,6 +21,7 @@ import ( "bytes" "context" "encoding/base64" + "encoding/binary" "encoding/json" "encoding/xml" "errors" @@ -76,6 +77,22 @@ var ( Cause: errors.New("site replication is not enabled"), Code: ErrSiteReplicationInvalidRequest, } + errSRResyncStarted = SRError{ + Cause: errors.New("site replication resync is already in progress"), + Code: ErrSiteReplicationInvalidRequest, + } + errSRResyncCanceled = SRError{ + Cause: errors.New("site replication resync is already canceled"), + Code: ErrSiteReplicationInvalidRequest, + } + errSRNoResync = SRError{ + Cause: errors.New("no resync in progress"), + Code: ErrSiteReplicationInvalidRequest, + } + errSRResyncToSelf = SRError{ + Cause: errors.New("invalid peer specified - cannot resync to self"), + Code: ErrSiteReplicationInvalidRequest, + } ) func errSRInvalidRequest(err error) SRError { @@ -4858,3 +4875,305 @@ func (c *SiteReplicationSys) getPeerForUpload(deplID string) (pi srPeerInfo, loc } return pi, true } + +// startResync initiates resync of data to peerSite specified. 
The overall site resync status +// is maintained in .minio.sys/buckets/site-replication/resync/, while collecting +// individual bucket resync status in .minio.sys/buckets//replication/resync.bin +func (c *SiteReplicationSys) startResync(ctx context.Context, objAPI ObjectLayer, peer madmin.PeerInfo) (res madmin.SRResyncOpStatus, err error) { + if !c.isEnabled() { + return res, errSRNotEnabled + } + if objAPI == nil { + return res, errSRObjectLayerNotReady + } + + if peer.DeploymentID == globalDeploymentID { + return res, errSRResyncToSelf + } + if _, ok := c.state.Peers[peer.DeploymentID]; !ok { + return res, errSRPeerNotFound + } + rs, err := globalSiteResyncMetrics.siteStatus(ctx, objAPI, peer.DeploymentID) + if err != nil { + return res, err + } + if rs.Status == ResyncStarted { + return res, errSRResyncStarted + } + var buckets []BucketInfo + buckets, err = objAPI.ListBuckets(ctx, BucketOptions{}) + if err != nil { + return res, err + } + rs = newSiteResyncStatus(peer.DeploymentID, buckets) + defer func() { + if err != nil { + rs.Status = ResyncFailed + saveSiteResyncMetadata(ctx, rs, objAPI) + globalSiteResyncMetrics.updateState(rs) + } + }() + + globalSiteResyncMetrics.updateState(rs) + if err := saveSiteResyncMetadata(ctx, rs, objAPI); err != nil { + return res, err + } + + for _, bi := range buckets { + bucket := bi.Name + if _, err := getReplicationConfig(ctx, bucket); err != nil { + res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ + ErrDetail: err.Error(), + Bucket: bucket, + Status: ResyncFailed.String(), + }) + continue + } + // mark remote target for this deployment with the new reset id + tgtArn := globalBucketTargetSys.getRemoteARNForPeer(bucket, peer) + if tgtArn == "" { + res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ + ErrDetail: fmt.Sprintf("no valid remote target found for this peer %s (%s)", peer.Name, peer.DeploymentID), + Bucket: bucket, + }) + continue + } + target := 
globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, bucket, tgtArn) + target.ResetBeforeDate = UTCNow() + target.ResetID = rs.ResyncID + if err = globalBucketTargetSys.SetTarget(ctx, bucket, &target, true); err != nil { + res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ + ErrDetail: err.Error(), + Bucket: bucket, + }) + continue + } + targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) + if err != nil { + res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ + ErrDetail: err.Error(), + Bucket: bucket, + }) + continue + } + tgtBytes, err := json.Marshal(&targets) + if err != nil { + res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ + ErrDetail: err.Error(), + Bucket: bucket, + }) + continue + } + if _, err = globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { + res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ + ErrDetail: err.Error(), + Bucket: bucket, + }) + continue + } + if err := globalReplicationPool.resyncer.start(ctx, objAPI, resyncOpts{ + bucket: bucket, + arn: tgtArn, + resyncID: rs.ResyncID, + }); err != nil { + res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ + ErrDetail: err.Error(), + Bucket: bucket, + }) + continue + } + } + res = madmin.SRResyncOpStatus{ + Status: ResyncStarted.String(), + OpType: "start", + ResyncID: rs.ResyncID, + } + if len(res.Buckets) > 0 { + res.ErrDetail = "partial failure in starting site resync" + } + return res, nil +} + +// cancelResync stops an ongoing site level resync for the peer specified. 
+func (c *SiteReplicationSys) cancelResync(ctx context.Context, objAPI ObjectLayer, peer madmin.PeerInfo) (res madmin.SRResyncOpStatus, err error) { + if !c.isEnabled() { + return res, errSRNotEnabled + } + if objAPI == nil { + return res, errSRObjectLayerNotReady + } + if peer.DeploymentID == globalDeploymentID { + return res, errSRResyncToSelf + } + if _, ok := c.state.Peers[peer.DeploymentID]; !ok { + return res, errSRPeerNotFound + } + rs, err := globalSiteResyncMetrics.siteStatus(ctx, objAPI, peer.DeploymentID) + if err != nil { + return res, err + } + switch rs.Status { + case ResyncCanceled: + return res, errSRResyncCanceled + case ResyncCompleted, NoResync: + return res, errSRNoResync + } + + res = madmin.SRResyncOpStatus{ + Status: rs.Status.String(), + OpType: "cancel", + ResyncID: rs.ResyncID, + } + switch rs.Status { + case ResyncCanceled: + return res, errSRResyncCanceled + case ResyncCompleted, NoResync: + return res, errSRNoResync + } + targets := globalBucketTargetSys.ListTargets(ctx, "", string(madmin.ReplicationService)) + // clear the remote target resetID set while initiating resync to stop replication + for _, t := range targets { + if t.ResetID == rs.ResyncID { + // get tgt with credentials + tgt := globalBucketTargetSys.GetRemoteBucketTargetByArn(ctx, t.SourceBucket, t.Arn) + tgt.ResetID = "" + bucket := t.SourceBucket + if err = globalBucketTargetSys.SetTarget(ctx, bucket, &tgt, true); err != nil { + res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ + ErrDetail: err.Error(), + Bucket: bucket, + }) + continue + } + targets, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket) + if err != nil { + res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ + ErrDetail: err.Error(), + Bucket: bucket, + }) + continue + } + tgtBytes, err := json.Marshal(&targets) + if err != nil { + res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ + ErrDetail: err.Error(), + Bucket: bucket, + }) + continue + } + if _, err = 
globalBucketMetadataSys.Update(ctx, bucket, bucketTargetsFile, tgtBytes); err != nil { + res.Buckets = append(res.Buckets, madmin.ResyncBucketStatus{ + ErrDetail: err.Error(), + Bucket: bucket, + }) + continue + } + // update resync state for the bucket + globalReplicationPool.resyncer.Lock() + m, ok := globalReplicationPool.resyncer.statusMap[bucket] + if !ok { + m = newBucketResyncStatus(bucket) + } + if st, ok := m.TargetsMap[t.Arn]; ok { + st.LastUpdate = UTCNow() + st.ResyncStatus = ResyncCanceled + m.TargetsMap[t.Arn] = st + m.LastUpdate = UTCNow() + } + globalReplicationPool.resyncer.statusMap[bucket] = m + globalReplicationPool.resyncer.Unlock() + } + } + + rs.Status = ResyncCanceled + rs.LastUpdate = UTCNow() + if err := saveSiteResyncMetadata(ctx, rs, objAPI); err != nil { + return res, err + } + + globalSiteResyncMetrics.updateState(rs) + + res.Status = rs.Status.String() + return res, nil +} + +const ( + siteResyncMetaFormat = 1 + siteResyncMetaVersionV1 = 1 + siteResyncMetaVersion = siteResyncMetaVersionV1 + siteResyncSaveInterval = 10 * time.Second +) + +func newSiteResyncStatus(dID string, buckets []BucketInfo) SiteResyncStatus { + now := UTCNow() + s := SiteResyncStatus{ + Version: siteResyncMetaVersion, + Status: ResyncStarted, + DeplID: dID, + TotBuckets: len(buckets), + BucketStatuses: make(map[string]ResyncStatusType), + } + for _, bi := range buckets { + s.BucketStatuses[bi.Name] = ResyncPending + } + s.ResyncID = mustGetUUID() + s.StartTime = now + s.LastUpdate = now + return s +} + +// load site resync metadata from disk +func loadSiteResyncMetadata(ctx context.Context, objAPI ObjectLayer, dID string) (rs SiteResyncStatus, e error) { + data, err := readConfig(GlobalContext, objAPI, getSRResyncFilePath(dID)) + if err != nil { + return rs, err + } + if len(data) == 0 { + // Seems to be empty. 
+ return rs, nil + } + if len(data) <= 4 { + return rs, fmt.Errorf("site resync: no data") + } + // Read resync meta header + switch binary.LittleEndian.Uint16(data[0:2]) { + case siteResyncMetaFormat: + default: + return rs, fmt.Errorf("resyncMeta: unknown format: %d", binary.LittleEndian.Uint16(data[0:2])) + } + switch binary.LittleEndian.Uint16(data[2:4]) { + case siteResyncMetaVersion: + default: + return rs, fmt.Errorf("resyncMeta: unknown version: %d", binary.LittleEndian.Uint16(data[2:4])) + } + // OK, parse data. + if _, err = rs.UnmarshalMsg(data[4:]); err != nil { + return rs, err + } + + switch rs.Version { + case siteResyncMetaVersionV1: + default: + return rs, fmt.Errorf("unexpected resync meta version: %d", rs.Version) + } + return rs, nil +} + +// save resync status of peer to resync/depl-id.meta +func saveSiteResyncMetadata(ctx context.Context, ss SiteResyncStatus, objectAPI ObjectLayer) error { + data := make([]byte, 4, ss.Msgsize()+4) + + // Initialize the resync meta header. + binary.LittleEndian.PutUint16(data[0:2], siteResyncMetaFormat) + binary.LittleEndian.PutUint16(data[2:4], siteResyncMetaVersion) + + buf, err := ss.MarshalMsg(data) + if err != nil { + return err + } + return saveConfig(ctx, objectAPI, getSRResyncFilePath(ss.DeplID), buf) +} + +func getSRResyncFilePath(dID string) string { + return pathJoin(siteResyncPrefix, dID+".meta") +}