From ff726969aa89820a25ac6086a29827d4e97421ef Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Wed, 25 Sep 2019 23:08:24 -0700 Subject: [PATCH] Switch to Snappy -> S2 compression (#8189) --- cmd/globals.go | 10 +-- cmd/notification.go | 2 +- cmd/object-api-utils.go | 158 +++++++++++++++++++---------------- cmd/object-api-utils_test.go | 81 ++++++++++++++---- cmd/object-handlers.go | 23 +++-- cmd/posix-list-dir_test.go | 4 + cmd/storage-rest-client.go | 8 +- cmd/storage-rest-server.go | 2 + cmd/web-handlers.go | 66 ++++----------- docs/compression/README.md | 25 +++++- go.mod | 3 +- go.sum | 2 + 12 files changed, 224 insertions(+), 160 deletions(-) diff --git a/cmd/globals.go b/cmd/globals.go index 76d96c8a3..5da16a471 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -251,18 +251,18 @@ var ( // configuration must be present. globalAutoEncryption bool - // Is compression include extensions/content-types set. + // Is compression include extensions/content-types set? globalIsEnvCompression bool - // Is compression enabeld. + // Is compression enabled? globalIsCompressionEnabled = false // Include-list for compression. - globalCompressExtensions = []string{".txt", ".log", ".csv", ".json"} - globalCompressMimeTypes = []string{"text/csv", "text/plain", "application/json"} + globalCompressExtensions = []string{".txt", ".log", ".csv", ".json", ".tar", ".xml", ".bin"} + globalCompressMimeTypes = []string{"text/*", "application/json", "application/xml"} // Some standard object extensions which we strictly dis-allow for compression. - standardExcludeCompressExtensions = []string{".gz", ".bz2", ".rar", ".zip", ".7z"} + standardExcludeCompressExtensions = []string{".gz", ".bz2", ".rar", ".zip", ".7z", ".xz", ".mp4", ".mkv", ".mov"} // Some standard content-types which we strictly dis-allow for compression. 
standardExcludeCompressContentTypes = []string{"video/*", "audio/*", "application/zip", "application/x-gzip", "application/x-zip-compressed", " application/x-compress", "application/x-spoon"} diff --git a/cmd/notification.go b/cmd/notification.go index 04f388157..5eecbe3a4 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -17,7 +17,6 @@ package cmd import ( - "archive/zip" "bytes" "context" "encoding/json" @@ -31,6 +30,7 @@ import ( "sync" "time" + "github.com/klauspost/compress/zip" "github.com/minio/minio/cmd/crypto" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/event" diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 96d616fb2..2e88cfea6 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -33,7 +33,8 @@ import ( "time" "unicode/utf8" - snappy "github.com/golang/snappy" + "github.com/klauspost/compress/s2" + "github.com/klauspost/readahead" "github.com/minio/minio-go/v6/pkg/s3utils" "github.com/minio/minio/cmd/crypto" xhttp "github.com/minio/minio/cmd/http" @@ -56,6 +57,12 @@ const ( minioMetaTmpBucket = minioMetaBucket + "/tmp" // DNS separator (period), used for bucket name validation. dnsDelimiter = "." + // On compressed files bigger than this; + compReadAheadSize = 100 << 20 + // Read this many buffers ahead. + compReadAheadBuffers = 5 + // Size of each buffer. + compReadAheadBufSize = 1 << 20 ) // isMinioBucket returns true if given bucket is a MinIO internal @@ -337,6 +344,22 @@ func (o ObjectInfo) IsCompressed() bool { return ok } +// IsCompressedOK returns whether the object is compressed and can be decompressed. 
+func (o ObjectInfo) IsCompressedOK() (bool, error) { + scheme, ok := o.UserDefined[ReservedMetadataPrefix+"compression"] + if !ok { + return false, nil + } + if crypto.IsEncrypted(o.UserDefined) { + return true, fmt.Errorf("compression %q and encryption enabled on same object", scheme) + } + switch scheme { + case compressionAlgorithmV1, compressionAlgorithmV2: + return true, nil + } + return true, fmt.Errorf("unknown compression scheme: %s", scheme) +} + // GetActualSize - read the decompressed size from the meta json. func (o ObjectInfo) GetActualSize() int64 { metadata := o.UserDefined @@ -364,29 +387,34 @@ func isCompressible(header http.Header, object string) bool { func excludeForCompression(header http.Header, object string) bool { objStr := object contentType := header.Get(xhttp.ContentType) - if globalIsCompressionEnabled { - // We strictly disable compression for standard extensions/content-types (`compressed`). - if hasStringSuffixInSlice(objStr, standardExcludeCompressExtensions) || hasPattern(standardExcludeCompressContentTypes, contentType) { - return true - } - // Filter compression includes. - if len(globalCompressExtensions) > 0 || len(globalCompressMimeTypes) > 0 { - extensions := globalCompressExtensions - mimeTypes := globalCompressMimeTypes - if hasStringSuffixInSlice(objStr, extensions) || hasPattern(mimeTypes, contentType) { - return false - } - return true - } + if !globalIsCompressionEnabled { + return true + } + + // We strictly disable compression for standard extensions/content-types (`compressed`). + if hasStringSuffixInSlice(objStr, standardExcludeCompressExtensions) || hasPattern(standardExcludeCompressContentTypes, contentType) { + return true + } + + // Filter compression includes. 
+ if len(globalCompressExtensions) == 0 && len(globalCompressMimeTypes) == 0 { + return false + } + + extensions := globalCompressExtensions + mimeTypes := globalCompressMimeTypes + if hasStringSuffixInSlice(objStr, extensions) || hasPattern(mimeTypes, contentType) { return false } return true } // Utility which returns if a string is present in the list. +// Comparison is case insensitive. func hasStringSuffixInSlice(str string, list []string) bool { + str = strings.ToLower(str) for _, v := range list { - if strings.HasSuffix(str, v) { + if strings.HasSuffix(str, strings.ToLower(v)) { return true } } @@ -413,7 +441,7 @@ func getPartFile(entries []string, partNumber int, etag string) string { return "" } -// Returs the compressed offset which should be skipped. +// Returns the compressed offset which should be skipped. func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (int64, int64) { var compressedOffset int64 var skipLength int64 @@ -494,7 +522,10 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, pcfn CheckCopyPrecondi }() isEncrypted := crypto.IsEncrypted(oi.UserDefined) - isCompressed := oi.IsCompressed() + isCompressed, err := oi.IsCompressedOK() + if err != nil { + return nil, 0, 0, err + } var skipLen int64 // Calculate range to read (different for // e.g. encrypted/compressed objects) @@ -575,7 +606,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, pcfn CheckCopyPrecondi if err != nil { return nil, 0, 0, err } - // Incase of range based queries on multiparts, the offset and length are reduced. + // In case of range based queries on multiparts, the offset and length are reduced. off, decOff = getCompressedOffsets(oi, off) decLength = length length = oi.Size - off @@ -602,10 +633,23 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, pcfn CheckCopyPrecondi } } // Decompression reader. 
- snappyReader := snappy.NewReader(inputReader) - // Apply the skipLen and limit on the - // decompressed stream - decReader := io.LimitReader(ioutil.NewSkipReader(snappyReader, decOff), decLength) + s2Reader := s2.NewReader(inputReader) + // Apply the skipLen and limit on the decompressed stream. + err = s2Reader.Skip(decOff) + if err != nil { + return nil, err + } + + decReader := io.LimitReader(s2Reader, decLength) + if decLength > compReadAheadSize { + rah, err := readahead.NewReaderSize(decReader, compReadAheadBuffers, compReadAheadBufSize) + if err == nil { + decReader = rah + cFns = append(cFns, func() { + rah.Close() + }) + } + } oi.Size = decLength // Assemble the GetObjectReader @@ -760,55 +804,29 @@ func CleanMinioInternalMetadataKeys(metadata map[string]string) map[string]strin return newMeta } -// snappyCompressReader compresses data as it reads -// from the underlying io.Reader. -type snappyCompressReader struct { - r io.Reader - w *snappy.Writer - closed bool - buf bytes.Buffer -} - -func newSnappyCompressReader(r io.Reader) *snappyCompressReader { - cr := &snappyCompressReader{r: r} - cr.w = snappy.NewBufferedWriter(&cr.buf) - return cr -} - -func (cr *snappyCompressReader) Read(p []byte) (int, error) { - if cr.closed { - // if snappy writer is closed r has been completely read, - // return any remaining data in buf. - return cr.buf.Read(p) - } - - // read from original using p as buffer - nr, readErr := cr.r.Read(p) - - // write read bytes to snappy writer - nw, err := cr.w.Write(p[:nr]) - if err != nil { - return 0, err - } - if nw != nr { - return 0, io.ErrShortWrite - } - - // if last of data from reader, close snappy writer to flush - if readErr == io.EOF { - err := cr.w.Close() - cr.closed = true +// newS2CompressReader will read data from r, compress it and return the compressed data as a Reader. +// Use Close to ensure resources are released on incomplete streams. 
+func newS2CompressReader(r io.Reader) io.ReadCloser { + pr, pw := io.Pipe() + comp := s2.NewWriter(pw) + // Copy input to compressor + go func() { + _, err := io.Copy(comp, r) if err != nil { - return 0, err + comp.Close() + pw.CloseWithError(err) + return } - } - - // read compressed bytes out of buf - n, err := cr.buf.Read(p) - if readErr != io.EOF && (err == nil || err == io.EOF) { - err = readErr - } - return n, err + // Close the stream. + err = comp.Close() + if err != nil { + pw.CloseWithError(err) + return + } + // Everything ok, do regular close. + pw.Close() + }() + return pr } // Returns error if the cancelCh has been closed (indicating that S3 client has disconnected) diff --git a/cmd/object-api-utils_test.go b/cmd/object-api-utils_test.go index b9a9f5a3c..d781f0ae2 100644 --- a/cmd/object-api-utils_test.go +++ b/cmd/object-api-utils_test.go @@ -1,5 +1,5 @@ /* - * MinIO Cloud Storage, (C) 2016 MinIO, Inc. + * MinIO Cloud Storage, (C) 2016-2019 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,9 +21,11 @@ import ( "io" "net/http" "reflect" + "strconv" "testing" - "github.com/golang/snappy" + "github.com/klauspost/compress/s2" + "github.com/minio/minio/cmd/crypto" ) // Tests validate bucket name. 
@@ -298,10 +300,11 @@ func TestIsCompressed(t *testing.T) { testCases := []struct { objInfo ObjectInfo result bool + err bool }{ { objInfo: ObjectInfo{ - UserDefined: map[string]string{"X-Minio-Internal-compression": "golang/snappy/LZ77", + UserDefined: map[string]string{"X-Minio-Internal-compression": compressionAlgorithmV1, "content-type": "application/octet-stream", "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, }, @@ -309,7 +312,35 @@ func TestIsCompressed(t *testing.T) { }, { objInfo: ObjectInfo{ - UserDefined: map[string]string{"X-Minio-Internal-XYZ": "golang/snappy/LZ77", + UserDefined: map[string]string{"X-Minio-Internal-compression": compressionAlgorithmV2, + "content-type": "application/octet-stream", + "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, + }, + result: true, + }, + { + objInfo: ObjectInfo{ + UserDefined: map[string]string{"X-Minio-Internal-compression": "unknown/compression/type", + "content-type": "application/octet-stream", + "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, + }, + result: true, + err: true, + }, + { + objInfo: ObjectInfo{ + UserDefined: map[string]string{"X-Minio-Internal-compression": compressionAlgorithmV2, + "content-type": "application/octet-stream", + "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2", + crypto.SSEIV: "yes", + }, + }, + result: true, + err: true, + }, + { + objInfo: ObjectInfo{ + UserDefined: map[string]string{"X-Minio-Internal-XYZ": "klauspost/compress/s2", "content-type": "application/octet-stream", "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, }, @@ -324,11 +355,21 @@ func TestIsCompressed(t *testing.T) { }, } for i, test := range testCases { - got := test.objInfo.IsCompressed() - if got != test.result { - t.Errorf("Test %d - expected %v but received %v", - i+1, test.result, got) - } + t.Run(strconv.Itoa(i), func(t *testing.T) { + got := test.objInfo.IsCompressed() + if got != test.result { + t.Errorf("IsCompressed: Expected %v but received %v", + test.result, got) + } + got, gErr := 
test.objInfo.IsCompressedOK() + if got != test.result { + t.Errorf("IsCompressedOK: Expected %v but received %v", + test.result, got) + } + if gErr != nil != test.err { + t.Errorf("IsCompressedOK: want error: %t, got error: %v", test.err, gErr) + } + }) } } @@ -367,6 +408,13 @@ func TestExcludeForCompression(t *testing.T) { }, result: false, }, + { + object: "object", + header: http.Header{ + "Content-Type": []string{"text/something"}, + }, + result: false, + }, } for i, test := range testCases { globalIsCompressionEnabled = true @@ -422,7 +470,7 @@ func TestGetActualSize(t *testing.T) { }{ { objInfo: ObjectInfo{ - UserDefined: map[string]string{"X-Minio-Internal-compression": "golang/snappy/LZ77", + UserDefined: map[string]string{"X-Minio-Internal-compression": "klauspost/compress/s2", "X-Minio-Internal-actual-size": "100000001", "content-type": "application/octet-stream", "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, @@ -441,7 +489,7 @@ func TestGetActualSize(t *testing.T) { }, { objInfo: ObjectInfo{ - UserDefined: map[string]string{"X-Minio-Internal-compression": "golang/snappy/LZ77", + UserDefined: map[string]string{"X-Minio-Internal-compression": "klauspost/compress/s2", "X-Minio-Internal-actual-size": "841", "content-type": "application/octet-stream", "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, @@ -451,7 +499,7 @@ func TestGetActualSize(t *testing.T) { }, { objInfo: ObjectInfo{ - UserDefined: map[string]string{"X-Minio-Internal-compression": "golang/snappy/LZ77", + UserDefined: map[string]string{"X-Minio-Internal-compression": "klauspost/compress/s2", "content-type": "application/octet-stream", "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, Parts: []ObjectPartInfo{}, @@ -540,7 +588,7 @@ func TestGetCompressedOffsets(t *testing.T) { } } -func TestSnappyCompressReader(t *testing.T) { +func TestS2CompressReader(t *testing.T) { tests := []struct { name string data []byte @@ -554,7 +602,8 @@ func TestSnappyCompressReader(t *testing.T) { t.Run(tt.name, func(t 
*testing.T) { buf := make([]byte, 100) // make small buffer to ensure multiple reads are required for large case - r := newSnappyCompressReader(bytes.NewReader(tt.data)) + r := newS2CompressReader(bytes.NewReader(tt.data)) + defer r.Close() var rdrBuf bytes.Buffer _, err := io.CopyBuffer(&rdrBuf, r, buf) @@ -563,7 +612,7 @@ func TestSnappyCompressReader(t *testing.T) { } var stdBuf bytes.Buffer - w := snappy.NewBufferedWriter(&stdBuf) + w := s2.NewWriter(&stdBuf) _, err = io.CopyBuffer(w, bytes.NewReader(tt.data), buf) if err != nil { t.Fatal(err) @@ -582,7 +631,7 @@ func TestSnappyCompressReader(t *testing.T) { } var decBuf bytes.Buffer - decRdr := snappy.NewReader(&rdrBuf) + decRdr := s2.NewReader(&rdrBuf) _, err = io.Copy(&decBuf, decRdr) if err != nil { t.Fatal(err) diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index ede57371b..d5b781e7b 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -61,6 +61,7 @@ var supportedHeadGetReqParams = map[string]string{ const ( compressionAlgorithmV1 = "golang/snappy/LZ77" + compressionAlgorithmV2 = "klauspost/compress/s2" ) // setHeadGetRespHeaders - set any requested parameters as response headers. @@ -800,13 +801,15 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re if isCompressed { compressMetadata = make(map[string]string, 2) // Preserving the compression metadata. - compressMetadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1 + compressMetadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2 compressMetadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(actualSize, 10) // Remove all source encrypted related metadata to // avoid copying them in target object. crypto.RemoveInternalEntries(srcInfo.UserDefined) - reader = newSnappyCompressReader(gr) + s2c := newS2CompressReader(gr) + defer s2c.Close() + reader = s2c length = -1 } else { // Remove the metadata for remote calls. 
@@ -1175,7 +1178,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 { // Storing the compression metadata. - metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1 + metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2 metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10) actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize, globalCLIContext.StrictS3Compat) @@ -1185,7 +1188,9 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } // Set compression metrics. - reader = newSnappyCompressReader(actualReader) + s2c := newS2CompressReader(actualReader) + defer s2c.Close() + reader = s2c size = -1 // Since compressed size is un-predictable. md5hex = "" // Do not try to verify the content. sha256hex = "" @@ -1389,7 +1394,7 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) { // Storing the compression metadata. - metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1 + metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2 } opts, err = putOpts(ctx, r, bucket, object, metadata) @@ -1632,7 +1637,9 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt isCompressed := compressPart // Compress only if the compression is enabled during initial multipart. if isCompressed { - reader = newSnappyCompressReader(gr) + s2c := newS2CompressReader(gr) + defer s2c.Close() + reader = s2c length = -1 } else { reader = gr @@ -1872,7 +1879,9 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } // Set compression metrics. 
- reader = newSnappyCompressReader(actualReader) + s2c := newS2CompressReader(actualReader) + defer s2c.Close() + reader = s2c size = -1 // Since compressed size is un-predictable. md5hex = "" // Do not try to verify the content. sha256hex = "" diff --git a/cmd/posix-list-dir_test.go b/cmd/posix-list-dir_test.go index 8cc711ce6..93e2f664a 100644 --- a/cmd/posix-list-dir_test.go +++ b/cmd/posix-list-dir_test.go @@ -129,6 +129,10 @@ func setupTestReadDirGeneric(t *testing.T) (testResults []result) { // Test to read non-empty directory with symlinks. func setupTestReadDirSymlink(t *testing.T) (testResults []result) { + if runtime.GOOS == "windows" { + t.Log("symlinks not available on windows") + return nil + } dir := mustSetupDir(t) entries := []string{} for i := 0; i < 10; i++ { diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 99a4b5d1a..c7bf92671 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -20,16 +20,14 @@ import ( "bufio" "bytes" "crypto/tls" + "encoding/gob" + "encoding/hex" + "fmt" "io" "io/ioutil" "net/url" "path" "strconv" - - "encoding/gob" - "encoding/hex" - - "fmt" "strings" "github.com/minio/minio/cmd/http" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 085d73fbe..924b82b5c 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -358,10 +358,12 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http return } defer rc.Close() + w.Header().Set(xhttp.ContentLength, strconv.Itoa(length)) io.Copy(w, rc) w.(http.Flusher).Flush() + } // readMetadata func provides the function types for reading leaf metadata. 
diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index 898d85b49..023e26a17 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -17,7 +17,6 @@ package cmd import ( - "archive/zip" "context" "encoding/json" "fmt" @@ -29,13 +28,12 @@ import ( "runtime" "strconv" "strings" - "sync" "time" - humanize "github.com/dustin/go-humanize" - snappy "github.com/golang/snappy" + "github.com/dustin/go-humanize" "github.com/gorilla/mux" "github.com/gorilla/rpc/v2/json2" + "github.com/klauspost/compress/zip" miniogopolicy "github.com/minio/minio-go/v6/pkg/policy" "github.com/minio/minio-go/v6/pkg/s3utils" "github.com/minio/minio-go/v6/pkg/set" @@ -995,7 +993,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { } if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 { // Storing the compression metadata. - metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1 + metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2 metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10) actualReader, err := hash.NewReader(reader, size, "", "", actualSize, globalCLIContext.StrictS3Compat) @@ -1006,7 +1004,9 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { // Set compression metrics. size = -1 // Since compressed size is un-predictable. 
- reader = newSnappyCompressReader(actualReader) + s2c := newS2CompressReader(actualReader) + defer s2c.Close() + reader = s2c hashReader, err = hash.NewReader(reader, size, "", "", actualSize, globalCLIContext.StrictS3Compat) if err != nil { writeWebErrorResponse(w, err) @@ -1234,7 +1234,6 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "WebDownloadZip") defer logger.AuditLog(w, r, "WebDownloadZip", mustGetClaimsFromToken(r)) - var wg sync.WaitGroup objectAPI := web.ObjectAPI() if objectAPI == nil { writeWebErrorResponse(w, errServerNotInitialized) @@ -1306,7 +1305,6 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { archive := zip.NewWriter(w) defer archive.Close() - var length int64 for _, object := range args.Objects { // Writes compressed object file to the response. zipit := func(objectName string) error { @@ -1318,58 +1316,28 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { defer gr.Close() info := gr.ObjInfo - - var actualSize int64 if info.IsCompressed() { - // Read the decompressed size from the meta.json. - actualSize = info.GetActualSize() - // Set the info.Size to the actualSize. - info.Size = actualSize + // For reporting, set the file size to the uncompressed size. + info.Size = info.GetActualSize() } header := &zip.FileHeader{ - Name: strings.TrimPrefix(objectName, args.Prefix), - Method: zip.Deflate, - UncompressedSize64: uint64(length), - UncompressedSize: uint32(length), + Name: strings.TrimPrefix(objectName, args.Prefix), + Method: zip.Deflate, } - zipWriter, err := archive.CreateHeader(header) + if hasStringSuffixInSlice(info.Name, standardExcludeCompressExtensions) || hasPattern(standardExcludeCompressContentTypes, info.ContentType) { + // We strictly disable compression for standard extensions/content-types. 
+ header.Method = zip.Store + } + writer, err := archive.CreateHeader(header) if err != nil { writeWebErrorResponse(w, errUnexpected) return err } - var writer io.Writer - - if info.IsCompressed() { - // Open a pipe for compression - // Where compressWriter is actually passed to the getObject - decompressReader, compressWriter := io.Pipe() - snappyReader := snappy.NewReader(decompressReader) - - // The limit is set to the actual size. - responseWriter := ioutil.LimitedWriter(zipWriter, 0, actualSize) - wg.Add(1) //For closures. - go func() { - defer wg.Done() - // Finally, writes to the client. - _, perr := io.Copy(responseWriter, snappyReader) - - // Close the compressWriter if the data is read already. - // Closing the pipe, releases the writer passed to the getObject. - compressWriter.CloseWithError(perr) - }() - writer = compressWriter - } else { - writer = zipWriter - } httpWriter := ioutil.WriteOnClose(writer) // Write object content to response body if _, err = io.Copy(httpWriter, gr); err != nil { httpWriter.Close() - if info.IsCompressed() { - // Wait for decompression go-routine to retire. - wg.Wait() - } if !httpWriter.HasWritten() { // write error response only if no data or headers has been written to client yet writeWebErrorResponse(w, err) } @@ -1382,10 +1350,6 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { return err } } - if info.IsCompressed() { - // Wait for decompression go-routine to retire. - wg.Wait() - } // Notify object accessed via a GET request. sendEvent(eventArgs{ diff --git a/docs/compression/README.md b/docs/compression/README.md index 5580858da..4d8bac963 100644 --- a/docs/compression/README.md +++ b/docs/compression/README.md @@ -1,6 +1,10 @@ # Compression Guide [![Slack](https://slack.min.io/slack?type=svg)](https://slack.min.io) -MinIO server allows streaming compression to ensure efficient disk space usage. 
Compression happens inflight, i.e objects are compressed before being written to disk(s). MinIO uses [`golang/snappy`](https://github.com/golang/snappy) streaming compression due to its stability and performance. +MinIO server allows streaming compression to ensure efficient disk space usage. Compression happens inflight, i.e., objects are compressed before being written to disk(s). MinIO uses [`klauspost/compress/s2`](https://github.com/klauspost/compress/tree/master/s2) streaming compression due to its stability and performance. + +This algorithm is specifically optimized for machine-generated content. Write throughput is typically at least 300MB/s per CPU core. Decompression speed is typically at least 1GB/s. +This means that in cases where raw IO is below these numbers compression will not only reduce disk usage but also help increase system throughput. +Typically enabling compression on spinning disk systems will increase speed when the content can be compressed. ## Get Started @@ -15,13 +19,26 @@ Compression can be enabled by updating the `compress` config settings for MinIO ```json "compress": { "enabled": true, - "extensions": [".txt",".log",".csv", ".json"], - "mime-types": ["text/csv","text/plain","application/json"] + "extensions": [".txt",".log",".csv", ".json", ".tar", ".xml", ".bin"], + "mime-types": ["text/*","application/json","application/xml"] } ``` Since text, log, csv, json files are highly compressible, These extensions/mime-types are included by default for compression. +Having compression enabled and no extensions or mime types will attempt to compress anything that isn't explicitly known to be already compressed content. +Settings for enabling compression on all content, except for types listed below: + +```json +"compress": { + "enabled": true, + "extensions": [], + "mime-types": [] +} +``` + +Incompressible content will be skipped with quite low CPU usage and storage overhead, typically at several GB/s. 
+ To update the configuration, use `mc admin config get` command to get the current configuration file for the minio cluster in json format, and save it locally. ```sh @@ -69,6 +86,8 @@ export MINIO_COMPRESS_MIMETYPES="application/pdf" | `application/x-compress` | | `application/x-xz` | +All files with these extensions and mime types are excluded from compression, even if compression is enabled for all types. + - MinIO does not support encryption with compression because compression and encryption together potentially enables room for side channel attacks like [`CRIME and BREACH`](https://blog.minio.io/c-e-compression-encryption-cb6b7f04a369) - MinIO does not support compression for Gateway (Azure/GCS/NAS) implementations. diff --git a/go.mod b/go.mod index b55bdebca..6cbb67784 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,6 @@ require ( github.com/fatih/color v1.7.0 github.com/fatih/structs v1.1.0 github.com/go-sql-driver/mysql v1.4.1 - github.com/golang/snappy v0.0.1 github.com/gomodule/redigo v2.0.0+incompatible github.com/gorilla/handlers v1.4.0 github.com/gorilla/mux v1.7.0 @@ -29,7 +28,7 @@ require ( github.com/hashicorp/vault v1.1.0 github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf github.com/json-iterator/go v1.1.7 - github.com/klauspost/compress v1.5.0 + github.com/klauspost/compress v1.8.3 github.com/klauspost/pgzip v1.2.1 github.com/klauspost/readahead v1.3.0 github.com/klauspost/reedsolomon v1.9.1 diff --git a/go.sum b/go.sum index 358a99905..13c96658f 100644 --- a/go.sum +++ b/go.sum @@ -340,6 +340,8 @@ github.com/klauspost/compress v1.3.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0 github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.5.0 h1:iDac0ZKbmSA4PRrRuXXjZL8C7UoJan8oBYxXkMzEQrI= github.com/klauspost/compress v1.5.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.8.3 h1:CkLseiEYMM/fRb0RIg9mXB+Iwgmle+U9KGFu+JCO4Ec= 
+github.com/klauspost/compress v1.8.3/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/cpuid v0.0.0-20160106104451-349c67577817/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=