Remove upload healing related dead code (#5404)

This commit is contained in:
Aditya Manthramurthy 2018-01-15 18:20:39 -08:00 committed by kannappanr
parent 78a641fc6a
commit aa7e5c71e9
8 changed files with 3 additions and 448 deletions

View File

@ -1057,8 +1057,3 @@ func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, ma
func (fs fsObjects) ListBucketsHeal() ([]BucketInfo, error) { func (fs fsObjects) ListBucketsHeal() ([]BucketInfo, error) {
return []BucketInfo{}, errors.Trace(NotImplemented{}) return []BucketInfo{}, errors.Trace(NotImplemented{})
} }
func (fs fsObjects) ListUploadsHeal(bucket, prefix, marker, uploadIDMarker,
delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
return lmi, errors.Trace(NotImplemented{})
}

View File

@ -103,12 +103,6 @@ func (a GatewayUnsupported) ListObjectsHeal(bucket, prefix, marker, delimiter st
return loi, errors.Trace(NotImplemented{}) return loi, errors.Trace(NotImplemented{})
} }
// ListUploadsHeal - Not implemented stub
func (a GatewayUnsupported) ListUploadsHeal(bucket, prefix, marker, uploadIDMarker,
delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
return lmi, errors.Trace(NotImplemented{})
}
// AnonListObjects - List objects anonymously // AnonListObjects - List objects anonymously
func (a GatewayUnsupported) AnonListObjects(bucket string, prefix string, marker string, delimiter string, func (a GatewayUnsupported) AnonListObjects(bucket string, prefix string, marker string, delimiter string,
maxKeys int) (loi ListObjectsInfo, err error) { maxKeys int) (loi ListObjectsInfo, err error) {

View File

@ -57,8 +57,6 @@ type ObjectLayer interface {
ListBucketsHeal() (buckets []BucketInfo, err error) ListBucketsHeal() (buckets []BucketInfo, err error)
HealObject(bucket, object string) (int, int, error) HealObject(bucket, object string) (int, int, error)
ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)
ListUploadsHeal(bucket, prefix, marker, uploadIDMarker,
delimiter string, maxUploads int) (ListMultipartsInfo, error)
// Locking operations // Locking operations
ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error) ListLocks(bucket, prefix string, duration time.Duration) ([]VolumeLockInfo, error)

View File

@ -17,7 +17,6 @@
package cmd package cmd
import ( import (
"path/filepath"
"sort" "sort"
"strings" "strings"
@ -197,224 +196,3 @@ func (xl xlObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, ma
// Return error at the end. // Return error at the end.
return loi, toObjectErr(err, bucket, prefix) return loi, toObjectErr(err, bucket, prefix)
} }
// ListUploadsHeal - lists ongoing multipart uploads that require
// healing in one or more disks.
func (xl xlObjects) ListUploadsHeal(bucket, prefix, marker, uploadIDMarker,
delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
// For delimiter and prefix as '/' we do not list anything at all
// since according to s3 spec we stop at the 'delimiter' along
// with the prefix. On a flat namespace with 'prefix' as '/'
// we don't have any entries, since all the keys are of form 'keyName/...'
if delimiter == slashSeparator && prefix == slashSeparator {
return lmi, nil
}
// Initiate a list operation.
listMultipartInfo, err := xl.listMultipartUploadsHeal(bucket, prefix,
marker, uploadIDMarker, delimiter, maxUploads)
if err != nil {
return lmi, toObjectErr(err, bucket, prefix)
}
// We got the entries successfully return.
return listMultipartInfo, nil
}
// Fetches list of multipart uploadIDs given bucket, keyMarker, uploadIDMarker.
func (xl xlObjects) fetchMultipartUploadIDs(bucket, keyMarker, uploadIDMarker string,
maxUploads int, disks []StorageAPI) (uploads []MultipartInfo, end bool,
err error) {
// Hold a read lock on keyMarker path.
keyMarkerLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, keyMarker))
if err = keyMarkerLock.GetRLock(globalHealingTimeout); err != nil {
return uploads, end, err
}
for _, disk := range disks {
if disk == nil {
continue
}
uploads, end, err = xl.listMultipartUploadIDs(bucket, keyMarker,
uploadIDMarker, maxUploads, disk)
if err == nil ||
!errors.IsErrIgnored(err, objMetadataOpIgnoredErrs...) {
break
}
}
keyMarkerLock.RUnlock()
return uploads, end, err
}
// listMultipartUploadsHeal - Returns a list of incomplete multipart
// uploads that need to be healed.
func (xl xlObjects) listMultipartUploadsHeal(bucket, prefix, keyMarker,
uploadIDMarker, delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
result := ListMultipartsInfo{
IsTruncated: true,
MaxUploads: maxUploads,
KeyMarker: keyMarker,
Prefix: prefix,
Delimiter: delimiter,
}
recursive := delimiter != slashSeparator
var uploads []MultipartInfo
var err error
// List all upload ids for the given keyMarker, starting from
// uploadIDMarker.
if uploadIDMarker != "" {
uploads, _, err = xl.fetchMultipartUploadIDs(bucket, keyMarker,
uploadIDMarker, maxUploads, xl.getLoadBalancedDisks())
if err != nil {
return lmi, err
}
maxUploads = maxUploads - len(uploads)
}
// We can't use path.Join() as it strips off the trailing '/'.
multipartPrefixPath := pathJoin(bucket, prefix)
// multipartPrefixPath should have a trailing '/' when prefix = "".
if prefix == "" {
multipartPrefixPath += slashSeparator
}
multipartMarkerPath := ""
if keyMarker != "" {
multipartMarkerPath = pathJoin(bucket, keyMarker)
}
// `heal bool` is used to differentiate listing of incomplete
// uploads (and parts) from a regular listing of incomplete
// parts by client SDKs or mc-like commands, within a treewalk
// pool.
heal := true
// The listing is truncated if we have maxUploads entries and
// there are more entries to be listed.
truncated := true
var walkerCh chan treeWalkResult
var walkerDoneCh chan struct{}
// Check if we have room left to send more uploads.
if maxUploads > 0 {
uploadsLeft := maxUploads
walkerCh, walkerDoneCh = xl.listPool.Release(listParams{
bucket: minioMetaMultipartBucket,
recursive: recursive,
marker: multipartMarkerPath,
prefix: multipartPrefixPath,
heal: heal,
})
if walkerCh == nil {
walkerDoneCh = make(chan struct{})
isLeaf := xl.isMultipartUpload
listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs,
xl.getLoadBalancedDisks()...)
walkerCh = startTreeWalk(minioMetaMultipartBucket,
multipartPrefixPath, multipartMarkerPath,
recursive, listDir, isLeaf, walkerDoneCh)
}
// Collect uploads until leftUploads limit is reached.
for {
walkResult, ok := <-walkerCh
if !ok {
truncated = false
break
}
// For any error during tree walk, we should return right away.
if walkResult.err != nil {
return lmi, walkResult.err
}
entry := strings.TrimPrefix(walkResult.entry,
retainSlash(bucket))
// Skip entries that are not object directory.
if hasSuffix(walkResult.entry, slashSeparator) {
uploads = append(uploads, MultipartInfo{
Object: entry,
})
uploadsLeft--
if uploadsLeft == 0 {
break
}
continue
}
// For an object entry we get all its pending
// uploadIDs.
var newUploads []MultipartInfo
var end bool
uploadIDMarker = ""
newUploads, end, err = xl.fetchMultipartUploadIDs(bucket, entry, uploadIDMarker,
uploadsLeft, xl.getLoadBalancedDisks())
if err != nil {
return lmi, err
}
uploads = append(uploads, newUploads...)
uploadsLeft -= len(newUploads)
if end && walkResult.end {
truncated = false
break
}
if uploadsLeft == 0 {
break
}
}
}
// For all received uploads fill in the multiparts result.
for _, upload := range uploads {
var objectName string
var uploadID string
if hasSuffix(upload.Object, slashSeparator) {
// All directory entries are common
// prefixes. For common prefixes, upload ids
// are empty.
uploadID = ""
objectName = upload.Object
result.CommonPrefixes = append(result.CommonPrefixes, objectName)
} else {
// Check if upload needs healing.
uploadIDPath := filepath.Join(bucket, upload.Object, upload.UploadID)
partsMetadata, errs := readAllXLMetadata(xl.storageDisks,
minioMetaMultipartBucket, uploadIDPath)
if xlShouldHeal(xl.storageDisks, partsMetadata, errs,
minioMetaMultipartBucket, uploadIDPath) {
healUploadInfo := xlHealStat(xl, partsMetadata, errs)
upload.HealUploadInfo = &healUploadInfo
result.Uploads = append(result.Uploads, upload)
}
uploadID = upload.UploadID
objectName = upload.Object
}
result.NextKeyMarker = objectName
result.NextUploadIDMarker = uploadID
}
if truncated {
// Put back the tree walk go-routine into the pool for
// subsequent use.
xl.listPool.Set(listParams{
bucket: bucket,
recursive: recursive,
marker: result.NextKeyMarker,
prefix: prefix,
heal: heal,
}, walkerCh, walkerDoneCh)
}
result.IsTruncated = truncated
// Result is not truncated, reset the markers.
if !result.IsTruncated {
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
return result, nil
}

