Live multiple disk removal works properly

This commit is contained in:
Harshavardhana 2015-07-14 20:46:08 -07:00
parent 48bb69701e
commit e37c5315ec
2 changed files with 53 additions and 32 deletions

View File

@ -33,6 +33,7 @@ import (
"github.com/minio/minio/pkg/crypto/sha256"
"github.com/minio/minio/pkg/crypto/sha512"
"github.com/minio/minio/pkg/donut/disk"
"github.com/minio/minio/pkg/donut/split"
"github.com/minio/minio/pkg/iodine"
)
@ -88,23 +89,30 @@ func (b bucket) getBucketName() string {
}
func (b bucket) getBucketMetadataReaders() (map[int]io.ReadCloser, error) {
readers := make(map[int]io.ReadCloser)
var disks map[int]disk.Disk
var err error
for _, node := range b.nodes {
disks, err := node.ListDisks()
disks, err = node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
}
var bucketMetaDataReader io.ReadCloser
for order, disk := range disks {
bucketMetaDataReader, err := disk.OpenFile(filepath.Join(b.donutName, bucketMetadataConfig))
bucketMetaDataReader, err = disk.OpenFile(filepath.Join(b.donutName, bucketMetadataConfig))
if err != nil {
return nil, iodine.New(err, nil)
continue
}
readers[order] = bucketMetaDataReader
}
if err != nil {
return nil, iodine.New(err, nil)
}
return readers, nil
}
func (b bucket) getBucketMetadata() (*AllBuckets, error) {
var err error
metadata := new(AllBuckets)
readers, err := b.getBucketMetadataReaders()
if err != nil {
@ -115,12 +123,11 @@ func (b bucket) getBucketMetadata() (*AllBuckets, error) {
}
for _, reader := range readers {
jenc := json.NewDecoder(reader)
if err := jenc.Decode(metadata); err != nil {
return nil, iodine.New(err, nil)
}
if err = jenc.Decode(metadata); err == nil {
return metadata, nil
}
return nil, iodine.New(InvalidArgument{}, nil)
}
return nil, iodine.New(err, nil)
}
// GetObjectMetadata - get metadata for an object
@ -357,6 +364,7 @@ func (b bucket) readObjectMetadata(objectName string) (ObjectMetadata, error) {
if objectName == "" {
return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil)
}
var err error
objMetadataReaders, err := b.getObjectReaders(objectName, objectMetadataConfig)
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
@ -366,12 +374,11 @@ func (b bucket) readObjectMetadata(objectName string) (ObjectMetadata, error) {
}
for _, objMetadataReader := range objMetadataReaders {
jdec := json.NewDecoder(objMetadataReader)
if err := jdec.Decode(&objMetadata); err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
break
}
if err = jdec.Decode(&objMetadata); err == nil {
return objMetadata, nil
}
}
return ObjectMetadata{}, iodine.New(err, nil)
}
// TODO - This a temporary normalization of objectNames, need to find a better way
@ -539,23 +546,28 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers map[int]io
// getObjectReaders -
func (b bucket) getObjectReaders(objectName, objectMeta string) (map[int]io.ReadCloser, error) {
readers := make(map[int]io.ReadCloser)
var disks map[int]disk.Disk
var err error
nodeSlice := 0
for _, node := range b.nodes {
disks, err := node.ListDisks()
disks, err = node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
for order, disk := range disks {
var objectSlice io.ReadCloser
bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, order)
objectPath := filepath.Join(b.donutName, bucketSlice, objectName, objectMeta)
objectSlice, err := disk.OpenFile(objectPath)
if err != nil {
return nil, iodine.New(err, nil)
}
objectSlice, err = disk.OpenFile(objectPath)
if err == nil {
readers[order] = objectSlice
}
}
nodeSlice = nodeSlice + 1
}
if err != nil {
return nil, iodine.New(err, nil)
}
return readers, nil
}

View File

@ -99,6 +99,9 @@ func (donut API) listBuckets() (map[string]BucketMetadata, error) {
// to figure out between acceptable and unacceptable errors
return make(map[string]BucketMetadata), nil
}
if metadata == nil {
return make(map[string]BucketMetadata), nil
}
return metadata.Buckets, nil
}
@ -232,18 +235,24 @@ func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) {
// getBucketMetadataReaders - readers are returned in map rather than slice
func (donut API) getBucketMetadataReaders() (map[int]io.ReadCloser, error) {
readers := make(map[int]io.ReadCloser)
var disks map[int]disk.Disk
var err error
for _, node := range donut.nodes {
disks, err := node.ListDisks()
disks, err = node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
for order, d := range disks {
bucketMetaDataReader, err := d.OpenFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
}
var bucketMetaDataReader io.ReadCloser
for order, dsk := range disks {
bucketMetaDataReader, err = dsk.OpenFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
if err != nil {
return nil, iodine.New(err, nil)
continue
}
readers[order] = bucketMetaDataReader
}
if err != nil {
return nil, iodine.New(err, nil)
}
return readers, nil
}
@ -269,7 +278,8 @@ func (donut API) setDonutBucketMetadata(metadata *AllBuckets) error {
// getDonutBucketMetadata -
func (donut API) getDonutBucketMetadata() (*AllBuckets, error) {
metadata := new(AllBuckets)
metadata := &AllBuckets{}
var err error
readers, err := donut.getBucketMetadataReaders()
if err != nil {
return nil, iodine.New(err, nil)
@ -279,12 +289,11 @@ func (donut API) getDonutBucketMetadata() (*AllBuckets, error) {
}
for _, reader := range readers {
jenc := json.NewDecoder(reader)
if err := jenc.Decode(metadata); err != nil {
return nil, iodine.New(err, nil)
}
if err = jenc.Decode(metadata); err == nil {
return metadata, nil
}
return nil, iodine.New(InvalidArgument{}, nil)
}
return nil, iodine.New(err, nil)
}
// makeDonutBucket -