xl: Rename blockingWriteCloser to waitCloser. (#1376)

This commit is contained in:
Harshavardhana 2016-04-25 16:00:22 -07:00 committed by Harshavardhana
parent 00c697393a
commit 42254b5c4d
3 changed files with 35 additions and 26 deletions

View File

@ -184,7 +184,7 @@ func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser,
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"volume": volume, "volume": volume,
"path": path, "path": path,
}).Debugf("CreateFile http POST failed to upload the data with error %s", err) }).Debugf("CreateFile HTTP POST failed to upload data with error %s", err)
readCloser.CloseWithError(err) readCloser.CloseWithError(err)
return return
} }

View File

@ -38,14 +38,21 @@ const erasureBlockSize = 4 * 1024 * 1024 // 4MiB.
func (xl XL) cleanupCreateFileOps(volume, path string, writers ...io.WriteCloser) { func (xl XL) cleanupCreateFileOps(volume, path string, writers ...io.WriteCloser) {
closeAndRemoveWriters(writers...) closeAndRemoveWriters(writers...)
for _, disk := range xl.storageDisks { for _, disk := range xl.storageDisks {
disk.DeleteFile(volume, path) if err := disk.DeleteFile(volume, path); err != nil {
log.WithFields(logrus.Fields{
"volume": volume,
"path": path,
}).Errorf("DeleteFile failed with %s", err)
}
} }
} }
// Close and remove writers if they are safeFile. // Close and remove writers if they are safeFile.
func closeAndRemoveWriters(writers ...io.WriteCloser) { func closeAndRemoveWriters(writers ...io.WriteCloser) {
for _, writer := range writers { for _, writer := range writers {
safeCloseAndRemove(writer) if err := safeCloseAndRemove(writer); err != nil {
log.Errorf("Closing writer failed with %s", err)
}
} }
} }
@ -128,9 +135,9 @@ func (xl XL) getFileQuorumVersionMap(volume, path string) map[int]int64 {
// WriteErasure reads predefined blocks, encodes them and writes to // WriteErasure reads predefined blocks, encodes them and writes to
// configured storage disks. // configured storage disks.
func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, bwriter *blockingWriteCloser) { func (xl XL) writeErasure(volume, path string, reader *io.PipeReader, wcloser *waitCloser) {
// Release the block writer upon function return. // Release the block writer upon function return.
defer bwriter.Release() defer wcloser.release()
// Get available quorum for existing file path. // Get available quorum for existing file path.
_, higherVersion := xl.getQuorumDisks(volume, path) _, higherVersion := xl.getQuorumDisks(volume, path)
@ -335,12 +342,12 @@ func (xl XL) CreateFile(volume, path string) (writeCloser io.WriteCloser, err er
// Initialize pipe for data pipe line. // Initialize pipe for data pipe line.
pipeReader, pipeWriter := io.Pipe() pipeReader, pipeWriter := io.Pipe()
// Initialize a new blocking writer closer. // Initialize a new wait closer, implements both Write and Close.
blockingWriter := newBlockingWriteCloser(pipeWriter) wcloser := newWaitCloser(pipeWriter)
// Start erasure encoding in routine, reading data block by block from pipeReader. // Start erasure encoding in routine, reading data block by block from pipeReader.
go xl.writeErasure(volume, path, pipeReader, blockingWriter) go xl.writeErasure(volume, path, pipeReader, wcloser)
// Return the blocking writer, caller should start writing to this. // Return the writer, caller should start writing to this.
return blockingWriter, nil return wcloser, nil
} }

View File

@ -21,42 +21,44 @@ import (
"sync" "sync"
) )
// blockingWriteCloser is a WriteCloser that blocks until released. // waitCloser implements a Closer that blocks until released, this
type blockingWriteCloser struct { // prevents the writer closing one end of a pipe while making sure
writer io.WriteCloser // Embedded writer. // that all data is written and committed to disk on the end.
release *sync.WaitGroup // Waitgroup for atomicity. // Additionally this also implements Write().
err error type waitCloser struct {
wg *sync.WaitGroup // Waitgroup for atomicity.
writer io.WriteCloser // Embedded writer.
} }
// Write to the underlying writer. // Write to the underlying writer.
func (b *blockingWriteCloser) Write(data []byte) (int, error) { func (b *waitCloser) Write(data []byte) (int, error) {
return b.writer.Write(data) return b.writer.Write(data)
} }
// Close blocks until another goroutine calls Release(error). Returns // Close blocks until another goroutine calls Release(error). Returns
// error code if either writer fails or Release is called with an error. // error code if either writer fails or Release is called with an error.
func (b *blockingWriteCloser) Close() error { func (b *waitCloser) Close() error {
err := b.writer.Close() err := b.writer.Close()
b.release.Wait() b.wg.Wait()
return err return err
} }
// Release the Close, causing it to unblock. Only call this // release the Close, causing it to unblock. Only call this
// once. Calling it multiple times results in a panic. // once. Calling it multiple times results in a panic.
func (b *blockingWriteCloser) Release() { func (b *waitCloser) release() {
b.release.Done() b.wg.Done()
return return
} }
// newBlockingWriteCloser Creates a new write closer that must be // newWaitCloser creates a new write closer that must be
// released by the read consumer. // released by the read consumer.
func newBlockingWriteCloser(writer io.WriteCloser) *blockingWriteCloser { func newWaitCloser(writer io.WriteCloser) *waitCloser {
// Wait group for the go-routine. // Wait group for the go-routine.
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
// Add to the wait group to wait for. // Add to the wait group to wait for.
wg.Add(1) wg.Add(1)
return &blockingWriteCloser{ return &waitCloser{
writer: writer, wg: wg,
release: wg, writer: writer,
} }
} }