Implement list uploads heal admin API (#3885)

This commit is contained in:
Krishnan Parthasarathi
2017-03-16 12:45:06 +05:30
committed by Harshavardhana
parent 6509589adb
commit 051f9bb5c6
12 changed files with 845 additions and 33 deletions

View File

@@ -36,16 +36,19 @@ const (
// Type-safe query params.
type mgmtQueryKey string
// Only valid query params for list/clear locks management APIs.
// Only valid query params for mgmt admin APIs.
const (
mgmtBucket mgmtQueryKey = "bucket"
mgmtObject mgmtQueryKey = "object"
mgmtPrefix mgmtQueryKey = "prefix"
mgmtLockDuration mgmtQueryKey = "duration"
mgmtDelimiter mgmtQueryKey = "delimiter"
mgmtMarker mgmtQueryKey = "marker"
mgmtMaxKey mgmtQueryKey = "max-key"
mgmtDryRun mgmtQueryKey = "dry-run"
mgmtBucket mgmtQueryKey = "bucket"
mgmtObject mgmtQueryKey = "object"
mgmtPrefix mgmtQueryKey = "prefix"
mgmtLockDuration mgmtQueryKey = "duration"
mgmtDelimiter mgmtQueryKey = "delimiter"
mgmtMarker mgmtQueryKey = "marker"
mgmtKeyMarker mgmtQueryKey = "key-marker"
mgmtMaxKey mgmtQueryKey = "max-key"
mgmtDryRun mgmtQueryKey = "dry-run"
mgmtUploadIDMarker mgmtQueryKey = "upload-id-marker"
mgmtMaxUploads mgmtQueryKey = "max-uploads"
)
// ServerVersion - server version
@@ -400,8 +403,57 @@ func (adminAPI adminAPIHandlers) ClearLocksHandler(w http.ResponseWriter, r *htt
writeSuccessResponseJSON(w, jsonBytes)
}
// validateHealQueryParams - Validates query params for heal list management API.
func validateHealQueryParams(vars url.Values) (string, string, string, string, int, APIErrorCode) {
// ListUploadsHealHandler - similar to listObjectsHealHandler
// GET
// /?heal&bucket=mybucket&prefix=myprefix&key-marker=mymarker&upload-id-marker=myuploadid&delimiter=mydelimiter&max-uploads=1000
// - bucket is mandatory query parameter
// - rest are optional query parameters List upto maxKey objects that
// need healing in a given bucket matching the given prefix.
func (adminAPI adminAPIHandlers) ListUploadsHealHandler(w http.ResponseWriter, r *http.Request) {
// Get object layer instance.
objLayer := newObjectLayerFn()
if objLayer == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
}
// Validate request signature.
adminAPIErr := checkRequestAuthType(r, "", "", "")
if adminAPIErr != ErrNone {
writeErrorResponse(w, adminAPIErr, r.URL)
return
}
// Validate query params.
vars := r.URL.Query()
bucket := vars.Get(string(mgmtBucket))
prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, _ := getBucketMultipartResources(r.URL.Query())
if err := checkListMultipartArgs(bucket, prefix, keyMarker, uploadIDMarker, delimiter, objLayer); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
if maxUploads <= 0 || maxUploads > maxUploadsList {
writeErrorResponse(w, ErrInvalidMaxUploads, r.URL)
return
}
// Get the list objects to be healed.
listMultipartInfos, err := objLayer.ListUploadsHeal(bucket, prefix,
keyMarker, uploadIDMarker, delimiter, maxUploads)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
listResponse := generateListMultipartUploadsResponse(bucket, listMultipartInfos)
// Write success response.
writeSuccessResponseXML(w, encodeResponse(listResponse))
}
// extractListObjectsHealQuery - Validates query params for heal objects list management API.
func extractListObjectsHealQuery(vars url.Values) (string, string, string, string, int, APIErrorCode) {
bucket := vars.Get(string(mgmtBucket))
prefix := vars.Get(string(mgmtPrefix))
marker := vars.Get(string(mgmtMarker))
@@ -418,10 +470,13 @@ func validateHealQueryParams(vars url.Values) (string, string, string, string, i
return "", "", "", "", 0, ErrInvalidObjectName
}
// check if maxKey is a valid integer.
maxKey, err := strconv.Atoi(maxKeyStr)
if err != nil {
return "", "", "", "", 0, ErrInvalidMaxKeys
// check if maxKey is a valid integer, if present.
var maxKey int
var err error
if maxKeyStr != "" {
if maxKey, err = strconv.Atoi(maxKeyStr); err != nil {
return "", "", "", "", 0, ErrInvalidMaxKeys
}
}
// Validate prefix, marker, delimiter and maxKey.
@@ -454,7 +509,7 @@ func (adminAPI adminAPIHandlers) ListObjectsHealHandler(w http.ResponseWriter, r
// Validate query params.
vars := r.URL.Query()
bucket, prefix, marker, delimiter, maxKey, adminAPIErr := validateHealQueryParams(vars)
bucket, prefix, marker, delimiter, maxKey, adminAPIErr := extractListObjectsHealQuery(vars)
if adminAPIErr != ErrNone {
writeErrorResponse(w, adminAPIErr, r.URL)
return

View File

@@ -740,7 +740,7 @@ func TestValidateHealQueryParams(t *testing.T) {
}
for i, test := range testCases {
vars := mkListObjectsQueryVal(test.bucket, test.prefix, test.marker, test.delimiter, test.maxKeys)
_, _, _, _, _, actualErr := validateHealQueryParams(vars)
_, _, _, _, _, actualErr := extractListObjectsHealQuery(vars)
if actualErr != test.apiErr {
t.Errorf("Test %d - Expected %v but received %v",
i+1, getAPIError(test.apiErr), getAPIError(actualErr))
@@ -856,9 +856,6 @@ func TestListObjectsHealHandler(t *testing.T) {
}
for i, test := range testCases {
if i != 0 {
continue
}
queryVal := mkListObjectsQueryVal(test.bucket, test.prefix, test.marker, test.delimiter, test.maxKeys)
req, err := newTestRequest("GET", "/?"+queryVal.Encode(), 0, nil)
if err != nil {
@@ -1290,3 +1287,137 @@ func TestWriteSetConfigResponse(t *testing.T) {
}
}
}
// mkUploadsHealQuery - helper function to construct query values for
// listUploadsHeal.
func mkUploadsHealQuery(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploadsStr string) url.Values {
queryVal := make(url.Values)
queryVal.Set("heal", "")
queryVal.Set(string(mgmtBucket), bucket)
queryVal.Set(string(mgmtPrefix), prefix)
queryVal.Set(string(mgmtKeyMarker), keyMarker)
queryVal.Set(string(mgmtUploadIDMarker), uploadIDMarker)
queryVal.Set(string(mgmtDelimiter), delimiter)
queryVal.Set(string(mgmtMaxUploads), maxUploadsStr)
return queryVal
}
func TestListHealUploadsHandler(t *testing.T) {
adminTestBed, err := prepareAdminXLTestBed()
if err != nil {
t.Fatal("Failed to initialize a single node XL backend for admin handler tests.")
}
defer adminTestBed.TearDown()
err = adminTestBed.objLayer.MakeBucket("mybucket")
if err != nil {
t.Fatalf("Failed to make bucket - %v", err)
}
// Delete bucket after running all test cases.
defer adminTestBed.objLayer.DeleteBucket("mybucket")
testCases := []struct {
bucket string
prefix string
keyMarker string
delimiter string
maxKeys string
statusCode int
}{
// 1. Valid params.
{
bucket: "mybucket",
prefix: "prefix",
keyMarker: "prefix11",
delimiter: "/",
maxKeys: "10",
statusCode: http.StatusOK,
},
// 2. Valid params with empty prefix.
{
bucket: "mybucket",
prefix: "",
keyMarker: "",
delimiter: "/",
maxKeys: "10",
statusCode: http.StatusOK,
},
// 3. Invalid params with invalid bucket.
{
bucket: `invalid\\Bucket`,
prefix: "prefix",
keyMarker: "prefix11",
delimiter: "/",
maxKeys: "10",
statusCode: getAPIError(ErrInvalidBucketName).HTTPStatusCode,
},
// 4. Invalid params with invalid prefix.
{
bucket: "mybucket",
prefix: `invalid\\Prefix`,
keyMarker: "prefix11",
delimiter: "/",
maxKeys: "10",
statusCode: getAPIError(ErrInvalidObjectName).HTTPStatusCode,
},
// 5. Invalid params with invalid maxKeys.
{
bucket: "mybucket",
prefix: "prefix",
keyMarker: "prefix11",
delimiter: "/",
maxKeys: "-1",
statusCode: getAPIError(ErrInvalidMaxUploads).HTTPStatusCode,
},
// 6. Invalid params with unsupported prefix marker combination.
{
bucket: "mybucket",
prefix: "prefix",
keyMarker: "notmatchingmarker",
delimiter: "/",
maxKeys: "10",
statusCode: getAPIError(ErrNotImplemented).HTTPStatusCode,
},
// 7. Invalid params with unsupported delimiter.
{
bucket: "mybucket",
prefix: "prefix",
keyMarker: "notmatchingmarker",
delimiter: "unsupported",
maxKeys: "10",
statusCode: getAPIError(ErrNotImplemented).HTTPStatusCode,
},
// 8. Invalid params with invalid max Keys
{
bucket: "mybucket",
prefix: "prefix",
keyMarker: "prefix11",
delimiter: "/",
maxKeys: "999999999999999999999999999",
statusCode: getAPIError(ErrInvalidMaxUploads).HTTPStatusCode,
},
}
for i, test := range testCases {
queryVal := mkUploadsHealQuery(test.bucket, test.prefix, test.keyMarker, "", test.delimiter, test.maxKeys)
req, err := newTestRequest("GET", "/?"+queryVal.Encode(), 0, nil)
if err != nil {
t.Fatalf("Test %d - Failed to construct list uploads needing heal request - %v", i+1, err)
}
req.Header.Set(minioAdminOpHeader, "list-uploads")
cred := serverConfig.GetCredential()
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
if err != nil {
t.Fatalf("Test %d - Failed to sign list uploads needing heal request - %v", i+1, err)
}
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if test.statusCode != rec.Code {
t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.statusCode, rec.Code)
}
}
}

View File

@@ -53,6 +53,8 @@ func registerAdminRouter(mux *router.Router) {
// List Objects needing heal.
adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list-objects").HandlerFunc(adminAPI.ListObjectsHealHandler)
// List Uploads needing heal.
adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list-uploads").HandlerFunc(adminAPI.ListUploadsHealHandler)
// List Buckets needing heal.
adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list-buckets").HandlerFunc(adminAPI.ListBucketsHealHandler)

View File

@@ -166,12 +166,13 @@ type ListBucketsResponse struct {
// Upload container for in progress multipart upload
type Upload struct {
Key string
UploadID string `xml:"UploadId"`
Initiator Initiator
Owner Owner
StorageClass string
Initiated string
Key string
UploadID string `xml:"UploadId"`
Initiator Initiator
Owner Owner
StorageClass string
Initiated string
HealUploadInfo *HealObjectInfo `xml:"HealObjectInfo,omitempty"`
}
// CommonPrefix container for prefix response in ListObjectsResponse
@@ -488,6 +489,7 @@ func generateListMultipartUploadsResponse(bucket string, multipartsInfo ListMult
newUpload.UploadID = upload.UploadID
newUpload.Key = upload.Object
newUpload.Initiated = upload.Initiated.UTC().Format(timeFormatAMZLong)
newUpload.HealUploadInfo = upload.HealUploadInfo
listMultipartUploadsResponse.Uploads[index] = newUpload
}
return listMultipartUploadsResponse

View File

@@ -827,3 +827,8 @@ func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, ma
func (fs fsObjects) ListBucketsHeal() ([]BucketInfo, error) {
return []BucketInfo{}, traceError(NotImplemented{})
}
func (fs fsObjects) ListUploadsHeal(bucket, prefix, marker, uploadIDMarker,
delimiter string, maxUploads int) (ListMultipartsInfo, error) {
return ListMultipartsInfo{}, traceError(NotImplemented{})
}

View File

@@ -248,6 +248,8 @@ type uploadMetadata struct {
Initiated time.Time
StorageClass string // Not supported yet.
HealUploadInfo *HealObjectInfo `xml:"HealUploadInfo,omitempty"`
}
// completePart - completed part container.

View File

@@ -52,4 +52,6 @@ type ObjectLayer interface {
ListBucketsHeal() (buckets []BucketInfo, err error)
HealObject(bucket, object string) error
ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)
ListUploadsHeal(bucket, prefix, marker, uploadIDMarker,
delimiter string, maxUploads int) (ListMultipartsInfo, error)
}

View File

@@ -17,6 +17,7 @@
package cmd
import (
"path/filepath"
"sort"
"strings"
)
@@ -205,3 +206,219 @@ func (xl xlObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, ma
// Return error at the end.
return ListObjectsInfo{}, toObjectErr(err, bucket, prefix)
}
// ListUploadsHeal - lists ongoing multipart uploads that require
// healing in one or more disks.
func (xl xlObjects) ListUploadsHeal(bucket, prefix, marker, uploadIDMarker,
delimiter string, maxUploads int) (ListMultipartsInfo, error) {
// For delimiter and prefix as '/' we do not list anything at all
// since according to s3 spec we stop at the 'delimiter' along
// with the prefix. On a flat namespace with 'prefix' as '/'
// we don't have any entries, since all the keys are of form 'keyName/...'
if delimiter == slashSeparator && prefix == slashSeparator {
return ListMultipartsInfo{}, nil
}
// Initiate a list operation.
listMultipartInfo, err := xl.listMultipartUploadsHeal(bucket, prefix,
marker, uploadIDMarker, delimiter, maxUploads)
if err != nil {
return ListMultipartsInfo{}, toObjectErr(err, bucket, prefix)
}
// We got the entries successfully return.
return listMultipartInfo, nil
}
// Fetches list of multipart uploadIDs given bucket, keyMarker, uploadIDMarker.
func fetchMultipartUploadIDs(bucket, keyMarker, uploadIDMarker string,
maxUploads int, disks []StorageAPI) (uploads []uploadMetadata, end bool,
err error) {
// Hold a read lock on keyMarker path.
keyMarkerLock := globalNSMutex.NewNSLock(minioMetaMultipartBucket,
pathJoin(bucket, keyMarker))
keyMarkerLock.RLock()
for _, disk := range disks {
if disk == nil {
continue
}
uploads, end, err = listMultipartUploadIDs(bucket, keyMarker,
uploadIDMarker, maxUploads, disk)
if err == nil ||
!isErrIgnored(err, objMetadataOpIgnoredErrs...) {
break
}
}
keyMarkerLock.RUnlock()
return uploads, end, err
}
// listMultipartUploadsHeal - Returns a list of incomplete multipart
// uploads that need to be healed.
func (xl xlObjects) listMultipartUploadsHeal(bucket, prefix, keyMarker,
uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
result := ListMultipartsInfo{
IsTruncated: true,
MaxUploads: maxUploads,
KeyMarker: keyMarker,
Prefix: prefix,
Delimiter: delimiter,
}
recursive := delimiter != slashSeparator
var uploads []uploadMetadata
var err error
// List all upload ids for the given keyMarker, starting from
// uploadIDMarker.
if uploadIDMarker != "" {
uploads, _, err = fetchMultipartUploadIDs(bucket, keyMarker,
uploadIDMarker, maxUploads, xl.getLoadBalancedDisks())
if err != nil {
return ListMultipartsInfo{}, err
}
maxUploads = maxUploads - len(uploads)
}
// We can't use path.Join() as it strips off the trailing '/'.
multipartPrefixPath := pathJoin(bucket, prefix)
// multipartPrefixPath should have a trailing '/' when prefix = "".
if prefix == "" {
multipartPrefixPath += slashSeparator
}
multipartMarkerPath := ""
if keyMarker != "" {
multipartMarkerPath = pathJoin(bucket, keyMarker)
}
// `heal bool` is used to differentiate listing of incomplete
// uploads (and parts) from a regular listing of incomplete
// parts by client SDKs or mc-like commands, within a treewalk
// pool.
heal := true
// The listing is truncated if we have maxUploads entries and
// there are more entries to be listed.
truncated := true
var walkerCh chan treeWalkResult
var walkerDoneCh chan struct{}
// Check if we have room left to send more uploads.
if maxUploads > 0 {
walkerCh, walkerDoneCh = xl.listPool.Release(listParams{
bucket: minioMetaMultipartBucket,
recursive: recursive,
marker: multipartMarkerPath,
prefix: multipartPrefixPath,
heal: heal,
})
if walkerCh == nil {
walkerDoneCh = make(chan struct{})
isLeaf := xl.isMultipartUpload
listDir := listDirFactory(isLeaf, xlTreeWalkIgnoredErrs,
xl.getLoadBalancedDisks()...)
walkerCh = startTreeWalk(minioMetaMultipartBucket,
multipartPrefixPath, multipartMarkerPath,
recursive, listDir, isLeaf, walkerDoneCh)
}
// Collect uploads until maxUploads limit is reached.
for walkResult := range walkerCh {
// Ignore errors like errDiskNotFound
// and errFileNotFound.
if isErrIgnored(walkResult.err,
xlTreeWalkIgnoredErrs...) {
continue
}
// For any error during tree walk we should
// return right away.
if walkResult.err != nil {
return ListMultipartsInfo{}, walkResult.err
}
entry := strings.TrimPrefix(walkResult.entry,
retainSlash(bucket))
// Skip entries that are not object directory.
if hasSuffix(walkResult.entry, slashSeparator) {
uploads = append(uploads, uploadMetadata{
Object: entry,
})
maxUploads--
if maxUploads == 0 {
break
}
continue
}
// For an object entry we get all its pending
// uploadIDs.
var newUploads []uploadMetadata
var end bool
uploadIDMarker = ""
newUploads, end, err = fetchMultipartUploadIDs(bucket, entry, uploadIDMarker,
maxUploads, xl.getLoadBalancedDisks())
if err != nil {
return ListMultipartsInfo{}, err
}
uploads = append(uploads, newUploads...)
maxUploads -= len(newUploads)
if end && walkResult.end {
truncated = false
break
}
if maxUploads == 0 {
break
}
}
}
// For all received uploads fill in the multiparts result.
for _, upload := range uploads {
var objectName string
var uploadID string
if hasSuffix(upload.Object, slashSeparator) {
// All directory entries are common
// prefixes. For common prefixes, upload ids
// are empty.
uploadID = ""
objectName = upload.Object
result.CommonPrefixes = append(result.CommonPrefixes, objectName)
} else {
// Check if upload needs healing.
uploadIDPath := filepath.Join(bucket, upload.Object, upload.UploadID)
partsMetadata, errs := readAllXLMetadata(xl.storageDisks,
minioMetaMultipartBucket, uploadIDPath)
if xlShouldHeal(partsMetadata, errs) {
healUploadInfo := xlHealStat(xl, partsMetadata, errs)
upload.HealUploadInfo = &healUploadInfo
result.Uploads = append(result.Uploads, upload)
}
uploadID = upload.UploadID
objectName = upload.Object
}
result.NextKeyMarker = objectName
result.NextUploadIDMarker = uploadID
}
if truncated {
// Put back the tree walk go-routine into the pool for
// subsequent use.
xl.listPool.Set(listParams{
bucket: bucket,
recursive: recursive,
marker: result.NextKeyMarker,
prefix: prefix,
heal: heal,
}, walkerCh, walkerDoneCh)
}
result.IsTruncated = truncated
// Result is not truncated, reset the markers.
if !result.IsTruncated {
result.NextKeyMarker = ""
result.NextUploadIDMarker = ""
}
return result, nil
}

View File

@@ -18,6 +18,8 @@ package cmd
import (
"bytes"
"path"
"path/filepath"
"strconv"
"testing"
)
@@ -139,3 +141,78 @@ func TestListObjectsHeal(t *testing.T) {
}
}
// Test for ListUploadsHeal API for XL.
func TestListUploadsHeal(t *testing.T) {
initNSLock(false)
rootPath, err := newTestConfig(globalMinioDefaultRegion)
if err != nil {
t.Fatalf("Init Test config failed")
}
// Remove config directory after the test ends.
defer removeAll(rootPath)
// Create an instance of XL backend.
xl, fsDirs, err := prepareXL()
if err != nil {
t.Fatal(err)
}
// Cleanup backend directories on function return.
defer removeRoots(fsDirs)
bucketName := "bucket"
prefix := "prefix"
objName := path.Join(prefix, "obj")
// Create test bucket.
err = xl.MakeBucket(bucketName)
if err != nil {
t.Fatal(err)
}
// Create a new multipart upload.
uploadID, err := xl.NewMultipartUpload(bucketName, objName, nil)
if err != nil {
t.Fatal(err)
}
// Upload a part.
data := bytes.Repeat([]byte("a"), 1024)
_, err = xl.PutObjectPart(bucketName, objName, uploadID, 1,
int64(len(data)), bytes.NewReader(data), "", "")
if err != nil {
t.Fatal(err)
}
// Check if list uploads heal returns any uploads to be healed
// incorrectly.
listUploadsInfo, err := xl.ListUploadsHeal(bucketName, prefix, "", "", "", 1000)
if err != nil {
t.Fatal(err)
}
// All uploads intact nothing to heal.
if len(listUploadsInfo.Uploads) != 0 {
t.Errorf("Expected no uploads but received %d", len(listUploadsInfo.Uploads))
}
// Delete the part from the first disk to make the upload (and
// its part) to appear in upload heal listing.
firstDisk := xl.(*xlObjects).storageDisks[0]
err = firstDisk.DeleteFile(minioMetaMultipartBucket,
filepath.Join(bucketName, objName, uploadID, xlMetaJSONFile))
if err != nil {
t.Fatal(err)
}
listUploadsInfo, err = xl.ListUploadsHeal(bucketName, prefix, "", "", "", 1000)
if err != nil {
t.Fatal(err)
}
// One upload with missing xl.json on first disk.
if len(listUploadsInfo.Uploads) != 1 {
t.Errorf("Expected 1 upload but received %d", len(listUploadsInfo.Uploads))
}
}