Handle removal of disks - getObject() now reads even when disks are missing underneath; add initial stub healing code

Harshavardhana
2015-07-14 18:50:29 -07:00
parent 74853caf0c
commit e885259584
4 changed files with 62 additions and 36 deletions


@@ -86,14 +86,13 @@ func newBucket(bucketName, aclType, donutName string, nodes map[string]node) (bu
 func (b bucket) getBucketName() string {
 	return b.name
 }
-func (b bucket) getBucketMetadataReaders() ([]io.ReadCloser, error) {
-	var readers []io.ReadCloser
+func (b bucket) getBucketMetadataReaders() (map[int]io.ReadCloser, error) {
+	readers := make(map[int]io.ReadCloser)
 	for _, node := range b.nodes {
 		disks, err := node.ListDisks()
 		if err != nil {
 			return nil, iodine.New(err, nil)
 		}
-		readers = make([]io.ReadCloser, len(disks))
 		for order, disk := range disks {
 			bucketMetaDataReader, err := disk.OpenFile(filepath.Join(b.donutName, bucketMetadataConfig))
 			if err != nil {
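The slice-to-map change above is what lets reads tolerate a missing disk: the old code re-allocated a dense []io.ReadCloser inside the node loop (discarding earlier nodes' readers), and a disk that failed to open would shift or clobber indices. Keying readers by disk order preserves each reader's position and leaves an explicit gap where a disk has vanished. A minimal runnable sketch of the pattern, where diskPaths and the file layout are hypothetical stand-ins for node.ListDisks() and disk.OpenFile():

package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// collectReaders sketches the map-keyed-by-disk-order pattern: a disk
// that fails to open leaves a gap in the map instead of shifting every
// later reader's index. diskPaths is a hypothetical stand-in, not the
// donut API.
func collectReaders(diskPaths []string, name string) map[int]io.ReadCloser {
	readers := make(map[int]io.ReadCloser)
	for order, dir := range diskPaths {
		f, err := os.Open(filepath.Join(dir, name))
		if err != nil {
			continue // disk is gone: skip it, keep indices of the rest
		}
		readers[order] = f
	}
	return readers
}

func main() {
	readers := collectReaders([]string{"/tmp/disk0", "/tmp/disk1"}, "bucketMetadata.json")
	fmt.Printf("%d of 2 metadata readers available\n", len(readers))
}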
@@ -500,7 +499,7 @@ func (b bucket) readObjectData(objectName string, writer *io.PipeWriter, objMeta
 }
 
 // decodeEncodedData -
-func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadCloser, encoder encoder, writer *io.PipeWriter) ([]byte, error) {
+func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers map[int]io.ReadCloser, encoder encoder, writer *io.PipeWriter) ([]byte, error) {
 	var curBlockSize int64
 	if blockSize < totalLeft {
 		curBlockSize = blockSize
@@ -511,7 +510,7 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadC
 	if err != nil {
 		return nil, iodine.New(err, nil)
 	}
-	encodedBytes := make([][]byte, len(readers))
+	encodedBytes := make([][]byte, encoder.k+encoder.m)
 	for i, reader := range readers {
 		var bytesBuffer bytes.Buffer
 		_, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize))
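Sizing encodedBytes with encoder.k+encoder.m instead of len(readers) pairs with the map change: ranging over the map drops each chunk at its original shard index, and shards whose disks are gone simply stay nil. A nil-gapped slice of k data plus m parity slots is the usual input shape for erasure reconstruction. A compile-only sketch, with k, m, and chunkSize as assumed stand-ins for the encoder's fields:

package donutsketch

import (
	"bytes"
	"io"
)

// gatherShards fills a fixed k+m slot slice from a sparse reader map.
// Slots with no reader stay nil, marking shards the decoder must
// reconstruct. Keys are assumed to be disk orders in [0, k+m).
func gatherShards(readers map[int]io.ReadCloser, k, m int, chunkSize int64) ([][]byte, error) {
	encodedBytes := make([][]byte, k+m)
	for i, reader := range readers {
		if i < 0 || i >= len(encodedBytes) {
			continue // out-of-range key: ignore in this sketch
		}
		var bytesBuffer bytes.Buffer
		if _, err := io.CopyN(&bytesBuffer, reader, chunkSize); err != nil {
			return nil, err
		}
		encodedBytes[i] = bytesBuffer.Bytes()
	}
	return encodedBytes, nil
}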
@@ -528,15 +527,14 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadC
 }
 
 // getObjectReaders -
-func (b bucket) getObjectReaders(objectName, objectMeta string) ([]io.ReadCloser, error) {
-	var readers []io.ReadCloser
+func (b bucket) getObjectReaders(objectName, objectMeta string) (map[int]io.ReadCloser, error) {
+	readers := make(map[int]io.ReadCloser)
 	nodeSlice := 0
 	for _, node := range b.nodes {
 		disks, err := node.ListDisks()
 		if err != nil {
 			return nil, iodine.New(err, nil)
 		}
-		readers = make([]io.ReadCloser, len(disks))
 		for order, disk := range disks {
 			bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, order)
 			objectPath := filepath.Join(b.donutName, bucketSlice, objectName, objectMeta)
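The same sparse map gives callers a cheap degraded-read signal, which is plausibly where the commit's initial stub healing code would hook in: a read can still be served from any k shards, while any count below k+m marks the object as a healing candidate. The needsHeal helper below is a hypothetical illustration, not the commit's actual healing API:

package donutsketch

import "io"

// needsHeal reports whether an object read is degraded: readers is keyed
// by shard index, so any count short of k+m means at least one disk (and
// its shard) is missing. The read still succeeds with >= k shards, but
// the object becomes a candidate for healing. Hypothetical helper.
func needsHeal(readers map[int]io.ReadCloser, k, m int) bool {
	return len(readers) < k+m
}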