Add object compression support (#6292)

Add support for streaming (golang/LZ77/snappy) compression.
This commit is contained in:
Praveen raj Mani
2018-09-28 09:06:17 +05:30
committed by Nitish Tiwari
parent 5c765bc63e
commit ce9d36d954
57 changed files with 1321 additions and 173 deletions

View File

@@ -38,7 +38,7 @@ import (
var (
configJSON = []byte(`{
"version": "29",
"version": "30",
"credential": {
"accessKey": "minio",
"secretKey": "minio123"
@@ -186,20 +186,24 @@ var (
"endpoint": ""
}
}
},
"logger": {
},
"logger": {
"console": {
"enabled": true
"enabled": true
},
"http": {
"1": {
"enabled": false,
"endpoint": "http://user:example@localhost:9001/api/endpoint"
}
"target1": {
"enabled": false,
"endpoint": "https://username:password@example.com/api"
}
}
}
}`)
},
"compress": {
"enabled": false,
"extensions":[".txt",".log",".csv",".json"],
"mime-types":["text/csv","text/plain","application/json"]
}
}`)
)
// adminXLTestBed - encapsulates subsystems that need to be setup for

View File

@@ -292,6 +292,7 @@ const (
ErrMissingHeaders
ErrAdminConfigNotificationTargetsFailed
ErrAdminProfilerNotEnabled
ErrInvalidDecompressedSize
)
// error code to APIError structure, these fields carry respective
@@ -1403,6 +1404,11 @@ var errorCodeResponse = map[APIErrorCode]APIError{
Description: "Some headers in the query are missing from the file. Check the file and try again.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidDecompressedSize: {
Code: "XMinioInvalidDecompressedSize",
Description: "The data provided is unfit for decompression",
HTTPStatusCode: http.StatusBadRequest,
},
// Add your error structure here.
}
@@ -1622,6 +1628,12 @@ func toAPIErrorCode(err error) (apiErr APIErrorCode) {
}
// Compression errors
switch err {
case errInvalidDecompressedSize:
apiErr = ErrInvalidDecompressedSize
}
if apiErr != ErrNone {
// If there was a match in the above switch case.
return apiErr

View File

@@ -104,7 +104,11 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp
if err != nil {
return err
}
case objInfo.IsCompressed():
totalObjectSize = objInfo.GetActualSize()
if totalObjectSize < 0 {
return errInvalidDecompressedSize
}
default:
totalObjectSize = objInfo.Size
}

View File

@@ -244,7 +244,7 @@ func isReqAuthenticated(r *http.Request, region string) (s3Error APIErrorCode) {
// Verify 'Content-Md5' and/or 'X-Amz-Content-Sha256' if present.
// The verification happens implicit during reading.
reader, err := hash.NewReader(r.Body, -1, hex.EncodeToString(contentMD5), hex.EncodeToString(contentSHA256))
reader, err := hash.NewReader(r.Body, -1, hex.EncodeToString(contentMD5), hex.EncodeToString(contentSHA256), -1)
if err != nil {
return toAPIErrorCode(err)
}

View File

@@ -102,7 +102,17 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
}
for i := range listObjectsV2Info.Objects {
if crypto.IsEncrypted(listObjectsV2Info.Objects[i].UserDefined) {
var actualSize int64
if listObjectsV2Info.Objects[i].IsCompressed() {
// Read the decompressed size from the meta.json.
actualSize = listObjectsV2Info.Objects[i].GetActualSize()
if actualSize < 0 {
writeErrorResponse(w, ErrInvalidDecompressedSize, r.URL)
return
}
// Set the info.Size to the actualSize.
listObjectsV2Info.Objects[i].Size = actualSize
} else if crypto.IsEncrypted(listObjectsV2Info.Objects[i].UserDefined) {
listObjectsV2Info.Objects[i].Size, err = listObjectsV2Info.Objects[i].DecryptedSize()
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
@@ -168,7 +178,17 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
}
for i := range listObjectsInfo.Objects {
if crypto.IsEncrypted(listObjectsInfo.Objects[i].UserDefined) {
var actualSize int64
if listObjectsInfo.Objects[i].IsCompressed() {
// Read the decompressed size from the meta.json.
actualSize = listObjectsInfo.Objects[i].GetActualSize()
if actualSize < 0 {
writeErrorResponse(w, ErrInvalidDecompressedSize, r.URL)
return
}
// Set the info.Size to the actualSize.
listObjectsInfo.Objects[i].Size = actualSize
} else if crypto.IsEncrypted(listObjectsInfo.Objects[i].UserDefined) {
listObjectsInfo.Objects[i].Size, err = listObjectsInfo.Objects[i].DecryptedSize()
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
@@ -176,7 +196,6 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
}
}
}
response := generateListObjectsV1Response(bucket, prefix, marker, delimiter, maxKeys, listObjectsInfo)
// Write success response.

View File

@@ -590,7 +590,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
hashReader, err := hash.NewReader(fileBody, fileSize, "", "")
hashReader, err := hash.NewReader(fileBody, fileSize, "", "", fileSize)
if err != nil {
logger.LogIf(ctx, err)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
@@ -614,7 +614,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
info := ObjectInfo{Size: fileSize}
hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "") // do not try to verify encrypted content
hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "", fileSize) // do not try to verify encrypted content
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return

View File

@@ -96,7 +96,18 @@ func handleCommonCmdArgs(ctx *cli.Context) {
setConfigDir(configDirAbs)
}
// parseCompressIncludes validates a compression include list (either
// `extensions` or `mime-types`). The list is returned unchanged when every
// entry is non-empty; the first empty entry aborts with an error and a nil list.
func parseCompressIncludes(includes []string) ([]string, error) {
	for i := range includes {
		if entry := includes[i]; entry == "" {
			return nil, uiErrInvalidCompressionIncludesValue(nil).Msg("extension/mime-type (%s) cannot be empty", entry)
		}
	}
	return includes, nil
}
func handleCommonEnvVars() {
compressEnvDelimiter := ","
// Start profiler if env is set.
if profiler := os.Getenv("_MINIO_PROFILER"); profiler != "" {
var err error
@@ -268,4 +279,28 @@ func handleCommonEnvVars() {
globalKMSKeyID = kmsConf.Vault.Key.Name
globalKMSConfig = kmsConf
}
// MINIO_COMPRESS toggles compression; only a case-insensitive "true" enables it.
if compress := os.Getenv("MINIO_COMPRESS"); compress != "" {
	globalIsCompressionEnabled = strings.EqualFold(compress, "true")
}

// Include-lists may be overridden from the environment as delimiter-separated values.
compressExtensions := os.Getenv("MINIO_COMPRESS_EXTENSIONS")
compressMimeTypes := os.Getenv("MINIO_COMPRESS_MIMETYPES")
if compressExtensions != "" || compressMimeTypes != "" {
	globalIsEnvCompression = true
	if compressExtensions != "" {
		extensions, err := parseCompressIncludes(strings.Split(compressExtensions, compressEnvDelimiter))
		if err != nil {
			// Report the raw environment value: the parsed list is nil on failure,
			// so formatting it would hide the offending input.
			logger.Fatal(err, "Invalid MINIO_COMPRESS_EXTENSIONS value (`%s`)", compressExtensions)
		}
		globalCompressExtensions = extensions
	}
	if compressMimeTypes != "" {
		contenttypes, err := parseCompressIncludes(strings.Split(compressMimeTypes, compressEnvDelimiter))
		if err != nil {
			// Same as above: show the raw value rather than the nil parse result.
			logger.Fatal(err, "Invalid MINIO_COMPRESS_MIMETYPES value (`%s`)", compressMimeTypes)
		}
		globalCompressMimeTypes = contenttypes
	}
}
}

View File

@@ -40,9 +40,9 @@ import (
// 6. Make changes in config-current_test.go for any test change
// Config version
const serverConfigVersion = "29"
const serverConfigVersion = "30"
type serverConfig = serverConfigV29
type serverConfig = serverConfigV30
var (
// globalServerConfig server config.
@@ -228,6 +228,18 @@ func (s *serverConfig) Validate() error {
return nil
}
// SetCompressionConfig overwrites the compression section of the config with
// the given include lists; the enabled flag is taken from
// globalIsCompressionEnabled rather than from an argument.
func (s *serverConfig) SetCompressionConfig(extensions []string, mimeTypes []string) {
	s.Compression = compressionConfig{
		Enabled:    globalIsCompressionEnabled,
		Extensions: extensions,
		MimeTypes:  mimeTypes,
	}
}
// GetCompressionConfig returns the compression section of the config by value.
func (s *serverConfig) GetCompressionConfig() compressionConfig {
	current := s.Compression
	return current
}
func (s *serverConfig) loadFromEnvs() {
// If env is set override the credentials from config file.
if globalIsEnvCreds {
@@ -253,6 +265,10 @@ func (s *serverConfig) loadFromEnvs() {
if globalKMS != nil {
s.KMS = globalKMSConfig
}
if globalIsEnvCompression {
s.SetCompressionConfig(globalCompressExtensions, globalCompressMimeTypes)
}
}
// TestNotificationTargets tries to establish connections to all notification
@@ -366,6 +382,8 @@ func (s *serverConfig) ConfigDiff(t *serverConfig) string {
return "StorageClass configuration differs"
case !reflect.DeepEqual(s.Cache, t.Cache):
return "Cache configuration differs"
case !reflect.DeepEqual(s.Compression, t.Compression):
return "Compression configuration differs"
case !reflect.DeepEqual(s.Notify.AMQP, t.Notify.AMQP):
return "AMQP Notification configuration differs"
case !reflect.DeepEqual(s.Notify.NATS, t.Notify.NATS):
@@ -417,6 +435,11 @@ func newServerConfig() *serverConfig {
},
KMS: crypto.KMSConfig{},
Notify: notifier{},
Compression: compressionConfig{
Enabled: false,
Extensions: globalCompressExtensions,
MimeTypes: globalCompressMimeTypes,
},
}
// Make sure to initialize notification configs.
@@ -480,6 +503,12 @@ func (s *serverConfig) loadToCachedConfigs() {
globalKMSKeyID = globalKMSConfig.Vault.Key.Name
}
}
if !globalIsCompressionEnabled {
compressionConf := s.GetCompressionConfig()
globalCompressExtensions = compressionConf.Extensions
globalCompressMimeTypes = compressionConf.MimeTypes
globalIsCompressionEnabled = compressionConf.Enabled
}
}
// newConfig - initialize a new server config, saves env parameters if

View File

@@ -2387,6 +2387,7 @@ func migrateV27ToV28() error {
// config V28 is backward compatible with V27, load the old
// config file in serverConfigV28 struct and initialize KMSConfig
srvConfig := &serverConfigV28{}
_, err := quick.LoadConfig(configFile, globalEtcdClient, srvConfig)
if os.IsNotExist(err) {
@@ -2409,12 +2410,40 @@ func migrateV27ToV28() error {
return nil
}
// Migrates '.minio.sys/config.json' v27 to v28.
// Migrates '.minio.sys/config.json' to v30.
func migrateMinioSysConfig(objAPI ObjectLayer) error {
if err := migrateV27ToV28MinioSys(objAPI); err != nil {
return err
}
return migrateV28ToV29MinioSys(objAPI)
if err := migrateV28ToV29MinioSys(objAPI); err != nil {
return err
}
return migrateV29ToV30MinioSys(objAPI)
}
// migrateV29ToV30MinioSys upgrades the config stored through objAPI from
// version "29" to "30". It is a no-op when no config is found or when the
// stored config is not at version "29".
func migrateV29ToV30MinioSys(objAPI ObjectLayer) error {
	configFile := path.Join(minioConfigPrefix, minioConfigFile)

	srvConfig, err := readServerConfig(context.Background(), objAPI)
	switch {
	case err == errConfigNotFound:
		// Nothing stored yet, hence nothing to migrate.
		return nil
	case err != nil:
		return fmt.Errorf("Unable to load config file. %v", err)
	}

	if srvConfig.Version != "29" {
		return nil
	}
	srvConfig.Version = "30"

	// Initialize the compression section: disabled, with the global include
	// lists. Future migrations need to copy the Compression config over from
	// the previous version instead.
	srvConfig.Compression.Enabled = false
	srvConfig.Compression.Extensions = globalCompressExtensions
	srvConfig.Compression.MimeTypes = globalCompressMimeTypes

	err = saveServerConfig(context.Background(), objAPI, srvConfig)
	if err != nil {
		return fmt.Errorf("Failed to migrate config from 29 to 30 . %v", err)
	}

	logger.Info(configMigrateMSGTemplate, configFile, "29", "30")
	return nil
}
func migrateV28ToV29MinioSys(objAPI ObjectLayer) error {
@@ -2431,7 +2460,7 @@ func migrateV28ToV29MinioSys(objAPI ObjectLayer) error {
srvConfig.Version = "29"
if err = saveServerConfig(context.Background(), objAPI, srvConfig); err != nil {
return fmt.Errorf("Failed to migrate config from 28 to 29. %v", err)
return fmt.Errorf("Failed to migrate config from ‘28’ to ‘29’. %v", err)
}
logger.Info(configMigrateMSGTemplate, configFile, "28", "29")
@@ -2453,7 +2482,7 @@ func migrateV27ToV28MinioSys(objAPI ObjectLayer) error {
srvConfig.Version = "28"
srvConfig.KMS = crypto.KMSConfig{}
if err = saveServerConfig(context.Background(), objAPI, srvConfig); err != nil {
return fmt.Errorf("Failed to migrate config from 27 to 28. %v", err)
return fmt.Errorf("Failed to migrate config from ‘27’ to ‘28’. %v", err)
}
logger.Info(configMigrateMSGTemplate, configFile, "27", "28")

View File

@@ -318,7 +318,6 @@ func TestServerConfigMigrateFaultyConfig(t *testing.T) {
if err := migrateV26ToV27(); err == nil {
t.Fatal("migrateConfigV26ToV27() should fail with a corrupted json")
}
if err := migrateV27ToV28(); err == nil {
t.Fatal("migrateConfigV27ToV28() should fail with a corrupted json")
}

View File

@@ -755,8 +755,16 @@ type serverConfigV28 struct {
Logger loggerConfig `json:"logger"`
}
// serverConfigV29 is just like version '28', browser and domain are deprecated.
type serverConfigV29 struct {
// compressionConfig represents the compression settings, serialized under
// the "compress" key of the server config.
type compressionConfig struct {
// Enabled is the compression on/off flag.
Enabled bool `json:"enabled"`
// Extensions is the include list of object name extensions.
Extensions []string `json:"extensions"`
// MimeTypes is the include list of content types.
MimeTypes []string `json:"mime-types"`
}
// serverConfigV30 is just like version '29', stores additionally
// extensions and mimetypes fields for compression.
type serverConfigV30 struct {
quick.Config `json:"-"` // ignore interfaces
Version string `json:"version"`
@@ -780,4 +788,7 @@ type serverConfigV29 struct {
// Logger configuration
Logger loggerConfig `json:"logger"`
// Compression configuration
Compression compressionConfig `json:"compress"`
}

View File

@@ -172,7 +172,7 @@ func checkServerConfig(ctx context.Context, objAPI ObjectLayer) error {
}
func saveConfig(objAPI ObjectLayer, configFile string, data []byte) error {
hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data))
hashReader, err := hash.NewReader(bytes.NewReader(data), int64(len(data)), "", getSHA256Hash(data), int64(len(data)))
if err != nil {
return err
}

View File

@@ -354,7 +354,7 @@ func (cfs *cacheFSObjects) PutObject(ctx context.Context, bucket string, object
}
// Validate input data size and it can never be less than -1.
if data.Size() < 0 {
if data.Size() < -1 {
logger.LogIf(ctx, errInvalidArgument)
return ObjectInfo{}, errInvalidArgument
}

View File

@@ -244,7 +244,7 @@ func (c cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
teeReader := io.TeeReader(bkReader, pipeWriter)
hashReader, herr := hash.NewReader(pipeReader, bkReader.ObjInfo.Size, "", "")
hashReader, herr := hash.NewReader(pipeReader, bkReader.ObjInfo.Size, "", "", bkReader.ObjInfo.Size)
if herr != nil {
bkReader.Close()
return nil, herr
@@ -314,7 +314,7 @@ func (c cacheObjects) GetObject(ctx context.Context, bucket, object string, star
}
// Initialize pipe.
pipeReader, pipeWriter := io.Pipe()
hashReader, err := hash.NewReader(pipeReader, objInfo.Size, "", "")
hashReader, err := hash.NewReader(pipeReader, objInfo.Size, "", "", objInfo.Size)
if err != nil {
return err
}
@@ -671,13 +671,13 @@ func (c cacheObjects) PutObject(ctx context.Context, bucket, object string, r *h
objInfo = ObjectInfo{}
// Initialize pipe to stream data to backend
pipeReader, pipeWriter := io.Pipe()
hashReader, err := hash.NewReader(pipeReader, size, r.MD5HexString(), r.SHA256HexString())
hashReader, err := hash.NewReader(pipeReader, size, r.MD5HexString(), r.SHA256HexString(), r.ActualSize())
if err != nil {
return ObjectInfo{}, err
}
// Initialize pipe to stream data to cache
rPipe, wPipe := io.Pipe()
cHashReader, err := hash.NewReader(rPipe, size, r.MD5HexString(), r.SHA256HexString())
cHashReader, err := hash.NewReader(rPipe, size, r.MD5HexString(), r.SHA256HexString(), r.ActualSize())
if err != nil {
return ObjectInfo{}, err
}
@@ -764,13 +764,13 @@ func (c cacheObjects) PutObjectPart(ctx context.Context, bucket, object, uploadI
info = PartInfo{}
// Initialize pipe to stream data to backend
pipeReader, pipeWriter := io.Pipe()
hashReader, err := hash.NewReader(pipeReader, size, data.MD5HexString(), data.SHA256HexString())
hashReader, err := hash.NewReader(pipeReader, size, data.MD5HexString(), data.SHA256HexString(), data.ActualSize())
if err != nil {
return
}
// Initialize pipe to stream data to cache
rPipe, wPipe := io.Pipe()
cHashReader, err := hash.NewReader(rPipe, size, data.MD5HexString(), data.SHA256HexString())
cHashReader, err := hash.NewReader(rPipe, size, data.MD5HexString(), data.SHA256HexString(), data.ActualSize())
if err != nil {
return
}

View File

@@ -195,7 +195,7 @@ func TestDiskCache(t *testing.T) {
opts := ObjectOptions{}
byteReader := bytes.NewReader([]byte(content))
hashReader, err := hash.NewReader(byteReader, int64(size), "", "")
hashReader, err := hash.NewReader(byteReader, int64(size), "", "", int64(size))
if err != nil {
t.Fatal(err)
}
@@ -270,7 +270,7 @@ func TestDiskCacheMaxUse(t *testing.T) {
opts := ObjectOptions{}
byteReader := bytes.NewReader([]byte(content))
hashReader, err := hash.NewReader(byteReader, int64(size), "", "")
hashReader, err := hash.NewReader(byteReader, int64(size), "", "", int64(size))
if err != nil {
t.Fatal(err)
}

View File

@@ -159,3 +159,7 @@ func (api *DummyObjectLayer) IsNotificationSupported() (b bool) {
func (api *DummyObjectLayer) IsEncryptionSupported() (b bool) {
return
}
// IsCompressionSupported reports whether compression is supported; the dummy
// layer returns the zero value (false) of its named result.
func (api *DummyObjectLayer) IsCompressionSupported() (b bool) {
return
}

View File

@@ -91,6 +91,7 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
// Copy the block.
n, err := io.Copy(dst, bytes.NewReader(block))
if err != nil {
// The writer will be closed in case of range queries, which will emit ErrClosedPipe.
if !strings.Contains(err.Error(), "read/write on closed pipe") {
logger.LogIf(ctx, err)
}

View File

@@ -223,11 +223,13 @@ func parseFSPartsArray(fsMetaBuf []byte) []objectPartInfo {
name := gjson.Get(partJSON, "name").String()
etag := gjson.Get(partJSON, "etag").String()
size := gjson.Get(partJSON, "size").Int()
actualSize := gjson.Get(partJSON, "actualSize").Int()
partsArray = append(partsArray, objectPartInfo{
Number: int(number),
Name: name,
ETag: etag,
Size: size,
Number: int(number),
Name: name,
ETag: etag,
Size: size,
ActualSize: int64(actualSize),
})
return true
})

View File

@@ -46,21 +46,25 @@ func (fs *FSObjects) getMultipartSHADir(bucket, object string) string {
}
// Returns partNumber.etag
func (fs *FSObjects) encodePartFile(partNumber int, etag string) string {
return fmt.Sprintf("%.5d.%s", partNumber, etag)
func (fs *FSObjects) encodePartFile(partNumber int, etag string, actualSize int64) string {
return fmt.Sprintf("%.5d.%s.%d", partNumber, etag, actualSize)
}
// Returns partNumber and etag
func (fs *FSObjects) decodePartFile(name string) (partNumber int, etag string, err error) {
func (fs *FSObjects) decodePartFile(name string) (partNumber int, etag string, actualSize int64, err error) {
result := strings.Split(name, ".")
if len(result) != 2 {
return 0, "", errUnexpected
if len(result) != 3 {
return 0, "", 0, errUnexpected
}
partNumber, err = strconv.Atoi(result[0])
if err != nil {
return 0, "", errUnexpected
return 0, "", 0, errUnexpected
}
return partNumber, result[1], nil
actualSize, err = strconv.ParseInt(result[2], 10, 64)
if err != nil {
return 0, "", 0, errUnexpected
}
return partNumber, result[1], actualSize, nil
}
// Appends parts to an appendFile sequentially.
@@ -95,7 +99,7 @@ func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploa
if entry == fs.metaJSONFile {
continue
}
partNumber, etag, err := fs.decodePartFile(entry)
partNumber, etag, actualSize, err := fs.decodePartFile(entry)
if err != nil {
logger.GetReqInfo(ctx).AppendTags("entry", entry)
logger.LogIf(ctx, err)
@@ -119,7 +123,7 @@ func (fs *FSObjects) backgroundAppend(ctx context.Context, bucket, object, uploa
return
}
file.parts = append(file.parts, PartInfo{PartNumber: nextPartNumber, ETag: etag})
file.parts = append(file.parts, PartInfo{PartNumber: partNumber, ETag: etag, ActualSize: actualSize})
nextPartNumber++
}
}
@@ -276,8 +280,8 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
return pi, toObjectErr(err, bucket)
}
// Validate input data size and it can never be less than zero.
if data.Size() < 0 {
// Validate input data size and it can never be less than -1.
if data.Size() < -1 {
logger.LogIf(ctx, errInvalidArgument)
return pi, toObjectErr(errInvalidArgument)
}
@@ -322,7 +326,7 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
if etag == "" {
etag = GenETag()
}
partPath := pathJoin(uploadIDDir, fs.encodePartFile(partID, etag))
partPath := pathJoin(uploadIDDir, fs.encodePartFile(partID, etag, data.ActualSize()))
if err = fsRenameFile(ctx, tmpPartPath, partPath); err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
@@ -339,6 +343,7 @@ func (fs *FSObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
LastModified: fi.ModTime(),
ETag: etag,
Size: fi.Size(),
ActualSize: data.ActualSize(),
}, nil
}
@@ -365,8 +370,7 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload
}
uploadIDDir := fs.getUploadIDDir(bucket, object, uploadID)
_, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile))
if err != nil {
if _, err := fsStatFile(ctx, pathJoin(uploadIDDir, fs.metaJSONFile)); err != nil {
if err == errFileNotFound || err == errFileAccessDenied {
return result, InvalidUploadID{UploadID: uploadID}
}
@@ -384,7 +388,7 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload
if entry == fs.metaJSONFile {
continue
}
partNumber, etag1, derr := fs.decodePartFile(entry)
partNumber, etag1, _, derr := fs.decodePartFile(entry)
if derr != nil {
logger.LogIf(ctx, derr)
return result, toObjectErr(derr)
@@ -394,11 +398,11 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload
partsMap[partNumber] = etag1
continue
}
stat1, serr := fsStatFile(ctx, pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag1)))
stat1, serr := fsStatFile(ctx, pathJoin(uploadIDDir, getPartFile(entries, partNumber, etag1)))
if serr != nil {
return result, toObjectErr(serr)
}
stat2, serr := fsStatFile(ctx, pathJoin(uploadIDDir, fs.encodePartFile(partNumber, etag2)))
stat2, serr := fsStatFile(ctx, pathJoin(uploadIDDir, getPartFile(entries, partNumber, etag2)))
if serr != nil {
return result, toObjectErr(serr)
}
@@ -408,8 +412,19 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload
}
var parts []PartInfo
var actualSize int64
for partNumber, etag := range partsMap {
parts = append(parts, PartInfo{PartNumber: partNumber, ETag: etag})
partFile := getPartFile(entries, partNumber, etag)
if partFile == "" {
return result, InvalidPart{}
}
// Read the actualSize from the pathFileName.
subParts := strings.Split(partFile, ".")
actualSize, err = strconv.ParseInt(subParts[len(subParts)-1], 10, 64)
if err != nil {
return result, InvalidPart{}
}
parts = append(parts, PartInfo{PartNumber: partNumber, ETag: etag, ActualSize: actualSize})
}
sort.Slice(parts, func(i int, j int) bool {
return parts[i].PartNumber < parts[j].PartNumber
@@ -439,12 +454,12 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload
}
for i, part := range result.Parts {
var stat os.FileInfo
stat, err = fsStatFile(ctx, pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag)))
stat, err = fsStatFile(ctx, pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag, part.ActualSize)))
if err != nil {
return result, toObjectErr(err)
}
result.Parts[i].LastModified = stat.ModTime()
result.Parts[i].Size = stat.Size()
result.Parts[i].Size = part.ActualSize
}
fsMetaBytes, err := ioutil.ReadFile(pathJoin(uploadIDDir, fs.metaJSONFile))
@@ -464,6 +479,9 @@ func (fs *FSObjects) ListObjectParts(ctx context.Context, bucket, object, upload
//
// Implements S3 compatible Complete multipart API.
func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart) (oi ObjectInfo, e error) {
var actualSize int64
if err := checkCompleteMultipartArgs(ctx, bucket, object, fs); err != nil {
return oi, toObjectErr(err)
}
@@ -500,9 +518,37 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
// Allocate parts similar to incoming slice.
fsMeta.Parts = make([]objectPartInfo, len(parts))
entries, err := readDir(uploadIDDir)
if err != nil {
logger.GetReqInfo(ctx).AppendTags("uploadIDDir", uploadIDDir)
logger.LogIf(ctx, err)
return oi, err
}
// Save consolidated actual size.
var objectActualSize int64
// Validate all parts and then commit to disk.
for i, part := range parts {
partPath := pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag))
partFile := getPartFile(entries, part.PartNumber, part.ETag)
if partFile == "" {
return oi, InvalidPart{
PartNumber: part.PartNumber,
GotETag: part.ETag,
}
}
// Read the actualSize from the pathFileName.
subParts := strings.Split(partFile, ".")
actualSize, err = strconv.ParseInt(subParts[len(subParts)-1], 10, 64)
if err != nil {
return oi, InvalidPart{
PartNumber: part.PartNumber,
GotETag: part.ETag,
}
}
partPath := pathJoin(uploadIDDir, partFile)
var fi os.FileInfo
fi, err = fsStatFile(ctx, partPath)
if err != nil {
@@ -512,24 +558,28 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
return oi, err
}
if partSize == -1 {
partSize = fi.Size()
partSize = actualSize
}
fsMeta.Parts[i] = objectPartInfo{
Number: part.PartNumber,
ETag: part.ETag,
Size: fi.Size(),
Number: part.PartNumber,
ETag: part.ETag,
Size: fi.Size(),
ActualSize: actualSize,
}
// Consolidate the actual size.
objectActualSize += actualSize
if i == len(parts)-1 {
break
}
// All parts except the last part has to be atleast 5MB.
if !isMinAllowedPartSize(fi.Size()) {
if !isMinAllowedPartSize(actualSize) {
return oi, PartTooSmall{
PartNumber: part.PartNumber,
PartSize: fi.Size(),
PartSize: actualSize,
PartETag: part.ETag,
}
}
@@ -573,7 +623,8 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
if appendFallback {
fsRemoveFile(ctx, file.filePath)
for _, part := range parts {
partPath := pathJoin(uploadIDDir, fs.encodePartFile(part.PartNumber, part.ETag))
partPath := getPartFile(entries, part.PartNumber, part.ETag)
partPath = pathJoin(uploadIDDir, partPath)
err = mioutil.AppendFile(appendFilePath, partPath)
if err != nil {
logger.LogIf(ctx, err)
@@ -612,6 +663,8 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
fsMeta.Meta = make(map[string]string)
}
fsMeta.Meta["etag"] = s3MD5
// Save consolidated actual size.
fsMeta.Meta[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
if _, err = fsMeta.WriteTo(metaFile); err != nil {
logger.LogIf(ctx, err)
return oi, toObjectErr(err, bucket, object)

View File

@@ -662,7 +662,10 @@ func (fs *FSObjects) getObject(ctx context.Context, bucket, object string, offse
buf := make([]byte, int(bufSize))
_, err = io.CopyBuffer(writer, io.LimitReader(reader, length), buf)
logger.LogIf(ctx, err)
// The writer will be closed in case of range queries, which will emit ErrClosedPipe.
if err == io.ErrClosedPipe {
err = nil
}
return toObjectErr(err, bucket, object)
}
@@ -882,7 +885,7 @@ func (fs *FSObjects) putObject(ctx context.Context, bucket string, object string
}
// Validate input data size and it can never be less than -1.
if data.Size() < 0 {
if data.Size() < -1 {
logger.LogIf(ctx, errInvalidArgument)
return ObjectInfo{}, errInvalidArgument
}
@@ -1325,3 +1328,8 @@ func (fs *FSObjects) IsNotificationSupported() bool {
func (fs *FSObjects) IsEncryptionSupported() bool {
return true
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// FSObjects always reports true.
func (fs *FSObjects) IsCompressionSupported() bool {
return true
}

View File

@@ -151,3 +151,8 @@ func (a GatewayUnsupported) IsNotificationSupported() bool {
func (a GatewayUnsupported) IsEncryptionSupported() bool {
return false
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// GatewayUnsupported always reports false.
func (a GatewayUnsupported) IsCompressionSupported() bool {
return false
}

View File

@@ -1148,3 +1148,8 @@ func (a *azureObjects) DeleteBucketPolicy(ctx context.Context, bucket string) er
err := container.SetPermissions(perm, nil)
return azureToObjectError(err)
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// azureObjects always reports false.
func (a *azureObjects) IsCompressionSupported() bool {
return false
}

View File

@@ -840,3 +840,8 @@ func (l *b2Objects) DeleteBucketPolicy(ctx context.Context, bucket string) error
logger.LogIf(ctx, err)
return b2ToObjectError(err)
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// b2Objects always reports false.
func (l *b2Objects) IsCompressionSupported() bool {
return false
}

View File

@@ -1453,3 +1453,8 @@ func (l *gcsGateway) DeleteBucketPolicy(ctx context.Context, bucket string) erro
return nil
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// gcsGateway always reports false.
func (l *gcsGateway) IsCompressionSupported() bool {
return false
}

View File

@@ -659,3 +659,8 @@ func (t *tritonObjects) DeleteObject(ctx context.Context, bucket, object string)
return nil
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// tritonObjects always reports false.
func (t *tritonObjects) IsCompressionSupported() bool {
return false
}

View File

@@ -125,3 +125,8 @@ type nasObjects struct {
func (l *nasObjects) IsNotificationSupported() bool {
return false
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// nasObjects always reports false.
func (l *nasObjects) IsCompressionSupported() bool {
return false
}

View File

@@ -1095,3 +1095,8 @@ func (l *ossObjects) DeleteBucketPolicy(ctx context.Context, bucket string) erro
}
return nil
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// ossObjects always reports false.
func (l *ossObjects) IsCompressionSupported() bool {
return false
}

View File

@@ -538,3 +538,8 @@ func (l *s3Objects) DeleteBucketPolicy(ctx context.Context, bucket string) error
}
return nil
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// s3Objects always reports false.
func (l *s3Objects) IsCompressionSupported() bool {
return false
}

View File

@@ -651,3 +651,8 @@ func (s *siaObjects) deleteTempFileWhenUploadCompletes(ctx context.Context, temp
os.Remove(tempFile)
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// siaObjects always reports false.
func (s *siaObjects) IsCompressionSupported() bool {
return false
}

View File

@@ -227,6 +227,23 @@ var (
globalKMS crypto.KMS
// KMS config
globalKMSConfig crypto.KMSConfig
// Is the compression include-list (extensions/content-types) set via environment.
globalIsEnvCompression bool
// Is compression enabled.
globalIsCompressionEnabled = false
// Include-list for compression.
globalCompressExtensions = []string{".txt", ".log", ".csv", ".json"}
globalCompressMimeTypes = []string{"text/csv", "text/plain", "application/json"}
// Some standard object extensions which we strictly dis-allow for compression.
standardExcludeCompressExtensions = []string{".gz", ".bz2", ".rar", ".zip", ".7z"}
// Some standard content-types which we strictly dis-allow for compression.
// Entries must not carry surrounding whitespace, otherwise they can never
// match a request's Content-Type.
standardExcludeCompressContentTypes = []string{"video/*", "audio/*", "application/zip", "application/x-gzip", "application/x-zip-compressed", "application/x-compress", "application/x-spoon"}
// Add new variable global values here.
)

View File

@@ -258,6 +258,9 @@ type PartInfo struct {
// Size in bytes of the part.
Size int64
// Decompressed Size.
ActualSize int64
}
// MultipartInfo - represents metadata in progress multipart upload.

View File

@@ -96,4 +96,7 @@ type ObjectLayer interface {
// Supported operations check
IsNotificationSupported() bool
IsEncryptionSupported() bool
// Compression support check.
IsCompressionSupported() bool
}

View File

@@ -25,15 +25,18 @@ import (
"net/http"
"path"
"runtime"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"
snappy "github.com/golang/snappy"
"github.com/minio/minio/cmd/crypto"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/dns"
"github.com/minio/minio/pkg/ioutil"
"github.com/minio/minio/pkg/wildcard"
"github.com/skyrings/skyring-common/tools/uuid"
)
@@ -301,6 +304,107 @@ func getRandomHostPort(records []dns.SrvRecord) (string, int) {
return srvRecord.Host, srvRecord.Port
}
// IsCompressed reports whether the object's user-defined metadata carries the
// reserved "compression" key; the key's value is not inspected.
func (o ObjectInfo) IsCompressed() bool {
	if _, found := o.UserDefined[ReservedMetadataPrefix+"compression"]; found {
		return true
	}
	return false
}
// GetActualSize returns the decompressed size recorded in the object's
// metadata under the reserved "actual-size" key.
//
// It returns -1 when the key is absent or when its value cannot be parsed as
// a base-10 int64.
func (o ObjectInfo) GetActualSize() int64 {
	raw, found := o.UserDefined[ReservedMetadataPrefix+"actual-size"]
	if !found {
		return -1
	}
	actual, parseErr := strconv.ParseInt(raw, 10, 64)
	if parseErr != nil {
		return -1
	}
	return actual
}
// isCompressible reports whether an object with the given request headers and
// name may be compressed.
//
// Requests carrying server-side-encryption headers are never compressed:
// using compression and encryption together enables room for side channel
// attacks. Objects ruled out by excludeForCompression (by extension or
// content-type) are rejected as well.
func isCompressible(header http.Header, object string) bool {
	return !hasServerSideEncryptionHeader(header) && !excludeForCompression(header, object)
}
// excludeForCompression reports whether the object must NOT be compressed.
//
// The decision is made in this order:
//  1. Compression globally disabled: always excluded.
//  2. Object name ends with a standard excluded extension, or the request
//     Content-Type matches a standard excluded pattern: excluded.
//  3. No include lists configured: nothing further is excluded.
//  4. Otherwise the object is excluded unless its name or Content-Type
//     matches one of the configured include lists.
func excludeForCompression(header http.Header, object string) bool {
	contentType := header.Get("Content-Type")
	if !globalIsCompressionEnabled {
		return true
	}

	// We strictly disable compression for standard extensions/content-types
	// (data that is already compressed).
	if hasStringSuffixInSlice(object, standardExcludeCompressExtensions) {
		return true
	}
	if hasPattern(standardExcludeCompressContentTypes, contentType) {
		return true
	}

	// With both include lists empty there is no further filtering.
	if len(globalCompressExtensions) == 0 && len(globalCompressMimeTypes) == 0 {
		return false
	}

	// Filter by the configured compression includes.
	included := hasStringSuffixInSlice(object, globalCompressExtensions) ||
		hasPattern(globalCompressMimeTypes, contentType)
	return !included
}
// hasStringSuffixInSlice reports whether str ends with at least one of the
// suffixes in list. A nil or empty list yields false.
func hasStringSuffixInSlice(str string, list []string) bool {
	matched := false
	for i := 0; i < len(list) && !matched; i++ {
		matched = strings.HasSuffix(str, list[i])
	}
	return matched
}
// hasPattern reports whether matchStr matches at least one of the given
// wildcard patterns. A nil or empty pattern list yields false.
func hasPattern(patterns []string, matchStr string) bool {
	for i := range patterns {
		if wildcard.MatchSimple(patterns[i], matchStr) {
			return true
		}
	}
	return false
}
// getPartFile returns the first entry whose name starts with
// "<partNumber zero-padded to 5 digits>.<etag>.", or "" when no entry matches.
func getPartFile(entries []string, partNumber int, etag string) string {
	// The prefix does not depend on the entry, so build it once rather than
	// re-formatting it on every iteration.
	prefix := fmt.Sprintf("%.5d.%s.", partNumber, etag)
	for _, entry := range entries {
		if strings.HasPrefix(entry, prefix) {
			return entry
		}
	}
	return ""
}
// getCompressedOffsets maps an offset in the decompressed object onto the
// stored (compressed) parts.
//
// The first return value is the sum of the compressed sizes of all parts that
// lie wholly before offset, i.e. where reading of the stored data can start.
// The second return value is how many decompressed bytes must be skipped from
// that point to reach offset. When no part contains offset (including when the
// object has no parts), the second value is offset itself.
func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (int64, int64) {
	var (
		compressedStart int64 // compressed bytes of the parts skipped entirely
		partActualStart int64 // decompressed position where the containing part begins
		actualSoFar     int64 // running total of decompressed part sizes
	)
	for _, part := range objectInfo.Parts {
		actualSoFar += part.ActualSize
		if actualSoFar > offset {
			// offset falls inside this part.
			partActualStart = actualSoFar - part.ActualSize
			break
		}
		compressedStart += part.Size
	}
	return compressedStart, offset - partActualStart
}
// byBucketName is a collection satisfying sort.Interface.
type byBucketName []BucketInfo
@@ -352,6 +456,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, cleanUpFns ...func())
}()
isEncrypted := crypto.IsEncrypted(oi.UserDefined)
isCompressed := oi.IsCompressed()
var skipLen int64
// Calculate range to read (different for
// e.g. encrypted/compressed objects)
@@ -407,6 +512,50 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, cleanUpFns ...func())
}
return r, nil
}
case isCompressed:
// Read the decompressed size from the meta.json.
actualSize := oi.GetActualSize()
if actualSize < 0 {
return nil, 0, 0, errInvalidDecompressedSize
}
off, length = int64(0), oi.Size
decOff, decLength := int64(0), actualSize
if rs != nil {
off, length, err = rs.GetOffsetLength(actualSize)
if err != nil {
return nil, 0, 0, err
}
// In case of range-based queries on multipart objects, the offset and length are reduced.
off, decOff = getCompressedOffsets(oi, off)
decLength = length
length = oi.Size - off
// For negative length we read everything.
if decLength < 0 {
decLength = actualSize - decOff
}
// Reply back invalid range if the input offset and length fall out of range.
if decOff > actualSize || decOff+decLength > actualSize {
return nil, 0, 0, errInvalidRange
}
}
fn = func(inputReader io.Reader, _ http.Header, cFns ...func()) (r *GetObjectReader, err error) {
// Decompression reader.
snappyReader := snappy.NewReader(inputReader)
// Apply the skipLen and limit on the
// decompressed stream
decReader := io.LimitReader(ioutil.NewSkipReader(snappyReader, decOff), decLength)
oi.Size = decLength
// Assemble the GetObjectReader
r = &GetObjectReader{
ObjInfo: oi,
pReader: decReader,
cleanUpFns: append(cleanUpFns, cFns...),
}
return r, nil
}
default:
off, length, err = rs.GetOffsetLength(oi.Size)

View File

@@ -18,6 +18,7 @@ package cmd
import (
"context"
"net/http"
"reflect"
"testing"
)
@@ -296,3 +297,250 @@ func TestCleanMetadataKeys(t *testing.T) {
}
}
}
// TestIsCompressed checks that ObjectInfo.IsCompressed reports true only when
// the reserved compression key is present in the user-defined metadata.
func TestIsCompressed(t *testing.T) {
	type isCompressedCase struct {
		objInfo ObjectInfo
		result  bool
	}
	cases := []isCompressedCase{
		// Reserved compression key present.
		{
			objInfo: ObjectInfo{
				UserDefined: map[string]string{
					"X-Minio-Internal-compression": "golang/snappy/LZ77",
					"content-type":                 "application/octet-stream",
					"etag":                         "b3ff3ef3789147152fbfbc50efba4bfd-2",
				},
			},
			result: true,
		},
		// A different reserved key must not count as compressed.
		{
			objInfo: ObjectInfo{
				UserDefined: map[string]string{
					"X-Minio-Internal-XYZ": "golang/snappy/LZ77",
					"content-type":         "application/octet-stream",
					"etag":                 "b3ff3ef3789147152fbfbc50efba4bfd-2",
				},
			},
			result: false,
		},
		// No reserved keys at all.
		{
			objInfo: ObjectInfo{
				UserDefined: map[string]string{
					"content-type": "application/octet-stream",
					"etag":         "b3ff3ef3789147152fbfbc50efba4bfd-2",
				},
			},
			result: false,
		},
	}
	for idx, tc := range cases {
		if actual := tc.objInfo.IsCompressed(); actual != tc.result {
			t.Errorf("Test %d - expected %v but received %v",
				idx+1, tc.result, actual)
		}
	}
}
// TestExcludeForCompression checks excludeForCompression, with compression
// enabled, against the standard exclusion lists and the include lists.
func TestExcludeForCompression(t *testing.T) {
	// Enable compression for the duration of this test only, and restore the
	// previous setting afterwards instead of forcing it to false, so other
	// tests observe the value they started with.
	prevEnabled := globalIsCompressionEnabled
	globalIsCompressionEnabled = true
	defer func() { globalIsCompressionEnabled = prevEnabled }()

	testCases := []struct {
		object string
		header http.Header
		result bool
	}{
		// Included extension, but the content-type is strictly excluded.
		{
			object: "object.txt",
			header: http.Header{
				"Content-Type": []string{"application/zip"},
			},
			result: true,
		},
		// Strictly excluded extension.
		{
			object: "object.zip",
			header: http.Header{
				"Content-Type": []string{"application/XYZ"},
			},
			result: true,
		},
		// Included extension and content-type.
		{
			object: "object.json",
			header: http.Header{
				"Content-Type": []string{"application/json"},
			},
			result: false,
		},
		{
			object: "object.txt",
			header: http.Header{
				"Content-Type": []string{"text/plain"},
			},
			result: false,
		},
	}
	for i, test := range testCases {
		got := excludeForCompression(test.header, test.object)
		if got != test.result {
			t.Errorf("Test %d - expected %v but received %v",
				i+1, test.result, got)
		}
	}
}
// TestGetPartFile checks that getPartFile returns the entry matching both the
// part number and the etag, and "" when the two do not belong together.
func TestGetPartFile(t *testing.T) {
	// Every case looks up a part in the same directory listing.
	listing := []string{
		"00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864",
		"fs.json",
		"00002.d73d8ab724016dfb051e2d3584495c54.32891137",
	}
	cases := []struct {
		entries    []string
		partNumber int
		etag       string
		result     string
	}{
		{
			entries:    listing,
			partNumber: 1,
			etag:       "8a034f82cb9cb31140d87d3ce2a9ede3",
			result:     "00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864",
		},
		{
			entries:    listing,
			partNumber: 2,
			etag:       "d73d8ab724016dfb051e2d3584495c54",
			result:     "00002.d73d8ab724016dfb051e2d3584495c54.32891137",
		},
		// Part number and etag come from different parts: no match.
		{
			entries:    listing,
			partNumber: 1,
			etag:       "d73d8ab724016dfb051e2d3584495c54",
			result:     "",
		},
	}
	for idx, tc := range cases {
		if actual := getPartFile(tc.entries, tc.partNumber, tc.etag); actual != tc.result {
			t.Errorf("Test %d - expected %s but received %s",
				idx+1, tc.result, actual)
		}
	}
}
// TestGetActualSize checks that ObjectInfo.GetActualSize returns the value
// stored under the reserved actual-size key, and -1 when the key is missing.
func TestGetActualSize(t *testing.T) {
	type actualSizeCase struct {
		objInfo ObjectInfo
		result  int64
	}
	cases := []actualSizeCase{
		// Multipart object with a recorded actual size.
		{
			objInfo: ObjectInfo{
				UserDefined: map[string]string{
					"X-Minio-Internal-compression": "golang/snappy/LZ77",
					"X-Minio-Internal-actual-size": "100000001",
					"content-type":                 "application/octet-stream",
					"etag":                         "b3ff3ef3789147152fbfbc50efba4bfd-2",
				},
				Parts: []objectPartInfo{
					{Size: 39235668, ActualSize: 67108864},
					{Size: 19177372, ActualSize: 32891137},
				},
			},
			result: 100000001,
		},
		// No parts, recorded actual size.
		{
			objInfo: ObjectInfo{
				UserDefined: map[string]string{
					"X-Minio-Internal-compression": "golang/snappy/LZ77",
					"X-Minio-Internal-actual-size": "841",
					"content-type":                 "application/octet-stream",
					"etag":                         "b3ff3ef3789147152fbfbc50efba4bfd-2",
				},
				Parts: []objectPartInfo{},
			},
			result: 841,
		},
		// Actual-size key missing.
		{
			objInfo: ObjectInfo{
				UserDefined: map[string]string{
					"X-Minio-Internal-compression": "golang/snappy/LZ77",
					"content-type":                 "application/octet-stream",
					"etag":                         "b3ff3ef3789147152fbfbc50efba4bfd-2",
				},
				Parts: []objectPartInfo{},
			},
			result: -1,
		},
	}
	for idx, tc := range cases {
		if actual := tc.objInfo.GetActualSize(); actual != tc.result {
			t.Errorf("Test %d - expected %d but received %d",
				idx+1, tc.result, actual)
		}
	}
}
// TestGetCompressedOffsets checks the (compressed start offset, decompressed
// skip length) pair returned by getCompressedOffsets for offsets that fall in
// the second part, in the first part, and at the very start of the object.
func TestGetCompressedOffsets(t *testing.T) {
	// All cases share the same two-part object layout.
	twoPartObject := ObjectInfo{
		Parts: []objectPartInfo{
			{Size: 39235668, ActualSize: 67108864},
			{Size: 19177372, ActualSize: 32891137},
		},
	}
	cases := []struct {
		objInfo           ObjectInfo
		offset            int64
		startOffset       int64
		snappyStartOffset int64
	}{
		// Offset inside the second part: the first part is skipped whole.
		{
			objInfo:           twoPartObject,
			offset:            79109865,
			startOffset:       39235668,
			snappyStartOffset: 12001001,
		},
		// Offset inside the first part.
		{
			objInfo:           twoPartObject,
			offset:            19109865,
			startOffset:       0,
			snappyStartOffset: 19109865,
		},
		// Offset at the beginning of the object.
		{
			objInfo:           twoPartObject,
			offset:            0,
			startOffset:       0,
			snappyStartOffset: 0,
		},
	}
	for idx, tc := range cases {
		gotStart, gotSnappyStart := getCompressedOffsets(tc.objInfo, tc.offset)
		if gotStart != tc.startOffset {
			t.Errorf("Test %d - expected startOffset %d but received %d",
				idx+1, tc.startOffset, gotStart)
		}
		if gotSnappyStart != tc.snappyStartOffset {
			t.Errorf("Test %d - expected snappyOffset %d but received %d",
				idx+1, tc.snappyStartOffset, gotSnappyStart)
		}
	}
}

View File

@@ -31,6 +31,7 @@ import (
"strconv"
"strings"
snappy "github.com/golang/snappy"
"github.com/gorilla/mux"
miniogo "github.com/minio/minio-go"
"github.com/minio/minio/cmd/crypto"
@@ -56,6 +57,10 @@ var supportedHeadGetReqParams = map[string]string{
"response-content-disposition": "Content-Disposition",
}
const (
// compressionAlgorithmV1 is the value stored under the reserved
// "compression" metadata key when an object is written compressed.
compressionAlgorithmV1 = "golang/snappy/LZ77"
)
// setHeadGetRespHeaders - set any requested parameters as response headers.
func setHeadGetRespHeaders(w http.ResponseWriter, reqParams url.Values) {
for k, v := range reqParams {
@@ -402,7 +407,6 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
if !httpWriter.HasWritten() && !statusCodeWritten { // write error response only if no data or headers has been written to client yet
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
}
httpWriter.Close()
return
}
@@ -718,9 +722,50 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
srcInfo.metadataOnly = true
}
var reader io.Reader = gr
// Checks if a remote putobject call is needed for CopyObject operation
// 1. If source and destination bucket names are same, it means no call needed to etcd to get destination info
// 2. If destination bucket doesn't exist locally, only then a etcd call is needed
var isRemoteCallRequired = func(ctx context.Context, src, dst string, objAPI ObjectLayer) bool {
if src == dst {
return false
}
_, berr := objAPI.GetBucketInfo(ctx, dst)
return berr == toObjectErr(errVolumeNotFound, dst)
}
srcInfo.Reader, err = hash.NewReader(reader, srcInfo.Size, "", "")
var reader io.Reader
var length = srcInfo.Size
// No need to compress for remote etcd calls
// Pass the decompressed stream to such calls.
if srcInfo.IsCompressed() && !isRemoteCallRequired(ctx, srcBucket, dstBucket, objectAPI) {
var sreader io.Reader
var swriter io.Writer
// Open a pipe for compression.
// Where snappyWriter is piped to srcInfo.Reader.
// gr writes to snappyWriter.
snappyReader, snappyWriter := io.Pipe()
reader = snappyReader
length = -1
swriter = snappy.NewWriter(snappyWriter)
sreader = gr
go func() {
defer snappyWriter.Close()
// Compress the decompressed source object.
if _, err = io.Copy(swriter, sreader); err != nil {
return
}
}()
} else {
// Remove the metadata for remote calls.
delete(srcInfo.UserDefined, ReservedMetadataPrefix+"compression")
delete(srcInfo.UserDefined, ReservedMetadataPrefix+"actual-size")
reader = gr
}
srcInfo.Reader, err = hash.NewReader(reader, length, "", "", srcInfo.Size)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
@@ -731,7 +776,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
size := srcInfo.Size
var encMetadata = make(map[string]string)
if objectAPI.IsEncryptionSupported() {
if objectAPI.IsEncryptionSupported() && !srcInfo.IsCompressed() {
var oldKey, newKey []byte
sseCopyS3 := crypto.S3.IsEncrypted(srcInfo.UserDefined)
sseCopyC := crypto.SSECopy.IsRequested(r.Header)
@@ -802,7 +847,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
}
}
srcInfo.Reader, err = hash.NewReader(reader, size, "", "") // do not try to verify encrypted content
srcInfo.Reader, err = hash.NewReader(reader, size, "", "", size) // do not try to verify encrypted content
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
@@ -839,17 +884,6 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
var objInfo ObjectInfo
// Checks if a remote putobject call is needed for CopyObject operation
// 1. If source and destination bucket names are same, it means no call needed to etcd to get destination info
// 2. If destination bucket doesn't exist locally, only then a etcd call is needed
var isRemoteCallRequired = func(ctx context.Context, src, dst string, objAPI ObjectLayer) bool {
if src == dst {
return false
}
_, berr := objAPI.GetBucketInfo(ctx, dst)
return berr == toObjectErr(errVolumeNotFound, dst)
}
// Returns a minio-go Client configured to access remote host described by destDNSRecord
// Applicable only in a federated deployment
var getRemoteInstanceClient = func(host string, port int) (*miniogo.Core, error) {
@@ -905,6 +939,10 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
host, port = "", ""
}
if srcInfo.IsCompressed() {
objInfo.Size = srcInfo.GetActualSize()
}
// Notify object created event.
sendEvent(eventArgs{
EventName: event.ObjectCreatedCopy,
@@ -1064,7 +1102,40 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
}
hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex)
var hashError error
actualSize := size
if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 {
// Storing the compression metadata.
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1
metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)
pipeReader, pipeWriter := io.Pipe()
snappyWriter := snappy.NewWriter(pipeWriter)
actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
go func() {
defer pipeWriter.Close()
defer snappyWriter.Close()
// Writing to the compressed writer.
_, err = io.CopyN(snappyWriter, actualReader, actualSize)
if err != nil {
hashError = err
return
}
}()
// Set compression metrics.
size = -1 // Since compressed size is un-predictable.
md5hex = "" // Do not try to verify the content.
sha256hex = ""
reader = pipeReader
}
hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
@@ -1086,7 +1157,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return
}
info := ObjectInfo{Size: size}
hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "") // do not try to verify encrypted content
hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "", size) // do not try to verify encrypted content
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
@@ -1108,6 +1179,11 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
return
}
if objInfo.IsCompressed() {
// Ignore compressed ETag.
objInfo.ETag = objInfo.ETag + "-1"
}
w.Header().Set("ETag", "\""+objInfo.ETag+"\"")
if objectAPI.IsEncryptionSupported() {
if crypto.IsEncrypted(objInfo.UserDefined) {
@@ -1121,6 +1197,15 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}
}
if hashError != nil {
if hashError == io.ErrUnexpectedEOF {
writeErrorResponse(w, ErrIncompleteBody, r.URL)
} else {
writeErrorResponse(w, toAPIErrorCode(hashError), r.URL)
}
return
}
writeSuccessResponseHeadersOnly(w)
// Get host and port from Request.RemoteAddr.
@@ -1218,6 +1303,11 @@ func (api objectAPIHandlers) NewMultipartUploadHandler(w http.ResponseWriter, r
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata)
if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) {
// Storing the compression metadata.
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1
}
newMultipartUpload := objectAPI.NewMultipartUpload
if api.CacheAPI() != nil && !hasServerSideEncryptionHeader(r.Header) {
newMultipartUpload = api.CacheAPI().NewMultipartUpload
@@ -1325,6 +1415,9 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
defer gr.Close()
srcInfo := gr.ObjInfo
var actualPartSize int64
actualPartSize = srcInfo.Size
// Special care for CopyObjectPart
if partRangeErr := checkCopyPartRangeWithSize(rs, srcInfo.Size); partRangeErr != nil {
writeCopyPartErr(w, partRangeErr, r.URL)
@@ -1339,6 +1432,10 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
// Get the object offset & length
startOffset, length, _ := rs.GetOffsetLength(srcInfo.Size)
if rangeHeader != "" {
actualPartSize = length
}
if objectAPI.IsEncryptionSupported() {
if crypto.IsEncrypted(srcInfo.UserDefined) {
decryptedSize, decryptErr := srcInfo.DecryptedSize()
@@ -1356,14 +1453,42 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
return
}
var reader io.Reader = gr
var reader io.Reader
var getLength = length
srcInfo.Reader, err = hash.NewReader(reader, length, "", "")
// Need to decompress only for range-enabled copy parts.
if srcInfo.IsCompressed() && rangeHeader != "" {
var sreader io.Reader
var swriter io.Writer
// Open a pipe for compression.
// Where snappyWriter is piped to srcInfo.Reader.
// gr writes to snappyWriter.
snappyReader, snappyWriter := io.Pipe()
reader = snappyReader
length = -1
swriter = snappy.NewWriter(snappyWriter)
sreader = gr
go func() {
defer snappyWriter.Close()
// Compress the decompressed source object.
if _, err = io.Copy(swriter, sreader); err != nil {
return
}
}()
} else {
reader = gr
}
srcInfo.Reader, err = hash.NewReader(reader, length, "", "", actualPartSize)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
if objectAPI.IsEncryptionSupported() {
if objectAPI.IsEncryptionSupported() && !srcInfo.IsCompressed() {
var li ListPartsInfo
li, err = objectAPI.ListObjectParts(ctx, dstBucket, dstObject, uploadID, 0, 1)
if err != nil {
@@ -1404,7 +1529,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
info := ObjectInfo{Size: length}
size := info.EncryptedSize()
srcInfo.Reader, err = hash.NewReader(reader, size, "", "")
srcInfo.Reader, err = hash.NewReader(reader, size, "", "", actualPartSize)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
@@ -1550,9 +1675,48 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
}
}
hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex)
actualSize := size
var pipeReader *io.PipeReader
var pipeWriter *io.PipeWriter
var li ListPartsInfo
li, err = objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, 1)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
// Read compression metadata preserved in the init multipart for the decision.
_, compressPart := li.UserDefined[ReservedMetadataPrefix+"compression"]
isCompressed := false
if objectAPI.IsCompressionSupported() && compressPart {
pipeReader, pipeWriter = io.Pipe()
snappyWriter := snappy.NewWriter(pipeWriter)
actualReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
go func() {
defer pipeWriter.Close()
defer snappyWriter.Close()
// Writing to the compressed writer.
_, err = io.CopyN(snappyWriter, actualReader, actualSize)
if err != nil {
// The ErrorResponse is already written in putObjectPart Handle.
return
}
}()
// Set compression metrics.
size = -1 // Since compressed size is un-predictable.
md5hex = "" // Do not try to verify the content.
sha256hex = ""
reader = pipeReader
isCompressed = true
}
hashReader, err := hash.NewReader(reader, size, md5hex, sha256hex, actualSize)
if err != nil {
// Verify if the underlying error is signature mismatch.
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
@@ -1566,7 +1730,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
}
}
if objectAPI.IsEncryptionSupported() {
if objectAPI.IsEncryptionSupported() && !isCompressed {
var li ListPartsInfo
li, err = objectAPI.ListObjectParts(ctx, bucket, object, uploadID, 0, 1)
if err != nil {
@@ -1609,7 +1773,7 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
}
info := ObjectInfo{Size: size}
hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "") // do not try to verify encrypted content
hashReader, err = hash.NewReader(reader, info.EncryptedSize(), "", "", size) // do not try to verify encrypted content
if err != nil {
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
@@ -1627,6 +1791,11 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
return
}
if isCompressed {
pipeWriter.Close()
// Suppress compressed ETag.
partInfo.ETag = partInfo.ETag + "-1"
}
if partInfo.ETag != "" {
w.Header().Set("ETag", "\""+partInfo.ETag+"\"")
}
@@ -1765,6 +1934,11 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
// Complete parts.
var completeParts []CompletePart
for _, part := range complMultipartUpload.Parts {
// Avoiding for gateway parts.
// `strings.TrimPrefix` does not work here as intended. So `Replace` is used instead.
if objectAPI.IsCompressionSupported() {
part.ETag = strings.Replace(part.ETag, "-1", "", -1) // For compressed multiparts, We append '-1' for part.ETag.
}
part.ETag = canonicalizeETag(part.ETag)
completeParts = append(completeParts, part)
}

View File

@@ -1411,7 +1411,7 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam
}
a := 0
b := globalMinPartSize - 1
b := globalMinPartSize
var parts []CompletePart
for partNumber := 1; partNumber <= 2; partNumber++ {
// initialize HTTP NewRecorder, this records any mutations to response writer inside the handler.
@@ -1431,7 +1431,7 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam
// Since `apiRouter` satisfies `http.Handler` it has a ServeHTTP to execute the logic of the handler.
// Call the ServeHTTP to execute the handler, `func (api objectAPIHandlers) CopyObjectHandler` handles the request.
a = globalMinPartSize
a = globalMinPartSize + 1
b = len(bytesData[0].byteData) - 1
apiRouter.ServeHTTP(rec, req)
if rec.Code != http.StatusOK {

View File

@@ -918,7 +918,7 @@ func (s *posix) createFile(volume, path string) (f *os.File, err error) {
// Currently we use fallocate when available to avoid disk fragmentation as much as possible
func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) {
// It doesn't make sense to create a negative-sized file
if fileSize <= 0 {
if fileSize < -1 {
return errInvalidArgument
}
@@ -949,8 +949,11 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) {
// Close upon return.
defer w.Close()
// Allocate needed disk space to append data
e := Fallocate(int(w.Fd()), 0, fileSize)
var e error
if fileSize > 0 {
// Allocate needed disk space to append data
e = Fallocate(int(w.Fd()), 0, fileSize)
}
// Ignore errors when Fallocate is not supported in the current system
if e != nil && !isSysErrNoSys(e) && !isSysErrOpNotSupported(e) {

View File

@@ -2728,6 +2728,8 @@ func (s *TestSuiteCommon) TestObjectMultipart(c *check) {
c.Assert(response.StatusCode, http.StatusOK)
var parts []CompletePart
for _, part := range completeUploads.Parts {
// For compressed objects, we don't treat the ETag as a checksum.
part.ETag = strings.Replace(part.ETag, "-1", "", -1)
part.ETag = canonicalizeETag(part.ETag)
parts = append(parts, part)
}

View File

@@ -137,7 +137,7 @@ func calculateSignedChunkLength(chunkDataSize int64) int64 {
}
func mustGetHashReader(t TestErrHandler, data io.Reader, size int64, md5hex, sha256hex string) *hash.Reader {
hr, err := hash.NewReader(data, size, md5hex, sha256hex)
hr, err := hash.NewReader(data, size, md5hex, sha256hex, size)
if err != nil {
t.Fatal(err)
}

View File

@@ -73,3 +73,6 @@ var errFirstDiskWait = errors.New("Waiting on other disks")
// error returned when a bucket already exists
var errBucketAlreadyExists = errors.New("Your previous request to create the named bucket succeeded and you already own it")
// errInvalidDecompressedSize is returned when the decompressed (actual) size
// read from a compressed object's metadata is negative.
var errInvalidDecompressedSize = errors.New("Invalid Decompressed Size")

View File

@@ -197,4 +197,10 @@ Example 1:
"Please contact Minio at https://slack.minio.io",
"",
)
uiErrInvalidCompressionIncludesValue = newUIErrFn(
"Invalid compression include value",
"Please check the passed value",
"Compress extensions/mime-types are delimited by `,`. For eg, MINIO_COMPRESS_ATTR=\"A,B,C\"",
)
)

View File

@@ -29,9 +29,11 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"
humanize "github.com/dustin/go-humanize"
snappy "github.com/golang/snappy"
"github.com/gorilla/mux"
"github.com/gorilla/rpc/v2/json2"
miniogopolicy "github.com/minio/minio-go/pkg/policy"
@@ -648,7 +650,39 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
return
}
hashReader, err := hash.NewReader(r.Body, size, "", "")
reader := r.Body
var hashError error
actualSize := size
if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 {
// Storing the compression metadata.
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV1
metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)
pipeReader, pipeWriter := io.Pipe()
snappyWriter := snappy.NewWriter(pipeWriter)
actualReader, err := hash.NewReader(reader, size, "", "", actualSize)
if err != nil {
writeWebErrorResponse(w, err)
return
}
go func() {
defer pipeWriter.Close()
defer snappyWriter.Close()
// Writing to the compressed writer.
_, err = io.CopyN(snappyWriter, actualReader, actualSize)
if err != nil {
hashError = err
return
}
}()
// Set compression metrics.
size = -1 // Since compressed size is un-predictable.
reader = pipeReader
}
hashReader, err := hash.NewReader(reader, size, "", "", actualSize)
if err != nil {
writeWebErrorResponse(w, err)
return
@@ -668,6 +702,11 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
return
}
if hashError != nil {
writeWebErrorResponse(w, hashError)
return
}
// Notify object created event.
sendEvent(eventArgs{
EventName: event.ObjectCreatedPut,
@@ -679,6 +718,7 @@ func (web *webAPIHandlers) Upload(w http.ResponseWriter, r *http.Request) {
// Download - file download handler.
func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
var wg sync.WaitGroup
objectAPI := web.ObjectAPI()
if objectAPI == nil {
writeWebErrorResponse(w, errServerNotInitialized)
@@ -703,22 +743,28 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
return
}
}
opts := ObjectOptions{}
getObjectInfo := objectAPI.GetObjectInfo
getObject := objectAPI.GetObject
if web.CacheAPI() != nil {
getObject = web.CacheAPI().GetObject
}
getObjectInfo := objectAPI.GetObjectInfo
if web.CacheAPI() != nil {
getObjectInfo = web.CacheAPI().GetObjectInfo
getObject = web.CacheAPI().GetObject
}
objInfo, err := getObjectInfo(context.Background(), bucket, object, opts)
if err != nil {
writeWebErrorResponse(w, err)
return
}
length := objInfo.Size
var actualSize int64
if objInfo.IsCompressed() {
// Read the decompressed size from the meta.json.
actualSize = objInfo.GetActualSize()
if actualSize < 0 {
return
}
}
if objectAPI.IsEncryptionSupported() {
if _, err = DecryptObjectInfo(objInfo, r.Header); err != nil {
writeWebErrorResponse(w, err)
@@ -730,7 +776,37 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
}
var startOffset int64
var writer io.Writer
writer = w
var decompressReader *io.PipeReader
var compressWriter *io.PipeWriter
if objInfo.IsCompressed() {
var pipeErr error
// The decompress metrics are set.
snappyStartOffset := 0
snappyLength := actualSize
// Open a pipe for compression
// Where compressWriter is actually passed to the getObject
decompressReader, compressWriter = io.Pipe()
snappyReader := snappy.NewReader(decompressReader)
// The limit is set to the actual size.
responseWriter := ioutil.LimitedWriter(w, int64(snappyStartOffset), snappyLength)
wg.Add(1) //For closures.
go func() {
defer wg.Done()
// Finally, writes to the client.
if _, pipeErr = io.Copy(responseWriter, snappyReader); pipeErr != nil {
return
}
// Close the compressWriter if the data is read already.
// Closing the pipe, releases the writer passed to the getObject.
compressWriter.Close()
}()
writer = compressWriter
} else {
writer = w
}
if objectAPI.IsEncryptionSupported() && crypto.S3.IsEncrypted(objInfo.UserDefined) {
// Response writer should be limited early on for decryption upto required length,
// additionally also skipping mod(offset)64KiB boundaries.
@@ -743,12 +819,17 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set(crypto.SSEHeader, crypto.SSEAlgorithmAES256)
}
httpWriter := ioutil.WriteOnClose(writer)
// Add content disposition.
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", path.Base(object)))
if err = getObject(context.Background(), bucket, object, 0, -1, httpWriter, "", opts); err != nil {
httpWriter.Close()
if objInfo.IsCompressed() {
wg.Wait()
}
/// No need to print error, response writer already written to.
return
}
@@ -758,6 +839,10 @@ func (web *webAPIHandlers) Download(w http.ResponseWriter, r *http.Request) {
return
}
}
if objInfo.IsCompressed() {
// Wait for decompression go-routine to retire.
wg.Wait()
}
}
// DownloadZipArgs - Argument for downloading a bunch of files as a zip file.
@@ -771,6 +856,7 @@ type DownloadZipArgs struct {
// Takes a list of objects and creates a zip file that sent as the response body.
func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
var wg sync.WaitGroup
objectAPI := web.ObjectAPI()
if objectAPI == nil {
writeWebErrorResponse(w, errServerNotInitialized)
@@ -818,14 +904,17 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
getObjectInfo = web.CacheAPI().GetObjectInfo
}
opts := ObjectOptions{}
var length int64
for _, object := range args.Objects {
var decompressReader *io.PipeReader
var compressWriter *io.PipeWriter
// Writes compressed object file to the response.
zipit := func(objectName string) error {
info, err := getObjectInfo(context.Background(), args.BucketName, objectName, opts)
if err != nil {
return err
}
length := info.Size
length = info.Size
if objectAPI.IsEncryptionSupported() {
if _, err = DecryptObjectInfo(info, r.Header); err != nil {
writeWebErrorResponse(w, err)
@@ -835,20 +924,57 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
length, _ = info.DecryptedSize()
}
}
length = info.Size
var actualSize int64
if info.IsCompressed() {
// Read the decompressed size from the meta.json.
actualSize = info.GetActualSize()
// Set the info.Size to the actualSize.
info.Size = actualSize
}
header := &zip.FileHeader{
Name: strings.TrimPrefix(objectName, args.Prefix),
Method: zip.Deflate,
UncompressedSize64: uint64(length),
UncompressedSize: uint32(length),
}
wr, err := archive.CreateHeader(header)
zipWriter, err := archive.CreateHeader(header)
if err != nil {
writeWebErrorResponse(w, errUnexpected)
return err
}
var startOffset int64
var writer io.Writer
writer = wr
if info.IsCompressed() {
var pipeErr error
// The decompress metrics are set.
snappyStartOffset := 0
snappyLength := actualSize
// Open a pipe for decompression:
// compressWriter is passed to getObject, and decompressReader feeds the snappy reader.
decompressReader, compressWriter = io.Pipe()
snappyReader := snappy.NewReader(decompressReader)
// The limit is set to the actual size.
responseWriter := ioutil.LimitedWriter(zipWriter, int64(snappyStartOffset), snappyLength)
wg.Add(1) //For closures.
go func() {
defer wg.Done()
// Finally, writes to the client.
if _, pipeErr = io.Copy(responseWriter, snappyReader); pipeErr != nil {
return
}
// Close the compressWriter if the data is read already.
// Closing the pipe, releases the writer passed to the getObject.
compressWriter.Close()
}()
writer = compressWriter
} else {
writer = zipWriter
}
if objectAPI.IsEncryptionSupported() && crypto.S3.IsEncrypted(info.UserDefined) {
// Response writer should be limited early on for decryption upto required length,
// additionally also skipping mod(offset)64KiB boundaries.
@@ -861,6 +987,11 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
}
httpWriter := ioutil.WriteOnClose(writer)
if err = getObject(context.Background(), args.BucketName, objectName, 0, length, httpWriter, "", opts); err != nil {
httpWriter.Close()
if info.IsCompressed() {
// Wait for decompression go-routine to retire.
wg.Wait()
}
return err
}
if err = httpWriter.Close(); err != nil {
@@ -869,6 +1000,10 @@ func (web *webAPIHandlers) DownloadZip(w http.ResponseWriter, r *http.Request) {
return err
}
}
if info.IsCompressed() {
// Wait for decompression go-routine to retire.
wg.Wait()
}
return nil
}

View File

@@ -519,6 +519,11 @@ func (s *xlSets) IsEncryptionSupported() bool {
return s.getHashedSet("").IsEncryptionSupported()
}
// IsCompressionSupported reports whether compression is applicable for this
// layer. The answer is delegated to the set returned by getHashedSet("").
func (s *xlSets) IsCompressionSupported() bool {
	set := s.getHashedSet("")
	return set.IsCompressionSupported()
}
// DeleteBucket - deletes a bucket on all sets simultaneously,
// even if one of the sets fail to delete buckets, we proceed to
// undo a successful operation.

View File

@@ -299,3 +299,8 @@ func (xl xlObjects) IsNotificationSupported() bool {
func (xl xlObjects) IsEncryptionSupported() bool {
// This implementation reports encryption support unconditionally.
return true
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// The xlObjects implementation unconditionally returns true.
func (xl xlObjects) IsCompressionSupported() bool {
return true
}

View File

@@ -34,10 +34,11 @@ const erasureAlgorithmKlauspost = "klauspost/reedsolomon/vandermonde"
// objectPartInfo holds the info of each part kept in the multipart metadata
// file after CompleteMultipartUpload() is called.
//
// Every field is declared exactly once; field names within a struct must be
// unique for the type to compile.
type objectPartInfo struct {
	// Number is the part number of this part.
	Number int `json:"number"`
	// Name is the part's name (for example "part.1").
	Name string `json:"name"`
	// ETag is the ETag recorded for the part.
	ETag string `json:"etag"`
	// Size is the part size; sizes of all parts are summed into the total
	// object size when the multipart upload is completed.
	Size int64 `json:"size"`
	// ActualSize is the value reported by the incoming reader's ActualSize()
	// when the part was written.
	// NOTE(review): presumably the size before compression — confirm.
	ActualSize int64 `json:"actualSize"`
}
// byObjectPartNumber is a collection satisfying sort.Interface.
@@ -250,12 +251,13 @@ func objectPartIndex(parts []objectPartInfo, partNumber int) int {
}
// AddObjectPart - add a new object part in order.
func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64) {
func (m *xlMetaV1) AddObjectPart(partNumber int, partName string, partETag string, partSize int64, actualSize int64) {
partInfo := objectPartInfo{
Number: partNumber,
Name: partName,
ETag: partETag,
Size: partSize,
Number: partNumber,
Name: partName,
ETag: partETag,
Size: partSize,
ActualSize: actualSize,
}
// Update part info if it already exists.

View File

@@ -28,6 +28,8 @@ import (
humanize "github.com/dustin/go-humanize"
)
// ActualSize is the fixed value the tests below pass as the actualSize
// argument of AddObjectPart.
const ActualSize = 1000
// Tests for reading XL object info.
func TestXLReadStat(t *testing.T) {
ExecObjectLayerDiskAlteredTest(t, testXLReadStat)
@@ -213,7 +215,7 @@ func TestAddObjectPart(t *testing.T) {
for _, testCase := range testCases {
if testCase.expectedIndex > -1 {
partNumString := strconv.Itoa(testCase.partNum)
xlMeta.AddObjectPart(testCase.partNum, "part."+partNumString, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte))
xlMeta.AddObjectPart(testCase.partNum, "part."+partNumString, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte), ActualSize)
}
if index := objectPartIndex(xlMeta.Parts, testCase.partNum); index != testCase.expectedIndex {
@@ -245,7 +247,7 @@ func TestObjectPartIndex(t *testing.T) {
// Add some parts for testing.
for _, testCase := range testCases {
partNumString := strconv.Itoa(testCase.partNum)
xlMeta.AddObjectPart(testCase.partNum, "part."+partNumString, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte))
xlMeta.AddObjectPart(testCase.partNum, "part."+partNumString, "etag."+partNumString, int64(testCase.partNum+humanize.MiByte), ActualSize)
}
// Add failure test case.
@@ -274,7 +276,7 @@ func TestObjectToPartOffset(t *testing.T) {
// Total size of all parts is 5,242,899 bytes.
for _, partNum := range []int{1, 2, 4, 5, 7} {
partNumString := strconv.Itoa(partNum)
xlMeta.AddObjectPart(partNum, "part."+partNumString, "etag."+partNumString, int64(partNum+humanize.MiByte))
xlMeta.AddObjectPart(partNum, "part."+partNumString, "etag."+partNumString, int64(partNum+humanize.MiByte), ActualSize)
}
testCases := []struct {

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"
@@ -289,7 +290,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
}
// Validate input data size; it can never be less than -1.
if data.Size() < 0 {
if data.Size() < -1 {
logger.LogIf(ctx, errInvalidArgument)
return pi, toObjectErr(errInvalidArgument)
}
@@ -348,7 +349,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
defer xl.deleteObject(ctx, minioMetaTmpBucket, tmpPart, writeQuorum, false)
if data.Size() > 0 {
if data.Size() > 0 || data.Size() == -1 {
if pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tmpPartPath, data.Size(), onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum); err != nil {
return pi, toObjectErr(pErr, bucket, object)
@@ -365,12 +366,12 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
switch size := data.Size(); {
case size == 0:
buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
case size == -1 || size > blockSizeV1:
buffer = xl.bp.Get()
defer xl.bp.Put(buffer)
case size < blockSizeV1:
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
buffer = make([]byte, size, 2*size)
default:
buffer = xl.bp.Get()
defer xl.bp.Put(buffer)
}
if len(buffer) > int(xlMeta.Erasure.BlockSize) {
@@ -442,7 +443,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
md5hex := hex.EncodeToString(data.MD5Current())
// Add the current part.
xlMeta.AddObjectPart(partID, partSuffix, md5hex, n)
xlMeta.AddObjectPart(partID, partSuffix, md5hex, n, data.ActualSize())
for i, disk := range onlineDisks {
if disk == OfflineDisk {
@@ -477,6 +478,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
LastModified: fi.ModTime,
ETag: md5hex,
Size: fi.Size,
ActualSize: data.ActualSize(),
}, nil
}
@@ -630,6 +632,9 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
// Calculate full object size.
var objectSize int64
// Calculate consolidated actual size.
var objectActualSize int64
// Pick one from the first valid metadata.
xlMeta, err := pickValidXLMeta(ctx, partsMetadata, modTime, writeQuorum)
if err != nil {
@@ -673,15 +678,15 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
}
// All parts except the last part have to be at least 5MB.
if (i < len(parts)-1) && !isMinAllowedPartSize(currentXLMeta.Parts[partIdx].Size) {
if (i < len(parts)-1) && !isMinAllowedPartSize(currentXLMeta.Parts[partIdx].ActualSize) {
logger.LogIf(ctx, PartTooSmall{
PartNumber: part.PartNumber,
PartSize: currentXLMeta.Parts[partIdx].Size,
PartSize: currentXLMeta.Parts[partIdx].ActualSize,
PartETag: part.ETag,
})
return oi, PartTooSmall{
PartNumber: part.PartNumber,
PartSize: currentXLMeta.Parts[partIdx].Size,
PartSize: currentXLMeta.Parts[partIdx].ActualSize,
PartETag: part.ETag,
}
}
@@ -696,12 +701,16 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
// Save for total object size.
objectSize += currentXLMeta.Parts[partIdx].Size
// Save the consolidated actual size.
objectActualSize += currentXLMeta.Parts[partIdx].ActualSize
// Add incoming parts.
xlMeta.Parts[i] = objectPartInfo{
Number: part.PartNumber,
ETag: part.ETag,
Size: currentXLMeta.Parts[partIdx].Size,
Name: fmt.Sprintf("part.%d", part.PartNumber),
Number: part.PartNumber,
ETag: part.ETag,
Size: currentXLMeta.Parts[partIdx].Size,
Name: fmt.Sprintf("part.%d", part.PartNumber),
ActualSize: currentXLMeta.Parts[partIdx].ActualSize,
}
}
@@ -712,6 +721,9 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
// Save successfully calculated md5sum.
xlMeta.Meta["etag"] = s3MD5
// Save the consolidated actual size.
xlMeta.Meta[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
tempUploadIDPath := uploadID
// Update all xl metadata, make sure to not modify fields like

View File

@@ -147,7 +147,7 @@ func (xl xlObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBuc
pipeWriter.Close() // Close writer explicitly signaling we wrote all data.
}()
hashReader, err := hash.NewReader(pipeReader, length, "", "")
hashReader, err := hash.NewReader(pipeReader, length, "", "", length)
if err != nil {
logger.LogIf(ctx, err)
return oi, toObjectErr(err, dstBucket, dstObject)
@@ -646,7 +646,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
}
// Validate input data size; it can never be less than -1.
if data.Size() < 0 {
if data.Size() < -1 {
logger.LogIf(ctx, errInvalidArgument)
return ObjectInfo{}, toObjectErr(errInvalidArgument)
}
@@ -687,12 +687,12 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
switch size := data.Size(); {
case size == 0:
buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
case size == -1 || size > blockSizeV1:
buffer = xl.bp.Get()
defer xl.bp.Put(buffer)
case size < blockSizeV1:
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
buffer = make([]byte, size, 2*size)
default:
buffer = xl.bp.Get()
defer xl.bp.Put(buffer)
}
if len(buffer) > int(xlMeta.Erasure.BlockSize) {
@@ -716,7 +716,8 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
// Hint the filesystem to pre-allocate one continuous large block.
// This is only an optimization.
var curPartReader io.Reader
if curPartSize > 0 {
if curPartSize >= 0 {
pErr := xl.prepareFile(ctx, minioMetaTmpBucket, tempErasureObj, curPartSize, onlineDisks, xlMeta.Erasure.BlockSize, xlMeta.Erasure.DataBlocks, writeQuorum)
if pErr != nil {
return ObjectInfo{}, toObjectErr(pErr, bucket, object)
@@ -743,11 +744,23 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
// Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header.
if n < curPartSize {
if n < curPartSize && data.Size() > 0 {
logger.LogIf(ctx, IncompleteBody{})
return ObjectInfo{}, IncompleteBody{}
}
if n == 0 && data.Size() == -1 {
// The last part of a compressed object will always be empty,
// since the compressed size is unpredictable.
// Hence remove the last (empty) part from all `xl.disks`.
dErr := xl.deleteObject(ctx, minioMetaTmpBucket, tempErasureObj, writeQuorum, true)
if dErr != nil {
return ObjectInfo{}, toObjectErr(dErr, minioMetaTmpBucket, tempErasureObj)
}
break
}
// Update the total written size
sizeWritten += n
@@ -756,7 +769,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string,
onlineDisks[i] = nil
continue
}
partsMetadata[i].AddObjectPart(partIdx, partName, "", n)
partsMetadata[i].AddObjectPart(partIdx, partName, "", n, data.ActualSize())
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{partName, DefaultBitrotAlgorithm, w.Sum()})
}

View File

@@ -202,6 +202,7 @@ func parseXLParts(xlMetaBuf []byte) []objectPartInfo {
info.Name = p.Get("name").String()
info.ETag = p.Get("etag").String()
info.Size = p.Get("size").Int()
info.ActualSize = p.Get("actualSize").Int()
partInfo[i] = info
}
return partInfo
@@ -410,7 +411,7 @@ var (
// calculatePartSizeFromIdx calculates the part size according to input index.
// returns error if totalSize is less than -1, partSize is 0, partIndex is 0.
func calculatePartSizeFromIdx(ctx context.Context, totalSize int64, partSize int64, partIndex int) (currPartSize int64, err error) {
if totalSize < 0 {
if totalSize < -1 {
logger.LogIf(ctx, errInvalidArgument)
return 0, errInvalidArgument
}

View File

@@ -379,7 +379,7 @@ func TestGetPartSizeFromIdx(t *testing.T) {
// partIndex is 0, returns error.
{10, 1, 0, errPartSizeIndex},
// Total size is less than -1, returns error.
{-1, 10, 1, errInvalidArgument},
{-2, 10, 1, errInvalidArgument},
}
for i, testCaseFailure := range testCasesFailure {