Posix CreateFile should work for compressed lengths (#7584)

This commit is contained in:
Praveen raj Mani 2019-05-01 04:57:31 +05:30 committed by kannappanr
parent a436f2baa5
commit c113d4e49c
6 changed files with 61 additions and 75 deletions

View File

@ -32,19 +32,9 @@ type streamingBitrotWriter struct {
h hash.Hash h hash.Hash
shardSize int64 shardSize int64
canClose chan struct{} // Needed to avoid race explained in Close() call. canClose chan struct{} // Needed to avoid race explained in Close() call.
// Following two fields are used only to make sure that Write(p) is called such that
// len(p) is always the block size except the last block, i.e prevent programmer errors.
currentBlockIdx int
verifyTillIdx int
} }
func (b *streamingBitrotWriter) Write(p []byte) (int, error) { func (b *streamingBitrotWriter) Write(p []byte) (int, error) {
if b.currentBlockIdx < b.verifyTillIdx && int64(len(p)) != b.shardSize {
// All blocks except last should be of the length b.shardSize
logger.LogIf(context.Background(), errUnexpected)
return 0, errUnexpected
}
if len(p) == 0 { if len(p) == 0 {
return 0, nil return 0, nil
} }
@ -57,7 +47,6 @@ func (b *streamingBitrotWriter) Write(p []byte) (int, error) {
return 0, err return 0, err
} }
n, err = b.iow.Write(p) n, err = b.iow.Write(p)
b.currentBlockIdx++
return n, err return n, err
} }
@ -78,10 +67,13 @@ func (b *streamingBitrotWriter) Close() error {
func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.WriteCloser { func newStreamingBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.WriteCloser {
r, w := io.Pipe() r, w := io.Pipe()
h := algo.New() h := algo.New()
bw := &streamingBitrotWriter{w, h, shardSize, make(chan struct{}), 0, int(length / shardSize)} bw := &streamingBitrotWriter{w, h, shardSize, make(chan struct{})}
go func() { go func() {
bitrotSumsTotalSize := ceilFrac(length, shardSize) * int64(h.Size()) // Size used for storing bitrot checksums. totalFileSize := int64(-1) // For compressed objects length will be unknown (represented by length=-1)
totalFileSize := bitrotSumsTotalSize + length if length != -1 {
bitrotSumsTotalSize := ceilFrac(length, shardSize) * int64(h.Size()) // Size used for storing bitrot checksums.
totalFileSize = bitrotSumsTotalSize + length
}
err := disk.CreateFile(volume, filePath, totalFileSize, r) err := disk.CreateFile(volume, filePath, totalFileSize, r)
if err != nil { if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("storageDisk", disk.String()) reqInfo := (&logger.ReqInfo{}).AppendTags("storageDisk", disk.String())

View File

@ -31,19 +31,9 @@ type wholeBitrotWriter struct {
filePath string filePath string
shardSize int64 // This is the shard size of the erasure logic shardSize int64 // This is the shard size of the erasure logic
hash.Hash // For bitrot hash hash.Hash // For bitrot hash
// Following two fields are used only to make sure that Write(p) is called such that
// len(p) is always the block size except the last block and prevent programmer errors.
currentBlockIdx int
lastBlockIdx int
} }
func (b *wholeBitrotWriter) Write(p []byte) (int, error) { func (b *wholeBitrotWriter) Write(p []byte) (int, error) {
if b.currentBlockIdx < b.lastBlockIdx && int64(len(p)) != b.shardSize {
// All blocks except last should be of the length b.shardSize
logger.LogIf(context.Background(), errUnexpected)
return 0, errUnexpected
}
err := b.disk.AppendFile(b.volume, b.filePath, p) err := b.disk.AppendFile(b.volume, b.filePath, p)
if err != nil { if err != nil {
logger.LogIf(context.Background(), err) logger.LogIf(context.Background(), err)
@ -54,7 +44,6 @@ func (b *wholeBitrotWriter) Write(p []byte) (int, error) {
logger.LogIf(context.Background(), err) logger.LogIf(context.Background(), err)
return 0, err return 0, err
} }
b.currentBlockIdx++
return len(p), nil return len(p), nil
} }
@ -63,8 +52,8 @@ func (b *wholeBitrotWriter) Close() error {
} }
// Returns whole-file bitrot writer. // Returns whole-file bitrot writer.
func newWholeBitrotWriter(disk StorageAPI, volume, filePath string, length int64, algo BitrotAlgorithm, shardSize int64) io.WriteCloser { func newWholeBitrotWriter(disk StorageAPI, volume, filePath string, algo BitrotAlgorithm, shardSize int64) io.WriteCloser {
return &wholeBitrotWriter{disk, volume, filePath, shardSize, algo.New(), 0, int(length / shardSize)} return &wholeBitrotWriter{disk, volume, filePath, shardSize, algo.New()}
} }
// Implementation to verify bitrot for the whole file. // Implementation to verify bitrot for the whole file.

View File

@ -120,7 +120,7 @@ func newBitrotWriter(disk StorageAPI, volume, filePath string, length int64, alg
if algo == HighwayHash256S { if algo == HighwayHash256S {
return newStreamingBitrotWriter(disk, volume, filePath, length, algo, shardSize) return newStreamingBitrotWriter(disk, volume, filePath, length, algo, shardSize)
} }
return newWholeBitrotWriter(disk, volume, filePath, length, algo, shardSize) return newWholeBitrotWriter(disk, volume, filePath, algo, shardSize)
} }
func newBitrotReader(disk StorageAPI, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt { func newBitrotReader(disk StorageAPI, bucket string, filePath string, tillOffset int64, algo BitrotAlgorithm, sum []byte, shardSize int64) io.ReaderAt {

View File

@ -103,6 +103,9 @@ func (e *Erasure) ShardFileSize(totalLength int64) int64 {
if totalLength == 0 { if totalLength == 0 {
return 0 return 0
} }
if totalLength == -1 {
return -1
}
numShards := totalLength / e.blockSize numShards := totalLength / e.blockSize
lastBlockSize := totalLength % int64(e.blockSize) lastBlockSize := totalLength % int64(e.blockSize)
lastShardSize := ceilFrac(lastBlockSize, int64(e.dataBlocks)) lastShardSize := ceilFrac(lastBlockSize, int64(e.dataBlocks))

View File

@ -187,7 +187,7 @@ func newPosix(path string) (*posix, error) {
p := &posix{ p := &posix{
connected: true, connected: true,
diskPath: path, diskPath: path,
// 1MiB buffer pool for posix internal operations. // 4MiB buffer pool for posix internal operations.
pool: sync.Pool{ pool: sync.Pool{
New: func() interface{} { New: func() interface{} {
b := directio.AlignedBlock(posixWriteBlockSize) b := directio.AlignedBlock(posixWriteBlockSize)
@ -1028,10 +1028,9 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re
// CreateFile - creates the file. // CreateFile - creates the file.
func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (err error) { func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (err error) {
if fileSize < 0 { if fileSize < -1 {
return errInvalidArgument return errInvalidArgument
} }
defer func() { defer func() {
if err == errFaultyDisk { if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1) atomic.AddInt32(&s.ioErrCount, 1)
@ -1120,69 +1119,69 @@ func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (er
defer s.pool.Put(bufp) defer s.pool.Put(bufp)
buf := *bufp buf := *bufp
var written int64
dioCount := int(fileSize) / len(buf) // Writes remaining bytes in the buffer.
for i := 0; i < dioCount; i++ { writeRemaining := func(w *os.File, buf []byte) (remainingWritten int, err error) {
var n int var n int
_, err = io.ReadFull(r, buf) remaining := len(buf)
if err != nil { // The following logic writes the remainging data such that it writes whatever best is possible (aligned buffer)
return err // in O_DIRECT mode and remaining (unaligned buffer) in non-O_DIRECT mode.
}
n, err = w.Write(buf)
if err != nil {
return err
}
written += int64(n)
}
// The following logic writes the remainging data such that it writes whatever best is possible (aligned buffer)
// in O_DIRECT mode and remaining (unaligned buffer) in non-O_DIRECT mode.
remaining := fileSize % int64(len(buf))
if remaining != 0 {
buf = buf[:remaining]
_, err = io.ReadFull(r, buf)
if err != nil {
return err
}
remainingAligned := (remaining / directioAlignSize) * directioAlignSize remainingAligned := (remaining / directioAlignSize) * directioAlignSize
remainingAlignedBuf := buf[:remainingAligned] remainingAlignedBuf := buf[:remainingAligned]
remainingUnalignedBuf := buf[remainingAligned:] remainingUnalignedBuf := buf[remainingAligned:]
if len(remainingAlignedBuf) > 0 { if len(remainingAlignedBuf) > 0 {
var n int
n, err = w.Write(remainingAlignedBuf) n, err = w.Write(remainingAlignedBuf)
if err != nil { if err != nil {
return err return 0, err
} }
written += int64(n) remainingWritten += n
} }
if len(remainingUnalignedBuf) > 0 { if len(remainingUnalignedBuf) > 0 {
var n int
// Write on O_DIRECT fds fail if buffer is not 4K aligned, hence disable O_DIRECT. // Write on O_DIRECT fds fail if buffer is not 4K aligned, hence disable O_DIRECT.
if err = disk.DisableDirectIO(w); err != nil { if err = disk.DisableDirectIO(w); err != nil {
return err return 0, err
} }
n, err = w.Write(remainingUnalignedBuf) n, err = w.Write(remainingUnalignedBuf)
if err != nil {
return 0, err
}
remainingWritten += n
}
return remainingWritten, nil
}
var written int
for {
var n int
n, err = io.ReadFull(r, buf)
switch err {
case nil:
n, err = w.Write(buf)
if err != nil { if err != nil {
return err return err
} }
written += int64(n) written += n
case io.ErrUnexpectedEOF:
n, err = writeRemaining(w, buf[:n])
if err != nil {
return err
}
written += n
fallthrough
case io.EOF:
if fileSize != -1 {
if written < int(fileSize) {
return errLessData
}
if written > int(fileSize) {
return errMoreData
}
}
return nil
default:
return err
} }
} }
// Do some sanity checks.
_, err = io.ReadFull(r, buf)
if err != io.EOF {
return errMoreData
}
if written < fileSize {
return errLessData
}
if written > fileSize {
return errMoreData
}
return nil
} }
func (s *posix) WriteAll(volume, path string, buf []byte) (err error) { func (s *posix) WriteAll(volume, path string, buf []byte) (err error) {

View File

@ -350,6 +350,9 @@ func calculatePartSizeFromIdx(ctx context.Context, totalSize int64, partSize int
logger.LogIf(ctx, errPartSizeIndex) logger.LogIf(ctx, errPartSizeIndex)
return 0, errPartSizeIndex return 0, errPartSizeIndex
} }
if totalSize == -1 {
return -1, nil
}
if totalSize > 0 { if totalSize > 0 {
// Compute the total count of parts // Compute the total count of parts
partsCount := totalSize/partSize + 1 partsCount := totalSize/partSize + 1