Add Heal Disk Metadata RPC API + tests (#2556)

This commit is contained in:
Anis Elleuch 2016-08-29 03:31:59 +01:00 committed by Harshavardhana
parent 7f92165c79
commit 0513b3ed07
7 changed files with 189 additions and 45 deletions

View File

@ -47,8 +47,17 @@ EAMPLES:
`, `,
} }
func checkHealControlSyntax(ctx *cli.Context) {
if len(ctx.Args()) != 1 {
cli.ShowCommandHelpAndExit(ctx, "heal", 1)
}
}
// "minio control heal" entry point. // "minio control heal" entry point.
func healControl(ctx *cli.Context) { func healControl(ctx *cli.Context) {
checkHealControlSyntax(ctx)
// Parse bucket and object from url.URL.Path // Parse bucket and object from url.URL.Path
parseBucketObject := func(path string) (bucketName string, objectName string) { parseBucketObject := func(path string) (bucketName string, objectName string) {
splits := strings.SplitN(path, string(slashSeparator), 3) splits := strings.SplitN(path, string(slashSeparator), 3)
@ -67,18 +76,9 @@ func healControl(ctx *cli.Context) {
return bucketName, objectName return bucketName, objectName
} }
if len(ctx.Args()) != 1 {
cli.ShowCommandHelpAndExit(ctx, "heal", 1)
}
parsedURL, err := url.Parse(ctx.Args()[0]) parsedURL, err := url.Parse(ctx.Args()[0])
fatalIf(err, "Unable to parse URL") fatalIf(err, "Unable to parse URL")
bucketName, objectName := parseBucketObject(parsedURL.Path)
if bucketName == "" {
cli.ShowCommandHelpAndExit(ctx, "heal", 1)
}
authCfg := &authConfig{ authCfg := &authConfig{
accessKey: serverConfig.GetCredential().AccessKeyID, accessKey: serverConfig.GetCredential().AccessKeyID,
secretKey: serverConfig.GetCredential().SecretAccessKey, secretKey: serverConfig.GetCredential().SecretAccessKey,
@ -88,6 +88,19 @@ func healControl(ctx *cli.Context) {
} }
client := newAuthClient(authCfg) client := newAuthClient(authCfg)
// Always try to fix disk metadata
fmt.Print("Checking and healing disk metadata..")
args := &GenericArgs{}
reply := &GenericReply{}
err = client.Call("Controller.HealDiskMetadataHandler", args, reply)
fatalIf(err, "Unable to heal disk metadata.")
fmt.Println(" ok")
bucketName, objectName := parseBucketObject(parsedURL.Path)
if bucketName == "" {
return
}
// If object does not have trailing "/" then it's an object, hence heal it. // If object does not have trailing "/" then it's an object, hence heal it.
if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) { if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) {
fmt.Printf("Healing : /%s/%s\n", bucketName, objectName) fmt.Printf("Healing : /%s/%s\n", bucketName, objectName)

View File

@ -103,6 +103,18 @@ func (c *controllerAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *G
return objAPI.HealObject(args.Bucket, args.Object) return objAPI.HealObject(args.Bucket, args.Object)
} }
// HealObject - heal the object.
func (c *controllerAPIHandlers) HealDiskMetadataHandler(args *GenericArgs, reply *GenericReply) error {
objAPI := c.ObjectAPI()
if objAPI == nil {
return errVolumeBusy
}
if !isRPCTokenValid(args.Token) {
return errInvalidToken
}
return objAPI.HealDiskMetadata()
}
// ShutdownArgs - argument for Shutdown RPC. // ShutdownArgs - argument for Shutdown RPC.
type ShutdownArgs struct { type ShutdownArgs struct {
// Authentication token generated by Login. // Authentication token generated by Login.

View File

@ -0,0 +1,63 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
// "net/rpc"
"testing"
)
// Wrapper for calling heal disk metadata rpc Handler
func TestControllerHandlerHealDiskMetadata(t *testing.T) {
ExecObjectLayerTest(t, testHealDiskMetadataControllerHandler)
}
// testHealDiskMetadataControllerHandler - Test Heal Disk Metadata handler
func testHealDiskMetadataControllerHandler(obj ObjectLayer, instanceType string, t TestErrHandler) {
// Register the API end points with XL/FS object layer.
serverAddress, random, err := initTestControllerRPCEndPoint(obj)
if err != nil {
t.Fatal(err)
}
// initialize the server and obtain the credentials and root.
// credentials are necessary to sign the HTTP request.
rootPath, err := newTestConfig("us-east-1")
if err != nil {
t.Fatalf("Init Test config failed")
}
// remove the root folder after the test ends.
defer removeAll(rootPath)
authCfg := &authConfig{
accessKey: serverConfig.GetCredential().AccessKeyID,
secretKey: serverConfig.GetCredential().SecretAccessKey,
address: serverAddress,
path: "/controller" + random,
loginMethod: "Controller.LoginHandler",
}
client := newAuthClient(authCfg)
args := &GenericArgs{}
reply := &GenericReply{}
err = client.Call("Controller.HealDiskMetadataHandler", args, reply)
if instanceType == "FS" && err == nil {
t.Errorf("Test should fail with FS")
}
if instanceType == "XL" && err != nil {
t.Errorf("Test should succeed with XL %s", err.Error())
}
}

View File

@ -659,3 +659,8 @@ func (fs fsObjects) HealObject(bucket, object string) error {
func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
return ListObjectsInfo{}, NotImplemented{} return ListObjectsInfo{}, NotImplemented{}
} }
// HealDiskMetadata -- heal disk metadata, not supported in FS
func (fs fsObjects) HealDiskMetadata() error {
return NotImplemented{}
}

View File

@ -22,6 +22,7 @@ import "io"
type ObjectLayer interface { type ObjectLayer interface {
// Storage operations. // Storage operations.
Shutdown() error Shutdown() error
HealDiskMetadata() error
StorageInfo() StorageInfo StorageInfo() StorageInfo
// Bucket operations. // Bucket operations.

View File

@ -26,8 +26,10 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/rpc"
"net/url" "net/url"
"os" "os"
"regexp" "regexp"
@ -931,6 +933,7 @@ func initTestAPIEndPoints(objLayer ObjectLayer, apiFunctions []string) http.Hand
return muxRouter return muxRouter
} }
// Initialize Web RPC Handlers for testing
func initTestWebRPCEndPoint(objLayer ObjectLayer) http.Handler { func initTestWebRPCEndPoint(objLayer ObjectLayer) http.Handler {
// Initialize Web. // Initialize Web.
webHandlers := &webAPIHandlers{ webHandlers := &webAPIHandlers{
@ -942,3 +945,37 @@ func initTestWebRPCEndPoint(objLayer ObjectLayer) http.Handler {
registerWebRouter(muxRouter, webHandlers) registerWebRouter(muxRouter, webHandlers)
return muxRouter return muxRouter
} }
// Initialize Controller RPC Handlers for testing
func initTestControllerRPCEndPoint(objLayer ObjectLayer) (string, string, error) {
controllerHandlers := &controllerAPIHandlers{
ObjectAPI: func() ObjectLayer { return objLayer },
}
// Start configuring net/rpc server
server := rpc.NewServer()
server.RegisterName("Controller", controllerHandlers)
listenTCP := func() (net.Listener, string, error) {
l, e := net.Listen("tcp", ":0") // any available address
if e != nil {
return nil, "", errors.New("net.Listen tcp :0, " + e.Error())
}
return l, l.Addr().String(), nil
}
l, serverAddr, err := listenTCP()
if err != nil {
return "", "", nil
}
go server.Accept(l)
// net/rpc only accepts one registered path and doesn't help to unregister it,
// so we are registering a new rpc path each time this function is called
random := strconv.Itoa(rand.Int())
server.HandleHTTP("/controller"+random, "/controller-debug"+random)
testserver := httptest.NewServer(nil)
serverAddr = testserver.Listener.Addr().String()
return serverAddr, random, nil
}

View File

@ -67,6 +67,46 @@ type xlObjects struct {
objCacheEnabled bool objCacheEnabled bool
} }
func repairDiskMetadata(storageDisks []StorageAPI) error {
// Attempt to load all `format.json`.
formatConfigs, sErrs := loadAllFormats(storageDisks)
// Generic format check validates
// if (no quorum) return error
// if (disks not recognized) // Always error.
if err := genericFormatCheck(formatConfigs, sErrs); err != nil {
return err
}
// Initialize meta volume, if volume already exists ignores it.
if err := initMetaVolume(storageDisks); err != nil {
return fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
}
// Handles different cases properly.
switch reduceFormatErrs(sErrs, len(storageDisks)) {
case errCorruptedFormat:
if err := healFormatXLCorruptedDisks(storageDisks); err != nil {
return fmt.Errorf("Unable to repair corrupted format, %s", err)
}
case errUnformattedDisk:
// All drives online but fresh, initialize format.
if err := initFormatXL(storageDisks); err != nil {
return fmt.Errorf("Unable to initialize format, %s", err)
}
case errSomeDiskUnformatted:
// All drives online but some report missing format.json.
if err := healFormatXLFreshDisks(storageDisks); err != nil {
// There was an unexpected unrecoverable error during healing.
return fmt.Errorf("Unable to heal backend %s", err)
}
case errSomeDiskOffline:
// FIXME: in future.
return fmt.Errorf("Unable to initialize format %s and %s", errSomeDiskOffline, errSomeDiskUnformatted)
}
return nil
}
// newXLObjects - initialize new xl object layer. // newXLObjects - initialize new xl object layer.
func newXLObjects(disks, ignoredDisks []string) (ObjectLayer, error) { func newXLObjects(disks, ignoredDisks []string) (ObjectLayer, error) {
if disks == nil { if disks == nil {
@ -97,42 +137,8 @@ func newXLObjects(disks, ignoredDisks []string) (ObjectLayer, error) {
} }
} }
// Attempt to load all `format.json`. // Fix format files in case of fresh or corrupted disks
formatConfigs, sErrs := loadAllFormats(storageDisks) repairDiskMetadata(storageDisks)
// Generic format check validates
// if (no quorum) return error
// if (disks not recognized) // Always error.
if err := genericFormatCheck(formatConfigs, sErrs); err != nil {
return nil, err
}
// Initialize meta volume, if volume already exists ignores it.
if err := initMetaVolume(storageDisks); err != nil {
return nil, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
}
// Handles different cases properly.
switch reduceFormatErrs(sErrs, len(storageDisks)) {
case errCorruptedFormat:
if err := healFormatXLCorruptedDisks(storageDisks); err != nil {
return nil, fmt.Errorf("Unable to repair corrupted format, %s", err)
}
case errUnformattedDisk:
// All drives online but fresh, initialize format.
if err := initFormatXL(storageDisks); err != nil {
return nil, fmt.Errorf("Unable to initialize format, %s", err)
}
case errSomeDiskUnformatted:
// All drives online but some report missing format.json.
if err := healFormatXLFreshDisks(storageDisks); err != nil {
// There was an unexpected unrecoverable error during healing.
return nil, fmt.Errorf("Unable to heal backend %s", err)
}
case errSomeDiskOffline:
// FIXME: in future.
return nil, fmt.Errorf("Unable to initialize format %s and %s", errSomeDiskOffline, errSomeDiskUnformatted)
}
// Runs house keeping code, like t, cleaning up tmp files etc. // Runs house keeping code, like t, cleaning up tmp files etc.
if err := xlHouseKeeping(storageDisks); err != nil { if err := xlHouseKeeping(storageDisks); err != nil {
@ -180,6 +186,13 @@ func (xl xlObjects) Shutdown() error {
return nil return nil
} }
// HealDiskMetadata function for object storage interface.
func (xl xlObjects) HealDiskMetadata() error {
nsMutex.Lock(minioMetaBucket, formatConfigFile)
defer nsMutex.Unlock(minioMetaBucket, formatConfigFile)
return repairDiskMetadata(xl.storageDisks)
}
// byDiskTotal is a collection satisfying sort.Interface. // byDiskTotal is a collection satisfying sort.Interface.
type byDiskTotal []disk.Info type byDiskTotal []disk.Info