mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
ad86454580
Continuing from PR 157ed65c35
Our posix.go implementation did not handle I/O errors
properly on the disks, this led to situations where
top-level callers such as ListObjects might return early
without even verifying all the available disks.
This commit tries to address this in Kubernetes, drbd/nbd based
persistent volumes which can disconnect under load and
result in the situations with disks return I/O errors.
This commit also simplifies listing operation, listing
never returns any error. We can avoid this since we pretty
much ignore most of the errors anyways. When objects are
accessed directly we return proper errors.
645 lines
16 KiB
Go
645 lines
16 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"reflect"
|
|
"sort"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
// Fixed volume name that could be used across tests
|
|
const volume = "testvolume"
|
|
|
|
// Test for delayIsLeafCheck.
|
|
func TestDelayIsLeafCheck(t *testing.T) {
|
|
testCases := []struct {
|
|
entries []string
|
|
delay bool
|
|
}{
|
|
// Test cases where isLeaf check can't be delayed.
|
|
{
|
|
[]string{"a-b/", "a/"},
|
|
false,
|
|
},
|
|
{
|
|
[]string{"a%b/", "a/"},
|
|
false,
|
|
},
|
|
{
|
|
[]string{"a-b-c", "a-b/"},
|
|
false,
|
|
},
|
|
|
|
// Test cases where isLeaf check can be delayed.
|
|
{
|
|
[]string{"a-b/", "aa/"},
|
|
true,
|
|
},
|
|
{
|
|
[]string{"a", "a-b"},
|
|
true,
|
|
},
|
|
{
|
|
[]string{"aaa", "bbb"},
|
|
true,
|
|
},
|
|
}
|
|
for i, testCase := range testCases {
|
|
expected := testCase.delay
|
|
got := delayIsLeafCheck(testCase.entries)
|
|
if expected != got {
|
|
t.Errorf("Test %d : Expected %t got %t", i+1, expected, got)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test for filterMatchingPrefix.
|
|
func TestFilterMatchingPrefix(t *testing.T) {
|
|
entries := []string{"a", "aab", "ab", "abbbb", "zzz"}
|
|
testCases := []struct {
|
|
prefixEntry string
|
|
result []string
|
|
}{
|
|
{
|
|
// Empty prefix should match all entries.
|
|
"",
|
|
[]string{"a", "aab", "ab", "abbbb", "zzz"},
|
|
},
|
|
{
|
|
"a",
|
|
[]string{"a", "aab", "ab", "abbbb"},
|
|
},
|
|
{
|
|
"aa",
|
|
[]string{"aab"},
|
|
},
|
|
{
|
|
// Does not match any of the entries.
|
|
"c",
|
|
[]string{},
|
|
},
|
|
}
|
|
for i, testCase := range testCases {
|
|
expected := testCase.result
|
|
got := filterMatchingPrefix(entries, testCase.prefixEntry)
|
|
if !reflect.DeepEqual(expected, got) {
|
|
t.Errorf("Test %d : expected %v, got %v", i+1, expected, got)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Helper function that creates a volume and files in it.
|
|
func createNamespace(disk StorageAPI, volume string, files []string) error {
|
|
// Make a volume.
|
|
err := disk.MakeVol(volume)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create files.
|
|
for _, file := range files {
|
|
err = disk.AppendFile(volume, file, []byte{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Test if tree walker returns entries matching prefix alone are received
|
|
// when a non empty prefix is supplied.
|
|
func testTreeWalkPrefix(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc) {
|
|
// Start the tree walk go-routine.
|
|
prefix := "d/"
|
|
endWalkCh := make(chan struct{})
|
|
twResultCh := startTreeWalk(context.Background(), volume, prefix, "", true, listDir, isLeaf, isLeafDir, endWalkCh)
|
|
|
|
// Check if all entries received on the channel match the prefix.
|
|
for res := range twResultCh {
|
|
if !hasPrefix(res.entry, prefix) {
|
|
t.Errorf("Entry %s doesn't match prefix %s", res.entry, prefix)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test if entries received on tree walk's channel appear after the supplied marker.
|
|
func testTreeWalkMarker(t *testing.T, listDir listDirFunc, isLeaf isLeafFunc, isLeafDir isLeafDirFunc) {
|
|
// Start the tree walk go-routine.
|
|
prefix := ""
|
|
endWalkCh := make(chan struct{})
|
|
twResultCh := startTreeWalk(context.Background(), volume, prefix, "d/g", true, listDir, isLeaf, isLeafDir, endWalkCh)
|
|
|
|
// Check if only 3 entries, namely d/g/h, i/j/k, lmn are received on the channel.
|
|
expectedCount := 3
|
|
actualCount := 0
|
|
for range twResultCh {
|
|
actualCount++
|
|
}
|
|
if expectedCount != actualCount {
|
|
t.Errorf("Expected %d entries, actual no. of entries were %d", expectedCount, actualCount)
|
|
}
|
|
}
|
|
|
|
// Test tree-walk.
|
|
func TestTreeWalk(t *testing.T) {
|
|
fsDir, err := ioutil.TempDir(globalTestTmpDir, "minio-")
|
|
if err != nil {
|
|
t.Fatalf("Unable to create tmp directory: %s", err)
|
|
}
|
|
endpoints := mustGetNewEndpointList(fsDir)
|
|
disk, err := newStorageAPI(endpoints[0])
|
|
if err != nil {
|
|
t.Fatalf("Unable to create StorageAPI: %s", err)
|
|
}
|
|
|
|
var files = []string{
|
|
"d/e",
|
|
"d/f",
|
|
"d/g/h",
|
|
"i/j/k",
|
|
"lmn",
|
|
}
|
|
err = createNamespace(disk, volume, files)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
isLeaf := func(volume, prefix string) bool {
|
|
return !hasSuffix(prefix, slashSeparator)
|
|
}
|
|
|
|
isLeafDir := func(volume, prefix string) bool {
|
|
entries, listErr := disk.ListDir(volume, prefix, 1)
|
|
if listErr != nil {
|
|
return false
|
|
}
|
|
return len(entries) == 0
|
|
}
|
|
|
|
listDir := listDirFactory(context.Background(), isLeaf, disk)
|
|
// Simple test for prefix based walk.
|
|
testTreeWalkPrefix(t, listDir, isLeaf, isLeafDir)
|
|
// Simple test when marker is set.
|
|
testTreeWalkMarker(t, listDir, isLeaf, isLeafDir)
|
|
err = os.RemoveAll(fsDir)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// Test if tree walk go-routine exits cleanly if tree walk is aborted because of timeout.
|
|
func TestTreeWalkTimeout(t *testing.T) {
|
|
fsDir, err := ioutil.TempDir(globalTestTmpDir, "minio-")
|
|
if err != nil {
|
|
t.Fatalf("Unable to create tmp directory: %s", err)
|
|
}
|
|
endpoints := mustGetNewEndpointList(fsDir)
|
|
disk, err := newStorageAPI(endpoints[0])
|
|
if err != nil {
|
|
t.Fatalf("Unable to create StorageAPI: %s", err)
|
|
}
|
|
var myfiles []string
|
|
// Create maxObjectsList+1 number of entries.
|
|
for i := 0; i < maxObjectList+1; i++ {
|
|
myfiles = append(myfiles, fmt.Sprintf("file.%d", i))
|
|
}
|
|
err = createNamespace(disk, volume, myfiles)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
isLeaf := func(volume, prefix string) bool {
|
|
return !hasSuffix(prefix, slashSeparator)
|
|
}
|
|
|
|
isLeafDir := func(volume, prefix string) bool {
|
|
entries, listErr := disk.ListDir(volume, prefix, 1)
|
|
if listErr != nil {
|
|
return false
|
|
}
|
|
return len(entries) == 0
|
|
}
|
|
|
|
listDir := listDirFactory(context.Background(), isLeaf, disk)
|
|
|
|
// TreeWalk pool with 2 seconds timeout for tree-walk go routines.
|
|
pool := newTreeWalkPool(2 * time.Second)
|
|
|
|
endWalkCh := make(chan struct{})
|
|
prefix := ""
|
|
marker := ""
|
|
recursive := true
|
|
resultCh := startTreeWalk(context.Background(), volume, prefix, marker, recursive, listDir, isLeaf, isLeafDir, endWalkCh)
|
|
|
|
params := listParams{
|
|
bucket: volume,
|
|
recursive: recursive,
|
|
}
|
|
// Add Treewalk to the pool.
|
|
pool.Set(params, resultCh, endWalkCh)
|
|
|
|
// Wait for the Treewalk to timeout.
|
|
<-time.After(3 * time.Second)
|
|
|
|
// Read maxObjectList number of entries from the channel.
|
|
// maxObjectsList number of entries would have been filled into the resultCh
|
|
// buffered channel. After the timeout resultCh would get closed and hence the
|
|
// maxObjectsList+1 entry would not be sent in the channel.
|
|
i := 0
|
|
for range resultCh {
|
|
i++
|
|
if i == maxObjectList {
|
|
break
|
|
}
|
|
}
|
|
|
|
// The last entry will not be received as the Treewalk goroutine would have exited.
|
|
_, ok := <-resultCh
|
|
if ok {
|
|
t.Error("Tree-walk go routine has not exited after timeout.")
|
|
}
|
|
err = os.RemoveAll(fsDir)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
|
|
// Test ListDir - listDir should list entries from the first disk, if the first disk is down,
|
|
// it should list from the next disk.
|
|
func TestListDir(t *testing.T) {
|
|
file1 := "file1"
|
|
file2 := "file2"
|
|
// Create two backend directories fsDir1 and fsDir2.
|
|
fsDir1, err := ioutil.TempDir(globalTestTmpDir, "minio-")
|
|
if err != nil {
|
|
t.Errorf("Unable to create tmp directory: %s", err)
|
|
}
|
|
fsDir2, err := ioutil.TempDir(globalTestTmpDir, "minio-")
|
|
if err != nil {
|
|
t.Errorf("Unable to create tmp directory: %s", err)
|
|
}
|
|
|
|
// Create two StorageAPIs disk1 and disk2.
|
|
endpoints := mustGetNewEndpointList(fsDir1)
|
|
disk1, err := newStorageAPI(endpoints[0])
|
|
if err != nil {
|
|
t.Errorf("Unable to create StorageAPI: %s", err)
|
|
}
|
|
|
|
endpoints = mustGetNewEndpointList(fsDir2)
|
|
disk2, err := newStorageAPI(endpoints[0])
|
|
if err != nil {
|
|
t.Errorf("Unable to create StorageAPI: %s", err)
|
|
}
|
|
|
|
// create listDir function.
|
|
listDir := listDirFactory(context.Background(), func(volume, prefix string) bool {
|
|
return !hasSuffix(prefix, slashSeparator)
|
|
}, disk1, disk2)
|
|
|
|
// Create file1 in fsDir1 and file2 in fsDir2.
|
|
disks := []StorageAPI{disk1, disk2}
|
|
for i, disk := range disks {
|
|
err = createNamespace(disk, volume, []string{fmt.Sprintf("file%d", i+1)})
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
// Should list "file1" from fsDir1.
|
|
entries, _ := listDir(volume, "", "")
|
|
if len(entries) != 2 {
|
|
t.Fatal("Expected the number of entries to be 2")
|
|
}
|
|
if entries[0] != file1 {
|
|
t.Fatal("Expected the entry to be file1")
|
|
}
|
|
if entries[1] != file2 {
|
|
t.Fatal("Expected the entry to be file2")
|
|
}
|
|
|
|
// Remove fsDir1, list should return entries from fsDir2
|
|
err = os.RemoveAll(fsDir1)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
|
|
// Should list "file2" from fsDir2.
|
|
entries, _ = listDir(volume, "", "")
|
|
if len(entries) != 1 {
|
|
t.Fatal("Expected the number of entries to be 1")
|
|
}
|
|
if entries[0] != file2 {
|
|
t.Fatal("Expected the entry to be file2")
|
|
}
|
|
err = os.RemoveAll(fsDir2)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
|
|
// TestRecursiveWalk - tests if treeWalk returns entries correctly with and
|
|
// without recursively traversing prefixes.
|
|
func TestRecursiveTreeWalk(t *testing.T) {
|
|
// Create a backend directories fsDir1.
|
|
fsDir1, err := ioutil.TempDir(globalTestTmpDir, "minio-")
|
|
if err != nil {
|
|
t.Fatalf("Unable to create tmp directory: %s", err)
|
|
}
|
|
|
|
endpoints := mustGetNewEndpointList(fsDir1)
|
|
disk1, err := newStorageAPI(endpoints[0])
|
|
if err != nil {
|
|
t.Fatalf("Unable to create StorageAPI: %s", err)
|
|
}
|
|
|
|
// Simple isLeaf check, returns true if there is no trailing "/"
|
|
isLeaf := func(volume, prefix string) bool {
|
|
return !hasSuffix(prefix, slashSeparator)
|
|
}
|
|
|
|
isLeafDir := func(volume, prefix string) bool {
|
|
entries, listErr := disk1.ListDir(volume, prefix, 1)
|
|
if listErr != nil {
|
|
return false
|
|
}
|
|
return len(entries) == 0
|
|
}
|
|
|
|
// Create listDir function.
|
|
listDir := listDirFactory(context.Background(), isLeaf, disk1)
|
|
|
|
// Create the namespace.
|
|
var files = []string{
|
|
"d/e",
|
|
"d/f",
|
|
"d/g/h",
|
|
"i/j/k",
|
|
"lmn",
|
|
}
|
|
err = createNamespace(disk1, volume, files)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
endWalkCh := make(chan struct{})
|
|
testCases := []struct {
|
|
prefix string
|
|
marker string
|
|
recursive bool
|
|
expected map[string]struct{}
|
|
}{
|
|
// with no prefix, no marker and no recursive traversal
|
|
{"", "", false, map[string]struct{}{
|
|
"d/": {},
|
|
"i/": {},
|
|
"lmn": {},
|
|
}},
|
|
// with no prefix, no marker and recursive traversal
|
|
{"", "", true, map[string]struct{}{
|
|
"d/f": {},
|
|
"d/g/h": {},
|
|
"d/e": {},
|
|
"i/j/k": {},
|
|
"lmn": {},
|
|
}},
|
|
// with no prefix, marker and no recursive traversal
|
|
{"", "d/e", false, map[string]struct{}{
|
|
"d/f": {},
|
|
"d/g/": {},
|
|
"i/": {},
|
|
"lmn": {},
|
|
}},
|
|
// with no prefix, marker and recursive traversal
|
|
{"", "d/e", true, map[string]struct{}{
|
|
"d/f": {},
|
|
"d/g/h": {},
|
|
"i/j/k": {},
|
|
"lmn": {},
|
|
}},
|
|
// with prefix, no marker and no recursive traversal
|
|
{"d/", "", false, map[string]struct{}{
|
|
"d/e": {},
|
|
"d/f": {},
|
|
"d/g/": {},
|
|
}},
|
|
// with prefix, no marker and no recursive traversal
|
|
{"d/", "", true, map[string]struct{}{
|
|
"d/e": {},
|
|
"d/f": {},
|
|
"d/g/h": {},
|
|
}},
|
|
// with prefix, marker and no recursive traversal
|
|
{"d/", "d/e", false, map[string]struct{}{
|
|
"d/f": {},
|
|
"d/g/": {},
|
|
}},
|
|
// with prefix, marker and recursive traversal
|
|
{"d/", "d/e", true, map[string]struct{}{
|
|
"d/f": {},
|
|
"d/g/h": {},
|
|
}},
|
|
}
|
|
for i, testCase := range testCases {
|
|
for entry := range startTreeWalk(context.Background(), volume,
|
|
testCase.prefix, testCase.marker, testCase.recursive,
|
|
listDir, isLeaf, isLeafDir, endWalkCh) {
|
|
if _, found := testCase.expected[entry.entry]; !found {
|
|
t.Errorf("Test %d: Expected %s, but couldn't find", i+1, entry.entry)
|
|
}
|
|
}
|
|
}
|
|
err = os.RemoveAll(fsDir1)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
|
|
func TestSortedness(t *testing.T) {
|
|
// Create a backend directories fsDir1.
|
|
fsDir1, err := ioutil.TempDir(globalTestTmpDir, "minio-")
|
|
if err != nil {
|
|
t.Errorf("Unable to create tmp directory: %s", err)
|
|
}
|
|
|
|
endpoints := mustGetNewEndpointList(fsDir1)
|
|
disk1, err := newStorageAPI(endpoints[0])
|
|
if err != nil {
|
|
t.Fatalf("Unable to create StorageAPI: %s", err)
|
|
}
|
|
|
|
// Simple isLeaf check, returns true if there is no trailing "/"
|
|
isLeaf := func(volume, prefix string) bool {
|
|
return !hasSuffix(prefix, slashSeparator)
|
|
}
|
|
|
|
isLeafDir := func(volume, prefix string) bool {
|
|
entries, listErr := disk1.ListDir(volume, prefix, 1)
|
|
if listErr != nil {
|
|
return false
|
|
}
|
|
return len(entries) == 0
|
|
}
|
|
|
|
// Create listDir function.
|
|
listDir := listDirFactory(context.Background(), isLeaf, disk1)
|
|
|
|
// Create the namespace.
|
|
var files = []string{
|
|
"d/e",
|
|
"d/f",
|
|
"d/g/h",
|
|
"i/j/k",
|
|
"lmn",
|
|
}
|
|
err = createNamespace(disk1, volume, files)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
endWalkCh := make(chan struct{})
|
|
testCases := []struct {
|
|
prefix string
|
|
marker string
|
|
recursive bool
|
|
}{
|
|
// with no prefix, no marker and no recursive traversal
|
|
{"", "", false},
|
|
// with no prefix, no marker and recursive traversal
|
|
{"", "", true},
|
|
// with no prefix, marker and no recursive traversal
|
|
{"", "d/e", false},
|
|
// with no prefix, marker and recursive traversal
|
|
{"", "d/e", true},
|
|
// with prefix, no marker and no recursive traversal
|
|
{"d/", "", false},
|
|
// with prefix, no marker and no recursive traversal
|
|
{"d/", "", true},
|
|
// with prefix, marker and no recursive traversal
|
|
{"d/", "d/e", false},
|
|
// with prefix, marker and recursive traversal
|
|
{"d/", "d/e", true},
|
|
}
|
|
for i, test := range testCases {
|
|
var actualEntries []string
|
|
for entry := range startTreeWalk(context.Background(), volume,
|
|
test.prefix, test.marker, test.recursive,
|
|
listDir, isLeaf, isLeafDir, endWalkCh) {
|
|
actualEntries = append(actualEntries, entry.entry)
|
|
}
|
|
if !sort.IsSorted(sort.StringSlice(actualEntries)) {
|
|
t.Error(i+1, "Expected entries to be sort, but it wasn't")
|
|
}
|
|
}
|
|
|
|
// Remove directory created for testing
|
|
err = os.RemoveAll(fsDir1)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|
|
|
|
func TestTreeWalkIsEnd(t *testing.T) {
|
|
// Create a backend directories fsDir1.
|
|
fsDir1, err := ioutil.TempDir(globalTestTmpDir, "minio-")
|
|
if err != nil {
|
|
t.Errorf("Unable to create tmp directory: %s", err)
|
|
}
|
|
|
|
endpoints := mustGetNewEndpointList(fsDir1)
|
|
disk1, err := newStorageAPI(endpoints[0])
|
|
if err != nil {
|
|
t.Fatalf("Unable to create StorageAPI: %s", err)
|
|
}
|
|
|
|
isLeaf := func(volume, prefix string) bool {
|
|
return !hasSuffix(prefix, slashSeparator)
|
|
}
|
|
|
|
isLeafDir := func(volume, prefix string) bool {
|
|
entries, listErr := disk1.ListDir(volume, prefix, 1)
|
|
if listErr != nil {
|
|
return false
|
|
}
|
|
return len(entries) == 0
|
|
}
|
|
|
|
// Create listDir function.
|
|
listDir := listDirFactory(context.Background(), isLeaf, disk1)
|
|
|
|
// Create the namespace.
|
|
var files = []string{
|
|
"d/e",
|
|
"d/f",
|
|
"d/g/h",
|
|
"i/j/k",
|
|
"lmn",
|
|
}
|
|
err = createNamespace(disk1, volume, files)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
endWalkCh := make(chan struct{})
|
|
testCases := []struct {
|
|
prefix string
|
|
marker string
|
|
recursive bool
|
|
expectedEntry string
|
|
}{
|
|
// with no prefix, no marker and no recursive traversal
|
|
{"", "", false, "lmn"},
|
|
// with no prefix, no marker and recursive traversal
|
|
{"", "", true, "lmn"},
|
|
// with no prefix, marker and no recursive traversal
|
|
{"", "d/e", false, "lmn"},
|
|
// with no prefix, marker and recursive traversal
|
|
{"", "d/e", true, "lmn"},
|
|
// with prefix, no marker and no recursive traversal
|
|
{"d/", "", false, "d/g/"},
|
|
// with prefix, no marker and no recursive traversal
|
|
{"d/", "", true, "d/g/h"},
|
|
// with prefix, marker and no recursive traversal
|
|
{"d/", "d/e", false, "d/g/"},
|
|
// with prefix, marker and recursive traversal
|
|
{"d/", "d/e", true, "d/g/h"},
|
|
}
|
|
for i, test := range testCases {
|
|
var entry treeWalkResult
|
|
for entry = range startTreeWalk(context.Background(), volume, test.prefix, test.marker, test.recursive, listDir, isLeaf, isLeafDir, endWalkCh) {
|
|
}
|
|
if entry.entry != test.expectedEntry {
|
|
t.Errorf("Test %d: Expected entry %s, but received %s with the EOF marker", i, test.expectedEntry, entry.entry)
|
|
}
|
|
if !entry.end {
|
|
t.Errorf("Test %d: Last entry %s, doesn't have EOF marker set", i, entry.entry)
|
|
}
|
|
}
|
|
|
|
// Remove directory created for testing
|
|
err = os.RemoveAll(fsDir1)
|
|
if err != nil {
|
|
t.Error(err)
|
|
}
|
|
}
|