heal: Refactor heal command. (#2901)

- Return errors for heal operations through RPC replies.
- Implement a rotating wheel (spinner) for healing status.

Fixes #2491
Harshavardhana 2016-10-14 19:57:40 -07:00 committed by GitHub
parent 18be3bc95a
commit f22862aa28
20 changed files with 619 additions and 244 deletions
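The first bullet changes how heal failures reach the client: Control.HealObjectsHandler now records one error cause per object in HealObjectReply.Results, with an empty string meaning the object healed. A minimal sketch of consuming that reply, assuming the cmd package types added in this commit; the helper name printHealFailures and the fmt import are illustrative only, not part of the diff:

// printHealFailures is a hypothetical helper showing the new reply shape:
// reply.Results[i] holds the error cause for args.Objects[i], "" means healed.
func printHealFailures(client *AuthRPCClient, bucket string, objects []ObjectInfo) error {
	args := &HealObjectArgs{Bucket: bucket, Objects: objects}
	reply := &HealObjectReply{}
	if err := client.Call("Control.HealObjectsHandler", args, reply); err != nil {
		return err
	}
	for i, cause := range reply.Results {
		if cause != "" {
			fmt.Printf("FAILED %s/%s: %s\n", bucket, objects[i].Name, cause)
		}
	}
	return nil
}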

View File

@ -6,9 +6,9 @@ language: go
os:
- linux
- osx
#- osx
osx_image: xcode7.2
#osx_image: xcode7.2
env:
- ARCH=x86_64

View File

@ -68,7 +68,7 @@ type HealListArgs struct {
type HealListReply struct {
IsTruncated bool
NextMarker string
Objects []string
Objects []ObjectInfo
}
// ListObjects - list all objects that need healing.
@ -80,35 +80,30 @@ func (c *controlAPIHandlers) ListObjectsHealHandler(args *HealListArgs, reply *H
if !isRPCTokenValid(args.Token) {
return errInvalidToken
}
if !c.IsXL {
return nil
}
info, err := objAPI.ListObjectsHeal(args.Bucket, args.Prefix, args.Marker, args.Delimiter, args.MaxKeys)
if err != nil {
return err
}
reply.IsTruncated = info.IsTruncated
reply.NextMarker = info.NextMarker
for _, obj := range info.Objects {
reply.Objects = append(reply.Objects, obj.Name)
}
reply.Objects = info.Objects
return nil
}
// HealObjectArgs - argument for HealObject RPC.
type HealObjectArgs struct {
// HealBucketArgs - arguments for HealBucket RPC.
type HealBucketArgs struct {
// Authentication token generated by Login.
GenericArgs
// Name of the bucket.
// Bucket to be healed.
Bucket string
// Name of the object.
Object string
}
// HealObjectReply - reply by HealObject RPC.
type HealObjectReply struct{}
// HealObject - heal the object.
func (c *controlAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *GenericReply) error {
// Heals missing buckets across disks, if we have enough quorum.
func (c *controlAPIHandlers) HealBucketHandler(args *HealBucketArgs, reply *GenericReply) error {
objAPI := c.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
@ -116,15 +111,72 @@ func (c *controlAPIHandlers) HealObjectHandler(args *HealObjectArgs, reply *Gene
if !isRPCTokenValid(args.Token) {
return errInvalidToken
}
return objAPI.HealObject(args.Bucket, args.Object)
if !c.IsXL {
return nil
}
// Proceed to heal the bucket.
return objAPI.HealBucket(args.Bucket)
}
// HealObject - heal the object.
func (c *controlAPIHandlers) HealDiskMetadataHandler(args *GenericArgs, reply *GenericReply) error {
// HealObjectArgs - argument for HealObject RPC.
type HealObjectArgs struct {
// Authentication token generated by Login.
GenericArgs
// Name of the bucket where the object
// needs to be healed.
Bucket string
// List of objects to be healed.
Objects []ObjectInfo
}
// HealObjectReply - reply by HealObject RPC.
type HealObjectReply struct {
Results []string
}
// HealObjectsHandler heals up to 1000 objects at a time for missing chunks or missing metadata in a given bucket.
func (c *controlAPIHandlers) HealObjectsHandler(args *HealObjectArgs, reply *HealObjectReply) error {
objAPI := c.ObjectAPI()
if objAPI == nil {
return errServerNotInitialized
}
if !isRPCTokenValid(args.Token) {
return errInvalidToken
}
err := repairDiskMetadata(c.StorageDisks)
if !c.IsXL {
return nil
}
// Heal all objects that need healing.
var errs = make([]error, len(args.Objects))
for idx, objInfo := range args.Objects {
errs[idx] = objAPI.HealObject(args.Bucket, objInfo.Name)
}
// Get all the error causes.
var causes = make([]string, len(args.Objects))
for id, err := range errs {
if err != nil {
causes[id] = err.Error()
}
}
// Save the causes.
reply.Results = causes
return nil
}
// Heals backend storage format.
func (c *controlAPIHandlers) HealFormatHandler(args *GenericArgs, reply *GenericReply) error {
if !isRPCTokenValid(args.Token) {
return errInvalidToken
}
if !c.IsXL {
return nil
}
err := healFormatXL(c.StorageDisks)
if err != nil {
return err
}
@ -214,6 +266,9 @@ func (c *controlAPIHandlers) TryInitHandler(args *GenericArgs, reply *GenericRep
if !isRPCTokenValid(args.Token) {
return errInvalidToken
}
if !c.IsXL {
return nil
}
go func() {
globalWakeupCh <- struct{}{}
}()

View File

@ -17,12 +17,13 @@
package cmd
import (
"errors"
"fmt"
"net/url"
"path"
"strings"
"github.com/minio/cli"
"github.com/minio/mc/pkg/console"
)
var healCmd = cli.Command{
@ -41,48 +42,154 @@ FLAGS:
{{end}}
EXAMPLES:
1. Heal an object.
1. Heal missing on-disk format across all inconsistent nodes.
$ minio control {{.Name}} http://localhost:9000
2. Heals a specific object.
$ minio control {{.Name}} http://localhost:9000/songs/classical/western/piano.mp3
2. Heal all objects in a bucket recursively.
$ minio control {{.Name}} http://localhost:9000/songs
3. Heal bucket and all objects in a bucket recursively.
$ minio control {{.Name}} http://localhost:9000/songs
3. Heal all objects with a given prefix recursively.
$ minio control {{.Name}} http://localhost:9000/songs/classical/
4. Heal all objects with a given prefix recursively.
$ minio control {{.Name}} http://localhost:9000/songs/classical/
`,
}
func checkHealControlSyntax(ctx *cli.Context) {
if len(ctx.Args()) != 1 {
cli.ShowCommandHelpAndExit(ctx, "heal", 1)
}
// healStorageFormat heals the backend storage format, useful for restoring a
// `format.json` missing on fresh or corrupted disks. This call does a deep
// inspection of the backend layout and applies the appropriate `format.json` to the disk.
func healStorageFormat(authClnt *AuthRPCClient) error {
args := &GenericArgs{}
reply := &GenericReply{}
return authClnt.Call("Control.HealFormatHandler", args, reply)
}
// "minio control heal" entry point.
func healControl(ctx *cli.Context) {
// listObjectsHeal lists all objects that need to be healed; it is a precursor helper
// called before the actual healing operation. Returns a maximum of 1000 objects that
// need healing at a time. Marker indicates where the next listing will start.
func listObjectsHeal(authClnt *AuthRPCClient, bucketName, prefixName, markerName string) (*HealListReply, error) {
args := &HealListArgs{
Bucket: bucketName,
Prefix: prefixName,
Marker: markerName,
Delimiter: "",
MaxKeys: 1000,
}
reply := &HealListReply{}
err := authClnt.Call("Control.ListObjectsHealHandler", args, reply)
if err != nil {
return nil, err
}
return reply, nil
}
checkHealControlSyntax(ctx)
// healMsg - internal struct encapsulating the pretty message to be printed by the caller.
type healMsg struct {
Msg string
Err error
}
// Parse bucket and object from url.URL.Path
parseBucketObject := func(path string) (bucketName string, objectName string) {
splits := strings.SplitN(path, string(slashSeparator), 3)
switch len(splits) {
case 0, 1:
bucketName = ""
objectName = ""
case 2:
bucketName = splits[1]
objectName = ""
case 3:
bucketName = splits[1]
objectName = splits[2]
// prettyHealResults prettifies heal results and returns them over a channel; the caller reads from this channel and prints.
func prettyHealResults(healedObjects []ObjectInfo, healReply *HealObjectReply) <-chan healMsg {
var msgCh = make(chan healMsg)
// Starts writing to message channel for the list of results sent back
// by a previous healing operation.
go func(msgCh chan<- healMsg) {
defer close(msgCh)
// Go through all the results and validate if we have success or failure.
for i, healStr := range healReply.Results {
objPath := path.Join(healedObjects[i].Bucket, healedObjects[i].Name)
// TODO: We still need to print the heal error cause.
if healStr != "" {
msgCh <- healMsg{
Msg: fmt.Sprintf("%s %s", colorRed("FAILED"), objPath),
Err: errors.New(healStr),
}
continue
}
msgCh <- healMsg{
Msg: fmt.Sprintf("%s %s", colorGreen("SUCCESS"), objPath),
}
}
return bucketName, objectName
}(msgCh)
// Return the message channel.
return msgCh
}
var scanBar = scanBarFactory()
// healObjects heals all the objects under a given bucket; optionally an object
// prefix can be specified to heal only the objects under that prefix.
func healObjects(authClnt *AuthRPCClient, bucketName, prefixName string) error {
if authClnt == nil || bucketName == "" {
return errInvalidArgument
}
// Save marker for the next request.
var markerName string
for {
healListReply, err := listObjectsHeal(authClnt, bucketName, prefixName, markerName)
if err != nil {
return err
}
// Attempt to heal only if there are any objects to heal.
if len(healListReply.Objects) > 0 {
healArgs := &HealObjectArgs{
Bucket: bucketName,
Objects: healListReply.Objects,
}
healReply := &HealObjectReply{}
err = authClnt.Call("Control.HealObjectsHandler", healArgs, healReply)
if err != nil {
return err
}
// Pretty print all the heal results.
for msg := range prettyHealResults(healArgs.Objects, healReply) {
if msg.Err != nil {
// TODO we need to print the error cause as well.
scanBar(msg.Msg)
continue
}
// Success.
scanBar(msg.Msg)
}
}
// End of listing objects for healing.
if !healListReply.IsTruncated {
break
}
// Set the marker to list the next set of keys.
markerName = healListReply.NextMarker
}
return nil
}
// healBucket heals the given bucket for any missing entries.
func healBucket(authClnt *AuthRPCClient, bucketName string) error {
if authClnt == nil || bucketName == "" {
return errInvalidArgument
}
return authClnt.Call("Control.HealBucketHandler", &HealBucketArgs{
Bucket: bucketName,
}, &GenericReply{})
}
// Entry point for minio control heal command.
func healControl(ctx *cli.Context) {
if ctx.Args().Present() && len(ctx.Args()) != 1 {
cli.ShowCommandHelpAndExit(ctx, "heal", 1)
}
parsedURL, err := url.Parse(ctx.Args()[0])
fatalIf(err, "Unable to parse URL")
parsedURL, err := url.Parse(ctx.Args().Get(0))
fatalIf(err, "Unable to parse URL %s", ctx.Args().Get(0))
authCfg := &authConfig{
accessKey: serverConfig.GetCredential().AccessKeyID,
@ -92,71 +199,19 @@ func healControl(ctx *cli.Context) {
path: path.Join(reservedBucket, controlPath),
loginMethod: "Control.LoginHandler",
}
client := newAuthClient(authCfg)
// Always try to fix disk metadata
fmt.Print("Checking and healing disk metadata..")
args := &GenericArgs{}
reply := &GenericReply{}
err = client.Call("Control.HealDiskMetadataHandler", args, reply)
fatalIf(err, "Unable to heal disk metadata.")
fmt.Println(" ok")
bucketName, objectName := parseBucketObject(parsedURL.Path)
if bucketName == "" {
if parsedURL.Path == "/" || parsedURL.Path == "" {
err = healStorageFormat(client)
fatalIf(err, "Unable to heal disk metadata.")
return
}
// If object does not have trailing "/" then it's an object, hence heal it.
if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) {
fmt.Printf("Healing : /%s/%s\n", bucketName, objectName)
args := &HealObjectArgs{Bucket: bucketName, Object: objectName}
reply := &HealObjectReply{}
err = client.Call("Control.HealObjectHandler", args, reply)
errorIf(err, "Healing object %s failed.", objectName)
return
}
// If object is "" then heal the bucket first.
if objectName == "" {
fmt.Printf("Healing : /%s\n", bucketName)
args := &HealObjectArgs{Bucket: bucketName, Object: ""}
reply := &HealObjectReply{}
err = client.Call("Control.HealObjectHandler", args, reply)
fatalIf(err, "Healing bucket %s failed.", bucketName)
// Continue to heal the objects in the bucket.
}
// Recursively list and heal the objects.
prefix := objectName
marker := ""
for {
args := &HealListArgs{
Bucket: bucketName,
Prefix: prefix,
Marker: marker,
Delimiter: "",
MaxKeys: 1000,
}
reply := &HealListReply{}
err = client.Call("Control.ListObjectsHealHandler", args, reply)
fatalIf(err, "Unable to list objects for healing.")
// Heal the objects returned in the ListObjects reply.
for _, obj := range reply.Objects {
fmt.Printf("Healing : /%s/%s\n", bucketName, obj)
reply := &GenericReply{}
healArgs := &HealObjectArgs{Bucket: bucketName, Object: obj}
err = client.Call("Control.HealObjectHandler", healArgs, reply)
errorIf(err, "Healing object %s failed.", obj)
}
if !reply.IsTruncated {
// End of listing.
break
}
// Set the marker to list the next set of keys.
marker = reply.NextMarker
}
bucketName, prefixName := urlPathSplit(parsedURL.Path)
// Heal the bucket.
err = healBucket(client, bucketName)
fatalIf(err, "Unable to heal bucket %s", bucketName)
// Heal all the objects.
err = healObjects(client, bucketName, prefixName)
fatalIf(err, "Unable to heal objects on bucket %s at prefix %s", bucketName, prefixName)
console.Println()
}

View File

@ -17,6 +17,10 @@
package cmd
import (
"bytes"
"crypto/rand"
"os"
"path"
"testing"
"github.com/minio/cli"
@ -43,7 +47,49 @@ func TestControlHealMain(t *testing.T) {
// run app
err := app.Run(args)
if err != nil {
t.Errorf("Control-Heal-Main test failed with - %s", err.Error())
t.Errorf("Control-Heal-Format-Main test failed with - %s", err.Error())
}
obj := newObjectLayerFn()
// Create "bucket"
err = obj.MakeBucket("bucket")
if err != nil {
t.Fatal(err)
}
bucket := "bucket"
object := "object"
data := make([]byte, 1*1024*1024)
length := int64(len(data))
_, err = rand.Read(data)
if err != nil {
t.Fatal(err)
}
_, err = obj.PutObject(bucket, object, length, bytes.NewReader(data), nil, "")
if err != nil {
t.Fatal(err)
}
// Remove the object - to simulate the case where the disk was down when the object was created.
err = os.RemoveAll(path.Join(testServer.Disks[0], bucket, object))
if err != nil {
t.Fatal(err)
}
args = []string{"./minio", "control", "heal", url + "/bucket"}
// run app
err = app.Run(args)
if err != nil {
t.Errorf("Control-Heal-Bucket-Main test failed with - %s", err.Error())
}
args = []string{"./minio", "control", "heal", url + "/bucket/object"}
// run app
err = app.Run(args)
if err != nil {
t.Errorf("Control-Heal-Bucket-With-Prefix-Main test failed with - %s", err.Error())
}
}

View File

@ -73,9 +73,10 @@ func initRemoteControlClients(srvCmdConfig serverCmdConfig) []*AuthRPCClient {
// operations on server.
type controlAPIHandlers struct {
ObjectAPI func() ObjectLayer
StorageDisks []StorageAPI
IsXL bool
RemoteControls []*AuthRPCClient
LocalNode string
StorageDisks []StorageAPI
}
// Register control RPC handlers.
@ -83,6 +84,7 @@ func registerControlRPCRouter(mux *router.Router, srvCmdConfig serverCmdConfig)
// Initialize Control.
ctrlHandlers := &controlAPIHandlers{
ObjectAPI: newObjectLayerFn,
IsXL: srvCmdConfig.isDistXL || len(srvCmdConfig.storageDisks) > 1,
RemoteControls: initRemoteControlClients(srvCmdConfig),
LocalNode: getLocalAddress(srvCmdConfig),
StorageDisks: srvCmdConfig.storageDisks,

View File

@ -270,77 +270,91 @@ func (s *TestRPCControlSuite) testRPCControlLock(c *testing.T) {
}
func TestControlHealDiskMetadataH(t *testing.T) {
//setup code
// Setup code
s := &TestRPCControlSuite{serverType: "XL"}
s.SetUpSuite(t)
//run test
s.testControlHealDiskMetadataH(t)
// Run test
s.testControlHealFormatH(t)
//teardown code
// Teardown code
s.TearDownSuite(t)
}
// TestControlHandlerHealDiskMetadata - Registers and call the `HealDiskMetadataHandler`, asserts to validate the success.
func (s *TestRPCControlSuite) testControlHealDiskMetadataH(c *testing.T) {
// TestControlHandlerHealFormat - Registers and calls the `HealFormatHandler`, asserting success.
func (s *TestRPCControlSuite) testControlHealFormatH(c *testing.T) {
// The suite has already started the test RPC server, just send RPC calls.
client := newAuthClient(s.testAuthConf)
defer client.Close()
args := &GenericArgs{}
reply := &GenericReply{}
err := client.Call("Control.HealDiskMetadataHandler", args, reply)
err := client.Call("Control.HealFormatHandler", args, reply)
if err != nil {
c.Errorf("Control.HealDiskMetadataH - test failed with <ERROR> %s", err)
c.Errorf("Test failed with <ERROR> %s", err)
}
}
func TestControlHealObjectH(t *testing.T) {
//setup code
// Setup code
s := &TestRPCControlSuite{serverType: "XL"}
s.SetUpSuite(t)
//run test
s.testControlHealObjectH(t)
// Run test
s.testControlHealObjectsH(t)
//teardown code
// Teardown code
s.TearDownSuite(t)
}
func (s *TestRPCControlSuite) testControlHealObjectH(t *testing.T) {
func (s *TestRPCControlSuite) testControlHealObjectsH(t *testing.T) {
client := newAuthClient(s.testAuthConf)
defer client.Close()
err := newObjectLayerFn().MakeBucket("testbucket")
objAPI := newObjectLayerFn()
err := objAPI.MakeBucket("testbucket")
if err != nil {
t.Fatalf(
"Control.HealObjectH - create bucket failed with <ERROR> %s", err)
t.Fatalf("Create bucket failed with <ERROR> %s", err)
}
datum := strings.NewReader("a")
_, err = newObjectLayerFn().PutObject("testbucket", "testobject", 1, datum, nil, "")
_, err = objAPI.PutObject("testbucket", "testobject1", 1, datum, nil, "")
if err != nil {
t.Fatalf("Control.HealObjectH - put object failed with <ERROR> %s", err)
t.Fatalf("Put object failed with <ERROR> %s", err)
}
datum = strings.NewReader("a")
_, err = objAPI.PutObject("testbucket", "testobject2", 1, datum, nil, "")
if err != nil {
t.Fatalf("Put object failed with <ERROR> %s", err)
}
args := &HealObjectArgs{GenericArgs{}, "testbucket", "testobject"}
reply := &GenericReply{}
err = client.Call("Control.HealObjectHandler", args, reply)
args := &HealObjectArgs{
Bucket: "testbucket",
Objects: []ObjectInfo{
{
Name: "testobject1",
}, {
Name: "testobject2",
},
},
}
reply := &HealObjectReply{}
err = client.Call("Control.HealObjectsHandler", args, reply)
if err != nil {
t.Errorf("Control.HealObjectH - test failed with <ERROR> %s", err)
t.Errorf("Test failed with <ERROR> %s", err)
}
}
func TestControlListObjectsHealH(t *testing.T) {
//setup code
// Setup code
s := &TestRPCControlSuite{serverType: "XL"}
s.SetUpSuite(t)
//run test
// Run test
s.testControlListObjectsHealH(t)
//teardown code
// Teardown code
s.TearDownSuite(t)
}
@ -348,17 +362,18 @@ func (s *TestRPCControlSuite) testControlListObjectsHealH(t *testing.T) {
client := newAuthClient(s.testAuthConf)
defer client.Close()
// careate a bucket
err := newObjectLayerFn().MakeBucket("testbucket")
objAPI := newObjectLayerFn()
// Create a bucket
err := objAPI.MakeBucket("testbucket")
if err != nil {
t.Fatalf(
"Control.ListObjectsHealH - create bucket failed - %s", err)
t.Fatalf("Create bucket failed - %s", err)
}
r := strings.NewReader("0")
_, err = newObjectLayerFn().PutObject("testbucket", "testObj-0", 1, r, nil, "")
_, err = objAPI.PutObject("testbucket", "testObj-0", 1, r, nil, "")
if err != nil {
t.Fatalf("Control.ListObjectsHealH - object creation failed - %s", err)
t.Fatalf("Object creation failed - %s", err)
}
args := &HealListArgs{
@ -367,8 +382,7 @@ func (s *TestRPCControlSuite) testControlListObjectsHealH(t *testing.T) {
}
reply := &GenericReply{}
err = client.Call("Control.ListObjectsHealHandler", args, reply)
if err != nil {
t.Errorf("Control.ListObjectsHealHandler - test failed - %s", err)
t.Errorf("Test failed - %s", err)
}
}

View File

@ -111,12 +111,12 @@ func errorCause(err error) error {
// Returns slice of underlying cause error.
func errorsCause(errs []error) []error {
Errs := make([]error, len(errs))
cerrs := make([]error, len(errs))
for i, err := range errs {
if err == nil {
continue
}
Errs[i] = errorCause(err)
cerrs[i] = errorCause(err)
}
return Errs
return cerrs
}

View File

@ -743,9 +743,10 @@ func healFormatXLCorruptedDisks(storageDisks []StorageAPI) error {
// loadFormatXL - loads XL `format.json` and returns back properly
// ordered storage slice based on `format.json`.
func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) {
func loadFormatXL(bootstrapDisks []StorageAPI, readQuorum int) (disks []StorageAPI, err error) {
var unformattedDisksFoundCnt = 0
var diskNotFoundCount = 0
var corruptedDisksFoundCnt = 0
formatConfigs := make([]*formatConfigV1, len(bootstrapDisks))
// Try to load `format.json` bootstrap disks.
@ -763,6 +764,9 @@ func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) {
} else if err == errDiskNotFound {
diskNotFoundCount++
continue
} else if err == errCorruptedFormat {
corruptedDisksFoundCnt++
continue
}
return nil, err
}
@ -771,11 +775,13 @@ func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) {
}
// If all disks indicate that 'format.json' is not available return 'errUnformattedDisk'.
if unformattedDisksFoundCnt > len(bootstrapDisks)-(len(bootstrapDisks)/2+1) {
if unformattedDisksFoundCnt > len(bootstrapDisks)-readQuorum {
return nil, errUnformattedDisk
} else if corruptedDisksFoundCnt > len(bootstrapDisks)-readQuorum {
return nil, errCorruptedFormat
} else if diskNotFoundCount == len(bootstrapDisks) {
return nil, errDiskNotFound
} else if diskNotFoundCount > len(bootstrapDisks)-(len(bootstrapDisks)/2+1) {
} else if diskNotFoundCount > len(bootstrapDisks)-readQuorum {
return nil, errXLReadQuorum
}
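For example, with 16 bootstrap disks newXLObjects passes readQuorum = 16/2 = 8, so errUnformattedDisk or errCorruptedFormat is returned only when more than 16 - 8 = 8 disks report that condition, and errXLReadQuorum only when more than 8 (but not all 16) disks are missing.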

View File

@ -292,7 +292,7 @@ func TestFormatXLHealFreshDisks(t *testing.T) {
}
// Load again XL format.json to validate it
_, err = loadFormatXL(storageDisks)
_, err = loadFormatXL(storageDisks, 8)
if err != nil {
t.Fatal("loading healed disk failed: ", err)
}
@ -322,7 +322,7 @@ func TestFormatXLHealFreshDisksErrorExpected(t *testing.T) {
prepareNOfflineDisks(storageDisks, 16, t)
// Load again XL format.json to validate it
_, err = loadFormatXL(storageDisks)
_, err = loadFormatXL(storageDisks, 8)
if err == nil {
t.Fatal("loading format disk error")
}
@ -401,7 +401,7 @@ func TestFormatXLHealCorruptedDisks(t *testing.T) {
}
// Load again XL format.json to validate it
_, err = loadFormatXL(permutedStorageDisks)
_, err = loadFormatXL(permutedStorageDisks, 8)
if err != nil {
t.Fatal("loading healed disk failed: ", err)
}
@ -709,7 +709,7 @@ func TestLoadFormatXLErrs(t *testing.T) {
t.Fatal("storage disk is not *posix type")
}
xl.storageDisks[10] = newNaughtyDisk(posixDisk, nil, errFaultyDisk)
if _, err = loadFormatXL(xl.storageDisks); err != errFaultyDisk {
if _, err = loadFormatXL(xl.storageDisks, 8); err != errFaultyDisk {
t.Fatal("Got an unexpected error: ", err)
}
@ -735,7 +735,7 @@ func TestLoadFormatXLErrs(t *testing.T) {
}
xl.storageDisks[i] = newNaughtyDisk(posixDisk, nil, errDiskNotFound)
}
if _, err = loadFormatXL(xl.storageDisks); err != errXLReadQuorum {
if _, err = loadFormatXL(xl.storageDisks, 8); err != errXLReadQuorum {
t.Fatal("Got an unexpected error: ", err)
}
@ -757,7 +757,7 @@ func TestLoadFormatXLErrs(t *testing.T) {
t.Fatal(err)
}
}
if _, err = loadFormatXL(xl.storageDisks); err != errUnformattedDisk {
if _, err = loadFormatXL(xl.storageDisks, 8); err != errUnformattedDisk {
t.Fatal("Got an unexpected error: ", err)
}
@ -777,7 +777,7 @@ func TestLoadFormatXLErrs(t *testing.T) {
for i := 0; i < 16; i++ {
xl.storageDisks[i] = nil
}
if _, err := loadFormatXL(xl.storageDisks); err != errDiskNotFound {
if _, err := loadFormatXL(xl.storageDisks, 8); err != errDiskNotFound {
t.Fatal("Got an unexpected error: ", err)
}
}

View File

@ -643,7 +643,12 @@ func (fs fsObjects) HealObject(bucket, object string) error {
return traceError(NotImplemented{})
}
// HealListObjects - list objects for healing. Valid only for XL
// HealBucket - no-op for fs, Valid only for XL.
func (fs fsObjects) HealBucket(bucket string) error {
return traceError(NotImplemented{})
}
// ListObjectsHeal - list all objects to be healed. Valid only for XL
func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
return ListObjectsInfo{}, traceError(NotImplemented{})
}

View File

@ -76,6 +76,8 @@ var (
// global colors.
var (
colorBlue = color.New(color.FgBlue).SprintfFunc()
colorBold = color.New(color.Bold).SprintFunc()
colorRed = color.New(color.FgRed).SprintFunc()
colorBold = color.New(color.Bold).SprintFunc()
colorBlue = color.New(color.FgBlue).SprintfFunc()
colorGreen = color.New(color.FgGreen).SprintfFunc()
)

View File

@ -30,14 +30,12 @@ type ObjectLayer interface {
ListBuckets() (buckets []BucketInfo, err error)
DeleteBucket(bucket string) error
ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)
// Object operations.
GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error)
GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error)
PutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (objInto ObjectInfo, err error)
DeleteObject(bucket, object string) error
HealObject(bucket, object string) error
// Multipart operations.
ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error)
@ -46,4 +44,9 @@ type ObjectLayer interface {
ListObjectParts(bucket, object, uploadID string, partNumberMarker int, maxParts int) (result ListPartsInfo, err error)
AbortMultipartUpload(bucket, object, uploadID string) error
CompleteMultipartUpload(bucket, object, uploadID string, uploadedParts []completePart) (md5 string, err error)
// Healing operations.
HealBucket(bucket string) error
HealObject(bucket, object string) error
ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)
}

cmd/scan-bar.go (new file, 92 lines)
View File

@ -0,0 +1,92 @@
/*
* Minio Cloud Storage (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"fmt"
"runtime"
"strings"
"github.com/cheggaaa/pb"
"github.com/dustin/go-humanize"
"github.com/minio/mc/pkg/console"
)
// fixateScanBar truncates or stretches text to fit within the terminal size.
func fixateScanBar(text string, width int) string {
if len([]rune(text)) > width {
// Trim text to fit within the screen
trimSize := len([]rune(text)) - width + 3 //"..."
if trimSize < len([]rune(text)) {
text = "..." + text[trimSize:]
}
} else {
text += strings.Repeat(" ", width-len([]rune(text)))
}
return text
}
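// For example, fixateScanBar(text, 20) turns a 30-character path into "..."
// followed by its last 17 characters, and right-pads shorter text with spaces
// to exactly 20 characters.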
// Progress bar function reporting objects being scanned.
type scanBarFunc func(string)
// scanBarFactory returns a progress bar function to report URL scanning.
func scanBarFactory() scanBarFunc {
fileCount := 0
termWidth, err := pb.GetTerminalWidth()
if err != nil {
termWidth = 80
}
// Cursor animate channel.
cursorCh := cursorAnimate()
return func(source string) {
scanPrefix := fmt.Sprintf("[%s] %s ", humanize.Comma(int64(fileCount)), string(<-cursorCh))
source = fixateScanBar(source, termWidth-len([]rune(scanPrefix)))
barText := scanPrefix + source
console.PrintC("\r" + barText + "\r")
fileCount++
}
}
// cursorAnimate - returns an animated rune through a read channel for every read.
func cursorAnimate() <-chan rune {
cursorCh := make(chan rune)
var cursors string
switch runtime.GOOS {
case "linux":
// cursors = "➩➪➫➬➭➮➯➱"
// cursors = "▁▃▄▅▆▇█▇▆▅▄▃"
cursors = "◐◓◑◒"
// cursors = "←↖↑↗→↘↓↙"
// cursors = "◴◷◶◵"
// cursors = "◰◳◲◱"
//cursors = "⣾⣽⣻⢿⡿⣟⣯⣷"
case "darwin":
cursors = "◐◓◑◒"
default:
cursors = "|/-\\"
}
go func() {
for {
for _, cursor := range cursors {
cursorCh <- cursor
}
}
}()
return cursorCh
}
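The rotating wheel from the commit message is driven by this channel: each receive yields the next spinner frame, which scanBarFactory splices into the progress line. A minimal, hypothetical usage sketch (demoSpinner, and the fmt and time imports, are illustrative only):

func demoSpinner() {
	cursorCh := cursorAnimate()
	for i := 0; i < 8; i++ {
		// Each read advances the wheel by one frame; "\r" redraws in place.
		fmt.Printf("\rhealing %c ", <-cursorCh)
		time.Sleep(100 * time.Millisecond)
	}
	fmt.Println()
}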

View File

@ -163,6 +163,19 @@ func contains(stringList []string, element string) bool {
return false
}
// urlPathSplit - splits a URL path into bucket and prefix components.
func urlPathSplit(urlPath string) (bucketName, prefixName string) {
if urlPath == "" {
return urlPath, ""
}
urlPath = strings.TrimPrefix(urlPath, "/")
i := strings.Index(urlPath, "/")
if i != -1 {
return urlPath[:i], urlPath[i+1:]
}
return urlPath, ""
}
// Starts a profiler; returns nil if the profiler is not enabled, the caller needs to handle this.
func startProfiler(profiler string) interface {
Stop()

View File

@ -124,6 +124,48 @@ func TestMaxObjectSize(t *testing.T) {
}
}
// Test urlPathSplit.
func TestURLPathSplit(t *testing.T) {
type test struct {
urlPath string
bucketName string
prefixName string
}
testCases := []test{
{
urlPath: "/b/c/",
bucketName: "b",
prefixName: "c/",
},
{
urlPath: "c/aa",
bucketName: "c",
prefixName: "aa",
},
{
urlPath: "",
bucketName: "",
prefixName: "",
},
{
urlPath: "/b",
bucketName: "b",
prefixName: "",
},
}
for i, testCase := range testCases {
bucketName, prefixName := urlPathSplit(testCase.urlPath)
if bucketName != testCase.bucketName {
t.Errorf("Tets %d: Expected %s, %s", i+1, testCase.bucketName, bucketName)
}
if prefixName != testCase.prefixName {
t.Errorf("Tets %d: Expected %s, %s", i+1, testCase.bucketName, bucketName)
}
}
}
// Tests minimum allowed part size.
func TestMinAllowedPartSize(t *testing.T) {
sizes := []struct {

View File

@ -305,27 +305,3 @@ func (xl xlObjects) DeleteBucket(bucket string) error {
// Success.
return nil
}
// Heal bucket - create buckets on disks where it does not exist.
func healBucket(disks []StorageAPI, bucket string) error {
bucketFound := false
for _, disk := range disks {
_, err := disk.StatVol(bucket)
if err == nil {
bucketFound = true
}
}
if !bucketFound {
return traceError(errVolumeNotFound)
}
for _, disk := range disks {
err := disk.MakeVol(bucket)
if err == nil {
continue
}
if err != errVolumeExists {
return traceError(err)
}
}
return nil
}

View File

@ -17,55 +17,59 @@
package cmd
import (
"path"
"sort"
"strings"
)
func listDirHealFactory(disks ...StorageAPI) listDirFunc {
func listDirHealFactory(isLeaf isLeafFunc, disks ...StorageAPI) listDirFunc {
// Returns sorted merged entries from all the disks.
listDir := func(bucket, prefixDir, prefixEntry string) (mergedentries []string, delayIsLeaf bool, err error) {
listDir := func(bucket, prefixDir, prefixEntry string) (mergedEntries []string, delayIsLeaf bool, err error) {
for _, disk := range disks {
if disk == nil {
continue
}
var entries []string
var newEntries []string
entries, err = disk.ListDir(bucket, prefixDir)
if err != nil {
// Skip the disk if listDir returns an error.
continue
}
// Listing needs to be sorted.
sort.Strings(entries)
// Filter entries that have the prefix prefixEntry.
entries = filterMatchingPrefix(entries, prefixEntry)
// isLeaf() check has to happen here so that trailing "/" for objects can be removed.
for i, entry := range entries {
if strings.HasSuffix(entry, slashSeparator) {
if _, err = disk.StatFile(bucket, path.Join(prefixDir, entry, xlMetaJSONFile)); err == nil {
// If it is an object trim the trailing "/"
entries[i] = strings.TrimSuffix(entry, slashSeparator)
}
if isLeaf(bucket, pathJoin(prefixDir, entry)) {
entries[i] = strings.TrimSuffix(entry, slashSeparator)
}
}
if len(mergedentries) == 0 {
// Sort again after removing trailing "/" for objects as the previous sort
// does not hold good anymore.
sort.Strings(entries)
if len(mergedEntries) == 0 {
// For the first successful disk.ListDir()
mergedentries = entries
sort.Strings(mergedentries)
mergedEntries = entries
sort.Strings(mergedEntries)
continue
}
// find elements in entries which are not in mergedentries
for _, entry := range entries {
idx := sort.SearchStrings(mergedentries, entry)
if mergedentries[idx] == entry {
idx := sort.SearchStrings(mergedEntries, entry)
if mergedEntries[idx] == entry {
continue
}
newEntries = append(newEntries, entry)
}
if len(newEntries) > 0 {
// Merge the entries and sort it.
mergedentries = append(mergedentries, newEntries...)
sort.Strings(mergedentries)
mergedEntries = append(mergedEntries, newEntries...)
sort.Strings(mergedEntries)
}
}
return mergedentries, false, nil
return mergedEntries, false, nil
}
return listDir
}
@ -83,7 +87,8 @@ func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, ma
walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, heal})
if walkResultCh == nil {
endWalkCh = make(chan struct{})
listDir := listDirHealFactory(xl.storageDisks...)
isLeaf := xl.isObject
listDir := listDirHealFactory(isLeaf, xl.storageDisks...)
walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, nil, endWalkCh)
}

View File

@ -218,7 +218,70 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
return nil
}
// HealObject - heal the object.
func (xl xlObjects) HealBucket(bucket string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
// Heal bucket - create buckets on disks where it does not exist.
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.Lock(bucket, "", opsID)
defer nsMutex.Unlock(bucket, "", opsID)
// Initialize sync waitgroup.
var wg = &sync.WaitGroup{}
// Initialize list of errors.
var dErrs = make([]error, len(xl.storageDisks))
// Make a volume entry on all underlying storage disks.
for index, disk := range xl.storageDisks {
if disk == nil {
dErrs[index] = traceError(errDiskNotFound)
continue
}
wg.Add(1)
// Make a volume inside a go-routine.
go func(index int, disk StorageAPI) {
defer wg.Done()
if _, err := disk.StatVol(bucket); err != nil {
if err != errVolumeNotFound {
dErrs[index] = traceError(err)
return
}
if err = disk.MakeVol(bucket); err != nil {
dErrs[index] = traceError(err)
}
}
}(index, disk)
}
// Wait for all make vol to finish.
wg.Wait()
// Do we have write quorum?
if !isDiskQuorum(dErrs, xl.writeQuorum) {
// Purge successfully created buckets if we don't have writeQuorum.
xl.undoMakeBucket(bucket)
return toObjectErr(traceError(errXLWriteQuorum), bucket)
}
// Verify we have any other errors which should be returned as failure.
if reducedErr := reduceErrs(dErrs, []error{
errDiskNotFound,
errFaultyDisk,
errDiskAccessDenied,
}); reducedErr != nil {
return toObjectErr(reducedErr, bucket)
}
return nil
}
// HealObject heals a given object for all its missing entries.
// FIXME: If an object was deleted and one disk was down, and later the disk comes back
// up again, heal on the object should delete it.
func (xl xlObjects) HealObject(bucket, object string) error {
@ -226,14 +289,9 @@ func (xl xlObjects) HealObject(bucket, object string) error {
if !IsValidBucketName(bucket) {
return traceError(BucketNameInvalid{Bucket: bucket})
}
if object == "" {
// Empty object name indicates that bucket should be healed.
return healBucket(xl.storageDisks, bucket)
}
// Verify if object is valid.
if !IsValidObjectName(object) {
// FIXME: return Invalid prefix.
return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
}
@ -277,8 +335,7 @@ func (xl xlObjects) HealObject(bucket, object string) error {
outDatedMeta := partsMetadata[index]
// Delete all the parts.
for partIndex := 0; partIndex < len(outDatedMeta.Parts); partIndex++ {
err := disk.DeleteFile(bucket,
pathJoin(object, outDatedMeta.Parts[partIndex].Name))
err := disk.DeleteFile(bucket, pathJoin(object, outDatedMeta.Parts[partIndex].Name))
if err != nil {
return traceError(err)
}
@ -298,8 +355,8 @@ func (xl xlObjects) HealObject(bucket, object string) error {
// We write at a temporary location and then rename to the final location.
tmpID := getUUID()
// Checksum of the part files. checkSumInfos[index] will contain checksums of all the part files
// in the outDatedDisks[index]
// Checksum of the part files. checkSumInfos[index] will contain checksums
// of all the part files in the outDatedDisks[index]
checkSumInfos := make([][]checkSumInfo, len(outDatedDisks))
// Heal each part. erasureHealFile() will write the healed part to
@ -336,6 +393,8 @@ func (xl xlObjects) HealObject(bucket, object string) error {
partsMetadata[index] = latestMeta
partsMetadata[index].Erasure.Checksum = checkSumInfos[index]
}
// Generate and write `xl.json` generated from other disks.
err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks))
if err != nil {
return toObjectErr(err, bucket, object)
@ -346,7 +405,13 @@ func (xl xlObjects) HealObject(bucket, object string) error {
if disk == nil {
continue
}
err := disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
// Remove any lingering partial data from current namespace.
err = disk.DeleteFile(bucket, retainSlash(object))
if err != nil && err != errFileNotFound {
return traceError(err)
}
// Attempt a rename now from healed data to final location.
err = disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
if err != nil {
return traceError(err)
}

View File

@ -265,7 +265,8 @@ func TestPutObjectNoQuorum(t *testing.T) {
removeRoots(fsDirs)
}
func TestHealObject(t *testing.T) {
// Tests both object and bucket healing.
func TestHealing(t *testing.T) {
obj, fsDirs, err := prepareXL()
if err != nil {
t.Fatal(err)
@ -346,14 +347,14 @@ func TestHealObject(t *testing.T) {
t.Fatal("HealObject failed")
}
// Remove the bucket - to simulate the case where bucket was created when the
// disk was down.
// Remove the bucket - to simulate the case where bucket was
// created when the disk was down.
err = os.RemoveAll(path.Join(fsDirs[0], bucket))
if err != nil {
t.Fatal(err)
}
// This would create the bucket.
err = xl.HealObject(bucket, "")
err = xl.HealBucket(bucket)
if err != nil {
t.Fatal(err)
}

View File

@ -75,7 +75,7 @@ var xlTreeWalkIgnoredErrs = []error{
errFaultyDisk,
}
func repairDiskMetadata(storageDisks []StorageAPI) error {
func healFormatXL(storageDisks []StorageAPI) error {
// Attempt to load all `format.json`.
formatConfigs, sErrs := loadAllFormats(storageDisks)
@ -116,14 +116,17 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) {
return nil, err
}
readQuorum := len(storageDisks) / 2
writeQuorum := len(storageDisks)/2 + 1
// Load saved XL format.json and validate.
newPosixDisks, err := loadFormatXL(storageDisks)
newStorageDisks, err := loadFormatXL(storageDisks, readQuorum)
if err != nil {
return nil, fmt.Errorf("Unable to recognize backend format, %s", err)
}
// Calculate data and parity blocks.
dataBlocks, parityBlocks := len(newPosixDisks)/2, len(newPosixDisks)/2
dataBlocks, parityBlocks := len(newStorageDisks)/2, len(newStorageDisks)/2
// Initialize object cache.
objCache := objcache.New(globalMaxCacheSize, globalCacheExpiry)
@ -133,7 +136,7 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) {
// Initialize xl objects.
xl := xlObjects{
storageDisks: newPosixDisks,
storageDisks: newStorageDisks,
dataBlocks: dataBlocks,
parityBlocks: parityBlocks,
listPool: listPool,
@ -143,8 +146,8 @@ func newXLObjects(storageDisks []StorageAPI) (ObjectLayer, error) {
// Figure out read and write quorum based on number of storage disks.
// READ and WRITE quorum is always set to (N/2) number of disks.
xl.readQuorum = len(xl.storageDisks) / 2
xl.writeQuorum = len(xl.storageDisks)/2 + 1
xl.readQuorum = readQuorum
xl.writeQuorum = writeQuorum
// Return successfully initialized object layer.
return xl, nil
@ -156,16 +159,6 @@ func (xl xlObjects) Shutdown() error {
return nil
}
// HealDiskMetadata function for object storage interface.
func (xl xlObjects) HealDiskMetadata() error {
// get a random ID for lock instrumentation.
opsID := getOpsID()
nsMutex.Lock(minioMetaBucket, formatConfigFile, opsID)
defer nsMutex.Unlock(minioMetaBucket, formatConfigFile, opsID)
return repairDiskMetadata(xl.storageDisks)
}
// byDiskTotal is a collection satisfying sort.Interface.
type byDiskTotal []disk.Info