mirror of
https://github.com/minio/minio.git
synced 2025-01-12 15:33:22 -05:00
FS/PutObject: Read() data should be handled even in case of EOF. (#1864)
Fixes #1710
This commit is contained in:
parent
51f3d4e0ca
commit
1b9db9ee6c
22
fs-v1.go
22
fs-v1.go
@ -215,7 +215,9 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
|
|||||||
|
|
||||||
uniqueID := getUUID()
|
uniqueID := getUUID()
|
||||||
|
|
||||||
// Temporary object.
|
// Uploaded object will first be written to the temporary location which will eventually
|
||||||
|
// be renamed to the actual location. It is first written to the temporary location
|
||||||
|
// so that cleaning it up will be easy if the server goes down.
|
||||||
tempObj := path.Join(tmpMetaPrefix, uniqueID)
|
tempObj := path.Join(tmpMetaPrefix, uniqueID)
|
||||||
|
|
||||||
// Initialize md5 writer.
|
// Initialize md5 writer.
|
||||||
@ -228,26 +230,30 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
|
|||||||
return "", toObjectErr(err, bucket, object)
|
return "", toObjectErr(err, bucket, object)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Allocate buffer.
|
// Allocate a buffer to Read() the object upload stream.
|
||||||
buf := make([]byte, blockSizeV1)
|
buf := make([]byte, blockSizeV1)
|
||||||
|
// Read the buffer till io.EOF and append the read data to
|
||||||
|
// the temporary file.
|
||||||
for {
|
for {
|
||||||
n, rErr := data.Read(buf)
|
n, rErr := data.Read(buf)
|
||||||
if rErr == io.EOF {
|
if rErr != nil && rErr != io.EOF {
|
||||||
break
|
|
||||||
}
|
|
||||||
if rErr != nil {
|
|
||||||
return "", toObjectErr(rErr, bucket, object)
|
return "", toObjectErr(rErr, bucket, object)
|
||||||
}
|
}
|
||||||
|
if n > 0 {
|
||||||
// Update md5 writer.
|
// Update md5 writer.
|
||||||
md5Writer.Write(buf[:n])
|
md5Writer.Write(buf[:n])
|
||||||
m, wErr := fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n])
|
m, wErr := fs.storage.AppendFile(minioMetaBucket, tempObj, buf[:n])
|
||||||
if wErr != nil {
|
if wErr != nil {
|
||||||
return "", toObjectErr(wErr, bucket, object)
|
return "", toObjectErr(wErr, bucket, object)
|
||||||
}
|
}
|
||||||
if m != int64(len(buf[:n])) {
|
if m != int64(n) {
|
||||||
return "", toObjectErr(errUnexpected, bucket, object)
|
return "", toObjectErr(errUnexpected, bucket, object)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if rErr == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
|
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
|
||||||
@ -262,6 +268,8 @@ func (fs fsObjects) PutObject(bucket string, object string, size int64, data io.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Entire object was written to the temp location, now it's safe to rename it
|
||||||
|
// to the actual location.
|
||||||
err := fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
|
err := fs.storage.RenameFile(minioMetaBucket, tempObj, bucket, object)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", toObjectErr(err, bucket, object)
|
return "", toObjectErr(err, bucket, object)
|
||||||
|
@ -20,12 +20,54 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"io"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"gopkg.in/check.v1"
|
"gopkg.in/check.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Return pointer to testOneByteReadEOF{}
|
||||||
|
func newTestReaderEOF(data []byte) io.Reader {
|
||||||
|
return &testOneByteReadEOF{false, data}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OneByteReadEOF - implements io.Reader which returns 1 byte along with io.EOF error.
|
||||||
|
type testOneByteReadEOF struct {
|
||||||
|
eof bool
|
||||||
|
data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testOneByteReadEOF) Read(p []byte) (n int, err error) {
|
||||||
|
if r.eof {
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
|
n = copy(p, r.data)
|
||||||
|
r.eof = true
|
||||||
|
return n, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return pointer to testOneByteReadNoEOF{}
|
||||||
|
func newTestReaderNoEOF(data []byte) io.Reader {
|
||||||
|
return &testOneByteReadNoEOF{false, data}
|
||||||
|
}
|
||||||
|
|
||||||
|
// testOneByteReadNoEOF - implements io.Reader which returns 1 byte and nil error, but
|
||||||
|
// returns io.EOF on the next Read().
|
||||||
|
type testOneByteReadNoEOF struct {
|
||||||
|
eof bool
|
||||||
|
data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *testOneByteReadNoEOF) Read(p []byte) (n int, err error) {
|
||||||
|
if r.eof {
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
|
n = copy(p, r.data)
|
||||||
|
r.eof = true
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
// APITestSuite - collection of API tests.
|
// APITestSuite - collection of API tests.
|
||||||
func APITestSuite(c *check.C, create func() ObjectLayer) {
|
func APITestSuite(c *check.C, create func() ObjectLayer) {
|
||||||
testMakeBucket(c, create)
|
testMakeBucket(c, create)
|
||||||
@ -34,6 +76,7 @@ func APITestSuite(c *check.C, create func() ObjectLayer) {
|
|||||||
testObjectOverwriteWorks(c, create)
|
testObjectOverwriteWorks(c, create)
|
||||||
testNonExistantBucketOperations(c, create)
|
testNonExistantBucketOperations(c, create)
|
||||||
testBucketRecreateFails(c, create)
|
testBucketRecreateFails(c, create)
|
||||||
|
testPutObject(c, create)
|
||||||
testPutObjectInSubdir(c, create)
|
testPutObjectInSubdir(c, create)
|
||||||
testListBuckets(c, create)
|
testListBuckets(c, create)
|
||||||
testListBucketsOrder(c, create)
|
testListBucketsOrder(c, create)
|
||||||
@ -291,6 +334,31 @@ func testBucketRecreateFails(c *check.C, create func() ObjectLayer) {
|
|||||||
c.Assert(err.Error(), check.Equals, "Bucket exists: string")
|
c.Assert(err.Error(), check.Equals, "Bucket exists: string")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests validate PutObject without prefix.
|
||||||
|
func testPutObject(c *check.C, create func() ObjectLayer) {
|
||||||
|
obj := create()
|
||||||
|
content := []byte("testcontent")
|
||||||
|
length := int64(len(content))
|
||||||
|
readerEOF := newTestReaderEOF(content)
|
||||||
|
readerNoEOF := newTestReaderNoEOF(content)
|
||||||
|
err := obj.MakeBucket("bucket")
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
var bytesBuffer1 bytes.Buffer
|
||||||
|
_, err = obj.PutObject("bucket", "object", length, readerEOF, nil)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
err = obj.GetObject("bucket", "object", 0, length, &bytesBuffer1)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
c.Assert(len(bytesBuffer1.Bytes()), check.Equals, len(content))
|
||||||
|
|
||||||
|
var bytesBuffer2 bytes.Buffer
|
||||||
|
_, err = obj.PutObject("bucket", "object", length, readerNoEOF, nil)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
err = obj.GetObject("bucket", "object", 0, length, &bytesBuffer2)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
c.Assert(len(bytesBuffer2.Bytes()), check.Equals, len(content))
|
||||||
|
}
|
||||||
|
|
||||||
// Tests validate PutObject with subdirectory prefix.
|
// Tests validate PutObject with subdirectory prefix.
|
||||||
func testPutObjectInSubdir(c *check.C, create func() ObjectLayer) {
|
func testPutObjectInSubdir(c *check.C, create func() ObjectLayer) {
|
||||||
obj := create()
|
obj := create()
|
||||||
|
Loading…
Reference in New Issue
Block a user