mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
Handle couple of cases of OOM conditions, move caching to GetObject() rather than PutObject()
This commit is contained in:
parent
d07d0c670a
commit
3109909355
@ -325,10 +325,16 @@ func (d donutDriver) GetObject(w io.Writer, bucketName, objectName string) (int6
|
||||
return 0, iodine.New(drivers.InternalError{}, nil)
|
||||
}
|
||||
}
|
||||
n, err := io.CopyN(w, reader, size)
|
||||
pw := newProxyWriter(w)
|
||||
n, err := io.CopyN(pw, reader, size)
|
||||
if err != nil {
|
||||
return 0, iodine.New(err, nil)
|
||||
}
|
||||
// Save in memory for future reads
|
||||
d.objects.Set(objectKey, pw.writtenBytes)
|
||||
// free up
|
||||
pw.writtenBytes = nil
|
||||
go debug.FreeOSMemory()
|
||||
return n, nil
|
||||
}
|
||||
written, err := io.Copy(w, bytes.NewBuffer(data))
|
||||
@ -492,31 +498,22 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso
|
||||
return results, resources, nil
|
||||
}
|
||||
|
||||
type proxyReader struct {
|
||||
reader io.Reader
|
||||
readBytes []byte
|
||||
type proxyWriter struct {
|
||||
writer io.Writer
|
||||
writtenBytes []byte
|
||||
}
|
||||
|
||||
func (r *proxyReader) free(p []byte) {
|
||||
p = nil
|
||||
go debug.FreeOSMemory()
|
||||
}
|
||||
func (r *proxyReader) Read(p []byte) (n int, err error) {
|
||||
defer r.free(p)
|
||||
n, err = r.reader.Read(p)
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
r.readBytes = append(r.readBytes, p[0:n]...)
|
||||
return
|
||||
}
|
||||
func (r *proxyWriter) Write(p []byte) (n int, err error) {
|
||||
n, err = r.writer.Write(p)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
r.readBytes = append(r.readBytes, p[0:n]...)
|
||||
r.writtenBytes = append(r.writtenBytes, p[0:n]...)
|
||||
return
|
||||
}
|
||||
|
||||
func newProxyReader(r io.Reader) *proxyReader {
|
||||
return &proxyReader{reader: r, readBytes: nil}
|
||||
func newProxyWriter(w io.Writer) *proxyWriter {
|
||||
return &proxyWriter{writer: w, writtenBytes: nil}
|
||||
}
|
||||
|
||||
// CreateObject creates a new object
|
||||
@ -565,8 +562,7 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
|
||||
}
|
||||
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
|
||||
}
|
||||
newReader := newProxyReader(reader)
|
||||
objMetadata, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, newReader, metadata)
|
||||
objMetadata, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, reader, metadata)
|
||||
if err != nil {
|
||||
switch iodine.ToError(err).(type) {
|
||||
case donut.BadDigest:
|
||||
@ -574,10 +570,6 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM
|
||||
}
|
||||
return "", iodine.New(err, errParams)
|
||||
}
|
||||
d.objects.Set(objectKey, newReader.readBytes)
|
||||
// free up
|
||||
newReader.readBytes = nil
|
||||
go debug.FreeOSMemory()
|
||||
newObject := drivers.ObjectMetadata{
|
||||
Bucket: bucketName,
|
||||
Key: objectName,
|
||||
|
@ -193,6 +193,7 @@ func (d donutDriver) createObjectPart(bucketName, objectName, uploadID string, p
|
||||
d.lock.Unlock()
|
||||
// setting up for de-allocation
|
||||
readBytes = nil
|
||||
go debug.FreeOSMemory()
|
||||
|
||||
md5Sum := hex.EncodeToString(md5SumBytes)
|
||||
// Verify if the written object is equal to what is expected, only if it is requested as such
|
||||
@ -258,6 +259,7 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st
|
||||
|
||||
d.lock.Lock()
|
||||
var size int64
|
||||
fullHasher := md5.New()
|
||||
var fullObject bytes.Buffer
|
||||
for i := 1; i <= len(parts); i++ {
|
||||
recvMD5 := parts[i]
|
||||
@ -280,7 +282,8 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st
|
||||
Key: getMultipartKey(objectName, uploadID, i),
|
||||
}, nil)
|
||||
}
|
||||
_, err = io.Copy(&fullObject, bytes.NewBuffer(object))
|
||||
mw := io.MultiWriter(&fullObject, fullHasher)
|
||||
_, err = io.Copy(mw, bytes.NewReader(object))
|
||||
if err != nil {
|
||||
return "", iodine.New(err, nil)
|
||||
}
|
||||
@ -289,9 +292,9 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st
|
||||
}
|
||||
d.lock.Unlock()
|
||||
|
||||
md5sumSlice := md5.Sum(fullObject.Bytes())
|
||||
md5sumSlice := fullHasher.Sum(nil)
|
||||
// this is needed for final verification inside CreateObject, do not convert this to hex
|
||||
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
|
||||
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice)
|
||||
etag, err := d.CreateObject(bucketName, objectName, "", md5sum, size, &fullObject)
|
||||
if err != nil {
|
||||
// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
|
||||
@ -299,6 +302,8 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st
|
||||
return "", iodine.New(err, nil)
|
||||
}
|
||||
fullObject.Reset()
|
||||
go debug.FreeOSMemory()
|
||||
|
||||
d.cleanupMultiparts(bucketName, objectName, uploadID)
|
||||
d.cleanupMultipartSession(bucketName, objectName, uploadID)
|
||||
return etag, nil
|
||||
@ -421,5 +426,5 @@ func (d donutDriver) expiredPart(a ...interface{}) {
|
||||
for _, storedBucket := range d.storedBuckets {
|
||||
delete(storedBucket.partMetadata, key)
|
||||
}
|
||||
debug.FreeOSMemory()
|
||||
go debug.FreeOSMemory()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user