Merge pull request #752 from harshavardhana/pr_out_handle_removal_of_disks_getobject_now_reads_if_disks_are_missing_underneath_add_initial_stub_healing_code

Handle removal of disks - getObject() now reads if disks are missing underneath, add initial stub healing code
This commit is contained in:
Harshavardhana 2015-07-15 01:54:41 +00:00
commit 2e5e85d8ad
4 changed files with 62 additions and 36 deletions

View File

@ -86,14 +86,13 @@ func newBucket(bucketName, aclType, donutName string, nodes map[string]node) (bu
func (b bucket) getBucketName() string {
return b.name
}
func (b bucket) getBucketMetadataReaders() ([]io.ReadCloser, error) {
var readers []io.ReadCloser
func (b bucket) getBucketMetadataReaders() (map[int]io.ReadCloser, error) {
readers := make(map[int]io.ReadCloser)
for _, node := range b.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
readers = make([]io.ReadCloser, len(disks))
for order, disk := range disks {
bucketMetaDataReader, err := disk.OpenFile(filepath.Join(b.donutName, bucketMetadataConfig))
if err != nil {
@ -500,7 +499,7 @@ func (b bucket) readObjectData(objectName string, writer *io.PipeWriter, objMeta
}
// decodeEncodedData -
func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadCloser, encoder encoder, writer *io.PipeWriter) ([]byte, error) {
func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers map[int]io.ReadCloser, encoder encoder, writer *io.PipeWriter) ([]byte, error) {
var curBlockSize int64
if blockSize < totalLeft {
curBlockSize = blockSize
@ -511,7 +510,7 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadC
if err != nil {
return nil, iodine.New(err, nil)
}
encodedBytes := make([][]byte, len(readers))
encodedBytes := make([][]byte, encoder.k+encoder.m)
for i, reader := range readers {
var bytesBuffer bytes.Buffer
_, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize))
@ -528,15 +527,14 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadC
}
// getObjectReaders -
func (b bucket) getObjectReaders(objectName, objectMeta string) ([]io.ReadCloser, error) {
var readers []io.ReadCloser
func (b bucket) getObjectReaders(objectName, objectMeta string) (map[int]io.ReadCloser, error) {
readers := make(map[int]io.ReadCloser)
nodeSlice := 0
for _, node := range b.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
readers = make([]io.ReadCloser, len(disks))
for order, disk := range disks {
bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, order)
objectPath := filepath.Join(b.donutName, bucketSlice, objectName, objectMeta)

View File

@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"github.com/minio/minio/pkg/donut/disk"
"github.com/minio/minio/pkg/iodine"
)
@ -229,14 +230,13 @@ func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) {
}
// getBucketMetadataReaders -
func (donut API) getBucketMetadataReaders() ([]io.ReadCloser, error) {
var readers []io.ReadCloser
func (donut API) getBucketMetadataReaders() (map[int]io.ReadCloser, error) {
readers := make(map[int]io.ReadCloser)
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
readers = make([]io.ReadCloser, len(disks))
for order, d := range disks {
bucketMetaDataReader, err := d.OpenFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
if err != nil {
@ -339,13 +339,22 @@ func (donut API) makeDonutBucket(bucketName, acl string) error {
// listDonutBuckets -
func (donut API) listDonutBuckets() error {
var disks map[int]disk.Disk
var err error
for _, node := range donut.nodes {
disks, err := node.ListDisks()
disks, err = node.ListDisks()
if err != nil {
return iodine.New(err, nil)
}
}
var dirs []os.FileInfo
for _, disk := range disks {
dirs, err := disk.ListDir(donut.config.DonutName)
dirs, err = disk.ListDir(donut.config.DonutName)
if err == nil {
break
}
}
// if all disks are missing then return error
if err != nil {
return iodine.New(err, nil)
}
@ -362,7 +371,5 @@ func (donut API) listDonutBuckets() error {
}
donut.buckets[bucketName] = bucket
}
}
}
return nil
}

26
pkg/donut/heal.go Normal file
View File

@ -0,0 +1,26 @@
package donut
import (
"fmt"
"github.com/minio/minio/pkg/iodine"
)
// Heal heal an existing donut
func (donut API) Heal() error {
missingDisks := make(map[int]struct{})
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return iodine.New(err, nil)
}
for i, disk := range disks {
dirs, err := disk.ListDir(donut.config.DonutName)
if err != nil {
missingDisks[i] = struct{}{}
}
fmt.Println(dirs)
}
}
return nil
}

View File

@ -24,11 +24,6 @@ import (
"github.com/minio/minio/pkg/iodine"
)
// Heal - heal a donut and fix bad data blocks
func (donut API) Heal() error {
return iodine.New(NotImplemented{Function: "Heal"}, nil)
}
// Info - return info about donut configuration
func (donut API) Info() (nodeDiskMap map[string][]string, err error) {
nodeDiskMap = make(map[string][]string)
@ -59,7 +54,7 @@ func (donut API) AttachNode(hostname string, disks []string) error {
for i, d := range disks {
newDisk, err := disk.New(d)
if err != nil {
return iodine.New(err, nil)
continue
}
if err := newDisk.MakeDir(donut.config.DonutName); err != nil {
return iodine.New(err, nil)