mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
metacache: Add option for life extension (#10837)
Add `MINIO_API_EXTEND_LIST_CACHE_LIFE` that will extend the life of generated caches for a while. This changes caches to remain valid until no updates have been received for the specified time plus a fixed margin. This also changes the caches from being invalidated when the *first* set finishes until the *last* set has finished plus the specified time has passed.
This commit is contained in:
parent
b72cac4cf3
commit
0724205f35
@ -35,6 +35,7 @@ const (
|
||||
apiCorsAllowOrigin = "cors_allow_origin"
|
||||
apiRemoteTransportDeadline = "remote_transport_deadline"
|
||||
apiListQuorum = "list_quorum"
|
||||
apiExtendListCacheLife = "extend_list_cache_life"
|
||||
|
||||
EnvAPIRequestsMax = "MINIO_API_REQUESTS_MAX"
|
||||
EnvAPIRequestsDeadline = "MINIO_API_REQUESTS_DEADLINE"
|
||||
@ -42,6 +43,7 @@ const (
|
||||
EnvAPICorsAllowOrigin = "MINIO_API_CORS_ALLOW_ORIGIN"
|
||||
EnvAPIRemoteTransportDeadline = "MINIO_API_REMOTE_TRANSPORT_DEADLINE"
|
||||
EnvAPIListQuorum = "MINIO_API_LIST_QUORUM"
|
||||
EnvAPIExtendListCacheLife = "MINIO_API_EXTEND_LIST_CACHE_LIFE"
|
||||
EnvAPISecureCiphers = "MINIO_API_SECURE_CIPHERS"
|
||||
)
|
||||
|
||||
@ -78,6 +80,10 @@ var (
|
||||
Key: apiListQuorum,
|
||||
Value: "optimal",
|
||||
},
|
||||
config.KV{
|
||||
Key: apiExtendListCacheLife,
|
||||
Value: "0s",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@ -89,6 +95,7 @@ type Config struct {
|
||||
CorsAllowOrigin []string `json:"cors_allow_origin"`
|
||||
RemoteTransportDeadline time.Duration `json:"remote_transport_deadline"`
|
||||
ListQuorum string `json:"list_strict_quorum"`
|
||||
ExtendListLife time.Duration `json:"extend_list_cache_life"`
|
||||
}
|
||||
|
||||
// UnmarshalJSON - Validate SS and RRS parity when unmarshalling JSON.
|
||||
@ -163,6 +170,11 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
|
||||
return cfg, errors.New("invalid value for list strict quorum")
|
||||
}
|
||||
|
||||
listLife, err := time.ParseDuration(env.Get(EnvAPIExtendListCacheLife, kvs.Get(apiExtendListCacheLife)))
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
return Config{
|
||||
RequestsMax: requestsMax,
|
||||
RequestsDeadline: requestsDeadline,
|
||||
@ -170,5 +182,6 @@ func LookupConfig(kvs config.KVS) (cfg Config, err error) {
|
||||
CorsAllowOrigin: corsAllowOrigin,
|
||||
RemoteTransportDeadline: remoteTransportDeadline,
|
||||
ListQuorum: listQuorum,
|
||||
ExtendListLife: listLife,
|
||||
}, nil
|
||||
}
|
||||
|
@ -33,6 +33,7 @@ type apiConfig struct {
|
||||
requestsPool chan struct{}
|
||||
clusterDeadline time.Duration
|
||||
listQuorum int
|
||||
extendListLife time.Duration
|
||||
corsAllowOrigins []string
|
||||
}
|
||||
|
||||
@ -65,6 +66,7 @@ func (t *apiConfig) init(cfg api.Config, setDriveCount int) {
|
||||
t.requestsPool = make(chan struct{}, apiRequestsMaxPerNode)
|
||||
t.requestsDeadline = cfg.RequestsDeadline
|
||||
t.listQuorum = cfg.GetListQuorum()
|
||||
t.extendListLife = cfg.ExtendListLife
|
||||
}
|
||||
|
||||
func (t *apiConfig) getListQuorum() int {
|
||||
@ -74,6 +76,13 @@ func (t *apiConfig) getListQuorum() int {
|
||||
return t.listQuorum
|
||||
}
|
||||
|
||||
func (t *apiConfig) getExtendListLife() time.Duration {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
||||
return t.extendListLife
|
||||
}
|
||||
|
||||
func (t *apiConfig) getCorsAllowOrigins() []string {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
|
@ -209,12 +209,13 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache {
|
||||
}
|
||||
|
||||
var best metacache
|
||||
extend := globalAPIConfig.getExtendListLife()
|
||||
for _, cached := range b.caches {
|
||||
// Never return transient caches if there is no id.
|
||||
if b.transient {
|
||||
break
|
||||
}
|
||||
if cached.status == scanStateError || cached.dataVersion != metacacheStreamVersion {
|
||||
if cached.status == scanStateError || cached.status == scanStateNone || cached.dataVersion != metacacheStreamVersion {
|
||||
debugPrint("cache %s state or stream version mismatch", cached.id)
|
||||
continue
|
||||
}
|
||||
@ -242,15 +243,23 @@ func (b *bucketMetacache) findCache(o listPathOptions) metacache {
|
||||
// Non slash separator requires recursive.
|
||||
continue
|
||||
}
|
||||
if cached.ended.IsZero() && time.Since(cached.lastUpdate) > metacacheMaxRunningAge {
|
||||
if !cached.finished() && time.Since(cached.lastUpdate) > metacacheMaxRunningAge {
|
||||
debugPrint("cache %s not running, time: %v", cached.id, time.Since(cached.lastUpdate))
|
||||
// Abandoned
|
||||
continue
|
||||
}
|
||||
if !cached.ended.IsZero() && cached.endedCycle <= o.OldestCycle {
|
||||
debugPrint("cache %s ended and cycle (%v) <= oldest allowed (%v)", cached.id, cached.endedCycle, o.OldestCycle)
|
||||
// If scan has ended the oldest requested must be less.
|
||||
continue
|
||||
|
||||
if cached.finished() && cached.endedCycle <= o.OldestCycle {
|
||||
if extend <= 0 {
|
||||
// If scan has ended the oldest requested must be less.
|
||||
debugPrint("cache %s ended and cycle (%v) <= oldest allowed (%v)", cached.id, cached.endedCycle, o.OldestCycle)
|
||||
continue
|
||||
}
|
||||
if time.Since(cached.lastUpdate) > metacacheMaxRunningAge+extend {
|
||||
// Cache ended within bloom cycle, but we can extend the life.
|
||||
debugPrint("cache %s ended (%v) and beyond extended life (%v)", cached.id, cached.lastUpdate, extend+metacacheMaxRunningAge)
|
||||
continue
|
||||
}
|
||||
}
|
||||
if cached.started.Before(best.started) {
|
||||
debugPrint("cache %s disregarded - we have a better", cached.id)
|
||||
|
@ -79,6 +79,9 @@ func (m *metacache) worthKeeping(currentCycle uint64) bool {
|
||||
case cache.finished() && cache.startedCycle > currentCycle:
|
||||
// Cycle is somehow bigger.
|
||||
return false
|
||||
case cache.finished() && time.Since(cache.lastHandout) > 48*time.Hour:
|
||||
// Keep only for 2 days. Fallback if crawler is clogged.
|
||||
return false
|
||||
case cache.finished() && currentCycle >= dataUsageUpdateDirCycles && cache.startedCycle < currentCycle-dataUsageUpdateDirCycles:
|
||||
// Cycle is too old to be valuable.
|
||||
return false
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var metaCacheTestsetTimestamp, _ = time.Parse(time.RFC822Z, time.RFC822Z)
|
||||
var metaCacheTestsetTimestamp = time.Now()
|
||||
|
||||
var metaCacheTestset = []metacache{
|
||||
0: {
|
||||
@ -80,10 +80,10 @@ var metaCacheTestset = []metacache{
|
||||
status: scanStateError,
|
||||
fileNotFound: false,
|
||||
error: "an error lol",
|
||||
started: metaCacheTestsetTimestamp.Add(time.Minute),
|
||||
ended: metaCacheTestsetTimestamp.Add(2 * time.Minute),
|
||||
lastUpdate: metaCacheTestsetTimestamp.Add(2 * time.Minute),
|
||||
lastHandout: metaCacheTestsetTimestamp,
|
||||
started: metaCacheTestsetTimestamp.Add(-20 * time.Minute),
|
||||
ended: metaCacheTestsetTimestamp.Add(-20 * time.Minute),
|
||||
lastUpdate: metaCacheTestsetTimestamp.Add(-20 * time.Minute),
|
||||
lastHandout: metaCacheTestsetTimestamp.Add(-20 * time.Minute),
|
||||
startedCycle: 10,
|
||||
endedCycle: 10,
|
||||
dataVersion: metacacheStreamVersion,
|
||||
@ -152,6 +152,22 @@ var metaCacheTestset = []metacache{
|
||||
endedCycle: 0,
|
||||
dataVersion: metacacheStreamVersion,
|
||||
},
|
||||
8: {
|
||||
id: "case-8-finished-a-week-ago",
|
||||
bucket: "bucket",
|
||||
root: "folder/finished",
|
||||
recursive: false,
|
||||
status: scanStateSuccess,
|
||||
fileNotFound: false,
|
||||
error: "",
|
||||
started: metaCacheTestsetTimestamp.Add(-7 * 24 * time.Hour),
|
||||
ended: metaCacheTestsetTimestamp.Add(-7 * 24 * time.Hour),
|
||||
lastUpdate: metaCacheTestsetTimestamp.Add(-7 * 24 * time.Hour),
|
||||
lastHandout: metaCacheTestsetTimestamp.Add(-7 * 24 * time.Hour),
|
||||
startedCycle: 10,
|
||||
endedCycle: 10,
|
||||
dataVersion: metacacheStreamVersion,
|
||||
},
|
||||
}
|
||||
|
||||
func Test_baseDirFromPrefix(t *testing.T) {
|
||||
@ -222,7 +238,7 @@ func Test_metacache_canBeReplacedBy(t *testing.T) {
|
||||
endedCycle: 10,
|
||||
dataVersion: metacacheStreamVersion,
|
||||
}
|
||||
wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: true, 5: false, 6: true, 7: false}
|
||||
wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: true, 5: false, 6: true, 7: false, 8: false}
|
||||
|
||||
for i, tt := range metaCacheTestset {
|
||||
t.Run(tt.id, func(t *testing.T) {
|
||||
@ -234,7 +250,8 @@ func Test_metacache_canBeReplacedBy(t *testing.T) {
|
||||
}
|
||||
// Add an hour, otherwise it will never be replaced.
|
||||
// We operated on a copy.
|
||||
tt.lastHandout.Add(-2 * time.Hour)
|
||||
tt.lastHandout = tt.lastHandout.Add(-2 * time.Hour)
|
||||
tt.lastUpdate = tt.lastHandout.Add(-2 * time.Hour)
|
||||
got := tt.canBeReplacedBy(&testAgainst)
|
||||
if got != want {
|
||||
t.Errorf("#%d: want %v, got %v", i, want, got)
|
||||
@ -244,7 +261,7 @@ func Test_metacache_canBeReplacedBy(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_metacache_finished(t *testing.T) {
|
||||
wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: false, 5: true, 6: true, 7: false}
|
||||
wantResults := []bool{0: true, 1: true, 2: true, 3: true, 4: false, 5: true, 6: true, 7: false, 8: true}
|
||||
|
||||
for i, tt := range metaCacheTestset {
|
||||
t.Run(tt.id, func(t *testing.T) {
|
||||
@ -264,7 +281,7 @@ func Test_metacache_finished(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_metacache_worthKeeping(t *testing.T) {
|
||||
wantResults := []bool{0: true, 1: true, 2: true, 3: false, 4: false, 5: true, 6: false, 7: false}
|
||||
wantResults := []bool{0: true, 1: true, 2: true, 3: false, 4: false, 5: true, 6: false, 7: false, 8: false}
|
||||
|
||||
for i, tt := range metaCacheTestset {
|
||||
t.Run(tt.id, func(t *testing.T) {
|
||||
|
Loading…
Reference in New Issue
Block a user