minio/cmd/metacache-walk.go

426 lines
12 KiB
Go
Raw Normal View History

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
2023-04-15 10:25:25 -07:00
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"runtime/debug"
"sort"
"strconv"
"strings"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
)
// WalkDirOptions provides options for WalkDir operations.
type WalkDirOptions struct {
// Bucket to scanner
Bucket string
// Directory inside the bucket.
BaseDir string
// Do a full recursive scan.
Recursive bool
// ReportNotFound will return errFileNotFound if all disks reports the BaseDir cannot be found.
ReportNotFound bool
// FilterPrefix will only return results with given prefix within folder.
// Should never contain a slash.
FilterPrefix string
// ForwardTo will forward to the given object path.
ForwardTo string
// Limit the number of returned objects if > 0.
Limit int
}
// WalkDir will traverse a directory and return all entries found.
// On success a sorted meta cache stream will be returned.
// Metadata has data stripped, if any.
func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(opts.Bucket)
if err != nil {
return err
}
// Stat a volume entry.
if err = Access(volumeDir); err != nil {
if osIsNotExist(err) {
return errVolumeNotFound
} else if isSysErrIO(err) {
return errFaultyDisk
}
return err
}
// Use a small block size to start sending quickly
w := newMetacacheWriter(wr, 16<<10)
w.reuseBlocks = true // We are not sharing results, so reuse buffers.
defer w.Close()
out, err := w.stream()
if err != nil {
return err
}
defer close(out)
var objsReturned int
objReturned := func(metadata []byte) {
if opts.Limit <= 0 {
return
}
if m, _, _ := isIndexedMetaV2(metadata); m != nil && !m.AllHidden(true) {
objsReturned++
}
}
// Fast exit track to check if we are listing an object with
// a trailing slash, this will avoid to list the object content.
if HasSuffix(opts.BaseDir, SlashSeparator) {
metadata, err := s.readMetadata(ctx, pathJoin(volumeDir,
opts.BaseDir[:len(opts.BaseDir)-1]+globalDirSuffix,
xlStorageFormatFile))
if err == nil {
// if baseDir is already a directory object, consider it
// as part of the list call, this is AWS S3 specific
// behavior.
out <- metaCacheEntry{
name: opts.BaseDir,
metadata: metadata,
}
objReturned(metadata)
} else {
st, sterr := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile))
if sterr == nil && st.Mode().IsRegular() {
return errFileNotFound
}
}
}
prefix := opts.FilterPrefix
var scanDir func(path string) error
scanDir = func(current string) error {
// Skip forward, if requested...
2023-04-15 10:25:25 -07:00
var sb bytes.Buffer
forward := ""
if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, current) {
forward = strings.TrimPrefix(opts.ForwardTo, current)
// Trim further directories and trailing slash.
if idx := strings.IndexByte(forward, '/'); idx > 0 {
forward = forward[:idx]
}
}
if contextCanceled(ctx) {
return ctx.Err()
}
if opts.Limit > 0 && objsReturned >= opts.Limit {
return nil
}
s.walkMu.Lock()
entries, err := s.ListDir(ctx, opts.Bucket, current, -1)
s.walkMu.Unlock()
if err != nil {
// Folder could have gone away in-between
if err != errVolumeNotFound && err != errFileNotFound {
logger.LogIf(ctx, err)
}
if opts.ReportNotFound && err == errFileNotFound && current == opts.BaseDir {
return errFileNotFound
}
// Forward some errors?
return nil
}
Add local disk health checks (#14447) The main goal of this PR is to solve the situation where disks stop responding to operations. This generally causes an FD build-up and eventually will crash the server. This adds detection of hung disks, where calls on disk get stuck. We add functionality to `xlStorageDiskIDCheck` where it keeps track of the number of concurrent requests on a given disk. A total number of 100 operations are allowed. If this limit is reached we will block (but not reject) new requests, but we will monitor the state of the disk. If no requests have been completed or updated within a 15-second window, we mark the disk as offline. Requests that are blocked will be unblocked and return an error as "faulty disk". New requests will be rejected until the disk is marked OK again. Once a disk has been marked faulty, a check will run every 5 seconds that will attempt to write and read back a file. As long as this fails the disk will remain faulty. To prevent lots of long-running requests to mark the disk faulty we implement a callback feature that allows updating the status as parts of these operations are running. We add a reader and writer wrapper that will update the status of each successful read/write operation. This should allow fine enough granularity that a slow, but still operational disk will not reach 15 seconds where 50 operations have not progressed. Note that errors themselves are not enough to mark a disk faulty. A nil (or io.EOF) error will mark a disk as "good". * Make concurrent disk setting configurable via `_MINIO_DISK_MAX_CONCURRENT`. * de-couple IsOnline() from disk health tracker The purpose of IsOnline() is to ensure that we reconnect the drive only when the "drive" was - disconnected from network we need to validate if the drive is "correct" and is the same drive which belongs to this server. - drive was replaced we have to format it - we support hot swapping of the drives. IsOnline() is not meant for taking the drive offline when it is hung, it is not useful we can let the drive be online instead "return" errors for relevant calls. * return errFaultyDisk for DiskInfo() call Co-authored-by: Harshavardhana <harsha@minio.io> Possible future Improvements: * Unify the REST server and local xlStorageDiskIDCheck. This would also improve stats significantly. * Allow reads/writes to be aborted by the context. * Add usage stats, concurrent count, blocked operations, etc.
2022-03-09 11:38:54 -08:00
diskHealthCheckOK(ctx, err)
if len(entries) == 0 {
return nil
}
dirObjects := make(map[string]struct{})
2023-04-15 10:25:25 -07:00
// Avoid a bunch of cleanup when joining.
current = strings.Trim(current, SlashSeparator)
for i, entry := range entries {
if opts.Limit > 0 && objsReturned >= opts.Limit {
return nil
}
if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
// Do not retain the file, since it doesn't
// match the prefix.
entries[i] = ""
continue
}
if len(forward) > 0 && entry < forward {
// Do not retain the file, since its
// lexially smaller than 'forward'
entries[i] = ""
continue
}
2023-04-15 10:25:25 -07:00
if hasSuffixByte(entry, SlashSeparatorChar) {
if strings.HasSuffix(entry, globalDirSuffixWithSlash) {
// Add without extension so it is sorted correctly.
entry = strings.TrimSuffix(entry, globalDirSuffixWithSlash) + slashSeparator
dirObjects[entry] = struct{}{}
entries[i] = entry
continue
}
// Trim slash, since we don't know if this is folder or object.
entries[i] = entries[i][:len(entry)-1]
continue
}
// Do not retain the file.
entries[i] = ""
if contextCanceled(ctx) {
return ctx.Err()
}
// If root was an object return it as such.
if HasSuffix(entry, xlStorageFormatFile) {
var meta metaCacheEntry
s.walkReadMu.Lock()
2023-04-15 10:25:25 -07:00
meta.metadata, err = s.readMetadata(ctx, pathJoinBuf(&sb, volumeDir, current, entry))
s.walkReadMu.Unlock()
Add local disk health checks (#14447) The main goal of this PR is to solve the situation where disks stop responding to operations. This generally causes an FD build-up and eventually will crash the server. This adds detection of hung disks, where calls on disk get stuck. We add functionality to `xlStorageDiskIDCheck` where it keeps track of the number of concurrent requests on a given disk. A total number of 100 operations are allowed. If this limit is reached we will block (but not reject) new requests, but we will monitor the state of the disk. If no requests have been completed or updated within a 15-second window, we mark the disk as offline. Requests that are blocked will be unblocked and return an error as "faulty disk". New requests will be rejected until the disk is marked OK again. Once a disk has been marked faulty, a check will run every 5 seconds that will attempt to write and read back a file. As long as this fails the disk will remain faulty. To prevent lots of long-running requests to mark the disk faulty we implement a callback feature that allows updating the status as parts of these operations are running. We add a reader and writer wrapper that will update the status of each successful read/write operation. This should allow fine enough granularity that a slow, but still operational disk will not reach 15 seconds where 50 operations have not progressed. Note that errors themselves are not enough to mark a disk faulty. A nil (or io.EOF) error will mark a disk as "good". * Make concurrent disk setting configurable via `_MINIO_DISK_MAX_CONCURRENT`. * de-couple IsOnline() from disk health tracker The purpose of IsOnline() is to ensure that we reconnect the drive only when the "drive" was - disconnected from network we need to validate if the drive is "correct" and is the same drive which belongs to this server. - drive was replaced we have to format it - we support hot swapping of the drives. IsOnline() is not meant for taking the drive offline when it is hung, it is not useful we can let the drive be online instead "return" errors for relevant calls. * return errFaultyDisk for DiskInfo() call Co-authored-by: Harshavardhana <harsha@minio.io> Possible future Improvements: * Unify the REST server and local xlStorageDiskIDCheck. This would also improve stats significantly. * Allow reads/writes to be aborted by the context. * Add usage stats, concurrent count, blocked operations, etc.
2022-03-09 11:38:54 -08:00
diskHealthCheckOK(ctx, err)
if err != nil {
// It is totally possible that xl.meta was overwritten
// while being concurrently listed at the same time in
// such scenarios the 'xl.meta' might get truncated
if !IsErrIgnored(err, io.EOF, io.ErrUnexpectedEOF) {
logger.LogIf(ctx, err)
}
continue
}
meta.name = strings.TrimSuffix(entry, xlStorageFormatFile)
meta.name = strings.TrimSuffix(meta.name, SlashSeparator)
2023-04-15 10:25:25 -07:00
meta.name = pathJoinBuf(&sb, current, meta.name)
meta.name = decodeDirObject(meta.name)
objReturned(meta.metadata)
out <- meta
return nil
}
// Check legacy.
if HasSuffix(entry, xlStorageFormatFileV1) {
var meta metaCacheEntry
s.walkReadMu.Lock()
2023-04-15 10:25:25 -07:00
meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, current, entry))
s.walkReadMu.Unlock()
Add local disk health checks (#14447) The main goal of this PR is to solve the situation where disks stop responding to operations. This generally causes an FD build-up and eventually will crash the server. This adds detection of hung disks, where calls on disk get stuck. We add functionality to `xlStorageDiskIDCheck` where it keeps track of the number of concurrent requests on a given disk. A total number of 100 operations are allowed. If this limit is reached we will block (but not reject) new requests, but we will monitor the state of the disk. If no requests have been completed or updated within a 15-second window, we mark the disk as offline. Requests that are blocked will be unblocked and return an error as "faulty disk". New requests will be rejected until the disk is marked OK again. Once a disk has been marked faulty, a check will run every 5 seconds that will attempt to write and read back a file. As long as this fails the disk will remain faulty. To prevent lots of long-running requests to mark the disk faulty we implement a callback feature that allows updating the status as parts of these operations are running. We add a reader and writer wrapper that will update the status of each successful read/write operation. This should allow fine enough granularity that a slow, but still operational disk will not reach 15 seconds where 50 operations have not progressed. Note that errors themselves are not enough to mark a disk faulty. A nil (or io.EOF) error will mark a disk as "good". * Make concurrent disk setting configurable via `_MINIO_DISK_MAX_CONCURRENT`. * de-couple IsOnline() from disk health tracker The purpose of IsOnline() is to ensure that we reconnect the drive only when the "drive" was - disconnected from network we need to validate if the drive is "correct" and is the same drive which belongs to this server. - drive was replaced we have to format it - we support hot swapping of the drives. IsOnline() is not meant for taking the drive offline when it is hung, it is not useful we can let the drive be online instead "return" errors for relevant calls. * return errFaultyDisk for DiskInfo() call Co-authored-by: Harshavardhana <harsha@minio.io> Possible future Improvements: * Unify the REST server and local xlStorageDiskIDCheck. This would also improve stats significantly. * Allow reads/writes to be aborted by the context. * Add usage stats, concurrent count, blocked operations, etc.
2022-03-09 11:38:54 -08:00
diskHealthCheckOK(ctx, err)
if err != nil {
if !IsErrIgnored(err, io.EOF, io.ErrUnexpectedEOF) {
logger.LogIf(ctx, err)
}
continue
}
meta.name = strings.TrimSuffix(entry, xlStorageFormatFileV1)
meta.name = strings.TrimSuffix(meta.name, SlashSeparator)
2023-04-15 10:25:25 -07:00
meta.name = pathJoinBuf(&sb, current, meta.name)
objReturned(meta.metadata)
out <- meta
return nil
}
// Skip all other files.
}
// Process in sort order.
sort.Strings(entries)
dirStack := make([]string, 0, 5)
prefix = "" // Remove prefix after first level as we have already filtered the list.
if len(forward) > 0 {
// Conservative forwarding. Entries may be either objects or prefixes.
for i, entry := range entries {
if entry >= forward || strings.HasPrefix(forward, entry) {
entries = entries[i:]
break
}
}
}
for _, entry := range entries {
if opts.Limit > 0 && objsReturned >= opts.Limit {
return nil
}
if entry == "" {
continue
}
if contextCanceled(ctx) {
return ctx.Err()
}
2023-04-15 10:25:25 -07:00
meta := metaCacheEntry{name: pathJoinBuf(&sb, current, entry)}
// If directory entry on stack before this, pop it now.
for len(dirStack) > 0 && dirStack[len(dirStack)-1] < meta.name {
pop := dirStack[len(dirStack)-1]
out <- metaCacheEntry{name: pop}
if opts.Recursive {
// Scan folder we found. Should be in correct sort order where we are.
err := scanDir(pop)
if err != nil && !IsErrIgnored(err, context.Canceled) {
logger.LogIf(ctx, err)
}
}
dirStack = dirStack[:len(dirStack)-1]
}
// All objects will be returned as directories, there has been no object check yet.
// Check it by attempting to read metadata.
_, isDirObj := dirObjects[entry]
if isDirObj {
meta.name = meta.name[:len(meta.name)-1] + globalDirSuffixWithSlash
}
s.walkReadMu.Lock()
2023-04-15 10:25:25 -07:00
meta.metadata, err = s.readMetadata(ctx, pathJoinBuf(&sb, volumeDir, meta.name, xlStorageFormatFile))
s.walkReadMu.Unlock()
Add local disk health checks (#14447) The main goal of this PR is to solve the situation where disks stop responding to operations. This generally causes an FD build-up and eventually will crash the server. This adds detection of hung disks, where calls on disk get stuck. We add functionality to `xlStorageDiskIDCheck` where it keeps track of the number of concurrent requests on a given disk. A total number of 100 operations are allowed. If this limit is reached we will block (but not reject) new requests, but we will monitor the state of the disk. If no requests have been completed or updated within a 15-second window, we mark the disk as offline. Requests that are blocked will be unblocked and return an error as "faulty disk". New requests will be rejected until the disk is marked OK again. Once a disk has been marked faulty, a check will run every 5 seconds that will attempt to write and read back a file. As long as this fails the disk will remain faulty. To prevent lots of long-running requests to mark the disk faulty we implement a callback feature that allows updating the status as parts of these operations are running. We add a reader and writer wrapper that will update the status of each successful read/write operation. This should allow fine enough granularity that a slow, but still operational disk will not reach 15 seconds where 50 operations have not progressed. Note that errors themselves are not enough to mark a disk faulty. A nil (or io.EOF) error will mark a disk as "good". * Make concurrent disk setting configurable via `_MINIO_DISK_MAX_CONCURRENT`. * de-couple IsOnline() from disk health tracker The purpose of IsOnline() is to ensure that we reconnect the drive only when the "drive" was - disconnected from network we need to validate if the drive is "correct" and is the same drive which belongs to this server. - drive was replaced we have to format it - we support hot swapping of the drives. IsOnline() is not meant for taking the drive offline when it is hung, it is not useful we can let the drive be online instead "return" errors for relevant calls. * return errFaultyDisk for DiskInfo() call Co-authored-by: Harshavardhana <harsha@minio.io> Possible future Improvements: * Unify the REST server and local xlStorageDiskIDCheck. This would also improve stats significantly. * Allow reads/writes to be aborted by the context. * Add usage stats, concurrent count, blocked operations, etc.
2022-03-09 11:38:54 -08:00
diskHealthCheckOK(ctx, err)
switch {
case err == nil:
// It was an object
if isDirObj {
meta.name = strings.TrimSuffix(meta.name, globalDirSuffixWithSlash) + slashSeparator
}
objReturned(meta.metadata)
out <- meta
case osIsNotExist(err), isSysErrIsDir(err):
2023-04-15 10:25:25 -07:00
meta.metadata, err = xioutil.ReadFile(pathJoinBuf(&sb, volumeDir, meta.name, xlStorageFormatFileV1))
Add local disk health checks (#14447) The main goal of this PR is to solve the situation where disks stop responding to operations. This generally causes an FD build-up and eventually will crash the server. This adds detection of hung disks, where calls on disk get stuck. We add functionality to `xlStorageDiskIDCheck` where it keeps track of the number of concurrent requests on a given disk. A total number of 100 operations are allowed. If this limit is reached we will block (but not reject) new requests, but we will monitor the state of the disk. If no requests have been completed or updated within a 15-second window, we mark the disk as offline. Requests that are blocked will be unblocked and return an error as "faulty disk". New requests will be rejected until the disk is marked OK again. Once a disk has been marked faulty, a check will run every 5 seconds that will attempt to write and read back a file. As long as this fails the disk will remain faulty. To prevent lots of long-running requests to mark the disk faulty we implement a callback feature that allows updating the status as parts of these operations are running. We add a reader and writer wrapper that will update the status of each successful read/write operation. This should allow fine enough granularity that a slow, but still operational disk will not reach 15 seconds where 50 operations have not progressed. Note that errors themselves are not enough to mark a disk faulty. A nil (or io.EOF) error will mark a disk as "good". * Make concurrent disk setting configurable via `_MINIO_DISK_MAX_CONCURRENT`. * de-couple IsOnline() from disk health tracker The purpose of IsOnline() is to ensure that we reconnect the drive only when the "drive" was - disconnected from network we need to validate if the drive is "correct" and is the same drive which belongs to this server. - drive was replaced we have to format it - we support hot swapping of the drives. IsOnline() is not meant for taking the drive offline when it is hung, it is not useful we can let the drive be online instead "return" errors for relevant calls. * return errFaultyDisk for DiskInfo() call Co-authored-by: Harshavardhana <harsha@minio.io> Possible future Improvements: * Unify the REST server and local xlStorageDiskIDCheck. This would also improve stats significantly. * Allow reads/writes to be aborted by the context. * Add usage stats, concurrent count, blocked operations, etc.
2022-03-09 11:38:54 -08:00
diskHealthCheckOK(ctx, err)
if err == nil {
// It was an object
objReturned(meta.metadata)
out <- meta
continue
}
// NOT an object, append to stack (with slash)
// If dirObject, but no metadata (which is unexpected) we skip it.
if !isDirObj {
2023-04-15 10:25:25 -07:00
if !isDirEmpty(pathJoinBuf(&sb, volumeDir, meta.name)) {
dirStack = append(dirStack, meta.name+slashSeparator)
}
}
case isSysErrNotDir(err):
// skip
}
}
// If directory entry left on stack, pop it now.
for len(dirStack) > 0 {
if opts.Limit > 0 && objsReturned >= opts.Limit {
return nil
}
if contextCanceled(ctx) {
return ctx.Err()
}
pop := dirStack[len(dirStack)-1]
out <- metaCacheEntry{name: pop}
if opts.Recursive {
// Scan folder we found. Should be in correct sort order where we are.
logger.LogIf(ctx, scanDir(pop))
}
dirStack = dirStack[:len(dirStack)-1]
}
return nil
}
// Stream output.
return scanDir(opts.BaseDir)
}
Add local disk health checks (#14447) The main goal of this PR is to solve the situation where disks stop responding to operations. This generally causes an FD build-up and eventually will crash the server. This adds detection of hung disks, where calls on disk get stuck. We add functionality to `xlStorageDiskIDCheck` where it keeps track of the number of concurrent requests on a given disk. A total number of 100 operations are allowed. If this limit is reached we will block (but not reject) new requests, but we will monitor the state of the disk. If no requests have been completed or updated within a 15-second window, we mark the disk as offline. Requests that are blocked will be unblocked and return an error as "faulty disk". New requests will be rejected until the disk is marked OK again. Once a disk has been marked faulty, a check will run every 5 seconds that will attempt to write and read back a file. As long as this fails the disk will remain faulty. To prevent lots of long-running requests to mark the disk faulty we implement a callback feature that allows updating the status as parts of these operations are running. We add a reader and writer wrapper that will update the status of each successful read/write operation. This should allow fine enough granularity that a slow, but still operational disk will not reach 15 seconds where 50 operations have not progressed. Note that errors themselves are not enough to mark a disk faulty. A nil (or io.EOF) error will mark a disk as "good". * Make concurrent disk setting configurable via `_MINIO_DISK_MAX_CONCURRENT`. * de-couple IsOnline() from disk health tracker The purpose of IsOnline() is to ensure that we reconnect the drive only when the "drive" was - disconnected from network we need to validate if the drive is "correct" and is the same drive which belongs to this server. - drive was replaced we have to format it - we support hot swapping of the drives. IsOnline() is not meant for taking the drive offline when it is hung, it is not useful we can let the drive be online instead "return" errors for relevant calls. * return errFaultyDisk for DiskInfo() call Co-authored-by: Harshavardhana <harsha@minio.io> Possible future Improvements: * Unify the REST server and local xlStorageDiskIDCheck. This would also improve stats significantly. * Allow reads/writes to be aborted by the context. * Add usage stats, concurrent count, blocked operations, etc.
2022-03-09 11:38:54 -08:00
func (p *xlStorageDiskIDCheck) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) {
ctx, done, err := p.TrackDiskHealth(ctx, storageMetricWalkDir, opts.Bucket, opts.BaseDir)
if err != nil {
return err
}
Add local disk health checks (#14447) The main goal of this PR is to solve the situation where disks stop responding to operations. This generally causes an FD build-up and eventually will crash the server. This adds detection of hung disks, where calls on disk get stuck. We add functionality to `xlStorageDiskIDCheck` where it keeps track of the number of concurrent requests on a given disk. A total number of 100 operations are allowed. If this limit is reached we will block (but not reject) new requests, but we will monitor the state of the disk. If no requests have been completed or updated within a 15-second window, we mark the disk as offline. Requests that are blocked will be unblocked and return an error as "faulty disk". New requests will be rejected until the disk is marked OK again. Once a disk has been marked faulty, a check will run every 5 seconds that will attempt to write and read back a file. As long as this fails the disk will remain faulty. To prevent lots of long-running requests to mark the disk faulty we implement a callback feature that allows updating the status as parts of these operations are running. We add a reader and writer wrapper that will update the status of each successful read/write operation. This should allow fine enough granularity that a slow, but still operational disk will not reach 15 seconds where 50 operations have not progressed. Note that errors themselves are not enough to mark a disk faulty. A nil (or io.EOF) error will mark a disk as "good". * Make concurrent disk setting configurable via `_MINIO_DISK_MAX_CONCURRENT`. * de-couple IsOnline() from disk health tracker The purpose of IsOnline() is to ensure that we reconnect the drive only when the "drive" was - disconnected from network we need to validate if the drive is "correct" and is the same drive which belongs to this server. - drive was replaced we have to format it - we support hot swapping of the drives. IsOnline() is not meant for taking the drive offline when it is hung, it is not useful we can let the drive be online instead "return" errors for relevant calls. * return errFaultyDisk for DiskInfo() call Co-authored-by: Harshavardhana <harsha@minio.io> Possible future Improvements: * Unify the REST server and local xlStorageDiskIDCheck. This would also improve stats significantly. * Allow reads/writes to be aborted by the context. * Add usage stats, concurrent count, blocked operations, etc.
2022-03-09 11:38:54 -08:00
defer done(&err)
return p.storage.WalkDir(ctx, opts, wr)
}
// WalkDir will traverse a directory and return all entries found.
// On success a meta cache stream will be returned, that should be closed when done.
func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error {
values := make(url.Values)
values.Set(storageRESTVolume, opts.Bucket)
values.Set(storageRESTDirPath, opts.BaseDir)
values.Set(storageRESTRecursive, strconv.FormatBool(opts.Recursive))
values.Set(storageRESTReportNotFound, strconv.FormatBool(opts.ReportNotFound))
values.Set(storageRESTPrefixFilter, opts.FilterPrefix)
values.Set(storageRESTForwardFilter, opts.ForwardTo)
respBody, err := client.call(ctx, storageRESTMethodWalkDir, values, nil, -1)
if err != nil {
logger.LogIf(ctx, err)
return err
}
defer xhttp.DrainBody(respBody)
return waitForHTTPStream(respBody, wr)
}
// WalkDirHandler - remote caller to list files and folders in a requested directory path.
func (s *storageRESTServer) WalkDirHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
volume := r.Form.Get(storageRESTVolume)
dirPath := r.Form.Get(storageRESTDirPath)
recursive, err := strconv.ParseBool(r.Form.Get(storageRESTRecursive))
if err != nil {
s.writeErrorResponse(w, err)
return
}
var reportNotFound bool
if v := r.Form.Get(storageRESTReportNotFound); v != "" {
reportNotFound, err = strconv.ParseBool(v)
if err != nil {
s.writeErrorResponse(w, err)
return
}
}
prefix := r.Form.Get(storageRESTPrefixFilter)
forward := r.Form.Get(storageRESTForwardFilter)
writer := streamHTTPResponse(w)
defer func() {
if r := recover(); r != nil {
debug.PrintStack()
writer.CloseWithError(fmt.Errorf("panic: %v", r))
}
}()
writer.CloseWithError(s.storage.WalkDir(r.Context(), WalkDirOptions{
Bucket: volume,
BaseDir: dirPath,
Recursive: recursive,
ReportNotFound: reportNotFound,
FilterPrefix: prefix,
ForwardTo: forward,
}, writer))
}