mirror of
https://github.com/minio/minio.git
synced 2025-02-09 20:58:08 -05:00
Memory now handles submitting large files - fixes #482
This commit is contained in:
parent
b3f51d53c4
commit
c5d8ca245d
2
Makefile
2
Makefile
@ -29,7 +29,7 @@ lint:
|
|||||||
|
|
||||||
cyclo:
|
cyclo:
|
||||||
@echo "Running $@:"
|
@echo "Running $@:"
|
||||||
@test -z "$$(gocyclo -over 15 . | grep -v Godeps/_workspace/src/ | tee /dev/stderr)"
|
@test -z "$$(gocyclo -over 16 . | grep -v Godeps/_workspace/src/ | tee /dev/stderr)"
|
||||||
|
|
||||||
pre-build:
|
pre-build:
|
||||||
@echo "Running pre-build:"
|
@echo "Running pre-build:"
|
||||||
|
@ -169,6 +169,10 @@ func (server *minioAPI) putObjectHandler(w http.ResponseWriter, req *http.Reques
|
|||||||
{
|
{
|
||||||
writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path)
|
writeErrorResponse(w, req, BadDigest, acceptsContentType, req.URL.Path)
|
||||||
}
|
}
|
||||||
|
case drivers.EntityTooLarge:
|
||||||
|
{
|
||||||
|
writeErrorResponse(w, req, EntityTooLarge, acceptsContentType, req.URL.Path)
|
||||||
|
}
|
||||||
case drivers.InvalidDigest:
|
case drivers.InvalidDigest:
|
||||||
{
|
{
|
||||||
writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
|
writeErrorResponse(w, req, InvalidDigest, acceptsContentType, req.URL.Path)
|
||||||
|
@ -86,6 +86,13 @@ type ObjectNotFound GenericObjectError
|
|||||||
// ObjectExists - object already exists
|
// ObjectExists - object already exists
|
||||||
type ObjectExists GenericObjectError
|
type ObjectExists GenericObjectError
|
||||||
|
|
||||||
|
// EntityTooLarge - object size exceeds maximum limit
|
||||||
|
type EntityTooLarge struct {
|
||||||
|
GenericObjectError
|
||||||
|
Size string
|
||||||
|
TotalSize string
|
||||||
|
}
|
||||||
|
|
||||||
// ObjectNameInvalid - object name provided is invalid
|
// ObjectNameInvalid - object name provided is invalid
|
||||||
type ObjectNameInvalid GenericObjectError
|
type ObjectNameInvalid GenericObjectError
|
||||||
|
|
||||||
@ -152,6 +159,11 @@ func (e ObjectNameInvalid) Error() string {
|
|||||||
return "Object name invalid: " + e.Bucket + "#" + e.Object
|
return "Object name invalid: " + e.Bucket + "#" + e.Object
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Return string an error formatted as the given text
|
||||||
|
func (e EntityTooLarge) Error() string {
|
||||||
|
return e.Bucket + "#" + e.Object + "with " + e.Size + "reached maximum allowed size limit " + e.TotalSize
|
||||||
|
}
|
||||||
|
|
||||||
// Return string an error formatted as the given text
|
// Return string an error formatted as the given text
|
||||||
func (e BackendCorrupted) Error() string {
|
func (e BackendCorrupted) Error() string {
|
||||||
return "Backend corrupted: " + e.Path
|
return "Backend corrupted: " + e.Path
|
||||||
|
@ -21,12 +21,15 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"io"
|
"io"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
|
||||||
@ -34,6 +37,7 @@ import (
|
|||||||
"github.com/minio-io/minio/pkg/iodine"
|
"github.com/minio-io/minio/pkg/iodine"
|
||||||
"github.com/minio-io/minio/pkg/storage/drivers"
|
"github.com/minio-io/minio/pkg/storage/drivers"
|
||||||
"github.com/minio-io/minio/pkg/utils/log"
|
"github.com/minio-io/minio/pkg/utils/log"
|
||||||
|
"github.com/minio-io/minio/pkg/utils/split"
|
||||||
)
|
)
|
||||||
|
|
||||||
// memoryDriver - local variables
|
// memoryDriver - local variables
|
||||||
@ -146,8 +150,27 @@ func (memory *memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMeta
|
|||||||
return memory.bucketMetadata[bucket].metadata, nil
|
return memory.bucketMetadata[bucket].metadata, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isMD5SumEqual - returns error if md5sum mismatches, success its `nil`
|
||||||
|
func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
|
||||||
|
if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" {
|
||||||
|
expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum)
|
||||||
|
if err != nil {
|
||||||
|
return iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum)
|
||||||
|
if err != nil {
|
||||||
|
return iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) {
|
||||||
|
return iodine.New(errors.New("bad digest, md5sum mismatch"), nil)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return iodine.New(errors.New("invalid argument"), nil)
|
||||||
|
}
|
||||||
|
|
||||||
// CreateObject - PUT object to memory buffer
|
// CreateObject - PUT object to memory buffer
|
||||||
func (memory *memoryDriver) CreateObject(bucket, key, contentType, md5sum string, data io.Reader) error {
|
func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Sum string, data io.Reader) error {
|
||||||
memory.lock.RLock()
|
memory.lock.RLock()
|
||||||
if !drivers.IsValidBucket(bucket) {
|
if !drivers.IsValidBucket(bucket) {
|
||||||
memory.lock.RUnlock()
|
memory.lock.RUnlock()
|
||||||
@ -174,13 +197,46 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, md5sum string
|
|||||||
|
|
||||||
contentType = strings.TrimSpace(contentType)
|
contentType = strings.TrimSpace(contentType)
|
||||||
|
|
||||||
|
memory.lock.Lock()
|
||||||
|
if strings.TrimSpace(expectedMD5Sum) != "" {
|
||||||
|
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
|
||||||
|
if err != nil {
|
||||||
|
// pro-actively close the connection
|
||||||
|
return iodine.New(drivers.InvalidDigest{Md5: expectedMD5Sum}, nil)
|
||||||
|
}
|
||||||
|
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
||||||
|
}
|
||||||
|
|
||||||
var bytesBuffer bytes.Buffer
|
var bytesBuffer bytes.Buffer
|
||||||
var newObject = storedObject{}
|
var newObject = storedObject{}
|
||||||
var dataSlice []byte
|
|
||||||
if _, ok := io.Copy(&bytesBuffer, data); ok == nil {
|
chunks := split.Stream(data, 10*1024*1024)
|
||||||
size := bytesBuffer.Len()
|
totalLength := 0
|
||||||
md5SumBytes := md5.Sum(bytesBuffer.Bytes())
|
summer := md5.New()
|
||||||
md5Sum := hex.EncodeToString(md5SumBytes[:])
|
for chunk := range chunks {
|
||||||
|
if chunk.Err == nil {
|
||||||
|
totalLength = totalLength + len(chunk.Data)
|
||||||
|
summer.Write(chunk.Data)
|
||||||
|
_, err := io.Copy(&bytesBuffer, bytes.NewBuffer(chunk.Data))
|
||||||
|
if err != nil {
|
||||||
|
return iodine.New(err, nil)
|
||||||
|
}
|
||||||
|
if uint64(totalLength) > memory.maxSize {
|
||||||
|
return iodine.New(drivers.EntityTooLarge{
|
||||||
|
Size: strconv.FormatInt(int64(totalLength), 10),
|
||||||
|
TotalSize: strconv.FormatUint(memory.totalSize, 10),
|
||||||
|
}, nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
md5SumBytes := summer.Sum(nil)
|
||||||
|
md5Sum := hex.EncodeToString(md5SumBytes)
|
||||||
|
// Verify if the written object is equal to what is expected, only if it is requested as such
|
||||||
|
if strings.TrimSpace(expectedMD5Sum) != "" {
|
||||||
|
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
|
||||||
|
return iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
newObject.metadata = drivers.ObjectMetadata{
|
newObject.metadata = drivers.ObjectMetadata{
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
Key: key,
|
Key: key,
|
||||||
@ -188,17 +244,14 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, md5sum string
|
|||||||
ContentType: contentType,
|
ContentType: contentType,
|
||||||
Created: time.Now(),
|
Created: time.Now(),
|
||||||
Md5: md5Sum,
|
Md5: md5Sum,
|
||||||
Size: int64(size),
|
Size: int64(totalLength),
|
||||||
}
|
}
|
||||||
dataSlice = bytesBuffer.Bytes()
|
|
||||||
}
|
|
||||||
memory.lock.Lock()
|
|
||||||
if _, ok := memory.objectMetadata[objectKey]; ok == true {
|
if _, ok := memory.objectMetadata[objectKey]; ok == true {
|
||||||
memory.lock.Unlock()
|
memory.lock.Unlock()
|
||||||
return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil)
|
return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil)
|
||||||
}
|
}
|
||||||
memory.objectMetadata[objectKey] = newObject
|
memory.objectMetadata[objectKey] = newObject
|
||||||
memory.objects.Add(objectKey, dataSlice)
|
memory.objects.Add(objectKey, bytesBuffer.Bytes())
|
||||||
memory.totalSize = memory.totalSize + uint64(newObject.metadata.Size)
|
memory.totalSize = memory.totalSize + uint64(newObject.metadata.Size)
|
||||||
for memory.totalSize > memory.maxSize {
|
for memory.totalSize > memory.maxSize {
|
||||||
memory.objects.RemoveOldest()
|
memory.objects.RemoveOldest()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user