refactor ObjectLayer PutObject and PutObjectPart (#4925)
This change refactors the ObjectLayer PutObject and PutObjectPart functions. Instead of passing an io.Reader and a size to PUT operations, the ObjectLayer now expects a *HashReader. A HashReader verifies the MD5 sum (and the SHA256 sum, if required) of the object. This change updates all PutObject(Part) calls and removes unnecessary code from all ObjectLayer implementations. Fixes #4923
Committed by: Dee Koder
Parent: f8024cadbb
Commit: 79ba4d3f33
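The diff below (against the xl ObjectLayer) shows only the new call sites; the HashReader type itself is introduced elsewhere in the patch. For orientation, here is a minimal, self-contained sketch of what such a wrapper can look like. It is inferred purely from the call sites visible in this diff, namely NewHashReader(reader, size, md5Sum, sha256Sum), Size(), MD5(), and Verify(); the field names, the plain errors, and the use of crypto/sha256 instead of minio's sha256-simd are illustrative assumptions, not minio's actual implementation.

package main

import (
	"crypto/md5"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"strings"
)

// HashReader wraps an io.Reader and computes an MD5 digest (and a
// SHA256 digest when an expected sum is supplied) while the data is
// being read, so callers can verify content after streaming it.
type HashReader struct {
	src        io.Reader
	size       int64  // expected size; -1 when unknown (chunked encoding)
	md5Sum     string // expected MD5, hex-encoded; may be empty
	sha256Sum  string // expected SHA256, hex-encoded; may be empty
	md5Hash    hash.Hash
	sha256Hash hash.Hash
}

// NewHashReader hashes everything read from src. A non-negative size
// also caps reads, guarding against clients sending excess data.
func NewHashReader(src io.Reader, size int64, md5Sum, sha256Sum string) *HashReader {
	if size >= 0 {
		src = io.LimitReader(src, size)
	}
	r := &HashReader{src: src, size: size, md5Sum: md5Sum, sha256Sum: sha256Sum, md5Hash: md5.New()}
	if sha256Sum != "" {
		r.sha256Hash = sha256.New()
	}
	return r
}

// Read feeds every byte through the hash(es) on its way to the caller.
func (r *HashReader) Read(p []byte) (n int, err error) {
	n, err = r.src.Read(p)
	if n > 0 {
		r.md5Hash.Write(p[:n])
		if r.sha256Hash != nil {
			r.sha256Hash.Write(p[:n])
		}
	}
	return n, err
}

// Size returns the expected object size, or -1 if unknown.
func (r *HashReader) Size() int64 { return r.size }

// MD5 returns the MD5 digest of all data read so far.
func (r *HashReader) MD5() []byte { return r.md5Hash.Sum(nil) }

// Verify compares the computed digests against the expected sums.
func (r *HashReader) Verify() error {
	if got := hex.EncodeToString(r.md5Hash.Sum(nil)); r.md5Sum != "" && got != r.md5Sum {
		return fmt.Errorf("bad digest: want md5 %s, got %s", r.md5Sum, got)
	}
	if r.sha256Hash != nil {
		if got := hex.EncodeToString(r.sha256Hash.Sum(nil)); got != r.sha256Sum {
			return fmt.Errorf("sha256 mismatch: want %s, got %s", r.sha256Sum, got)
		}
	}
	return nil
}

func main() {
	// Usage mirrors the new PutObject contract: stream first, verify after.
	body := "hello, world"
	sum := md5.Sum([]byte(body))
	r := NewHashReader(strings.NewReader(body), int64(len(body)), hex.EncodeToString(sum[:]), "")
	n, _ := io.Copy(io.Discard, r) // stand-in for erasure coding the stream
	fmt.Println(n, r.Verify())     // 12 <nil>
}

The point the refactor relies on is visible here: hashing happens as a side effect of Read, so an ObjectLayer can stream the body straight to its disks and call Verify once at the end, instead of wiring up its own md5/sha256 io.MultiWriter as the removed code below does.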
@@ -17,9 +17,7 @@
 package cmd
 
 import (
-	"crypto/md5"
 	"encoding/hex"
-	"hash"
 	"io"
 	"path"
 	"strconv"
@@ -29,7 +27,6 @@ import (
 	"github.com/minio/minio/pkg/bpool"
 	"github.com/minio/minio/pkg/mimedb"
 	"github.com/minio/minio/pkg/objcache"
-	"github.com/minio/sha256-simd"
 )
 
 // list all errors which can be ignored in object operations.
@@ -117,7 +114,7 @@ func (xl xlObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
 		pipeWriter.Close() // Close writer explicitly signalling we wrote all data.
 	}()
 
-	objInfo, err := xl.PutObject(dstBucket, dstObject, length, pipeReader, metadata, "")
+	objInfo, err := xl.PutObject(dstBucket, dstObject, NewHashReader(pipeReader, length, metadata["etag"], ""), metadata)
 	if err != nil {
 		return oi, toObjectErr(err, dstBucket, dstObject)
 	}
@@ -432,18 +429,18 @@ func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject
 // until EOF, erasure codes the data across all disk and additionally
 // writes `xl.json` which carries the necessary metadata for future
 // object operations.
-func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (objInfo ObjectInfo, err error) {
+func (xl xlObjects) PutObject(bucket string, object string, data *HashReader, metadata map[string]string) (objInfo ObjectInfo, err error) {
 	// This is a special case with size as '0' and object ends with
 	// a slash separator, we treat it like a valid operation and
 	// return success.
-	if isObjectDir(object, size) {
+	if isObjectDir(object, data.Size()) {
 		// Check if an object is present as one of the parent dir.
 		// -- FIXME. (needs a new kind of lock).
 		// -- FIXME (this also causes performance issue when disks are down).
 		if xl.parentDirIsObject(bucket, path.Dir(object)) {
 			return ObjectInfo{}, toObjectErr(traceError(errFileAccessDenied), bucket, object)
 		}
-		return dirObjectInfo(bucket, object, size, metadata), nil
+		return dirObjectInfo(bucket, object, data.Size(), metadata), nil
 	}
 
 	// Validate put object input args.
@@ -466,54 +463,27 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 	uniqueID := mustGetUUID()
 	tempObj := uniqueID
 
-	// Initialize md5 writer.
-	md5Writer := md5.New()
-
-	writers := []io.Writer{md5Writer}
-
-	var sha256Writer hash.Hash
-	if sha256sum != "" {
-		sha256Writer = sha256.New()
-		writers = append(writers, sha256Writer)
-	}
+	// Limit the reader to its provided size if specified.
+	var reader io.Reader = data
 
 	// Proceed to set the cache.
 	var newBuffer io.WriteCloser
 
 	// If caching is enabled, proceed to set the cache.
-	if size > 0 && xl.objCacheEnabled {
+	if data.Size() > 0 && xl.objCacheEnabled {
 		// PutObject invalidates any previously cached object in memory.
 		xl.objCache.Delete(path.Join(bucket, object))
 
 		// Create a new entry in memory of size.
-		newBuffer, err = xl.objCache.Create(path.Join(bucket, object), size)
-		if err == nil {
-			// Create a multi writer to write to both memory and client response.
-			writers = append(writers, newBuffer)
-		}
+		newBuffer, err = xl.objCache.Create(path.Join(bucket, object), data.Size())
+		// Ignore error if cache is full, proceed to write the object.
+		if err != nil && err != objcache.ErrCacheFull {
+			// For any other error return here.
+			return ObjectInfo{}, toObjectErr(traceError(err), bucket, object)
+		}
+		reader = io.TeeReader(data, newBuffer)
 	}
 
-	mw := io.MultiWriter(writers...)
-
-	// Limit the reader to its provided size if specified.
-	var limitDataReader io.Reader
-	if size > 0 {
-		// This is done so that we can avoid erroneous clients sending
-		// more data than the set content size.
-		limitDataReader = io.LimitReader(data, size)
-	} else {
-		// else we read till EOF.
-		limitDataReader = data
-	}
-
-	// Tee reader combines incoming data stream and md5, data read from input stream is written to md5.
-	teeReader := io.TeeReader(limitDataReader, mw)
-
 	// Initialize parts metadata
 	partsMetadata := make([]xlMetaV1, len(xl.storageDisks))
 
@@ -550,7 +520,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 		// Calculate the size of the current part, if size is unknown, curPartSize wil be unknown too.
 		// allowEmptyPart will always be true if this is the first part and false otherwise.
 		var curPartSize int64
-		curPartSize, err = getPartSizeFromIdx(size, globalPutPartSize, partIdx)
+		curPartSize, err = getPartSizeFromIdx(data.Size(), globalPutPartSize, partIdx)
 		if err != nil {
 			return ObjectInfo{}, toObjectErr(err, bucket, object)
 		}
@@ -564,7 +534,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 			}
 		}
 
-		file, erasureErr := storage.CreateFile(io.LimitReader(teeReader, globalPutPartSize), minioMetaTmpBucket, tempErasureObj, buffer, DefaultBitrotAlgorithm, xl.writeQuorum)
+		file, erasureErr := storage.CreateFile(io.LimitReader(reader, globalPutPartSize), minioMetaTmpBucket, tempErasureObj, buffer, DefaultBitrotAlgorithm, xl.writeQuorum)
 		if erasureErr != nil {
 			return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj)
 		}
@@ -596,7 +566,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 
 		// Check part size for the next index.
 		var partSize int64
-		partSize, err = getPartSizeFromIdx(size, globalPutPartSize, partIdx+1)
+		partSize, err = getPartSizeFromIdx(data.Size(), globalPutPartSize, partIdx+1)
 		if err != nil {
 			return ObjectInfo{}, toObjectErr(err, bucket, object)
 		}
@@ -605,25 +575,17 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 		}
 	}
 
-	// For size == -1, perhaps client is sending in chunked encoding
-	// set the size as size that was actually written.
-	if size == -1 {
-		size = sizeWritten
-	} else {
-		// Check if stored data satisfies what is asked
-		if sizeWritten < size {
-			return ObjectInfo{}, traceError(IncompleteBody{})
-		}
+	if size := data.Size(); size > 0 && sizeWritten < data.Size() {
+		return ObjectInfo{}, traceError(IncompleteBody{})
 	}
 
 	// Save additional erasureMetadata.
 	modTime := UTCNow()
 
-	newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
-	// Update the md5sum if not set with the newly calculated one.
-	if len(metadata["etag"]) == 0 {
-		metadata["etag"] = newMD5Hex
+	if err = data.Verify(); err != nil {
+		return ObjectInfo{}, toObjectErr(err, bucket, object)
 	}
+	metadata["etag"] = hex.EncodeToString(data.MD5())
 
 	// Guess content-type from the extension if possible.
 	if metadata["content-type"] == "" {
@@ -634,22 +596,6 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 		}
 	}
 
-	// md5Hex representation.
-	md5Hex := metadata["etag"]
-	if md5Hex != "" {
-		if newMD5Hex != md5Hex {
-			// Returns md5 mismatch.
-			return ObjectInfo{}, traceError(BadDigest{md5Hex, newMD5Hex})
-		}
-	}
-
-	if sha256sum != "" {
-		newSHA256sum := hex.EncodeToString(sha256Writer.Sum(nil))
-		if newSHA256sum != sha256sum {
-			return ObjectInfo{}, traceError(SHA256Mismatch{})
-		}
-	}
-
 	if xl.isObject(bucket, object) {
 		// Rename if an object already exists to temporary location.
 		newUniqueID := mustGetUUID()
@@ -670,7 +616,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 	// Update `xl.json` content on each disks.
 	for index := range partsMetadata {
 		partsMetadata[index].Meta = metadata
-		partsMetadata[index].Stat.Size = size
+		partsMetadata[index].Stat.Size = sizeWritten
 		partsMetadata[index].Stat.ModTime = modTime
 	}
 
@@ -686,7 +632,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
 
 	// Once we have successfully renamed the object, Close the buffer which would
 	// save the object on cache.
-	if size > 0 && xl.objCacheEnabled && newBuffer != nil {
+	if sizeWritten > 0 && xl.objCacheEnabled && newBuffer != nil {
 		newBuffer.Close()
 	}
 
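For a compact view of the contract these call sites converge on, here is an illustrative stand-in (not minio code): putObject plays the role of any ObjectLayer implementation, and HashReader refers to the sketch near the top of this page.

// putObject is a hypothetical stand-in for an ObjectLayer method under
// the new contract: size, MD5, and SHA256 all travel inside *HashReader.
func putObject(bucket, object string, data *HashReader, metadata map[string]string) error {
	// Stream the body; in xlObjects.PutObject this is where the data
	// is erasure coded via storage.CreateFile.
	if _, err := io.Copy(io.Discard, data); err != nil {
		return err
	}
	// Verify the client-supplied digests only after the full stream
	// has been consumed.
	if err := data.Verify(); err != nil {
		return err
	}
	// The computed MD5 becomes the object's ETag, as in the hunk above.
	metadata["etag"] = hex.EncodeToString(data.MD5())
	return nil
}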