mirror of https://github.com/minio/minio.git
Add a generic Walk()'er to list a bucket, optinally prefix (#9026)
This generic Walk() is used by likes of Lifecyle, or KMS to rotate keys or any other functionality which relies on this functionality.
This commit is contained in:
parent
ece0d4ac53
commit
23a8411732
|
@ -133,25 +133,35 @@ func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error {
|
|||
}
|
||||
commonPrefix := lcp(prefixes)
|
||||
|
||||
// List all objects and calculate lifecycle action based on object name & object modtime
|
||||
marker := ""
|
||||
// Allocate new results channel to receive ObjectInfo.
|
||||
objInfoCh := make(chan ObjectInfo)
|
||||
|
||||
// Walk through all objects
|
||||
if err := objAPI.Walk(ctx, bucket.Name, commonPrefix, objInfoCh); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
res, err := objAPI.ListObjects(ctx, bucket.Name, commonPrefix, marker, "", maxObjectList)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var objects []string
|
||||
for _, obj := range res.Objects {
|
||||
for obj := range objInfoCh {
|
||||
if len(objects) == maxObjectList {
|
||||
// Reached maximum delete requests, attempt a delete for now.
|
||||
break
|
||||
}
|
||||
|
||||
// Find the action that need to be executed
|
||||
action := l.ComputeAction(obj.Name, obj.UserTags, obj.ModTime)
|
||||
switch action {
|
||||
case lifecycle.DeleteAction:
|
||||
if l.ComputeAction(obj.Name, obj.UserTags, obj.ModTime) == lifecycle.DeleteAction {
|
||||
objects = append(objects, obj.Name)
|
||||
default:
|
||||
// Do nothing, for now.
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing to do.
|
||||
if len(objects) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
waitForLowHTTPReq(int32(globalEndpoints.Nodes()))
|
||||
|
||||
// Deletes a list of objects.
|
||||
deleteErrs, err := objAPI.DeleteObjects(ctx, bucket.Name, objects)
|
||||
if err != nil {
|
||||
|
@ -173,12 +183,6 @@ func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
if !res.IsTruncated {
|
||||
// We are done here, proceed to next bucket.
|
||||
break
|
||||
}
|
||||
marker = res.NextMarker
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1246,6 +1246,15 @@ func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remo
|
|||
return madmin.HealResultItem{}, NotImplemented{}
|
||||
}
|
||||
|
||||
// Walk a bucket, optionally prefix recursively, until we have returned
|
||||
// all the content to objectInfo channel, it is callers responsibility
|
||||
// to allocate a receive channel for ObjectInfo, upon any unhandled
|
||||
// error walker returns error. Optionally if context.Done() is received
|
||||
// then Walk() stops the walker.
|
||||
func (fs *FSObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
|
||||
return fsWalk(ctx, fs, bucket, prefix, fs.listDirFactory(), results, fs.getObjectInfo, fs.getObjectInfo)
|
||||
}
|
||||
|
||||
// HealObjects - no-op for fs. Valid only for XL.
|
||||
func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
|
|
|
@ -176,6 +176,11 @@ func (a GatewayUnsupported) ListObjectsV2(ctx context.Context, bucket, prefix, c
|
|||
return result, NotImplemented{}
|
||||
}
|
||||
|
||||
// Walk - Not implemented stub
|
||||
func (a GatewayUnsupported) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
|
||||
return NotImplemented{}
|
||||
}
|
||||
|
||||
// HealObjects - Not implemented stub
|
||||
func (a GatewayUnsupported) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) {
|
||||
return NotImplemented{}
|
||||
|
|
|
@ -147,12 +147,6 @@ func (client *lockRESTClient) Expired(args dsync.LockArgs) (expired bool, err er
|
|||
return client.restCall(lockRESTMethodExpired, args)
|
||||
}
|
||||
|
||||
func closeLockers(lockers []dsync.NetLocker) {
|
||||
for _, locker := range lockers {
|
||||
locker.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func newLockAPI(endpoint Endpoint) dsync.NetLocker {
|
||||
if endpoint.IsLocal {
|
||||
return globalLockServers[endpoint]
|
||||
|
|
|
@ -125,7 +125,7 @@ func (d *naughtyDisk) DeleteVol(volume string) (err error) {
|
|||
return d.disk.DeleteVol(volume)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) Walk(volume, path, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) {
|
||||
func (d *naughtyDisk) Walk(volume, path, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -313,12 +313,65 @@ func listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter
|
|||
return result, nil
|
||||
}
|
||||
|
||||
// Walk a bucket, optionally prefix recursively, until we have returned
|
||||
// all the content to objectInfo channel, it is callers responsibility
|
||||
// to allocate a receive channel for ObjectInfo, upon any unhandled
|
||||
// error walker returns error. Optionally if context.Done() is received
|
||||
// then Walk() stops the walker.
|
||||
func fsWalk(ctx context.Context, obj ObjectLayer, bucket, prefix string, listDir ListDirFunc, results chan<- ObjectInfo, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) error {
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, "", obj); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
walkResultCh := startTreeWalk(ctx, bucket, prefix, "", true, listDir, ctx.Done())
|
||||
|
||||
go func() {
|
||||
defer close(results)
|
||||
|
||||
for {
|
||||
walkResult, ok := <-walkResultCh
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
var objInfo ObjectInfo
|
||||
var err error
|
||||
if HasSuffix(walkResult.entry, SlashSeparator) {
|
||||
for _, getObjectInfoDir := range getObjectInfoDirs {
|
||||
objInfo, err = getObjectInfoDir(ctx, bucket, walkResult.entry)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if err == errFileNotFound {
|
||||
err = nil
|
||||
objInfo = ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: walkResult.entry,
|
||||
IsDir: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
objInfo, err = getObjInfo(ctx, bucket, walkResult.entry)
|
||||
}
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
results <- objInfo
|
||||
if walkResult.end {
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) {
|
||||
if delimiter != SlashSeparator && delimiter != "" {
|
||||
return listObjectsNonSlash(ctx, bucket, prefix, marker, delimiter, maxKeys, tpool, listDir, getObjInfo, getObjectInfoDirs...)
|
||||
}
|
||||
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, obj); err != nil {
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, marker, obj); err != nil {
|
||||
return loi, err
|
||||
}
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ func checkBucketAndObjectNames(ctx context.Context, bucket, object string) error
|
|||
}
|
||||
|
||||
// Checks for all ListObjects arguments validity.
|
||||
func checkListObjsArgs(ctx context.Context, bucket, prefix, marker, delimiter string, obj ObjectLayer) error {
|
||||
func checkListObjsArgs(ctx context.Context, bucket, prefix, marker string, obj ObjectLayer) error {
|
||||
// Verify if bucket exists before validating object name.
|
||||
// This is done on purpose since the order of errors is
|
||||
// important here bucket does not exist error should
|
||||
|
@ -90,7 +90,7 @@ func checkListObjsArgs(ctx context.Context, bucket, prefix, marker, delimiter st
|
|||
|
||||
// Checks for all ListMultipartUploads arguments validity.
|
||||
func checkListMultipartArgs(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, obj ObjectLayer) error {
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, keyMarker, delimiter, obj); err != nil {
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, keyMarker, obj); err != nil {
|
||||
return err
|
||||
}
|
||||
if uploadIDMarker != "" {
|
||||
|
|
|
@ -69,6 +69,7 @@ type ObjectLayer interface {
|
|||
DeleteBucket(ctx context.Context, bucket string) error
|
||||
ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
|
||||
ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
|
||||
Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error
|
||||
|
||||
// Object operations.
|
||||
|
||||
|
@ -101,6 +102,7 @@ type ObjectLayer interface {
|
|||
HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error)
|
||||
HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error)
|
||||
HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (madmin.HealResultItem, error)
|
||||
|
||||
HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) error
|
||||
|
||||
ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error)
|
||||
|
|
|
@ -1896,6 +1896,9 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
|||
return
|
||||
}
|
||||
|
||||
// To detect if the client has disconnected.
|
||||
r.Body = &detectDisconnect{r.Body, r.Context().Done()}
|
||||
|
||||
// X-Amz-Copy-Source shouldn't be set for this call.
|
||||
if _, ok := r.Header[xhttp.AmzCopySource]; ok {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL, guessIsBrowserReq(r))
|
||||
|
|
|
@ -109,7 +109,7 @@ func (p *posixDiskIDCheck) DeleteVol(volume string) (err error) {
|
|||
return p.storage.DeleteVol(volume)
|
||||
}
|
||||
|
||||
func (p *posixDiskIDCheck) Walk(volume, dirPath string, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) {
|
||||
func (p *posixDiskIDCheck) Walk(volume, dirPath string, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) {
|
||||
if p.isDiskStale() {
|
||||
return nil, errDiskNotFound
|
||||
}
|
||||
|
|
|
@ -643,7 +643,7 @@ func (s *posix) DeleteVol(volume string) (err error) {
|
|||
// Walk - is a sorted walker which returns file entries in lexically
|
||||
// sorted order, additionally along with metadata about each of those entries.
|
||||
func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile string,
|
||||
readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (ch chan FileInfo, err error) {
|
||||
readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (ch chan FileInfo, err error) {
|
||||
|
||||
atomic.AddInt32(&s.activeIOCount, 1)
|
||||
defer func() {
|
||||
|
|
|
@ -19,6 +19,8 @@ package cmd
|
|||
import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
xhttp "github.com/minio/minio/cmd/http"
|
||||
)
|
||||
|
||||
// VolInfo - represents volume stat information.
|
||||
|
@ -62,3 +64,44 @@ type FileInfo struct {
|
|||
|
||||
Quorum int
|
||||
}
|
||||
|
||||
// ToObjectInfo converts FileInfo into objectInfo.
|
||||
func (entry FileInfo) ToObjectInfo() ObjectInfo {
|
||||
var objInfo ObjectInfo
|
||||
if HasSuffix(entry.Name, SlashSeparator) {
|
||||
objInfo = ObjectInfo{
|
||||
Bucket: entry.Volume,
|
||||
Name: entry.Name,
|
||||
IsDir: true,
|
||||
}
|
||||
} else {
|
||||
objInfo = ObjectInfo{
|
||||
IsDir: false,
|
||||
Bucket: entry.Volume,
|
||||
Name: entry.Name,
|
||||
ModTime: entry.ModTime,
|
||||
Size: entry.Size,
|
||||
ContentType: entry.Metadata["content-type"],
|
||||
ContentEncoding: entry.Metadata["content-encoding"],
|
||||
}
|
||||
|
||||
// Extract etag from metadata.
|
||||
objInfo.ETag = extractETag(entry.Metadata)
|
||||
|
||||
// All the parts per object.
|
||||
objInfo.Parts = entry.Parts
|
||||
|
||||
// etag/md5Sum has already been extracted. We need to
|
||||
// remove to avoid it from appearing as part of
|
||||
// response headers. e.g, X-Minio-* or X-Amz-*.
|
||||
objInfo.UserDefined = cleanMetadata(entry.Metadata)
|
||||
|
||||
// Update storage class
|
||||
if sc, ok := entry.Metadata[xhttp.AmzStorageClass]; ok {
|
||||
objInfo.StorageClass = sc
|
||||
} else {
|
||||
objInfo.StorageClass = globalMinioDefaultStorageClass
|
||||
}
|
||||
}
|
||||
return objInfo
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ type StorageAPI interface {
|
|||
|
||||
// Walk in sorted order directly on disk.
|
||||
Walk(volume, dirPath string, marker string, recursive bool, leafFile string,
|
||||
readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error)
|
||||
readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error)
|
||||
|
||||
// File operations.
|
||||
ListDir(volume, dirPath string, count int, leafFile string) ([]string, error)
|
||||
|
|
|
@ -336,7 +336,7 @@ func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buf
|
|||
}
|
||||
|
||||
func (client *storageRESTClient) Walk(volume, dirPath, marker string, recursive bool, leafFile string,
|
||||
readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) {
|
||||
readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) {
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
values.Set(storageRESTDirPath, dirPath)
|
||||
|
|
|
@ -431,10 +431,7 @@ func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request)
|
|||
}
|
||||
leafFile := vars[storageRESTLeafFile]
|
||||
|
||||
endWalkCh := make(chan struct{})
|
||||
defer close(endWalkCh)
|
||||
|
||||
fch, err := s.storage.Walk(volume, dirPath, markerPath, recursive, leafFile, readMetadata, endWalkCh)
|
||||
fch, err := s.storage.Walk(volume, dirPath, markerPath, recursive, leafFile, readMetadata, r.Context().Done())
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
|
|
|
@ -59,7 +59,7 @@ func filterMatchingPrefix(entries []string, prefixEntry string) []string {
|
|||
type ListDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string)
|
||||
|
||||
// treeWalk walks directory tree recursively pushing TreeWalkResult into the channel as and when it encounters files.
|
||||
func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir ListDirFunc, resultCh chan TreeWalkResult, endWalkCh chan struct{}, isEnd bool) (totalNum int, treeErr error) {
|
||||
func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir ListDirFunc, resultCh chan TreeWalkResult, endWalkCh <-chan struct{}, isEnd bool) (totalNum int, treeErr error) {
|
||||
// Example:
|
||||
// if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively
|
||||
// called with prefixDir="one/two/three/four/" and marker="five.txt"
|
||||
|
@ -151,7 +151,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker
|
|||
}
|
||||
|
||||
// Initiate a new treeWalk in a goroutine.
|
||||
func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir ListDirFunc, endWalkCh chan struct{}) chan TreeWalkResult {
|
||||
func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir ListDirFunc, endWalkCh <-chan struct{}) chan TreeWalkResult {
|
||||
// Example 1
|
||||
// If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt"
|
||||
// treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt"
|
||||
|
|
|
@ -941,7 +941,7 @@ func isTruncated(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool)
|
|||
}
|
||||
|
||||
// Starts a walk channel across all disks and returns a slice.
|
||||
func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh chan struct{}) []FileInfoCh {
|
||||
func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoCh {
|
||||
var entryChs []FileInfoCh
|
||||
for _, set := range s.sets {
|
||||
for _, disk := range set.getDisks() {
|
||||
|
@ -965,8 +965,8 @@ func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker str
|
|||
func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
|
||||
endWalkCh := make(chan struct{})
|
||||
defer close(endWalkCh)
|
||||
recursive := true
|
||||
entryChs := s.startMergeWalks(context.Background(), bucket, prefix, "", recursive, endWalkCh)
|
||||
|
||||
entryChs := s.startMergeWalks(context.Background(), bucket, prefix, "", true, endWalkCh)
|
||||
|
||||
var objInfos []ObjectInfo
|
||||
var eof bool
|
||||
|
@ -1070,7 +1070,7 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker
|
|||
// the data in lexically sorted order.
|
||||
// If partialQuorumOnly is set only objects that does not have full quorum is returned.
|
||||
func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, partialQuorumOnly bool) (loi ListObjectsInfo, err error) {
|
||||
if err = checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil {
|
||||
if err = checkListObjsArgs(ctx, bucket, prefix, marker, s); err != nil {
|
||||
return loi, err
|
||||
}
|
||||
|
||||
|
@ -1129,45 +1129,10 @@ func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimi
|
|||
}
|
||||
|
||||
for _, entry := range entries.Files {
|
||||
var objInfo ObjectInfo
|
||||
if HasSuffix(entry.Name, SlashSeparator) {
|
||||
if !recursive {
|
||||
loi.Prefixes = append(loi.Prefixes, entry.Name)
|
||||
continue
|
||||
}
|
||||
objInfo = ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: entry.Name,
|
||||
IsDir: true,
|
||||
}
|
||||
} else {
|
||||
objInfo = ObjectInfo{
|
||||
IsDir: false,
|
||||
Bucket: bucket,
|
||||
Name: entry.Name,
|
||||
ModTime: entry.ModTime,
|
||||
Size: entry.Size,
|
||||
ContentType: entry.Metadata["content-type"],
|
||||
ContentEncoding: entry.Metadata["content-encoding"],
|
||||
}
|
||||
|
||||
// Extract etag from metadata.
|
||||
objInfo.ETag = extractETag(entry.Metadata)
|
||||
|
||||
// All the parts per object.
|
||||
objInfo.Parts = entry.Parts
|
||||
|
||||
// etag/md5Sum has already been extracted. We need to
|
||||
// remove to avoid it from appearing as part of
|
||||
// response headers. e.g, X-Minio-* or X-Amz-*.
|
||||
objInfo.UserDefined = cleanMetadata(entry.Metadata)
|
||||
|
||||
// Update storage class
|
||||
if sc, ok := entry.Metadata[xhttp.AmzStorageClass]; ok {
|
||||
objInfo.StorageClass = sc
|
||||
} else {
|
||||
objInfo.StorageClass = globalMinioDefaultStorageClass
|
||||
}
|
||||
objInfo := entry.ToObjectInfo()
|
||||
if HasSuffix(objInfo.Name, SlashSeparator) && !recursive {
|
||||
loi.Prefixes = append(loi.Prefixes, entry.Name)
|
||||
continue
|
||||
}
|
||||
loi.Objects = append(loi.Objects, objInfo)
|
||||
}
|
||||
|
@ -1645,14 +1610,48 @@ func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
|
|||
return listBuckets, nil
|
||||
}
|
||||
|
||||
// Walk a bucket, optionally prefix recursively, until we have returned
|
||||
// all the content to objectInfo channel, it is callers responsibility
|
||||
// to allocate a receive channel for ObjectInfo, upon any unhandled
|
||||
// error walker returns error. Optionally if context.Done() is received
|
||||
// then Walk() stops the walker.
|
||||
func (s *xlSets) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, "", s); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
entryChs := s.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done())
|
||||
|
||||
entriesValid := make([]bool, len(entryChs))
|
||||
entries := make([]FileInfo, len(entryChs))
|
||||
|
||||
go func() {
|
||||
defer close(results)
|
||||
|
||||
for {
|
||||
entry, quorumCount, ok := leastEntry(entryChs, entries, entriesValid)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if quorumCount != s.drivesPerSet {
|
||||
return
|
||||
}
|
||||
|
||||
results <- entry.ToObjectInfo()
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// HealObjects - Heal all objects recursively at a specified prefix, any
|
||||
// dangling objects deleted as well automatically.
|
||||
func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error {
|
||||
endWalkCh := make(chan struct{})
|
||||
defer close(endWalkCh)
|
||||
|
||||
recursive := true
|
||||
entryChs := s.startMergeWalks(ctx, bucket, prefix, "", recursive, endWalkCh)
|
||||
entryChs := s.startMergeWalks(ctx, bucket, prefix, "", true, endWalkCh)
|
||||
|
||||
entriesValid := make([]bool, len(entryChs))
|
||||
entries := make([]FileInfo, len(entryChs))
|
||||
|
|
|
@ -16,14 +16,26 @@
|
|||
|
||||
package cmd
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
)
|
||||
|
||||
// This is not implemented/needed anymore, look for xl-sets.ListBucketHeal()
|
||||
func (xl xlObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
|
||||
return nil, nil
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return nil, NotImplemented{}
|
||||
}
|
||||
|
||||
// This is not implemented/needed anymore, look for xl-sets.HealObjects()
|
||||
func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) {
|
||||
return nil
|
||||
func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) error {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return NotImplemented{}
|
||||
}
|
||||
|
||||
// this is not implemented/needed anymore, look for xl-sets.Walk()
|
||||
func (xl xlObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
|
||||
logger.LogIf(ctx, NotImplemented{})
|
||||
return NotImplemented{}
|
||||
}
|
||||
|
|
|
@ -144,7 +144,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del
|
|||
|
||||
// ListObjects - list all objects at prefix, delimited by '/'.
|
||||
func (xl xlObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, xl); err != nil {
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, marker, xl); err != nil {
|
||||
return loi, err
|
||||
}
|
||||
|
||||
|
|
|
@ -74,7 +74,6 @@ func (xl xlObjects) NewNSLock(ctx context.Context, bucket string, objects ...str
|
|||
func (xl xlObjects) Shutdown(ctx context.Context) error {
|
||||
// Add any object layer shutdown activities here.
|
||||
closeStorageDisks(xl.getDisks())
|
||||
closeLockers(xl.getLockers())
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
111
cmd/xl-zones.go
111
cmd/xl-zones.go
|
@ -528,12 +528,12 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke
|
|||
|
||||
var zonesEntryChs [][]FileInfoCh
|
||||
|
||||
recursive := true
|
||||
endWalkCh := make(chan struct{})
|
||||
defer close(endWalkCh)
|
||||
|
||||
for _, zone := range z.zones {
|
||||
endWalkCh := make(chan struct{})
|
||||
defer close(endWalkCh)
|
||||
zonesEntryChs = append(zonesEntryChs,
|
||||
zone.startMergeWalks(ctx, bucket, prefix, "", recursive, endWalkCh))
|
||||
zone.startMergeWalks(ctx, bucket, prefix, "", true, endWalkCh))
|
||||
}
|
||||
|
||||
var objInfos []ObjectInfo
|
||||
|
@ -646,7 +646,7 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke
|
|||
func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, heal bool) (ListObjectsInfo, error) {
|
||||
loi := ListObjectsInfo{}
|
||||
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, z); err != nil {
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, marker, z); err != nil {
|
||||
return loi, err
|
||||
}
|
||||
|
||||
|
@ -717,45 +717,10 @@ func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delim
|
|||
}
|
||||
|
||||
for _, entry := range entries.Files {
|
||||
var objInfo ObjectInfo
|
||||
if HasSuffix(entry.Name, SlashSeparator) {
|
||||
if !recursive {
|
||||
loi.Prefixes = append(loi.Prefixes, entry.Name)
|
||||
continue
|
||||
}
|
||||
objInfo = ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: entry.Name,
|
||||
IsDir: true,
|
||||
}
|
||||
} else {
|
||||
objInfo = ObjectInfo{
|
||||
IsDir: false,
|
||||
Bucket: bucket,
|
||||
Name: entry.Name,
|
||||
ModTime: entry.ModTime,
|
||||
Size: entry.Size,
|
||||
ContentType: entry.Metadata["content-type"],
|
||||
ContentEncoding: entry.Metadata["content-encoding"],
|
||||
}
|
||||
|
||||
// Extract etag from metadata.
|
||||
objInfo.ETag = extractETag(entry.Metadata)
|
||||
|
||||
// All the parts per object.
|
||||
objInfo.Parts = entry.Parts
|
||||
|
||||
// etag/md5Sum has already been extracted. We need to
|
||||
// remove to avoid it from appearing as part of
|
||||
// response headers. e.g, X-Minio-* or X-Amz-*.
|
||||
objInfo.UserDefined = cleanMetadata(entry.Metadata)
|
||||
|
||||
// Update storage class
|
||||
if sc, ok := entry.Metadata[xhttp.AmzStorageClass]; ok {
|
||||
objInfo.StorageClass = sc
|
||||
} else {
|
||||
objInfo.StorageClass = globalMinioDefaultStorageClass
|
||||
}
|
||||
objInfo := entry.ToObjectInfo()
|
||||
if HasSuffix(objInfo.Name, SlashSeparator) && !recursive {
|
||||
loi.Prefixes = append(loi.Prefixes, objInfo.Name)
|
||||
continue
|
||||
}
|
||||
loi.Objects = append(loi.Objects, objInfo)
|
||||
}
|
||||
|
@ -1319,17 +1284,67 @@ func (z *xlZones) HealBucket(ctx context.Context, bucket string, dryRun, remove
|
|||
return r, nil
|
||||
}
|
||||
|
||||
// Walk a bucket, optionally prefix recursively, until we have returned
|
||||
// all the content to objectInfo channel, it is callers responsibility
|
||||
// to allocate a receive channel for ObjectInfo, upon any unhandled
|
||||
// error walker returns error. Optionally if context.Done() is received
|
||||
// then Walk() stops the walker.
|
||||
func (z *xlZones) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error {
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var zonesEntryChs [][]FileInfoCh
|
||||
|
||||
for _, zone := range z.zones {
|
||||
zonesEntryChs = append(zonesEntryChs,
|
||||
zone.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done()))
|
||||
}
|
||||
|
||||
var zoneDrivesPerSet []int
|
||||
for _, zone := range z.zones {
|
||||
zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet)
|
||||
}
|
||||
|
||||
var zonesEntriesInfos [][]FileInfo
|
||||
var zonesEntriesValid [][]bool
|
||||
for _, entryChs := range zonesEntryChs {
|
||||
zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfo, len(entryChs)))
|
||||
zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs)))
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(results)
|
||||
|
||||
for {
|
||||
entry, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs,
|
||||
zonesEntriesInfos, zonesEntriesValid)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if quorumCount != zoneDrivesPerSet[zoneIndex] {
|
||||
continue
|
||||
}
|
||||
|
||||
results <- entry.ToObjectInfo()
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type healObjectFn func(string, string) error
|
||||
|
||||
func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error {
|
||||
var zonesEntryChs [][]FileInfoCh
|
||||
|
||||
recursive := true
|
||||
endWalkCh := make(chan struct{})
|
||||
defer close(endWalkCh)
|
||||
|
||||
for _, zone := range z.zones {
|
||||
endWalkCh := make(chan struct{})
|
||||
defer close(endWalkCh)
|
||||
zonesEntryChs = append(zonesEntryChs,
|
||||
zone.startMergeWalks(ctx, bucket, prefix, "", recursive, endWalkCh))
|
||||
zone.startMergeWalks(ctx, bucket, prefix, "", true, endWalkCh))
|
||||
}
|
||||
|
||||
var zoneDrivesPerSet []int
|
||||
|
|
|
@ -99,8 +99,11 @@ func (lc Lifecycle) Validate() error {
|
|||
// FilterRuleActions returns the expiration and transition from the object name
|
||||
// after evaluating all rules.
|
||||
func (lc Lifecycle) FilterRuleActions(objName, objTags string) (Expiration, Transition) {
|
||||
if objName == "" {
|
||||
return Expiration{}, Transition{}
|
||||
}
|
||||
for _, rule := range lc.Rules {
|
||||
if strings.ToLower(rule.Status) != "enabled" {
|
||||
if rule.Status == Disabled {
|
||||
continue
|
||||
}
|
||||
tags := rule.Tags()
|
||||
|
@ -121,6 +124,9 @@ func (lc Lifecycle) FilterRuleActions(objName, objTags string) (Expiration, Tran
|
|||
// against the object name and its modification time.
|
||||
func (lc Lifecycle) ComputeAction(objName, objTags string, modTime time.Time) Action {
|
||||
var action = NoneAction
|
||||
if modTime.IsZero() {
|
||||
return action
|
||||
}
|
||||
exp, _ := lc.FilterRuleActions(objName, objTags)
|
||||
if !exp.IsDateNull() {
|
||||
if time.Now().After(exp.Date.Time) {
|
||||
|
|
|
@ -179,6 +179,12 @@ func TestComputeActions(t *testing.T) {
|
|||
objectModTime: time.Now().UTC().Add(-10 * 24 * time.Hour), // Created 10 days ago
|
||||
expectedAction: NoneAction,
|
||||
},
|
||||
// No modTime, should be none-action
|
||||
{
|
||||
inputConfig: `<LifecycleConfiguration><Rule><Filter><Prefix>foodir/</Prefix></Filter><Status>Enabled</Status><Expiration><Days>5</Days></Expiration></Rule></LifecycleConfiguration>`,
|
||||
objectName: "foodir/fooobject",
|
||||
expectedAction: NoneAction,
|
||||
},
|
||||
// Prefix not matched
|
||||
{
|
||||
inputConfig: `<LifecycleConfiguration><Rule><Filter><Prefix>foodir/</Prefix></Filter><Status>Enabled</Status><Expiration><Days>5</Days></Expiration></Rule></LifecycleConfiguration>`,
|
||||
|
|
|
@ -21,11 +21,20 @@ import (
|
|||
"encoding/xml"
|
||||
)
|
||||
|
||||
// Status represents lifecycle configuration status
|
||||
type Status string
|
||||
|
||||
// Supported status types
|
||||
const (
|
||||
Enabled Status = "Enabled"
|
||||
Disabled Status = "Disabled"
|
||||
)
|
||||
|
||||
// Rule - a rule for lifecycle configuration.
|
||||
type Rule struct {
|
||||
XMLName xml.Name `xml:"Rule"`
|
||||
ID string `xml:"ID,omitempty"`
|
||||
Status string `xml:"Status"`
|
||||
Status Status `xml:"Status"`
|
||||
Filter Filter `xml:"Filter,omitempty"`
|
||||
Expiration Expiration `xml:"Expiration,omitempty"`
|
||||
Transition Transition `xml:"Transition,omitempty"`
|
||||
|
@ -58,7 +67,7 @@ func (r Rule) validateStatus() error {
|
|||
}
|
||||
|
||||
// Status must be one of Enabled or Disabled
|
||||
if r.Status != "Enabled" && r.Status != "Disabled" {
|
||||
if r.Status != Enabled && r.Status != Disabled {
|
||||
return errInvalidRuleStatus
|
||||
}
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue