From e2498edb4580b6c941b70f2c480de90fa08dd867 Mon Sep 17 00:00:00 2001
From: Krishna Srinivas
Date: Thu, 18 Aug 2016 00:06:33 +0530
Subject: [PATCH] controller: Implement controlled healing and trigger (#2381)

This patch introduces a new command line 'control'

- minio control
  To manage the minio server, connecting through the Go RPC API frontend.

- minio control heal
  Implemented for healing objects.
---
 control-main.go            | 134 ++++++++++++++++++++++++
 erasure-healfile.go        |  84 +++++++++++++++
 erasure-healfile_test.go   | 123 ++++++++++++++++++++++
 fs-v1-multipart.go         |   5 +-
 fs-v1.go                   |  15 ++-
 main.go                    |   1 +
 object-errors.go           |   7 ++
 object-interface.go        |   2 +
 routers.go                 |   6 ++
 rpc-control.go             |  85 +++++++++++++++
 tree-walk-pool.go          |   1 +
 xl-v1-healing.go           |  66 ++++++++----
 xl-v1-list-objects-heal.go | 206 +++++++++++++++++++++++++++++++++++++
 xl-v1-list-objects.go      |   5 +-
 xl-v1-multipart.go         |   5 +-
 xl-v1-object.go            | 128 +++++++++++++++++++++++
 16 files changed, 846 insertions(+), 27 deletions(-)
 create mode 100644 control-main.go
 create mode 100644 erasure-healfile.go
 create mode 100644 erasure-healfile_test.go
 create mode 100644 rpc-control.go
 create mode 100644 xl-v1-list-objects-heal.go

diff --git a/control-main.go b/control-main.go
new file mode 100644
index 000000000..219cb533c
--- /dev/null
+++ b/control-main.go
@@ -0,0 +1,134 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"fmt"
+	"strings"
+
+	"net/rpc"
+	"net/url"
+
+	"github.com/minio/cli"
+)
+
+// "minio control" command.
+var controlCmd = cli.Command{
+	Name:   "control",
+	Usage:  "Control and manage minio server.",
+	Action: mainControl,
+	Subcommands: []cli.Command{
+		healCmd,
+	},
+}
+
+func mainControl(c *cli.Context) {
+	cli.ShowCommandHelp(c, "")
+}
+
+var healCmd = cli.Command{
+	Name:   "heal",
+	Usage:  "Heal objects.",
+	Action: healControl,
+	CustomHelpTemplate: `NAME:
+  minio control {{.Name}} - {{.Usage}}
+
+USAGE:
+  minio control {{.Name}} URL
+
+EXAMPLES:
+  1. Heal an object.
+     $ minio control heal http://localhost:9000/songs/classical/western/piano.mp3
+
+  2. Heal all objects in a bucket recursively.
+     $ minio control heal http://localhost:9000/songs
+
+  3. Heal all objects with a given prefix recursively.
+     $ minio control heal http://localhost:9000/songs/classical/
+`,
+}
+
+// "minio control heal" entry point.
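+// The URL path is split the same way S3 does: the first path segment is the
+// bucket and the remainder is the object. For example,
+// "/songs/classical/piano.mp3" yields bucket "songs" and object
+// "classical/piano.mp3", while a bare "/songs" yields an empty object name,
+// which selects the recursive listing path further down.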
+func healControl(c *cli.Context) {
+	// Parse bucket and object from url.URL.Path.
+	parseBucketObject := func(path string) (bucketName string, objectName string) {
+		splits := strings.SplitN(path, string(slashSeparator), 3)
+		switch len(splits) {
+		case 0, 1:
+			bucketName = ""
+			objectName = ""
+		case 2:
+			bucketName = splits[1]
+			objectName = ""
+		case 3:
+			bucketName = splits[1]
+			objectName = splits[2]
+		}
+		return bucketName, objectName
+	}
+
+	if len(c.Args()) != 1 {
+		cli.ShowCommandHelpAndExit(c, "heal", 1)
+	}
+
+	parsedURL, err := url.ParseRequestURI(c.Args()[0])
+	fatalIf(err, "Unable to parse URL")
+
+	bucketName, objectName := parseBucketObject(parsedURL.Path)
+	if bucketName == "" {
+		cli.ShowCommandHelpAndExit(c, "heal", 1)
+	}
+
+	client, err := rpc.DialHTTPPath("tcp", parsedURL.Host, healPath)
+	fatalIf(err, "Unable to connect to %s", parsedURL.Host)
+
+	// If object does not have trailing "/" then it's an object, hence heal it.
+	if objectName != "" && !strings.HasSuffix(objectName, slashSeparator) {
+		fmt.Printf("Healing : /%s/%s", bucketName, objectName)
+		args := &HealObjectArgs{bucketName, objectName}
+		reply := &HealObjectReply{}
+		err = client.Call("Heal.HealObject", args, reply)
+		fatalIf(err, "RPC Heal.HealObject call failed")
+		fmt.Println()
+		return
+	}
+
+	// Recursively list and heal the objects.
+	prefix := objectName
+	marker := ""
+	for {
+		args := HealListArgs{bucketName, prefix, marker, "", 1000}
+		reply := &HealListReply{}
+		err = client.Call("Heal.ListObjects", args, reply)
+		fatalIf(err, "RPC Heal.ListObjects call failed")
+
+		// Heal the objects returned in the ListObjects reply.
+		for _, obj := range reply.Objects {
+			fmt.Printf("Healing : /%s/%s", bucketName, obj)
+			reply := &HealObjectReply{}
+			err = client.Call("Heal.HealObject", HealObjectArgs{bucketName, obj}, reply)
+			fatalIf(err, "RPC Heal.HealObject call failed")
+			fmt.Println()
+		}
+		if !reply.IsTruncated {
+			// End of listing.
+			break
+		}
+		marker = reply.NextMarker
+	}
+}
diff --git a/erasure-healfile.go b/erasure-healfile.go
new file mode 100644
index 000000000..943f3376e
--- /dev/null
+++ b/erasure-healfile.go
@@ -0,0 +1,84 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import "encoding/hex"
+
+// Heals the erasure coded file. reedsolomon.Reconstruct() is used to reconstruct the missing parts.
+func erasureHealFile(latestDisks []StorageAPI, outDatedDisks []StorageAPI, volume, path, healBucket, healPath string, size int64, blockSize int64, dataBlocks int, parityBlocks int, algo string) (checkSums []string, err error) {
+	var offset int64
+	remainingSize := size
+
+	// Hash for bitrot protection.
+	hashWriters := newHashWriters(len(outDatedDisks), bitRotAlgo)
+
+	for remainingSize > 0 {
+		curBlockSize := blockSize
+		if remainingSize < curBlockSize {
+			curBlockSize = remainingSize
+		}
+
+		// Calculate the block size that needs to be read from each disk.
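+		// getChunkSize is expected to round up, i.e. ceil(curBlockSize/dataBlocks):
+		// for example, a full 10 MiB block split across 7 data blocks reads
+		// ceil(10485760/7) = 1497966 bytes from each disk.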
+		curEncBlockSize := getChunkSize(curBlockSize, dataBlocks)
+
+		// Memory for reading data from disks and reconstructing missing data using erasure coding.
+		enBlocks := make([][]byte, len(latestDisks))
+
+		// Read data from the latest disks.
+		// FIXME: no need to read from all the disks. dataBlocks+1 is enough.
+		for index, disk := range latestDisks {
+			if disk == nil {
+				continue
+			}
+			enBlocks[index] = make([]byte, curEncBlockSize)
+			_, err := disk.ReadFile(volume, path, offset, enBlocks[index])
+			if err != nil {
+				enBlocks[index] = nil
+			}
+		}
+
+		// Reconstruct missing data.
+		err := decodeData(enBlocks, dataBlocks, parityBlocks)
+		if err != nil {
+			return nil, err
+		}
+
+		// Write to the healPath file.
+		for index, disk := range outDatedDisks {
+			if disk == nil {
+				continue
+			}
+			err := disk.AppendFile(healBucket, healPath, enBlocks[index])
+			if err != nil {
+				return nil, err
+			}
+			hashWriters[index].Write(enBlocks[index])
+		}
+		remainingSize -= curBlockSize
+		offset += curEncBlockSize
+	}
+
+	// Checksums for the bit rot.
+	checkSums = make([]string, len(outDatedDisks))
+	for index, disk := range outDatedDisks {
+		if disk == nil {
+			continue
+		}
+		checkSums[index] = hex.EncodeToString(hashWriters[index].Sum(nil))
+	}
+	return checkSums, nil
+}
diff --git a/erasure-healfile_test.go b/erasure-healfile_test.go
new file mode 100644
index 000000000..59a00cce5
--- /dev/null
+++ b/erasure-healfile_test.go
@@ -0,0 +1,123 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"bytes"
+	"crypto/rand"
+	"os"
+	"path"
+	"testing"
+)
+
+// Test erasureHealFile().
+func TestErasureHealFile(t *testing.T) {
+	// Initialize environment needed for the test.
+	dataBlocks := 7
+	parityBlocks := 7
+	blockSize := int64(blockSizeV1)
+	setup, err := newErasureTestSetup(dataBlocks, parityBlocks, blockSize)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	defer setup.Remove()
+
+	disks := setup.disks
+
+	// Prepare a slice of 1MB with random data.
+	data := make([]byte, 1*1024*1024)
+	_, err = rand.Read(data)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Create a test file.
+	size, checkSums, err := erasureCreateFile(disks, "testbucket", "testobject1", bytes.NewReader(data), blockSize, dataBlocks, parityBlocks, bitRotAlgo, dataBlocks+1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if size != int64(len(data)) {
+		t.Errorf("erasureCreateFile returned %d, expected %d", size, len(data))
+	}
+
+	latest := make([]StorageAPI, len(disks))   // Slice of latest disks
+	outDated := make([]StorageAPI, len(disks)) // Slice of outdated disks
+
+	// Test case when one part needs to be healed.
+	dataPath := path.Join(setup.diskPaths[0], "testbucket", "testobject1")
+	err = os.Remove(dataPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	copy(latest, disks)
+	latest[0] = nil
+	outDated[0] = disks[0]
+	healCheckSums, err := erasureHealFile(latest, outDated, "testbucket", "testobject1", "testbucket", "testobject1", 1*1024*1024, blockSize, dataBlocks, parityBlocks, bitRotAlgo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Checksum of the healed file should match.
+	if checkSums[0] != healCheckSums[0] {
+		t.Error("Healing failed, data does not match.")
+	}
+
+	// Test case when parityBlocks number of disks need to be healed.
+	// Should succeed.
+	copy(latest, disks)
+	for index := 0; index < parityBlocks; index++ {
+		dataPath := path.Join(setup.diskPaths[index], "testbucket", "testobject1")
+		err = os.Remove(dataPath)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		latest[index] = nil
+		outDated[index] = disks[index]
+	}
+
+	healCheckSums, err = erasureHealFile(latest, outDated, "testbucket", "testobject1", "testbucket", "testobject1", 1*1024*1024, blockSize, dataBlocks, parityBlocks, bitRotAlgo)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Checksums of the healed files should match.
+	for index := 0; index < parityBlocks; index++ {
+		if checkSums[index] != healCheckSums[index] {
+			t.Error("Healing failed, data does not match.")
+		}
+	}
+	for index := dataBlocks; index < len(disks); index++ {
+		if healCheckSums[index] != "" {
+			t.Errorf("expected healCheckSums[%d] to be empty", index)
+		}
+	}
+
+	// Test case when parityBlocks+1 number of disks need to be healed.
+	// Should fail.
+	copy(latest, disks)
+	for index := 0; index < parityBlocks+1; index++ {
+		dataPath := path.Join(setup.diskPaths[index], "testbucket", "testobject1")
+		err = os.Remove(dataPath)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		latest[index] = nil
+		outDated[index] = disks[index]
+	}
+	healCheckSums, err = erasureHealFile(latest, outDated, "testbucket", "testobject1", "testbucket", "testobject1", 1*1024*1024, blockSize, dataBlocks, parityBlocks, bitRotAlgo)
+	if err == nil {
+		t.Error("Expected erasureHealFile() to fail when the number of available disks <= parityBlocks")
+	}
+}
diff --git a/fs-v1-multipart.go b/fs-v1-multipart.go
index ad6f30dc1..998a957e0 100644
--- a/fs-v1-multipart.go
+++ b/fs-v1-multipart.go
@@ -68,8 +68,9 @@ func (fs fsObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 	}
 	var walkResultCh chan treeWalkResult
 	var endWalkCh chan struct{}
+	heal := false // true only for xl.ListObjectsHeal()
 	if maxUploads > 0 {
-		walkResultCh, endWalkCh = fs.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
+		walkResultCh, endWalkCh = fs.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal})
 		if walkResultCh == nil {
 			endWalkCh = make(chan struct{})
 			isLeaf := fs.isMultipartUpload
@@ -144,7 +145,7 @@
 	if !eof {
 		// Save the go-routine state in the pool so that it can continue from where it left off on
 		// the next request.
-		fs.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix}, walkResultCh, endWalkCh)
+		fs.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix, heal}, walkResultCh, endWalkCh)
 	}
 	result.IsTruncated = !eof
diff --git a/fs-v1.go b/fs-v1.go
index b786fd0a2..ea45329e4 100644
--- a/fs-v1.go
+++ b/fs-v1.go
@@ -574,7 +574,8 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
 		recursive = false
 	}
 
-	walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix})
+	heal := false // true only for xl.ListObjectsHeal()
+	walkResultCh, endWalkCh := fs.listPool.Release(listParams{bucket, recursive, marker, prefix, heal})
 	if walkResultCh == nil {
 		endWalkCh = make(chan struct{})
 		isLeaf := func(bucket, object string) bool {
@@ -616,7 +617,7 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
 		}
 		i++
 	}
-	params := listParams{bucket, recursive, nextMarker, prefix}
+	params := listParams{bucket, recursive, nextMarker, prefix, heal}
 	if !eof {
 		fs.listPool.Set(params, walkResultCh, endWalkCh)
 	}
@@ -643,3 +644,13 @@ func (fs fsObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
 func (fs fsObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
 	return fs.listObjects(bucket, prefix, marker, delimiter, maxKeys)
 }
+
+// HealObject - no-op for fs. Valid only for XL.
+func (fs fsObjects) HealObject(bucket, object string) error {
+	return NotImplemented{}
+}
+
+// ListObjectsHeal - list objects for healing. Valid only for XL.
+func (fs fsObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
+	return ListObjectsInfo{}, NotImplemented{}
+}
diff --git a/main.go b/main.go
index 0e6de9306..3969e8f72 100644
--- a/main.go
+++ b/main.go
@@ -105,6 +105,7 @@ func registerApp() *cli.App {
 	registerCommand(serverCmd)
 	registerCommand(versionCmd)
 	registerCommand(updateCmd)
+	registerCommand(controlCmd)
 
 	// Set up app.
 	app := cli.NewApp()
diff --git a/object-errors.go b/object-errors.go
index 6611f909f..c7cccd04f 100644
--- a/object-errors.go
+++ b/object-errors.go
@@ -257,3 +257,10 @@ type PartTooSmall struct {
 func (e PartTooSmall) Error() string {
 	return fmt.Sprintf("Part size for %d should be atleast 5MB", e.PartNumber)
 }
+
+// NotImplemented - returned when a feature is not implemented.
+type NotImplemented struct{}
+
+func (e NotImplemented) Error() string {
+	return "Not Implemented"
+}
diff --git a/object-interface.go b/object-interface.go
index 839fab282..500a80aa7 100644
--- a/object-interface.go
+++ b/object-interface.go
@@ -30,12 +30,14 @@
 	ListBuckets() (buckets []BucketInfo, err error)
 	DeleteBucket(bucket string) error
 	ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
+	ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error)
 
 	// Object operations.
 	GetObject(bucket, object string, startOffset int64, length int64, writer io.Writer) (err error)
 	GetObjectInfo(bucket, object string) (objInfo ObjectInfo, err error)
 	PutObject(bucket, object string, size int64, data io.Reader, metadata map[string]string) (md5 string, err error)
 	DeleteObject(bucket, object string) error
+	HealObject(bucket, object string) error
 
 	// Multipart operations.
 	ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error)
diff --git a/routers.go b/routers.go
index b13f6342d..05bb8d36b 100644
--- a/routers.go
+++ b/routers.go
@@ -95,6 +95,12 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
 	// Register all routers.
 	registerStorageRPCRouter(mux, storageRPC)
 
+	// FIXME: till net/rpc auth is brought in, "minio control" can be enabled only through
+	// this env variable.
+	if os.Getenv("MINIO_CONTROL") != "" {
+		registerControlRPCRouter(mux, objAPI)
+	}
+
 	// set environmental variable MINIO_BROWSER=off to disable minio web browser.
 	// By default minio web browser is enabled.
 	if !strings.EqualFold(os.Getenv("MINIO_BROWSER"), "off") {
diff --git a/rpc-control.go b/rpc-control.go
new file mode 100644
index 000000000..6c2b0a094
--- /dev/null
+++ b/rpc-control.go
@@ -0,0 +1,85 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"net/rpc"
+
+	router "github.com/gorilla/mux"
+)
+
+// Route paths for "minio control" commands.
+const (
+	controlRPCPath = reservedBucket + "/control"
+	healPath       = controlRPCPath + "/heal"
+)
+
+// Register control RPC handlers.
+func registerControlRPCRouter(mux *router.Router, objAPI ObjectLayer) {
+	healRPCServer := rpc.NewServer()
+	healRPCServer.RegisterName("Heal", &healHandler{objAPI})
+	mux.Path(healPath).Handler(healRPCServer)
+}
+
+// Handler for object healing.
+type healHandler struct {
+	ObjectAPI ObjectLayer
+}
+
+// HealListArgs - argument for ListObjects RPC.
+type HealListArgs struct {
+	Bucket    string
+	Prefix    string
+	Marker    string
+	Delimiter string
+	MaxKeys   int
+}
+
+// HealListReply - reply by ListObjects RPC.
+type HealListReply struct {
+	IsTruncated bool
+	NextMarker  string
+	Objects     []string
+}
+
+// ListObjects - list objects.
+func (h healHandler) ListObjects(arg *HealListArgs, reply *HealListReply) error {
+	info, err := h.ObjectAPI.ListObjectsHeal(arg.Bucket, arg.Prefix, arg.Marker, arg.Delimiter, arg.MaxKeys)
+	if err != nil {
+		return err
+	}
+	reply.IsTruncated = info.IsTruncated
+	reply.NextMarker = info.NextMarker
+	for _, obj := range info.Objects {
+		reply.Objects = append(reply.Objects, obj.Name)
+	}
+	return nil
+}
+
+// HealObjectArgs - argument for HealObject RPC.
+type HealObjectArgs struct {
+	Bucket string
+	Object string
+}
+
+// HealObjectReply - reply by HealObject RPC.
+type HealObjectReply struct{}
+
+// HealObject - heal the object.
+func (h healHandler) HealObject(arg *HealObjectArgs, reply *HealObjectReply) error {
+	return h.ObjectAPI.HealObject(arg.Bucket, arg.Object)
+}
diff --git a/tree-walk-pool.go b/tree-walk-pool.go
index 4fd7ffbcb..8bbde3cc0 100644
--- a/tree-walk-pool.go
+++ b/tree-walk-pool.go
@@ -33,6 +33,7 @@ type listParams struct {
 	recursive bool
 	marker    string
 	prefix    string
+	heal      bool
 }
 
 // errWalkAbort - returned by doTreeWalk() if it returns prematurely.
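The control command and the handler above speak plain Go net/rpc over HTTP. A minimal standalone client sketch (assuming a server started with MINIO_CONTROL set; "/minio/control/heal" is an assumption based on the reservedBucket constant, and should match what healPath resolves to on the server side):

package main

import (
	"log"
	"net/rpc"
)

// Argument and reply types mirroring the ones registered by healHandler.
type HealObjectArgs struct {
	Bucket string
	Object string
}

type HealObjectReply struct{}

func main() {
	// Dial the same path the server registered via mux.Path(healPath).
	client, err := rpc.DialHTTPPath("tcp", "localhost:9000", "/minio/control/heal")
	if err != nil {
		log.Fatalln("dial:", err)
	}
	defer client.Close()

	// Heal a single object, which is what
	// "minio control heal http://localhost:9000/songs/classical/western/piano.mp3" does.
	args := HealObjectArgs{Bucket: "songs", Object: "classical/western/piano.mp3"}
	var reply HealObjectReply
	if err := client.Call("Heal.HealObject", &args, &reply); err != nil {
		log.Fatalln("Heal.HealObject:", err)
	}
	log.Println("healed /songs/classical/western/piano.mp3")
}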
diff --git a/xl-v1-healing.go b/xl-v1-healing.go
index a5d413859..6fee67d4d 100644
--- a/xl-v1-healing.go
+++ b/xl-v1-healing.go
@@ -77,25 +77,6 @@ func listObjectModtimes(partsMetadata []xlMetaV1, errs []error) (modTimes []time
 	return modTimes
 }
 
-func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) {
-	onlineDiskCount := diskCount(onlineDisks)
-	// If online disks count is lesser than configured disks, most
-	// probably we need to heal the file, additionally verify if the
-	// count is lesser than readQuorum, if not we throw an error.
-	if onlineDiskCount < len(xl.storageDisks) {
-		// Online disks lesser than total storage disks, needs to be
-		// healed. unless we do not have readQuorum.
-		heal = true
-		// Verify if online disks count are lesser than readQuorum
-		// threshold, return an error.
-		if onlineDiskCount < xl.readQuorum {
-			errorIf(errXLReadQuorum, "Unable to establish read quorum, disks are offline.")
-			return false
-		}
-	}
-	return heal
-}
-
 // Returns slice of online disks needed.
 // - slice returning readable disks.
 // - modTime of the Object
@@ -118,3 +99,50 @@ func listOnlineDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error)
 	}
 	return onlineDisks, modTime
 }
+
+// Return disks with the outdated or missing object.
+func outDatedDisks(disks []StorageAPI, partsMetadata []xlMetaV1, errs []error) (outDatedDisks []StorageAPI) {
+	outDatedDisks = make([]StorageAPI, len(disks))
+	latestDisks, _ := listOnlineDisks(disks, partsMetadata, errs)
+	for index, disk := range latestDisks {
+		if errs[index] == errFileNotFound {
+			outDatedDisks[index] = disks[index]
+			continue
+		}
+		if errs[index] != nil {
+			continue
+		}
+		if disk == nil {
+			outDatedDisks[index] = disks[index]
+		}
+	}
+	return outDatedDisks
+}
+
+// Return xlMetaV1 of the latest version of the object.
+func xlLatestMetadata(partsMetadata []xlMetaV1, errs []error) (latestMeta xlMetaV1) {
+	// List all the file modification times from the parts metadata.
+	modTimes := listObjectModtimes(partsMetadata, errs)
+
+	// Reduce the list of modification times to a single common value.
+	modTime := commonTime(modTimes)
+
+	return pickValidXLMeta(partsMetadata, modTime)
+}
+
+// Returns whether the object should be healed.
+func xlShouldHeal(partsMetadata []xlMetaV1, errs []error) bool {
+	modTime := commonTime(listObjectModtimes(partsMetadata, errs))
+	for index := range partsMetadata {
+		if errs[index] == errFileNotFound {
+			return true
+		}
+		if errs[index] != nil {
+			continue
+		}
+		if modTime != partsMetadata[index].Stat.ModTime {
+			return true
+		}
+	}
+	return false
+}
diff --git a/xl-v1-list-objects-heal.go b/xl-v1-list-objects-heal.go
new file mode 100644
index 000000000..f9de4ed61
--- /dev/null
+++ b/xl-v1-list-objects-heal.go
@@ -0,0 +1,206 @@
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+	"path"
+	"sort"
+	"strings"
+)
+
+func listDirHealFactory(disks ...StorageAPI) listDirFunc {
+	// Returns sorted merged entries from all the disks.
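+	// For example, one disk returning ["a/", "b/"] and another returning
+	// ["b/", "c/"] merge to ["a/", "b/", "c/"]; entries that carry an
+	// xl.json underneath are reported without the trailing "/" so that the
+	// tree walker treats them as objects rather than prefixes.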
+	listDir := func(bucket, prefixDir, prefixEntry string) (mergedentries []string, delayIsLeaf bool, err error) {
+		for _, disk := range disks {
+			var entries []string
+			var newEntries []string
+			entries, err = disk.ListDir(bucket, prefixDir)
+			if err != nil {
+				// Skip the disk if listDir returns an error.
+				continue
+			}
+
+			for i, entry := range entries {
+				if strings.HasSuffix(entry, slashSeparator) {
+					if _, err = disk.StatFile(bucket, path.Join(prefixDir, entry, xlMetaJSONFile)); err == nil {
+						// If it is an object, trim the trailing "/".
+						entries[i] = strings.TrimSuffix(entry, slashSeparator)
+					}
+				}
+			}
+
+			if len(mergedentries) == 0 {
+				// For the first successful disk.ListDir().
+				mergedentries = entries
+				sort.Strings(mergedentries)
+				continue
+			}
+
+			// Find elements in entries which are not in mergedentries.
+			for _, entry := range entries {
+				idx := sort.SearchStrings(mergedentries, entry)
+				// idx == len(mergedentries) means the entry sorts after all
+				// merged entries and is therefore not present.
+				if idx < len(mergedentries) && mergedentries[idx] == entry {
+					continue
+				}
+				newEntries = append(newEntries, entry)
+			}
+
+			if len(newEntries) > 0 {
+				// Merge the entries and sort them.
+				mergedentries = append(mergedentries, newEntries...)
+				sort.Strings(mergedentries)
+			}
+		}
+		return mergedentries, false, nil
+	}
+	return listDir
+}
+
+// listObjectsHeal - wrapper function implemented over file tree walk.
+func (xl xlObjects) listObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
+	// Default is recursive; if delimiter is set then list non-recursive.
+	recursive := true
+	if delimiter == slashSeparator {
+		recursive = false
+	}
+
+	// "heal" is true for listObjectsHeal() and false for listObjects().
+	heal := true
+	walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, heal})
+	if walkResultCh == nil {
+		endWalkCh = make(chan struct{})
+		listDir := listDirHealFactory(xl.storageDisks...)
+		walkResultCh = startTreeWalk(bucket, prefix, marker, recursive, listDir, nil, endWalkCh)
+	}
+
+	var objInfos []ObjectInfo
+	var eof bool
+	var nextMarker string
+	for i := 0; i < maxKeys; {
+		walkResult, ok := <-walkResultCh
+		if !ok {
+			// Closed channel.
+			eof = true
+			break
+		}
+		// For any walk error return right away.
+		if walkResult.err != nil {
+			// File not found is a valid case.
+			if walkResult.err == errFileNotFound {
+				return ListObjectsInfo{}, nil
+			}
+			return ListObjectsInfo{}, toObjectErr(walkResult.err, bucket, prefix)
+		}
+		entry := walkResult.entry
+		var objInfo ObjectInfo
+		if strings.HasSuffix(entry, slashSeparator) {
+			// Object name needs to be full path.
+			objInfo.Bucket = bucket
+			objInfo.Name = entry
+			objInfo.IsDir = true
+		} else {
+			objInfo.Bucket = bucket
+			objInfo.Name = entry
+		}
+		nextMarker = objInfo.Name
+		objInfos = append(objInfos, objInfo)
+		i++
+		if walkResult.end {
+			eof = true
+			break
+		}
+	}
+
+	params := listParams{bucket, recursive, nextMarker, prefix, heal}
+	if !eof {
+		xl.listPool.Set(params, walkResultCh, endWalkCh)
+	}
+
+	result := ListObjectsInfo{IsTruncated: !eof}
+	for _, objInfo := range objInfos {
+		result.NextMarker = objInfo.Name
+		if objInfo.IsDir {
+			result.Prefixes = append(result.Prefixes, objInfo.Name)
+			continue
+		}
+		result.Objects = append(result.Objects, ObjectInfo{
+			Name:    objInfo.Name,
+			ModTime: objInfo.ModTime,
+			Size:    objInfo.Size,
+			IsDir:   false,
+		})
+	}
+	return result, nil
+}
+
+// ListObjectsHeal - list all objects at prefix to be healed, delimited by '/'.
+func (xl xlObjects) ListObjectsHeal(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
+	// Verify if bucket is valid.
+	if !IsValidBucketName(bucket) {
+		return ListObjectsInfo{}, BucketNameInvalid{Bucket: bucket}
+	}
+	// Verify if bucket exists.
+	if !xl.isBucketExist(bucket) {
+		return ListObjectsInfo{}, BucketNotFound{Bucket: bucket}
+	}
+	if !IsValidObjectPrefix(prefix) {
+		return ListObjectsInfo{}, ObjectNameInvalid{Bucket: bucket, Object: prefix}
+	}
+	// Verify if delimiter is anything other than '/', which we do not support.
+	if delimiter != "" && delimiter != slashSeparator {
+		return ListObjectsInfo{}, UnsupportedDelimiter{
+			Delimiter: delimiter,
+		}
+	}
+	// Verify if marker has prefix.
+	if marker != "" {
+		if !strings.HasPrefix(marker, prefix) {
+			return ListObjectsInfo{}, InvalidMarkerPrefixCombination{
+				Marker: marker,
+				Prefix: prefix,
+			}
+		}
+	}
+
+	// With max keys of zero we have reached eof, return right here.
+	if maxKeys == 0 {
+		return ListObjectsInfo{}, nil
+	}
+
+	// For delimiter and prefix as '/' we do not list anything at all
+	// since according to the S3 spec we stop at the 'delimiter' along
+	// with the prefix. On a flat namespace with 'prefix' as '/'
+	// we don't have any entries, since all the keys are of form 'keyName/...'.
+	if delimiter == slashSeparator && prefix == slashSeparator {
+		return ListObjectsInfo{}, nil
+	}
+
+	// Overflowing count - reset to maxObjectList.
+	if maxKeys < 0 || maxKeys > maxObjectList {
+		maxKeys = maxObjectList
+	}
+
+	// Initiate a list operation; if successful, filter and return quickly.
+	listObjInfo, err := xl.listObjectsHeal(bucket, prefix, marker, delimiter, maxKeys)
+	if err == nil {
+		// We got the entries successfully, return.
+		return listObjInfo, nil
+	}
+
+	// Return error at the end.
+	return ListObjectsInfo{}, toObjectErr(err, bucket, prefix)
+}
diff --git a/xl-v1-list-objects.go b/xl-v1-list-objects.go
index fc49a7e3c..27867a8ff 100644
--- a/xl-v1-list-objects.go
+++ b/xl-v1-list-objects.go
@@ -26,7 +26,8 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
 		recursive = false
 	}
 
-	walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix})
+	heal := false // true only for xl.ListObjectsHeal
+	walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix, heal})
 	if walkResultCh == nil {
 		endWalkCh = make(chan struct{})
 		isLeaf := xl.isObject
@@ -81,7 +82,7 @@ func (xl xlObjects) listObjects(bucket, prefix, marker, delimiter string, maxKey
 		}
 	}
 
-	params := listParams{bucket, recursive, nextMarker, prefix}
+	params := listParams{bucket, recursive, nextMarker, prefix, heal}
 	if !eof {
 		xl.listPool.Set(params, walkResultCh, endWalkCh)
 	}
diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go
index ec066dbc4..8fbdc92a4 100644
--- a/xl-v1-multipart.go
+++ b/xl-v1-multipart.go
@@ -85,9 +85,10 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 	}
 	var walkerCh chan treeWalkResult
 	var walkerDoneCh chan struct{}
+	heal := false // true only for xl.ListObjectsHeal
 	// Validate if we need to list further depending on maxUploads.
 	if maxUploads > 0 {
-		walkerCh, walkerDoneCh = xl.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath})
+		walkerCh, walkerDoneCh = xl.listPool.Release(listParams{minioMetaBucket, recursive, multipartMarkerPath, multipartPrefixPath, heal})
 		if walkerCh == nil {
 			walkerDoneCh = make(chan struct{})
 			isLeaf := xl.isMultipartUpload
@@ -179,7 +180,7 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark
 	if !eof {
 		// Save the go-routine state in the pool so that it can continue from where it left off on
 		// the next request.
-		xl.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix}, walkerCh, walkerDoneCh)
+		xl.listPool.Set(listParams{bucket, recursive, result.NextKeyMarker, prefix, heal}, walkerCh, walkerDoneCh)
 	}
 	result.IsTruncated = !eof
diff --git a/xl-v1-object.go b/xl-v1-object.go
index fa831ca4e..dbffda23c 100644
--- a/xl-v1-object.go
+++ b/xl-v1-object.go
@@ -213,6 +213,134 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
 	return nil
 }
 
+// HealObject - heal the object.
+// FIXME: If an object was deleted and one disk was down, and later the disk comes back
+// up again, heal on the object should delete it.
+func (xl xlObjects) HealObject(bucket, object string) error {
+	// Verify if bucket is valid.
+	if !IsValidBucketName(bucket) {
+		return BucketNameInvalid{Bucket: bucket}
+	}
+	// Verify if object is valid.
+	if !IsValidObjectName(object) {
+		// FIXME: return Invalid prefix.
+		return ObjectNameInvalid{Bucket: bucket, Object: object}
+	}
+
+	// Lock the object before healing.
+	nsMutex.RLock(bucket, object)
+	defer nsMutex.RUnlock(bucket, object)
+
+	partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
+	if err := reduceErrs(errs, nil); err != nil {
+		return toObjectErr(err, bucket, object)
+	}
+
+	if !xlShouldHeal(partsMetadata, errs) {
+		// There is nothing to heal.
+		return nil
+	}
+
+	// List of disks having the latest version of the object.
+	latestDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
+	// List of disks having an outdated version of the object, or missing the object.
+	outDatedDisks := outDatedDisks(xl.storageDisks, partsMetadata, errs)
+	// Latest xlMetaV1 for reference.
+	latestMeta := pickValidXLMeta(partsMetadata, modTime)
+
+	for index, disk := range outDatedDisks {
+		// Before healing outdated disks, we need to remove xl.json
+		// and part files from "bucket/object/" so that
+		// rename(".minio.sys", "tmp/tmpuuid/", "bucket", "object/") succeeds.
+		if disk == nil {
+			// Not an outdated disk.
+			continue
+		}
+		if errs[index] != nil {
+			// If there was an error (most likely errFileNotFound),
+			// there is nothing to delete on this disk.
+			continue
+		}
+		// An outdated object with the same name exists and needs to be deleted.
+		outDatedMeta := partsMetadata[index]
+		// Delete all the parts.
+		for partIndex := 0; partIndex < len(outDatedMeta.Parts); partIndex++ {
+			err := disk.DeleteFile(bucket,
+				pathJoin(object, outDatedMeta.Parts[partIndex].Name))
+			if err != nil {
+				return err
+			}
+		}
+		// Delete the xl.json file.
+		err := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
+		if err != nil {
+			return err
+		}
+	}
+
+	// Reorder so that we have data disks first and parity disks next.
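+	// Example: with Erasure.Distribution [3, 1, 2], getOrderedDisks is
+	// expected to place the disk holding block 1 first, block 2 second, and
+	// so on, so that disk order matches the erasure coding layout consumed
+	// by erasureHealFile() below.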
+	latestDisks = getOrderedDisks(latestMeta.Erasure.Distribution, latestDisks)
+	outDatedDisks = getOrderedDisks(latestMeta.Erasure.Distribution, outDatedDisks)
+	partsMetadata = getOrderedPartsMetadata(latestMeta.Erasure.Distribution, partsMetadata)
+
+	// We write at a temporary location and then rename to the final location.
+	tmpID := getUUID()
+
+	// Checksums of the part files. checkSumInfos[index] will contain the
+	// checksums of all the part files in outDatedDisks[index].
+	checkSumInfos := make([][]checkSumInfo, len(outDatedDisks))
+
+	// Heal each part. erasureHealFile() will write the healed part to
+	// .minio/tmp/uuid/ which needs to be renamed later to the final location.
+	for partIndex := 0; partIndex < len(latestMeta.Parts); partIndex++ {
+		partName := latestMeta.Parts[partIndex].Name
+		partSize := latestMeta.Parts[partIndex].Size
+		erasure := latestMeta.Erasure
+		sumInfo, err := latestMeta.Erasure.GetCheckSumInfo(partName)
+		if err != nil {
+			return err
+		}
+		// Heal the part file.
+		checkSums, err := erasureHealFile(latestDisks, outDatedDisks,
+			bucket, pathJoin(object, partName),
+			minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID, partName),
+			partSize, erasure.BlockSize, erasure.DataBlocks, erasure.ParityBlocks, sumInfo.Algorithm)
+		if err != nil {
+			return err
+		}
+		for index, sum := range checkSums {
+			if outDatedDisks[index] == nil {
+				continue
+			}
+			checkSumInfos[index] = append(checkSumInfos[index], checkSumInfo{partName, sumInfo.Algorithm, sum})
+		}
+	}
+
+	// xl.json should be written to all the healed disks.
+	for index, disk := range outDatedDisks {
+		if disk == nil {
+			continue
+		}
+		partsMetadata[index] = latestMeta
+		partsMetadata[index].Erasure.Checksum = checkSumInfos[index]
+	}
+	err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks))
+	if err != nil {
+		return toObjectErr(err, bucket, object)
+	}
+
+	// Rename from the tmp location to the actual location.
+	for _, disk := range outDatedDisks {
+		if disk == nil {
+			continue
+		}
+		err := disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // GetObjectInfo - reads object metadata and replies back ObjectInfo.
 func (xl xlObjects) GetObjectInfo(bucket, object string) (ObjectInfo, error) {
 	// Verify if bucket is valid.
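Taken together, the two ObjectLayer methods added by this patch compose into the recursive heal that "minio control heal" drives over RPC. A minimal server-side sketch of that composition (healPrefix is an illustrative helper, not part of the patch):

// healPrefix heals every object under a prefix by paging through
// ListObjectsHeal, mirroring the client loop in control-main.go.
func healPrefix(objAPI ObjectLayer, bucket, prefix string) error {
	marker := ""
	for {
		// List a batch of objects that may need healing.
		result, err := objAPI.ListObjectsHeal(bucket, prefix, marker, "", 1000)
		if err != nil {
			return err
		}
		for _, obj := range result.Objects {
			// HealObject returns early when xlShouldHeal() reports
			// nothing to do, so healing healthy objects is cheap.
			if err := objAPI.HealObject(bucket, obj.Name); err != nil {
				return err
			}
		}
		if !result.IsTruncated {
			return nil
		}
		marker = result.NextMarker
	}
}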