minio/cmd/xl-v1-list-objects-heal.go

191 lines
5.5 KiB
Go
Raw Normal View History

/*
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"sort"
"strings"
"github.com/minio/minio/pkg/errors"
)
func listDirHealFactory(isLeaf isLeafFunc, disks ...StorageAPI) listDirFunc {
// Returns sorted merged entries from all the disks.
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool, err error) {
for _, disk := range disks {
if disk == nil {
continue
}
var entries []string
var newEntries []string
entries, err = disk.ListDir(bucket, prefixDir)
if err != nil {
continue
}
// isLeaf() check has to happen here so that
// trailing "/" for objects can be removed.
for i, entry := range entries {
if isLeaf(bucket, pathJoin(prefixDir, entry)) {
entries[i] = strings.TrimSuffix(entry, slashSeparator)
}
}
// Find elements in entries which are not in mergedEntries
for _, entry := range entries {
idx := sort.SearchStrings(mergedEntries, entry)
// if entry is already present in mergedEntries don't add.
if idx < len(mergedEntries) && mergedEntries[idx] == entry {
continue
}
newEntries = append(newEntries, entry)
}
if len(newEntries) > 0 {
// Merge the entries and sort it.
mergedEntries = append(mergedEntries, newEntries...)
sort.Strings(mergedEntries)
}
Move admin APIs to new path and add redesigned heal APIs (#5351) - Changes related to moving admin APIs - admin APIs now have an endpoint under /minio/admin - admin APIs are now versioned - a new API to server the version is added at "GET /minio/admin/version" and all API operations have the path prefix /minio/admin/v1/<operation> - new service stop API added - credentials change API is moved to /minio/admin/v1/config/credential - credentials change API and configuration get/set API now require TLS so that credentials are protected - all API requests now receive JSON - heal APIs are disabled as they will be changed substantially - Heal API changes Heal API is now provided at a single endpoint with the ability for a client to start a heal sequence on all the data in the server, a single bucket, or under a prefix within a bucket. When a heal sequence is started, the server returns a unique token that needs to be used for subsequent 'status' requests to fetch heal results. On each status request from the client, the server returns heal result records that it has accumulated since the previous status request. The server accumulates upto 1000 records and pauses healing further objects until the client requests for status. If the client does not request any further records for a long time, the server aborts the heal sequence automatically. A heal result record is returned for each entity healed on the server, such as system metadata, object metadata, buckets and objects, and has information about the before and after states on each disk. A client may request to force restart a heal sequence - this causes the running heal sequence to be aborted at the next safe spot and starts a new heal sequence.
2018-01-22 14:54:55 -08:00
// Filter entries that have the prefix prefixEntry.
mergedEntries = filterMatchingPrefix(mergedEntries, prefixEntry)
}
return mergedEntries, false, nil
}
return listDir
}
// listObjectsHeal - wrapper function implemented over file tree walk.
func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
// Default is recursive, if delimiter is set then list non recursive.
recursive := true
if delimiter == slashSeparator {
recursive = false
}
// "heal" true for listObjectsHeal() and false for listObjects()
heal := true
walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, heal})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
isLeaf := xl.isObject
listDir := listDirHealFactory(isLeaf, xl.storageDisks...)
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, nil, endWalkCh)
}
var objInfos []ObjectInfo
var eof bool
var nextMarker string
for i := 0; i < maxKeys; {
walkResult, ok := <-walkResultCh
if !ok {
// Closed channel.
eof = true
break
}
// For any walk error return right away.
if walkResult.err != nil {
return loi, toObjectErr(walkResult.err, bucket, prefix)
}
entry := walkResult.entry
var objInfo ObjectInfo
if hasSuffix(entry, slashSeparator) {
// Object name needs to be full path.
objInfo.Bucket = bucket
objInfo.Name = entry
objInfo.IsDir = true
} else {
var err error
objInfo, err = xl.getObjectInfo(bucket, entry)
if err != nil {
// Ignore errFileNotFound
if errors.Cause(err) == errFileNotFound {
continue
}
return loi, toObjectErr(err, bucket, prefix)
}
}
nextMarker = objInfo.Name
objInfos = append(objInfos, objInfo)
i++
if walkResult.end {
eof = true
break
}
}
params := listParams{bucket, recursive, nextMarker, prefix, heal}
if !eof {
xl.listPool.Set(params, walkResultCh, endWalkCh)
}
result := ListObjectsInfo{IsTruncated: !eof}
for _, objInfo := range objInfos {
result.NextMarker = objInfo.Name
if objInfo.IsDir {
result.Prefixes = append(result.Prefixes, objInfo.Name)
continue
}
Move admin APIs to new path and add redesigned heal APIs (#5351) - Changes related to moving admin APIs - admin APIs now have an endpoint under /minio/admin - admin APIs are now versioned - a new API to server the version is added at "GET /minio/admin/version" and all API operations have the path prefix /minio/admin/v1/<operation> - new service stop API added - credentials change API is moved to /minio/admin/v1/config/credential - credentials change API and configuration get/set API now require TLS so that credentials are protected - all API requests now receive JSON - heal APIs are disabled as they will be changed substantially - Heal API changes Heal API is now provided at a single endpoint with the ability for a client to start a heal sequence on all the data in the server, a single bucket, or under a prefix within a bucket. When a heal sequence is started, the server returns a unique token that needs to be used for subsequent 'status' requests to fetch heal results. On each status request from the client, the server returns heal result records that it has accumulated since the previous status request. The server accumulates upto 1000 records and pauses healing further objects until the client requests for status. If the client does not request any further records for a long time, the server aborts the heal sequence automatically. A heal result record is returned for each entity healed on the server, such as system metadata, object metadata, buckets and objects, and has information about the before and after states on each disk. A client may request to force restart a heal sequence - this causes the running heal sequence to be aborted at the next safe spot and starts a new heal sequence.
2018-01-22 14:54:55 -08:00
// Add each object seen to the result - objects are
// checked for healing later.
result.Objects = append(result.Objects, ObjectInfo{
Bucket: bucket,
Name: objInfo.Name,
ModTime: objInfo.ModTime,
Size: objInfo.Size,
IsDir: false,
})
}
return result, nil
}
// ListObjects - list all objects at prefix, delimited by '/'.
func (xl xlObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
if err := checkListObjsArgs(bucket, prefix, marker, delimiter, xl); err != nil {
return loi, err
}
// With max keys of zero we have reached eof, return right here.
if maxKeys == 0 {
return loi, nil
}
// For delimiter and prefix as '/' we do not list anything at all
// since according to s3 spec we stop at the 'delimiter' along
// with the prefix. On a flat namespace with 'prefix' as '/'
// we don't have any entries, since all the keys are of form 'keyName/...'
if delimiter == slashSeparator && prefix == slashSeparator {
return loi, nil
}
// Over flowing count - reset to maxObjectList.
if maxKeys < 0 || maxKeys > maxObjectList {
maxKeys = maxObjectList
}
// Initiate a list operation, if successful filter and return quickly.
listObjInfo, err := xl.listObjectsHeal(bucket, prefix, marker, delimiter, maxKeys)
if err == nil {
// We got the entries successfully return.
return listObjInfo, nil
}
// Return error at the end.
return loi, toObjectErr(err, bucket, prefix)
}