View File

@ -19,8 +19,6 @@ package cmd
import ( import (
"bytes" "bytes"
"os" "os"
"path"
"path/filepath"
"strconv" "strconv"
"testing" "testing"
) )
@ -142,78 +140,3 @@ func TestListObjectsHeal(t *testing.T) {
} }
} }
// Test for ListUploadsHeal API for XL.
func TestListUploadsHeal(t *testing.T) {
initNSLock(false)
rootPath, err := newTestConfig(globalMinioDefaultRegion)
if err != nil {
t.Fatalf("Init Test config failed")
}
// Remove config directory after the test ends.
defer os.RemoveAll(rootPath)
// Create an instance of XL backend.
xl, fsDirs, err := prepareXL16()
if err != nil {
t.Fatal(err)
}
// Cleanup backend directories on function return.
defer removeRoots(fsDirs)
bucketName := "bucket"
prefix := "prefix"
objName := path.Join(prefix, "obj")
// Create test bucket.
err = xl.MakeBucketWithLocation(bucketName, "")
if err != nil {
t.Fatal(err)
}
// Create a new multipart upload.
uploadID, err := xl.NewMultipartUpload(bucketName, objName, nil)
if err != nil {
t.Fatal(err)
}
// Upload a part.
data := bytes.Repeat([]byte("a"), 1024)
_, err = xl.PutObjectPart(bucketName, objName, uploadID, 1,
mustGetHashReader(t, bytes.NewReader(data), int64(len(data)), "", ""))
if err != nil {
t.Fatal(err)
}
// Check if list uploads heal returns any uploads to be healed
// incorrectly.
listUploadsInfo, err := xl.ListUploadsHeal(bucketName, prefix, "", "", "", 1000)
if err != nil {
t.Fatal(err)
}
// All uploads intact nothing to heal.
if len(listUploadsInfo.Uploads) != 0 {
t.Errorf("Expected no uploads but received %d", len(listUploadsInfo.Uploads))
}
// Delete the part from the first disk to make the upload (and
// its part) to appear in upload heal listing.
firstDisk := xl.(*xlObjects).storageDisks[0]
err = firstDisk.DeleteFile(minioMetaMultipartBucket,
filepath.Join(bucketName, objName, uploadID, xlMetaJSONFile))
if err != nil {
t.Fatal(err)
}
listUploadsInfo, err = xl.ListUploadsHeal(bucketName, prefix, "", "", "", 1000)
if err != nil {
t.Fatal(err)
}
// One upload with missing xl.json on first disk.
if len(listUploadsInfo.Uploads) != 1 {
t.Errorf("Expected 1 upload but received %d", len(listUploadsInfo.Uploads))
}
}

