Implement heal-upload admin API (#3914)

This API is meant for administrative tools like mc-admin to heal an
ongoing multipart upload on a Minio server.  N B This set of admin
APIs apply only for Minio servers.

`github.com/minio/minio/pkg/madmin` provides a go SDK for this (and
other admin) operations.  Specifically,

  func HealUpload(bucket, object, uploadID string, dryRun bool) error

Sample admin API request:
POST
/?heal&bucket=mybucket&object=myobject&upload-id=myuploadID&dry-run
- Header(s): ["x-minio-operation"] = "upload"

Notes:
- bucket, object and upload-id are mandatory query parameters
- if dry-run is set, API returns success if all parameters passed are
  valid.
This commit is contained in:
Krishnan Parthasarathi 2017-03-17 21:55:49 +05:30 committed by Harshavardhana
parent d4eea224d4
commit c192e5c9b2
8 changed files with 460 additions and 90 deletions

View File

@ -24,6 +24,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"path"
"strconv"
"time"
)
@ -49,6 +50,7 @@ const (
mgmtDryRun mgmtQueryKey = "dry-run"
mgmtUploadIDMarker mgmtQueryKey = "upload-id-marker"
mgmtMaxUploads mgmtQueryKey = "max-uploads"
mgmtUploadID mgmtQueryKey = "upload-id"
)
// ServerVersion - server version
@ -654,6 +656,75 @@ func (adminAPI adminAPIHandlers) HealObjectHandler(w http.ResponseWriter, r *htt
writeSuccessResponseHeadersOnly(w)
}
// HealUploadHandler - POST /?heal&bucket=mybucket&object=myobject&upload-id=myuploadID&dry-run
// - x-minio-operation = upload
// - bucket, object and upload-id are mandatory query parameters
// Heal a given upload, if present.
func (adminAPI adminAPIHandlers) HealUploadHandler(w http.ResponseWriter, r *http.Request) {
// Get object layer instance.
objLayer := newObjectLayerFn()
if objLayer == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
}
// Validate request signature.
adminAPIErr := checkRequestAuthType(r, "", "", "")
if adminAPIErr != ErrNone {
writeErrorResponse(w, adminAPIErr, r.URL)
return
}
vars := r.URL.Query()
bucket := vars.Get(string(mgmtBucket))
object := vars.Get(string(mgmtObject))
uploadID := vars.Get(string(mgmtUploadID))
uploadObj := path.Join(bucket, object, uploadID)
// Validate bucket and object names as supplied via query
// parameters.
if err := checkBucketAndObjectNames(bucket, object); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Validate the bucket and object w.r.t backend representation
// of an upload.
if err := checkBucketAndObjectNames(minioMetaMultipartBucket,
uploadObj); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Check if upload exists.
if _, err := objLayer.GetObjectInfo(minioMetaMultipartBucket,
uploadObj); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// if dry-run is set in query params then perform validations
// and return success.
if isDryRun(vars) {
writeSuccessResponseHeadersOnly(w)
return
}
//We are able to use HealObject for healing an upload since an
//ongoing upload has the same backend representation as an
//object. The 'object' corresponding to a given bucket,
//object and uploadID is
//.minio.sys/multipart/bucket/object/uploadID.
err := objLayer.HealObject(minioMetaMultipartBucket, uploadObj)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Return 200 on success.
writeSuccessResponseHeadersOnly(w)
}
// HealFormatHandler - POST /?heal&dry-run
// - x-minio-operation = format
// - bucket and object are both mandatory query parameters

View File

