From d07d0c670a9cf882a366fcb05ca129d515373e61 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 29 Jun 2015 11:42:58 -0700 Subject: [PATCH 1/2] Return back proper errors in writeObjectData(), rename few functions --- pkg/storage/donut/bucket.go | 51 +++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/pkg/storage/donut/bucket.go b/pkg/storage/donut/bucket.go index db376c363..af5a1c7c8 100644 --- a/pkg/storage/donut/bucket.go +++ b/pkg/storage/donut/bucket.go @@ -212,7 +212,7 @@ func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64, return nil, 0, iodine.New(err, nil) } // read and reply back to GetObject() request in a go-routine - go b.readEncodedData(normalizeObjectName(objectName), writer, objMetadata) + go b.readObjectData(normalizeObjectName(objectName), writer, objMetadata) return reader, objMetadata.Size, nil } @@ -223,7 +223,7 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5 if objectName == "" || objectData == nil { return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil) } - writers, err := b.getDiskWriters(normalizeObjectName(objectName), "data") + writers, err := b.getWriters(normalizeObjectName(objectName), "data") if err != nil { return ObjectMetadata{}, iodine.New(err, nil) } @@ -247,8 +247,8 @@ func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5 if err != nil { return ObjectMetadata{}, iodine.New(err, nil) } - // encoded data with k, m and write - chunkCount, totalLength, err := b.writeEncodedData(k, m, writers, objectData, sumMD5, sum512) + // write encoded data with k, m and writers + chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, sumMD5, sum512) if err != nil { return ObjectMetadata{}, iodine.New(err, nil) } @@ -310,7 +310,7 @@ func (b bucket) writeObjectMetadata(objectName string, objMetadata ObjectMetadat if objMetadata.Object == "" { return 
iodine.New(InvalidArgument{}, nil) } - objMetadataWriters, err := b.getDiskWriters(objectName, objectMetadataConfig) + objMetadataWriters, err := b.getWriters(objectName, objectMetadataConfig) if err != nil { return iodine.New(err, nil) } @@ -332,7 +332,7 @@ func (b bucket) readObjectMetadata(objectName string) (ObjectMetadata, error) { if objectName == "" { return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil) } - objMetadataReaders, err := b.getDiskReaders(objectName, objectMetadataConfig) + objMetadataReaders, err := b.getReaders(objectName, objectMetadataConfig) if err != nil { return ObjectMetadata{}, iodine.New(err, nil) } @@ -378,8 +378,8 @@ func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error) return k, m, nil } -// writeEncodedData - -func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, sumMD5, sum512 hash.Hash) (int, int, error) { +// writeObjectData - +func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, sumMD5, sum512 hash.Hash) (int, int, error) { encoder, err := newEncoder(k, m, "Cauchy") if err != nil { return 0, 0, iodine.New(err, nil) @@ -387,16 +387,17 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat chunkCount := 0 totalLength := 0 for chunk := range split.Stream(objectData, 10*1024*1024) { - if chunk.Err == nil { - totalLength = totalLength + len(chunk.Data) - encodedBlocks, _ := encoder.Encode(chunk.Data) - sumMD5.Write(chunk.Data) - sum512.Write(chunk.Data) - for blockIndex, block := range encodedBlocks { - _, err := io.Copy(writers[blockIndex], bytes.NewBuffer(block)) - if err != nil { - return 0, 0, iodine.New(err, nil) - } + if chunk.Err != nil { + return 0, 0, iodine.New(chunk.Err, nil) + } + totalLength = totalLength + len(chunk.Data) + encodedBlocks, _ := encoder.Encode(chunk.Data) + sumMD5.Write(chunk.Data) + sum512.Write(chunk.Data) + for blockIndex, block := range encodedBlocks { + _, err := 
io.Copy(writers[blockIndex], bytes.NewBuffer(block)) + if err != nil { + return 0, 0, iodine.New(err, nil) } } chunkCount = chunkCount + 1 @@ -404,9 +405,9 @@ func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectDat return chunkCount, totalLength, nil } -// readEncodedData - -func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, objMetadata ObjectMetadata) { - readers, err := b.getDiskReaders(objectName, "data") +// readObjectData - +func (b bucket) readObjectData(objectName string, writer *io.PipeWriter, objMetadata ObjectMetadata) { + readers, err := b.getReaders(objectName, "data") if err != nil { writer.CloseWithError(iodine.New(err, nil)) return @@ -490,8 +491,8 @@ func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadC return decodedData, nil } -// getDiskReaders - -func (b bucket) getDiskReaders(objectName, objectMeta string) ([]io.ReadCloser, error) { +// getReaders - +func (b bucket) getReaders(objectName, objectMeta string) ([]io.ReadCloser, error) { var readers []io.ReadCloser nodeSlice := 0 for _, node := range b.nodes { @@ -514,8 +515,8 @@ func (b bucket) getDiskReaders(objectName, objectMeta string) ([]io.ReadCloser, return readers, nil } -// getDiskWriters - -func (b bucket) getDiskWriters(objectName, objectMeta string) ([]io.WriteCloser, error) { +// getWriters - +func (b bucket) getWriters(objectName, objectMeta string) ([]io.WriteCloser, error) { var writers []io.WriteCloser nodeSlice := 0 for _, node := range b.nodes { From 3109909355c99fdabbcd0f1133cbd499ec0ad309 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 29 Jun 2015 12:23:48 -0700 Subject: [PATCH 2/2] Handle couple of cases of OOM conditions, move caching to GetObject() rather than PutObject() --- pkg/storage/drivers/donut/donut.go | 40 +++++++++++--------------- pkg/storage/drivers/donut/multipart.go | 13 ++++++--- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/pkg/storage/drivers/donut/donut.go 
b/pkg/storage/drivers/donut/donut.go index a38c61867..00660daf7 100644 --- a/pkg/storage/drivers/donut/donut.go +++ b/pkg/storage/drivers/donut/donut.go @@ -325,10 +325,16 @@ func (d donutDriver) GetObject(w io.Writer, bucketName, objectName string) (int6 return 0, iodine.New(drivers.InternalError{}, nil) } } - n, err := io.CopyN(w, reader, size) + pw := newProxyWriter(w) + n, err := io.CopyN(pw, reader, size) if err != nil { return 0, iodine.New(err, nil) } + // Save in memory for future reads + d.objects.Set(objectKey, pw.writtenBytes) + // free up + pw.writtenBytes = nil + go debug.FreeOSMemory() return n, nil } written, err := io.Copy(w, bytes.NewBuffer(data)) @@ -492,31 +498,22 @@ func (d donutDriver) ListObjects(bucketName string, resources drivers.BucketReso return results, resources, nil } -type proxyReader struct { - reader io.Reader - readBytes []byte +type proxyWriter struct { + writer io.Writer + writtenBytes []byte } -func (r *proxyReader) free(p []byte) { - p = nil - go debug.FreeOSMemory() -} -func (r *proxyReader) Read(p []byte) (n int, err error) { - defer r.free(p) - n, err = r.reader.Read(p) - if err == io.EOF || err == io.ErrUnexpectedEOF { - r.readBytes = append(r.readBytes, p[0:n]...) - return - } +func (r *proxyWriter) Write(p []byte) (n int, err error) { + n, err = r.writer.Write(p) if err != nil { return } - r.readBytes = append(r.readBytes, p[0:n]...) + r.writtenBytes = append(r.writtenBytes, p[0:n]...) 
return } -func newProxyReader(r io.Reader) *proxyReader { - return &proxyReader{reader: r, readBytes: nil} +func newProxyWriter(w io.Writer) *proxyWriter { + return &proxyWriter{writer: w, writtenBytes: nil} } // CreateObject creates a new object @@ -565,8 +562,7 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM } expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) } - newReader := newProxyReader(reader) - objMetadata, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, newReader, metadata) + objMetadata, err := d.donut.PutObject(bucketName, objectName, expectedMD5Sum, reader, metadata) if err != nil { switch iodine.ToError(err).(type) { case donut.BadDigest: @@ -574,10 +570,6 @@ func (d donutDriver) CreateObject(bucketName, objectName, contentType, expectedM } return "", iodine.New(err, errParams) } - d.objects.Set(objectKey, newReader.readBytes) - // free up - newReader.readBytes = nil - go debug.FreeOSMemory() newObject := drivers.ObjectMetadata{ Bucket: bucketName, Key: objectName, diff --git a/pkg/storage/drivers/donut/multipart.go b/pkg/storage/drivers/donut/multipart.go index 1b75a9fbe..2fe388dc3 100644 --- a/pkg/storage/drivers/donut/multipart.go +++ b/pkg/storage/drivers/donut/multipart.go @@ -193,6 +193,7 @@ func (d donutDriver) createObjectPart(bucketName, objectName, uploadID string, p d.lock.Unlock() // setting up for de-allocation readBytes = nil + go debug.FreeOSMemory() md5Sum := hex.EncodeToString(md5SumBytes) // Verify if the written object is equal to what is expected, only if it is requested as such @@ -258,6 +259,7 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st d.lock.Lock() var size int64 + fullHasher := md5.New() var fullObject bytes.Buffer for i := 1; i <= len(parts); i++ { recvMD5 := parts[i] @@ -280,7 +282,8 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st Key: getMultipartKey(objectName, uploadID, i), }, nil) } - _, 
err = io.Copy(&fullObject, bytes.NewBuffer(object)) + mw := io.MultiWriter(&fullObject, fullHasher) + _, err = io.Copy(mw, bytes.NewReader(object)) if err != nil { return "", iodine.New(err, nil) } @@ -289,9 +292,9 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st } d.lock.Unlock() - md5sumSlice := md5.Sum(fullObject.Bytes()) + md5sumSlice := fullHasher.Sum(nil) // this is needed for final verification inside CreateObject, do not convert this to hex - md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:]) + md5sum := base64.StdEncoding.EncodeToString(md5sumSlice) etag, err := d.CreateObject(bucketName, objectName, "", md5sum, size, &fullObject) if err != nil { // No need to call internal cleanup functions here, caller will call AbortMultipartUpload() @@ -299,6 +302,8 @@ func (d donutDriver) CompleteMultipartUpload(bucketName, objectName, uploadID st return "", iodine.New(err, nil) } fullObject.Reset() + go debug.FreeOSMemory() + d.cleanupMultiparts(bucketName, objectName, uploadID) d.cleanupMultipartSession(bucketName, objectName, uploadID) return etag, nil @@ -421,5 +426,5 @@ func (d donutDriver) expiredPart(a ...interface{}) { for _, storedBucket := range d.storedBuckets { delete(storedBucket.partMetadata, key) } - debug.FreeOSMemory() + go debug.FreeOSMemory() }