xl: Add more fixes and cleanup.

Simplify cleanup of temporary files during createFile operations.
This commit is contained in:
Harshavardhana 2016-04-19 12:30:10 -07:00
parent ada0f82b9a
commit b76f3f1d62
5 changed files with 82 additions and 62 deletions

View File

@ -42,3 +42,6 @@ var errVolumeAccessDenied = errors.New("volume access denied")
// errVolumeAccessDenied - cannot access file, insufficient permissions. // errVolumeAccessDenied - cannot access file, insufficient permissions.
var errFileAccessDenied = errors.New("file access denied") var errFileAccessDenied = errors.New("file access denied")
// errReadQuorum - did not meet read quorum.
var errReadQuorum = errors.New("I/O error. do not meet read quorum")

View File

@ -32,6 +32,15 @@ import (
// erasureBlockSize - size of a single erasure-coded block (4 MiB).
const erasureBlockSize = 4 << 20
// cleanupCreateFileOps - cleans up all the temporary files and other
// temporary data upon any failure.
func (xl XL) cleanupCreateFileOps(volume, path string, writers ...io.WriteCloser) {
	// Close and remove any partially written temporary files first.
	closeAndRemoveWriters(writers...)
	// Best-effort removal of the path from every disk; individual
	// delete errors are intentionally ignored during failure cleanup.
	for _, d := range xl.storageDisks {
		d.DeleteFile(volume, path)
	}
}
// Close and remove writers if they are safeFile. // Close and remove writers if they are safeFile.
func closeAndRemoveWriters(writers ...io.WriteCloser) { func closeAndRemoveWriters(writers ...io.WriteCloser) {
for _, writer := range writers { for _, writer := range writers {
@ -54,9 +63,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
writers[index], err = disk.CreateFile(volume, erasurePart) writers[index], err = disk.CreateFile(volume, erasurePart)
if err != nil { if err != nil {
// Remove previous temp writers for any failure. // Remove previous temp writers for any failure.
closeAndRemoveWriters(writers...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
closeAndRemoveWriters(metadataWriters...)
deletePathAll(volume, path, xl.storageDisks...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
@ -64,9 +71,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
metadataWriters[index], err = disk.CreateFile(volume, metadataFilePath) metadataWriters[index], err = disk.CreateFile(volume, metadataFilePath)
if err != nil { if err != nil {
// Remove previous temp writers for any failure. // Remove previous temp writers for any failure.
closeAndRemoveWriters(writers...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
closeAndRemoveWriters(metadataWriters...)
deletePathAll(volume, path, xl.storageDisks...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
@ -83,9 +88,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
// Any unexpected errors, close the pipe reader with error. // Any unexpected errors, close the pipe reader with error.
if err != io.ErrUnexpectedEOF && err != io.EOF { if err != io.ErrUnexpectedEOF && err != io.EOF {
// Remove all temp writers. // Remove all temp writers.
closeAndRemoveWriters(writers...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
closeAndRemoveWriters(metadataWriters...)
deletePathAll(volume, path, xl.storageDisks...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
@ -100,9 +103,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
blocks, err = xl.ReedSolomon.Split(buffer[0:n]) blocks, err = xl.ReedSolomon.Split(buffer[0:n])
if err != nil { if err != nil {
// Remove all temp writers. // Remove all temp writers.
closeAndRemoveWriters(writers...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
closeAndRemoveWriters(metadataWriters...)
deletePathAll(volume, path, xl.storageDisks...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
@ -110,9 +111,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
err = xl.ReedSolomon.Encode(blocks) err = xl.ReedSolomon.Encode(blocks)
if err != nil { if err != nil {
// Remove all temp writers upon error. // Remove all temp writers upon error.
closeAndRemoveWriters(writers...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
closeAndRemoveWriters(metadataWriters...)
deletePathAll(volume, path, xl.storageDisks...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
@ -121,9 +120,7 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
_, err = writers[index].Write(encodedData) _, err = writers[index].Write(encodedData)
if err != nil { if err != nil {
// Remove all temp writers upon error. // Remove all temp writers upon error.
closeAndRemoveWriters(writers...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
closeAndRemoveWriters(metadataWriters...)
deletePathAll(volume, path, xl.storageDisks...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
@ -157,18 +154,16 @@ func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
// Marshal metadata into json strings. // Marshal metadata into json strings.
metadataBytes, err := json.Marshal(metadata) metadataBytes, err := json.Marshal(metadata)
if err != nil { if err != nil {
closeAndRemoveWriters(writers...) // Remove temporary files.
closeAndRemoveWriters(metadataWriters...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
deletePathAll(volume, path, xl.storageDisks...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }
// Write metadata to disk.
_, err = metadataWriter.Write(metadataBytes) _, err = metadataWriter.Write(metadataBytes)
if err != nil { if err != nil {
closeAndRemoveWriters(writers...) xl.cleanupCreateFileOps(volume, path, append(writers, metadataWriters...)...)
closeAndRemoveWriters(metadataWriters...)
deletePathAll(volume, path, xl.storageDisks...)
reader.CloseWithError(err) reader.CloseWithError(err)
return return
} }

View File

@ -18,44 +18,56 @@ package main
import "sync" import "sync"
// nameSpaceParam - carries name space resource; used as a comparable
// key identifying a (volume, path) pair in the namespace lock map.
type nameSpaceParam struct {
	volume string
	path   string
}
// nameSpaceLock - provides primitives for locking critical namespace regions.
type nameSpaceLock struct {
	rwMutex *sync.RWMutex
	rcount  uint // number of read locks currently held.
	wcount  uint // number of write locks currently held.
}

// InUse - reports whether any read or write locks are currently held.
func (nsLock *nameSpaceLock) InUse() bool {
	return nsLock.rcount != 0 || nsLock.wcount != 0
}

// Lock acquires write lock and increments the namespace counter.
// BUGFIX: delegate to the embedded rwMutex — calling nsLock.Lock()
// here recursed infinitely and overflowed the stack.
func (nsLock *nameSpaceLock) Lock() {
	nsLock.rwMutex.Lock()
	nsLock.wcount++
}

// Unlock releases write lock and decrements the namespace counter.
func (nsLock *nameSpaceLock) Unlock() {
	nsLock.rwMutex.Unlock()
	// Guard against underflow from an unmatched Unlock.
	if nsLock.wcount != 0 {
		nsLock.wcount--
	}
}

// RLock acquires read lock and increments the namespace counter.
func (nsLock *nameSpaceLock) RLock() {
	nsLock.rwMutex.RLock()
	nsLock.rcount++
}

// RUnlock releases read lock and decrements the namespace counter.
func (nsLock *nameSpaceLock) RUnlock() {
	nsLock.rwMutex.RUnlock()
	// Guard against underflow from an unmatched RUnlock.
	if nsLock.rcount != 0 {
		nsLock.rcount--
	}
}

// newNSLock - provides a new instance of namespace locking primitives.
// NOTE(review): rcount/wcount are mutated outside any lock here; the
// callers appear to serialize access via a map mutex — confirm.
func newNSLock() *nameSpaceLock {
	return &nameSpaceLock{
		rwMutex: &sync.RWMutex{},
	}
}

View File

@ -133,17 +133,19 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
return nil, errInvalidArgument return nil, errInvalidArgument
} }
xl.lockNameSpace(volume, path, true) // Acquire a read lock.
defer xl.unlockNameSpace(volume, path, true) readLock := true
xl.lockNS(volume, path, readLock)
defer xl.unlockNS(volume, path, readLock)
// check read quorum // Check read quorum.
quorumDisks := xl.getReadFileQuorumDisks(volume, path) quorumDisks := xl.getReadFileQuorumDisks(volume, path)
if len(quorumDisks) < xl.readQuorum { if len(quorumDisks) < xl.readQuorum {
return nil, errors.New("I/O error. do not meet read quorum") return nil, errReadQuorum
} }
// get file size // Get file size.
size, err := xl.getFileSize(volume, path, quorumDisks[0].disk) fileSize, err := xl.getFileSize(volume, path, quorumDisks[0].disk)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -174,7 +176,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
// Initialize pipe. // Initialize pipe.
pipeReader, pipeWriter := io.Pipe() pipeReader, pipeWriter := io.Pipe()
go func() { go func() {
var totalLeft = size var totalLeft = fileSize
// Read until the totalLeft. // Read until the totalLeft.
for totalLeft > 0 { for totalLeft > 0 {
// Figure out the right blockSize as it was encoded before. // Figure out the right blockSize as it was encoded before.
@ -210,6 +212,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
pipeWriter.CloseWithError(err) pipeWriter.CloseWithError(err)
return return
} }
// Verify the blocks. // Verify the blocks.
var ok bool var ok bool
ok, err = xl.ReedSolomon.Verify(enBlocks) ok, err = xl.ReedSolomon.Verify(enBlocks)
@ -217,6 +220,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
pipeWriter.CloseWithError(err) pipeWriter.CloseWithError(err)
return return
} }
// Verification failed, blocks require reconstruction. // Verification failed, blocks require reconstruction.
if !ok { if !ok {
err = xl.ReedSolomon.Reconstruct(enBlocks) err = xl.ReedSolomon.Reconstruct(enBlocks)
@ -237,15 +241,18 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
return return
} }
} }
// Join the decoded blocks. // Join the decoded blocks.
err = xl.ReedSolomon.Join(pipeWriter, enBlocks, curBlockSize) err = xl.ReedSolomon.Join(pipeWriter, enBlocks, curBlockSize)
if err != nil { if err != nil {
pipeWriter.CloseWithError(err) pipeWriter.CloseWithError(err)
return return
} }
// Save what's left after reading erasureBlockSize. // Save what's left after reading erasureBlockSize.
totalLeft = totalLeft - erasureBlockSize totalLeft = totalLeft - erasureBlockSize
} }
// Cleanly end the pipe after a successful decoding. // Cleanly end the pipe after a successful decoding.
pipeWriter.Close() pipeWriter.Close()

View File

@ -45,23 +45,25 @@ type XL struct {
DataBlocks int DataBlocks int
ParityBlocks int ParityBlocks int
storageDisks []StorageAPI storageDisks []StorageAPI
nameSpaceLockMap map[nameSpaceParam]nameSpaceLock nameSpaceLockMap map[nameSpaceParam]*nameSpaceLock
nameSpaceLockMapMutex *sync.Mutex nameSpaceLockMapMutex *sync.Mutex
readQuorum int readQuorum int
writeQuorum int writeQuorum int
} }
func (xl XL) lockNameSpace(volume, path string, readOnly bool) { // lockNS - locks the given resource, using a previously allocated
// name space lock or initializing a new one.
func (xl XL) lockNS(volume, path string, readLock bool) {
xl.nameSpaceLockMapMutex.Lock() xl.nameSpaceLockMapMutex.Lock()
defer xl.nameSpaceLockMapMutex.Unlock() defer xl.nameSpaceLockMapMutex.Unlock()
param := nameSpaceParam{volume, path} param := nameSpaceParam{volume, path}
nsLock, found := xl.nameSpaceLockMap[param] nsLock, found := xl.nameSpaceLockMap[param]
if !found { if !found {
nsLock = newNameSpaceLock() nsLock = newNSLock()
} }
if readOnly { if readLock {
nsLock.RLock() nsLock.RLock()
} else { } else {
nsLock.Lock() nsLock.Lock()
@ -70,13 +72,14 @@ func (xl XL) lockNameSpace(volume, path string, readOnly bool) {
xl.nameSpaceLockMap[param] = nsLock xl.nameSpaceLockMap[param] = nsLock
} }
func (xl XL) unlockNameSpace(volume, path string, readOnly bool) { // unlockNS - unlocks any previously acquired read or write locks.
func (xl XL) unlockNS(volume, path string, readLock bool) {
xl.nameSpaceLockMapMutex.Lock() xl.nameSpaceLockMapMutex.Lock()
defer xl.nameSpaceLockMapMutex.Unlock() defer xl.nameSpaceLockMapMutex.Unlock()
param := nameSpaceParam{volume, path} param := nameSpaceParam{volume, path}
if nsLock, found := xl.nameSpaceLockMap[param]; found { if nsLock, found := xl.nameSpaceLockMap[param]; found {
if readOnly { if readLock {
nsLock.RUnlock() nsLock.RUnlock()
} else { } else {
nsLock.Unlock() nsLock.Unlock()
@ -136,10 +139,17 @@ func newXL(disks ...string) (StorageAPI, error) {
// Save all the initialized storage disks. // Save all the initialized storage disks.
xl.storageDisks = storageDisks xl.storageDisks = storageDisks
xl.nameSpaceLockMap = make(map[nameSpaceParam]nameSpaceLock) // Initialize name space lock map.
xl.nameSpaceLockMap = make(map[nameSpaceParam]*nameSpaceLock)
xl.nameSpaceLockMapMutex = &sync.Mutex{} xl.nameSpaceLockMapMutex = &sync.Mutex{}
xl.readQuorum = len(xl.storageDisks) / 2
xl.writeQuorum = xl.readQuorum + 3 // Figure out read and write quorum based on number of storage disks.
// Read quorum should be always N/2 + 1 (due to Vandermonde matrix
// erasure requirements)
xl.readQuorum = len(xl.storageDisks)/2 + 1
// Write quorum is assumed if we have total disks + 3
// parity. (Need to discuss this again)
xl.writeQuorum = len(xl.storageDisks)/2 + 3
if xl.writeQuorum > len(xl.storageDisks) { if xl.writeQuorum > len(xl.storageDisks) {
xl.writeQuorum = len(xl.storageDisks) xl.writeQuorum = len(xl.storageDisks)
} }
@ -432,13 +442,6 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) {
}, nil }, nil
} }
// deletePathAll - best-effort removal of the given path from every
// provided disk; per-disk delete errors are deliberately ignored.
func deletePathAll(volume, path string, disks ...StorageAPI) {
	for _, d := range disks {
		d.DeleteFile(volume, path)
	}
}
// DeleteFile - delete a file // DeleteFile - delete a file
func (xl XL) DeleteFile(volume, path string) error { func (xl XL) DeleteFile(volume, path string) error {
if !isValidVolname(volume) { if !isValidVolname(volume) {