Offload listing to posix layer (#7611)

This PR adds one API WalkCh which
sorts and sends list over the network

Each disk walks independently in a sorted manner.
This commit is contained in:
Harshavardhana 2019-05-14 13:49:10 -07:00 committed by kannappanr
parent a343d14f19
commit b3f22eac56
11 changed files with 692 additions and 25 deletions

143
cmd/merge-walk-pool.go Normal file
View File

@ -0,0 +1,143 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"reflect"
"sync"
"time"
)
const (
globalMergeLookupTimeout = time.Minute * 1 // 1 minutes.
)
// mergeWalk - represents the go routine that does the merge walk.
type mergeWalk struct {
entryChs []FileInfoCh
endWalkCh chan struct{} // To signal when mergeWalk go-routine should end.
endTimerCh chan<- struct{} // To signal when timer go-routine should end.
}
// MergeWalkPool - pool of mergeWalk go routines.
// A mergeWalk is added to the pool by Set() and removed either by
// doing a Release() or if the concerned timer goes off.
// mergeWalkPool's purpose is to maintain active mergeWalk go-routines in a map so that
// it can be looked up across related list calls.
type MergeWalkPool struct {
pool map[listParams][]mergeWalk
timeOut time.Duration
lock *sync.Mutex
}
// NewMergeWalkPool - initialize new tree walk pool.
func NewMergeWalkPool(timeout time.Duration) *MergeWalkPool {
tPool := &MergeWalkPool{
pool: make(map[listParams][]mergeWalk),
timeOut: timeout,
lock: &sync.Mutex{},
}
return tPool
}
// Release - selects a mergeWalk from the pool based on the input
// listParams, removes it from the pool, and returns the MergeWalkResult
// channel.
// Returns nil if listParams does not have an asccociated mergeWalk.
func (t MergeWalkPool) Release(params listParams) ([]FileInfoCh, chan struct{}) {
t.lock.Lock()
defer t.lock.Unlock()
walks, ok := t.pool[params] // Pick the valid walks.
if ok {
if len(walks) > 0 {
// Pop out the first valid walk entry.
walk := walks[0]
walks = walks[1:]
if len(walks) > 0 {
t.pool[params] = walks
} else {
delete(t.pool, params)
}
walk.endTimerCh <- struct{}{}
return walk.entryChs, walk.endWalkCh
}
}
// Release return nil if params not found.
return nil, nil
}
// Set - adds a mergeWalk to the mergeWalkPool.
// Also starts a timer go-routine that ends when:
// 1) time.After() expires after t.timeOut seconds.
// The expiration is needed so that the mergeWalk go-routine resources are freed after a timeout
// if the S3 client does only partial listing of objects.
// 2) Relase() signals the timer go-routine to end on endTimerCh.
// During listing the timer should not timeout and end the mergeWalk go-routine, hence the
// timer go-routine should be ended.
func (t MergeWalkPool) Set(params listParams, resultChs []FileInfoCh, endWalkCh chan struct{}) {
t.lock.Lock()
defer t.lock.Unlock()
// Should be a buffered channel so that Release() never blocks.
endTimerCh := make(chan struct{}, 1)
walkInfo := mergeWalk{
entryChs: resultChs,
endWalkCh: endWalkCh,
endTimerCh: endTimerCh,
}
// Append new walk info.
t.pool[params] = append(t.pool[params], walkInfo)
// Timer go-routine which times out after t.timeOut seconds.
go func(endTimerCh <-chan struct{}, walkInfo mergeWalk) {
select {
// Wait until timeOut
case <-time.After(t.timeOut):
// Timeout has expired. Remove the mergeWalk from mergeWalkPool and
// end the mergeWalk go-routine.
t.lock.Lock()
walks, ok := t.pool[params]
if ok {
// Trick of filtering without allocating
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
nwalks := walks[:0]
// Look for walkInfo, remove it from the walks list.
for _, walk := range walks {
if !reflect.DeepEqual(walk, walkInfo) {
nwalks = append(nwalks, walk)
}
}
if len(nwalks) == 0 {
// No more mergeWalk go-routines associated with listParams
// hence remove map entry.
delete(t.pool, params)
} else {
// There are more mergeWalk go-routines associated with listParams
// hence save the list in the map.
t.pool[params] = nwalks
}
}
// Signal the mergeWalk go-routine to die.
close(endWalkCh)
t.lock.Unlock()
case <-endTimerCh:
return
}
}(endTimerCh, walkInfo)
}

