xl: add quorum support for create file

This commit is contained in:
Bala FA 2016-04-20 22:44:30 +05:30 committed by Harshavardhana
parent 141a44bfbf
commit 45b3d3e21f
2 changed files with 67 additions and 24 deletions

View File

@ -51,34 +51,67 @@ func closeAndRemoveWriters(writers ...io.WriteCloser) {
// WriteErasure reads predefined blocks, encodes them and writes to // WriteErasure reads predefined blocks, encodes them and writes to
// configured storage disks. // configured storage disks.
func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) { func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
var writers = make([]io.WriteCloser, len(xl.storageDisks)) xl.lockNS(volume, path, false)
var sha512Writers = make([]hash.Hash, len(xl.storageDisks)) defer xl.unlockNS(volume, path, false)
var metadataWriters = make([]io.WriteCloser, len(xl.storageDisks))
// get available quorum for existing file path
_, higherVersion := xl.getQuorumDisks(volume, path)
// increment to have next higher version
higherVersion++
quorumDisks := make([]quorumDisk, len(xl.storageDisks))
writers := make([]io.WriteCloser, len(xl.storageDisks))
sha512Writers := make([]hash.Hash, len(xl.storageDisks))
metadataFilePath := slashpath.Join(path, metadataFile)
metadataWriters := make([]io.WriteCloser, len(xl.storageDisks))
// Save additional erasureMetadata. // Save additional erasureMetadata.
modTime := time.Now().UTC() modTime := time.Now().UTC()
// Initialize storage disks, get all the writers and corresponding createFileError := 0
// metadata writers. maxIndex := 0
for index, disk := range xl.storageDisks { for index, disk := range xl.storageDisks {
var err error
erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index)) erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
writers[index], err = disk.CreateFile(volume, erasurePart) writer, err := disk.CreateFile(volume, erasurePart)
if err != nil { if err != nil {
createFileError++
// we can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure
if createFileError <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
// Remove previous temp writers for any failure. // Remove previous temp writers for any failure.
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
metadataFilePath := slashpath.Join(path, metadataFile)
metadataWriters[index], err = disk.CreateFile(volume, metadataFilePath) // create meta data file
var metadataWriter io.WriteCloser
metadataWriter, err = disk.CreateFile(volume, metadataFilePath)
if err != nil { if err != nil {
createFileError++
// we can safely allow CreateFile errors up to len(xl.storageDisks) - xl.writeQuorum
// otherwise return failure
if createFileError <= len(xl.storageDisks)-xl.writeQuorum {
continue
}
// Remove previous temp writers for any failure. // Remove previous temp writers for any failure.
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
sha512Writers[index] = fastSha512.New()
writers[maxIndex] = writer
metadataWriters[maxIndex] = metadataWriter
sha512Writers[maxIndex] = fastSha512.New()
quorumDisks[maxIndex] = quorumDisk{disk, index}
maxIndex++
} }
// Allocate 4MiB block size buffer for reading. // Allocate 4MiB block size buffer for reading.
@ -118,16 +151,19 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
// Loop through and write encoded data to all the disks.
for index, encodedData := range blocks { // Loop through and write encoded data to quorum disks.
_, err = writers[index].Write(encodedData) for i := 0; i < maxIndex; i++ {
encodedData := blocks[quorumDisks[i].index]
_, err = writers[i].Write(encodedData)
if err != nil { if err != nil {
// Remove all temp writers upon error. // Remove all temp writers upon error.
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
sha512Writers[index].Write(encodedData) sha512Writers[i].Write(encodedData)
} }
// Update total written. // Update total written.
totalSize += int64(n) totalSize += int64(n)
@ -141,15 +177,23 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
metadata["format.minor"] = "0" metadata["format.minor"] = "0"
metadata["format.patch"] = "0" metadata["format.patch"] = "0"
metadata["file.size"] = strconv.FormatInt(totalSize, 10) metadata["file.size"] = strconv.FormatInt(totalSize, 10)
if len(xl.storageDisks) > len(writers) {
// save file.version only if we wrote to less disks than all disks
metadata["file.version"] = strconv.FormatInt(higherVersion, 10)
}
metadata["file.modTime"] = modTime.Format(timeFormatAMZ) metadata["file.modTime"] = modTime.Format(timeFormatAMZ)
metadata["file.xl.blockSize"] = strconv.Itoa(erasureBlockSize) metadata["file.xl.blockSize"] = strconv.Itoa(erasureBlockSize)
metadata["file.xl.dataBlocks"] = strconv.Itoa(xl.DataBlocks) metadata["file.xl.dataBlocks"] = strconv.Itoa(xl.DataBlocks)
metadata["file.xl.parityBlocks"] = strconv.Itoa(xl.ParityBlocks) metadata["file.xl.parityBlocks"] = strconv.Itoa(xl.ParityBlocks)
// Write all the metadata. // Write all the metadata.
for index, metadataWriter := range metadataWriters { // below case is not handled here
// Case: when storageDisks is 16 and write quorumDisks is 13,
// meta data write failure up to 2 can be considered.
// currently we fail for any meta data writes
for i := 0; i < maxIndex; i++ {
// Save sha512 checksum of each encoded blocks. // Save sha512 checksum of each encoded blocks.
metadata["file.xl.block512Sum"] = hex.EncodeToString(sha512Writers[index].Sum(nil)) metadata["file.xl.block512Sum"] = hex.EncodeToString(sha512Writers[i].Sum(nil))
// Marshal metadata into json strings. // Marshal metadata into json strings.
metadataBytes, err := json.Marshal(metadata) metadataBytes, err := json.Marshal(metadata)
@ -161,7 +205,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
} }
// Write metadata to disk. // Write metadata to disk.
_, err = metadataWriter.Write(metadataBytes) _, err = metadataWriters[i].Write(metadataBytes)
if err != nil { if err != nil {
xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
reader.CloseWithError(err) reader.CloseWithError(err)
@ -170,10 +214,10 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
} }
// Close all writers and metadata writers in routines. // Close all writers and metadata writers in routines.
for index := range xl.storageDisks { for i := 0; i < maxIndex; i++ {
// Safely wrote, now rename to its actual location. // Safely wrote, now rename to its actual location.
writers[index].Close() writers[i].Close()
metadataWriters[index].Close() metadataWriters[i].Close()
} }
// Close the pipe reader and return. // Close the pipe reader and return.

View File

@ -82,10 +82,9 @@ type quorumDisk struct {
index int index int
} }
// getReadFileQuorumDisks - get the current quorum disks. // getQuorumDisks - get the current quorum disks.
func (xl XL) getReadFileQuorumDisks(volume, path string) (quorumDisks []quorumDisk) { func (xl XL) getQuorumDisks(volume, path string) (quorumDisks []quorumDisk, higherVersion int64) {
diskVersionMap := xl.getMetaFileVersionMap(volume, path) diskVersionMap := xl.getMetaFileVersionMap(volume, path)
higherVersion := int64(0)
for diskIndex, formatVersion := range diskVersionMap { for diskIndex, formatVersion := range diskVersionMap {
if formatVersion > higherVersion { if formatVersion > higherVersion {
higherVersion = formatVersion higherVersion = formatVersion
@ -142,7 +141,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
defer xl.unlockNS(volume, path, readLock) defer xl.unlockNS(volume, path, readLock)
// Check read quorum. // Check read quorum.
quorumDisks := xl.getReadFileQuorumDisks(volume, path) quorumDisks, _ := xl.getQuorumDisks(volume, path)
if len(quorumDisks) < xl.readQuorum { if len(quorumDisks) < xl.readQuorum {
return nil, errReadQuorum return nil, errReadQuorum
} }