mirror of
https://github.com/minio/minio.git
synced 2024-12-24 22:25:54 -05:00
xl: Fix ReadFile to keep the order always for reading the data back. (#1339)
Also fixes a stackoverflow bug in namespace locking.
This commit is contained in:
parent
c7bf471c9e
commit
141a44bfbf
@ -297,6 +297,9 @@ func (o objectAPI) NewMultipartUpload(bucket, object string) (string, *probe.Err
|
||||
// uploadIDPath doesn't exist, so create empty file to reserve the name
|
||||
var w io.WriteCloser
|
||||
if w, e = o.storage.CreateFile(minioMetaVolume, uploadIDPath); e == nil {
|
||||
// Just write some data for erasure code, rather than zero bytes.
|
||||
w.Write([]byte(uploadID))
|
||||
// Close the writer.
|
||||
if e = w.Close(); e != nil {
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
|
@ -55,6 +55,9 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
|
||||
var sha512Writers = make([]hash.Hash, len(xl.storageDisks))
|
||||
var metadataWriters = make([]io.WriteCloser, len(xl.storageDisks))
|
||||
|
||||
// Save additional erasureMetadata.
|
||||
modTime := time.Now().UTC()
|
||||
|
||||
// Initialize storage disks, get all the writers and corresponding
|
||||
// metadata writers.
|
||||
for index, disk := range xl.storageDisks {
|
||||
@ -131,9 +134,6 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
|
||||
}
|
||||
}
|
||||
|
||||
// Save additional erasureMetadata.
|
||||
modTime := time.Now().UTC()
|
||||
|
||||
// Initialize metadata map, save all erasure related metadata.
|
||||
metadata := make(map[string]string)
|
||||
metadata["version"] = minioVersion
|
||||
|
@ -27,39 +27,38 @@ type nameSpaceParam struct {
|
||||
// nameSpaceLock - provides primitives for locking critical namespace regions.
|
||||
type nameSpaceLock struct {
|
||||
rwMutex *sync.RWMutex
|
||||
rcount uint
|
||||
wcount uint
|
||||
count uint
|
||||
}
|
||||
|
||||
func (nsLock *nameSpaceLock) InUse() bool {
|
||||
return nsLock.rcount != 0 || nsLock.wcount != 0
|
||||
return nsLock.count != 0
|
||||
}
|
||||
|
||||
// Lock acquires write lock and increments the namespace counter.
|
||||
func (nsLock *nameSpaceLock) Lock() {
|
||||
nsLock.Lock()
|
||||
nsLock.wcount++
|
||||
nsLock.rwMutex.Lock()
|
||||
nsLock.count++
|
||||
}
|
||||
|
||||
// Unlock releases write lock and decrements the namespace counter.
|
||||
func (nsLock *nameSpaceLock) Unlock() {
|
||||
nsLock.Unlock()
|
||||
if nsLock.wcount != 0 {
|
||||
nsLock.wcount--
|
||||
nsLock.rwMutex.Unlock()
|
||||
if nsLock.count != 0 {
|
||||
nsLock.count--
|
||||
}
|
||||
}
|
||||
|
||||
// RLock acquires read lock and increments the namespace counter.
|
||||
func (nsLock *nameSpaceLock) RLock() {
|
||||
nsLock.RLock()
|
||||
nsLock.rcount++
|
||||
nsLock.rwMutex.RLock()
|
||||
nsLock.count++
|
||||
}
|
||||
|
||||
// RUnlock release read lock and decrements the namespace counter.
|
||||
func (nsLock *nameSpaceLock) RUnlock() {
|
||||
nsLock.RUnlock()
|
||||
if nsLock.rcount != 0 {
|
||||
nsLock.rcount--
|
||||
nsLock.rwMutex.RUnlock()
|
||||
if nsLock.count != 0 {
|
||||
nsLock.count--
|
||||
}
|
||||
}
|
||||
|
||||
@ -67,7 +66,6 @@ func (nsLock *nameSpaceLock) RUnlock() {
|
||||
func newNSLock() *nameSpaceLock {
|
||||
return &nameSpaceLock{
|
||||
rwMutex: &sync.RWMutex{},
|
||||
rcount: 0,
|
||||
wcount: 0,
|
||||
count: 0,
|
||||
}
|
||||
}
|
||||
|
@ -44,38 +44,37 @@ func getEncodedBlockLen(inputLen, dataBlocks int) (curBlockSize int) {
|
||||
return
|
||||
}
|
||||
|
||||
func (xl XL) getMetaDataFileVersions(volume, path string) (diskVersionMap map[StorageAPI]int64) {
|
||||
func (xl XL) getMetaFileVersionMap(volume, path string) (diskFileVersionMap map[int]int64) {
|
||||
metadataFilePath := slashpath.Join(path, metadataFile)
|
||||
// set offset to 0 to read entire file
|
||||
// Set offset to 0 to read entire file.
|
||||
offset := int64(0)
|
||||
metadata := make(map[string]string)
|
||||
|
||||
// read meta data from all disks
|
||||
for _, disk := range xl.storageDisks {
|
||||
diskVersionMap[disk] = -1
|
||||
// Allocate disk index format map - do not use maps directly without allocating.
|
||||
diskFileVersionMap = make(map[int]int64)
|
||||
|
||||
if metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset); err != nil {
|
||||
// error reading meta data file
|
||||
// TODO: log it
|
||||
// TODO - all errors should be logged here.
|
||||
|
||||
// Read meta data from all disks
|
||||
for index, disk := range xl.storageDisks {
|
||||
diskFileVersionMap[index] = -1
|
||||
|
||||
metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
|
||||
if err != nil {
|
||||
continue
|
||||
} else if err := json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
|
||||
// error in parsing json
|
||||
// TODO: log it
|
||||
} else if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil {
|
||||
continue
|
||||
} else if _, ok := metadata["file.version"]; !ok {
|
||||
// missing "file.version" is completely valid
|
||||
diskVersionMap[disk] = 0
|
||||
continue
|
||||
} else if fileVersion, err := strconv.ParseInt(metadata["file.version"], 10, 64); err != nil {
|
||||
// version is not a number
|
||||
// TODO: log it
|
||||
continue
|
||||
} else {
|
||||
diskVersionMap[disk] = fileVersion
|
||||
diskFileVersionMap[index] = 0
|
||||
}
|
||||
// Convert string to integer.
|
||||
fileVersion, err := strconv.ParseInt(metadata["file.version"], 10, 64)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
diskFileVersionMap[index] = fileVersion
|
||||
}
|
||||
|
||||
return
|
||||
return diskFileVersionMap
|
||||
}
|
||||
|
||||
type quorumDisk struct {
|
||||
@ -83,24 +82,28 @@ type quorumDisk struct {
|
||||
index int
|
||||
}
|
||||
|
||||
// getReadFileQuorumDisks - get the current quorum disks.
|
||||
func (xl XL) getReadFileQuorumDisks(volume, path string) (quorumDisks []quorumDisk) {
|
||||
diskVersionMap := xl.getMetaDataFileVersions(volume, path)
|
||||
diskVersionMap := xl.getMetaFileVersionMap(volume, path)
|
||||
higherVersion := int64(0)
|
||||
i := 0
|
||||
for disk, version := range diskVersionMap {
|
||||
if version > higherVersion {
|
||||
higherVersion = version
|
||||
quorumDisks = []quorumDisk{{disk, i}}
|
||||
} else if version == higherVersion {
|
||||
quorumDisks = append(quorumDisks, quorumDisk{disk, i})
|
||||
for diskIndex, formatVersion := range diskVersionMap {
|
||||
if formatVersion > higherVersion {
|
||||
higherVersion = formatVersion
|
||||
quorumDisks = []quorumDisk{{
|
||||
disk: xl.storageDisks[diskIndex],
|
||||
index: diskIndex,
|
||||
}}
|
||||
} else if formatVersion == higherVersion {
|
||||
quorumDisks = append(quorumDisks, quorumDisk{
|
||||
disk: xl.storageDisks[diskIndex],
|
||||
index: diskIndex,
|
||||
})
|
||||
}
|
||||
|
||||
i++
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// getFileSize - extract file size from metadata.
|
||||
func (xl XL) getFileSize(volume, path string, disk StorageAPI) (size int64, err error) {
|
||||
metadataFilePath := slashpath.Join(path, metadataFile)
|
||||
// set offset to 0 to read entire file
|
||||
@ -133,10 +136,10 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
|
||||
return nil, errInvalidArgument
|
||||
}
|
||||
|
||||
// Acquire a read lock. - TODO - disable this due to stack overflow bug.
|
||||
// readLock := true
|
||||
// xl.lockNS(volume, path, readLock)
|
||||
// defer xl.unlockNS(volume, path, readLock)
|
||||
// Acquire a read lock.
|
||||
readLock := true
|
||||
xl.lockNS(volume, path, readLock)
|
||||
defer xl.unlockNS(volume, path, readLock)
|
||||
|
||||
// Check read quorum.
|
||||
quorumDisks := xl.getReadFileQuorumDisks(volume, path)
|
||||
@ -151,26 +154,30 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
|
||||
}
|
||||
totalBlocks := xl.DataBlocks + xl.ParityBlocks // Total blocks.
|
||||
|
||||
readers := []io.ReadCloser{}
|
||||
readers := make([]io.ReadCloser, len(quorumDisks))
|
||||
readFileError := 0
|
||||
i := 0
|
||||
for _, quorumDisk := range quorumDisks {
|
||||
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", quorumDisk.index))
|
||||
var erasuredPartReader io.ReadCloser
|
||||
if erasuredPartReader, err = quorumDisk.disk.ReadFile(volume, erasurePart, offset); err != nil {
|
||||
// we can safely allow ReadFile errors up to len(quorumDisks) - xl.readQuorum
|
||||
// We can safely allow ReadFile errors up to len(quorumDisks) - xl.readQuorum
|
||||
// otherwise return failure
|
||||
if readFileError < len(quorumDisks)-xl.readQuorum {
|
||||
// Set the reader to 'nil' to be able to reconstruct later.
|
||||
readers[quorumDisk.index] = nil
|
||||
readFileError++
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: handle currently available io.Reader in readers variable
|
||||
return nil, err
|
||||
// Control reaches here we do not have quorum
|
||||
// anymore. Close all the readers.
|
||||
for _, reader := range readers {
|
||||
if reader != nil {
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
return nil, errReadQuorum
|
||||
}
|
||||
|
||||
readers[i] = erasuredPartReader
|
||||
i++
|
||||
readers[quorumDisk.index] = erasuredPartReader
|
||||
}
|
||||
|
||||
// Initialize pipe.
|
||||
|
Loading…
Reference in New Issue
Block a user