@ -21,6 +21,7 @@ import (
"encoding/json"
"encoding/xml"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
@ -1048,6 +1049,160 @@ func TestHealObjectHandler(t *testing.T) {
t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.statusCode, rec.Code)
}
}
}
// buildAdminRequest - helper function to build an admin API request.
func buildAdminRequest(queryVal url.Values, opHdr, method string,
contentLength int64, bodySeeker io.ReadSeeker) (*http.Request, error) {
req, err := newTestRequest(method, "/?"+queryVal.Encode(), contentLength, bodySeeker)
if err != nil {
return nil, traceError(err)
}
req.Header.Set(minioAdminOpHeader, opHdr)
cred := serverConfig.GetCredential()
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
if err != nil {
return nil, traceError(err)
}
return req, nil
}
// mkHealUploadQuery - helper to build HealUploadHandler query string.
func mkHealUploadQuery(bucket, object, uploadID, dryRun string) url.Values {
queryVal := url.Values{}
queryVal.Set(string(mgmtBucket), bucket)
queryVal.Set(string(mgmtObject), object)
queryVal.Set(string(mgmtUploadID), uploadID)
queryVal.Set("heal", "")
queryVal.Set(string(mgmtDryRun), dryRun)
return queryVal
}
// TestHealUploadHandler - test for HealUploadHandler.
func TestHealUploadHandler(t *testing.T) {
adminTestBed, err := prepareAdminXLTestBed()
if err != nil {
t.Fatal("Failed to initialize a single node XL backend for admin handler tests.")
}
defer adminTestBed.TearDown()
// Create an object myobject under bucket mybucket.
bucketName := "mybucket"
objName := "myobject"
err = adminTestBed.objLayer.MakeBucket(bucketName)
if err != nil {
t.Fatalf("Failed to make bucket %s - %v", bucketName, err)
}
// Create a new multipart upload.
uploadID, err := adminTestBed.objLayer.NewMultipartUpload(bucketName, objName, nil)
if err != nil {
t.Fatalf("Failed to create a new multipart upload %s/%s - %v",
bucketName, objName, err)
}
// Upload a part.
partID := 1
_, err = adminTestBed.objLayer.PutObjectPart(bucketName, objName, uploadID,
partID, int64(len("hello")), bytes.NewReader([]byte("hello")), "", "")
if err != nil {
t.Fatalf("Failed to upload part %d of %s/%s - %v", partID,
bucketName, objName, err)
}
testCases := []struct {
bucket string
object string
dryrun string
statusCode int
}{
// 1. Valid test case.
{
bucket: bucketName,
object: objName,
statusCode: http.StatusOK,
},
// 2. Invalid bucket name.
{
bucket: `invalid\\Bucket`,
object: "myobject",
statusCode: http.StatusBadRequest,
},
// 3. Bucket not found.
{
bucket: "bucketnotfound",
object: "myobject",
statusCode: http.StatusNotFound,
},
// 4. Invalid object name.
{
bucket: bucketName,
object: `invalid\\Object`,
statusCode: http.StatusBadRequest,
},
// 5. Object not found.
{
bucket: bucketName,
object: "objectnotfound",
statusCode: http.StatusNotFound,
},
// 6. Valid test case with dry-run.
{
bucket: bucketName,
object: objName,
dryrun: "yes",
statusCode: http.StatusOK,
},
}
for i, test := range testCases {
// Prepare query params.
queryVal := mkHealUploadQuery(test.bucket, test.object, uploadID, test.dryrun)
req, err := buildAdminRequest(queryVal, "upload", http.MethodPost, 0, nil)
if err != nil {
t.Fatalf("Test %d - Failed to construct heal object request - %v", i+1, err)
}
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if test.statusCode != rec.Code {
t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.statusCode, rec.Code)
}
}
sample := testCases[0]
// Modify authorization header after signing to test signature
// mismatch handling.
queryVal := mkHealUploadQuery(sample.bucket, sample.object, uploadID, sample.dryrun)
req, err := buildAdminRequest(queryVal, "upload", "POST", 0, nil)
if err != nil {
t.Fatalf("Failed to construct heal object request - %v", err)
}
// Set x-amz-date to a date different than time of signing.
req.Header.Set("x-amz-date", time.Time{}.Format(iso8601Format))
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if rec.Code != http.StatusForbidden {
t.Errorf("Expected %d but received %d", http.StatusBadRequest, rec.Code)
}
// Set objectAPI to nil to test Server not initialized case.
resetGlobalObjectAPI()
queryVal = mkHealUploadQuery(sample.bucket, sample.object, uploadID, sample.dryrun)
req, err = buildAdminRequest(queryVal, "upload", "POST", 0, nil)
if err != nil {
t.Fatalf("Failed to construct heal object request - %v", err)
}
rec = httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if rec.Code != http.StatusServiceUnavailable {
t.Errorf("Expected %d but received %d", http.StatusServiceUnavailable, rec.Code)
}
}
// TestHealFormatHandler - test for HealFormatHandler.
@ -1061,21 +1216,11 @@ func TestHealFormatHandler(t *testing.T) {
// Prepare query params for heal-format mgmt REST API.
queryVal := url.Values{}
queryVal.Set("heal", "")
req, err := newTestRequest("POST", "/?"+queryVal.Encode(), 0, nil)
req, err := buildAdminRequest(queryVal, "format", "POST", 0, nil)
if err != nil {
t.Fatalf("Failed to construct heal object request - %v", err)
}
// Set x-minio-operation header to format.
req.Header.Set(minioAdminOpHeader, "format")
// Sign the request using signature v4.
cred := serverConfig.GetCredential()
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
if err != nil {
t.Fatalf("Failed to sign heal object request - %v", err)
}
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
@ -1105,21 +1250,11 @@ func TestGetConfigHandler(t *testing.T) {
queryVal := url.Values{}
queryVal.Set("config", "")
req, err := newTestRequest("GET", "/?"+queryVal.Encode(), 0, nil)
req, err := buildAdminRequest(queryVal, "get", http.MethodGet, 0, nil)
if err != nil {
t.Fatalf("Failed to construct get-config object request - %v", err)
}
// Set x-minio-operation header to get.
req.Header.Set(minioAdminOpHeader, "get")
// Sign the request using signature v4.
cred := serverConfig.GetCredential()
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
if err != nil {
t.Fatalf("Failed to sign heal object request - %v", err)
}
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
@ -1154,21 +1289,12 @@ func TestSetConfigHandler(t *testing.T) {
queryVal := url.Values{}
queryVal.Set("config", "")
req, err := newTestRequest("PUT", "/?"+queryVal.Encode(), int64(len(configJSON)), bytes.NewReader(configJSON))
req, err := buildAdminRequest(queryVal, "set", http.MethodPut, int64(len(configJSON)),
bytes.NewReader(configJSON))
if err != nil {
t.Fatalf("Failed to construct get-config object request - %v", err)
}
// Set x-minio-operation header to set.
req.Header.Set(minioAdminOpHeader, "set")
// Sign the request using signature v4.
cred := serverConfig.GetCredential()
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
if err != nil {
t.Fatalf("Failed to sign heal object request - %v", err)
}
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {
@ -1288,9 +1414,9 @@ func TestWriteSetConfigResponse(t *testing.T) {
}
}
// mkUploadsHealQuery - helper function to construct query values for
// mkListUploadsHealQuery - helper function to construct query values for
// listUploadsHeal.
func mkUploadsHealQuery(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploadsStr string) url.Values {
func mkListUploadsHealQuery(bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploadsStr string) url.Values {
queryVal := make(url.Values)
queryVal.Set("heal", "")
@ -1401,19 +1527,13 @@ func TestListHealUploadsHandler(t *testing.T) {
}
for i, test := range testCases {
queryVal := mkUploadsHealQuery(test.bucket, test.prefix, test.keyMarker, "", test.delimiter, test.maxKeys)
queryVal := mkListUploadsHealQuery(test.bucket, test.prefix, test.keyMarker, "", test.delimiter, test.maxKeys)
req, err := newTestRequest("GET", "/?"+queryVal.Encode(), 0, nil)
req, err := buildAdminRequest(queryVal, "list-uploads", http.MethodGet, 0, nil)
if err != nil {
t.Fatalf("Test %d - Failed to construct list uploads needing heal request - %v", i+1, err)
}
req.Header.Set(minioAdminOpHeader, "list-uploads")
cred := serverConfig.GetCredential()
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
if err != nil {
t.Fatalf("Test %d - Failed to sign list uploads needing heal request - %v", i+1, err)
}
rec := httptest.NewRecorder()
adminTestBed.mux.ServeHTTP(rec, req)
if test.statusCode != rec.Code {

View File

@ -64,6 +64,8 @@ func registerAdminRouter(mux *router.Router) {
adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "object").HandlerFunc(adminAPI.HealObjectHandler)
// Heal Format.
adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "format").HandlerFunc(adminAPI.HealFormatHandler)
// Heal Uploads.
adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "upload").HandlerFunc(adminAPI.HealUploadHandler)
/// Config operations

View File

@ -477,10 +477,6 @@ func healObject(storageDisks []StorageAPI, bucket string, object string, quorum
// and later the disk comes back up again, heal on the object
// should delete it.
func (xl xlObjects) HealObject(bucket, object string) error {
if err := checkGetObjArgs(bucket, object); err != nil {
return err
}
// Lock the object before healing.
objectLock := globalNSMutex.NewNSLock(bucket, object)
objectLock.RLock()

View File

@ -17,7 +17,9 @@
package cmd
import (
"bytes"
"fmt"
"path/filepath"
"testing"
)
@ -486,3 +488,73 @@ func TestListBucketsHeal(t *testing.T) {
t.Fatalf("Name of missing bucket is incorrect, expected: %s, found: %s", corruptedBucketName, buckets[0].Name)
}
}
// Tests healing of object.
func TestHealObjectXL(t *testing.T) {
root, err := newTestConfig(globalMinioDefaultRegion)
if err != nil {
t.Fatal(err)
}
defer removeAll(root)
nDisks := 16
fsDirs, err := getRandomDisks(nDisks)
if err != nil {
t.Fatal(err)
}
defer removeRoots(fsDirs)
endpoints, err := parseStorageEndpoints(fsDirs)
if err != nil {
t.Fatal(err)
}
// Everything is fine, should return nil
obj, _, err := initObjectLayer(endpoints)
if err != nil {
t.Fatal(err)
}
bucket := "bucket"
object := "object"
data := []byte("hello")
err = obj.MakeBucket(bucket)
if err != nil {
t.Fatalf("Failed to make a bucket - %v", err)
}
_, err = obj.PutObject(bucket, object, int64(len(data)), bytes.NewReader(data), nil, "")
if err != nil {
t.Fatalf("Failed to put an object - %v", err)
}
// Remove the object backend files from the first disk.
xl := obj.(*xlObjects)
firstDisk := xl.storageDisks[0]
err = firstDisk.DeleteFile(bucket, filepath.Join(object, xlMetaJSONFile))
if err != nil {
t.Fatalf("Failed to delete a file - %v", err)
}
err = obj.HealObject(bucket, object)
if err != nil {
t.Fatalf("Failed to heal object - %v", err)
}
_, err = firstDisk.StatFile(bucket, filepath.Join(object, xlMetaJSONFile))
if err != nil {
t.Errorf("Expected xl.json file to be present but stat failed - %v", err)
}
// Nil more than half the disks, to remove write quorum.
for i := 0; i <= len(xl.storageDisks)/2; i++ {
xl.storageDisks[i] = nil
}
// Try healing now, expect to receive errDiskNotFound.
err = obj.HealObject(bucket, object)
if errorCause(err) != errDiskNotFound {
t.Errorf("Expected %v but received %v", errDiskNotFound, err)
}
}

View File

@ -44,6 +44,7 @@ func main() {
| | |[`HealObject`](#HealObject)|||
| | |[`HealFormat`](#HealFormat)|||
| | |[`ListUploadsHeal`](#ListUploadsHeal)|||
| | |[`HealUpload`](#HealUpload)|||
## 1. Constructor
<a name="Minio"></a>
@ -278,6 +279,62 @@ __Example__
log.Println("successfully healed storage format on available disks.")
```
<a name="ListUploadsHeal"> </a>
### ListUploadsHeal(bucket, prefix string, recursive bool, doneCh <-chan struct{}) (<-chan UploadInfo, error)
List ongoing multipart uploads that need healing.
| Param | Type | Description |
|---|---|---|
|`ui.Key` | _string_ | Name of the object being uploaded |
|`ui.UploadID` | _string_ | UploadID of the ongoing multipart upload |
|`ui.HealUploadInfo.Status` | _healStatus_| One of `Healthy`, `CanHeal`, `Corrupted`, `QuorumUnavailable`|
|`ui.Err`| _error_ | non-nil if fetching fetching healing information failed |
__Example__
``` go
// Set true if recursive listing is needed.
isRecursive := true
// List objects that need healing for a given bucket and
// prefix.
healUploadsCh, err := madmClnt.ListUploadsHeal(bucket, prefix, isRecursive, doneCh)
if err != nil {
log.Fatalln("Failed to get list of uploads to be healed: ", err)
}
for upload := range healUploadsCh {
if upload.Err != nil {
log.Println("upload listing error: ", upload.Err)
}
if upload.HealUploadInfo != nil {
switch healInfo := *upload.HealUploadInfo; healInfo.Status {
case madmin.CanHeal:
fmt.Println(upload.Key, " can be healed.")
case madmin.QuorumUnavailable:
fmt.Println(upload.Key, " can't be healed until quorum is available.")
case madmin.Corrupted:
fmt.Println(upload.Key, " can't be healed, not enough information.")
}
}
}
```
<a name="HealUpload"></a>
### HealUpload(bucket, object, uploadID string, isDryRun bool) error
If upload is successfully healed returns nil, otherwise returns error indicating the reason for failure. If isDryRun is true, then the upload is not healed, but heal upload request is validated by the server. e.g, if the upload exists, if upload name is valid etc.
``` go
isDryRun = false
err = madmClnt.HealUpload("mybucket", "myobject", "myuploadID", isDryRun)
if err != nil {
log.Fatalln(err)
}
log.Println("successfully healed mybucket/myobject/myuploadID")
```
## 5. Config operations
@ -354,46 +411,3 @@ __Example__
}
log.Println("SetConfig: ", string(buf.Bytes()))
```
<a name="ListUploadsHeal"> </a>
### ListUploadsHeal(bucket, prefix string, recursive bool, doneCh <-chan struct{}) (<-chan UploadInfo, error)
List ongoing multipart uploads that need healing.
| Param | Type | Description |
|---|---|---|
|`ui.Key` | _string_ | Name of the object being uploaded |
|`ui.UploadID` | _string_ | UploadID of the ongoing multipart upload |
|`ui.HealUploadInfo.Status` | _healStatus_| One of `Healthy`, `CanHeal`, `Corrupted`, `QuorumUnavailable`|
|`ui.Err`| _error_ | non-nil if fetching fetching healing information failed |
__Example__
``` go
// Set true if recursive listing is needed.
isRecursive := true
// List objects that need healing for a given bucket and
// prefix.
healUploadsCh, err := madmClnt.ListUploadsHeal(bucket, prefix, isRecursive, doneCh)
if err != nil {
log.Fatalln("Failed to get list of uploads to be healed: ", err)
}
for upload := range healUploadsCh {
if upload.Err != nil {
log.Println("upload listing error: ", upload.Err)
}
if upload.HealUploadInfo != nil {
switch healInfo := *upload.HealUploadInfo; healInfo.Status {
case madmin.CanHeal:
fmt.Println(upload.Key, " can be healed.")
case madmin.QuorumUnavailable:
fmt.Println(upload.Key, " can't be healed until quorum is available.")
case madmin.Corrupted:
fmt.Println(upload.Key, " can't be healed, not enough information.")
}
}
}
```

View File

@ -0,0 +1,57 @@
// +build ignore
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package main
import (
"log"
"github.com/minio/minio/pkg/madmin"
)
func main() {
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are
// dummy values, please replace them with original values.
// Note: YOUR-ACCESSKEYID, YOUR-SECRETACCESSKEY are
// dummy values, please replace them with original values.
// API requests are secure (HTTPS) if secure=true and insecure (HTTPS) otherwise.
// New returns an Minio Admin client object.
madmClnt, err := madmin.New("your-minio.example.com:9000", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", true)
if err != nil {
log.Fatalln(err)
}
// Heal upload mybucket/myobject/uploadID - dry run.
isDryRun := true
err = madmClnt.HealUpload("mybucket", "myobject", "myuploadID", isDryRun)
if err != nil {
log.Fatalln(err)
}
// Heal upload mybucket/myobject/uploadID - this time for real.
isDryRun = false
err = madmClnt.HealUpload("mybucket", "myobject", "myuploadID", isDryRun)
if err != nil {
log.Fatalln(err)
}
log.Println("successfully healed mybucket/myobject/myuploadID")
}

View File

@ -231,6 +231,7 @@ const (
healDryRun healQueryKey = "dry-run"
healUploadIDMarker healQueryKey = "upload-id-marker"
healMaxUpload healQueryKey = "max-uploads"
healUploadID healQueryKey = "upload-id"
)
// mkHealQueryVal - helper function to construct heal REST API query params.
@ -432,6 +433,43 @@ func (adm *AdminClient) HealBucket(bucket string, dryrun bool) error {
return nil
}
// HealUpload - Heal the given upload.
func (adm *AdminClient) HealUpload(bucket, object, uploadID string, dryrun bool) error {
// Construct query params.
queryVal := url.Values{}
queryVal.Set("heal", "")
queryVal.Set(string(healBucket), bucket)
queryVal.Set(string(healObject), object)
queryVal.Set(string(healUploadID), uploadID)
if dryrun {
queryVal.Set(string(healDryRun), "")
}
hdrs := make(http.Header)
hdrs.Set(minioAdminOpHeader, "upload")
reqData := requestData{
queryValues: queryVal,
customHeaders: hdrs,
}
// Execute POST on
// /?heal&bucket=mybucket&object=myobject&upload-id=uploadID
// to heal an upload.
resp, err := adm.executeMethod("POST", reqData)
defer closeResponse(resp)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return httpRespToErrorResponse(resp)
}
return nil
}
// HealObject - Heal the given object.
func (adm *AdminClient) HealObject(bucket, object string, dryrun bool) error {
// Construct query params.