From ce9d36d954ebb93db52e6055ec9e21594f746e33 Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Fri, 28 Sep 2018 09:06:17 +0530 Subject: [PATCH] Add object compression support (#6292) Add support for streaming (golang/LZ77/snappy) compression. --- cmd/admin-handlers_test.go | 26 +- cmd/api-errors.go | 12 + cmd/api-headers.go | 6 +- cmd/auth-handler.go | 2 +- cmd/bucket-handlers-listobjects.go | 25 +- cmd/bucket-handlers.go | 4 +- cmd/common-main.go | 35 +++ cmd/config-current.go | 33 ++- cmd/config-migrate.go | 37 ++- cmd/config-migrate_test.go | 1 - cmd/config-versions.go | 15 +- cmd/config.go | 2 +- cmd/disk-cache-fs.go | 2 +- cmd/disk-cache.go | 12 +- cmd/disk-cache_test.go | 4 +- cmd/dummy-object-layer_test.go | 4 + cmd/erasure-utils.go | 1 + cmd/fs-v1-metadata.go | 10 +- cmd/fs-v1-multipart.go | 109 ++++++-- cmd/fs-v1.go | 12 +- cmd/gateway-unsupported.go | 5 + cmd/gateway/azure/gateway-azure.go | 5 + cmd/gateway/b2/gateway-b2.go | 5 + cmd/gateway/gcs/gateway-gcs.go | 5 + cmd/gateway/manta/gateway-manta.go | 5 + cmd/gateway/nas/gateway-nas.go | 5 + cmd/gateway/oss/gateway-oss.go | 5 + cmd/gateway/s3/gateway-s3.go | 5 + cmd/gateway/sia/gateway-sia.go | 5 + cmd/globals.go | 17 ++ cmd/object-api-datatypes.go | 3 + cmd/object-api-interface.go | 3 + cmd/object-api-utils.go | 149 +++++++++++ cmd/object-api-utils_test.go | 248 ++++++++++++++++++ cmd/object-handlers.go | 226 ++++++++++++++-- cmd/object-handlers_test.go | 4 +- cmd/posix.go | 9 +- cmd/server_test.go | 2 + cmd/test-utils_test.go | 2 +- cmd/typed-errors.go | 3 + cmd/ui-errors.go | 6 + cmd/web-handlers.go | 155 ++++++++++- cmd/xl-sets.go | 5 + cmd/xl-v1-bucket.go | 5 + cmd/xl-v1-metadata.go | 20 +- cmd/xl-v1-metadata_test.go | 8 +- cmd/xl-v1-multipart.go | 38 ++- cmd/xl-v1-object.go | 29 +- cmd/xl-v1-utils.go | 3 +- cmd/xl-v1-utils_test.go | 2 +- docs/compression/README.md | 81 ++++++ pkg/hash/reader.go | 12 +- pkg/hash/reader_test.go | 40 ++- vendor/github.com/golang/snappy/encode.go | 2 +- .../github.com/golang/snappy/encode_amd64.go | 2 +- vendor/github.com/golang/snappy/snappy.go | 17 +- vendor/vendor.json | 6 +- 57 files changed, 1321 insertions(+), 173 deletions(-) create mode 100644 docs/compression/README.md diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 7232f2696..4dc694bc8 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -38,7 +38,7 @@ import ( var ( configJSON = []byte(`{ - "version": "29", + "version": "30", "credential": { "accessKey": "minio", "secretKey": "minio123" @@ -186,20 +186,24 @@ var ( "endpoint": "" } } - }, - "logger": { + }, + "logger": { "console": { - "enabled": true + "enabled": true }, "http": { - "1": { - "enabled": false, - "endpoint": "http://user:example@localhost:9001/api/endpoint" - } + "target1": { + "enabled": false, + "endpoint": "https://username:password@example.com/api" + } } - } - - }`) + }, + "compress": { + "enabled": false, + "extensions":[".txt",".log",".csv",".json"], + "mime-types":["text/csv","text/plain","application/json"] + } +}`) ) // adminXLTestBed - encapsulates subsystems that need to be setup for diff --git a/cmd/api-errors.go b/cmd/api-errors.go index 532853bc6..6847d097f 100644 --- a/cmd/api-errors.go +++ b/cmd/api-errors.go @@ -292,6 +292,7 @@ const ( ErrMissingHeaders ErrAdminConfigNotificationTargetsFailed ErrAdminProfilerNotEnabled + ErrInvalidDecompressedSize ) // error code to APIError structure, these fields carry respective @@ -1403,6 +1404,11 @@ var errorCodeResponse = map[APIErrorCode]APIError{ 
Description: "Some headers in the query are missing from the file. Check the file and try again.", HTTPStatusCode: http.StatusBadRequest, }, + ErrInvalidDecompressedSize: { + Code: "XMinioInvalidDecompressedSize", + Description: "The data provided is unfit for decompression", + HTTPStatusCode: http.StatusBadRequest, + }, // Add your error structure here. } @@ -1622,6 +1628,12 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) { } + // Compression errors + switch err { + case errInvalidDecompressedSize: + apiErr = ErrInvalidDecompressedSize + } + if apiErr != ErrNone { // If there was a match in the above switch case. return apiErr diff --git a/cmd/api-headers.go b/cmd/api-headers.go index f99131a4f..7ddc508f8 100644 --- a/cmd/api-headers.go +++ b/cmd/api-headers.go @@ -104,7 +104,11 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp if err != nil { return err } - + case objInfo.IsCompressed(): + totalObjectSize = objInfo.GetActualSize() + if totalObjectSize < 0 { + return errInvalidDecompressedSize + } default: totalObjectSize = objInfo.Size } diff --git a/cmd/auth-handler.go b/cmd/auth-handler.go index d6bac44e8..c4bd19d46 100644 --- a/cmd/auth-handler.go +++ b/cmd/auth-handler.go @@ -244,7 +244,7 @@ func isReqAuthenticated(r *http.Request, region string) (s3Error APIErrorCode) { // Verify 'Content-Md5' and/or 'X-Amz-Content-Sha256' if present. // The verification happens implicit during reading. - reader, err := hash.NewReader(r.Body, -1, hex.EncodeToString(contentMD5), hex.EncodeToString(contentSHA256)) + reader, err := hash.NewReader(r.Body, -1, hex.EncodeToString(contentMD5), hex.EncodeToString(contentSHA256), -1) if err != nil { return toAPIErrorCode(err) } diff --git a/cmd/bucket-handlers-listobjects.go b/cmd/bucket-handlers-listobjects.go index 22a80c73d..e827d54d6 100644 --- a/cmd/bucket-handlers-listobjects.go +++ b/cmd/bucket-handlers-listobjects.go @@ -102,7 +102,17 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http } for i := range listObjectsV2Info.Objects { - if crypto.IsEncrypted(listObjectsV2Info.Objects[i].UserDefined) { + var actualSize int64 + if listObjectsV2Info.Objects[i].IsCompressed() { + // Read the decompressed size from the meta.json. + actualSize = listObjectsV2Info.Objects[i].GetActualSize() + if actualSize < 0 { + writeErrorResponse(w, ErrInvalidDecompressedSize, r.URL) + return + } + // Set the info.Size to the actualSize. + listObjectsV2Info.Objects[i].Size = actualSize + } else if crypto.IsEncrypted(listObjectsV2Info.Objects[i].UserDefined) { listObjectsV2Info.Objects[i].Size, err = listObjectsV2Info.Objects[i].DecryptedSize() if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) @@ -168,7 +178,17 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http } for i := range listObjectsInfo.Objects { - if crypto.IsEncrypted(listObjectsInfo.Objects[i].UserDefined) { + var actualSize int64 + if listObjectsInfo.Objects[i].IsCompressed() { + // Read the decompressed size from the meta.json. + actualSize = listObjectsInfo.Objects[i].GetActualSize() + if actualSize < 0 { + writeErrorResponse(w, ErrInvalidDecompressedSize, r.URL) + return + } + // Set the info.Size to the actualSize. 
+ listObjectsInfo.Objects[i].Size = actualSize + } else if crypto.IsEncrypted(listObjectsInfo.Objects[i].UserDefined) { listObjectsInfo.Objects[i].Size, err = listObjectsInfo.Objects[i].DecryptedSize() if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) @@ -176,7 +196,6 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http } } } - response := generateListObjectsV1Response(bucket, prefix, marker, delimiter, maxKeys, listObjectsInfo) // Write success response. diff --git a/cmd/bucket-handlers.go b/cmd/bucket-handlers.go index 7262bf9a3..b55caf456 100644 --- a/cmd/bucket-handlers.go +++ b/cmd/bucket-handlers.go @@ -590,7 +590,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h return } - hashReader, err := hash.NewReader(fileBody, fileSize, "", "") + hashReader, err := hash.NewReader(fileBody, fileSize, "", "", fileSize) if err != nil { logger.LogIf(ctx, err) writeErrorResponse(w, toAPIErrorCode(err), r.URL) @@ -614,7 +614,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h return } info := ObjectInfo{Size: fileSize} - hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "") // do not try to verify encrypted content + hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "", fileSize) // do not try to verify encrypted content if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return diff --git a/cmd/common-main.go b/cmd/common-main.go index 525a6285f..0a698d1a3 100644 --- a/cmd/common-main.go +++ b/cmd/common-main.go @@ -96,7 +96,18 @@ func handleCommonCmdArgs(ctx *cli.Context) { setConfigDir(configDirAbs) } +// Parses the given compression exclude list `extensions` or `content-types`. +func parseCompressIncludes(includes []string) ([]string, error) { + for _, e := range includes { + if len(e) == 0 { + return nil, uiErrInvalidCompressionIncludesValue(nil).Msg("extension/mime-type (%s) cannot be empty", e) + } + } + return includes, nil +} + func handleCommonEnvVars() { + compressEnvDelimiter := "," // Start profiler if env is set. if profiler := os.Getenv("_MINIO_PROFILER"); profiler != "" { var err error @@ -268,4 +279,28 @@ func handleCommonEnvVars() { globalKMSKeyID = kmsConf.Vault.Key.Name globalKMSConfig = kmsConf } + + if compress := os.Getenv("MINIO_COMPRESS"); compress != "" { + globalIsCompressionEnabled = strings.EqualFold(compress, "true") + } + + compressExtensions := os.Getenv("MINIO_COMPRESS_EXTENSIONS") + compressMimeTypes := os.Getenv("MINIO_COMPRESS_MIMETYPES") + if compressExtensions != "" || compressMimeTypes != "" { + globalIsEnvCompression = true + if compressExtensions != "" { + extensions, err := parseCompressIncludes(strings.Split(compressExtensions, compressEnvDelimiter)) + if err != nil { + logger.Fatal(err, "Invalid MINIO_COMPRESS_EXTENSIONS value (`%s`)", extensions) + } + globalCompressExtensions = extensions + } + if compressMimeTypes != "" { + contenttypes, err := parseCompressIncludes(strings.Split(compressMimeTypes, compressEnvDelimiter)) + if err != nil { + logger.Fatal(err, "Invalid MINIO_COMPRESS_MIMETYPES value (`%s`)", contenttypes) + } + globalCompressMimeTypes = contenttypes + } + } } diff --git a/cmd/config-current.go b/cmd/config-current.go index d386a240a..9e7949cd8 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -40,9 +40,9 @@ import ( // 6. 
Make changes in config-current_test.go for any test change // Config version -const serverConfigVersion = "29" +const serverConfigVersion = "30" -type serverConfig = serverConfigV29 +type serverConfig = serverConfigV30 var ( // globalServerConfig server config. @@ -228,6 +228,18 @@ func (s *serverConfig) Validate() error { return nil } +// SetCompressionConfig sets the current compression config +func (s *serverConfig) SetCompressionConfig(extensions []string, mimeTypes []string) { + s.Compression.Extensions = extensions + s.Compression.MimeTypes = mimeTypes + s.Compression.Enabled = globalIsCompressionEnabled +} + +// GetCompressionConfig gets the current compression config +func (s *serverConfig) GetCompressionConfig() compressionConfig { + return s.Compression +} + func (s *serverConfig) loadFromEnvs() { // If env is set override the credentials from config file. if globalIsEnvCreds { @@ -253,6 +265,10 @@ func (s *serverConfig) loadFromEnvs() { if globalKMS != nil { s.KMS = globalKMSConfig } + + if globalIsEnvCompression { + s.SetCompressionConfig(globalCompressExtensions, globalCompressMimeTypes) + } } // TestNotificationTargets tries to establish connections to all notification @@ -366,6 +382,8 @@ func (s *serverConfig) ConfigDiff(t *serverConfig) string { return "StorageClass configuration differs" case !reflect.DeepEqual(s.Cache, t.Cache): return "Cache configuration differs" + case !reflect.DeepEqual(s.Compression, t.Compression): + return "Compression configuration differs" case !reflect.DeepEqual(s.Notify.AMQP, t.Notify.AMQP): return "AMQP Notification configuration differs" case !reflect.DeepEqual(s.Notify.NATS, t.Notify.NATS): @@ -417,6 +435,11 @@ func newServerConfig() *serverConfig { }, KMS: crypto.KMSConfig{}, Notify: notifier{}, + Compression: compressionConfig{ + Enabled: false, + Extensions: globalCompressExtensions, + MimeTypes: globalCompressMimeTypes, + }, } // Make sure to initialize notification configs. @@ -480,6 +503,12 @@ func (s *serverConfig) loadToCachedConfigs() { globalKMSKeyID = globalKMSConfig.Vault.Key.Name } } + if !globalIsCompressionEnabled { + compressionConf := s.GetCompressionConfig() + globalCompressExtensions = compressionConf.Extensions + globalCompressMimeTypes = compressionConf.MimeTypes + globalIsCompressionEnabled = compressionConf.Enabled + } } // newConfig - initialize a new server config, saves env parameters if diff --git a/cmd/config-migrate.go b/cmd/config-migrate.go index dfb8257ff..278534961 100644 --- a/cmd/config-migrate.go +++ b/cmd/config-migrate.go @@ -2387,6 +2387,7 @@ func migrateV27ToV28() error { // config V28 is backward compatible with V27, load the old // config file in serverConfigV28 struct and initialize KMSConfig + srvConfig := &serverConfigV28{} _, err := quick.LoadConfig(configFile, globalEtcdClient, srvConfig) if os.IsNotExist(err) { @@ -2409,12 +2410,40 @@ func migrateV27ToV28() error { return nil } -// Migrates '.minio.sys/config.json' v27 to v28. +// Migrates '.minio.sys/config.json' to v30. 
func migrateMinioSysConfig(objAPI ObjectLayer) error { if err := migrateV27ToV28MinioSys(objAPI); err != nil { return err } - return migrateV28ToV29MinioSys(objAPI) + if err := migrateV28ToV29MinioSys(objAPI); err != nil { + return err + } + return migrateV29ToV30MinioSys(objAPI) +} + +func migrateV29ToV30MinioSys(objAPI ObjectLayer) error { + configFile := path.Join(minioConfigPrefix, minioConfigFile) + srvConfig, err := readServerConfig(context.Background(), objAPI) + if err == errConfigNotFound { + return nil + } else if err != nil { + return fmt.Errorf("Unable to load config file. %v", err) + } + if srvConfig.Version != "29" { + return nil + } + + srvConfig.Version = "30" + // Init compression config.For future migration, Compression config needs to be copied over from previous version. + srvConfig.Compression.Enabled = false + srvConfig.Compression.Extensions = globalCompressExtensions + srvConfig.Compression.MimeTypes = globalCompressMimeTypes + if err = saveServerConfig(context.Background(), objAPI, srvConfig); err != nil { + return fmt.Errorf("Failed to migrate config from 29 to 30 . %v", err) + } + + logger.Info(configMigrateMSGTemplate, configFile, "29", "30") + return nil } func migrateV28ToV29MinioSys(objAPI ObjectLayer) error { @@ -2431,7 +2460,7 @@ func migrateV28ToV29MinioSys(objAPI ObjectLayer) error { srvConfig.Version = "29" if err = saveServerConfig(context.Background(), objAPI, srvConfig); err != nil { - return fmt.Errorf("Failed to migrate config from ‘28’ to ‘29’. %v", err) + return fmt.Errorf("Failed to migrate config from ‘28’ to ‘29’. %v", err) } logger.Info(configMigrateMSGTemplate, configFile, "28", "29") @@ -2453,7 +2482,7 @@ func migrateV27ToV28MinioSys(objAPI ObjectLayer) error { srvConfig.Version = "28" srvConfig.KMS = crypto.KMSConfig{} if err = saveServerConfig(context.Background(), objAPI, srvConfig); err != nil { - return fmt.Errorf("Failed to migrate config from ‘27’ to ‘28’. %v", err) + return fmt.Errorf("Failed to migrate config from ‘27’ to ‘28’. %v", err) } logger.Info(configMigrateMSGTemplate, configFile, "27", "28") diff --git a/cmd/config-migrate_test.go b/cmd/config-migrate_test.go index 8116c4320..8fe5c8021 100644 --- a/cmd/config-migrate_test.go +++ b/cmd/config-migrate_test.go @@ -318,7 +318,6 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) { if err := migrateV26ToV27(); err == nil { t.Fatal("migrateConfigV26ToV27() should fail with a corrupted json") } - if err := migrateV27ToV28(); err == nil { t.Fatal("migrateConfigV27ToV28() should fail with a corrupted json") } diff --git a/cmd/config-versions.go b/cmd/config-versions.go index 4c6b4163a..ba8baa28d 100644 --- a/cmd/config-versions.go +++ b/cmd/config-versions.go @@ -755,8 +755,16 @@ type serverConfigV28 struct { Logger loggerConfig `json:"logger"` } -// serverConfigV29 is just like version '28', browser and domain are deprecated. -type serverConfigV29 struct { +// compressionConfig represents the compression settings. +type compressionConfig struct { + Enabled bool `json:"enabled"` + Extensions []string `json:"extensions"` + MimeTypes []string `json:"mime-types"` +} + +// serverConfigV30 is just like version '29', stores additionally +// extensions and mimetypes fields for compression. 
+type serverConfigV30 struct { quick.Config `json:"-"` // ignore interfaces Version string `json:"version"` @@ -780,4 +788,7 @@ type serverConfigV29 struct { // Logger configuration Logger loggerConfig `json:"logger"` + + // Compression configuration + Compression compressionConfig `json:"compress"` } diff --git a/cmd/config.go b/cmd/config.go index b939ae68f..b362ef455 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -172,7 +172,7 @@ func checkServerConfig(ctx context.Context, objAPI ObjectLayer) error { } func saveConfig(objAPI ObjectLayer, configFile string, data []byte) error { - hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data)) + hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data), int64(len(data))) if err != nil { return err } diff --git a/cmd/disk-cache-fs.go b/cmd/disk-cache-fs.go index b92a85dea..94a564ceb 100644 --- a/cmd/disk-cache-fs.go +++ b/cmd/disk-cache-fs.go @@ -354,7 +354,7 @@ func (cfs *cacheFSObjects) PutObject(ctx context.Context, bucket string, object } // Validate input data size and it can never be less than zero. - if data.Size() < 0 { + if data.Size() < -1 { logger.LogIf(ctx, errInvalidArgument) return ObjectInfo{}, errInvalidArgument } diff --git a/cmd/disk-cache.go b/cmd/disk-cache.go index d5aa14ea9..6d08233a5 100644 --- a/cmd/disk-cache.go +++ b/cmd/disk-cache.go @@ -244,7 +244,7 @@ func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, // Initialize pipe. pipeReader, pipeWriter := io.Pipe() teeReader := io.TeeReader(bkReader, pipeWriter) - hashReader, herr := hash.NewReader(pipeReader, bkReader.ObjInfo.Size, "", "") + hashReader, herr := hash.NewReader(pipeReader, bkReader.ObjInfo.Size, "", "", bkReader.ObjInfo.Size) if herr != nil { bkReader.Close() return nil, herr @@ -314,7 +314,7 @@ func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, star } // Initialize pipe. 
pipeReader, pipeWriter := io.Pipe() - hashReader, err := hash.NewReader(pipeReader, objInfo.Size, "", "") + hashReader, err := hash.NewReader(pipeReader, objInfo.Size, "", "", objInfo.Size) if err != nil { return err } @@ -671,13 +671,13 @@ func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *h objInfo = ObjectInfo{} // Initialize pipe to stream data to backend pipeReader, pipeWriter := io.Pipe() - hashReader, err := hash.NewReader(pipeReader, size, r.MD5HexString(), r.SHA256HexString()) + hashReader, err := hash.NewReader(pipeReader, size, r.MD5HexString(), r.SHA256HexString(), r.ActualSize()) if err != nil { return ObjectInfo{}, err } // Initialize pipe to stream data to cache rPipe, wPipe := io.Pipe() - cHashReader, err := hash.NewReader(rPipe, size, r.MD5HexString(), r.SHA256HexString()) + cHashReader, err := hash.NewReader(rPipe, size, r.MD5HexString(), r.SHA256HexString(), r.ActualSize()) if err != nil { return ObjectInfo{}, err } @@ -764,13 +764,13 @@ func (c cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadI info = PartInfo{} // Initialize pipe to stream data to backend pipeReader, pipeWriter := io.Pipe() - hashReader, err := hash.NewReader(pipeReader, size, data.MD5HexString(), data.SHA256HexString()) + hashReader, err := hash.NewReader(pipeReader, size, data.MD5HexString(), data.SHA256HexString(), data.ActualSize()) if err != nil { return } // Initialize pipe to stream data to cache rPipe, wPipe := io.Pipe() - cHashReader, err := hash.NewReader(rPipe, size, data.MD5HexString(), data.SHA256HexString()) + cHashReader, err := hash.NewReader(rPipe, size, data.MD5HexString(), data.SHA256HexString(), data.ActualSize()) if err != nil { return } diff --git a/cmd/disk-cache_test.go b/cmd/disk-cache_test.go index 839c3d51a..83bb36452 100644 --- a/cmd/disk-cache_test.go +++ b/cmd/disk-cache_test.go @@ -195,7 +195,7 @@ func TestDiskCache(t *testing.T) { opts := ObjectOptions{} byteReader := bytes.NewReader([]byte(content)) - hashReader, err := hash.NewReader(byteReader, int64(size), "", "") + hashReader, err := hash.NewReader(byteReader, int64(size), "", "", int64(size)) if err != nil { t.Fatal(err) } @@ -270,7 +270,7 @@ func TestDiskCacheMaxUse(t *testing.T) { opts := ObjectOptions{} byteReader := bytes.NewReader([]byte(content)) - hashReader, err := hash.NewReader(byteReader, int64(size), "", "") + hashReader, err := hash.NewReader(byteReader, int64(size), "", "", int64(size)) if err != nil { t.Fatal(err) } diff --git a/cmd/dummy-object-layer_test.go b/cmd/dummy-object-layer_test.go index e8bcf9430..cbf50b4aa 100644 --- a/cmd/dummy-object-layer_test.go +++ b/cmd/dummy-object-layer_test.go @@ -159,3 +159,7 @@ func (api *DummyObjectLayer) IsNotificationSupported() (b bool) { func (api *DummyObjectLayer) IsEncryptionSupported() (b bool) { return } + +func (api *DummyObjectLayer) IsCompressionSupported() (b bool) { + return +} diff --git a/cmd/erasure-utils.go b/cmd/erasure-utils.go index 2cda3d78c..411c6751e 100644 --- a/cmd/erasure-utils.go +++ b/cmd/erasure-utils.go @@ -91,6 +91,7 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data // Copy the block. n, err := io.Copy(dst, bytes.NewReader(block)) if err != nil { + // The writer will be closed incase of range queries, which will emit ErrClosedPipe. 
if !strings.Contains(err.Error(), "read/write on closed pipe") { logger.LogIf(ctx, err) } diff --git a/cmd/fs-v1-metadata.go b/cmd/fs-v1-metadata.go index f7727aee6..0e8bccd2f 100644 --- a/cmd/fs-v1-metadata.go +++ b/cmd/fs-v1-metadata.go @@ -223,11 +223,13 @@ func parseFSPartsArray(fsMetaBuf []byte) []objectPartInfo { name := gjson.Get(partJSON, "name").String() etag := gjson.Get(partJSON, "etag").String() size := gjson.Get(partJSON, "size").Int() + actualSize := gjson.Get(partJSON, "actualSize").Int() partsArray = append(partsArray, objectPartInfo{ - Number: int(number), - Name: name, - ETag: etag, - Size: size, + Number: int(number), + Name: name, + ETag: etag, + Size: size, + ActualSize: int64(actualSize), }) return true }) diff --git a/cmd/fs-v1-multipart.go b/cmd/fs-v1-multipart.go index 5c1fc38be..8cfa4f84d 100644 --- a/cmd/fs-v1-multipart.go +++ b/cmd/fs-v1-multipart.go @@ -46,21 +46,25 @@ func (fs *FSObjects) getMultipartSHADir(bucket, object string) string { } // Returns partNumber.etag -func (fs *FSObjects) encodePartFile(partNumber int, etag string) string { - return fmt.Sprintf("%.5d.%s", partNumber, etag) +func (fs *FSObjects) encodePartFile(partNumber int, etag string, actualSize int64) string { + return fmt.Sprintf("%.5d.%s.%d", partNumber, etag, actualSize) } // Returns partNumber and etag -func (fs *FSObjects) decodePartFile(name string) (partNumber int, etag string, err error) { +func (fs *FSObjects) decodePartFile(name string) (partNumber int, etag string, actualSize int64, err error) { result := strings.Split(name, ".") - if len(result) != 2 { - return 0, "", errUnexpected + if len(result) != 3 { + return 0, "", 0, errUnexpected } partNumber, err = strconv.Atoi(result[0]) if err != nil { - return 0, "", errUnexpected + return 0, "", 0, errUnexpected } - return partNumber, result[1], nil + actualSize, err = strconv.ParseInt(result[2], 10, 64) + if err != nil { + return 0, "", 0, errUnexpected + } + return partNumber, result[1], actualSize, nil } // Appends parts to an appendFile sequentially. @@ -95,7 +99,7 @@ func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploa if entry == fs.metaJSONFile { continue } - partNumber, etag, err := fs.decodePartFile(entry) + partNumber, etag, actualSize, err := fs.decodePartFile(entry) if err != nil { logger.GetReqInfo(ctx).AppendTags("entry", entry) logger.LogIf(ctx, err) @@ -119,7 +123,7 @@ func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploa return } - file.parts = append(file.parts, PartInfo{PartNumber: nextPartNumber, ETag: etag}) + file.parts = append(file.parts, PartInfo{PartNumber: partNumber, ETag: etag, ActualSize: actualSize}) nextPartNumber++ } } @@ -276,8 +280,8 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID return pi, toObjectErr(err, bucket) } - // Validate input data size and it can never be less than zero. - if data.Size() < 0 { + // Validate input data size and it can never be less than -1. 
+ if data.Size() < -1 { logger.LogIf(ctx, errInvalidArgument) return pi, toObjectErr(errInvalidArgument) } @@ -322,7 +326,7 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID if etag == "" { etag = GenETag() } - partPath := pathJoin(uploadIDDir, fs.encodePartFile(partID, etag)) + partPath := pathJoin(uploadIDDir, fs.encodePartFile(partID, etag, data.ActualSize())) if err = fsRenameFile(ctx, tmpPartPath, partPath); err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) @@ -339,6 +343,7 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID LastModified: fi.ModTime(), ETag: etag, Size: fi.Size(), + ActualSize: data.ActualSize(), }, nil } @@ -365,8 +370,7 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload } uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID) - _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)) - if err != nil { + if _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)); err != nil { if err == errFileNotFound || err == errFileAccessDenied { return result, InvalidUploadID{UploadID: uploadID} } @@ -384,7 +388,7 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload if entry == fs.metaJSONFile { continue } - partNumber, etag1, derr := fs.decodePartFile(entry) + partNumber, etag1, _, derr := fs.decodePartFile(entry) if derr != nil { logger.LogIf(ctx, derr) return result, toObjectErr(derr) @@ -394,11 +398,11 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload partsMap[partNumber] = etag1 continue } - stat1, serr := fsStatFile(ctx, pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag1))) + stat1, serr := fsStatFile(ctx, pathJoin(uploadIDDir, getPartFile(entries, partNumber, etag1))) if serr != nil { return result, toObjectErr(serr) } - stat2, serr := fsStatFile(ctx, pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag2))) + stat2, serr := fsStatFile(ctx, pathJoin(uploadIDDir, getPartFile(entries, partNumber, etag2))) if serr != nil { return result, toObjectErr(serr) } @@ -408,8 +412,19 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload } var parts []PartInfo + var actualSize int64 for partNumber, etag := range partsMap { - parts = append(parts, PartInfo{PartNumber: partNumber, ETag: etag}) + partFile := getPartFile(entries, partNumber, etag) + if partFile == "" { + return result, InvalidPart{} + } + // Read the actualSize from the pathFileName. 
+ subParts := strings.Split(partFile, ".") + actualSize, err = strconv.ParseInt(subParts[len(subParts)-1], 10, 64) + if err != nil { + return result, InvalidPart{} + } + parts = append(parts, PartInfo{PartNumber: partNumber, ETag: etag, ActualSize: actualSize}) } sort.Slice(parts, func(i int, j int) bool { return parts[i].PartNumber < parts[j].PartNumber @@ -439,12 +454,12 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload } for i, part := range result.Parts { var stat os.FileInfo - stat, err = fsStatFile(ctx, pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag))) + stat, err = fsStatFile(ctx, pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag, part.ActualSize))) if err != nil { return result, toObjectErr(err) } result.Parts[i].LastModified = stat.ModTime() - result.Parts[i].Size = stat.Size() + result.Parts[i].Size = part.ActualSize } fsMetaBytes, err := ioutil.ReadFile(pathJoin(uploadIDDir, fs.metaJSONFile)) @@ -464,6 +479,9 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload // // Implements S3 compatible Complete multipart API. func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart) (oi ObjectInfo, e error) { + + var actualSize int64 + if err := checkCompleteMultipartArgs(ctx, bucket, object, fs); err != nil { return oi, toObjectErr(err) } @@ -500,9 +518,37 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, // Allocate parts similar to incoming slice. fsMeta.Parts = make([]objectPartInfo, len(parts)) + entries, err := readDir(uploadIDDir) + if err != nil { + logger.GetReqInfo(ctx).AppendTags("uploadIDDir", uploadIDDir) + logger.LogIf(ctx, err) + return oi, err + } + + // Save consolidated actual size. + var objectActualSize int64 // Validate all parts and then commit to disk. for i, part := range parts { - partPath := pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag)) + partFile := getPartFile(entries, part.PartNumber, part.ETag) + if partFile == "" { + return oi, InvalidPart{ + PartNumber: part.PartNumber, + GotETag: part.ETag, + } + } + + // Read the actualSize from the pathFileName. + subParts := strings.Split(partFile, ".") + actualSize, err = strconv.ParseInt(subParts[len(subParts)-1], 10, 64) + if err != nil { + return oi, InvalidPart{ + PartNumber: part.PartNumber, + GotETag: part.ETag, + } + } + + partPath := pathJoin(uploadIDDir, partFile) + var fi os.FileInfo fi, err = fsStatFile(ctx, partPath) if err != nil { @@ -512,24 +558,28 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, return oi, err } if partSize == -1 { - partSize = fi.Size() + partSize = actualSize } fsMeta.Parts[i] = objectPartInfo{ - Number: part.PartNumber, - ETag: part.ETag, - Size: fi.Size(), + Number: part.PartNumber, + ETag: part.ETag, + Size: fi.Size(), + ActualSize: actualSize, } + // Consolidate the actual size. + objectActualSize += actualSize + if i == len(parts)-1 { break } // All parts except the last part has to be atleast 5MB. 
- if !isMinAllowedPartSize(fi.Size()) { + if !isMinAllowedPartSize(actualSize) { return oi, PartTooSmall{ PartNumber: part.PartNumber, - PartSize: fi.Size(), + PartSize: actualSize, PartETag: part.ETag, } } @@ -573,7 +623,8 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, if appendFallback { fsRemoveFile(ctx, file.filePath) for _, part := range parts { - partPath := pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag)) + partPath := getPartFile(entries, part.PartNumber, part.ETag) + partPath = pathJoin(uploadIDDir, partPath) err = mioutil.AppendFile(appendFilePath, partPath) if err != nil { logger.LogIf(ctx, err) @@ -612,6 +663,8 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, fsMeta.Meta = make(map[string]string) } fsMeta.Meta["etag"] = s3MD5 + // Save consolidated actual size. + fsMeta.Meta[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10) if _, err = fsMeta.WriteTo(metaFile); err != nil { logger.LogIf(ctx, err) return oi, toObjectErr(err, bucket, object) diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index c0aeeb9f1..db7d5aa02 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -662,7 +662,10 @@ func (fs *FSObjects) getObject(ctx context.Context, bucket, object string, offse buf := make([]byte, int(bufSize)) _, err = io.CopyBuffer(writer, io.LimitReader(reader, length), buf) - logger.LogIf(ctx, err) + // The writer will be closed incase of range queries, which will emit ErrClosedPipe. + if err == io.ErrClosedPipe { + err = nil + } return toObjectErr(err, bucket, object) } @@ -882,7 +885,7 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string } // Validate input data size and it can never be less than zero. - if data.Size() < 0 { + if data.Size() < -1 { logger.LogIf(ctx, errInvalidArgument) return ObjectInfo{}, errInvalidArgument } @@ -1325,3 +1328,8 @@ func (fs *FSObjects) IsNotificationSupported() bool { func (fs *FSObjects) IsEncryptionSupported() bool { return true } + +// IsCompressionSupported returns whether compression is applicable for this layer. +func (fs *FSObjects) IsCompressionSupported() bool { + return true +} diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 0fbbcfb20..c6dc8b0d7 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -151,3 +151,8 @@ func (a GatewayUnsupported) IsNotificationSupported() bool { func (a GatewayUnsupported) IsEncryptionSupported() bool { return false } + +// IsCompressionSupported returns whether compression is applicable for this layer. +func (a GatewayUnsupported) IsCompressionSupported() bool { + return false +} diff --git a/cmd/gateway/azure/gateway-azure.go b/cmd/gateway/azure/gateway-azure.go index df40fd138..7d2fe2cbc 100644 --- a/cmd/gateway/azure/gateway-azure.go +++ b/cmd/gateway/azure/gateway-azure.go @@ -1148,3 +1148,8 @@ func (a *azureObjects) DeleteBucketPolicy(ctx context.Context, bucket string) er err := container.SetPermissions(perm, nil) return azureToObjectError(err) } + +// IsCompressionSupported returns whether compression is applicable for this layer. 
+func (a *azureObjects) IsCompressionSupported() bool { + return false +} diff --git a/cmd/gateway/b2/gateway-b2.go b/cmd/gateway/b2/gateway-b2.go index 771c6aea1..5128b5552 100644 --- a/cmd/gateway/b2/gateway-b2.go +++ b/cmd/gateway/b2/gateway-b2.go @@ -840,3 +840,8 @@ func (l *b2Objects) DeleteBucketPolicy(ctx context.Context, bucket string) error logger.LogIf(ctx, err) return b2ToObjectError(err) } + +// IsCompressionSupported returns whether compression is applicable for this layer. +func (l *b2Objects) IsCompressionSupported() bool { + return false +} diff --git a/cmd/gateway/gcs/gateway-gcs.go b/cmd/gateway/gcs/gateway-gcs.go index 810ba48ca..37a546c32 100644 --- a/cmd/gateway/gcs/gateway-gcs.go +++ b/cmd/gateway/gcs/gateway-gcs.go @@ -1453,3 +1453,8 @@ func (l *gcsGateway) DeleteBucketPolicy(ctx context.Context, bucket string) erro return nil } + +// IsCompressionSupported returns whether compression is applicable for this layer. +func (l *gcsGateway) IsCompressionSupported() bool { + return false +} diff --git a/cmd/gateway/manta/gateway-manta.go b/cmd/gateway/manta/gateway-manta.go index 3719ec151..8b21e3926 100644 --- a/cmd/gateway/manta/gateway-manta.go +++ b/cmd/gateway/manta/gateway-manta.go @@ -659,3 +659,8 @@ func (t *tritonObjects) DeleteObject(ctx context.Context, bucket, object string) return nil } + +// IsCompressionSupported returns whether compression is applicable for this layer. +func (t *tritonObjects) IsCompressionSupported() bool { + return false +} diff --git a/cmd/gateway/nas/gateway-nas.go b/cmd/gateway/nas/gateway-nas.go index a1d7b9756..d6c89e73e 100644 --- a/cmd/gateway/nas/gateway-nas.go +++ b/cmd/gateway/nas/gateway-nas.go @@ -125,3 +125,8 @@ type nasObjects struct { func (l *nasObjects) IsNotificationSupported() bool { return false } + +// IsCompressionSupported returns whether compression is applicable for this layer. +func (l *nasObjects) IsCompressionSupported() bool { + return false +} diff --git a/cmd/gateway/oss/gateway-oss.go b/cmd/gateway/oss/gateway-oss.go index 1d989f43c..1084891c3 100644 --- a/cmd/gateway/oss/gateway-oss.go +++ b/cmd/gateway/oss/gateway-oss.go @@ -1095,3 +1095,8 @@ func (l *ossObjects) DeleteBucketPolicy(ctx context.Context, bucket string) erro } return nil } + +// IsCompressionSupported returns whether compression is applicable for this layer. +func (l *ossObjects) IsCompressionSupported() bool { + return false +} diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go index b23a3d065..a47d482d7 100644 --- a/cmd/gateway/s3/gateway-s3.go +++ b/cmd/gateway/s3/gateway-s3.go @@ -538,3 +538,8 @@ func (l *s3Objects) DeleteBucketPolicy(ctx context.Context, bucket string) error } return nil } + +// IsCompressionSupported returns whether compression is applicable for this layer. +func (l *s3Objects) IsCompressionSupported() bool { + return false +} diff --git a/cmd/gateway/sia/gateway-sia.go b/cmd/gateway/sia/gateway-sia.go index 6f2f9d71e..7b8c9ffa0 100644 --- a/cmd/gateway/sia/gateway-sia.go +++ b/cmd/gateway/sia/gateway-sia.go @@ -651,3 +651,8 @@ func (s *siaObjects) deleteTempFileWhenUploadCompletes(ctx context.Context, temp os.Remove(tempFile) } + +// IsCompressionSupported returns whether compression is applicable for this layer. 
+func (s *siaObjects) IsCompressionSupported() bool { + return false +} diff --git a/cmd/globals.go b/cmd/globals.go index e6a2bda28..fb50bda3c 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -227,6 +227,23 @@ var ( globalKMS crypto.KMS // KMS config globalKMSConfig crypto.KMSConfig + + // Is compression include extensions/content-types set. + globalIsEnvCompression bool + + // Is compression enabeld. + globalIsCompressionEnabled = false + + // Include-list for compression. + globalCompressExtensions = []string{".txt", ".log", ".csv", ".json"} + globalCompressMimeTypes = []string{"text/csv", "text/plain", "application/json"} + + // Some standard object extensions which we strictly dis-allow for compression. + standardExcludeCompressExtensions = []string{".gz", ".bz2", ".rar", ".zip", ".7z"} + + // Some standard content-types which we strictly dis-allow for compression. + standardExcludeCompressContentTypes = []string{"video/*", "audio/*", "application/zip", "application/x-gzip", "application/x-zip-compressed", " application/x-compress", "application/x-spoon"} + // Add new variable global values here. ) diff --git a/cmd/object-api-datatypes.go b/cmd/object-api-datatypes.go index 835e61460..3b2208b50 100644 --- a/cmd/object-api-datatypes.go +++ b/cmd/object-api-datatypes.go @@ -258,6 +258,9 @@ type PartInfo struct { // Size in bytes of the part. Size int64 + + // Decompressed Size. + ActualSize int64 } // MultipartInfo - represents metadata in progress multipart upload. diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 489810b61..529f1a904 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -96,4 +96,7 @@ type ObjectLayer interface { // Supported operations check IsNotificationSupported() bool IsEncryptionSupported() bool + + // Compression support check. + IsCompressionSupported() bool } diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 7d1119b0f..2da1bccea 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -25,15 +25,18 @@ import ( "net/http" "path" "runtime" + "strconv" "strings" "sync" "time" "unicode/utf8" + snappy "github.com/golang/snappy" "github.com/minio/minio/cmd/crypto" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/dns" "github.com/minio/minio/pkg/ioutil" + "github.com/minio/minio/pkg/wildcard" "github.com/skyrings/skyring-common/tools/uuid" ) @@ -301,6 +304,107 @@ func getRandomHostPort(records []dns.SrvRecord) (string, int) { return srvRecord.Host, srvRecord.Port } +// IsCompressed returns true if the object is marked as compressed. +func (o ObjectInfo) IsCompressed() bool { + _, ok := o.UserDefined[ReservedMetadataPrefix+"compression"] + return ok +} + +// GetActualSize - read the decompressed size from the meta json. +func (o ObjectInfo) GetActualSize() int64 { + metadata := o.UserDefined + sizeStr, ok := metadata[ReservedMetadataPrefix+"actual-size"] + if ok { + size, err := strconv.ParseInt(sizeStr, 10, 64) + if err == nil { + return size + } + } + return -1 +} + +// Disabling compression for encrypted enabled requests. +// Using compression and encryption together enables room for side channel attacks. +// Eliminate non-compressible objects by extensions/content-types. +func isCompressible(header http.Header, object string) bool { + if hasServerSideEncryptionHeader(header) || excludeForCompression(header, object) { + return false + } + return true +} + +// Eliminate the non-compressible objects. 
+func excludeForCompression(header http.Header, object string) bool { + objStr := object + contentType := header.Get("Content-Type") + if globalIsCompressionEnabled { + // We strictly disable compression for standard extensions/content-types (`compressed`). + if hasStringSuffixInSlice(objStr, standardExcludeCompressExtensions) || hasPattern(standardExcludeCompressContentTypes, contentType) { + return true + } + // Filter compression includes. + if len(globalCompressExtensions) > 0 || len(globalCompressMimeTypes) > 0 { + extensions := globalCompressExtensions + mimeTypes := globalCompressMimeTypes + if hasStringSuffixInSlice(objStr, extensions) || hasPattern(mimeTypes, contentType) { + return false + } + return true + } + return false + } + return true +} + +// Utility which returns if a string is present in the list. +func hasStringSuffixInSlice(str string, list []string) bool { + for _, v := range list { + if strings.HasSuffix(str, v) { + return true + } + } + return false +} + +// Returns true if any of the given wildcard patterns match the matchStr. +func hasPattern(patterns []string, matchStr string) bool { + for _, pattern := range patterns { + if ok := wildcard.MatchSimple(pattern, matchStr); ok { + return true + } + } + return false +} + +// Returns the part file name which matches the partNumber and etag. +func getPartFile(entries []string, partNumber int, etag string) string { + for _, entry := range entries { + if strings.HasPrefix(entry, fmt.Sprintf("%.5d.%s.", partNumber, etag)) { + return entry + } + } + return "" +} + +// Returs the compressed offset which should be skipped. +func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (int64, int64) { + var compressedOffset int64 + var skipLength int64 + var cumulativeActualSize int64 + if len(objectInfo.Parts) > 0 { + for _, part := range objectInfo.Parts { + cumulativeActualSize += part.ActualSize + if cumulativeActualSize <= offset { + compressedOffset += part.Size + } else { + skipLength = cumulativeActualSize - part.ActualSize + break + } + } + } + return compressedOffset, offset - skipLength +} + // byBucketName is a collection satisfying sort.Interface. type byBucketName []BucketInfo @@ -352,6 +456,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, cleanUpFns ...func()) }() isEncrypted := crypto.IsEncrypted(oi.UserDefined) + isCompressed := oi.IsCompressed() var skipLen int64 // Calculate range to read (different for // e.g. encrypted/compressed objects) @@ -407,6 +512,50 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, cleanUpFns ...func()) } return r, nil } + case isCompressed: + // Read the decompressed size from the meta.json. + actualSize := oi.GetActualSize() + if actualSize < 0 { + return nil, 0, 0, errInvalidDecompressedSize + } + off, length = int64(0), oi.Size + decOff, decLength := int64(0), actualSize + if rs != nil { + off, length, err = rs.GetOffsetLength(actualSize) + if err != nil { + return nil, 0, 0, err + } + // Incase of range based queries on multiparts, the offset and length are reduced. + off, decOff = getCompressedOffsets(oi, off) + decLength = length + length = oi.Size - off + + // For negative length we read everything. + if decLength < 0 { + decLength = actualSize - decOff + } + + // Reply back invalid range if the input offset and length fall out of range. 
+ if decOff > actualSize || decOff+decLength > actualSize { + return nil, 0, 0, errInvalidRange + } + } + fn = func(inputReader io.Reader, _ http.Header, cFns ...func()) (r *GetObjectReader, err error) { + // Decompression reader. + snappyReader := snappy.NewReader(inputReader) + // Apply the skipLen and limit on the + // decompressed stream + decReader := io.LimitReader(ioutil.NewSkipReader(snappyReader, decOff), decLength) + oi.Size = decLength + + // Assemble the GetObjectReader + r = &GetObjectReader{ + ObjInfo: oi, + pReader: decReader, + cleanUpFns: append(cleanUpFns, cFns...), + } + return r, nil + } default: off, length, err = rs.GetOffsetLength(oi.Size) diff --git a/cmd/object-api-utils_test.go b/cmd/object-api-utils_test.go index b9d8781b6..e2d75da80 100644 --- a/cmd/object-api-utils_test.go +++ b/cmd/object-api-utils_test.go @@ -18,6 +18,7 @@ package cmd import ( "context" + "net/http" "reflect" "testing" ) @@ -296,3 +297,250 @@ func TestCleanMetadataKeys(t *testing.T) { } } } + +// Tests isCompressed method +func TestIsCompressed(t *testing.T) { + testCases := []struct { + objInfo ObjectInfo + result bool + }{ + { + objInfo: ObjectInfo{ + UserDefined: map[string]string{"X-Minio-Internal-compression": "golang/snappy/LZ77", + "content-type": "application/octet-stream", + "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, + }, + result: true, + }, + { + objInfo: ObjectInfo{ + UserDefined: map[string]string{"X-Minio-Internal-XYZ": "golang/snappy/LZ77", + "content-type": "application/octet-stream", + "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, + }, + result: false, + }, + { + objInfo: ObjectInfo{ + UserDefined: map[string]string{"content-type": "application/octet-stream", + "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, + }, + result: false, + }, + } + for i, test := range testCases { + got := test.objInfo.IsCompressed() + if got != test.result { + t.Errorf("Test %d - expected %v but received %v", + i+1, test.result, got) + } + } +} + +// Tests excludeForCompression. +func TestExcludeForCompression(t *testing.T) { + testCases := []struct { + object string + header http.Header + result bool + }{ + { + object: "object.txt", + header: http.Header{ + "Content-Type": []string{"application/zip"}, + }, + result: true, + }, + { + object: "object.zip", + header: http.Header{ + "Content-Type": []string{"application/XYZ"}, + }, + result: true, + }, + { + object: "object.json", + header: http.Header{ + "Content-Type": []string{"application/json"}, + }, + result: false, + }, + { + object: "object.txt", + header: http.Header{ + "Content-Type": []string{"text/plain"}, + }, + result: false, + }, + } + for i, test := range testCases { + globalIsCompressionEnabled = true + got := excludeForCompression(test.header, test.object) + globalIsCompressionEnabled = false + if got != test.result { + t.Errorf("Test %d - expected %v but received %v", + i+1, test.result, got) + } + } +} + +// Test getPartFile function. 
+func TestGetPartFile(t *testing.T) { + testCases := []struct { + entries []string + partNumber int + etag string + result string + }{ + { + entries: []string{"00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864", "fs.json", "00002.d73d8ab724016dfb051e2d3584495c54.32891137"}, + partNumber: 1, + etag: "8a034f82cb9cb31140d87d3ce2a9ede3", + result: "00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864", + }, + { + entries: []string{"00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864", "fs.json", "00002.d73d8ab724016dfb051e2d3584495c54.32891137"}, + partNumber: 2, + etag: "d73d8ab724016dfb051e2d3584495c54", + result: "00002.d73d8ab724016dfb051e2d3584495c54.32891137", + }, + { + entries: []string{"00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864", "fs.json", "00002.d73d8ab724016dfb051e2d3584495c54.32891137"}, + partNumber: 1, + etag: "d73d8ab724016dfb051e2d3584495c54", + result: "", + }, + } + for i, test := range testCases { + got := getPartFile(test.entries, test.partNumber, test.etag) + if got != test.result { + t.Errorf("Test %d - expected %s but received %s", + i+1, test.result, got) + } + } +} + +func TestGetActualSize(t *testing.T) { + testCases := []struct { + objInfo ObjectInfo + result int64 + }{ + { + objInfo: ObjectInfo{ + UserDefined: map[string]string{"X-Minio-Internal-compression": "golang/snappy/LZ77", + "X-Minio-Internal-actual-size": "100000001", + "content-type": "application/octet-stream", + "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, + Parts: []objectPartInfo{ + { + Size: 39235668, + ActualSize: 67108864, + }, + { + Size: 19177372, + ActualSize: 32891137, + }, + }, + }, + result: 100000001, + }, + { + objInfo: ObjectInfo{ + UserDefined: map[string]string{"X-Minio-Internal-compression": "golang/snappy/LZ77", + "X-Minio-Internal-actual-size": "841", + "content-type": "application/octet-stream", + "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, + Parts: []objectPartInfo{}, + }, + result: 841, + }, + { + objInfo: ObjectInfo{ + UserDefined: map[string]string{"X-Minio-Internal-compression": "golang/snappy/LZ77", + "content-type": "application/octet-stream", + "etag": "b3ff3ef3789147152fbfbc50efba4bfd-2"}, + Parts: []objectPartInfo{}, + }, + result: -1, + }, + } + for i, test := range testCases { + got := test.objInfo.GetActualSize() + if got != test.result { + t.Errorf("Test %d - expected %d but received %d", + i+1, test.result, got) + } + } +} + +func TestGetCompressedOffsets(t *testing.T) { + testCases := []struct { + objInfo ObjectInfo + offset int64 + startOffset int64 + snappyStartOffset int64 + }{ + { + objInfo: ObjectInfo{ + Parts: []objectPartInfo{ + { + Size: 39235668, + ActualSize: 67108864, + }, + { + Size: 19177372, + ActualSize: 32891137, + }, + }, + }, + offset: 79109865, + startOffset: 39235668, + snappyStartOffset: 12001001, + }, + { + objInfo: ObjectInfo{ + Parts: []objectPartInfo{ + { + Size: 39235668, + ActualSize: 67108864, + }, + { + Size: 19177372, + ActualSize: 32891137, + }, + }, + }, + offset: 19109865, + startOffset: 0, + snappyStartOffset: 19109865, + }, + { + objInfo: ObjectInfo{ + Parts: []objectPartInfo{ + { + Size: 39235668, + ActualSize: 67108864, + }, + { + Size: 19177372, + ActualSize: 32891137, + }, + }, + }, + offset: 0, + startOffset: 0, + snappyStartOffset: 0, + }, + } + for i, test := range testCases { + startOffset, snappyStartOffset := getCompressedOffsets(test.objInfo, test.offset) + if startOffset != test.startOffset { + t.Errorf("Test %d - expected startOffset %d but received %d", + i+1, test.startOffset, startOffset) + } + if snappyStartOffset != 
test.snappyStartOffset { + t.Errorf("Test %d - expected snappyOffset %d but received %d", + i+1, test.snappyStartOffset, snappyStartOffset) + } + } +} diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 720bf5726..913b21723 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -31,6 +31,7 @@ import ( "strconv" "strings" + snappy "github.com/golang/snappy" "github.com/gorilla/mux" miniogo "github.com/minio/minio-go" "github.com/minio/minio/cmd/crypto" @@ -56,6 +57,10 @@ var supportedHeadGetReqParams = map[string]string{ "response-content-disposition": "Content-Disposition", } +const ( + compressionAlgorithmV1 = "golang/snappy/LZ77" +) + // setHeadGetRespHeaders - set any requested parameters as response headers. func setHeadGetRespHeaders(w http.ResponseWriter, reqParams url.Values) { for k, v := range reqParams { @@ -402,7 +407,6 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req if !httpWriter.HasWritten() && !statusCodeWritten { // write error response only if no data or headers has been written to client yet writeErrorResponse(w, toAPIErrorCode(err), r.URL) } - httpWriter.Close() return } @@ -718,9 +722,50 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re srcInfo.metadataOnly = true } - var reader io.Reader = gr + // Checks if a remote putobject call is needed for CopyObject operation + // 1. If source and destination bucket names are same, it means no call needed to etcd to get destination info + // 2. If destination bucket doesn't exist locally, only then a etcd call is needed + var isRemoteCallRequired = func(ctx context.Context, src, dst string, objAPI ObjectLayer) bool { + if src == dst { + return false + } + _, berr := objAPI.GetBucketInfo(ctx, dst) + return berr == toObjectErr(errVolumeNotFound, dst) + } - srcInfo.Reader, err = hash.NewReader(reader, srcInfo.Size, "", "") + var reader io.Reader + var length = srcInfo.Size + // No need to compress for remote etcd calls + // Pass the decompressed stream to such calls. + if srcInfo.IsCompressed() && !isRemoteCallRequired(ctx, srcBucket, dstBucket, objectAPI) { + var sreader io.Reader + var swriter io.Writer + + // Open a pipe for compression. + // Where snappyWriter is piped to srcInfo.Reader. + // gr writes to snappyWriter. + snappyReader, snappyWriter := io.Pipe() + reader = snappyReader + length = -1 + + swriter = snappy.NewWriter(snappyWriter) + sreader = gr + + go func() { + defer snappyWriter.Close() + // Compress the decompressed source object. + if _, err = io.Copy(swriter, sreader); err != nil { + return + } + }() + } else { + // Remove the metadata for remote calls. 
+ delete(srcInfo.UserDefined, ReservedMetadataPrefix+"compression") + delete(srcInfo.UserDefined, ReservedMetadataPrefix+"actual-size") + reader = gr + } + + srcInfo.Reader, err = hash.NewReader(reader, length, "", "", srcInfo.Size) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return @@ -731,7 +776,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re size := srcInfo.Size var encMetadata = make(map[string]string) - if objectAPI.IsEncryptionSupported() { + if objectAPI.IsEncryptionSupported() && !srcInfo.IsCompressed() { var oldKey, newKey []byte sseCopyS3 := crypto.S3.IsEncrypted(srcInfo.UserDefined) sseCopyC := crypto.SSECopy.IsRequested(r.Header) @@ -802,7 +847,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re } } - srcInfo.Reader, err = hash.NewReader(reader, size, "", "") // do not try to verify encrypted content + srcInfo.Reader, err = hash.NewReader(reader, size, "", "", size) // do not try to verify encrypted content if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return @@ -839,17 +884,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re var objInfo ObjectInfo - // Checks if a remote putobject call is needed for CopyObject operation - // 1. If source and destination bucket names are same, it means no call needed to etcd to get destination info - // 2. If destination bucket doesn't exist locally, only then a etcd call is needed - var isRemoteCallRequired = func(ctx context.Context, src, dst string, objAPI ObjectLayer) bool { - if src == dst { - return false - } - _, berr := objAPI.GetBucketInfo(ctx, dst) - return berr == toObjectErr(errVolumeNotFound, dst) - } - // Returns a minio-go Client configured to access remote host described by destDNSRecord // Applicable only in a federated deployment var getRemoteInstanceClient = func(host string, port int) (*miniogo.Core, error) { @@ -905,6 +939,10 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re host, port = "", "" } + if srcInfo.IsCompressed() { + objInfo.Size = srcInfo.GetActualSize() + } + // Notify object created event. sendEvent(eventArgs{ EventName: event.ObjectCreatedCopy, @@ -1064,7 +1102,40 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } } - hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex) + var hashError error + actualSize := size + + if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 { + // Storing the compression metadata. + metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1 + metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10) + + pipeReader, pipeWriter := io.Pipe() + snappyWriter := snappy.NewWriter(pipeWriter) + + actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + go func() { + defer pipeWriter.Close() + defer snappyWriter.Close() + // Writing to the compressed writer. + _, err = io.CopyN(snappyWriter, actualReader, actualSize) + if err != nil { + hashError = err + return + } + }() + // Set compression metrics. + size = -1 // Since compressed size is un-predictable. + md5hex = "" // Do not try to verify the content. 
+ sha256hex = "" + reader = pipeReader + } + + hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return @@ -1086,7 +1157,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req return } info := ObjectInfo{Size: size} - hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "") // do not try to verify encrypted content + hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "", size) // do not try to verify encrypted content if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return @@ -1108,6 +1179,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req return } + if objInfo.IsCompressed() { + // Ignore compressed ETag. + objInfo.ETag = objInfo.ETag + "-1" + } + w.Header().Set("ETag", "\""+objInfo.ETag+"\"") if objectAPI.IsEncryptionSupported() { if crypto.IsEncrypted(objInfo.UserDefined) { @@ -1121,6 +1197,15 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } } + if hashError != nil { + if hashError == io.ErrUnexpectedEOF { + writeErrorResponse(w, ErrIncompleteBody, r.URL) + } else { + writeErrorResponse(w, toAPIErrorCode(hashError), r.URL) + } + return + } + writeSuccessResponseHeadersOnly(w) // Get host and port from Request.RemoteAddr. @@ -1218,6 +1303,11 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r // Ensure that metadata does not contain sensitive information crypto.RemoveSensitiveEntries(metadata) + if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) { + // Storing the compression metadata. + metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1 + } + newMultipartUpload := objectAPI.NewMultipartUpload if api.CacheAPI() != nil && !hasServerSideEncryptionHeader(r.Header) { newMultipartUpload = api.CacheAPI().NewMultipartUpload @@ -1325,6 +1415,9 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt defer gr.Close() srcInfo := gr.ObjInfo + var actualPartSize int64 + actualPartSize = srcInfo.Size + // Special care for CopyObjectPart if partRangeErr := checkCopyPartRangeWithSize(rs, srcInfo.Size); partRangeErr != nil { writeCopyPartErr(w, partRangeErr, r.URL) @@ -1339,6 +1432,10 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt // Get the object offset & length startOffset, length, _ := rs.GetOffsetLength(srcInfo.Size) + if rangeHeader != "" { + actualPartSize = length + } + if objectAPI.IsEncryptionSupported() { if crypto.IsEncrypted(srcInfo.UserDefined) { decryptedSize, decryptErr := srcInfo.DecryptedSize() @@ -1356,14 +1453,42 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt return } - var reader io.Reader = gr + var reader io.Reader var getLength = length - srcInfo.Reader, err = hash.NewReader(reader, length, "", "") + + // Need to decompress only for range-enabled copy parts. + if srcInfo.IsCompressed() && rangeHeader != "" { + var sreader io.Reader + var swriter io.Writer + + // Open a pipe for compression. + // Where snappyWriter is piped to srcInfo.Reader. + // gr writes to snappyWriter. + snappyReader, snappyWriter := io.Pipe() + reader = snappyReader + length = -1 + + swriter = snappy.NewWriter(snappyWriter) + sreader = gr + + go func() { + defer snappyWriter.Close() + // Compress the decompressed source object. 
+ if _, err = io.Copy(swriter, sreader); err != nil { + return + } + }() + } else { + reader = gr + } + + srcInfo.Reader, err = hash.NewReader(reader, length, "", "", actualPartSize) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } - if objectAPI.IsEncryptionSupported() { + + if objectAPI.IsEncryptionSupported() && !srcInfo.IsCompressed() { var li ListPartsInfo li, err = objectAPI.ListObjectParts(ctx, dstBucket, dstObject, uploadID, 0, 1) if err != nil { @@ -1404,7 +1529,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt info := ObjectInfo{Size: length} size := info.EncryptedSize() - srcInfo.Reader, err = hash.NewReader(reader, size, "", "") + srcInfo.Reader, err = hash.NewReader(reader, size, "", "", actualPartSize) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return @@ -1550,9 +1675,48 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } } - hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex) + actualSize := size + var pipeReader *io.PipeReader + var pipeWriter *io.PipeWriter + + var li ListPartsInfo + li, err = objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, 1) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + // Read compression metadata preserved in the init multipart for the decision. + _, compressPart := li.UserDefined[ReservedMetadataPrefix+"compression"] + + isCompressed := false + if objectAPI.IsCompressionSupported() && compressPart { + pipeReader, pipeWriter = io.Pipe() + snappyWriter := snappy.NewWriter(pipeWriter) + actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) + if err != nil { + writeErrorResponse(w, toAPIErrorCode(err), r.URL) + return + } + go func() { + defer pipeWriter.Close() + defer snappyWriter.Close() + // Writing to the compressed writer. + _, err = io.CopyN(snappyWriter, actualReader, actualSize) + if err != nil { + // The ErrorResponse is already written in putObjectPart Handle. + return + } + }() + // Set compression metrics. + size = -1 // Since compressed size is un-predictable. + md5hex = "" // Do not try to verify the content. + sha256hex = "" + reader = pipeReader + isCompressed = true + } + + hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize) if err != nil { - // Verify if the underlying error is signature mismatch. writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } @@ -1566,7 +1730,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } } - if objectAPI.IsEncryptionSupported() { + if objectAPI.IsEncryptionSupported() && !isCompressed { var li ListPartsInfo li, err = objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, 1) if err != nil { @@ -1609,7 +1773,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http } info := ObjectInfo{Size: size} - hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "") // do not try to verify encrypted content + hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "", size) // do not try to verify encrypted content if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return @@ -1627,6 +1791,11 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http writeErrorResponse(w, toAPIErrorCode(err), r.URL) return } + if isCompressed { + pipeWriter.Close() + // Suppress compressed ETag. 
+ partInfo.ETag = partInfo.ETag + "-1" + } if partInfo.ETag != "" { w.Header().Set("ETag", "\""+partInfo.ETag+"\"") } @@ -1765,6 +1934,11 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite // Complete parts. var completeParts []CompletePart for _, part := range complMultipartUpload.Parts { + // Avoiding for gateway parts. + // `strings.TrimPrefix` does not work here as intended. So `Replace` is used instead. + if objectAPI.IsCompressionSupported() { + part.ETag = strings.Replace(part.ETag, "-1", "", -1) // For compressed multiparts, We append '-1' for part.ETag. + } part.ETag = canonicalizeETag(part.ETag) completeParts = append(completeParts, part) } diff --git a/cmd/object-handlers_test.go b/cmd/object-handlers_test.go index 68b367e58..4a146af79 100644 --- a/cmd/object-handlers_test.go +++ b/cmd/object-handlers_test.go @@ -1411,7 +1411,7 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam } a := 0 - b := globalMinPartSize - 1 + b := globalMinPartSize var parts []CompletePart for partNumber := 1; partNumber <= 2; partNumber++ { // initialize HTTP NewRecorder, this records any mutations to response writer inside the handler. @@ -1431,7 +1431,7 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam // Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler. // Call the ServeHTTP to execute the handler, `func (api objectAPIHandlers) CopyObjectHandler` handles the request. - a = globalMinPartSize + a = globalMinPartSize + 1 b = len(bytesData[0].byteData) - 1 apiRouter.ServeHTTP(rec, req) if rec.Code != http.StatusOK { diff --git a/cmd/posix.go b/cmd/posix.go index ad083a390..821cf2278 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -918,7 +918,7 @@ func (s *posix) createFile(volume, path string) (f *os.File, err error) { // Currently we use fallocate when available to avoid disk fragmentation as much as possible func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) { // It doesn't make sense to create a negative-sized file - if fileSize <= 0 { + if fileSize < -1 { return errInvalidArgument } @@ -949,8 +949,11 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) { // Close upon return. defer w.Close() - // Allocate needed disk space to append data - e := Fallocate(int(w.Fd()), 0, fileSize) + var e error + if fileSize > 0 { + // Allocate needed disk space to append data + e = Fallocate(int(w.Fd()), 0, fileSize) + } // Ignore errors when Fallocate is not supported in the current system if e != nil && !isSysErrNoSys(e) && !isSysErrOpNotSupported(e) { diff --git a/cmd/server_test.go b/cmd/server_test.go index 5b5e0c496..683fe4012 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -2728,6 +2728,8 @@ func (s *TestSuiteCommon) TestObjectMultipart(c *check) { c.Assert(response.StatusCode, http.StatusOK) var parts []CompletePart for _, part := range completeUploads.Parts { + // For compressed objects, we dont treat E-Tag as checksum. 
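Because the ETag of a compressed part is computed over compressed bytes, the handlers tag it with a trailing "-1" and strip that marker again when the client echoes the ETags back in CompleteMultipartUpload. A small sketch of that convention, with hypothetical helper names (the real code inlines the strings.Replace call shown above and minio's canonicalizeETag for quote stripping):

```go
package main

import (
	"fmt"
	"strings"
)

// markCompressedETag appends the "-1" marker used to signal that a part ETag
// is computed over compressed bytes and must not be treated as a content MD5.
func markCompressedETag(etag string) string {
	return etag + "-1"
}

// normalizeETag reverses the marker and drops surrounding quotes before the
// part ETags are compared in CompleteMultipartUpload.
func normalizeETag(etag string) string {
	etag = strings.Replace(etag, "-1", "", -1)
	return strings.TrimSuffix(strings.TrimPrefix(etag, "\""), "\"")
}

func main() {
	etag := markCompressedETag("5d41402abc4b2a76b9719d911017c592")
	fmt.Println(etag)                // 5d41402abc4b2a76b9719d911017c592-1
	fmt.Println(normalizeETag(etag)) // 5d41402abc4b2a76b9719d911017c592
}
```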
+ part.ETag = strings.Replace(part.ETag, "-1", "", -1) part.ETag = canonicalizeETag(part.ETag) parts = append(parts, part) } diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 9d067357a..821a22043 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -137,7 +137,7 @@ func calculateSignedChunkLength(chunkDataSize int64) int64 { } func mustGetHashReader(t TestErrHandler, data io.Reader, size int64, md5hex, sha256hex string) *hash.Reader { - hr, err := hash.NewReader(data, size, md5hex, sha256hex) + hr, err := hash.NewReader(data, size, md5hex, sha256hex, size) if err != nil { t.Fatal(err) } diff --git a/cmd/typed-errors.go b/cmd/typed-errors.go index be00ddca6..6eb8c1b74 100644 --- a/cmd/typed-errors.go +++ b/cmd/typed-errors.go @@ -73,3 +73,6 @@ var errFirstDiskWait = errors.New("Waiting on other disks") // error returned when a bucket already exists var errBucketAlreadyExists = errors.New("Your previous request to create the named bucket succeeded and you already own it") + +// error returned for a negative actual size. +var errInvalidDecompressedSize = errors.New("Invalid Decompressed Size") diff --git a/cmd/ui-errors.go b/cmd/ui-errors.go index 5bdd8490a..423b652c1 100644 --- a/cmd/ui-errors.go +++ b/cmd/ui-errors.go @@ -197,4 +197,10 @@ Example 1: "Please contact Minio at https://slack.minio.io", "", ) + + uiErrInvalidCompressionIncludesValue = newUIErrFn( + "Invalid compression include value", + "Please check the passed value", + "Compress extensions/mime-types are delimited by `,`. For eg, MINIO_COMPRESS_ATTR=\"A,B,C\"", + ) ) diff --git a/cmd/web-handlers.go b/cmd/web-handlers.go index b618aa235..45ca7594d 100644 --- a/cmd/web-handlers.go +++ b/cmd/web-handlers.go @@ -29,9 +29,11 @@ import ( "runtime" "strconv" "strings" + "sync" "time" humanize "github.com/dustin/go-humanize" + snappy "github.com/golang/snappy" "github.com/gorilla/mux" "github.com/gorilla/rpc/v2/json2" miniogopolicy "github.com/minio/minio-go/pkg/policy" @@ -648,7 +650,39 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { return } - hashReader, err := hash.NewReader(r.Body, size, "", "") + reader := r.Body + var hashError error + actualSize := size + + if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 { + // Storing the compression metadata. + metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1 + metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10) + + pipeReader, pipeWriter := io.Pipe() + snappyWriter := snappy.NewWriter(pipeWriter) + + actualReader, err := hash.NewReader(reader, size, "", "", actualSize) + if err != nil { + writeWebErrorResponse(w, err) + return + } + go func() { + defer pipeWriter.Close() + defer snappyWriter.Close() + // Writing to the compressed writer. + _, err = io.CopyN(snappyWriter, actualReader, actualSize) + if err != nil { + hashError = err + return + } + }() + // Set compression metrics. + size = -1 // Since compressed size is un-predictable. + reader = pipeReader + } + + hashReader, err := hash.NewReader(reader, size, "", "", actualSize) if err != nil { writeWebErrorResponse(w, err) return @@ -668,6 +702,11 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { return } + if hashError != nil { + writeWebErrorResponse(w, hashError) + return + } + // Notify object created event. 
sendEvent(eventArgs{ EventName: event.ObjectCreatedPut, @@ -679,6 +718,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) { // Download - file download handler. func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { + var wg sync.WaitGroup objectAPI := web.ObjectAPI() if objectAPI == nil { writeWebErrorResponse(w, errServerNotInitialized) @@ -703,22 +743,28 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { return } } + opts := ObjectOptions{} + getObjectInfo := objectAPI.GetObjectInfo getObject := objectAPI.GetObject - if web.CacheAPI() != nil { - getObject = web.CacheAPI().GetObject - } - getObjectInfo := objectAPI.GetObjectInfo if web.CacheAPI() != nil { getObjectInfo = web.CacheAPI().GetObjectInfo + getObject = web.CacheAPI().GetObject } objInfo, err := getObjectInfo(context.Background(), bucket, object, opts) if err != nil { writeWebErrorResponse(w, err) return } - length := objInfo.Size + var actualSize int64 + if objInfo.IsCompressed() { + // Read the decompressed size from the meta.json. + actualSize = objInfo.GetActualSize() + if actualSize < 0 { + return + } + } if objectAPI.IsEncryptionSupported() { if _, err = DecryptObjectInfo(objInfo, r.Header); err != nil { writeWebErrorResponse(w, err) @@ -730,7 +776,37 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { } var startOffset int64 var writer io.Writer - writer = w + var decompressReader *io.PipeReader + var compressWriter *io.PipeWriter + if objInfo.IsCompressed() { + var pipeErr error + + // The decompress metrics are set. + snappyStartOffset := 0 + snappyLength := actualSize + + // Open a pipe for compression + // Where compressWriter is actually passed to the getObject + decompressReader, compressWriter = io.Pipe() + snappyReader := snappy.NewReader(decompressReader) + + // The limit is set to the actual size. + responseWriter := ioutil.LimitedWriter(w, int64(snappyStartOffset), snappyLength) + wg.Add(1) //For closures. + go func() { + defer wg.Done() + // Finally, writes to the client. + if _, pipeErr = io.Copy(responseWriter, snappyReader); pipeErr != nil { + return + } + // Close the compressWriter if the data is read already. + // Closing the pipe, releases the writer passed to the getObject. + compressWriter.Close() + }() + writer = compressWriter + } else { + writer = w + } if objectAPI.IsEncryptionSupported() && crypto.S3.IsEncrypted(objInfo.UserDefined) { // Response writer should be limited early on for decryption upto required length, // additionally also skipping mod(offset)64KiB boundaries. @@ -743,12 +819,17 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { } w.Header().Set(crypto.SSEHeader, crypto.SSEAlgorithmAES256) } + httpWriter := ioutil.WriteOnClose(writer) // Add content disposition. w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", path.Base(object))) if err = getObject(context.Background(), bucket, object, 0, -1, httpWriter, "", opts); err != nil { + httpWriter.Close() + if objInfo.IsCompressed() { + wg.Wait() + } /// No need to print error, response writer already written to. return } @@ -758,6 +839,10 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) { return } } + if objInfo.IsCompressed() { + // Wait for decompression go-routine to retire. + wg.Wait() + } } // DownloadZipArgs - Argument for downloading a bunch of files as a zip file. 
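On the read side, Download (and DownloadZip below) hand the object layer a pipe writer: the stored snappy stream is written into the pipe, a goroutine decompresses it with snappy.NewReader, and only the recorded actual size is copied to the client, with a sync.WaitGroup keeping the handler alive until that copy retires. A sketch of the same shape, using io.CopyN where the handler uses Minio's internal LimitedWriter helper (writeDecompressed is an illustrative name):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"

	"github.com/golang/snappy"
)

// writeDecompressed copies actualSize decompressed bytes to dst while the
// caller streams the stored (compressed) object into the returned writer.
func writeDecompressed(dst io.Writer, actualSize int64, wg *sync.WaitGroup) io.WriteCloser {
	pr, pw := io.Pipe()
	wg.Add(1)
	go func() {
		defer wg.Done()
		// snappy.NewReader undoes the stream framing written at upload time.
		_, err := io.CopyN(dst, snappy.NewReader(pr), actualSize)
		// Unblock the writer side if the copy stopped early.
		pr.CloseWithError(err)
	}()
	return pw
}

func main() {
	// Simulate a stored object: a snappy stream compressed payload.
	payload := bytes.Repeat([]byte("hello world "), 512)
	var stored bytes.Buffer
	sw := snappy.NewWriter(&stored)
	sw.Write(payload)
	sw.Close()

	var wg sync.WaitGroup
	var response bytes.Buffer
	w := writeDecompressed(&response, int64(len(payload)), &wg)
	io.Copy(w, &stored) // stands in for getObject writing into the pipe
	w.Close()
	wg.Wait()
	fmt.Println("decompressed bytes:", response.Len())
}
```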
@@ -771,6 +856,7 @@ type DownloadZipArgs struct { // Takes a list of objects and creates a zip file that sent as the response body. func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { + var wg sync.WaitGroup objectAPI := web.ObjectAPI() if objectAPI == nil { writeWebErrorResponse(w, errServerNotInitialized) @@ -818,14 +904,17 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { getObjectInfo = web.CacheAPI().GetObjectInfo } opts := ObjectOptions{} + var length int64 for _, object := range args.Objects { + var decompressReader *io.PipeReader + var compressWriter *io.PipeWriter // Writes compressed object file to the response. zipit := func(objectName string) error { info, err := getObjectInfo(context.Background(), args.BucketName, objectName, opts) if err != nil { return err } - length := info.Size + length = info.Size if objectAPI.IsEncryptionSupported() { if _, err = DecryptObjectInfo(info, r.Header); err != nil { writeWebErrorResponse(w, err) @@ -835,20 +924,57 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { length, _ = info.DecryptedSize() } } + length = info.Size + var actualSize int64 + if info.IsCompressed() { + // Read the decompressed size from the meta.json. + actualSize = info.GetActualSize() + // Set the info.Size to the actualSize. + info.Size = actualSize + } header := &zip.FileHeader{ Name: strings.TrimPrefix(objectName, args.Prefix), Method: zip.Deflate, UncompressedSize64: uint64(length), UncompressedSize: uint32(length), } - wr, err := archive.CreateHeader(header) + zipWriter, err := archive.CreateHeader(header) if err != nil { writeWebErrorResponse(w, errUnexpected) return err } var startOffset int64 var writer io.Writer - writer = wr + + if info.IsCompressed() { + var pipeErr error + + // The decompress metrics are set. + snappyStartOffset := 0 + snappyLength := actualSize + + // Open a pipe for compression + // Where compressWriter is actually passed to the getObject + decompressReader, compressWriter = io.Pipe() + snappyReader := snappy.NewReader(decompressReader) + + // The limit is set to the actual size. + responseWriter := ioutil.LimitedWriter(zipWriter, int64(snappyStartOffset), snappyLength) + wg.Add(1) //For closures. + go func() { + defer wg.Done() + // Finally, writes to the client. + if _, pipeErr = io.Copy(responseWriter, snappyReader); pipeErr != nil { + return + } + // Close the compressWriter if the data is read already. + // Closing the pipe, releases the writer passed to the getObject. + compressWriter.Close() + }() + writer = compressWriter + } else { + writer = zipWriter + } if objectAPI.IsEncryptionSupported() && crypto.S3.IsEncrypted(info.UserDefined) { // Response writer should be limited early on for decryption upto required length, // additionally also skipping mod(offset)64KiB boundaries. @@ -861,6 +987,11 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { } httpWriter := ioutil.WriteOnClose(writer) if err = getObject(context.Background(), args.BucketName, objectName, 0, length, httpWriter, "", opts); err != nil { + httpWriter.Close() + if info.IsCompressed() { + // Wait for decompression go-routine to retire. + wg.Wait() + } return err } if err = httpWriter.Close(); err != nil { @@ -869,6 +1000,10 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) { return err } } + if info.IsCompressed() { + // Wait for decompression go-routine to retire. 
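The zip path also has to advertise the decompressed length up front, since the entry header is written before the object is streamed. A stdlib-only sketch of creating such an entry (object contents are faked here; in the handler the io.Copy is getObject writing through the decompression pipe):

```go
package main

import (
	"archive/zip"
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	var buf bytes.Buffer
	archive := zip.NewWriter(&buf)

	name, contents := "logs/app.log", strings.Repeat("line\n", 100)

	// Declare the entry size up front, as the handler does with the
	// object's actual (decompressed) size.
	hdr := &zip.FileHeader{
		Name:               name,
		Method:             zip.Deflate,
		UncompressedSize64: uint64(len(contents)),
	}
	w, err := archive.CreateHeader(hdr)
	if err != nil {
		panic(err)
	}
	if _, err := io.Copy(w, strings.NewReader(contents)); err != nil {
		panic(err)
	}
	if err := archive.Close(); err != nil {
		panic(err)
	}
	fmt.Printf("zip of %d bytes produced %d bytes\n", len(contents), buf.Len())
}
```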
+ wg.Wait() + } return nil } diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 9bd61778a..da1deff88 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -519,6 +519,11 @@ func (s *xlSets) IsEncryptionSupported() bool { return s.getHashedSet("").IsEncryptionSupported() } +// IsCompressionSupported returns whether compression is applicable for this layer. +func (s *xlSets) IsCompressionSupported() bool { + return s.getHashedSet("").IsCompressionSupported() +} + // DeleteBucket - deletes a bucket on all sets simultaneously, // even if one of the sets fail to delete buckets, we proceed to // undo a successful operation. diff --git a/cmd/xl-v1-bucket.go b/cmd/xl-v1-bucket.go index e3875560d..1200c6ba4 100644 --- a/cmd/xl-v1-bucket.go +++ b/cmd/xl-v1-bucket.go @@ -299,3 +299,8 @@ func (xl xlObjects) IsNotificationSupported() bool { func (xl xlObjects) IsEncryptionSupported() bool { return true } + +// IsCompressionSupported returns whether compression is applicable for this layer. +func (xl xlObjects) IsCompressionSupported() bool { + return true +} diff --git a/cmd/xl-v1-metadata.go b/cmd/xl-v1-metadata.go index 33f34767c..00f6e1407 100644 --- a/cmd/xl-v1-metadata.go +++ b/cmd/xl-v1-metadata.go @@ -34,10 +34,11 @@ const erasureAlgorithmKlauspost = "klauspost/reedsolomon/vandermonde" // objectPartInfo Info of each part kept in the multipart metadata // file after CompleteMultipartUpload() is called. type objectPartInfo struct { - Number int `json:"number"` - Name string `json:"name"` - ETag string `json:"etag"` - Size int64 `json:"size"` + Number int `json:"number"` + Name string `json:"name"` + ETag string `json:"etag"` + Size int64 `json:"size"` + ActualSize int64 `json:"actualSize"` } // byObjectPartNumber is a collection satisfying sort.Interface. @@ -250,12 +251,13 @@ func objectPartIndex(parts []objectPartInfo, partNumber int) int { } // AddObjectPart - add a new object part in order. -func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64) { +func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64, actualSize int64) { partInfo := objectPartInfo{ - Number: partNumber, - Name: partName, - ETag: partETag, - Size: partSize, + Number: partNumber, + Name: partName, + ETag: partETag, + Size: partSize, + ActualSize: actualSize, } // Update part info if it already exists. diff --git a/cmd/xl-v1-metadata_test.go b/cmd/xl-v1-metadata_test.go index 7411de84a..2108cf721 100644 --- a/cmd/xl-v1-metadata_test.go +++ b/cmd/xl-v1-metadata_test.go @@ -28,6 +28,8 @@ import ( humanize "github.com/dustin/go-humanize" ) +const ActualSize = 1000 + // Tests for reading XL object info. func TestXLReadStat(t *testing.T) { ExecObjectLayerDiskAlteredTest(t, testXLReadStat) @@ -213,7 +215,7 @@ func TestAddObjectPart(t *testing.T) { for _, testCase := range testCases { if testCase.expectedIndex > -1 { partNumString := strconv.Itoa(testCase.partNum) - xlMeta.AddObjectPart(testCase.partNum, "part."+partNumString, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte)) + xlMeta.AddObjectPart(testCase.partNum, "part."+partNumString, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte), ActualSize) } if index := objectPartIndex(xlMeta.Parts, testCase.partNum); index != testCase.expectedIndex { @@ -245,7 +247,7 @@ func TestObjectPartIndex(t *testing.T) { // Add some parts for testing. 
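The new actualSize field travels with every part entry in xl.json, next to the on-disk size. A hedged illustration of what such an entry serializes to, using a local struct that mirrors the objectPartInfo fields added above (the sample sizes are made up):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// partInfo mirrors the objectPartInfo fields added above; actualSize records
// the pre-compression length while size stays the on-disk (compressed) length.
type partInfo struct {
	Number     int    `json:"number"`
	Name       string `json:"name"`
	ETag       string `json:"etag"`
	Size       int64  `json:"size"`
	ActualSize int64  `json:"actualSize"`
}

func main() {
	p := partInfo{
		Number:     1,
		Name:       "part.1",
		ETag:       "d41d8cd98f00b204e9800998ecf8427e-1",
		Size:       3145728, // bytes actually written to disk
		ActualSize: 5242880, // bytes the client uploaded
	}
	out, _ := json.MarshalIndent(p, "", "  ")
	fmt.Println(string(out))
}
```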
for _, testCase := range testCases { partNumString := strconv.Itoa(testCase.partNum) - xlMeta.AddObjectPart(testCase.partNum, "part."+partNumString, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte)) + xlMeta.AddObjectPart(testCase.partNum, "part."+partNumString, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte), ActualSize) } // Add failure test case. @@ -274,7 +276,7 @@ func TestObjectToPartOffset(t *testing.T) { // Total size of all parts is 5,242,899 bytes. for _, partNum := range []int{1, 2, 4, 5, 7} { partNumString := strconv.Itoa(partNum) - xlMeta.AddObjectPart(partNum, "part."+partNumString, "etag."+partNumString, int64(partNum+humanize.MiByte)) + xlMeta.AddObjectPart(partNum, "part."+partNumString, "etag."+partNumString, int64(partNum+humanize.MiByte), ActualSize) } testCases := []struct { diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index 4dc517a96..615aee96b 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -22,6 +22,7 @@ import ( "fmt" "path" "sort" + "strconv" "strings" "sync" "time" @@ -289,7 +290,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID } // Validate input data size and it can never be less than zero. - if data.Size() < 0 { + if data.Size() < -1 { logger.LogIf(ctx, errInvalidArgument) return pi, toObjectErr(errInvalidArgument) } @@ -348,7 +349,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID // Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete. defer xl.deleteObject(ctx, minioMetaTmpBucket, tmpPart, writeQuorum, false) - if data.Size() > 0 { + if data.Size() > 0 || data.Size() == -1 { if pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tmpPartPath, data.Size(), onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum); err != nil { return pi, toObjectErr(pErr, bucket, object) @@ -365,12 +366,12 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID switch size := data.Size(); { case size == 0: buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF + case size == -1 || size > blockSizeV1: + buffer = xl.bp.Get() + defer xl.bp.Put(buffer) case size < blockSizeV1: // No need to allocate fully blockSizeV1 buffer if the incoming data is smaller. buffer = make([]byte, size, 2*size) - default: - buffer = xl.bp.Get() - defer xl.bp.Put(buffer) } if len(buffer) > int(xlMeta.Erasure.BlockSize) { @@ -442,7 +443,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID md5hex := hex.EncodeToString(data.MD5Current()) // Add the current part. - xlMeta.AddObjectPart(partID, partSuffix, md5hex, n) + xlMeta.AddObjectPart(partID, partSuffix, md5hex, n, data.ActualSize()) for i, disk := range onlineDisks { if disk == OfflineDisk { @@ -477,6 +478,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID LastModified: fi.ModTime, ETag: md5hex, Size: fi.Size, + ActualSize: data.ActualSize(), }, nil } @@ -630,6 +632,9 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, // Calculate full object size. var objectSize int64 + // Calculate consolidated actual size. + var objectActualSize int64 + // Pick one from the first valid metadata. xlMeta, err := pickValidXLMeta(ctx, partsMetadata, modTime, writeQuorum) if err != nil { @@ -673,15 +678,15 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, } // All parts except the last part has to be atleast 5MB. 
- if (i < len(parts)-1) && !isMinAllowedPartSize(currentXLMeta.Parts[partIdx].Size) { + if (i < len(parts)-1) && !isMinAllowedPartSize(currentXLMeta.Parts[partIdx].ActualSize) { logger.LogIf(ctx, PartTooSmall{ PartNumber: part.PartNumber, - PartSize: currentXLMeta.Parts[partIdx].Size, + PartSize: currentXLMeta.Parts[partIdx].ActualSize, PartETag: part.ETag, }) return oi, PartTooSmall{ PartNumber: part.PartNumber, - PartSize: currentXLMeta.Parts[partIdx].Size, + PartSize: currentXLMeta.Parts[partIdx].ActualSize, PartETag: part.ETag, } } @@ -696,12 +701,16 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, // Save for total object size. objectSize += currentXLMeta.Parts[partIdx].Size + // Save the consolidated actual size. + objectActualSize += currentXLMeta.Parts[partIdx].ActualSize + // Add incoming parts. xlMeta.Parts[i] = objectPartInfo{ - Number: part.PartNumber, - ETag: part.ETag, - Size: currentXLMeta.Parts[partIdx].Size, - Name: fmt.Sprintf("part.%d", part.PartNumber), + Number: part.PartNumber, + ETag: part.ETag, + Size: currentXLMeta.Parts[partIdx].Size, + Name: fmt.Sprintf("part.%d", part.PartNumber), + ActualSize: currentXLMeta.Parts[partIdx].ActualSize, } } @@ -712,6 +721,9 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, // Save successfully calculated md5sum. xlMeta.Meta["etag"] = s3MD5 + // Save the consolidated actual size. + xlMeta.Meta[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10) + tempUploadIDPath := uploadID // Update all xl metadata, make sure to not modify fields like diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 0c1d5f99f..933b3b8bf 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -147,7 +147,7 @@ func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBuc pipeWriter.Close() // Close writer explicitly signaling we wrote all data. }() - hashReader, err := hash.NewReader(pipeReader, length, "", "") + hashReader, err := hash.NewReader(pipeReader, length, "", "", length) if err != nil { logger.LogIf(ctx, err) return oi, toObjectErr(err, dstBucket, dstObject) @@ -646,7 +646,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, } // Validate input data size and it can never be less than zero. - if data.Size() < 0 { + if data.Size() < -1 { logger.LogIf(ctx, errInvalidArgument) return ObjectInfo{}, toObjectErr(errInvalidArgument) } @@ -687,12 +687,12 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, switch size := data.Size(); { case size == 0: buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF + case size == -1 || size > blockSizeV1: + buffer = xl.bp.Get() + defer xl.bp.Put(buffer) case size < blockSizeV1: // No need to allocate fully blockSizeV1 buffer if the incoming data is smaller. buffer = make([]byte, size, 2*size) - default: - buffer = xl.bp.Get() - defer xl.bp.Put(buffer) } if len(buffer) > int(xlMeta.Erasure.BlockSize) { @@ -716,7 +716,8 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, // Hint the filesystem to pre-allocate one continuous large block. // This is only an optimization. 
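CompleteMultipartUpload now validates the 5 MiB minimum against each part's actual size (the compressed size is unpredictable) and records the summed actual size in the object metadata. A compact sketch of that bookkeeping, with minPartSize and a plain metadata key standing in for globalMinPartSize and the reserved "actual-size" key used in the patch:

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

type part struct {
	Size       int64 // compressed bytes on disk
	ActualSize int64 // original bytes, as uploaded
}

const minPartSize = 5 * 1024 * 1024

// completeParts mimics the bookkeeping in CompleteMultipartUpload: the 5 MiB
// minimum is enforced against the actual part size, and the summed actual
// size is stored in the object metadata.
func completeParts(parts []part, meta map[string]string) (objectSize int64, err error) {
	var actualTotal int64
	for i, p := range parts {
		if i < len(parts)-1 && p.ActualSize < minPartSize {
			return 0, errors.New("part too small")
		}
		objectSize += p.Size
		actualTotal += p.ActualSize
	}
	// In the patch this key is ReservedMetadataPrefix + "actual-size".
	meta["actual-size"] = strconv.FormatInt(actualTotal, 10)
	return objectSize, nil
}

func main() {
	meta := map[string]string{}
	size, err := completeParts([]part{
		{Size: 3211264, ActualSize: 5242880},
		{Size: 1048576, ActualSize: 2097152}, // the last part may be smaller
	}, meta)
	fmt.Println(size, meta, err)
}
```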
var curPartReader io.Reader - if curPartSize > 0 { + + if curPartSize >= 0 { pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tempErasureObj, curPartSize, onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum) if pErr != nil { return ObjectInfo{}, toObjectErr(pErr, bucket, object) @@ -743,11 +744,23 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, // Should return IncompleteBody{} error when reader has fewer bytes // than specified in request header. - if n < curPartSize { + + if n < curPartSize && data.Size() > 0 { logger.LogIf(ctx, IncompleteBody{}) return ObjectInfo{}, IncompleteBody{} } + if n == 0 && data.Size() == -1 { + // The last part of a compressed object will always be empty + // Since the compressed size is unpredictable. + // Hence removing the last (empty) part from all `xl.disks`. + dErr := xl.deleteObject(ctx, minioMetaTmpBucket, tempErasureObj, writeQuorum, true) + if dErr != nil { + return ObjectInfo{}, toObjectErr(dErr, minioMetaTmpBucket, tempErasureObj) + } + break + } + // Update the total written size sizeWritten += n @@ -756,7 +769,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, onlineDisks[i] = nil continue } - partsMetadata[i].AddObjectPart(partIdx, partName, "", n) + partsMetadata[i].AddObjectPart(partIdx, partName, "", n, data.ActualSize()) partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partName, DefaultBitrotAlgorithm, w.Sum()}) } diff --git a/cmd/xl-v1-utils.go b/cmd/xl-v1-utils.go index be5efa084..c8aa3af3d 100644 --- a/cmd/xl-v1-utils.go +++ b/cmd/xl-v1-utils.go @@ -202,6 +202,7 @@ func parseXLParts(xlMetaBuf []byte) []objectPartInfo { info.Name = p.Get("name").String() info.ETag = p.Get("etag").String() info.Size = p.Get("size").Int() + info.ActualSize = p.Get("actualSize").Int() partInfo[i] = info } return partInfo @@ -410,7 +411,7 @@ var ( // calculatePartSizeFromIdx calculates the part size according to input index. // returns error if totalSize is -1, partSize is 0, partIndex is 0. func calculatePartSizeFromIdx(ctx context.Context, totalSize int64, partSize int64, partIndex int) (currPartSize int64, err error) { - if totalSize < 0 { + if totalSize < -1 { logger.LogIf(ctx, errInvalidArgument) return 0, errInvalidArgument } diff --git a/cmd/xl-v1-utils_test.go b/cmd/xl-v1-utils_test.go index b07eeb036..eba90e962 100644 --- a/cmd/xl-v1-utils_test.go +++ b/cmd/xl-v1-utils_test.go @@ -379,7 +379,7 @@ func TestGetPartSizeFromIdx(t *testing.T) { // partIndex is 0, returns error. {10, 1, 0, errPartSizeIndex}, // Total size is -1, returns error. - {-1, 10, 1, errInvalidArgument}, + {-2, 10, 1, errInvalidArgument}, } for i, testCaseFailure := range testCasesFailure { diff --git a/docs/compression/README.md b/docs/compression/README.md new file mode 100644 index 000000000..b2518a942 --- /dev/null +++ b/docs/compression/README.md @@ -0,0 +1,81 @@ +# Compression Guide [![Slack](https://slack.minio.io/slack?type=svg)](https://slack.minio.io) + +Minio server allows streaming compression to ensure efficient disk space usage. Compression happens inflight, i.e objects are compressed before being written to disk(s). Minio uses [`golang/snappy`](https://github.com/golang/snappy) streaming compression due to its stability and performance. + +## Get Started + +### 1. Prerequisites + +Install Minio - [Minio Quickstart Guide](https://docs.minio.io/docs/minio-quickstart-guide). + +### 2. 
Run Minio with compression + +Compression can be enabled by updating the `compress` config settings for Minio server config. Config `compress` settings take extensions and mime-types to be compressed. + +```json +"compress": { + "enabled": true, + "extensions": [".txt",".log",".csv", ".json"], + "mime-types": ["text/csv","text/plain","application/json"] +} +``` + +Since text, log, csv, json files are highly compressible, These extensions/mime-types are included by default for compression. + +To update the configuration, use `mc admin config get` command to get the current configuration file for the minio cluster in json format, and save it locally. + +```sh +$ mc admin config get myminio/ > /tmp/myconfig +``` + +After updating the compression configuration in /tmp/myconfig , use `mc admin config set` command to update the configuration for the cluster. Restart the Minio server to put the changes into effect. + +```sh +$ mc admin config set myminio < /tmp/myconfig +``` + +The compression settings may also be set through environment variables. When set, environment variables override the defined `compress` config settings in the server config. + +```bash +export MINIO_COMPRESS="true" +export MINIO_COMPRESS_EXTENSIONS=".pdf,.doc" +export MINIO_COMPRESS_MIMETYPES="application/pdf" +``` + +### 3. Note + +- Already compressed objects are not fit for compression since they do not have compressible patterns. Such objects do not produce efficient [`Run-length encoding (RLE)`](https://en.wikipedia.org/wiki/Run-length_encoding) which is a fitness factor for a lossless data compression. Below is a list of common files and content-types which are not suitable for compression. + + - Extensions + + | `gz` | (GZIP) + | `bz2` | (BZIP2) + | `rar` | (WinRAR) + | `zip` | (ZIP) + | `7z` | (7-Zip) + + - Content-Types + + | `video/*` | + | `audio/*` | + | `application/zip` | + | `application/x-gzip` | + | `application/zip` | + | `application/x-compress` | + | `application/x-spoon` | + +- Minio does not support encryption with compression because compression and encryption together enables room for side channel attacks like [`CRIME and BREACH`](https://en.wikipedia.org/wiki/CRIME) + +- Minio does not support compression for Gateway (Azure/GCS/NAS) implementations. + +## To test the setup + +To test this setup, practice put calls to the server using `mc` and use `mc ls` on the data directory to view the size of the object. + +## Explore Further + +- [Use `mc` with Minio Server](https://docs.minio.io/docs/minio-client-quickstart-guide) +- [Use `aws-cli` with Minio Server](https://docs.minio.io/docs/aws-cli-with-minio) +- [Use `s3cmd` with Minio Server](https://docs.minio.io/docs/s3cmd-with-minio) +- [Use `minio-go` SDK with Minio Server](https://docs.minio.io/docs/golang-client-quickstart-guide) +- [The Minio documentation website](https://docs.minio.io) diff --git a/pkg/hash/reader.go b/pkg/hash/reader.go index 4bcd5bade..19648db07 100644 --- a/pkg/hash/reader.go +++ b/pkg/hash/reader.go @@ -33,8 +33,9 @@ var errNestedReader = errors.New("Nesting of Reader detected, not allowed") // Reader writes what it reads from an io.Reader to an MD5 and SHA256 hash.Hash. // Reader verifies that the content of the io.Reader matches the expected checksums. type Reader struct { - src io.Reader - size int64 + src io.Reader + size int64 + actualSize int64 md5sum, sha256sum []byte // Byte values of md5sum, sha256sum of client sent values. 
md5Hash, sha256Hash hash.Hash @@ -42,7 +43,7 @@ type Reader struct { // NewReader returns a new hash Reader which computes the MD5 sum and // SHA256 sum (if set) of the provided io.Reader at EOF. -func NewReader(src io.Reader, size int64, md5Hex, sha256Hex string) (*Reader, error) { +func NewReader(src io.Reader, size int64, md5Hex, sha256Hex string, actualSize int64) (*Reader, error) { if _, ok := src.(*Reader); ok { return nil, errNestedReader } @@ -71,6 +72,7 @@ func NewReader(src io.Reader, size int64, md5Hex, sha256Hex string) (*Reader, er size: size, md5Hash: md5.New(), sha256Hash: sha256Hash, + actualSize: actualSize, }, nil } @@ -98,6 +100,10 @@ func (r *Reader) Read(p []byte) (n int, err error) { // data. func (r *Reader) Size() int64 { return r.size } +// ActualSize returns the pre-modified size of the object. +// DecompressedSize - For compressed objects. +func (r *Reader) ActualSize() int64 { return r.actualSize } + // MD5 - returns byte md5 value func (r *Reader) MD5() []byte { return r.md5sum diff --git a/pkg/hash/reader_test.go b/pkg/hash/reader_test.go index 277bffc0c..a8013a2dc 100644 --- a/pkg/hash/reader_test.go +++ b/pkg/hash/reader_test.go @@ -26,7 +26,7 @@ import ( // Tests functions like Size(), MD5*(), SHA256*() func TestHashReaderHelperMethods(t *testing.T) { - r, err := NewReader(bytes.NewReader([]byte("abcd")), 4, "e2fc714c4727ee9395f324cd2e7f331f", "88d4266fd4e6338d13b845fcf289579d209c897823b9217da3e161936f031589") + r, err := NewReader(bytes.NewReader([]byte("abcd")), 4, "e2fc714c4727ee9395f324cd2e7f331f", "88d4266fd4e6338d13b845fcf289579d209c897823b9217da3e161936f031589", 4) if err != nil { t.Fatal(err) } @@ -46,6 +46,9 @@ func TestHashReaderHelperMethods(t *testing.T) { if r.Size() != 4 { t.Errorf("Expected size 4, got %d", r.Size()) } + if r.ActualSize() != 4 { + t.Errorf("Expected size 4, got %d", r.ActualSize()) + } expectedMD5, err := hex.DecodeString("e2fc714c4727ee9395f324cd2e7f331f") if err != nil { t.Fatal(err) @@ -67,19 +70,22 @@ func TestHashReaderVerification(t *testing.T) { testCases := []struct { src io.Reader size int64 + actualSize int64 md5hex, sha256hex string err error }{ // Success, no checksum verification provided. { - src: bytes.NewReader([]byte("abcd")), - size: 4, + src: bytes.NewReader([]byte("abcd")), + size: 4, + actualSize: 4, }, // Failure md5 mismatch. { - src: bytes.NewReader([]byte("abcd")), - size: 4, - md5hex: "d41d8cd98f00b204e9800998ecf8427f", + src: bytes.NewReader([]byte("abcd")), + size: 4, + actualSize: 4, + md5hex: "d41d8cd98f00b204e9800998ecf8427f", err: BadDigest{ "d41d8cd98f00b204e9800998ecf8427f", "e2fc714c4727ee9395f324cd2e7f331f", @@ -87,9 +93,10 @@ func TestHashReaderVerification(t *testing.T) { }, // Failure sha256 mismatch. 
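The extended hash.NewReader signature carries both the size the object layer should expect and the size the client originally declared. A usage sketch based on the API shown above (the checksum values are the ones used in the tests below):

```go
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"

	"github.com/minio/minio/pkg/hash"
)

func main() {
	body := []byte("abcd")

	// Uncompressed put: declared size and actual size are the same, and the
	// client-sent checksums are verified at EOF.
	r, err := hash.NewReader(bytes.NewReader(body), 4,
		"e2fc714c4727ee9395f324cd2e7f331f",
		"88d4266fd4e6338d13b845fcf289579d209c897823b9217da3e161936f031589", 4)
	if err != nil {
		panic(err)
	}
	if _, err = ioutil.ReadAll(r); err != nil {
		panic(err) // BadDigest / SHA256Mismatch would surface here
	}
	fmt.Println(r.Size(), r.ActualSize()) // 4 4

	// Compressed put: the stream handed to the object layer has an unknown
	// length, so size is -1, checksums are skipped, and only the original
	// length is carried through as the actual size.
	cr, err := hash.NewReader(bytes.NewReader(body), -1, "", "", 4)
	if err != nil {
		panic(err)
	}
	fmt.Println(cr.Size(), cr.ActualSize()) // -1 4
}
```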
{ - src: bytes.NewReader([]byte("abcd")), - size: 4, - sha256hex: "88d4266fd4e6338d13b845fcf289579d209c897823b9217da3e161936f031580", + src: bytes.NewReader([]byte("abcd")), + size: 4, + actualSize: 4, + sha256hex: "88d4266fd4e6338d13b845fcf289579d209c897823b9217da3e161936f031580", err: SHA256Mismatch{ "88d4266fd4e6338d13b845fcf289579d209c897823b9217da3e161936f031580", "88d4266fd4e6338d13b845fcf289579d209c897823b9217da3e161936f031589", @@ -97,7 +104,7 @@ func TestHashReaderVerification(t *testing.T) { }, } for i, testCase := range testCases { - r, err := NewReader(testCase.src, testCase.size, testCase.md5hex, testCase.sha256hex) + r, err := NewReader(testCase.src, testCase.size, testCase.md5hex, testCase.sha256hex, testCase.actualSize) if err != nil { t.Fatalf("Test %d: Initializing reader failed %s", i+1, err) } @@ -115,6 +122,7 @@ func TestHashReaderInvalidArguments(t *testing.T) { testCases := []struct { src io.Reader size int64 + actualSize int64 md5hex, sha256hex string success bool expectedErr error @@ -123,6 +131,7 @@ func TestHashReaderInvalidArguments(t *testing.T) { { src: bytes.NewReader([]byte("abcd")), size: 4, + actualSize: 4, md5hex: "invalid-md5", success: false, expectedErr: BadDigest{}, @@ -131,6 +140,7 @@ func TestHashReaderInvalidArguments(t *testing.T) { { src: bytes.NewReader([]byte("abcd")), size: 4, + actualSize: 4, sha256hex: "invalid-sha256", success: false, expectedErr: SHA256Mismatch{}, @@ -139,19 +149,21 @@ func TestHashReaderInvalidArguments(t *testing.T) { { src: &Reader{src: bytes.NewReader([]byte("abcd"))}, size: 4, + actualSize: 4, success: false, expectedErr: errNestedReader, }, // Expected inputs, NewReader() will succeed. { - src: bytes.NewReader([]byte("abcd")), - size: 4, - success: true, + src: bytes.NewReader([]byte("abcd")), + size: 4, + actualSize: 4, + success: true, }, } for i, testCase := range testCases { - _, err := NewReader(testCase.src, testCase.size, testCase.md5hex, testCase.sha256hex) + _, err := NewReader(testCase.src, testCase.size, testCase.md5hex, testCase.sha256hex, testCase.actualSize) if err != nil && testCase.success { t.Errorf("Test %d: Expected success, but got error %s instead", i+1, err) } diff --git a/vendor/github.com/golang/snappy/encode.go b/vendor/github.com/golang/snappy/encode.go index 874968906..8d393e904 100644 --- a/vendor/github.com/golang/snappy/encode.go +++ b/vendor/github.com/golang/snappy/encode.go @@ -138,7 +138,7 @@ func NewBufferedWriter(w io.Writer) *Writer { } } -// Writer is an io.Writer than can write Snappy-compressed bytes. +// Writer is an io.Writer that can write Snappy-compressed bytes. type Writer struct { w io.Writer err error diff --git a/vendor/github.com/golang/snappy/encode_amd64.go b/vendor/github.com/golang/snappy/encode_amd64.go index 2a56fb504..150d91bc8 100644 --- a/vendor/github.com/golang/snappy/encode_amd64.go +++ b/vendor/github.com/golang/snappy/encode_amd64.go @@ -26,4 +26,4 @@ func extendMatch(src []byte, i, j int) int // encodeBlock has the same semantics as in encode_other.go. // //go:noescape -func encodeBlock(dst, src []byte) (d int) \ No newline at end of file +func encodeBlock(dst, src []byte) (d int) diff --git a/vendor/github.com/golang/snappy/snappy.go b/vendor/github.com/golang/snappy/snappy.go index 0cf5e379c..ece692ea4 100644 --- a/vendor/github.com/golang/snappy/snappy.go +++ b/vendor/github.com/golang/snappy/snappy.go @@ -2,10 +2,21 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. 
-// Package snappy implements the snappy block-based compression format. -// It aims for very high speeds and reasonable compression. +// Package snappy implements the Snappy compression format. It aims for very +// high speeds and reasonable compression. // -// The C++ snappy implementation is at https://github.com/google/snappy +// There are actually two Snappy formats: block and stream. They are related, +// but different: trying to decompress block-compressed data as a Snappy stream +// will fail, and vice versa. The block format is the Decode and Encode +// functions and the stream format is the Reader and Writer types. +// +// The block format, the more common case, is used when the complete size (the +// number of bytes) of the original data is known upfront, at the time +// compression starts. The stream format, also known as the framing format, is +// for when that isn't always true. +// +// The canonical, C++ implementation is at https://github.com/google/snappy and +// it only implements the block format. package snappy // import "github.com/golang/snappy" import ( diff --git a/vendor/vendor.json b/vendor/vendor.json index b065a0561..2bb924d21 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -359,10 +359,10 @@ "revisionTime": "2018-04-30T18:52:41Z" }, { - "checksumSHA1": "W+E/2xXcE1GmJ0Qb784ald0Fn6I=", + "checksumSHA1": "h1d2lPZf6j2dW/mIqVnd1RdykDo=", "path": "github.com/golang/snappy", - "revision": "d9eb7a3d35ec988b8585d4a0068e462c27d28380", - "revisionTime": "2016-05-29T05:00:41Z" + "revision": "2e65f85255dbc3072edf28d6b5b8efc472979f5a", + "revisionTime": "2018-05-18T05:39:59Z" }, { "checksumSHA1": "0x0CoHbgoWngucjKSDPFodeL8ek=",
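As the updated package documentation notes, snappy's block and stream formats are not interchangeable; this patch uses the stream (framing) format precisely because an object's compressed length is unknown while data is still arriving. A short demonstration of the difference:

```go
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"

	"github.com/golang/snappy"
)

func main() {
	data := bytes.Repeat([]byte("snappy has a block format and a stream format. "), 64)

	// Block format: whole-buffer Encode/Decode, size known up front.
	block := snappy.Encode(nil, data)
	roundTrip, _ := snappy.Decode(nil, block)
	fmt.Println("block:", len(block), bytes.Equal(roundTrip, data))

	// Stream (framing) format: what the handlers in this patch use.
	var framed bytes.Buffer
	w := snappy.NewWriter(&framed)
	w.Write(data)
	w.Close()
	compressed := framed.Bytes()

	streamed, _ := ioutil.ReadAll(snappy.NewReader(bytes.NewReader(compressed)))
	fmt.Println("stream:", len(compressed), bytes.Equal(streamed, data))

	// The block decoder is expected to reject framed data (snappy: corrupt input).
	_, err := snappy.Decode(nil, compressed)
	fmt.Println("block Decode of stream data:", err)
}
```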