diff --git a/object-api-multipart.go b/object-api-multipart.go index 2631e76e2..3c56693d5 100644 --- a/object-api-multipart.go +++ b/object-api-multipart.go @@ -297,6 +297,9 @@ func (o objectAPI) NewMultipartUpload(bucket, object string) (string, *probe.Err // uploadIDPath doesn't exist, so create empty file to reserve the name var w io.WriteCloser if w, e = o.storage.CreateFile(minioMetaVolume, uploadIDPath); e == nil { + // Just write some data for erasure code, rather than zero bytes. + w.Write([]byte(uploadID)) + // Close the writer. if e = w.Close(); e != nil { return "", probe.NewError(e) } diff --git a/xl-v1-createfile.go b/xl-v1-createfile.go index 6e73e1f74..d6b34a742 100644 --- a/xl-v1-createfile.go +++ b/xl-v1-createfile.go @@ -55,6 +55,9 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { var sha512Writers = make([]hash.Hash, len(xl.storageDisks)) var metadataWriters = make([]io.WriteCloser, len(xl.storageDisks)) + // Save additional erasureMetadata. + modTime := time.Now().UTC() + // Initialize storage disks, get all the writers and corresponding // metadata writers. for index, disk := range xl.storageDisks { @@ -131,9 +134,6 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { } } - // Save additional erasureMetadata. - modTime := time.Now().UTC() - // Initialize metadata map, save all erasure related metadata. metadata := make(map[string]string) metadata["version"] = minioVersion diff --git a/xl-v1-namespace.go b/xl-v1-namespace.go index 697c63d18..2f276af6b 100644 --- a/xl-v1-namespace.go +++ b/xl-v1-namespace.go @@ -27,39 +27,38 @@ type nameSpaceParam struct { // nameSpaceLock - provides primitives for locking critical namespace regions. type nameSpaceLock struct { rwMutex *sync.RWMutex - rcount uint - wcount uint + count uint } func (nsLock *nameSpaceLock) InUse() bool { - return nsLock.rcount != 0 || nsLock.wcount != 0 + return nsLock.count != 0 } // Lock acquires write lock and increments the namespace counter. func (nsLock *nameSpaceLock) Lock() { - nsLock.Lock() - nsLock.wcount++ + nsLock.rwMutex.Lock() + nsLock.count++ } // Unlock releases write lock and decrements the namespace counter. func (nsLock *nameSpaceLock) Unlock() { - nsLock.Unlock() - if nsLock.wcount != 0 { - nsLock.wcount-- + nsLock.rwMutex.Unlock() + if nsLock.count != 0 { + nsLock.count-- } } // RLock acquires read lock and increments the namespace counter. func (nsLock *nameSpaceLock) RLock() { - nsLock.RLock() - nsLock.rcount++ + nsLock.rwMutex.RLock() + nsLock.count++ } // RUnlock release read lock and decrements the namespace counter. func (nsLock *nameSpaceLock) RUnlock() { - nsLock.RUnlock() - if nsLock.rcount != 0 { - nsLock.rcount-- + nsLock.rwMutex.RUnlock() + if nsLock.count != 0 { + nsLock.count-- } } @@ -67,7 +66,6 @@ func (nsLock *nameSpaceLock) RUnlock() { func newNSLock() *nameSpaceLock { return &nameSpaceLock{ rwMutex: &sync.RWMutex{}, - rcount: 0, - wcount: 0, + count: 0, } } diff --git a/xl-v1-readfile.go b/xl-v1-readfile.go index c12ebe70c..c397cdb8a 100644 --- a/xl-v1-readfile.go +++ b/xl-v1-readfile.go @@ -44,38 +44,37 @@ func getEncodedBlockLen(inputLen, dataBlocks int) (curBlockSize int) { return } -func (xl XL) getMetaDataFileVersions(volume, path string) (diskVersionMap map[StorageAPI]int64) { +func (xl XL) getMetaFileVersionMap(volume, path string) (diskFileVersionMap map[int]int64) { metadataFilePath := slashpath.Join(path, metadataFile) - // set offset to 0 to read entire file + // Set offset to 0 to read entire file. offset := int64(0) metadata := make(map[string]string) - // read meta data from all disks - for _, disk := range xl.storageDisks { - diskVersionMap[disk] = -1 + // Allocate disk index format map - do not use maps directly without allocating. + diskFileVersionMap = make(map[int]int64) - if metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset); err != nil { - // error reading meta data file - // TODO: log it + // TODO - all errors should be logged here. + + // Read meta data from all disks + for index, disk := range xl.storageDisks { + diskFileVersionMap[index] = -1 + + metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset) + if err != nil { continue - } else if err := json.NewDecoder(metadataReader).Decode(&metadata); err != nil { - // error in parsing json - // TODO: log it + } else if err = json.NewDecoder(metadataReader).Decode(&metadata); err != nil { continue } else if _, ok := metadata["file.version"]; !ok { - // missing "file.version" is completely valid - diskVersionMap[disk] = 0 - continue - } else if fileVersion, err := strconv.ParseInt(metadata["file.version"], 10, 64); err != nil { - // version is not a number - // TODO: log it - continue - } else { - diskVersionMap[disk] = fileVersion + diskFileVersionMap[index] = 0 } + // Convert string to integer. + fileVersion, err := strconv.ParseInt(metadata["file.version"], 10, 64) + if err != nil { + continue + } + diskFileVersionMap[index] = fileVersion } - - return + return diskFileVersionMap } type quorumDisk struct { @@ -83,24 +82,28 @@ type quorumDisk struct { index int } +// getReadFileQuorumDisks - get the current quorum disks. func (xl XL) getReadFileQuorumDisks(volume, path string) (quorumDisks []quorumDisk) { - diskVersionMap := xl.getMetaDataFileVersions(volume, path) + diskVersionMap := xl.getMetaFileVersionMap(volume, path) higherVersion := int64(0) - i := 0 - for disk, version := range diskVersionMap { - if version > higherVersion { - higherVersion = version - quorumDisks = []quorumDisk{{disk, i}} - } else if version == higherVersion { - quorumDisks = append(quorumDisks, quorumDisk{disk, i}) + for diskIndex, formatVersion := range diskVersionMap { + if formatVersion > higherVersion { + higherVersion = formatVersion + quorumDisks = []quorumDisk{{ + disk: xl.storageDisks[diskIndex], + index: diskIndex, + }} + } else if formatVersion == higherVersion { + quorumDisks = append(quorumDisks, quorumDisk{ + disk: xl.storageDisks[diskIndex], + index: diskIndex, + }) } - - i++ } - return } +// getFileSize - extract file size from metadata. func (xl XL) getFileSize(volume, path string, disk StorageAPI) (size int64, err error) { metadataFilePath := slashpath.Join(path, metadataFile) // set offset to 0 to read entire file @@ -133,10 +136,10 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) return nil, errInvalidArgument } - // Acquire a read lock. - TODO - disable this due to stack overflow bug. - // readLock := true - // xl.lockNS(volume, path, readLock) - // defer xl.unlockNS(volume, path, readLock) + // Acquire a read lock. + readLock := true + xl.lockNS(volume, path, readLock) + defer xl.unlockNS(volume, path, readLock) // Check read quorum. quorumDisks := xl.getReadFileQuorumDisks(volume, path) @@ -151,26 +154,30 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) } totalBlocks := xl.DataBlocks + xl.ParityBlocks // Total blocks. - readers := []io.ReadCloser{} + readers := make([]io.ReadCloser, len(quorumDisks)) readFileError := 0 - i := 0 for _, quorumDisk := range quorumDisks { erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", quorumDisk.index)) var erasuredPartReader io.ReadCloser if erasuredPartReader, err = quorumDisk.disk.ReadFile(volume, erasurePart, offset); err != nil { - // we can safely allow ReadFile errors up to len(quorumDisks) - xl.readQuorum + // We can safely allow ReadFile errors up to len(quorumDisks) - xl.readQuorum // otherwise return failure if readFileError < len(quorumDisks)-xl.readQuorum { + // Set the reader to 'nil' to be able to reconstruct later. + readers[quorumDisk.index] = nil readFileError++ continue } - - // TODO: handle currently available io.Reader in readers variable - return nil, err + // Control reaches here we do not have quorum + // anymore. Close all the readers. + for _, reader := range readers { + if reader != nil { + reader.Close() + } + } + return nil, errReadQuorum } - - readers[i] = erasuredPartReader - i++ + readers[quorumDisk.index] = erasuredPartReader } // Initialize pipe.