View File

@ -273,9 +273,9 @@ If object is successfully healed returns nil, otherwise returns error indicating
| Value | Description | | Value | Description |
|---|---| |---|---|
|`HealNone` | Object/Upload wasn't healed on any of the disks | |`HealNone` | Object wasn't healed on any of the disks |
|`HealPartial` | Object/Upload was healed on some of the disks needing heal | |`HealPartial` | Object was healed on some of the disks needing heal |
| `HealOK` | Object/Upload was healed on all the disks needing heal | | `HealOK` | Object was healed on all the disks needing heal |
__Example__ __Example__

View File

@ -1,57 +0,0 @@
// +build ignore
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package main
import (
"log"
"github.com/minio/minio/pkg/madmin"
)
func main() {
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are
// dummy values, please replace them with original values.
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are
// dummy values, please replace them with original values.
// API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise.
// New returns an Minio Admin client object.
madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true)
if err != nil {
log.Fatalln(err)
}
// Heal upload mybucket/myobject/uploadID - dry run.
isDryRun := true
_, err = madmClnt.HealUpload("mybucket", "myobject", "myUploadID", isDryRun)
if err != nil {
log.Fatalln(err)
}
// Heal upload mybucket/myobject/uploadID - this time for real.
isDryRun = false
healResult, err := madmClnt.HealUpload("mybucket", "myobject", "myUploadID", isDryRun)
if err != nil {
log.Fatalln(err)
}
log.Printf("Heal result for mybucket/myobject/myUploadID: %#v\n", healResult)
}

View File

@ -1,76 +0,0 @@
// +build ignore
package main
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import (
"fmt"
"log"
"github.com/minio/minio/pkg/madmin"
)
func main() {
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are
// dummy values, please replace them with original values.
// API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise.
// New returns an Minio Admin client object.
madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true)
if err != nil {
log.Fatalln(err)
}
bucket := "mybucket"
prefix := "myprefix"
// Create a done channel to control 'ListUploadsHeal' go routine.
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)
// Set true if recursive listing is needed.
isRecursive := true
// List objects that need healing for a given bucket and
// prefix.
healUploadsCh, err := madmClnt.ListUploadsHeal(bucket, prefix, isRecursive, doneCh)
if err != nil {
log.Fatalln("Failed to get list of uploads to be healed: ", err)
}
for upload := range healUploadsCh {
if upload.Err != nil {
log.Println("upload listing error: ", upload.Err)
}
if upload.HealUploadInfo != nil {
switch healInfo := *upload.HealUploadInfo; healInfo.Status {
case madmin.CanHeal:
fmt.Println(upload.Key, " can be healed.")
case madmin.CanPartiallyHeal:
fmt.Println(upload.Key, " can be healed partially. Some disks may be offline.")
case madmin.QuorumUnavailable:
fmt.Println(upload.Key, " can't be healed until quorum is available.")
case madmin.Corrupted:
fmt.Println(upload.Key, " can't be healed, not enough information.")
}
}
}
}