Add initial version of heal, remove rebalance

This commit is contained in:
Harshavardhana 2015-07-14 22:40:04 -07:00
parent 6a64019659
commit 2553654e24
5 changed files with 90 additions and 82 deletions

View File

@ -67,6 +67,15 @@ func New(diskPath string) (Disk, error) {
map[string]string{"Type": strconv.FormatInt(int64(s.Type), 10)})
}
// IsUsable - is disk usable, alive
func (disk Disk) IsUsable() bool {
_, err := os.Stat(disk.path)
if err != nil {
return false
}
return true
}
// GetPath - get root disk path
func (disk Disk) GetPath() string {
return disk.path

View File

@ -221,8 +221,8 @@ func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) {
return nil, iodine.New(err, nil)
}
writers = make([]io.WriteCloser, len(disks))
for order, dd := range disks {
bucketMetaDataWriter, err := dd.CreateFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
for order, disk := range disks {
bucketMetaDataWriter, err := disk.CreateFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
if err != nil {
return nil, iodine.New(err, nil)
}
@ -235,17 +235,20 @@ func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) {
// getBucketMetadataReaders - readers are returned in map rather than slice
func (donut API) getBucketMetadataReaders() (map[int]io.ReadCloser, error) {
readers := make(map[int]io.ReadCloser)
var disks map[int]disk.Disk
disks := make(map[int]disk.Disk)
var err error
for _, node := range donut.nodes {
disks, err = node.ListDisks()
nDisks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
for k, v := range nDisks {
disks[k] = v
}
}
var bucketMetaDataReader io.ReadCloser
for order, dsk := range disks {
bucketMetaDataReader, err = dsk.OpenFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
for order, disk := range disks {
bucketMetaDataReader, err = disk.OpenFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
if err != nil {
continue
}

View File

@ -1,33 +1,81 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import "github.com/minio/minio/pkg/iodine"
import (
"encoding/json"
"fmt"
"path/filepath"
type missingDisk struct {
nodeNumber int
sliceNumber int
bucketName string
}
"github.com/minio/minio/pkg/donut/disk"
"github.com/minio/minio/pkg/iodine"
)
// Heal heal an existing donut
func (donut API) Heal() error {
var missingDisks []missingDisk
nodeNumber := 0
if err := donut.listDonutBuckets(); err != nil {
return iodine.New(err, nil)
}
disks := make(map[int]disk.Disk)
for _, node := range donut.nodes {
disks, err := node.ListDisks()
nDisks, err := node.ListDisks()
if err != nil {
return iodine.New(err, nil)
}
for i, disk := range disks {
_, err := disk.ListDir(donut.config.DonutName)
if err == nil {
continue
}
missingDisk := missingDisk{
nodeNumber: nodeNumber,
sliceNumber: i,
}
missingDisks = append(missingDisks, missingDisk)
for k, v := range nDisks {
disks[k] = v
}
}
missingDisks := make(map[int]disk.Disk)
for order, disk := range disks {
if !disk.IsUsable() {
missingDisks[order] = disk
}
}
bucketMetadata, err := donut.getDonutBucketMetadata()
if err != nil {
return iodine.New(err, nil)
}
for _, disk := range missingDisks {
disk.MakeDir(donut.config.DonutName)
bucketMetadataWriter, err := disk.CreateFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
if err != nil {
return iodine.New(err, nil)
}
defer bucketMetadataWriter.Close()
jenc := json.NewEncoder(bucketMetadataWriter)
if err := jenc.Encode(bucketMetadata); err != nil {
return iodine.New(err, nil)
}
}
for order, disk := range missingDisks {
for bucket := range bucketMetadata.Buckets {
bucketSlice := fmt.Sprintf("%s$0$%d", bucket, order) // TODO handle node slices
err := disk.MakeDir(filepath.Join(donut.config.DonutName, bucketSlice))
if err != nil {
return iodine.New(err, nil)
}
}
}
return nil
// TODO heal data
}

View File

@ -68,3 +68,8 @@ func (donut API) DetachNode(hostname string) error {
delete(donut.nodes, hostname)
return nil
}
// Rebalance - rebalance an existing donut with new disks and nodes
func (donut API) Rebalance() error {
return iodine.New(APINotImplemented{API: "management.Rebalance"}, nil)
}

View File

@ -1,57 +0,0 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"fmt"
"os"
"strings"
"github.com/minio/minio/pkg/donut/disk"
"github.com/minio/minio/pkg/iodine"
)
// Rebalance -
func (donut API) Rebalance() error {
var totalOffSetLength int
var newDisks []disk.Disk
var existingDirs []os.FileInfo
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return iodine.New(err, nil)
}
totalOffSetLength = len(disks)
fmt.Println(totalOffSetLength)
for _, disk := range disks {
dirs, err := disk.ListDir(donut.config.DonutName)
if err != nil {
return iodine.New(err, nil)
}
if len(dirs) == 0 {
newDisks = append(newDisks, disk)
}
existingDirs = append(existingDirs, dirs...)
}
}
for _, dir := range existingDirs {
splits := strings.Split(dir.Name(), "$")
bucketName, segment, offset := splits[0], splits[1], splits[2]
fmt.Println(bucketName, segment, offset)
}
return nil
}