mirror of
https://github.com/minio/minio.git
synced 2024-12-25 06:35:56 -05:00
Xl layer selfheal quorum2
* xl/selfheal: selfheal based on read quorum on GET * xl: getReadableDisks() also returns whether self-heal is needed so that this info can be used by ReadFile/SelfHeal/StatFile. * xl: trigger selfheal from StatFile.
This commit is contained in:
parent
9bd9441107
commit
becc814531
@ -17,54 +17,50 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
slashpath "path"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func (xl XL) selfHeal(volume string, path string) error {
|
||||
totalBlocks := xl.DataBlocks + xl.ParityBlocks
|
||||
needsSelfHeal := make([]bool, totalBlocks)
|
||||
var metadata = make(map[string]string)
|
||||
var readers = make([]io.Reader, totalBlocks)
|
||||
var writers = make([]io.WriteCloser, totalBlocks)
|
||||
for index, disk := range xl.storageDisks {
|
||||
metadataFile := slashpath.Join(path, metadataFile)
|
||||
|
||||
// Start from the beginning, we are not reading partial metadata files.
|
||||
offset := int64(0)
|
||||
// Acquire a read lock.
|
||||
readLock := true
|
||||
xl.lockNS(volume, path, readLock)
|
||||
defer xl.unlockNS(volume, path, readLock)
|
||||
|
||||
metadataReader, err := disk.ReadFile(volume, metadataFile, offset)
|
||||
quorumDisks, metadata, doSelfHeal, err := xl.getReadableDisks(volume, path)
|
||||
if err != nil {
|
||||
if err != errFileNotFound {
|
||||
continue
|
||||
}
|
||||
// Needs healing if part.json is not found
|
||||
needsSelfHeal[index] = true
|
||||
continue
|
||||
}
|
||||
defer metadataReader.Close()
|
||||
|
||||
decoder := json.NewDecoder(metadataReader)
|
||||
if err = decoder.Decode(&metadata); err != nil {
|
||||
// needs healing if parts.json is not parsable
|
||||
needsSelfHeal[index] = true
|
||||
}
|
||||
|
||||
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
|
||||
erasuredPartReader, err := disk.ReadFile(volume, erasurePart, offset)
|
||||
if err != nil {
|
||||
if err == errFileNotFound {
|
||||
// Needs healing if part file not found
|
||||
needsSelfHeal[index] = true
|
||||
}
|
||||
return err
|
||||
}
|
||||
readers[index] = erasuredPartReader
|
||||
defer erasuredPartReader.Close()
|
||||
if !doSelfHeal {
|
||||
return nil
|
||||
}
|
||||
|
||||
size, err := metadata.GetSize()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for index, disk := range quorumDisks {
|
||||
if disk == nil {
|
||||
needsSelfHeal[index] = true
|
||||
continue
|
||||
}
|
||||
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
|
||||
// If disk.ReadFile returns error and we don't have read quorum it will be taken care as
|
||||
// ReedSolomon.Reconstruct() will fail later.
|
||||
var reader io.ReadCloser
|
||||
offset := int64(0)
|
||||
if reader, err = xl.storageDisks[index].ReadFile(volume, erasurePart, offset); err == nil {
|
||||
readers[index] = reader
|
||||
defer reader.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Check if there is atleast one part that needs to be healed.
|
||||
@ -85,7 +81,6 @@ func (xl XL) selfHeal(volume string, path string) error {
|
||||
if !shNeeded {
|
||||
continue
|
||||
}
|
||||
var err error
|
||||
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
|
||||
writers[index], err = xl.storageDisks[index].CreateFile(volume, erasurePart)
|
||||
if err != nil {
|
||||
@ -94,11 +89,6 @@ func (xl XL) selfHeal(volume string, path string) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
size, err := strconv.ParseInt(metadata["file.size"], 10, 64)
|
||||
if err != nil {
|
||||
closeAndRemoveWriters(writers...)
|
||||
return err
|
||||
}
|
||||
var totalLeft = size
|
||||
for totalLeft > 0 {
|
||||
// Figure out the right blockSize.
|
||||
@ -188,69 +178,12 @@ func (xl XL) selfHeal(volume string, path string) error {
|
||||
writers[index].Close()
|
||||
}
|
||||
|
||||
// Write part.json where ever healing was done.
|
||||
var metadataWriters = make([]io.WriteCloser, len(xl.storageDisks))
|
||||
// Update the quorum metadata after selfheal.
|
||||
errs := xl.setPartsMetadata(volume, path, metadata, needsSelfHeal)
|
||||
for index, shNeeded := range needsSelfHeal {
|
||||
if !shNeeded {
|
||||
continue
|
||||
if shNeeded && errs[index] != nil {
|
||||
return errs[index]
|
||||
}
|
||||
metadataFile := slashpath.Join(path, metadataFile)
|
||||
metadataWriters[index], err = xl.storageDisks[index].CreateFile(volume, metadataFile)
|
||||
if err != nil {
|
||||
closeAndRemoveWriters(writers...)
|
||||
return err
|
||||
}
|
||||
}
|
||||
metadataBytes, err := json.Marshal(metadata)
|
||||
if err != nil {
|
||||
closeAndRemoveWriters(metadataWriters...)
|
||||
return err
|
||||
}
|
||||
for index, shNeeded := range needsSelfHeal {
|
||||
if !shNeeded {
|
||||
continue
|
||||
}
|
||||
_, err = metadataWriters[index].Write(metadataBytes)
|
||||
if err != nil {
|
||||
closeAndRemoveWriters(metadataWriters...)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Metadata written for all the healed parts hence Close() so that
|
||||
// temp files can be committed.
|
||||
for index := range xl.storageDisks {
|
||||
if !needsSelfHeal[index] {
|
||||
continue
|
||||
}
|
||||
metadataWriters[index].Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// self heal.
|
||||
type selfHeal struct {
|
||||
volume string
|
||||
path string
|
||||
errCh chan<- error
|
||||
}
|
||||
|
||||
// selfHealRoutine - starts a go routine and listens on a channel for healing requests.
|
||||
func (xl *XL) selfHealRoutine() {
|
||||
xl.selfHealCh = make(chan selfHeal)
|
||||
|
||||
// Healing request can be made like this:
|
||||
// errCh := make(chan error)
|
||||
// xl.selfHealCh <- selfHeal{"testbucket", "testobject", errCh}
|
||||
// fmt.Println(<-errCh)
|
||||
go func() {
|
||||
for sh := range xl.selfHealCh {
|
||||
if sh.volume == "" || sh.path == "" {
|
||||
sh.errCh <- errors.New("volume or path can not be empty")
|
||||
continue
|
||||
}
|
||||
xl.selfHeal(sh.volume, sh.path)
|
||||
sh.errCh <- nil
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -18,9 +18,15 @@ package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
// error type when key is not found.
|
||||
var errMetadataKeyNotExist = errors.New("Key not found in fileMetadata.")
|
||||
|
||||
// This code is built on similar ideas of http.Header.
|
||||
// Ref - https://golang.org/pkg/net/http/#Header
|
||||
|
||||
@ -66,6 +72,30 @@ func (f fileMetadata) Write(writer io.Writer) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get file size.
|
||||
func (f fileMetadata) GetSize() (int64, error) {
|
||||
sizes := f.Get("file.size")
|
||||
if sizes == nil {
|
||||
return 0, errMetadataKeyNotExist
|
||||
}
|
||||
sizeStr := sizes[0]
|
||||
return strconv.ParseInt(sizeStr, 10, 64)
|
||||
}
|
||||
|
||||
// Set file size.
|
||||
func (f fileMetadata) SetSize(size int64) {
|
||||
f.Set("file.size", strconv.FormatInt(size, 10))
|
||||
}
|
||||
|
||||
// Get file Modification time.
|
||||
func (f fileMetadata) GetModTime() (time.Time, error) {
|
||||
timeStrs := f.Get("file.modTime")
|
||||
if timeStrs == nil {
|
||||
return time.Time{}, errMetadataKeyNotExist
|
||||
}
|
||||
return time.Parse(timeFormatAMZ, timeStrs[0])
|
||||
}
|
||||
|
||||
// fileMetadataDecode - file metadata decode.
|
||||
func fileMetadataDecode(reader io.Reader) (fileMetadata, error) {
|
||||
metadata := make(fileMetadata)
|
||||
|
@ -45,14 +45,24 @@ func getEncodedBlockLen(inputLen, dataBlocks int) (curBlockSize int) {
|
||||
|
||||
// Returns slice of disks needed for ReadFile operation:
|
||||
// - slice returing readable disks.
|
||||
// - file size
|
||||
// - fileMetadata
|
||||
// - bool value indicating if selfHeal is needed.
|
||||
// - error if any.
|
||||
func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, int64, error) {
|
||||
func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, fileMetadata, bool, error) {
|
||||
partsMetadata, errs := xl.getPartsMetadata(volume, path)
|
||||
highestVersion := int64(0)
|
||||
versions := make([]int64, len(xl.storageDisks))
|
||||
quorumDisks := make([]StorageAPI, len(xl.storageDisks))
|
||||
fileSize := int64(0)
|
||||
notFoundCount := 0
|
||||
// If quorum says errFileNotFound return errFileNotFound
|
||||
for _, err := range errs {
|
||||
if err == errFileNotFound {
|
||||
notFoundCount++
|
||||
}
|
||||
}
|
||||
if notFoundCount > xl.readQuorum {
|
||||
return nil, fileMetadata{}, false, errFileNotFound
|
||||
}
|
||||
for index, metadata := range partsMetadata {
|
||||
if errs[index] == nil {
|
||||
if version := metadata.Get("file.version"); version != nil {
|
||||
@ -60,7 +70,7 @@ func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, int64, error)
|
||||
version, err := strconv.ParseInt(version[0], 10, 64)
|
||||
if err != nil {
|
||||
// Unexpected, return error.
|
||||
return nil, 0, err
|
||||
return nil, fileMetadata{}, false, err
|
||||
}
|
||||
if version > highestVersion {
|
||||
highestVersion = version
|
||||
@ -73,7 +83,6 @@ func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, int64, error)
|
||||
versions[index] = -1
|
||||
}
|
||||
}
|
||||
|
||||
quorumCount := 0
|
||||
for index, version := range versions {
|
||||
if version == highestVersion {
|
||||
@ -84,25 +93,22 @@ func (xl XL) getReadableDisks(volume, path string) ([]StorageAPI, int64, error)
|
||||
}
|
||||
}
|
||||
if quorumCount < xl.readQuorum {
|
||||
return nil, 0, errReadQuorum
|
||||
return nil, fileMetadata{}, false, errReadQuorum
|
||||
}
|
||||
|
||||
var metadata fileMetadata
|
||||
for index, disk := range quorumDisks {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
if size := partsMetadata[index].Get("file.size"); size != nil {
|
||||
var err error
|
||||
fileSize, err = strconv.ParseInt(size[0], 10, 64)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
metadata = partsMetadata[index]
|
||||
break
|
||||
} else {
|
||||
return nil, 0, errFileSize
|
||||
}
|
||||
}
|
||||
return quorumDisks, fileSize, nil
|
||||
// FIXME: take care of the situation when a disk has failed and been removed
|
||||
// by looking at the error returned from the fs layer. fs-layer will have
|
||||
// to return an error indicating that the disk is not available and should be
|
||||
// different from ErrNotExist.
|
||||
doSelfHeal := quorumCount != len(xl.storageDisks)
|
||||
return quorumDisks, metadata, doSelfHeal, nil
|
||||
}
|
||||
|
||||
// ReadFile - read file
|
||||
@ -115,12 +121,24 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
|
||||
return nil, errInvalidArgument
|
||||
}
|
||||
|
||||
// Acquire a read lock.
|
||||
readLock := true
|
||||
xl.lockNS(volume, path, readLock)
|
||||
quorumDisks, metadata, doSelfHeal, err := xl.getReadableDisks(volume, path)
|
||||
xl.unlockNS(volume, path, readLock)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if doSelfHeal {
|
||||
if err = xl.selfHeal(volume, path); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
xl.lockNS(volume, path, readLock)
|
||||
defer xl.unlockNS(volume, path, readLock)
|
||||
|
||||
quorumDisks, fileSize, err := xl.getReadableDisks(volume, path)
|
||||
fileSize, err := metadata.GetSize()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
|
||||
// Get parts.json metadata as a map slice.
|
||||
// Returns error slice indicating the failed metadata reads.
|
||||
// Read lockNS() should be done by caller.
|
||||
func (xl XL) getPartsMetadata(volume, path string) ([]fileMetadata, []error) {
|
||||
errs := make([]error, len(xl.storageDisks))
|
||||
metadataArray := make([]fileMetadata, len(xl.storageDisks))
|
||||
@ -38,6 +39,7 @@ func (xl XL) getPartsMetadata(volume, path string) ([]fileMetadata, []error) {
|
||||
//
|
||||
// Returns collection of errors, indexed in accordance with input
|
||||
// updateParts order.
|
||||
// Write lockNS() should be done by caller.
|
||||
func (xl XL) setPartsMetadata(volume, path string, metadata fileMetadata, updateParts []bool) []error {
|
||||
metadataFilePath := filepath.Join(path, metadataFile)
|
||||
errs := make([]error, len(xl.storageDisks))
|
||||
|
62
xl-v1.go
62
xl-v1.go
@ -21,10 +21,8 @@ import (
|
||||
"os"
|
||||
slashpath "path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/klauspost/reedsolomon"
|
||||
)
|
||||
@ -46,9 +44,6 @@ type XL struct {
|
||||
nameSpaceLockMapMutex *sync.Mutex
|
||||
readQuorum int
|
||||
writeQuorum int
|
||||
|
||||
// Heal input/output channel.
|
||||
selfHealCh chan selfHeal
|
||||
}
|
||||
|
||||
// lockNS - locks the given resource, using a previously allocated
|
||||
@ -155,9 +150,6 @@ func newXL(disks ...string) (StorageAPI, error) {
|
||||
xl.writeQuorum = len(xl.storageDisks)
|
||||
}
|
||||
|
||||
// Start self heal go routine, taking inputs over self heal channel.
|
||||
xl.selfHealRoutine()
|
||||
|
||||
// Return successfully initialized.
|
||||
return xl, nil
|
||||
}
|
||||
@ -300,23 +292,6 @@ func (xl XL) isLeafDirectory(volume, leafPath string) (isLeaf bool) {
|
||||
return isLeaf
|
||||
}
|
||||
|
||||
// Returns file size from the metadata.
|
||||
func getFileSize(metadata fileMetadata) (int64, error) {
|
||||
size := metadata.Get("file.size")
|
||||
if size == nil {
|
||||
return 0, errFileSize
|
||||
}
|
||||
return strconv.ParseInt(size[0], 10, 64)
|
||||
}
|
||||
|
||||
func getModTime(metadata fileMetadata) (time.Time, error) {
|
||||
modTime := metadata.Get("file.modTime")
|
||||
if modTime == nil {
|
||||
return time.Time{}, errModTime
|
||||
}
|
||||
return time.Parse(timeFormatAMZ, modTime[0])
|
||||
}
|
||||
|
||||
// extractMetadata - extract file metadata.
|
||||
func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) {
|
||||
metadataFilePath := slashpath.Join(path, metadataFile)
|
||||
@ -348,11 +323,11 @@ func (xl XL) extractFileInfo(volume, path string) (FileInfo, error) {
|
||||
if err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
fileSize, err := getFileSize(metadata)
|
||||
fileSize, err := metadata.GetSize()
|
||||
if err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
fileModTime, err := getModTime(metadata)
|
||||
fileModTime, err := metadata.GetModTime()
|
||||
if err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
@ -438,14 +413,39 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) {
|
||||
return FileInfo{}, errInvalidArgument
|
||||
}
|
||||
|
||||
// Extract metadata.
|
||||
fileInfo, err := xl.extractFileInfo(volume, path)
|
||||
// Acquire read lock.
|
||||
readLock := true
|
||||
xl.lockNS(volume, path, readLock)
|
||||
_, metadata, doSelfHeal, err := xl.getReadableDisks(volume, path)
|
||||
xl.unlockNS(volume, path, readLock)
|
||||
if err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
|
||||
// Return fileinfo.
|
||||
return fileInfo, nil
|
||||
if doSelfHeal {
|
||||
if err = xl.selfHeal(volume, path); err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Extract metadata.
|
||||
size, err := metadata.GetSize()
|
||||
if err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
modTime, err := metadata.GetModTime()
|
||||
if err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
|
||||
// Return file info.
|
||||
return FileInfo{
|
||||
Volume: volume,
|
||||
Name: path,
|
||||
Size: size,
|
||||
ModTime: modTime,
|
||||
Mode: os.FileMode(0644),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DeleteFile - delete a file
|
||||
|
Loading…
Reference in New Issue
Block a user