Implement mgmt REST APIs for heal subcommands (#3533)

The heal APIs supported in this change are,
- listing of objects to be healed.
- healing a bucket.
- healing an object.
This commit is contained in:
Krishnan Parthasarathi
2017-01-17 23:32:58 +05:30
committed by Harshavardhana
parent 98a6a2bcab
commit c194b9f5f1
17 changed files with 1482 additions and 103 deletions

View File

@@ -1,5 +1,5 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ import (
"encoding/json"
"net/http"
"net/url"
"strconv"
"time"
)
@@ -27,6 +28,21 @@ const (
minioAdminOpHeader = "X-Minio-Operation"
)
// Type-safe query params.
type mgmtQueryKey string
// Only valid query params for list/clear locks management APIs.
const (
mgmtBucket mgmtQueryKey = "bucket"
mgmtObject mgmtQueryKey = "object"
mgmtPrefix mgmtQueryKey = "prefix"
mgmtOlderThan mgmtQueryKey = "older-than"
mgmtDelimiter mgmtQueryKey = "delimiter"
mgmtMarker mgmtQueryKey = "marker"
mgmtMaxKey mgmtQueryKey = "max-key"
mgmtDryRun mgmtQueryKey = "dry-run"
)
// ServiceStatusHandler - GET /?service
// HTTP header x-minio-operation: status
// ----------
@@ -63,32 +79,21 @@ func (adminAPI adminAPIHandlers) ServiceRestartHandler(w http.ResponseWriter, r
}
// Reply to the client before restarting minio server.
w.WriteHeader(http.StatusOK)
writeSuccessResponseHeadersOnly(w)
sendServiceCmd(globalAdminPeers, serviceRestart)
}
// Type-safe lock query params.
type lockQueryKey string
// Only valid query params for list/clear locks management APIs.
const (
lockBucket lockQueryKey = "bucket"
lockPrefix lockQueryKey = "prefix"
lockOlderThan lockQueryKey = "older-than"
)
// validateLockQueryParams - Validates query params for list/clear locks management APIs.
func validateLockQueryParams(vars url.Values) (string, string, time.Duration, APIErrorCode) {
bucket := vars.Get(string(lockBucket))
prefix := vars.Get(string(lockPrefix))
relTimeStr := vars.Get(string(lockOlderThan))
bucket := vars.Get(string(mgmtBucket))
prefix := vars.Get(string(mgmtPrefix))
relTimeStr := vars.Get(string(mgmtOlderThan))
// N B empty bucket name is invalid
if !IsValidBucketName(bucket) {
return "", "", time.Duration(0), ErrInvalidBucketName
}
// empty prefix is valid.
if !IsValidObjectPrefix(prefix) {
return "", "", time.Duration(0), ErrInvalidObjectName
@@ -195,3 +200,176 @@ func (adminAPI adminAPIHandlers) ClearLocksHandler(w http.ResponseWriter, r *htt
// Reply with list of locks cleared, as json.
writeSuccessResponseJSON(w, jsonBytes)
}
// validateHealQueryParams - Validates query params for heal list management API.
func validateHealQueryParams(vars url.Values) (string, string, string, string, int, APIErrorCode) {
bucket := vars.Get(string(mgmtBucket))
prefix := vars.Get(string(mgmtPrefix))
marker := vars.Get(string(mgmtMarker))
delimiter := vars.Get(string(mgmtDelimiter))
maxKeyStr := vars.Get(string(mgmtMaxKey))
// N B empty bucket name is invalid
if !IsValidBucketName(bucket) {
return "", "", "", "", 0, ErrInvalidBucketName
}
// empty prefix is valid.
if !IsValidObjectPrefix(prefix) {
return "", "", "", "", 0, ErrInvalidObjectName
}
// check if maxKey is a valid integer.
maxKey, err := strconv.Atoi(maxKeyStr)
if err != nil {
return "", "", "", "", 0, ErrInvalidMaxKeys
}
// Validate prefix, marker, delimiter and maxKey.
apiErr := validateListObjectsArgs(prefix, marker, delimiter, maxKey)
if apiErr != ErrNone {
return "", "", "", "", 0, apiErr
}
return bucket, prefix, marker, delimiter, maxKey, ErrNone
}
// ListObjectsHealHandler - GET /?heal&bucket=mybucket&prefix=myprefix&marker=mymarker&delimiter=&mydelimiter&maxKey=1000
// - bucket is mandatory query parameter
// - rest are optional query parameters
// List upto maxKey objects that need healing in a given bucket matching the given prefix.
func (adminAPI adminAPIHandlers) ListObjectsHealHandler(w http.ResponseWriter, r *http.Request) {
// Get object layer instance.
objLayer := newObjectLayerFn()
if objLayer == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
}
// Validate request signature.
adminAPIErr := checkRequestAuthType(r, "", "", "")
if adminAPIErr != ErrNone {
writeErrorResponse(w, adminAPIErr, r.URL)
return
}
// Validate query params.
vars := r.URL.Query()
bucket, prefix, marker, delimiter, maxKey, adminAPIErr := validateHealQueryParams(vars)
if adminAPIErr != ErrNone {
writeErrorResponse(w, adminAPIErr, r.URL)
return
}
// Get the list objects to be healed.
objectInfos, err := objLayer.ListObjectsHeal(bucket, prefix, marker, delimiter, maxKey)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
listResponse := generateListObjectsV1Response(bucket, prefix, marker, delimiter, maxKey, objectInfos)
// Write success response.
writeSuccessResponseXML(w, encodeResponse(listResponse))
}
// HealBucketHandler - POST /?heal&bucket=mybucket
// - bucket is mandatory query parameter
// Heal a given bucket, if present.
func (adminAPI adminAPIHandlers) HealBucketHandler(w http.ResponseWriter, r *http.Request) {
// Get object layer instance.
objLayer := newObjectLayerFn()
if objLayer == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
}
// Validate request signature.
adminAPIErr := checkRequestAuthType(r, "", "", "")
if adminAPIErr != ErrNone {
writeErrorResponse(w, adminAPIErr, r.URL)
return
}
// Validate bucket name and check if it exists.
vars := r.URL.Query()
bucket := vars.Get(string(mgmtBucket))
if err := checkBucketExist(bucket, objLayer); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// if dry-run=yes, then only perform validations and return success.
if isDryRun(vars) {
writeSuccessResponseHeadersOnly(w)
return
}
// Heal the given bucket.
err := objLayer.HealBucket(bucket)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Return 200 on success.
writeSuccessResponseHeadersOnly(w)
}
// isDryRun - returns true if dry-run query param was set to yes and false otherwise.
func isDryRun(qval url.Values) bool {
if dryRun := qval.Get(string(mgmtDryRun)); dryRun == "yes" {
return true
}
return false
}
// HealObjectHandler - POST /?heal&bucket=mybucket&object=myobject
// - bucket and object are both mandatory query parameters
// Heal a given object, if present.
func (adminAPI adminAPIHandlers) HealObjectHandler(w http.ResponseWriter, r *http.Request) {
// Get object layer instance.
objLayer := newObjectLayerFn()
if objLayer == nil {
writeErrorResponse(w, ErrServerNotInitialized, r.URL)
return
}
// Validate request signature.
adminAPIErr := checkRequestAuthType(r, "", "", "")
if adminAPIErr != ErrNone {
writeErrorResponse(w, adminAPIErr, r.URL)
return
}
vars := r.URL.Query()
bucket := vars.Get(string(mgmtBucket))
object := vars.Get(string(mgmtObject))
// Validate bucket and object names.
if err := checkBucketAndObjectNames(bucket, object); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Check if object exists.
if _, err := objLayer.GetObjectInfo(bucket, object); err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// if dry-run=yes, then only perform validations and return success.
if isDryRun(vars) {
writeSuccessResponseHeadersOnly(w)
return
}
err := objLayer.HealObject(bucket, object)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Return 200 on success.
writeSuccessResponseHeadersOnly(w)
}

