feat: maintain in-memory tier stats for the last 24hrs (#13782)

Repository: https://github.com/minio/minio.git
parent f4e373e0d2
commit d2e5f01542
@@ -165,6 +165,9 @@ type transitionState struct {
     killCh chan struct{}
 
     activeTasks int32
+
+    lastDayMu    sync.RWMutex
+    lastDayStats map[string]*lastDayTierStats
 }
 
 func (t *transitionState) queueTransitionTask(oi ObjectInfo) {
@@ -186,6 +189,7 @@ func newTransitionState(ctx context.Context, objAPI ObjectLayer) *transitionStat
         ctx:          ctx,
         objAPI:       objAPI,
         killCh:       make(chan struct{}),
+        lastDayStats: make(map[string]*lastDayTierStats),
     }
 }
 
@@ -213,14 +217,46 @@ func (t *transitionState) worker(ctx context.Context, objectAPI ObjectLayer) {
                 return
             }
             atomic.AddInt32(&t.activeTasks, 1)
-            if err := transitionObject(ctx, objectAPI, oi); err != nil {
+            var tier string
+            var err error
+            if tier, err = transitionObject(ctx, objectAPI, oi); err != nil {
                 logger.LogIf(ctx, fmt.Errorf("Transition failed for %s/%s version:%s with %w", oi.Bucket, oi.Name, oi.VersionID, err))
             }
             atomic.AddInt32(&t.activeTasks, -1)
+
+            ts := tierStats{
+                TotalSize:   uint64(oi.Size),
+                NumVersions: 1,
+            }
+            if oi.IsLatest {
+                ts.NumObjects = 1
+            }
+            t.addLastDayStats(tier, ts)
         }
     }
 }
 
+func (t *transitionState) addLastDayStats(tier string, ts tierStats) {
+    t.lastDayMu.Lock()
+    defer t.lastDayMu.Unlock()
+
+    if _, ok := t.lastDayStats[tier]; !ok {
+        t.lastDayStats[tier] = &lastDayTierStats{}
+    }
+    t.lastDayStats[tier].addStats(ts)
+}
+
+func (t *transitionState) getDailyAllTierStats() dailyAllTierStats {
+    t.lastDayMu.RLock()
+    defer t.lastDayMu.RUnlock()
+
+    res := make(dailyAllTierStats, len(t.lastDayStats))
+    for tier, st := range t.lastDayStats {
+        res[tier] = st.clone()
+    }
+    return res
+}
+
 // UpdateWorkers at the end of this function leaves n goroutines waiting for
 // transition tasks
 func (t *transitionState) UpdateWorkers(n int) {
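In short, every transitioned version adds its size and one version to the current hour's bin for its target tier, and bumps the object count only when that version is the latest. A standalone sketch of that accounting, using simplified stand-in types rather than the cmd package definitions:

package main

import "fmt"

// Illustrative stand-ins for the fields the worker above uses.
type tierStats struct {
    TotalSize   uint64
    NumVersions uint64
    NumObjects  uint64
}

type objInfo struct {
    Size     int64
    IsLatest bool
}

// statsFor mirrors the per-version accounting: every version adds its
// size and one version; only the latest version bumps the object count.
func statsFor(oi objInfo) tierStats {
    ts := tierStats{TotalSize: uint64(oi.Size), NumVersions: 1}
    if oi.IsLatest {
        ts.NumObjects = 1
    }
    return ts
}

func main() {
    fmt.Println(statsFor(objInfo{Size: 1 << 20, IsLatest: true}))  // {1048576 1 1}
    fmt.Println(statsFor(objInfo{Size: 2 << 20, IsLatest: false})) // {2097152 1 0}
}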
@@ -365,15 +401,16 @@ func genTransitionObjName(bucket string) (string, error) {
 // storage specified by the transition ARN, the metadata is left behind on source cluster and original content
 // is moved to the transition tier. Note that in the case of encrypted objects, entire encrypted stream is moved
 // to the transition tier without decrypting or re-encrypting.
-func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo) error {
+func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo) (string, error) {
     lc, err := globalLifecycleSys.Get(oi.Bucket)
     if err != nil {
-        return err
+        return "", err
     }
+    tier := lc.TransitionTier(oi.ToLifecycleOpts())
     opts := ObjectOptions{
         Transition: TransitionOptions{
             Status: lifecycle.TransitionPending,
-            Tier:   lc.TransitionTier(oi.ToLifecycleOpts()),
+            Tier:   tier,
             ETag:   oi.ETag,
         },
         VersionID: oi.VersionID,
@@ -381,7 +418,7 @@ func transitionObject(ctx context.Context, objectAPI ObjectLayer, oi ObjectInfo)
         VersionSuspended: globalBucketVersioningSys.Suspended(oi.Bucket),
         MTime:            oi.ModTime,
     }
-    return objectAPI.TransitionObject(ctx, oi.Bucket, oi.Name, opts)
+    return tier, objectAPI.TransitionObject(ctx, oi.Bucket, oi.Name, opts)
 }
 
 // getTransitionedObjectReader returns a reader from the transitioned tier.
@@ -1630,3 +1630,31 @@ func (sys *NotificationSys) ReloadSiteReplicationConfig(ctx context.Context) []e
     wg.Wait()
     return errs
 }
+
+// GetLastDayTierStats fetches per-tier stats of the last 24hrs from all peers
+func (sys *NotificationSys) GetLastDayTierStats(ctx context.Context) dailyAllTierStats {
+    errs := make([]error, len(sys.allPeerClients))
+    lastDayStats := make([]dailyAllTierStats, len(sys.allPeerClients))
+    var wg sync.WaitGroup
+    for index := range sys.peerClients {
+        if sys.peerClients[index] == nil {
+            continue
+        }
+        wg.Add(1)
+        go func(index int) {
+            defer wg.Done()
+            lastDayStats[index], errs[index] = sys.peerClients[index].GetLastDayTierStats(ctx)
+        }(index)
+    }
+
+    wg.Wait()
+    merged := globalTransitionState.getDailyAllTierStats()
+    for i, stat := range lastDayStats {
+        if errs[i] != nil {
+            logger.LogIf(ctx, fmt.Errorf("failed to fetch last day tier stats: %w", errs[i]))
+            continue
+        }
+        merged.merge(stat)
+    }
+    return merged
+}
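The aggregation above is a plain scatter-gather: one goroutine per peer writes its result and error at that peer's index, failed peers are logged and skipped, and the survivors are merged into the local node's stats. A self-contained sketch of the same pattern, with a hypothetical fetch func standing in for the peer REST call:

package main

import (
    "fmt"
    "sync"
)

// fetch stands in for a per-peer RPC such as GetLastDayTierStats.
type fetch func() (map[string]uint64, error)

// gather calls every fetch concurrently, skips failures, and merges
// the remaining per-tier totals into a single map.
func gather(peers []fetch) map[string]uint64 {
    results := make([]map[string]uint64, len(peers))
    errs := make([]error, len(peers))

    var wg sync.WaitGroup
    for i := range peers {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            results[i], errs[i] = peers[i]()
        }(i)
    }
    wg.Wait()

    merged := make(map[string]uint64)
    for i, res := range results {
        if errs[i] != nil {
            continue // a failed peer only reduces coverage, it never blocks the merge
        }
        for tier, n := range res {
            merged[tier] += n
        }
    }
    return merged
}

func main() {
    peers := []fetch{
        func() (map[string]uint64, error) { return map[string]uint64{"WARM": 3}, nil },
        func() (map[string]uint64, error) { return nil, fmt.Errorf("peer down") },
    }
    fmt.Println(gather(peers)) // map[WARM:3]
}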
@@ -1075,3 +1075,18 @@ func (client *peerRESTClient) ReloadSiteReplicationConfig(ctx context.Context) e
     defer http.DrainBody(respBody)
     return nil
 }
+
+func (client *peerRESTClient) GetLastDayTierStats(ctx context.Context) (dailyAllTierStats, error) {
+    var result map[string]lastDayTierStats
+    respBody, err := client.callWithContext(context.Background(), peerRESTMethodGetLastDayTierStats, nil, nil, -1)
+    if err != nil {
+        return result, err
+    }
+    defer http.DrainBody(respBody)
+
+    err = gob.NewDecoder(respBody).Decode(&result)
+    if err != nil {
+        return dailyAllTierStats{}, err
+    }
+    return dailyAllTierStats(result), nil
+}
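The peer response is a gob-encoded map[string]lastDayTierStats, and gob handles the hour-bin array and the time.Time field without any extra registration. A minimal round-trip sketch with simplified stand-in types:

package main

import (
    "bytes"
    "encoding/gob"
    "fmt"
    "time"
)

// Simplified stand-ins for lastDayTierStats and its hour bins.
type binStats struct{ TotalSize uint64 }

type dayStats struct {
    Bins      [24]binStats
    UpdatedAt time.Time
}

func main() {
    in := map[string]dayStats{
        "WARM": {UpdatedAt: time.Now().UTC()},
    }

    var buf bytes.Buffer
    if err := gob.NewEncoder(&buf).Encode(in); err != nil {
        panic(err)
    }

    var out map[string]dayStats
    if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
        panic(err)
    }
    fmt.Println(out["WARM"].UpdatedAt.Equal(in["WARM"].UpdatedAt)) // true
}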
@@ -18,7 +18,7 @@
 package cmd
 
 const (
-    peerRESTVersion       = "v18" // Add LoadPoolMeta
+    peerRESTVersion       = "v19" // Add getlastdaytierstats
     peerRESTVersionPrefix = SlashSeparator + peerRESTVersion
     peerRESTPrefix        = minioReservedBucketPath + "/peer"
     peerRESTPath          = peerRESTPrefix + peerRESTVersionPrefix
@@ -68,6 +68,7 @@ const (
     peerRESTMethodSpeedtest                   = "/speedtest"
     peerRESTMethodReloadSiteReplicationConfig = "/reloadsitereplicationconfig"
     peerRESTMethodReloadPoolMeta              = "/reloadpoolmeta"
+    peerRESTMethodGetLastDayTierStats         = "/getlastdaytierstats"
 )
 
 const (
@@ -1327,6 +1327,23 @@ func (s *peerRESTServer) SpeedtestHandler(w http.ResponseWriter, r *http.Request
     logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(result))
 }
 
+// GetLastDayTierStatsHandler - returns per-tier stats in the last 24hrs for this server
+func (s *peerRESTServer) GetLastDayTierStatsHandler(w http.ResponseWriter, r *http.Request) {
+    if !s.IsValid(w, r) {
+        s.writeErrorResponse(w, errors.New("invalid request"))
+        return
+    }
+
+    ctx := newContext(r, w, "GetLastDayTierStats")
+    if objAPI := newObjectLayerFn(); objAPI == nil || globalTransitionState == nil {
+        s.writeErrorResponse(w, errServerNotInitialized)
+        return
+    }
+
+    result := globalTransitionState.getDailyAllTierStats()
+    logger.LogIf(ctx, gob.NewEncoder(w).Encode(result))
+}
+
 // registerPeerRESTHandlers - register peer rest router.
 func registerPeerRESTHandlers(router *mux.Router) {
     server := &peerRESTServer{}
@@ -1375,4 +1392,5 @@ func registerPeerRESTHandlers(router *mux.Router) {
     subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodSpeedtest).HandlerFunc(httpTraceHdrs(server.SpeedtestHandler))
     subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadSiteReplicationConfig).HandlerFunc(httpTraceHdrs(server.ReloadSiteReplicationConfigHandler))
     subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodReloadPoolMeta).HandlerFunc(httpTraceHdrs(server.ReloadPoolMetaHandler))
+    subrouter.Methods(http.MethodPost).Path(peerRESTVersionPrefix + peerRESTMethodGetLastDayTierStats).HandlerFunc(httpTraceHdrs(server.GetLastDayTierStatsHandler))
 }
@@ -227,7 +227,11 @@ func (api adminAPIHandlers) TierStatsHandler(w http.ResponseWriter, r *http.Requ
         return
     }
 
-    data, err := json.Marshal(dui.tierStats())
+    tierStats := dui.tierStats()
+    dailyStats := globalNotificationSys.GetLastDayTierStats(ctx)
+    tierStats = dailyStats.addToTierInfo(tierStats)
+
+    data, err := json.Marshal(tierStats)
     if err != nil {
         writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL)
         return
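With this change the admin TierStats response carries, per tier, 24 hourly bins plus the time they were last advanced. A hypothetical client-side helper that folds those bins into a last-24h total (types simplified here; the actual response uses madmin.TierInfo):

package main

import "fmt"

// Simplified stand-ins for the per-hour stats returned by the admin API.
type hourStats struct {
    TotalSize   uint64
    NumVersions uint64
    NumObjects  uint64
}

type tierInfo struct {
    Name string
    Bins [24]hourStats
}

// last24hSize sums the hourly bins into a single transitioned-bytes figure.
func last24hSize(ti tierInfo) uint64 {
    var total uint64
    for _, b := range ti.Bins {
        total += b.TotalSize
    }
    return total
}

func main() {
    ti := tierInfo{Name: "WARM"}
    ti.Bins[9].TotalSize = 4 << 20
    ti.Bins[10].TotalSize = 1 << 20
    fmt.Printf("%s: %d bytes transitioned in the last 24h\n", ti.Name, last24hSize(ti)) // WARM: 5242880 bytes ...
}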
cmd/tier-last-day-stats.go (new file, 113 lines)
@@ -0,0 +1,113 @@
// Copyright (c) 2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
    "time"

    "github.com/minio/madmin-go"
)

type lastDayTierStats struct {
    Bins      [24]tierStats
    UpdatedAt time.Time
}

func (l *lastDayTierStats) addStats(ts tierStats) {
    now := time.Now()
    l.forwardTo(now)

    nowIdx := now.Hour()
    l.Bins[nowIdx] = l.Bins[nowIdx].add(ts)
}

// forwardTo moves time to t, clearing entries between last update and t.
func (l *lastDayTierStats) forwardTo(t time.Time) {
    since := t.Sub(l.UpdatedAt).Hours()
    // within the hour since l.UpdatedAt
    if since < 1 {
        return
    }

    idx, lastIdx := t.Hour(), l.UpdatedAt.Hour()
    l.UpdatedAt = t

    if since >= 24 {
        l.Bins = [24]tierStats{}
        return
    }

    for ; lastIdx != idx; lastIdx++ {
        l.Bins[(lastIdx+1)%24] = tierStats{}
    }
}

func (l *lastDayTierStats) clone() lastDayTierStats {
    clone := lastDayTierStats{
        UpdatedAt: l.UpdatedAt,
    }
    copy(clone.Bins[:], l.Bins[:])
    return clone
}

func (l lastDayTierStats) merge(m lastDayTierStats) (merged lastDayTierStats) {
    cl := l.clone()
    cm := m.clone()

    if cl.UpdatedAt.After(cm.UpdatedAt) {
        cm.forwardTo(cl.UpdatedAt)
        merged.UpdatedAt = cl.UpdatedAt
    } else {
        cl.forwardTo(cm.UpdatedAt)
        merged.UpdatedAt = cm.UpdatedAt
    }

    for i := range cl.Bins {
        merged.Bins[i] = cl.Bins[i].add(cm.Bins[i])
    }

    return merged
}

// dailyAllTierStats is used to aggregate last day tier stats across MinIO servers
type dailyAllTierStats map[string]lastDayTierStats

func (l dailyAllTierStats) merge(m dailyAllTierStats) {
    for tier, st := range m {
        l[tier] = l[tier].merge(st)
    }
}

func (l dailyAllTierStats) addToTierInfo(tierInfos []madmin.TierInfo) []madmin.TierInfo {
    for i := range tierInfos {
        var lst lastDayTierStats
        var ok bool
        if lst, ok = l[tierInfos[i].Name]; !ok {
            continue
        }
        for hr, st := range lst.Bins {
            tierInfos[i].DailyStats.Bins[hr] = madmin.TierStats{
                TotalSize:   st.TotalSize,
                NumVersions: st.NumVersions,
                NumObjects:  st.NumObjects,
            }
        }
        tierInfos[i].DailyStats.UpdatedAt = lst.UpdatedAt
    }
    return tierInfos
}
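To make the bin rotation concrete: if the stats were last advanced at 10:00 and the next sample lands at 14:00, forwardTo zeroes the bins for hours 11 through 14 before hour 14 is credited, and a gap of 24 hours or more clears the whole window. A cut-down standalone sketch of the same rotation (illustrative only; it takes explicit timestamps instead of calling time.Now, and wraps the hour index explicitly so the loop also terminates across midnight):

package main

import (
    "fmt"
    "time"
)

type stats struct{ Size uint64 }

func (a stats) add(b stats) stats { return stats{Size: a.Size + b.Size} }

// lastDay is a cut-down stand-in for lastDayTierStats above.
type lastDay struct {
    Bins      [24]stats
    UpdatedAt time.Time
}

// forwardTo clears every bin between the last update hour and t's hour,
// or the whole window once 24h or more have passed.
func (l *lastDay) forwardTo(t time.Time) {
    since := t.Sub(l.UpdatedAt).Hours()
    if since < 1 {
        return
    }
    idx, lastIdx := t.Hour(), l.UpdatedAt.Hour()
    l.UpdatedAt = t
    if since >= 24 {
        l.Bins = [24]stats{}
        return
    }
    for lastIdx != idx {
        lastIdx = (lastIdx + 1) % 24
        l.Bins[lastIdx] = stats{}
    }
}

// addAt credits s to t's hour bin after rotating the window forward.
func (l *lastDay) addAt(t time.Time, s stats) {
    l.forwardTo(t)
    l.Bins[t.Hour()] = l.Bins[t.Hour()].add(s)
}

func main() {
    day := time.Date(2022, 1, 10, 0, 0, 0, 0, time.UTC)

    var l lastDay
    l.UpdatedAt = day.Add(10 * time.Hour) // last sample recorded at 10:00
    l.Bins[10] = stats{Size: 100}

    l.addAt(day.Add(14*time.Hour), stats{Size: 50}) // next sample at 14:00
    fmt.Println(l.Bins[10], l.Bins[13], l.Bins[14]) // {100} {0} {50}
}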