xl: add quorum support for read file and name space locking. (#1333)

This commit is contained in:
Bala FA
2016-04-19 22:53:20 +05:30
committed by Harshavardhana
parent a98a7fb1ad
commit ada0f82b9a
3 changed files with 239 additions and 25 deletions

View File

@@ -26,19 +26,12 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/klauspost/reedsolomon"
)
// XL layer structure.
type XL struct {
ReedSolomon reedsolomon.Encoder // Erasure encoder/decoder.
DataBlocks int
ParityBlocks int
storageDisks []StorageAPI
}
const (
// Part metadata file.
metadataFile = "part.json"
@@ -46,6 +39,55 @@ const (
maxErasureBlocks = 16
)
// XL layer structure.
type XL struct {
ReedSolomon reedsolomon.Encoder // Erasure encoder/decoder.
DataBlocks int
ParityBlocks int
storageDisks []StorageAPI
nameSpaceLockMap map[nameSpaceParam]nameSpaceLock
nameSpaceLockMapMutex *sync.Mutex
readQuorum int
writeQuorum int
}
func (xl XL) lockNameSpace(volume, path string, readOnly bool) {
xl.nameSpaceLockMapMutex.Lock()
defer xl.nameSpaceLockMapMutex.Unlock()
param := nameSpaceParam{volume, path}
nsLock, found := xl.nameSpaceLockMap[param]
if !found {
nsLock = newNameSpaceLock()
}
if readOnly {
nsLock.RLock()
} else {
nsLock.Lock()
}
xl.nameSpaceLockMap[param] = nsLock
}
func (xl XL) unlockNameSpace(volume, path string, readOnly bool) {
xl.nameSpaceLockMapMutex.Lock()
defer xl.nameSpaceLockMapMutex.Unlock()
param := nameSpaceParam{volume, path}
if nsLock, found := xl.nameSpaceLockMap[param]; found {
if readOnly {
nsLock.RUnlock()
} else {
nsLock.Unlock()
}
if nsLock.InUse() {
xl.nameSpaceLockMap[param] = nsLock
}
}
}
// newXL instantiate a new XL.
func newXL(disks ...string) (StorageAPI, error) {
// Initialize XL.
@@ -94,6 +136,14 @@ func newXL(disks ...string) (StorageAPI, error) {
// Save all the initialized storage disks.
xl.storageDisks = storageDisks
xl.nameSpaceLockMap = make(map[nameSpaceParam]nameSpaceLock)
xl.nameSpaceLockMapMutex = &sync.Mutex{}
xl.readQuorum = len(xl.storageDisks) / 2
xl.writeQuorum = xl.readQuorum + 3
if xl.writeQuorum > len(xl.storageDisks) {
xl.writeQuorum = len(xl.storageDisks)
}
// Return successfully initialized.
return xl, nil
}
@@ -187,6 +237,7 @@ type fileMetadata struct {
Block512Sum string
DataBlocks int
ParityBlocks int
fileVersion int64
}
// extractMetadata - extract file metadata.
@@ -221,6 +272,16 @@ func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) {
return fileMetadata{}, err
}
// Verify if file.version is parsable.
var fileVersion int64
// missing file.version is valid
if _, ok := metadata["file.version"]; ok {
fileVersion, err = strconv.ParseInt(metadata["file.version"], 10, 64)
if err != nil {
return fileMetadata{}, err
}
}
// Verify if block size is parsable.
var blockSize int64
blockSize, err = strconv.ParseInt(metadata["file.xl.blockSize"], 10, 64)
@@ -254,6 +315,7 @@ func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) {
Block512Sum: sha512Sum,
DataBlocks: dataBlocks,
ParityBlocks: parityBlocks,
fileVersion: fileVersion,
}, nil
}