103
cmd/merge-walk-pool_test.go Normal file
View File

@ -0,0 +1,103 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"testing"
"time"
)
// Test if tree walker go-routine is removed from the pool after timeout
// and that is available in the pool before the timeout.
func TestMergeWalkPoolBasic(t *testing.T) {
// Create a treeWalkPool
tw := NewMergeWalkPool(1 * time.Second)
// Create sample params
params := listParams{
bucket: "test-bucket",
}
endWalkCh := make(chan struct{})
// Add a treeWalk to the pool
tw.Set(params, []FileInfoCh{}, endWalkCh)
// Wait for treeWalkPool timeout to happen
<-time.After(2 * time.Second)
if c1, _ := tw.Release(params); c1 != nil {
t.Error("treeWalk go-routine must have been freed")
}
// Add the treeWalk back to the pool
endWalkCh = make(chan struct{})
tw.Set(params, []FileInfoCh{}, endWalkCh)
// Release the treeWalk before timeout
select {
case <-time.After(1 * time.Second):
break
default:
if c1, _ := tw.Release(params); c1 == nil {
t.Error("treeWalk go-routine got freed before timeout")
}
}
}
// Test if multiple merge walkers for the same listParams are managed as expected by the pool.
func TestManyMergeWalksSameParam(t *testing.T) {
// Create a treeWalkPool.
tw := NewMergeWalkPool(5 * time.Second)
// Create sample params.
params := listParams{
bucket: "test-bucket",
}
select {
// This timeout is an upper-bound. This is started
// before the first treeWalk go-routine's timeout period starts.
case <-time.After(5 * time.Second):
break
default:
// Create many treeWalk go-routines for the same params.
for i := 0; i < 10; i++ {
endWalkCh := make(chan struct{})
walkChs := make([]FileInfoCh, 0)
tw.Set(params, walkChs, endWalkCh)
}
tw.lock.Lock()
if walks, ok := tw.pool[params]; ok {
if len(walks) != 10 {
t.Error("There aren't as many walks as were Set")
}
}
tw.lock.Unlock()
for i := 0; i < 10; i++ {
tw.lock.Lock()
if walks, ok := tw.pool[params]; ok {
// Before ith Release we should have 10-i treeWalk go-routines.
if 10-i != len(walks) {
t.Error("There aren't as many walks as were Set")
}
}
tw.lock.Unlock()
tw.Release(params)
}
}
}

View File

