Mirror of https://github.com/minio/minio.git
Save resync status in the backend using a last update timestamp (#15638)
Currently, there is only a short time window in which the code is allowed to save the status of a replication resync, namely `now.Sub(st.EndTime) <= resyncTimeInterval`, and any failure to write to the backend disks is not retried. Refactor the code a little to instead rely on the last timestamp of a successful write of a given bucket's resync status to the backend disks, so that any update that has not been persisted yet is saved on the next interval.
parent 10e75116ef
commit cf52691959
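
For context before the diff, here is a minimal, self-contained Go sketch of the pattern the commit adopts: writers bump a LastUpdate timestamp on the in-memory status whenever it changes, and a periodic saver persists a bucket's status only when that timestamp is newer than the last successful save, so a failed write is simply retried on the next tick. The names below (bucketStatus, runSaver, persist) are illustrative and not MinIO's actual types or functions; the real change to updateResyncStatus and resyncBucket follows in the diff.

// Illustrative sketch only; not MinIO code.
package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// bucketStatus stands in for the per-bucket resync status kept in memory.
type bucketStatus struct {
	LastUpdate time.Time // bumped by writers whenever the status changes
	Pending    int
}

// runSaver periodically persists each bucket's status, but only when the
// in-memory copy is newer than the copy last written out successfully.
func runSaver(ctx context.Context, interval time.Duration,
	statusMap map[string]*bucketStatus,
	persist func(bucket string, st bucketStatus) error,
) {
	// Last timestamp of a successful save, per bucket. A failed save does
	// not advance this, so the same state is retried on the next tick.
	lastSave := make(map[string]time.Time)

	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			for bucket, st := range statusMap {
				if !st.LastUpdate.After(lastSave[bucket]) {
					continue // nothing new since the last successful save
				}
				if err := persist(bucket, *st); err != nil {
					log.Printf("could not save status for %s: %v", bucket, err)
					continue
				}
				lastSave[bucket] = st.LastUpdate
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	statusMap := map[string]*bucketStatus{
		"mybucket": {LastUpdate: time.Now(), Pending: 3},
	}
	persist := func(bucket string, st bucketStatus) error {
		fmt.Printf("saved %s: %+v\n", bucket, st)
		return nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	runSaver(ctx, 100*time.Millisecond, statusMap, persist)
}

As in the commit, lastSave only advances after a successful persist, which is what gives the implicit retry; the real code additionally guards the shared status map with a read lock while iterating.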
@@ -1987,25 +1987,32 @@ func (p *ReplicationPool) updateResyncStatus(ctx context.Context, objectAPI Obje
 	resyncTimer := time.NewTimer(resyncTimeInterval)
 	defer resyncTimer.Stop()
 
+	// For each bucket name, store the last timestamp of the
+	// successful save of replication status in the backend disks.
+	lastResyncStatusSave := make(map[string]time.Time)
+
 	for {
 		select {
 		case <-resyncTimer.C:
-			now := UTCNow()
 			p.resyncState.RLock()
 			for bucket, brs := range p.resyncState.statusMap {
 				var updt bool
+				// Save the replication status if one resync to any bucket target is still not finished
 				for _, st := range brs.TargetsMap {
-					// if resync in progress or just ended, needs to save to disk
-					if st.EndTime.Equal(timeSentinel) || now.Sub(st.EndTime) <= resyncTimeInterval {
+					if st.EndTime.Equal(timeSentinel) {
 						updt = true
 						break
 					}
 				}
+				// Save the replication status if a new stats update is found and not saved in the backend yet
+				if brs.LastUpdate.After(lastResyncStatusSave[bucket]) {
+					updt = true
+				}
 				if updt {
-					brs.LastUpdate = now
 					if err := saveResyncStatus(ctx, bucket, brs, objectAPI); err != nil {
 						logger.LogIf(ctx, fmt.Errorf("Could not save resync metadata to drive for %s - %w", bucket, err))
-						continue
+					} else {
+						lastResyncStatusSave[bucket] = brs.LastUpdate
 					}
 				}
 			}
@@ -2031,6 +2038,7 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI
 		st.EndTime = UTCNow()
 		st.ResyncStatus = resyncStatus
 		m.TargetsMap[arn] = st
+		m.LastUpdate = UTCNow()
 		globalReplicationPool.resyncState.statusMap[bucket] = m
 		globalReplicationPool.resyncState.Unlock()
 	}()
@@ -2140,6 +2148,7 @@ func resyncBucket(ctx context.Context, bucket, arn string, heal bool, objectAPI
 			st.ReplicatedSize += roi.Size
 		}
 		m.TargetsMap[arn] = st
+		m.LastUpdate = UTCNow()
 		globalReplicationPool.resyncState.statusMap[bucket] = m
 		globalReplicationPool.resyncState.Unlock()
 	}
|
Loading…
Reference in New Issue
Block a user