mirror of https://github.com/minio/minio.git
193 lines
5.3 KiB
Go
193 lines
5.3 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2021 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/minio/minio/pkg/bucket/replication"
|
|
)
|
|
|
|
func (b *BucketReplicationStats) hasReplicationUsage() bool {
|
|
return b.PendingSize > 0 ||
|
|
b.FailedSize > 0 ||
|
|
b.ReplicatedSize > 0 ||
|
|
b.ReplicaSize > 0 ||
|
|
b.PendingCount > 0 ||
|
|
b.FailedCount > 0
|
|
}
|
|
|
|
// ReplicationStats holds the global in-memory replication stats
|
|
type ReplicationStats struct {
|
|
sync.RWMutex
|
|
Cache map[string]*BucketReplicationStats
|
|
UsageCache map[string]*BucketReplicationStats // initial usage
|
|
}
|
|
|
|
// Delete deletes in-memory replication statistics for a bucket.
|
|
func (r *ReplicationStats) Delete(bucket string) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
delete(r.Cache, bucket)
|
|
delete(r.UsageCache, bucket)
|
|
|
|
}
|
|
|
|
// Update updates in-memory replication statistics with new values.
|
|
func (r *ReplicationStats) Update(bucket string, n int64, status, prevStatus replication.StatusType, opType replication.Type) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
|
|
r.RLock()
|
|
b, ok := r.Cache[bucket]
|
|
if !ok {
|
|
b = &BucketReplicationStats{}
|
|
}
|
|
r.RUnlock()
|
|
switch status {
|
|
case replication.Pending:
|
|
if opType == replication.ObjectReplicationType {
|
|
atomic.AddUint64(&b.PendingSize, uint64(n))
|
|
}
|
|
atomic.AddUint64(&b.PendingCount, 1)
|
|
case replication.Completed:
|
|
switch prevStatus { // adjust counters based on previous state
|
|
case replication.Pending:
|
|
atomic.AddUint64(&b.PendingCount, ^uint64(0))
|
|
case replication.Failed:
|
|
atomic.AddUint64(&b.FailedCount, ^uint64(0))
|
|
}
|
|
if opType == replication.ObjectReplicationType {
|
|
atomic.AddUint64(&b.ReplicatedSize, uint64(n))
|
|
switch prevStatus {
|
|
case replication.Pending:
|
|
atomic.AddUint64(&b.PendingSize, ^uint64(n-1))
|
|
case replication.Failed:
|
|
atomic.AddUint64(&b.FailedSize, ^uint64(n-1))
|
|
}
|
|
}
|
|
case replication.Failed:
|
|
// count failures only once - not on every retry
|
|
switch prevStatus { // adjust counters based on previous state
|
|
case replication.Pending:
|
|
atomic.AddUint64(&b.PendingCount, ^uint64(0))
|
|
}
|
|
if opType == replication.ObjectReplicationType {
|
|
if prevStatus == replication.Pending {
|
|
atomic.AddUint64(&b.FailedSize, uint64(n))
|
|
atomic.AddUint64(&b.FailedCount, 1)
|
|
atomic.AddUint64(&b.PendingSize, ^uint64(n-1))
|
|
}
|
|
}
|
|
case replication.Replica:
|
|
if opType == replication.ObjectReplicationType {
|
|
atomic.AddUint64(&b.ReplicaSize, uint64(n))
|
|
}
|
|
}
|
|
r.Lock()
|
|
r.Cache[bucket] = b
|
|
r.Unlock()
|
|
}
|
|
|
|
// GetInitialUsage get replication metrics available at the time of cluster initialization
|
|
func (r *ReplicationStats) GetInitialUsage(bucket string) BucketReplicationStats {
|
|
if r == nil {
|
|
return BucketReplicationStats{}
|
|
}
|
|
|
|
r.RLock()
|
|
defer r.RUnlock()
|
|
|
|
st, ok := r.UsageCache[bucket]
|
|
if !ok {
|
|
return BucketReplicationStats{}
|
|
}
|
|
return BucketReplicationStats{
|
|
PendingSize: atomic.LoadUint64(&st.PendingSize),
|
|
FailedSize: atomic.LoadUint64(&st.FailedSize),
|
|
ReplicatedSize: atomic.LoadUint64(&st.ReplicatedSize),
|
|
ReplicaSize: atomic.LoadUint64(&st.ReplicaSize),
|
|
PendingCount: atomic.LoadUint64(&st.PendingCount),
|
|
FailedCount: atomic.LoadUint64(&st.FailedCount),
|
|
}
|
|
}
|
|
|
|
// Get replication metrics for a bucket from this node since this node came up.
|
|
func (r *ReplicationStats) Get(bucket string) BucketReplicationStats {
|
|
if r == nil {
|
|
return BucketReplicationStats{}
|
|
}
|
|
|
|
r.RLock()
|
|
defer r.RUnlock()
|
|
|
|
st, ok := r.Cache[bucket]
|
|
if !ok {
|
|
return BucketReplicationStats{}
|
|
}
|
|
|
|
return BucketReplicationStats{
|
|
PendingSize: atomic.LoadUint64(&st.PendingSize),
|
|
FailedSize: atomic.LoadUint64(&st.FailedSize),
|
|
ReplicatedSize: atomic.LoadUint64(&st.ReplicatedSize),
|
|
ReplicaSize: atomic.LoadUint64(&st.ReplicaSize),
|
|
PendingCount: atomic.LoadUint64(&st.PendingCount),
|
|
FailedCount: atomic.LoadUint64(&st.FailedCount),
|
|
}
|
|
}
|
|
|
|
// NewReplicationStats initialize in-memory replication statistics
|
|
func NewReplicationStats(ctx context.Context, objectAPI ObjectLayer) *ReplicationStats {
|
|
st := &ReplicationStats{
|
|
Cache: make(map[string]*BucketReplicationStats),
|
|
UsageCache: make(map[string]*BucketReplicationStats),
|
|
}
|
|
|
|
dataUsageInfo, err := loadDataUsageFromBackend(ctx, objectAPI)
|
|
if err != nil {
|
|
return st
|
|
}
|
|
|
|
// data usage has not captured any data yet.
|
|
if dataUsageInfo.LastUpdate.IsZero() {
|
|
return st
|
|
}
|
|
|
|
for bucket, usage := range dataUsageInfo.BucketsUsage {
|
|
b := &BucketReplicationStats{
|
|
PendingSize: usage.ReplicationPendingSize,
|
|
FailedSize: usage.ReplicationFailedSize,
|
|
ReplicatedSize: usage.ReplicatedSize,
|
|
ReplicaSize: usage.ReplicaSize,
|
|
PendingCount: usage.ReplicationPendingCount,
|
|
FailedCount: usage.ReplicationFailedCount,
|
|
}
|
|
if b.hasReplicationUsage() {
|
|
st.UsageCache[bucket] = b
|
|
}
|
|
}
|
|
|
|
return st
|
|
}
|