mirror of
https://github.com/minio/minio.git
synced 2024-12-26 23:25:54 -05:00
a982baff27
Design: https://gist.github.com/klauspost/025c09b48ed4a1293c917cecfabdf21c Gist of improvements: * Cross-server caching and listing will use the same data across servers and requests. * Lists can be arbitrarily resumed at a constant speed. * Metadata for all files scanned is stored for streaming retrieval. * The existing bloom filters controlled by the crawler is used for validating caches. * Concurrent requests for the same data (or parts of it) will not spawn additional walkers. * Listing a subdirectory of an existing recursive cache will use the cache. * All listing operations are fully streamable so the number of objects in a bucket no longer dictates the amount of memory. * Listings can be handled by any server within the cluster. * Caches are cleaned up when out of date or superseded by a more recent one.
284 lines
6.8 KiB
Go
284 lines
6.8 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2020 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/minio/minio/cmd/logger"
|
|
"github.com/minio/minio/cmd/logger/message/log"
|
|
)
|
|
|
|
type testLoggerI interface {
|
|
Helper()
|
|
Log(args ...interface{})
|
|
}
|
|
|
|
type testingLogger struct {
|
|
mu sync.Mutex
|
|
t testLoggerI
|
|
}
|
|
|
|
func (t *testingLogger) Endpoint() string {
|
|
return ""
|
|
}
|
|
|
|
func (t *testingLogger) String() string {
|
|
return ""
|
|
}
|
|
|
|
func (t *testingLogger) Validate() error {
|
|
return nil
|
|
}
|
|
|
|
func (t *testingLogger) Send(entry interface{}, errKind string) error {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
if t.t == nil {
|
|
return nil
|
|
}
|
|
e, ok := entry.(log.Entry)
|
|
if !ok {
|
|
return fmt.Errorf("unexpected log entry structure %#v", entry)
|
|
}
|
|
|
|
t.t.Helper()
|
|
t.t.Log(e.Level, ":", errKind, e.Message)
|
|
return nil
|
|
}
|
|
|
|
func addTestingLogging(t testLoggerI) func() {
|
|
tl := &testingLogger{t: t}
|
|
logger.AddTarget(tl)
|
|
return func() {
|
|
tl.mu.Lock()
|
|
defer tl.mu.Unlock()
|
|
tl.t = nil
|
|
}
|
|
}
|
|
|
|
func TestDataUpdateTracker(t *testing.T) {
|
|
dut := newDataUpdateTracker()
|
|
// Change some defaults.
|
|
dut.debug = testing.Verbose()
|
|
dut.input = make(chan string)
|
|
dut.save = make(chan struct{})
|
|
|
|
defer addTestingLogging(t)()
|
|
|
|
dut.Current.bf = dut.newBloomFilter()
|
|
|
|
tmpDir, err := ioutil.TempDir("", "TestDataUpdateTracker")
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
err = os.MkdirAll(filepath.Dir(filepath.Join(tmpDir, dataUpdateTrackerFilename)), os.ModePerm)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer os.RemoveAll(tmpDir)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
dut.start(ctx, tmpDir)
|
|
|
|
var tests = []struct {
|
|
in string
|
|
check []string // if not empty, check against these instead.
|
|
exist bool
|
|
}{
|
|
{
|
|
in: "bucket/directory/file.txt",
|
|
check: []string{"bucket", "bucket/", "/bucket", "bucket/directory", "bucket/directory/", "bucket/directory/file.txt", "/bucket/directory/file.txt"},
|
|
exist: true,
|
|
},
|
|
{
|
|
// System bucket
|
|
in: ".minio.sys/ignoreme/pls",
|
|
exist: false,
|
|
},
|
|
{
|
|
// Not a valid bucket
|
|
in: "./bucket/okfile.txt",
|
|
check: []string{"./bucket/okfile.txt", "/bucket/okfile.txt", "bucket/okfile.txt"},
|
|
exist: false,
|
|
},
|
|
{
|
|
// Not a valid bucket
|
|
in: "æ/okfile.txt",
|
|
check: []string{"æ/okfile.txt", "æ/okfile.txt", "æ"},
|
|
exist: false,
|
|
},
|
|
{
|
|
in: "/bucket2/okfile2.txt",
|
|
check: []string{"./bucket2/okfile2.txt", "/bucket2/okfile2.txt", "bucket2/okfile2.txt", "bucket2"},
|
|
exist: true,
|
|
},
|
|
{
|
|
in: "/bucket3/prefix/okfile2.txt",
|
|
check: []string{"./bucket3/prefix/okfile2.txt", "/bucket3/prefix/okfile2.txt", "bucket3/prefix/okfile2.txt", "bucket3/prefix", "bucket3"},
|
|
exist: true,
|
|
},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.in, func(t *testing.T) {
|
|
dut.input <- tt.in
|
|
dut.input <- "" // Sending empty string ensures the previous is added to filter.
|
|
dut.mu.Lock()
|
|
defer dut.mu.Unlock()
|
|
if len(tt.check) == 0 {
|
|
got := dut.Current.bf.containsDir(tt.in)
|
|
if got != tt.exist {
|
|
// For unlimited tests this could lead to false positives,
|
|
// but it should be deterministic.
|
|
t.Errorf("entry %q, got: %v, want %v", tt.in, got, tt.exist)
|
|
}
|
|
return
|
|
}
|
|
for _, check := range tt.check {
|
|
got := dut.Current.bf.containsDir(check)
|
|
if got != tt.exist {
|
|
// For unlimited tests this could lead to false positives,
|
|
// but it should be deterministic.
|
|
t.Errorf("entry %q, check: %q, got: %v, want %v", tt.in, check, got, tt.exist)
|
|
}
|
|
continue
|
|
}
|
|
})
|
|
}
|
|
// Cycle to history
|
|
req := bloomFilterRequest{
|
|
Oldest: 1,
|
|
Current: 2,
|
|
}
|
|
|
|
_, err = dut.cycleFilter(ctx, req)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
dut.input <- "cycle2/file.txt"
|
|
dut.input <- "" // Sending empty string ensures the previous is added to filter.
|
|
|
|
tests = append(tests, struct {
|
|
in string
|
|
check []string
|
|
exist bool
|
|
}{in: "cycle2/file.txt", exist: true})
|
|
|
|
// Shut down
|
|
cancel()
|
|
<-dut.saveExited
|
|
|
|
if dut.current() != 2 {
|
|
t.Fatal("wrong current idx after save. want 2, got:", dut.current())
|
|
}
|
|
|
|
ctx, cancel = context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Reload...
|
|
dut = newDataUpdateTracker()
|
|
dut.start(ctx, tmpDir)
|
|
|
|
if dut.current() != 2 {
|
|
t.Fatal("current idx after load not preserved. want 2, got:", dut.current())
|
|
}
|
|
req = bloomFilterRequest{
|
|
Oldest: 1,
|
|
Current: 3,
|
|
}
|
|
bfr2, err := dut.cycleFilter(ctx, req)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if !bfr2.Complete {
|
|
t.Fatal("Wanted complete, didn't get it")
|
|
}
|
|
if bfr2.CurrentIdx != 3 {
|
|
t.Fatal("wanted index 3, got", bfr2.CurrentIdx)
|
|
}
|
|
if bfr2.OldestIdx != 1 {
|
|
t.Fatal("wanted oldest index 3, got", bfr2.OldestIdx)
|
|
}
|
|
|
|
// Rerun test with returned bfr2
|
|
bf := dut.newBloomFilter()
|
|
_, err = bf.ReadFrom(bytes.NewBuffer(bfr2.Filter))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.in+"-reloaded", func(t *testing.T) {
|
|
if len(tt.check) == 0 {
|
|
got := bf.containsDir(tt.in)
|
|
if got != tt.exist {
|
|
// For unlimited tests this could lead to false positives,
|
|
// but it should be deterministic.
|
|
t.Errorf("entry %q, got: %v, want %v", tt.in, got, tt.exist)
|
|
}
|
|
return
|
|
}
|
|
for _, check := range tt.check {
|
|
got := bf.containsDir(check)
|
|
if got != tt.exist {
|
|
// For unlimited tests this could lead to false positives,
|
|
// but it should be deterministic.
|
|
t.Errorf("entry %q, check: %q, got: %v, want %v", tt.in, check, got, tt.exist)
|
|
}
|
|
continue
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func BenchmarkDataUpdateTracker(b *testing.B) {
|
|
dut := newDataUpdateTracker()
|
|
// Change some defaults.
|
|
dut.debug = false
|
|
dut.input = make(chan string)
|
|
dut.save = make(chan struct{})
|
|
|
|
defer addTestingLogging(b)()
|
|
|
|
dut.Current.bf = dut.newBloomFilter()
|
|
// We do this unbuffered. This will very significantly reduce throughput, so this is a worst case.
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
go dut.startCollector(ctx)
|
|
input := make([]string, 1000)
|
|
rng := rand.New(rand.NewSource(0xabad1dea))
|
|
tmp := []string{"bucket", "aprefix", "nextprefixlevel", "maybeobjname", "evendeeper", "ok-one-morelevel", "final.object"}
|
|
for i := range input {
|
|
tmp := tmp[:1+rng.Intn(cap(tmp)-1)]
|
|
input[i] = path.Join(tmp...)
|
|
}
|
|
b.SetBytes(1)
|
|
b.ResetTimer()
|
|
b.ReportAllocs()
|
|
for i := 0; i < b.N; i++ {
|
|
dut.input <- input[rng.Intn(len(input))]
|
|
}
|
|
}
|