mirror of https://github.com/minio/minio.git
Replace snappy.Writer/io.Pipe with snappyCompressReader. (#7316)
Prevents deferred close functions from being called while the reader is still being copied into snappyWriter, and reduces code duplication when compressing objects.
parent c54b0c0ca1
commit ef132c5714
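For context, each affected handler previously wired compression through an io.Pipe and a background goroutine, roughly as in the sketch below (assembled from the deleted lines in the hunks that follow; compressViaPipe is a hypothetical name and gr stands for the decompressed source reader). Because the copy runs in a separate goroutine, a handler returning early could run its deferred close functions while that goroutine was still copying gr into snappyWriter; the pull-based snappyCompressReader added below removes that race and the per-handler duplication.

// Simplified sketch of the removed push-based pattern (not code added by this
// commit): a goroutine pushes compressed bytes into one end of a pipe while
// the handler hands the other end to the object layer.
func compressViaPipe(gr io.Reader) io.Reader {
	pipeReader, pipeWriter := io.Pipe()
	snappyWriter := snappy.NewBufferedWriter(pipeWriter)

	go func() {
		// Compress the decompressed source object.
		_, cerr := io.Copy(snappyWriter, gr)
		snappyWriter.Close()
		pipeWriter.CloseWithError(cerr)
	}()

	return pipeReader
}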
@@ -675,3 +675,54 @@ func CleanMinioInternalMetadataKeys(metadata map[string]string) map[string]string {
 	}
 	return newMeta
 }
+
+// snappyCompressReader compresses data as it reads
+// from the underlying io.Reader.
+type snappyCompressReader struct {
+	r      io.Reader
+	w      *snappy.Writer
+	closed bool
+	buf    bytes.Buffer
+}
+
+func newSnappyCompressReader(r io.Reader) *snappyCompressReader {
+	cr := &snappyCompressReader{r: r}
+	cr.w = snappy.NewBufferedWriter(&cr.buf)
+	return cr
+}
+
+func (cr *snappyCompressReader) Read(p []byte) (int, error) {
+	if cr.closed {
+		// if snappy writer is closed r has been completely read,
+		// return any remaining data in buf.
+		return cr.buf.Read(p)
+	}
+
+	// read from original using p as buffer
+	nr, readErr := cr.r.Read(p)
+
+	// write read bytes to snappy writer
+	nw, err := cr.w.Write(p[:nr])
+	if err != nil {
+		return 0, err
+	}
+	if nw != nr {
+		return 0, io.ErrShortWrite
+	}
+
+	// if last of data from reader, close snappy writer to flush
+	if readErr == io.EOF {
+		err := cr.w.Close()
+		cr.closed = true
+		if err != nil {
+			return 0, err
+		}
+	}
+
+	// read compressed bytes out of buf
+	n, err := cr.buf.Read(p)
+	if readErr != io.EOF && (err == nil || err == io.EOF) {
+		err = readErr
+	}
+	return n, err
+}
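A minimal usage sketch (not part of the commit; it would need to live in the same package as the new type, with bytes, fmt, io, and github.com/golang/snappy imported, and compressWithReader is a hypothetical helper name): the wrapper behaves as an ordinary io.Reader, so callers simply pull compressed bytes from it. Note that an individual Read may legitimately return (0, nil) while snappy's buffered writer is still accumulating a block; loops such as io.Copy just call Read again.

// Sketch only: compress through the pull-based reader, then round-trip the
// framed output through the stock snappy.Reader to confirm it decodes back
// to the input.
func compressWithReader(data []byte) ([]byte, error) {
	cr := newSnappyCompressReader(bytes.NewReader(data))

	var compressed bytes.Buffer
	if _, err := io.Copy(&compressed, cr); err != nil {
		return nil, err
	}

	var roundTripped bytes.Buffer
	dec := snappy.NewReader(bytes.NewReader(compressed.Bytes()))
	if _, err := io.Copy(&roundTripped, dec); err != nil {
		return nil, err
	}
	if !bytes.Equal(roundTripped.Bytes(), data) {
		return nil, fmt.Errorf("snappy round-trip mismatch")
	}
	return compressed.Bytes(), nil
}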
@@ -17,10 +17,14 @@
 package cmd
 
 import (
+	"bytes"
 	"context"
+	"io"
 	"net/http"
 	"reflect"
 	"testing"
+
+	"github.com/golang/snappy"
 )
 
 // Tests validate bucket name.
@@ -544,3 +548,58 @@ func TestGetCompressedOffsets(t *testing.T) {
 		}
 	}
 }
+
+func TestSnappyCompressReader(t *testing.T) {
+	tests := []struct {
+		name string
+		data []byte
+	}{
+		{name: "empty", data: nil},
+		{name: "small", data: []byte("hello, world")},
+		{name: "large", data: bytes.Repeat([]byte("hello, world"), 1000)},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			buf := make([]byte, 100) // make small buffer to ensure multiple reads are required for large case
+
+			r := newSnappyCompressReader(bytes.NewReader(tt.data))
+
+			var rdrBuf bytes.Buffer
+			_, err := io.CopyBuffer(&rdrBuf, r, buf)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			var stdBuf bytes.Buffer
+			w := snappy.NewBufferedWriter(&stdBuf)
+			_, err = io.CopyBuffer(w, bytes.NewReader(tt.data), buf)
+			if err != nil {
+				t.Fatal(err)
+			}
+			err = w.Close()
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			var (
+				got  = rdrBuf.Bytes()
+				want = stdBuf.Bytes()
+			)
+			if !bytes.Equal(got, want) {
+				t.Errorf("encoded data does not match\n\t%q\n\t%q", got, want)
+			}
+
+			var decBuf bytes.Buffer
+			decRdr := snappy.NewReader(&rdrBuf)
+			_, err = io.Copy(&decBuf, decRdr)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			if !bytes.Equal(tt.data, decBuf.Bytes()) {
+				t.Errorf("roundtrip failed\n\t%q\n\t%q", tt.data, decBuf.Bytes())
+			}
+		})
+	}
+}
@@ -33,7 +33,6 @@ import (
 
 	"time"
 
-	snappy "github.com/golang/snappy"
 	"github.com/gorilla/mux"
 	miniogo "github.com/minio/minio-go"
 	"github.com/minio/minio-go/pkg/encrypt"
@@ -831,21 +830,9 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
 		// Remove all source encrypted related metadata to
 		// avoid copying them in target object.
 		crypto.RemoveInternalEntries(srcInfo.UserDefined)
-		// Open a pipe for compression.
-		// Where pipeWriter is piped to srcInfo.Reader.
-		// gr writes to pipeWriter.
-		pipeReader, pipeWriter := io.Pipe()
-		reader = pipeReader
+		reader = newSnappyCompressReader(gr)
 		length = -1
 
-		snappyWriter := snappy.NewBufferedWriter(pipeWriter)
-
-		go func() {
-			// Compress the decompressed source object.
-			_, cerr := io.Copy(snappyWriter, gr)
-			snappyWriter.Close()
-			pipeWriter.CloseWithError(cerr)
-		}()
 	} else {
 		// Remove the metadata for remote calls.
 		delete(srcInfo.UserDefined, ReservedMetadataPrefix+"compression")
@@ -1216,28 +1203,17 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
 			metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1
 			metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)
 
-			pipeReader, pipeWriter := io.Pipe()
-			snappyWriter := snappy.NewBufferedWriter(pipeWriter)
-
-			var actualReader *hash.Reader
-			actualReader, err = hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
+			actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
 			if err != nil {
 				writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
 				return
 			}
 
-			go func() {
-				// Writing to the compressed writer.
-				_, cerr := io.CopyN(snappyWriter, actualReader, actualSize)
-				snappyWriter.Close()
-				pipeWriter.CloseWithError(cerr)
-			}()
-
 			// Set compression metrics.
+			reader = newSnappyCompressReader(actualReader)
 			size = -1   // Since compressed size is un-predictable.
 			md5hex = "" // Do not try to verify the content.
 			sha256hex = ""
-			reader = pipeReader
 		}
 
 		hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
@@ -1654,21 +1630,8 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
 	isCompressed := compressPart
 	// Compress only if the compression is enabled during initial multipart.
 	if isCompressed {
-		// Open a pipe for compression.
-		// Where pipeWriter is piped to srcInfo.Reader.
-		// gr writes to pipeWriter.
-		pipeReader, pipeWriter := io.Pipe()
-		reader = pipeReader
+		reader = newSnappyCompressReader(gr)
 		length = -1
-
-		snappyWriter := snappy.NewBufferedWriter(pipeWriter)
-
-		go func() {
-			// Compress the decompressed source object.
-			_, cerr := io.Copy(snappyWriter, gr)
-			snappyWriter.Close()
-			pipeWriter.CloseWithError(cerr)
-		}()
 	} else {
 		reader = gr
 	}
@@ -1879,8 +1842,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
 	}
 
 	actualSize := size
-	var pipeReader *io.PipeReader
-	var pipeWriter *io.PipeWriter
 
 	// get encryption options
 	var opts ObjectOptions
@@ -1902,28 +1863,17 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
 
 	isCompressed := false
 	if objectAPI.IsCompressionSupported() && compressPart {
-		pipeReader, pipeWriter = io.Pipe()
-		snappyWriter := snappy.NewBufferedWriter(pipeWriter)
-
-		var actualReader *hash.Reader
-		actualReader, err = hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
+		actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
 		if err != nil {
 			writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
 			return
 		}
 
-		go func() {
-			// Writing to the compressed writer.
-			_, cerr := io.CopyN(snappyWriter, actualReader, actualSize)
-			snappyWriter.Close()
-			pipeWriter.CloseWithError(cerr)
-		}()
-
 		// Set compression metrics.
+		reader = newSnappyCompressReader(actualReader)
 		size = -1   // Since compressed size is un-predictable.
 		md5hex = "" // Do not try to verify the content.
 		sha256hex = ""
-		reader = pipeReader
 		isCompressed = true
 	}
 
@@ -2016,7 +1966,6 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
 
 	etag := partInfo.ETag
 	if isCompressed {
-		pipeWriter.Close()
 		// Suppress compressed ETag.
 		etag = partInfo.ETag + "-1"
 	} else if isEncrypted {
@@ -912,26 +912,15 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
 		metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1
 		metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)
 
-		pipeReader, pipeWriter := io.Pipe()
-		snappyWriter := snappy.NewBufferedWriter(pipeWriter)
-
-		var actualReader *hash.Reader
-		actualReader, err = hash.NewReader(reader, size, "", "", actualSize)
+		actualReader, err := hash.NewReader(reader, size, "", "", actualSize)
 		if err != nil {
 			writeWebErrorResponse(w, err)
 			return
 		}
 
-		go func() {
-			// Writing to the compressed writer.
-			_, cerr := io.CopyN(snappyWriter, actualReader, actualSize)
-			snappyWriter.Close()
-			pipeWriter.CloseWithError(cerr)
-		}()
-
 		// Set compression metrics.
 		size = -1 // Since compressed size is un-predictable.
-		reader = pipeReader
+		reader = newSnappyCompressReader(actualReader)
 		hashReader, err = hash.NewReader(reader, size, "", "", actualSize)
 		if err != nil {
 			writeWebErrorResponse(w, err)