@ -111,6 +111,13 @@ func (d *naughtyDisk) DeleteVol(volume string) (err error) {
return d.disk.DeleteVol(volume) return d.disk.DeleteVol(volume)
} }
func (d *naughtyDisk) Walk(volume, path, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) {
if err := d.calcError(); err != nil {
return nil, err
}
return d.disk.Walk(volume, path, marker, recursive, leafFile, readMetadataFn, endWalkCh)
}
func (d *naughtyDisk) ListDir(volume, path string, count int, leafFile string) (entries []string, err error) { func (d *naughtyDisk) ListDir(volume, path string, count int, leafFile string) (entries []string, err error) {
if err := d.calcError(); err != nil { if err := d.calcError(); err != nil {
return []string{}, err return []string{}, err

View File

@ -1,5 +1,5 @@
/* /*
* MinIO Cloud Storage, (C) 2016, 2017, 2018 MinIO, Inc. * MinIO Cloud Storage, (C) 2016-2019 MinIO, Inc.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -25,6 +25,7 @@ import (
slashpath "path" slashpath "path"
"path/filepath" "path/filepath"
"runtime" "runtime"
"sort"
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -45,7 +46,9 @@ const (
diskMinTotalSpace = diskMinFreeSpace // Min 900MiB total space. diskMinTotalSpace = diskMinFreeSpace // Min 900MiB total space.
maxAllowedIOError = 5 maxAllowedIOError = 5
posixWriteBlockSize = 4 * humanize.MiByte posixWriteBlockSize = 4 * humanize.MiByte
directioAlignSize = 4096 // DirectIO alignment needs to be 4K. Defined here as directio.AlignSize is defined as 0 in MacOS causing divide by 0 error. // DirectIO alignment needs to be 4K. Defined here as
// directio.AlignSize is defined as 0 in MacOS causing divide by 0 error.
directioAlignSize = 4096
) )
// isValidVolname verifies a volname name in accordance with object // isValidVolname verifies a volname name in accordance with object
@ -642,6 +645,85 @@ func (s *posix) DeleteVol(volume string) (err error) {
return nil return nil
} }
// Walk - is a sorted walker which returns file entries in lexically
// sorted order, additionally along with metadata about each of those entries.
func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile string,
readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (ch chan FileInfo, err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return nil, errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return nil, err
}
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
}
// Stat a volume entry.
_, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return nil, errVolumeNotFound
} else if isSysErrIO(err) {
return nil, errFaultyDisk
}
return nil, err
}
ch = make(chan FileInfo)
go func() {
defer close(ch)
listDir := func(volume, dirPath, dirEntry string) (entries []string) {
entries, err := s.ListDir(volume, dirPath, -1, leafFile)
if err != nil {
return
}
sort.Strings(entries)
return filterMatchingPrefix(entries, dirEntry)
}
walkResultCh := startTreeWalk(context.Background(), volume, dirPath, marker, recursive, listDir, endWalkCh)
for {
walkResult, ok := <-walkResultCh
if !ok {
return
}
var fi FileInfo
if hasSuffix(walkResult.entry, slashSeparator) {
fi = FileInfo{
Volume: volume,
Name: walkResult.entry,
Mode: os.ModeDir,
}
} else {
buf, err := s.ReadAll(volume, pathJoin(walkResult.entry, leafFile))
if err != nil {
continue
}
fi = readMetadataFn(buf, volume, walkResult.entry)
}
select {
case ch <- fi:
case <-endWalkCh:
return
}
}
}()
return ch, nil
}
// ListDir - return all the entries at the given directory path. // ListDir - return all the entries at the given directory path.
// If an entry is a directory it will be returned with a trailing "/". // If an entry is a directory it will be returned with a trailing "/".
func (s *posix) ListDir(volume, dirPath string, count int, leafFile string) (entries []string, err error) { func (s *posix) ListDir(volume, dirPath string, count int, leafFile string) (entries []string, err error) {

View File

@ -30,6 +30,13 @@ type VolInfo struct {
Created time.Time Created time.Time
} }
// FilesInfo represent a list of files, additionally
// indicates if the list is last.
type FilesInfo struct {
Files []FileInfo
IsTruncated bool
}
// FileInfo - represents file stat information. // FileInfo - represents file stat information.
type FileInfo struct { type FileInfo struct {
// Name of the volume. // Name of the volume.
@ -46,4 +53,12 @@ type FileInfo struct {
// File mode bits. // File mode bits.
Mode os.FileMode Mode os.FileMode
// File metadata
Metadata map[string]string
// All the parts per object.
Parts []ObjectPartInfo
Quorum int
} }

View File

@ -38,6 +38,10 @@ type StorageAPI interface {
StatVol(volume string) (vol VolInfo, err error) StatVol(volume string) (vol VolInfo, err error)
DeleteVol(volume string) (err error) DeleteVol(volume string) (err error)
// Walk in sorted order directly on disk.
Walk(volume, dirPath string, marker string, recursive bool, leafFile string,
readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error)
// File operations. // File operations.
ListDir(volume, dirPath string, count int, leafFile string) ([]string, error) ListDir(volume, dirPath string, count int, leafFile string) ([]string, error)
ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error)

View File

@ -330,6 +330,43 @@ func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buf
return int64(n), err return int64(n), err
} }
func (client *storageRESTClient) Walk(volume, dirPath, marker string, recursive bool, leafFile string,
readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTDirPath, dirPath)
values.Set(storageRESTMarkerPath, marker)
values.Set(storageRESTRecursive, strconv.FormatBool(recursive))
values.Set(storageRESTLeafFile, leafFile)
respBody, err := client.call(storageRESTMethodWalk, values, nil, -1)
if err != nil {
return nil, err
}
ch := make(chan FileInfo)
go func() {
defer close(ch)
defer http.DrainBody(respBody)
decoder := gob.NewDecoder(respBody)
for {
var fi FileInfo
if gerr := decoder.Decode(&fi); gerr != nil {
// Upon error return
return
}
select {
case ch <- fi:
case <-endWalkCh:
return
}
}
}()
return ch, nil
}
// ListDir - lists a directory. // ListDir - lists a directory.
func (client *storageRESTClient) ListDir(volume, dirPath string, count int, leafFile string) (entries []string, err error) { func (client *storageRESTClient) ListDir(volume, dirPath string, count int, leafFile string) (entries []string, err error) {
values := make(url.Values) values := make(url.Values)

View File

@ -16,7 +16,7 @@
package cmd package cmd
const storageRESTVersion = "v5" const storageRESTVersion = "v6"
const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + "/" const storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + "/"
const ( const (
@ -34,6 +34,7 @@ const (
storageRESTMethodReadFile = "readfile" storageRESTMethodReadFile = "readfile"
storageRESTMethodReadFileStream = "readfilestream" storageRESTMethodReadFileStream = "readfilestream"
storageRESTMethodListDir = "listdir" storageRESTMethodListDir = "listdir"
storageRESTMethodWalk = "walk"
storageRESTMethodDeleteFile = "deletefile" storageRESTMethodDeleteFile = "deletefile"
storageRESTMethodDeleteFileBulk = "deletefilebulk" storageRESTMethodDeleteFileBulk = "deletefilebulk"
storageRESTMethodRenameFile = "renamefile" storageRESTMethodRenameFile = "renamefile"
@ -51,7 +52,9 @@ const (
storageRESTOffset = "offset" storageRESTOffset = "offset"
storageRESTLength = "length" storageRESTLength = "length"
storageRESTCount = "count" storageRESTCount = "count"
storageRESTMarkerPath = "marker"
storageRESTLeafFile = "leaf-file" storageRESTLeafFile = "leaf-file"
storageRESTRecursive = "recursive"
storageRESTBitrotAlgo = "bitrot-algo" storageRESTBitrotAlgo = "bitrot-algo"
storageRESTBitrotHash = "bitrot-hash" storageRESTBitrotHash = "bitrot-hash"
storageRESTInstanceID = "instance-id" storageRESTInstanceID = "instance-id"

View File

@ -17,6 +17,7 @@
package cmd package cmd
import ( import (
"context"
"encoding/gob" "encoding/gob"
"encoding/hex" "encoding/hex"
"errors" "errors"
@ -369,6 +370,57 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
// readMetadata func provides the function types for reading leaf metadata.
type readMetadataFunc func(buf []byte, volume, entry string) FileInfo
func readMetadata(buf []byte, volume, entry string) FileInfo {
m, err := xlMetaV1UnmarshalJSON(context.Background(), buf)
if err != nil {
return FileInfo{}
}
return FileInfo{
Volume: volume,
Name: entry,
ModTime: m.Stat.ModTime,
Size: m.Stat.Size,
Metadata: m.Meta,
Parts: m.Parts,
Quorum: m.Erasure.DataBlocks,
}
}
// WalkHandler - remote caller to start walking at a requested directory path.
func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
dirPath := vars[storageRESTDirPath]
markerPath := vars[storageRESTMarkerPath]
recursive, err := strconv.ParseBool(vars[storageRESTRecursive])
if err != nil {
s.writeErrorResponse(w, err)
return
}
leafFile := vars[storageRESTLeafFile]
endWalkCh := make(chan struct{})
defer close(endWalkCh)
fch, err := s.storage.Walk(volume, dirPath, markerPath, recursive, leafFile, readMetadata, endWalkCh)
if err != nil {
s.writeErrorResponse(w, err)
return
}
defer w.(http.Flusher).Flush()
encoder := gob.NewEncoder(w)
for fi := range fch {
encoder.Encode(&fi)
}
}
// ListDirHandler - list a directory. // ListDirHandler - list a directory.
func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) { func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) { if !s.IsValid(w, r) {
@ -479,6 +531,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) {
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength)...) Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTOffset, storageRESTLength)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler)). subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount, storageRESTLeafFile)...) Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTCount, storageRESTLeafFile)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodWalk).HandlerFunc(httpTraceHdrs(server.WalkHandler)).
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTMarkerPath, storageRESTRecursive, storageRESTLeafFile)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)). subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFileBulk).HandlerFunc(httpTraceHdrs(server.DeleteFileBulkHandler)). subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodDeleteFileBulk).HandlerFunc(httpTraceHdrs(server.DeleteFileBulkHandler)).

View File

@ -18,6 +18,7 @@ package cmd
import ( import (
"errors" "errors"
"reflect"
"sync" "sync"
"time" "time"
) )
@ -127,20 +128,23 @@ func (t TreeWalkPool) Set(params listParams, resultCh chan TreeWalkResult, endWa
t.lock.Lock() t.lock.Lock()
walks, ok := t.pool[params] walks, ok := t.pool[params]
if ok { if ok {
// Trick of filtering without allocating
// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
nwalks := walks[:0]
// Look for walkInfo, remove it from the walks list. // Look for walkInfo, remove it from the walks list.
for i, walk := range walks { for _, walk := range walks {
if walk == walkInfo { if !reflect.DeepEqual(walk, walkInfo) {
walks = append(walks[:i], walks[i+1:]...) nwalks = append(nwalks, walk)
} }
} }
if len(walks) == 0 { if len(nwalks) == 0 {
// No more treeWalk go-routines associated with listParams // No more treeWalk go-routines associated with listParams
// hence remove map entry. // hence remove map entry.
delete(t.pool, params) delete(t.pool, params)
} else { } else {
// There are more treeWalk go-routines associated with listParams // There are more treeWalk go-routines associated with listParams
// hence save the list in the map. // hence save the list in the map.
t.pool[params] = walks t.pool[params] = nwalks
} }
} }
// Signal the treeWalk go-routine to die. // Signal the treeWalk go-routine to die.

View File

@ -23,7 +23,6 @@ import (
"io" "io"
"net/http" "net/http"
"sort" "sort"
"strings"
"sync" "sync"
"time" "time"
@ -76,8 +75,8 @@ type xlSets struct {
// Distribution algorithm of choice. // Distribution algorithm of choice.
distributionAlgo string distributionAlgo string
// Pack level listObjects pool management. // Merge tree walk
listPool *TreeWalkPool pool *MergeWalkPool
} }
// isConnected - checks if the endpoint is connected or not. // isConnected - checks if the endpoint is connected or not.
@ -270,7 +269,7 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
format: format, format: format,
disksConnectDoneCh: make(chan struct{}), disksConnectDoneCh: make(chan struct{}),
distributionAlgo: format.XL.DistributionAlgo, distributionAlgo: format.XL.DistributionAlgo,
listPool: NewTreeWalkPool(globalLookupTimeout), pool: NewMergeWalkPool(globalMergeLookupTimeout),
} }
mutex := newNSLock(globalIsDistXL) mutex := newNSLock(globalIsDistXL)
@ -698,7 +697,6 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke
} }
// Returns function "listDir" of the type listDirFunc. // Returns function "listDir" of the type listDirFunc.
// isLeaf - is used by listDir function to check if an entry is a leaf or non-leaf entry.
// disks - used for doing disk.ListDir(). Sets passes set of disks. // disks - used for doing disk.ListDir(). Sets passes set of disks.
func listDirSetsFactory(ctx context.Context, sets ...*xlObjects) ListDirFunc { func listDirSetsFactory(ctx context.Context, sets ...*xlObjects) ListDirFunc {
listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) { listDirInternal := func(bucket, prefixDir, prefixEntry string, disks []StorageAPI) (mergedEntries []string) {
@ -765,23 +763,240 @@ func listDirSetsFactory(ctx context.Context, sets ...*xlObjects) ListDirFunc {
return listDir return listDir
} }
// ListObjects - implements listing of objects across sets, each set is independently // FileInfoCh - file info channel
// listed and subsequently merge lexically sorted inside listDirSetsFactory(). Resulting type FileInfoCh struct {
// value through the walk channel receives the data properly lexically sorted. Ch chan FileInfo
func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { Prev FileInfo
listDir := listDirSetsFactory(ctx, s.sets...) Valid bool
}
var getObjectInfoDirs []func(context.Context, string, string) (ObjectInfo, error) // Pop - pops a cached entry if any, or from the cached channel.
// Verify prefixes in all sets. func (f *FileInfoCh) Pop() (fi FileInfo, ok bool) {
if f.Valid {
f.Valid = false
return f.Prev, true
} // No cached entries found, read from channel
f.Prev, ok = <-f.Ch
return f.Prev, ok
}
// Push - cache an entry, for Pop() later.
func (f *FileInfoCh) Push(fi FileInfo) {
f.Prev = fi
f.Valid = true
}
// Calculate least entry across multiple FileInfo channels, additionally
// returns a boolean to indicate if the caller needs to call again.
func leastEntry(entriesCh []FileInfoCh, readQuorum int) (FileInfo, bool) {
var entriesValid = make([]bool, len(entriesCh))
var entries = make([]FileInfo, len(entriesCh))
for i := range entriesCh {
entries[i], entriesValid[i] = entriesCh[i].Pop()
}
var isTruncated = false
for _, valid := range entriesValid {
if !valid {
continue
}
isTruncated = true
break
}
var lentry FileInfo
var found bool
for i, valid := range entriesValid {
if !valid {
continue
}
if !found {
lentry = entries[i]
found = true
continue
}
if entries[i].Name < lentry.Name {
lentry = entries[i]
}
}
// We haven't been able to find any least entry,
// this would mean that we don't have valid.
if !found {
return lentry, isTruncated
}
leastEntryCount := 0
for i, valid := range entriesValid {
if !valid {
continue
}
// Entries are duplicated across disks,
// we should simply skip such entries.
if lentry.Name == entries[i].Name && lentry.ModTime.Equal(entries[i].ModTime) {
leastEntryCount++
continue
}
// Push all entries which are lexically higher
// and will be returned later in Pop()
entriesCh[i].Push(entries[i])
}
quorum := lentry.Quorum
if quorum == 0 {
quorum = readQuorum
}
if leastEntryCount >= quorum {
return lentry, isTruncated
}
return leastEntry(entriesCh, readQuorum)
}
// mergeEntriesCh - merges FileInfo channel to entries upto maxKeys.
func mergeEntriesCh(entriesCh []FileInfoCh, maxKeys int, readQuorum int) (entries FilesInfo) {
for i := 0; i < maxKeys; {
var fi FileInfo
fi, entries.IsTruncated = leastEntry(entriesCh, readQuorum)
if !entries.IsTruncated {
break
}
entries.Files = append(entries.Files, fi)
i++
}
return entries
}
// Starts a walk channel across all disks and returns a slice.
func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh chan struct{}) []FileInfoCh {
var entryChs []FileInfoCh
for _, set := range s.sets { for _, set := range s.sets {
getObjectInfoDirs = append(getObjectInfoDirs, set.getObjectInfoDir) for _, disk := range set.getDisks() {
if disk == nil {
// Disk can be offline
continue
}
entryCh, err := disk.Walk(bucket, prefix, marker, recursive, xlMetaJSONFile, readMetadata, endWalkCh)
if err != nil {
// Disk walk returned error, ignore it.
continue
}
entryChs = append(entryChs, FileInfoCh{
Ch: entryCh,
})
}
}
return entryChs
}
// ListObjects - implements listing of objects across disks, each disk is indepenently
// walked and merged at this layer. Resulting value through the merge process sends
// the data in lexically sorted order.
func (s *xlSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
if err = checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil {
return loi, err
} }
var getObjectInfo = func(ctx context.Context, bucket string, entry string) (ObjectInfo, error) { // Marker is set validate pre-condition.
return s.getHashedSet(entry).getObjectInfo(ctx, bucket, entry) if marker != "" {
// Marker not common with prefix is not implemented. Send an empty response
if !hasPrefix(marker, prefix) {
return loi, nil
}
} }
return listObjects(ctx, s, bucket, prefix, marker, delimiter, maxKeys, s.listPool, listDir, getObjectInfo, getObjectInfoDirs...) // With max keys of zero we have reached eof, return right here.
if maxKeys == 0 {
return loi, nil
}
// For delimiter and prefix as '/' we do not list anything at all
// since according to s3 spec we stop at the 'delimiter'
// along // with the prefix. On a flat namespace with 'prefix'
// as '/' we don't have any entries, since all the keys are
// of form 'keyName/...'
if delimiter == slashSeparator && prefix == slashSeparator {
return loi, nil
}
// Over flowing count - reset to maxObjectList.
if maxKeys < 0 || maxKeys > maxObjectList {
maxKeys = maxObjectList
}
// Default is recursive, if delimiter is set then list non recursive.
recursive := true
if delimiter == slashSeparator {
recursive = false
}
entryChs, endWalkCh := s.pool.Release(listParams{bucket, recursive, marker, prefix})
if entryChs == nil {
endWalkCh = make(chan struct{})
entryChs = s.startMergeWalks(context.Background(), bucket, prefix, marker, recursive, endWalkCh)
}
entries := mergeEntriesCh(entryChs, maxKeys, s.drivesPerSet/2)
if len(entries.Files) == 0 {
return loi, nil
}
loi.Objects = make([]ObjectInfo, len(entries.Files))
loi.IsTruncated = entries.IsTruncated
if loi.IsTruncated {
loi.NextMarker = entries.Files[len(entries.Files)-1].Name
}
for _, entry := range entries.Files {
var objInfo ObjectInfo
if hasSuffix(entry.Name, slashSeparator) {
if !recursive {
loi.Prefixes = append(loi.Prefixes, entry.Name)
continue
}
objInfo = ObjectInfo{
Bucket: bucket,
Name: entry.Name,
IsDir: true,
}
} else {
objInfo = ObjectInfo{
IsDir: false,
Bucket: bucket,
Name: entry.Name,
ModTime: entry.ModTime,
Size: entry.Size,
ContentType: entry.Metadata["content-type"],
ContentEncoding: entry.Metadata["content-encoding"],
}
// Extract etag from metadata.
objInfo.ETag = extractETag(entry.Metadata)
// All the parts per object.
objInfo.Parts = entry.Parts
// etag/md5Sum has already been extracted. We need to
// remove to avoid it from appearing as part of
// response headers. e.g, X-Minio-* or X-Amz-*.
objInfo.UserDefined = cleanMetadata(entry.Metadata)
// Update storage class
if sc, ok := entry.Metadata[amzStorageClass]; ok {
objInfo.StorageClass = sc
} else {
objInfo.StorageClass = globalMinioDefaultStorageClass
}
}
loi.Objects = append(loi.Objects, objInfo)
}
if loi.IsTruncated {
s.pool.Set(listParams{bucket, recursive, loi.NextMarker, prefix}, entryChs, endWalkCh)
}
return loi, nil
} }
func (s *xlSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) { func (s *xlSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
@ -1301,7 +1516,7 @@ func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObj
if !ok { if !ok {
break break
} }
if err := healObjectFn(bucket, strings.TrimSuffix(walkResult.entry, slashSeparator+xlMetaJSONFile)); err != nil { if err := healObjectFn(bucket, walkResult.entry); err != nil {
return toObjectErr(err, bucket, walkResult.entry) return toObjectErr(err, bucket, walkResult.entry)
} }
if walkResult.end { if walkResult.end {