mirror of
https://github.com/minio/minio.git
synced 2025-01-26 14:13:16 -05:00
fs/XL: Return IncompleteBody{} error for short writes (#2228)
This commit is contained in:
parent
27a5b61f40
commit
5cc9e4e214
@ -28,7 +28,7 @@ import (
|
|||||||
// erasureCreateFile - writes an entire stream by erasure coding to
|
// erasureCreateFile - writes an entire stream by erasure coding to
|
||||||
// all the disks, writes also calculate individual block's checksum
|
// all the disks, writes also calculate individual block's checksum
|
||||||
// for future bit-rot protection.
|
// for future bit-rot protection.
|
||||||
func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, writeQuorum int) (size int64, checkSums []string, err error) {
|
func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader, blockSize int64, dataBlocks int, parityBlocks int, writeQuorum int) (bytesWritten int64, checkSums []string, err error) {
|
||||||
// Allocated blockSized buffer for reading.
|
// Allocated blockSized buffer for reading.
|
||||||
buf := make([]byte, blockSize)
|
buf := make([]byte, blockSize)
|
||||||
|
|
||||||
@ -47,7 +47,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader
|
|||||||
// We have reached EOF on the first byte read, io.Reader
|
// We have reached EOF on the first byte read, io.Reader
|
||||||
// must be 0bytes, we don't need to erasure code
|
// must be 0bytes, we don't need to erasure code
|
||||||
// data. Will create a 0byte file instead.
|
// data. Will create a 0byte file instead.
|
||||||
if size == 0 {
|
if bytesWritten == 0 {
|
||||||
blocks = make([][]byte, len(disks))
|
blocks = make([][]byte, len(disks))
|
||||||
rErr = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum)
|
rErr = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum)
|
||||||
if rErr != nil {
|
if rErr != nil {
|
||||||
@ -72,7 +72,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader
|
|||||||
if err = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum); err != nil {
|
if err = appendFile(disks, volume, path, blocks, hashWriters, writeQuorum); err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
size += int64(n)
|
bytesWritten += int64(n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -80,7 +80,7 @@ func erasureCreateFile(disks []StorageAPI, volume, path string, reader io.Reader
|
|||||||
for i := range checkSums {
|
for i := range checkSums {
|
||||||
checkSums[i] = hex.EncodeToString(hashWriters[i].Sum(nil))
|
checkSums[i] = hex.EncodeToString(hashWriters[i].Sum(nil))
|
||||||
}
|
}
|
||||||
return size, checkSums, nil
|
return bytesWritten, checkSums, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// encodeData - encodes incoming data buffer into
|
// encodeData - encodes incoming data buffer into
|
||||||
|
41
fs-createfile.go
Normal file
41
fs-createfile.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
/*
|
||||||
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import "io"
|
||||||
|
|
||||||
|
func fsCreateFile(disk StorageAPI, reader io.Reader, buf []byte, tmpBucket, tempObj string) (int64, error) {
|
||||||
|
bytesWritten := int64(0)
|
||||||
|
// Read the buffer till io.EOF and append the read data to the temporary file.
|
||||||
|
for {
|
||||||
|
n, rErr := reader.Read(buf)
|
||||||
|
if rErr != nil && rErr != io.EOF {
|
||||||
|
return 0, rErr
|
||||||
|
}
|
||||||
|
bytesWritten += int64(n)
|
||||||
|
if n > 0 {
|
||||||
|
wErr := disk.AppendFile(tmpBucket, tempObj, buf[0:n])
|
||||||
|
if wErr != nil {
|
||||||
|
return 0, wErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if rErr == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bytesWritten, nil
|
||||||
|
}
|
@ -314,32 +314,22 @@ func (fs fsObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
|||||||
limitDataReader = data
|
limitDataReader = data
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allocate buffer for staging buffer.
|
teeReader := io.TeeReader(limitDataReader, md5Writer)
|
||||||
bufSize := int64(readSizeV1)
|
bufSize := int64(readSizeV1)
|
||||||
if size > 0 && bufSize > size {
|
if size > 0 && bufSize > size {
|
||||||
bufSize = size
|
bufSize = size
|
||||||
}
|
}
|
||||||
var buf = make([]byte, int(bufSize))
|
buf := make([]byte, int(bufSize))
|
||||||
|
bytesWritten, cErr := fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tmpPartPath)
|
||||||
// Read up to required size
|
if cErr != nil {
|
||||||
totalLeft := size
|
fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
|
||||||
for {
|
return "", toObjectErr(cErr, minioMetaBucket, tmpPartPath)
|
||||||
n, err := io.ReadFull(limitDataReader, buf)
|
}
|
||||||
if err == io.EOF {
|
// Should return IncompleteBody{} error when reader has fewer
|
||||||
break
|
// bytes than specified in request header.
|
||||||
}
|
if bytesWritten < size {
|
||||||
if err != nil && err != io.ErrUnexpectedEOF {
|
fs.storage.DeleteFile(minioMetaBucket, tmpPartPath)
|
||||||
return "", toObjectErr(err, bucket, object)
|
return "", IncompleteBody{}
|
||||||
}
|
|
||||||
// Update md5 writer.
|
|
||||||
md5Writer.Write(buf[:n])
|
|
||||||
if err = fs.storage.AppendFile(minioMetaBucket, tmpPartPath, buf[:n]); err != nil {
|
|
||||||
return "", toObjectErr(err, bucket, object)
|
|
||||||
}
|
|
||||||
|
|
||||||
if totalLeft -= int64(n); size >= 0 && totalLeft <= 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate if payload is valid.
|
// Validate if payload is valid.
|
||||||
|
30
fs-v1.go
30
fs-v1.go
@ -369,30 +369,24 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
|
|||||||
return "", toObjectErr(err, bucket, object)
|
return "", toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Allocate a buffer to Read() the object upload stream.
|
// Allocate a buffer to Read() from request body
|
||||||
bufSize := int64(readSizeV1)
|
bufSize := int64(readSizeV1)
|
||||||
if size > 0 && bufSize > size {
|
if size > 0 && bufSize > size {
|
||||||
bufSize = size
|
bufSize = size
|
||||||
}
|
}
|
||||||
buf := make([]byte, int(bufSize))
|
buf := make([]byte, int(bufSize))
|
||||||
|
teeReader := io.TeeReader(limitDataReader, md5Writer)
|
||||||
|
bytesWritten, err := fsCreateFile(fs.storage, teeReader, buf, minioMetaBucket, tempObj)
|
||||||
|
if err != nil {
|
||||||
|
fs.storage.DeleteFile(minioMetaBucket, tempObj)
|
||||||
|
return "", toObjectErr(err, bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
// Read the buffer till io.EOF and append the read data to the temporary file.
|
// Should return IncompleteBody{} error when reader has fewer
|
||||||
for {
|
// bytes than specified in request header.
|
||||||
n, rErr := limitDataReader.Read(buf)
|
if bytesWritten < size {
|
||||||
if rErr != nil && rErr != io.EOF {
|
fs.storage.DeleteFile(minioMetaBucket, tempObj)
|
||||||
return "", toObjectErr(rErr, bucket, object)
|
return "", IncompleteBody{}
|
||||||
}
|
|
||||||
if n > 0 {
|
|
||||||
// Update md5 writer.
|
|
||||||
md5Writer.Write(buf[0:n])
|
|
||||||
wErr := fs.storage.AppendFile(minioMetaBucket, tempObj, buf[0:n])
|
|
||||||
if wErr != nil {
|
|
||||||
return "", toObjectErr(wErr, bucket, object)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if rErr == io.EOF {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,7 +265,7 @@ func testObjectAPIPutObjectPart(obj ObjectLayer, instanceType string, t TestErrH
|
|||||||
// Test case - 13.
|
// Test case - 13.
|
||||||
// Input with size more than the size of actual data inside the reader.
|
// Input with size more than the size of actual data inside the reader.
|
||||||
{bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") + 1), false, "",
|
{bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") + 1), false, "",
|
||||||
fmt.Errorf("%s", "Bad digest: Expected a35 is not valid with what we calculated e2fc714c4727ee9395f324cd2e7f331f")},
|
IncompleteBody{}},
|
||||||
// Test case - 14.
|
// Test case - 14.
|
||||||
// Input with size less than the size of actual data inside the reader.
|
// Input with size less than the size of actual data inside the reader.
|
||||||
{bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") - 1), false, "",
|
{bucket, object, uploadID, 1, "abcd", "a35", int64(len("abcd") - 1), false, "",
|
||||||
|
@ -84,7 +84,7 @@ func testObjectAPIPutObject(obj ObjectLayer, instanceType string, t TestErrHandl
|
|||||||
// Test case - 8.
|
// Test case - 8.
|
||||||
// Input with size more than the size of actual data inside the reader.
|
// Input with size more than the size of actual data inside the reader.
|
||||||
{bucket, object, "abcd", map[string]string{"md5Sum": "a35"}, int64(len("abcd") + 1), false, "",
|
{bucket, object, "abcd", map[string]string{"md5Sum": "a35"}, int64(len("abcd") + 1), false, "",
|
||||||
fmt.Errorf("%s", "Bad digest: Expected a35 is not valid with what we calculated e2fc714c4727ee9395f324cd2e7f331f")},
|
IncompleteBody{}},
|
||||||
// Test case - 9.
|
// Test case - 9.
|
||||||
// Input with size less than the size of actual data inside the reader.
|
// Input with size less than the size of actual data inside the reader.
|
||||||
{bucket, object, "abcd", map[string]string{"md5Sum": "a35"}, int64(len("abcd") - 1), false, "",
|
{bucket, object, "abcd", map[string]string{"md5Sum": "a35"}, int64(len("abcd") - 1), false, "",
|
||||||
|
@ -385,6 +385,14 @@ func (xl xlObjects) PutObjectPart(bucket, object, uploadID string, partID int, s
|
|||||||
|
|
||||||
// Erasure code data and write across all disks.
|
// Erasure code data and write across all disks.
|
||||||
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, xl.writeQuorum)
|
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tmpPartPath, teeReader, xlMeta.Erasure.BlockSize, xl.dataBlocks, xl.parityBlocks, xl.writeQuorum)
|
||||||
|
if err != nil {
|
||||||
|
return "", toObjectErr(err, minioMetaBucket, tmpPartPath)
|
||||||
|
}
|
||||||
|
// Should return IncompleteBody{} error when reader has fewer bytes
|
||||||
|
// than specified in request header.
|
||||||
|
if sizeWritten < size {
|
||||||
|
return "", IncompleteBody{}
|
||||||
|
}
|
||||||
|
|
||||||
// For size == -1, perhaps client is sending in chunked encoding
|
// For size == -1, perhaps client is sending in chunked encoding
|
||||||
// set the size as size that was actually written.
|
// set the size as size that was actually written.
|
||||||
|
@ -117,7 +117,8 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
|
|||||||
// Object cache enabled block.
|
// Object cache enabled block.
|
||||||
if xlMeta.Stat.Size > 0 && xl.objCacheEnabled {
|
if xlMeta.Stat.Size > 0 && xl.objCacheEnabled {
|
||||||
// Validate if we have previous cache.
|
// Validate if we have previous cache.
|
||||||
cachedBuffer, err := xl.objCache.Open(path.Join(bucket, object))
|
var cachedBuffer io.ReadSeeker
|
||||||
|
cachedBuffer, err = xl.objCache.Open(path.Join(bucket, object))
|
||||||
if err == nil { // Cache hit.
|
if err == nil { // Cache hit.
|
||||||
// Advance the buffer to offset as if it was read.
|
// Advance the buffer to offset as if it was read.
|
||||||
if _, err = cachedBuffer.Seek(startOffset, 0); err != nil { // Seek to the offset.
|
if _, err = cachedBuffer.Seek(startOffset, 0); err != nil { // Seek to the offset.
|
||||||
@ -435,8 +436,17 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
|
|||||||
// Erasure code data and write across all disks.
|
// Erasure code data and write across all disks.
|
||||||
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xl.writeQuorum)
|
sizeWritten, checkSums, err := erasureCreateFile(onlineDisks, minioMetaBucket, tempErasureObj, teeReader, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, xlMeta.Erasure.ParityBlocks, xl.writeQuorum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// Create file failed, delete temporary object.
|
||||||
|
xl.deleteObject(minioMetaTmpBucket, tempObj)
|
||||||
return "", toObjectErr(err, minioMetaBucket, tempErasureObj)
|
return "", toObjectErr(err, minioMetaBucket, tempErasureObj)
|
||||||
}
|
}
|
||||||
|
// Should return IncompleteBody{} error when reader has fewer bytes
|
||||||
|
// than specified in request header.
|
||||||
|
if sizeWritten < size {
|
||||||
|
// Short write, delete temporary object.
|
||||||
|
xl.deleteObject(minioMetaTmpBucket, tempObj)
|
||||||
|
return "", IncompleteBody{}
|
||||||
|
}
|
||||||
|
|
||||||
// For size == -1, perhaps client is sending in chunked encoding
|
// For size == -1, perhaps client is sending in chunked encoding
|
||||||
// set the size as size that was actually written.
|
// set the size as size that was actually written.
|
||||||
@ -490,6 +500,8 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
|
|||||||
// Check if an object is present as one of the parent dir.
|
// Check if an object is present as one of the parent dir.
|
||||||
// -- FIXME. (needs a new kind of lock).
|
// -- FIXME. (needs a new kind of lock).
|
||||||
if xl.parentDirIsObject(bucket, path.Dir(object)) {
|
if xl.parentDirIsObject(bucket, path.Dir(object)) {
|
||||||
|
// Parent (in the namespace) is an object, delete temporary object.
|
||||||
|
xl.deleteObject(minioMetaTmpBucket, tempObj)
|
||||||
return "", toObjectErr(errFileAccessDenied, bucket, object)
|
return "", toObjectErr(errFileAccessDenied, bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ func TestRepeatPutObjectPart(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
fiveMBBytes := bytes.Repeat([]byte("a"), 5*1024*124)
|
fiveMBBytes := bytes.Repeat([]byte("a"), 5*1024*1024)
|
||||||
md5Writer := md5.New()
|
md5Writer := md5.New()
|
||||||
md5Writer.Write(fiveMBBytes)
|
md5Writer.Write(fiveMBBytes)
|
||||||
md5Hex := hex.EncodeToString(md5Writer.Sum(nil))
|
md5Hex := hex.EncodeToString(md5Writer.Sum(nil))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user