View File

@@ -17,8 +17,8 @@
package cmd
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
@@ -134,11 +134,12 @@ func testServicesCmdHandler(cmd cmdType, t *testing.T) {
if cmd == statusCmd {
// Initializing objectLayer and corresponding
// []StorageAPI since DiskInfo() method requires it.
objLayer, fsDirs, fsErr := prepareXL()
if fsErr != nil {
t.Fatalf("failed to initialize XL based object layer - %v.", fsErr)
objLayer, xlDirs, xlErr := prepareXL()
if xlErr != nil {
t.Fatalf("failed to initialize XL based object layer - %v.", xlErr)
}
defer removeRoots(fsDirs)
defer removeRoots(xlDirs)
// Make objLayer available to all internal services via globalObjectAPI.
globalObjLayerMutex.Lock()
globalObjectAPI = objLayer
globalObjLayerMutex.Unlock()
@@ -188,6 +189,16 @@ func TestServiceRestartHandler(t *testing.T) {
testServicesCmdHandler(restartCmd, t)
}
// mkLockQueryVal - helper function to build lock query param.
func mkLockQueryVal(bucket, prefix, relTimeStr string) url.Values {
qVal := url.Values{}
qVal.Set("lock", "")
qVal.Set(string(mgmtBucket), bucket)
qVal.Set(string(mgmtPrefix), prefix)
qVal.Set(string(mgmtOlderThan), relTimeStr)
return qVal
}
// Test for locks list management REST API.
func TestListLocksHandler(t *testing.T) {
// reset globals.
@@ -212,6 +223,10 @@ func TestListLocksHandler(t *testing.T) {
globalMinioAddr = eps[0].Host
initGlobalAdminPeers(eps)
// Setup admin mgmt REST API handlers.
adminRouter := router.NewRouter()
registerAdminRouter(adminRouter)
testCases := []struct {
bucket string
prefix string
@@ -223,37 +238,34 @@ func TestListLocksHandler(t *testing.T) {
bucket: "mybucket",
prefix: "myobject",
relTime: "1s",
expectedStatus: 200,
expectedStatus: http.StatusOK,
},
// Test 2 - invalid duration
{
bucket: "mybucket",
prefix: "myprefix",
relTime: "invalidDuration",
expectedStatus: 400,
expectedStatus: http.StatusBadRequest,
},
// Test 3 - invalid bucket name
{
bucket: `invalid\\Bucket`,
prefix: "myprefix",
relTime: "1h",
expectedStatus: 400,
expectedStatus: http.StatusBadRequest,
},
// Test 4 - invalid prefix
{
bucket: "mybucket",
prefix: `invalid\\Prefix`,
relTime: "1h",
expectedStatus: 400,
expectedStatus: http.StatusBadRequest,
},
}
adminRouter := router.NewRouter()
registerAdminRouter(adminRouter)
for i, test := range testCases {
queryStr := fmt.Sprintf("&bucket=%s&prefix=%s&older-than=%s", test.bucket, test.prefix, test.relTime)
req, err := newTestRequest("GET", "/?lock"+queryStr, 0, nil)
queryVal := mkLockQueryVal(test.bucket, test.prefix, test.relTime)
req, err := newTestRequest("GET", "/?"+queryVal.Encode(), 0, nil)
if err != nil {
t.Fatalf("Test %d - Failed to construct list locks request - %v", i+1, err)
}
@@ -293,6 +305,10 @@ func TestClearLocksHandler(t *testing.T) {
}
initGlobalAdminPeers(eps)
// Setup admin mgmt REST API handlers.
adminRouter := router.NewRouter()
registerAdminRouter(adminRouter)
testCases := []struct {
bucket string
prefix string
@@ -304,37 +320,34 @@ func TestClearLocksHandler(t *testing.T) {
bucket: "mybucket",
prefix: "myobject",
relTime: "1s",
expectedStatus: 200,
expectedStatus: http.StatusOK,
},
// Test 2 - invalid duration
{
bucket: "mybucket",
prefix: "myprefix",
relTime: "invalidDuration",
expectedStatus: 400,
expectedStatus: http.StatusBadRequest,
},
// Test 3 - invalid bucket name
{
bucket: `invalid\\Bucket`,
prefix: "myprefix",
relTime: "1h",
expectedStatus: 400,
expectedStatus: http.StatusBadRequest,
},
// Test 4 - invalid prefix
{
bucket: "mybucket",
prefix: `invalid\\Prefix`,
relTime: "1h",
expectedStatus: 400,
expectedStatus: http.StatusBadRequest,
},
}
adminRouter := router.NewRouter()
registerAdminRouter(adminRouter)
for i, test := range testCases {
queryStr := fmt.Sprintf("&bucket=%s&prefix=%s&older-than=%s", test.bucket, test.prefix, test.relTime)
req, err := newTestRequest("POST", "/?lock"+queryStr, 0, nil)
queryVal := mkLockQueryVal(test.bucket, test.prefix, test.relTime)
req, err := newTestRequest("POST", "/?"+queryVal.Encode(), 0, nil)
if err != nil {
t.Fatalf("Test %d - Failed to construct clear locks request - %v", i+1, err)
}
@@ -361,25 +374,10 @@ func TestValidateLockQueryParams(t *testing.T) {
// initialize NSLock.
initNSLock(false)
// Sample query values for test cases.
allValidVal := url.Values{}
allValidVal.Set(string(lockBucket), "bucket")
allValidVal.Set(string(lockPrefix), "prefix")
allValidVal.Set(string(lockOlderThan), "1s")
invalidBucketVal := url.Values{}
invalidBucketVal.Set(string(lockBucket), `invalid\\Bucket`)
invalidBucketVal.Set(string(lockPrefix), "prefix")
invalidBucketVal.Set(string(lockOlderThan), "invalidDuration")
invalidPrefixVal := url.Values{}
invalidPrefixVal.Set(string(lockBucket), "bucket")
invalidPrefixVal.Set(string(lockPrefix), `invalid\\PRefix`)
invalidPrefixVal.Set(string(lockOlderThan), "invalidDuration")
invalidOlderThanVal := url.Values{}
invalidOlderThanVal.Set(string(lockBucket), "bucket")
invalidOlderThanVal.Set(string(lockPrefix), "prefix")
invalidOlderThanVal.Set(string(lockOlderThan), "invalidDuration")
allValidVal := mkLockQueryVal("bucket", "prefix", "1s")
invalidBucketVal := mkLockQueryVal(`invalid\\Bucket`, "prefix", "1s")
invalidPrefixVal := mkLockQueryVal("bucket", `invalid\\Prefix`, "1s")
invalidOlderThanVal := mkLockQueryVal("bucket", "prefix", "invalidDuration")
testCases := []struct {
qVals url.Values
@@ -410,3 +408,469 @@ func TestValidateLockQueryParams(t *testing.T) {
}
}
}
// mkListObjectsQueryStr - helper to build ListObjectsHeal query string.
func mkListObjectsQueryVal(bucket, prefix, marker, delimiter, maxKeyStr string) url.Values {
qVal := url.Values{}
qVal.Set("heal", "")
qVal.Set(string(mgmtBucket), bucket)
qVal.Set(string(mgmtPrefix), prefix)
qVal.Set(string(mgmtMarker), marker)
qVal.Set(string(mgmtDelimiter), delimiter)
qVal.Set(string(mgmtMaxKey), maxKeyStr)
return qVal
}
// TestValidateHealQueryParams - Test for query param validation helper function for heal APIs.
func TestValidateHealQueryParams(t *testing.T) {
testCases := []struct {
bucket string
prefix string
marker string
delimiter string
maxKeys string
apiErr APIErrorCode
}{
// 1. Valid params.
{
bucket: "mybucket",
prefix: "prefix",
marker: "prefix11",
delimiter: "/",
maxKeys: "10",
apiErr: ErrNone,
},
// 2. Valid params with meta bucket.
{
bucket: minioMetaBucket,
prefix: "prefix",
marker: "prefix11",
delimiter: "/",
maxKeys: "10",
apiErr: ErrNone,
},
// 3. Valid params with empty prefix.
{
bucket: "mybucket",
prefix: "",
marker: "",
delimiter: "/",
maxKeys: "10",
apiErr: ErrNone,
},
// 4. Invalid params with invalid bucket.
{
bucket: `invalid\\Bucket`,
prefix: "prefix",
marker: "prefix11",
delimiter: "/",
maxKeys: "10",
apiErr: ErrInvalidBucketName,
},
// 5. Invalid params with invalid prefix.
{
bucket: "mybucket",
prefix: `invalid\\Prefix`,
marker: "prefix11",
delimiter: "/",
maxKeys: "10",
apiErr: ErrInvalidObjectName,
},
// 6. Invalid params with invalid maxKeys.
{
bucket: "mybucket",
prefix: "prefix",
marker: "prefix11",
delimiter: "/",
maxKeys: "-1",
apiErr: ErrInvalidMaxKeys,
},
// 7. Invalid params with unsupported prefix marker combination.
{
bucket: "mybucket",
prefix: "prefix",
marker: "notmatchingmarker",
delimiter: "/",
maxKeys: "10",
apiErr: ErrNotImplemented,
},
// 8. Invalid params with unsupported delimiter.
{
bucket: "mybucket",
prefix: "prefix",
marker: "notmatchingmarker",
delimiter: "unsupported",
maxKeys: "10",
apiErr: ErrNotImplemented,
},
// 9. Invalid params with invalid max Keys
{
bucket: "mybucket",
prefix: "prefix",
marker: "prefix11",
delimiter: "/",
maxKeys: "999999999999999999999999999",
apiErr: ErrInvalidMaxKeys,
},
}
for i, test := range testCases {
vars := mkListObjectsQueryVal(test.bucket, test.prefix, test.marker, test.delimiter, test.maxKeys)
_, _, _, _, _, actualErr := validateHealQueryParams(vars)
if actualErr != test.apiErr {
t.Errorf("Test %d - Expected %v but received %v",
i+1, getAPIError(test.apiErr), getAPIError(actualErr))
}
}
}
// TestListObjectsHeal - Test for ListObjectsHealHandler.
func TestListObjectsHealHandler(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("Unable to initialize server config. %s", err)
}
defer removeAll(rootPath)
// Initializing objectLayer and corresponding []StorageAPI
// since ListObjectsHeal() method requires it.
objLayer, xlDirs, xlErr := prepareXL()
if xlErr != nil {
t.Fatalf("failed to initialize XL based object layer - %v.", xlErr)
}
defer removeRoots(xlDirs)
err = objLayer.MakeBucket("mybucket")
if err != nil {
t.Fatalf("Failed to make bucket - %v", err)
}
// Delete bucket after running all test cases.
defer objLayer.DeleteBucket("mybucket")
// Make objLayer available to all internal services via globalObjectAPI.
globalObjLayerMutex.Lock()
globalObjectAPI = objLayer
globalObjLayerMutex.Unlock()
// Setup admin mgmt REST API handlers.
adminRouter := router.NewRouter()
registerAdminRouter(adminRouter)
testCases := []struct {
bucket string
prefix string
marker string
delimiter string
maxKeys string
statusCode int
}{
// 1. Valid params.
{
bucket: "mybucket",
prefix: "prefix",
marker: "prefix11",
delimiter: "/",
maxKeys: "10",
statusCode: http.StatusOK,
},
// 2. Valid params with meta bucket.
{
bucket: minioMetaBucket,
prefix: "prefix",
marker: "prefix11",
delimiter: "/",
maxKeys: "10",
statusCode: http.StatusOK,
},
// 3. Valid params with empty prefix.
{
bucket: "mybucket",
prefix: "",
marker: "",
delimiter: "/",
maxKeys: "10",
statusCode: http.StatusOK,
},
// 4. Invalid params with invalid bucket.
{
bucket: `invalid\\Bucket`,
prefix: "prefix",
marker: "prefix11",
delimiter: "/",
maxKeys: "10",
statusCode: getAPIError(ErrInvalidBucketName).HTTPStatusCode,
},
// 5. Invalid params with invalid prefix.
{
bucket: "mybucket",
prefix: `invalid\\Prefix`,
marker: "prefix11",
delimiter: "/",
maxKeys: "10",
statusCode: getAPIError(ErrInvalidObjectName).HTTPStatusCode,
},
// 6. Invalid params with invalid maxKeys.
{
bucket: "mybucket",
prefix: "prefix",
marker: "prefix11",
delimiter: "/",
maxKeys: "-1",
statusCode: getAPIError(ErrInvalidMaxKeys).HTTPStatusCode,
},
// 7. Invalid params with unsupported prefix marker combination.
{
bucket: "mybucket",
prefix: "prefix",
marker: "notmatchingmarker",
delimiter: "/",
maxKeys: "10",
statusCode: getAPIError(ErrNotImplemented).HTTPStatusCode,
},
// 8. Invalid params with unsupported delimiter.
{
bucket: "mybucket",
prefix: "prefix",
marker: "notmatchingmarker",
delimiter: "unsupported",
maxKeys: "10",
statusCode: getAPIError(ErrNotImplemented).HTTPStatusCode,
},
// 9. Invalid params with invalid max Keys
{
bucket: "mybucket",
prefix: "prefix",
marker: "prefix11",
delimiter: "/",
maxKeys: "999999999999999999999999999",
statusCode: getAPIError(ErrInvalidMaxKeys).HTTPStatusCode,
},
}
for i, test := range testCases {
if i != 0 {
continue
}
queryVal := mkListObjectsQueryVal(test.bucket, test.prefix, test.marker, test.delimiter, test.maxKeys)
req, err := newTestRequest("GET", "/?"+queryVal.Encode(), 0, nil)
if err != nil {
t.Fatalf("Test %d - Failed to construct list objects needing heal request - %v", i+1, err)
}
req.Header.Set(minioAdminOpHeader, "list")
cred := serverConfig.GetCredential()
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
if err != nil {
t.Fatalf("Test %d - Failed to sign list objects needing heal request - %v", i+1, err)
}
rec := httptest.NewRecorder()
adminRouter.ServeHTTP(rec, req)
if test.statusCode != rec.Code {
t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.statusCode, rec.Code)
}
}
}
// TestHealBucketHandler - Test for HealBucketHandler.
func TestHealBucketHandler(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("Unable to initialize server config. %s", err)
}
defer removeAll(rootPath)
// Initializing objectLayer and corresponding []StorageAPI
// since MakeBucket() and DeleteBucket() methods requires it.
objLayer, xlDirs, xlErr := prepareXL()
if xlErr != nil {
t.Fatalf("failed to initialize XL based object layer - %v.", xlErr)
}
defer removeRoots(xlDirs)
err = objLayer.MakeBucket("mybucket")
if err != nil {
t.Fatalf("Failed to make bucket - %v", err)
}
// Delete bucket after running all test cases.
defer objLayer.DeleteBucket("mybucket")
// Make objLayer available to all internal services via globalObjectAPI.
globalObjLayerMutex.Lock()
globalObjectAPI = objLayer
globalObjLayerMutex.Unlock()
// Setup admin mgmt REST API handlers.
adminRouter := router.NewRouter()
registerAdminRouter(adminRouter)
testCases := []struct {
bucket string
statusCode int
dryrun string
}{
// 1. Valid test case.
{
bucket: "mybucket",
statusCode: http.StatusOK,
},
// 2. Invalid bucket name.
{
bucket: `invalid\\Bucket`,
statusCode: http.StatusBadRequest,
},
// 3. Bucket not found.
{
bucket: "bucketnotfound",
statusCode: http.StatusNotFound,
},
// 4. Valid test case with dry-run.
{
bucket: "mybucket",
statusCode: http.StatusOK,
dryrun: "yes",
},
}
for i, test := range testCases {
// Prepare query params.
queryVal := url.Values{}
queryVal.Set(string(mgmtBucket), test.bucket)
queryVal.Set("heal", "")
queryVal.Set(string(mgmtDryRun), test.dryrun)
req, err := newTestRequest("POST", "/?"+queryVal.Encode(), 0, nil)
if err != nil {
t.Fatalf("Test %d - Failed to construct heal bucket request - %v", i+1, err)
}
req.Header.Set(minioAdminOpHeader, "bucket")
cred := serverConfig.GetCredential()
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
if err != nil {
t.Fatalf("Test %d - Failed to sign heal bucket request - %v", i+1, err)
}
rec := httptest.NewRecorder()
adminRouter.ServeHTTP(rec, req)
if test.statusCode != rec.Code {
t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.statusCode, rec.Code)
}
}
}
// TestHealObjectHandler - Test for HealObjectHandler.
func TestHealObjectHandler(t *testing.T) {
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("Unable to initialize server config. %s", err)
}
defer removeAll(rootPath)
// Initializing objectLayer and corresponding []StorageAPI
// since MakeBucket(), PutObject() and DeleteBucket() method requires it.
objLayer, xlDirs, xlErr := prepareXL()
if xlErr != nil {
t.Fatalf("failed to initialize XL based object layer - %v.", xlErr)
}
defer removeRoots(xlDirs)
// Create an object myobject under bucket mybucket.
bucketName := "mybucket"
objName := "myobject"
err = objLayer.MakeBucket(bucketName)
if err != nil {
t.Fatalf("Failed to make bucket %s - %v", bucketName, err)
}
_, err = objLayer.PutObject(bucketName, objName, int64(len("hello")), bytes.NewReader([]byte("hello")), nil, "")
if err != nil {
t.Fatalf("Failed to create %s - %v", objName, err)
}
// Delete bucket and object after running all test cases.
defer func(objLayer ObjectLayer, bucketName, objName string) {
objLayer.DeleteObject(bucketName, objName)
objLayer.DeleteBucket(bucketName)
}(objLayer, bucketName, objName)
// Make objLayer available to all internal services via globalObjectAPI.
globalObjLayerMutex.Lock()
globalObjectAPI = objLayer
globalObjLayerMutex.Unlock()
// Setup admin mgmt REST API handlers.
adminRouter := router.NewRouter()
registerAdminRouter(adminRouter)
testCases := []struct {
bucket string
object string
dryrun string
statusCode int
}{
// 1. Valid test case.
{
bucket: bucketName,
object: objName,
statusCode: http.StatusOK,
},
// 2. Invalid bucket name.
{
bucket: `invalid\\Bucket`,
object: "myobject",
statusCode: http.StatusBadRequest,
},
// 3. Bucket not found.
{
bucket: "bucketnotfound",
object: "myobject",
statusCode: http.StatusNotFound,
},
// 4. Invalid object name.
{
bucket: bucketName,
object: `invalid\\Object`,
statusCode: http.StatusBadRequest,
},
// 5. Object not found.
{
bucket: bucketName,
object: "objectnotfound",
statusCode: http.StatusNotFound,
},
// 6. Valid test case with dry-run.
{
bucket: bucketName,
object: objName,
dryrun: "yes",
statusCode: http.StatusOK,
},
}
for i, test := range testCases {
// Prepare query params.
queryVal := url.Values{}
queryVal.Set(string(mgmtBucket), test.bucket)
queryVal.Set(string(mgmtObject), test.object)
queryVal.Set("heal", "")
queryVal.Set(string(mgmtDryRun), test.dryrun)
req, err := newTestRequest("POST", "/?"+queryVal.Encode(), 0, nil)
if err != nil {
t.Fatalf("Test %d - Failed to construct heal object request - %v", i+1, err)
}
req.Header.Set(minioAdminOpHeader, "object")
cred := serverConfig.GetCredential()
err = signRequestV4(req, cred.AccessKey, cred.SecretKey)
if err != nil {
t.Fatalf("Test %d - Failed to sign heal object request - %v", i+1, err)
}
rec := httptest.NewRecorder()
adminRouter.ServeHTTP(rec, req)
if test.statusCode != rec.Code {
t.Errorf("Test %d - Expected HTTP status code %d but received %d", i+1, test.statusCode, rec.Code)
}
}
}

View File

@@ -41,7 +41,15 @@ func registerAdminRouter(mux *router.Router) {
// List Locks
adminRouter.Methods("GET").Queries("lock", "").Headers(minioAdminOpHeader, "list").HandlerFunc(adminAPI.ListLocksHandler)
// Clear locks
adminRouter.Methods("POST").Queries("lock", "").Headers(minioAdminOpHeader, "clear").HandlerFunc(adminAPI.ClearLocksHandler)
/// Heal operations
// List Objects needing heal.
adminRouter.Methods("GET").Queries("heal", "").Headers(minioAdminOpHeader, "list").HandlerFunc(adminAPI.ListObjectsHealHandler)
// Heal Buckets.
adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "bucket").HandlerFunc(adminAPI.HealBucketHandler)
// Heal Objects.
adminRouter.Methods("POST").Queries("heal", "").Headers(minioAdminOpHeader, "object").HandlerFunc(adminAPI.HealObjectHandler)
}

View File

@@ -197,6 +197,7 @@ type Object struct {
// The class of storage used to store the object.
StorageClass string
HealInfo *HealInfo `xml:"HealInfo,omitempty"`
}
// CopyObjectResponse container returns ETag and LastModified of the successfully copied object
@@ -316,6 +317,8 @@ func generateListObjectsV1Response(bucket, prefix, marker, delimiter string, max
content.Size = object.Size
content.StorageClass = "STANDARD"
content.Owner = owner
// object.HealInfo is non-empty only when resp is constructed in ListObjectsHeal.
content.HealInfo = object.HealInfo
contents = append(contents, content)
}
// TODO - support EncodingType in xml decoding

View File

@@ -1,5 +1,5 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -59,6 +59,21 @@ type BucketInfo struct {
Created time.Time
}
type healStatus int
const (
canHeal healStatus = iota // Object can be healed
corrupted // Object can't be healed
quorumUnavailable // Object can't be healed until read quorum is available
)
// HealInfo - represents healing related information of an object.
type HealInfo struct {
Status healStatus
MissingDataCount int
MissingPartityCount int
}
// ObjectInfo - represents object metadata.
type ObjectInfo struct {
// Name of the bucket.
@@ -89,6 +104,7 @@ type ObjectInfo struct {
// User-Defined metadata
UserDefined map[string]string
HealInfo *HealInfo `xml:"HealInfo,omitempty"`
}
// ListPartsInfo - represents list of all parts.

View File

@@ -43,13 +43,21 @@ const (
var validBucket = regexp.MustCompile(`^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$`)
var isIPAddress = regexp.MustCompile(`^(\d+\.){3}\d+$`)
// isMinioBucket returns true if given bucket is a Minio internal
// bucket and false otherwise.
func isMinioMetaBucketName(bucket string) bool {
return bucket == minioMetaBucket ||
bucket == minioMetaMultipartBucket ||
bucket == minioMetaTmpBucket
}
// IsValidBucketName verifies a bucket name in accordance with Amazon's
// requirements. It must be 3-63 characters long, can contain dashes
// and periods, but must begin and end with a lowercase letter or a number.
// See: http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
func IsValidBucketName(bucket string) bool {
// Special case when bucket is equal to 'metaBucket'.
if bucket == minioMetaBucket || bucket == minioMetaMultipartBucket {
// Special case when bucket is equal to one of the meta buckets.
if isMinioMetaBucketName(bucket) {
return true
}
if len(bucket) < 3 || len(bucket) > 63 {

View File

@@ -175,3 +175,40 @@ func TestGetCompleteMultipartMD5(t *testing.T) {
}
}
}
// TestIsMinioBucketName - Tests isMinioBucketName helper function.
func TestIsMinioMetaBucketName(t *testing.T) {
testCases := []struct {
bucket string
result bool
}{
// Minio meta bucket.
{
bucket: minioMetaBucket,
result: true,
},
// Minio meta bucket.
{
bucket: minioMetaMultipartBucket,
result: true,
},
// Minio meta bucket.
{
bucket: minioMetaTmpBucket,
result: true,
},
// Normal bucket
{
bucket: "mybucket",
result: false,
},
}
for i, test := range testCases {
actual := isMinioMetaBucketName(test.bucket)
if actual != test.result {
t.Errorf("Test %d - expected %v but received %v",
i+1, test.result, actual)
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package cmd
import "time"
// commonTime returns a maximally occurring time from a list of time.
func commonTime(modTimes []time.Time) (modTime time.Time) {
func commonTime(modTimes []time.Time) (modTime time.Time, count int) {
var maxima int // Counter for remembering max occurrence of elements.
timeOccurenceMap := make(map[time.Time]int)
// Ignore the uuid sentinel and count the rest.
@@ -32,13 +32,17 @@ func commonTime(modTimes []time.Time) (modTime time.Time) {
// Find the common cardinality from previously collected
// occurrences of elements.
for time, count := range timeOccurenceMap {
if count > maxima {
if count == maxima && time.After(modTime) {
maxima = count
modTime = time
} else if count > maxima {
maxima = count
modTime = time
}
}
// Return the collected common uuid.
return modTime
return modTime, maxima
}
// Beginning of unix time is treated as sentinel value here.
@@ -85,7 +89,7 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error)
modTimes := listObjectModtimes(partsMetadata, errs)
// Reduce list of UUIDs to a single common value.
modTime = commonTime(modTimes)
modTime, _ = commonTime(modTimes)
// Create a new online disks slice, which have common uuid.
for index, t := range modTimes {
@@ -119,7 +123,7 @@ func outDatedDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (
// Returns if the object should be healed.
func xlShouldHeal(partsMetadata []xlMetaV1, errs []error) bool {
modTime := commonTime(listObjectModtimes(partsMetadata, errs))
modTime, _ := commonTime(listObjectModtimes(partsMetadata, errs))
for index := range partsMetadata {
if errs[index] == errDiskNotFound {
continue
@@ -133,3 +137,55 @@ func xlShouldHeal(partsMetadata []xlMetaV1, errs []error) bool {
}
return false
}
// xlHealStat - returns a structure which describes how many data,
// parity erasure blocks are missing and if it is possible to heal
// with the blocks present.
func xlHealStat(xl xlObjects, partsMetadata []xlMetaV1, errs []error) HealInfo {
// Less than quorum erasure coded blocks of the object have the same create time.
// This object can't be healed with the information we have.
modTime, count := commonTime(listObjectModtimes(partsMetadata, errs))
if count < xl.readQuorum {
return HealInfo{
Status: quorumUnavailable,
MissingDataCount: 0,
MissingPartityCount: 0,
}
}
// If there isn't a valid xlMeta then we can't heal the object.
xlMeta, err := pickValidXLMeta(partsMetadata, modTime)
if err != nil {
return HealInfo{
Status: corrupted,
MissingDataCount: 0,
MissingPartityCount: 0,
}
}
// Compute heal statistics like bytes to be healed, missing
// data and missing parity count.
missingDataCount := 0
missingParityCount := 0
for i, err := range errs {
// xl.json is not found, which implies the erasure
// coded blocks are unavailable in the corresponding disk.
// First half of the disks are data and the rest are parity.
if realErr := errorCause(err); realErr == errFileNotFound || realErr == errDiskNotFound {
if xlMeta.Erasure.Distribution[i]-1 < xl.dataBlocks {
missingDataCount++
} else {
missingParityCount++
}
}
}
// This object can be healed. We have enough object metadata
// to reconstruct missing erasure coded blocks.
return HealInfo{
Status: canHeal,
MissingDataCount: missingDataCount,
MissingPartityCount: missingParityCount,
}
}

View File

@@ -1,5 +1,5 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -75,7 +75,7 @@ func TestCommonTime(t *testing.T) {
// common modtime. Tests fail if modtime does not match.
for i, testCase := range testCases {
// Obtain a common mod time from modTimes slice.
ctime := commonTime(testCase.times)
ctime, _ := commonTime(testCase.times)
if testCase.time != ctime {
t.Fatalf("Test case %d, expect to pass but failed. Wanted modTime: %s, got modTime: %s\n", i+1, testCase.time, ctime)
}

View File

@@ -169,8 +169,8 @@ func listBucketNames(storageDisks []StorageAPI) (bucketNames map[string]struct{}
if !IsValidBucketName(volInfo.Name) {
continue
}
// Ignore the volume special bucket.
if volInfo.Name == minioMetaBucket {
// Skip special volume buckets.
if isMinioMetaBucketName(volInfo.Name) {
continue
}
bucketNames[volInfo.Name] = struct{}{}

View File

@@ -1,5 +1,5 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -120,8 +120,15 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
objInfo.Name = entry
objInfo.IsDir = true
} else {
objInfo.Bucket = bucket
objInfo.Name = entry
var err error
objInfo, err = xl.getObjectInfo(bucket, entry)
if err != nil {
// Ignore errFileNotFound
if errorCause(err) == errFileNotFound {
continue
}
return ListObjectsInfo{}, toObjectErr(err, bucket, prefix)
}
}
nextMarker = objInfo.Name
objInfos = append(objInfos, objInfo)
@@ -150,11 +157,13 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
objectLock.RLock()
partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, objInfo.Name)
if xlShouldHeal(partsMetadata, errs) {
healStat := xlHealStat(xl, partsMetadata, errs)
result.Objects = append(result.Objects, ObjectInfo{
Name: objInfo.Name,
ModTime: objInfo.ModTime,
Size: objInfo.Size,
IsDir: false,
Name: objInfo.Name,
ModTime: objInfo.ModTime,
Size: objInfo.Size,
IsDir: false,
HealInfo: &healStat,
})
}
objectLock.RUnlock()

View File

@@ -322,7 +322,7 @@ func readAllXLMetadata(disks []StorageAPI, bucket, object string) ([]xlMetaV1, [
return metadataArray, errs
}
// Return ordered partsMetadata depeinding on distribution.
// Return ordered partsMetadata depending on distribution.
func getOrderedPartsMetadata(distribution []int, partsMetadata []xlMetaV1) (orderedPartsMetadata []xlMetaV1) {
orderedPartsMetadata = make([]xlMetaV1, len(partsMetadata))
for index := range partsMetadata {