Mirror of https://github.com/minio/minio.git
fix: prioritized latest buckets for crawler to finish the scans faster (#11115)
The crawler should call ListBuckets only once, not once per serverPool: the buckets are the same across all pools and all sets, and ListBuckets always returns a unified view. Once the bucket list is returned, sort it by creation time and scan the latest buckets first, on the assumption that newer buckets hold less content than older ones. This lets them be scanned faster and keeps the reported usage closer to the latest view.
parent d674263eb7
commit c606c76323
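
As context for the diff below, here is a minimal standalone sketch of the ordering described in the commit message. The bucketInfo type and sample data are illustrative stand-ins for MinIO's BucketInfo, not the real types:

package main

import (
	"fmt"
	"sort"
	"time"
)

// bucketInfo is a stand-in for the per-bucket metadata the crawler sees.
type bucketInfo struct {
	Name    string
	Created time.Time
}

func main() {
	now := time.Now()
	buckets := []bucketInfo{
		{Name: "logs-2018", Created: now.Add(-3 * 365 * 24 * time.Hour)},
		{Name: "images-new", Created: now.Add(-2 * 24 * time.Hour)},
		{Name: "backups-2020", Created: now.Add(-400 * 24 * time.Hour)},
	}

	// Newest buckets first: assumed to hold less data, so they finish faster
	// and the usage numbers for recent buckets stay closer to current.
	sort.Slice(buckets, func(i, j int) bool {
		return buckets[i].Created.After(buckets[j].Created)
	})

	for _, b := range buckets {
		fmt.Println(b.Name) // images-new, backups-2020, logs-2018
	}
}

The diff applies exactly this comparator (Created.After) wherever buckets are iterated for healing or crawling.
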
@@ -21,6 +21,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"sort"
 	"strings"
 	"sync"
 	"time"

@@ -873,6 +874,11 @@ func (h *healSequence) healBuckets(objAPI ObjectLayer, bucketsOnly bool) error {
 		return errFnHealFromAPIErr(h.ctx, err)
 	}
 
+	// Heal latest buckets first.
+	sort.Slice(buckets, func(i, j int) bool {
+		return buckets[i].Created.After(buckets[j].Created)
+	})
+
 	for _, bucket := range buckets {
 		if err = h.healBucket(objAPI, bucket.Name, bucketsOnly); err != nil {
 			return err

@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sort"
 	"time"
 
 	"github.com/dustin/go-humanize"

@@ -161,6 +162,12 @@ wait:
 			}
 
 			buckets, _ := z.ListBuckets(ctx)
+
+			// Heal latest buckets first.
+			sort.Slice(buckets, func(i, j int) bool {
+				return buckets[i].Created.After(buckets[j].Created)
+			})
+
 			for i, setMap := range erasureSetInZoneDisksToHeal {
 				for setIndex, disks := range setMap {
 					for _, disk := range disks {

@@ -23,6 +23,7 @@ import (
 	"io"
 	"math/rand"
 	"net/http"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"

@@ -322,23 +323,19 @@ func (z *erasureServerPools) CrawlAndGetDataUsage(ctx context.Context, bf *bloom
 	var mu sync.Mutex
 	var results []dataUsageCache
 	var firstErr error
-	var knownBuckets = make(map[string]struct{}) // used to deduplicate buckets.
-	var allBuckets []BucketInfo
+
+	allBuckets, err := z.ListBuckets(ctx)
+	if err != nil {
+		return err
+	}
+
+	// Crawl latest allBuckets first.
+	sort.Slice(allBuckets, func(i, j int) bool {
+		return allBuckets[i].Created.After(allBuckets[j].Created)
+	})
 
 	// Collect for each set in serverPools.
 	for _, z := range z.serverPools {
-		buckets, err := z.ListBuckets(ctx)
-		if err != nil {
-			return err
-		}
-		// Add new buckets.
-		for _, b := range buckets {
-			if _, ok := knownBuckets[b.Name]; ok {
-				continue
-			}
-			allBuckets = append(allBuckets, b)
-			knownBuckets[b.Name] = struct{}{}
-		}
 		for _, erObj := range z.sets {
 			wg.Add(1)
 			results = append(results, dataUsageCache{})

@@ -355,7 +352,7 @@ func (z *erasureServerPools) CrawlAndGetDataUsage(ctx context.Context, bf *bloom
 					}
 				}()
 				// Start crawler. Blocks until done.
-				err := erObj.crawlAndGetDataUsage(ctx, buckets, bf, updates)
+				err := erObj.crawlAndGetDataUsage(ctx, allBuckets, bf, updates)
 				if err != nil {
 					logger.LogIf(ctx, err)
 					mu.Lock()

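To summarize the reshaping in the two hunks above: bucket listing and the newest-first sort move out of the per-pool loop, and every erasure set's crawler is handed the same pre-sorted slice, so the per-pool re-listing and knownBuckets deduplication are no longer needed. A simplified sketch of that shape, using hypothetical stand-in types (bucketInfo, set, crawl, listBuckets) rather than the real erasure types:

package main

import (
	"fmt"
	"sort"
	"time"
)

// bucketInfo stands in for the bucket metadata returned by a single ListBuckets call.
type bucketInfo struct {
	Name    string
	Created time.Time
}

// set stands in for one erasure set; crawl stands in for crawlAndGetDataUsage.
type set struct{ id int }

func (s set) crawl(buckets []bucketInfo) {
	for _, b := range buckets {
		fmt.Printf("set %d crawling %s\n", s.id, b.Name)
	}
}

// crawlAll lists buckets once, orders them newest first, and hands the same
// slice to every set, mirroring the restructured flow in the diff above.
func crawlAll(sets []set, listBuckets func() []bucketInfo) {
	allBuckets := listBuckets()

	sort.Slice(allBuckets, func(i, j int) bool {
		return allBuckets[i].Created.After(allBuckets[j].Created)
	})

	for _, s := range sets {
		s.crawl(allBuckets)
	}
}

func main() {
	now := time.Now()
	list := func() []bucketInfo {
		return []bucketInfo{
			{Name: "old-bucket", Created: now.Add(-48 * time.Hour)},
			{Name: "new-bucket", Created: now},
		}
	}
	crawlAll([]set{{id: 0}, {id: 1}}, list)
}

Because ListBuckets returns a unified view across pools, listing once per crawl cycle is sufficient; the dedup map existed only to reconcile per-pool listings.
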
@@ -722,10 +722,14 @@ func (s *erasureSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, er
 			return nil, err
 		}
 	}
 
 	for _, v := range healBuckets {
 		listBuckets = append(listBuckets, BucketInfo(v))
 	}
-	sort.Sort(byBucketName(listBuckets))
+
+	sort.Slice(listBuckets, func(i, j int) bool {
+		return listBuckets[i].Name < listBuckets[j].Name
+	})
+
 	return listBuckets, nil
 }

@@ -531,7 +531,9 @@ func (fs *FSObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) {
 	}
 
 	// Sort bucket infos by bucket name.
-	sort.Sort(byBucketName(bucketInfos))
+	sort.Slice(bucketInfos, func(i, j int) bool {
+		return bucketInfos[i].Name < bucketInfos[j].Name
+	})
 
 	// Succes.
 	return bucketInfos, nil

@@ -536,13 +536,6 @@ func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (int64, int64) {
 	return compressedOffset, offset - skipLength
 }
 
-// byBucketName is a collection satisfying sort.Interface.
-type byBucketName []BucketInfo
-
-func (d byBucketName) Len() int           { return len(d) }
-func (d byBucketName) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
-func (d byBucketName) Less(i, j int) bool { return d[i].Name < d[j].Name }
-
 // GetObjectReader is a type that wraps a reader with a lock to
 // provide a ReadCloser interface that unlocks on Close()
 type GetObjectReader struct {

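A side note on the last three hunks: the dedicated byBucketName helper implementing sort.Interface is removed, and its two call sites switch to inline sort.Slice closures. A small sketch of the equivalence, using an illustrative one-field bucket type rather than MinIO's BucketInfo:

package main

import (
	"fmt"
	"sort"
)

type bucketInfo struct{ Name string }

// byName mirrors the removed byBucketName sort.Interface helper.
type byName []bucketInfo

func (d byName) Len() int           { return len(d) }
func (d byName) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
func (d byName) Less(i, j int) bool { return d[i].Name < d[j].Name }

func main() {
	a := []bucketInfo{{"zeta"}, {"alpha"}, {"mid"}}
	b := append([]bucketInfo(nil), a...)

	// Old style: a named type implementing sort.Interface.
	sort.Sort(byName(a))
	// New style: an inline closure, no extra type needed.
	sort.Slice(b, func(i, j int) bool { return b[i].Name < b[j].Name })

	fmt.Println(a) // [{alpha} {mid} {zeta}]
	fmt.Println(b) // same ordering
}

sort.Slice is not a stable sort, but bucket names are unique, so the resulting order is identical; the gain is dropping a named type that existed only for this one comparison.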