From 23a84117320347b068566aba245a8f8914fc87e1 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 25 Feb 2020 21:22:28 +0530 Subject: [PATCH] Add a generic Walk()'er to list a bucket, optinally prefix (#9026) This generic Walk() is used by likes of Lifecyle, or KMS to rotate keys or any other functionality which relies on this functionality. --- cmd/daily-lifecycle-ops.go | 40 +++++---- cmd/fs-v1.go | 9 ++ cmd/gateway-unsupported.go | 5 ++ cmd/lock-rest-client.go | 6 -- cmd/naughty-disk_test.go | 2 +- cmd/object-api-common.go | 55 +++++++++++- cmd/object-api-input-checks.go | 4 +- cmd/object-api-interface.go | 2 + cmd/object-handlers.go | 3 + cmd/posix-diskid-check.go | 2 +- cmd/posix.go | 2 +- cmd/storage-datatypes.go | 43 ++++++++++ cmd/storage-interface.go | 2 +- cmd/storage-rest-client.go | 2 +- cmd/storage-rest-server.go | 5 +- cmd/tree-walk.go | 4 +- cmd/xl-sets.go | 89 ++++++++++---------- cmd/xl-v1-list-objects-heal.go | 20 ++++- cmd/xl-v1-list-objects.go | 2 +- cmd/xl-v1.go | 1 - cmd/xl-zones.go | 111 ++++++++++++++----------- pkg/bucket/lifecycle/lifecycle.go | 8 +- pkg/bucket/lifecycle/lifecycle_test.go | 6 ++ pkg/bucket/lifecycle/rule.go | 13 ++- 24 files changed, 296 insertions(+), 140 deletions(-) diff --git a/cmd/daily-lifecycle-ops.go b/cmd/daily-lifecycle-ops.go index 3202a46f0..23dc1b140 100644 --- a/cmd/daily-lifecycle-ops.go +++ b/cmd/daily-lifecycle-ops.go @@ -133,25 +133,35 @@ func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error { } commonPrefix := lcp(prefixes) - // List all objects and calculate lifecycle action based on object name & object modtime - marker := "" + // Allocate new results channel to receive ObjectInfo. + objInfoCh := make(chan ObjectInfo) + + // Walk through all objects + if err := objAPI.Walk(ctx, bucket.Name, commonPrefix, objInfoCh); err != nil { + return err + } + for { - res, err := objAPI.ListObjects(ctx, bucket.Name, commonPrefix, marker, "", maxObjectList) - if err != nil { - continue - } var objects []string - for _, obj := range res.Objects { + for obj := range objInfoCh { + if len(objects) == maxObjectList { + // Reached maximum delete requests, attempt a delete for now. + break + } + // Find the action that need to be executed - action := l.ComputeAction(obj.Name, obj.UserTags, obj.ModTime) - switch action { - case lifecycle.DeleteAction: + if l.ComputeAction(obj.Name, obj.UserTags, obj.ModTime) == lifecycle.DeleteAction { objects = append(objects, obj.Name) - default: - // Do nothing, for now. } } + // Nothing to do. + if len(objects) == 0 { + break + } + + waitForLowHTTPReq(int32(globalEndpoints.Nodes())) + // Deletes a list of objects. deleteErrs, err := objAPI.DeleteObjects(ctx, bucket.Name, objects) if err != nil { @@ -173,12 +183,6 @@ func lifecycleRound(ctx context.Context, objAPI ObjectLayer) error { }) } } - - if !res.IsTruncated { - // We are done here, proceed to next bucket. - break - } - marker = res.NextMarker } } diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 3774090e4..376db0b3d 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -1246,6 +1246,15 @@ func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remo return madmin.HealResultItem{}, NotImplemented{} } +// Walk a bucket, optionally prefix recursively, until we have returned +// all the content to objectInfo channel, it is callers responsibility +// to allocate a receive channel for ObjectInfo, upon any unhandled +// error walker returns error. Optionally if context.Done() is received +// then Walk() stops the walker. 
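For context on the caller contract spelled out in the comment above: the caller allocates the ObjectInfo channel, Walk() returns as soon as the walkers are started, and results are then drained from the channel until it is closed or the context is cancelled. A minimal consumer, modeled on the lifecycle change earlier in this patch (process() is a placeholder, not a real helper), might look like:

    objInfoCh := make(chan ObjectInfo)
    if err := objAPI.Walk(ctx, bucket, prefix, objInfoCh); err != nil {
        return err
    }
    for obj := range objInfoCh {
        // Entries arrive as the walker discovers them; cancelling ctx
        // stops the walk and the channel is eventually closed.
        process(obj) // placeholder for caller-side logic
    }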
+func (fs *FSObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { + return fsWalk(ctx, fs, bucket, prefix, fs.listDirFactory(), results, fs.getObjectInfo, fs.getObjectInfo) +} + // HealObjects - no-op for fs. Valid only for XL. func (fs *FSObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) { logger.LogIf(ctx, NotImplemented{}) diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 9ee5ec23f..7b52a2d24 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -176,6 +176,11 @@ func (a GatewayUnsupported) ListObjectsV2(ctx context.Context, bucket, prefix, c return result, NotImplemented{} } +// Walk - Not implemented stub +func (a GatewayUnsupported) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { + return NotImplemented{} +} + // HealObjects - Not implemented stub func (a GatewayUnsupported) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) { return NotImplemented{} diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index ffe8f9930..3c9d72876 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -147,12 +147,6 @@ func (client *lockRESTClient) Expired(args dsync.LockArgs) (expired bool, err er return client.restCall(lockRESTMethodExpired, args) } -func closeLockers(lockers []dsync.NetLocker) { - for _, locker := range lockers { - locker.Close() - } -} - func newLockAPI(endpoint Endpoint) dsync.NetLocker { if endpoint.IsLocal { return globalLockServers[endpoint] diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 5ecd14c89..3dda0b9a2 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -125,7 +125,7 @@ func (d *naughtyDisk) DeleteVol(volume string) (err error) { return d.disk.DeleteVol(volume) } -func (d *naughtyDisk) Walk(volume, path, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) { +func (d *naughtyDisk) Walk(volume, path, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) { if err := d.calcError(); err != nil { return nil, err } diff --git a/cmd/object-api-common.go b/cmd/object-api-common.go index 32d0ee24c..b07f60e2c 100644 --- a/cmd/object-api-common.go +++ b/cmd/object-api-common.go @@ -313,12 +313,65 @@ func listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter return result, nil } +// Walk a bucket, optionally prefix recursively, until we have returned +// all the content to objectInfo channel, it is callers responsibility +// to allocate a receive channel for ObjectInfo, upon any unhandled +// error walker returns error. Optionally if context.Done() is received +// then Walk() stops the walker. 
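Several Walk signatures in this patch are relaxed from endWalkCh chan struct{} to <-chan struct{}. Since Context.Done() already returns a receive-only channel, callers can now hand the operation's context straight to the walker instead of allocating (and remembering to close) a dedicated stop channel, which is exactly what fsWalk below and the storage REST handler later in this patch do:

    // Before: endWalkCh := make(chan struct{}); defer close(endWalkCh)
    // After: the operation's context doubles as the stop signal.
    walkResultCh := startTreeWalk(ctx, bucket, prefix, "", true, listDir, ctx.Done())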
+func fsWalk(ctx context.Context, obj ObjectLayer, bucket, prefix string, listDir ListDirFunc, results chan<- ObjectInfo, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) error { + if err := checkListObjsArgs(ctx, bucket, prefix, "", obj); err != nil { + return err + } + + walkResultCh := startTreeWalk(ctx, bucket, prefix, "", true, listDir, ctx.Done()) + + go func() { + defer close(results) + + for { + walkResult, ok := <-walkResultCh + if !ok { + break + } + + var objInfo ObjectInfo + var err error + if HasSuffix(walkResult.entry, SlashSeparator) { + for _, getObjectInfoDir := range getObjectInfoDirs { + objInfo, err = getObjectInfoDir(ctx, bucket, walkResult.entry) + if err == nil { + break + } + if err == errFileNotFound { + err = nil + objInfo = ObjectInfo{ + Bucket: bucket, + Name: walkResult.entry, + IsDir: true, + } + } + } + } else { + objInfo, err = getObjInfo(ctx, bucket, walkResult.entry) + } + if err != nil { + continue + } + results <- objInfo + if walkResult.end { + break + } + } + }() + return nil +} + func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) { if delimiter != SlashSeparator && delimiter != "" { return listObjectsNonSlash(ctx, bucket, prefix, marker, delimiter, maxKeys, tpool, listDir, getObjInfo, getObjectInfoDirs...) } - if err := checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, obj); err != nil { + if err := checkListObjsArgs(ctx, bucket, prefix, marker, obj); err != nil { return loi, err } diff --git a/cmd/object-api-input-checks.go b/cmd/object-api-input-checks.go index 8b31211e1..b8430087e 100644 --- a/cmd/object-api-input-checks.go +++ b/cmd/object-api-input-checks.go @@ -54,7 +54,7 @@ func checkBucketAndObjectNames(ctx context.Context, bucket, object string) error } // Checks for all ListObjects arguments validity. -func checkListObjsArgs(ctx context.Context, bucket, prefix, marker, delimiter string, obj ObjectLayer) error { +func checkListObjsArgs(ctx context.Context, bucket, prefix, marker string, obj ObjectLayer) error { // Verify if bucket exists before validating object name. // This is done on purpose since the order of errors is // important here bucket does not exist error should @@ -90,7 +90,7 @@ func checkListObjsArgs(ctx context.Context, bucket, prefix, marker, delimiter st // Checks for all ListMultipartUploads arguments validity. 
func checkListMultipartArgs(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, obj ObjectLayer) error { - if err := checkListObjsArgs(ctx, bucket, prefix, keyMarker, delimiter, obj); err != nil { + if err := checkListObjsArgs(ctx, bucket, prefix, keyMarker, obj); err != nil { return err } if uploadIDMarker != "" { diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index b7e7ed9bb..9c8501dcd 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -69,6 +69,7 @@ type ObjectLayer interface { DeleteBucket(ctx context.Context, bucket string) error ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) + Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error // Object operations. @@ -101,6 +102,7 @@ type ObjectLayer interface { HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) HealObject(ctx context.Context, bucket, object string, dryRun, remove bool, scanMode madmin.HealScanMode) (madmin.HealResultItem, error) + HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) error ListBucketsHeal(ctx context.Context) (buckets []BucketInfo, err error) diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 41c94e696..2ae91ac8f 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -1896,6 +1896,9 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http return } + // To detect if the client has disconnected. + r.Body = &detectDisconnect{r.Body, r.Context().Done()} + // X-Amz-Copy-Source shouldn't be set for this call. if _, ok := r.Header[xhttp.AmzCopySource]; ok { writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL, guessIsBrowserReq(r)) diff --git a/cmd/posix-diskid-check.go b/cmd/posix-diskid-check.go index 1e00064a7..240a1c156 100644 --- a/cmd/posix-diskid-check.go +++ b/cmd/posix-diskid-check.go @@ -109,7 +109,7 @@ func (p *posixDiskIDCheck) DeleteVol(volume string) (err error) { return p.storage.DeleteVol(volume) } -func (p *posixDiskIDCheck) Walk(volume, dirPath string, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) { +func (p *posixDiskIDCheck) Walk(volume, dirPath string, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) { if p.isDiskStale() { return nil, errDiskNotFound } diff --git a/cmd/posix.go b/cmd/posix.go index d72b30d41..4873899c8 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -643,7 +643,7 @@ func (s *posix) DeleteVol(volume string) (err error) { // Walk - is a sorted walker which returns file entries in lexically // sorted order, additionally along with metadata about each of those entries. 
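At the disk layer the walker streams FileInfo rather than ObjectInfo. A hedged sketch of a call site (disk stands for any StorageAPI; readMetadata is the callback the storage REST server passes further down in this patch, and "xl.json" is assumed here as the XL backend's leaf file):

    fileInfoCh, err := disk.Walk("mybucket", "photos/", "", true, "xl.json", readMetadata, ctx.Done())
    if err != nil {
        return err
    }
    for fi := range fileInfoCh {
        // Entries come back in lexically sorted order, with metadata
        // already decoded from the leaf file by readMetadataFn.
        fmt.Println(fi.Name, fi.Size, fi.ModTime)
    }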
func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile string, - readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (ch chan FileInfo, err error) { + readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (ch chan FileInfo, err error) { atomic.AddInt32(&s.activeIOCount, 1) defer func() { diff --git a/cmd/storage-datatypes.go b/cmd/storage-datatypes.go index cfa326a28..c43021258 100644 --- a/cmd/storage-datatypes.go +++ b/cmd/storage-datatypes.go @@ -19,6 +19,8 @@ package cmd import ( "os" "time" + + xhttp "github.com/minio/minio/cmd/http" ) // VolInfo - represents volume stat information. @@ -62,3 +64,44 @@ type FileInfo struct { Quorum int } + +// ToObjectInfo converts FileInfo into objectInfo. +func (entry FileInfo) ToObjectInfo() ObjectInfo { + var objInfo ObjectInfo + if HasSuffix(entry.Name, SlashSeparator) { + objInfo = ObjectInfo{ + Bucket: entry.Volume, + Name: entry.Name, + IsDir: true, + } + } else { + objInfo = ObjectInfo{ + IsDir: false, + Bucket: entry.Volume, + Name: entry.Name, + ModTime: entry.ModTime, + Size: entry.Size, + ContentType: entry.Metadata["content-type"], + ContentEncoding: entry.Metadata["content-encoding"], + } + + // Extract etag from metadata. + objInfo.ETag = extractETag(entry.Metadata) + + // All the parts per object. + objInfo.Parts = entry.Parts + + // etag/md5Sum has already been extracted. We need to + // remove to avoid it from appearing as part of + // response headers. e.g, X-Minio-* or X-Amz-*. + objInfo.UserDefined = cleanMetadata(entry.Metadata) + + // Update storage class + if sc, ok := entry.Metadata[xhttp.AmzStorageClass]; ok { + objInfo.StorageClass = sc + } else { + objInfo.StorageClass = globalMinioDefaultStorageClass + } + } + return objInfo +} diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index e60778853..394c3915c 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -43,7 +43,7 @@ type StorageAPI interface { // Walk in sorted order directly on disk. Walk(volume, dirPath string, marker string, recursive bool, leafFile string, - readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) + readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) // File operations. 
ListDir(volume, dirPath string, count int, leafFile string) ([]string, error) diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index f2aff4c32..1e95bfe34 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -336,7 +336,7 @@ func (client *storageRESTClient) ReadFile(volume, path string, offset int64, buf } func (client *storageRESTClient) Walk(volume, dirPath, marker string, recursive bool, leafFile string, - readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) { + readMetadataFn readMetadataFunc, endWalkCh <-chan struct{}) (chan FileInfo, error) { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTDirPath, dirPath) diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index b214db0eb..c798d5b8e 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -431,10 +431,7 @@ func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request) } leafFile := vars[storageRESTLeafFile] - endWalkCh := make(chan struct{}) - defer close(endWalkCh) - - fch, err := s.storage.Walk(volume, dirPath, markerPath, recursive, leafFile, readMetadata, endWalkCh) + fch, err := s.storage.Walk(volume, dirPath, markerPath, recursive, leafFile, readMetadata, r.Context().Done()) if err != nil { s.writeErrorResponse(w, err) return diff --git a/cmd/tree-walk.go b/cmd/tree-walk.go index 5198f7718..36d6c9a70 100644 --- a/cmd/tree-walk.go +++ b/cmd/tree-walk.go @@ -59,7 +59,7 @@ func filterMatchingPrefix(entries []string, prefixEntry string) []string { type ListDirFunc func(bucket, prefixDir, prefixEntry string) (entries []string) // treeWalk walks directory tree recursively pushing TreeWalkResult into the channel as and when it encounters files. -func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir ListDirFunc, resultCh chan TreeWalkResult, endWalkCh chan struct{}, isEnd bool) (totalNum int, treeErr error) { +func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker string, recursive bool, listDir ListDirFunc, resultCh chan TreeWalkResult, endWalkCh <-chan struct{}, isEnd bool) (totalNum int, treeErr error) { // Example: // if prefixDir="one/two/three/" and marker="four/five.txt" treeWalk is recursively // called with prefixDir="one/two/three/four/" and marker="five.txt" @@ -151,7 +151,7 @@ func doTreeWalk(ctx context.Context, bucket, prefixDir, entryPrefixMatch, marker } // Initiate a new treeWalk in a goroutine. -func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir ListDirFunc, endWalkCh chan struct{}) chan TreeWalkResult { +func startTreeWalk(ctx context.Context, bucket, prefix, marker string, recursive bool, listDir ListDirFunc, endWalkCh <-chan struct{}) chan TreeWalkResult { // Example 1 // If prefix is "one/two/three/" and marker is "one/two/three/four/five.txt" // treeWalk is called with prefixDir="one/two/three/" and marker="four/five.txt" diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 7f57d4346..b85ad0bcd 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -941,7 +941,7 @@ func isTruncated(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) } // Starts a walk channel across all disks and returns a slice. 
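The per-disk channels this function starts are merged elsewhere by leastEntry()/leastEntryZone(): conceptually a k-way merge of sorted streams where each emitted name also carries a count of how many disks reported it (its quorum). A simplified, self-contained sketch of that idea, not the actual implementation (which buffers entries through FileInfoCh's Pop/Push):

    func mergeWalk(chs []chan FileInfo, out chan<- FileInfo, quorum int) {
        defer close(out)
        cur := make([]FileInfo, len(chs))
        valid := make([]bool, len(chs))
        for {
            // Refill the head entry of every channel that needs one.
            for i := range chs {
                if !valid[i] {
                    if fi, ok := <-chs[i]; ok {
                        cur[i], valid[i] = fi, true
                    }
                }
            }
            // Pick the lexically smallest name among the live heads.
            least, found := "", false
            for i := range cur {
                if valid[i] && (!found || cur[i].Name < least) {
                    least, found = cur[i].Name, true
                }
            }
            if !found {
                return // every channel is drained
            }
            // Consume all heads carrying that name and count them.
            count := 0
            var pick FileInfo
            for i := range cur {
                if valid[i] && cur[i].Name == least {
                    pick, valid[i] = cur[i], false
                    count++
                }
            }
            if count >= quorum {
                out <- pick
            }
        }
    }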
-func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh chan struct{}) []FileInfoCh { +func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker string, recursive bool, endWalkCh <-chan struct{}) []FileInfoCh { var entryChs []FileInfoCh for _, set := range s.sets { for _, disk := range set.getDisks() { @@ -965,8 +965,8 @@ func (s *xlSets) startMergeWalks(ctx context.Context, bucket, prefix, marker str func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) { endWalkCh := make(chan struct{}) defer close(endWalkCh) - recursive := true - entryChs := s.startMergeWalks(context.Background(), bucket, prefix, "", recursive, endWalkCh) + + entryChs := s.startMergeWalks(context.Background(), bucket, prefix, "", true, endWalkCh) var objInfos []ObjectInfo var eof bool @@ -1070,7 +1070,7 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker // the data in lexically sorted order. // If partialQuorumOnly is set only objects that does not have full quorum is returned. func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, partialQuorumOnly bool) (loi ListObjectsInfo, err error) { - if err = checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, s); err != nil { + if err = checkListObjsArgs(ctx, bucket, prefix, marker, s); err != nil { return loi, err } @@ -1129,45 +1129,10 @@ func (s *xlSets) listObjects(ctx context.Context, bucket, prefix, marker, delimi } for _, entry := range entries.Files { - var objInfo ObjectInfo - if HasSuffix(entry.Name, SlashSeparator) { - if !recursive { - loi.Prefixes = append(loi.Prefixes, entry.Name) - continue - } - objInfo = ObjectInfo{ - Bucket: bucket, - Name: entry.Name, - IsDir: true, - } - } else { - objInfo = ObjectInfo{ - IsDir: false, - Bucket: bucket, - Name: entry.Name, - ModTime: entry.ModTime, - Size: entry.Size, - ContentType: entry.Metadata["content-type"], - ContentEncoding: entry.Metadata["content-encoding"], - } - - // Extract etag from metadata. - objInfo.ETag = extractETag(entry.Metadata) - - // All the parts per object. - objInfo.Parts = entry.Parts - - // etag/md5Sum has already been extracted. We need to - // remove to avoid it from appearing as part of - // response headers. e.g, X-Minio-* or X-Amz-*. - objInfo.UserDefined = cleanMetadata(entry.Metadata) - - // Update storage class - if sc, ok := entry.Metadata[xhttp.AmzStorageClass]; ok { - objInfo.StorageClass = sc - } else { - objInfo.StorageClass = globalMinioDefaultStorageClass - } + objInfo := entry.ToObjectInfo() + if HasSuffix(objInfo.Name, SlashSeparator) && !recursive { + loi.Prefixes = append(loi.Prefixes, entry.Name) + continue } loi.Objects = append(loi.Objects, objInfo) } @@ -1645,14 +1610,48 @@ func (s *xlSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) { return listBuckets, nil } +// Walk a bucket, optionally prefix recursively, until we have returned +// all the content to objectInfo channel, it is callers responsibility +// to allocate a receive channel for ObjectInfo, upon any unhandled +// error walker returns error. Optionally if context.Done() is received +// then Walk() stops the walker. 
+func (s *xlSets) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { + if err := checkListObjsArgs(ctx, bucket, prefix, "", s); err != nil { + return err + } + + entryChs := s.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done()) + + entriesValid := make([]bool, len(entryChs)) + entries := make([]FileInfo, len(entryChs)) + + go func() { + defer close(results) + + for { + entry, quorumCount, ok := leastEntry(entryChs, entries, entriesValid) + if !ok { + return + } + + if quorumCount != s.drivesPerSet { + return + } + + results <- entry.ToObjectInfo() + } + }() + + return nil +} + // HealObjects - Heal all objects recursively at a specified prefix, any // dangling objects deleted as well automatically. func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error { endWalkCh := make(chan struct{}) defer close(endWalkCh) - recursive := true - entryChs := s.startMergeWalks(ctx, bucket, prefix, "", recursive, endWalkCh) + entryChs := s.startMergeWalks(ctx, bucket, prefix, "", true, endWalkCh) entriesValid := make([]bool, len(entryChs)) entries := make([]FileInfo, len(entryChs)) diff --git a/cmd/xl-v1-list-objects-heal.go b/cmd/xl-v1-list-objects-heal.go index e9e94f0c0..4d7ea55bb 100644 --- a/cmd/xl-v1-list-objects-heal.go +++ b/cmd/xl-v1-list-objects-heal.go @@ -16,14 +16,26 @@ package cmd -import "context" +import ( + "context" + + "github.com/minio/minio/cmd/logger" +) // This is not implemented/needed anymore, look for xl-sets.ListBucketHeal() func (xl xlObjects) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) { - return nil, nil + logger.LogIf(ctx, NotImplemented{}) + return nil, NotImplemented{} } // This is not implemented/needed anymore, look for xl-sets.HealObjects() -func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) (e error) { - return nil +func (xl xlObjects) HealObjects(ctx context.Context, bucket, prefix string, fn healObjectFn) error { + logger.LogIf(ctx, NotImplemented{}) + return NotImplemented{} +} + +// this is not implemented/needed anymore, look for xl-sets.Walk() +func (xl xlObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { + logger.LogIf(ctx, NotImplemented{}) + return NotImplemented{} } diff --git a/cmd/xl-v1-list-objects.go b/cmd/xl-v1-list-objects.go index e3247439e..7ddd50a0b 100644 --- a/cmd/xl-v1-list-objects.go +++ b/cmd/xl-v1-list-objects.go @@ -144,7 +144,7 @@ func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, del // ListObjects - list all objects at prefix, delimited by '/'. func (xl xlObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { - if err := checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, xl); err != nil { + if err := checkListObjsArgs(ctx, bucket, prefix, marker, xl); err != nil { return loi, err } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 1c6cb31c2..c2c216c3a 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -74,7 +74,6 @@ func (xl xlObjects) NewNSLock(ctx context.Context, bucket string, objects ...str func (xl xlObjects) Shutdown(ctx context.Context) error { // Add any object layer shutdown activities here. 
closeStorageDisks(xl.getDisks()) - closeLockers(xl.getLockers()) return nil } diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go index 320cd080f..d46520cec 100644 --- a/cmd/xl-zones.go +++ b/cmd/xl-zones.go @@ -528,12 +528,12 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke var zonesEntryChs [][]FileInfoCh - recursive := true + endWalkCh := make(chan struct{}) + defer close(endWalkCh) + for _, zone := range z.zones { - endWalkCh := make(chan struct{}) - defer close(endWalkCh) zonesEntryChs = append(zonesEntryChs, - zone.startMergeWalks(ctx, bucket, prefix, "", recursive, endWalkCh)) + zone.startMergeWalks(ctx, bucket, prefix, "", true, endWalkCh)) } var objInfos []ObjectInfo @@ -646,7 +646,7 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, heal bool) (ListObjectsInfo, error) { loi := ListObjectsInfo{} - if err := checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, z); err != nil { + if err := checkListObjsArgs(ctx, bucket, prefix, marker, z); err != nil { return loi, err } @@ -717,45 +717,10 @@ func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delim } for _, entry := range entries.Files { - var objInfo ObjectInfo - if HasSuffix(entry.Name, SlashSeparator) { - if !recursive { - loi.Prefixes = append(loi.Prefixes, entry.Name) - continue - } - objInfo = ObjectInfo{ - Bucket: bucket, - Name: entry.Name, - IsDir: true, - } - } else { - objInfo = ObjectInfo{ - IsDir: false, - Bucket: bucket, - Name: entry.Name, - ModTime: entry.ModTime, - Size: entry.Size, - ContentType: entry.Metadata["content-type"], - ContentEncoding: entry.Metadata["content-encoding"], - } - - // Extract etag from metadata. - objInfo.ETag = extractETag(entry.Metadata) - - // All the parts per object. - objInfo.Parts = entry.Parts - - // etag/md5Sum has already been extracted. We need to - // remove to avoid it from appearing as part of - // response headers. e.g, X-Minio-* or X-Amz-*. - objInfo.UserDefined = cleanMetadata(entry.Metadata) - - // Update storage class - if sc, ok := entry.Metadata[xhttp.AmzStorageClass]; ok { - objInfo.StorageClass = sc - } else { - objInfo.StorageClass = globalMinioDefaultStorageClass - } + objInfo := entry.ToObjectInfo() + if HasSuffix(objInfo.Name, SlashSeparator) && !recursive { + loi.Prefixes = append(loi.Prefixes, objInfo.Name) + continue } loi.Objects = append(loi.Objects, objInfo) } @@ -1319,17 +1284,67 @@ func (z *xlZones) HealBucket(ctx context.Context, bucket string, dryRun, remove return r, nil } +// Walk a bucket, optionally prefix recursively, until we have returned +// all the content to objectInfo channel, it is callers responsibility +// to allocate a receive channel for ObjectInfo, upon any unhandled +// error walker returns error. Optionally if context.Done() is received +// then Walk() stops the walker. 
+func (z *xlZones) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo) error { + if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil { + return err + } + + var zonesEntryChs [][]FileInfoCh + + for _, zone := range z.zones { + zonesEntryChs = append(zonesEntryChs, + zone.startMergeWalks(ctx, bucket, prefix, "", true, ctx.Done())) + } + + var zoneDrivesPerSet []int + for _, zone := range z.zones { + zoneDrivesPerSet = append(zoneDrivesPerSet, zone.drivesPerSet) + } + + var zonesEntriesInfos [][]FileInfo + var zonesEntriesValid [][]bool + for _, entryChs := range zonesEntryChs { + zonesEntriesInfos = append(zonesEntriesInfos, make([]FileInfo, len(entryChs))) + zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs))) + } + + go func() { + defer close(results) + + for { + entry, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs, + zonesEntriesInfos, zonesEntriesValid) + if !ok { + return + } + + if quorumCount != zoneDrivesPerSet[zoneIndex] { + continue + } + + results <- entry.ToObjectInfo() + } + }() + + return nil +} + type healObjectFn func(string, string) error func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, healObject healObjectFn) error { var zonesEntryChs [][]FileInfoCh - recursive := true + endWalkCh := make(chan struct{}) + defer close(endWalkCh) + for _, zone := range z.zones { - endWalkCh := make(chan struct{}) - defer close(endWalkCh) zonesEntryChs = append(zonesEntryChs, - zone.startMergeWalks(ctx, bucket, prefix, "", recursive, endWalkCh)) + zone.startMergeWalks(ctx, bucket, prefix, "", true, endWalkCh)) } var zoneDrivesPerSet []int diff --git a/pkg/bucket/lifecycle/lifecycle.go b/pkg/bucket/lifecycle/lifecycle.go index 18113f631..9b7ae7390 100644 --- a/pkg/bucket/lifecycle/lifecycle.go +++ b/pkg/bucket/lifecycle/lifecycle.go @@ -99,8 +99,11 @@ func (lc Lifecycle) Validate() error { // FilterRuleActions returns the expiration and transition from the object name // after evaluating all rules. func (lc Lifecycle) FilterRuleActions(objName, objTags string) (Expiration, Transition) { + if objName == "" { + return Expiration{}, Transition{} + } for _, rule := range lc.Rules { - if strings.ToLower(rule.Status) != "enabled" { + if rule.Status == Disabled { continue } tags := rule.Tags() @@ -121,6 +124,9 @@ func (lc Lifecycle) FilterRuleActions(objName, objTags string) (Expiration, Tran // against the object name and its modification time. 
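Callers are expected to switch on the returned action, as the reworked lifecycle round at the top of this patch does; a minimal caller-side sketch (obj and toDelete are placeholders):

    switch lc.ComputeAction(obj.Name, obj.UserTags, obj.ModTime) {
    case lifecycle.DeleteAction:
        toDelete = append(toDelete, obj.Name) // queue for DeleteObjects()
    default:
        // No rule matched, or the object is not yet past its expiry.
    }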
func (lc Lifecycle) ComputeAction(objName, objTags string, modTime time.Time) Action { var action = NoneAction + if modTime.IsZero() { + return action + } exp, _ := lc.FilterRuleActions(objName, objTags) if !exp.IsDateNull() { if time.Now().After(exp.Date.Time) { diff --git a/pkg/bucket/lifecycle/lifecycle_test.go b/pkg/bucket/lifecycle/lifecycle_test.go index 035eb64d9..0b7a2f1f8 100644 --- a/pkg/bucket/lifecycle/lifecycle_test.go +++ b/pkg/bucket/lifecycle/lifecycle_test.go @@ -179,6 +179,12 @@ func TestComputeActions(t *testing.T) { objectModTime: time.Now().UTC().Add(-10 * 24 * time.Hour), // Created 10 days ago expectedAction: NoneAction, }, + // No modTime, should be none-action + { + inputConfig: `foodir/Enabled5`, + objectName: "foodir/fooobject", + expectedAction: NoneAction, + }, // Prefix not matched { inputConfig: `foodir/Enabled5`, diff --git a/pkg/bucket/lifecycle/rule.go b/pkg/bucket/lifecycle/rule.go index b99d7caae..eadfbc852 100644 --- a/pkg/bucket/lifecycle/rule.go +++ b/pkg/bucket/lifecycle/rule.go @@ -21,11 +21,20 @@ import ( "encoding/xml" ) +// Status represents lifecycle configuration status +type Status string + +// Supported status types +const ( + Enabled Status = "Enabled" + Disabled Status = "Disabled" +) + // Rule - a rule for lifecycle configuration. type Rule struct { XMLName xml.Name `xml:"Rule"` ID string `xml:"ID,omitempty"` - Status string `xml:"Status"` + Status Status `xml:"Status"` Filter Filter `xml:"Filter,omitempty"` Expiration Expiration `xml:"Expiration,omitempty"` Transition Transition `xml:"Transition,omitempty"` @@ -58,7 +67,7 @@ func (r Rule) validateStatus() error { } // Status must be one of Enabled or Disabled - if r.Status != "Enabled" && r.Status != "Disabled" { + if r.Status != Enabled && r.Status != Disabled { return errInvalidRuleStatus } return nil
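With Status now a named string type, the XML wire format stays the same while comparisons use the typed Enabled/Disabled constants; a small package-internal sketch of the round trip (the sample rule ID is made up):

    var r Rule
    if err := xml.Unmarshal([]byte(`<Rule><ID>expire-logs</ID><Status>Enabled</Status></Rule>`), &r); err != nil {
        panic(err)
    }
    // r.Status == Enabled holds, and validateStatus() above accepts it;
    // any other value now fails the Enabled/Disabled check.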