From d8bda2dd92ff29e2ff712e6cea84966c47d23799 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 26 Mar 2021 17:15:09 -0700 Subject: [PATCH] [feat] Add targz transparent extract support (#11849) This feature brings in support for auto extraction of objects onto MinIO's namespace from an incoming tar gzipped stream, the only expected metadata sent by the client is to set `snowball-auto-extract`. All the contents from the tar stream are saved as folders and objects on the namespace. fixes #8715 --- cmd/api-router.go | 5 + cmd/format_string.go | 28 +++++ cmd/http/headers.go | 2 + cmd/object-handlers.go | 279 ++++++++++++++++++++++++++++++++++++++++- cmd/untar.go | 156 +++++++++++++++++++++++ 5 files changed, 469 insertions(+), 1 deletion(-) create mode 100644 cmd/format_string.go create mode 100644 cmd/untar.go diff --git a/cmd/api-router.go b/cmd/api-router.go index 81f3159e8..88ba6e16f 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -178,9 +178,14 @@ func registerAPIRouter(router *mux.Router) { bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc( collectAPIStats("putobjectlegalhold", maxClients(httpTraceAll(api.PutObjectLegalHoldHandler)))).Queries("legal-hold", "") + // PutObject with auto-extract support for zip + bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(xhttp.AmzSnowballExtract, "true").HandlerFunc( + collectAPIStats("putobject", maxClients(httpTraceHdrs(api.PutObjectExtractHandler)))) + // PutObject bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc( collectAPIStats("putobject", maxClients(httpTraceHdrs(api.PutObjectHandler)))) + // DeleteObject bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc( collectAPIStats("deleteobject", maxClients(httpTraceAll(api.DeleteObjectHandler)))) diff --git a/cmd/format_string.go b/cmd/format_string.go new file mode 100644 index 000000000..ce068152a --- /dev/null +++ b/cmd/format_string.go @@ -0,0 +1,28 @@ +// Code generated by "stringer -type=format -trimprefix=format untar.go"; DO NOT EDIT. + +package cmd + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[formatUnknown-0] + _ = x[formatGzip-1] + _ = x[formatZstd-2] + _ = x[formatLZ4-3] + _ = x[formatS2-4] + _ = x[formatBZ2-5] +} + +const _format_name = "UnknownGzipZstdLZ4S2BZ2" + +var _format_index = [...]uint8{0, 7, 11, 15, 18, 20, 23} + +func (i format) String() string { + if i < 0 || i >= format(len(_format_index)-1) { + return "format(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _format_name[_format_index[i]:_format_index[i+1]] +} diff --git a/cmd/http/headers.go b/cmd/http/headers.go index 3a04cd15e..2fb24afea 100644 --- a/cmd/http/headers.go +++ b/cmd/http/headers.go @@ -88,6 +88,8 @@ const ( AmzObjectLockLegalHold = "X-Amz-Object-Lock-Legal-Hold" AmzObjectLockBypassGovernance = "X-Amz-Bypass-Governance-Retention" AmzBucketReplicationStatus = "X-Amz-Replication-Status" + AmzSnowballExtract = "X-Amz-Meta-Snowball-Auto-Extract" + // Multipart parts count AmzMpPartsCount = "x-amz-mp-parts-count" diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index fabaeb4cf..1a39eadbf 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -19,17 +19,18 @@ package cmd import ( "bufio" "context" + "encoding/hex" "encoding/xml" "fmt" "io" "net/http" "net/http/httptest" "net/url" + "os" "sort" "strconv" "strings" "sync" - "time" "github.com/google/uuid" @@ -113,6 +114,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL, guessIsBrowserReq(r)) return } + vars := mux.Vars(r) bucket := vars["bucket"] object, err := unescapePath(vars["object"]) @@ -1672,6 +1674,281 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req }) } +// PutObjectExtractHandler - PUT Object extract is an extended API +// based off from AWS Snowball feature to auto extract compressed +// stream will be extracted in the same directory it is stored in +// and the folder structures will be built out accordingly. +func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "PutObjectExtract") + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI := api.ObjectAPI() + if objectAPI == nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r)) + return + } + + if crypto.S3KMS.IsRequested(r.Header) { // SSE-KMS is not supported + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) + return + } + + if _, ok := crypto.IsRequested(r.Header); !objectAPI.IsEncryptionSupported() && ok { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r)) + return + } + + vars := mux.Vars(r) + bucket := vars["bucket"] + object, err := unescapePath(vars["object"]) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + // X-Amz-Copy-Source shouldn't be set for this call. + if _, ok := r.Header[xhttp.AmzCopySource]; ok { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL, guessIsBrowserReq(r)) + return + } + + // Validate storage class metadata if present + sc := r.Header.Get(xhttp.AmzStorageClass) + if sc != "" { + if !storageclass.IsValid(sc) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL, guessIsBrowserReq(r)) + return + } + } + + clientETag, err := etag.FromContentMD5(r.Header) + if err != nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL, guessIsBrowserReq(r)) + return + } + + /// if Content-Length is unknown/missing, deny the request + size := r.ContentLength + rAuthType := getRequestAuthType(r) + if rAuthType == authTypeStreamingSigned { + if sizeStr, ok := r.Header[xhttp.AmzDecodedContentLength]; ok { + if sizeStr[0] == "" { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL, guessIsBrowserReq(r)) + return + } + size, err = strconv.ParseInt(sizeStr[0], 10, 64) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + } + } + + if size == -1 { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL, guessIsBrowserReq(r)) + return + } + + /// maximum Upload size for objects in a single operation + if isMaxObjectSize(size) { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL, guessIsBrowserReq(r)) + return + } + + var ( + md5hex = clientETag.String() + sha256hex = "" + reader io.Reader = r.Body + s3Err APIErrorCode + putObject = objectAPI.PutObject + ) + + // Check if put is allowed + if s3Err = isPutActionAllowed(ctx, rAuthType, bucket, object, r, iampolicy.PutObjectAction); s3Err != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) + return + } + + switch rAuthType { + case authTypeStreamingSigned: + // Initialize stream signature verifier. + reader, s3Err = newSignV4ChunkedReader(r) + if s3Err != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) + return + } + case authTypeSignedV2, authTypePresignedV2: + s3Err = isReqAuthenticatedV2(r) + if s3Err != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) + return + } + + case authTypePresigned, authTypeSigned: + if s3Err = reqSignatureV4Verify(r, globalServerRegion, serviceS3); s3Err != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) + return + } + if !skipContentSha256Cksum(r) { + sha256hex = getContentSha256Cksum(r, serviceS3) + } + } + + hreader, err := hash.NewReader(reader, size, md5hex, sha256hex, size) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + if err := enforceBucketQuota(ctx, bucket, size); err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + // Check if bucket encryption is enabled + _, err = globalBucketSSEConfigSys.Get(bucket) + // This request header needs to be set prior to setting ObjectOptions + if (globalAutoEncryption || err == nil) && !crypto.SSEC.IsRequested(r.Header) { + r.Header.Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES) + } + + retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectRetentionAction) + holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectLegalHoldAction) + + if api.CacheAPI() != nil { + putObject = api.CacheAPI().PutObject + } + + getObjectInfo := objectAPI.GetObjectInfo + if api.CacheAPI() != nil { + getObjectInfo = api.CacheAPI().GetObjectInfo + } + + putObjectTar := func(reader io.Reader, info os.FileInfo, object string) { + size := info.Size() + metadata := map[string]string{ + xhttp.AmzStorageClass: sc, + } + + actualSize := size + if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 { + // Storing the compression metadata. + metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2 + metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10) + + actualReader, err := hash.NewReader(reader, size, "", "", actualSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + // Set compression metrics. + s2c := newS2CompressReader(actualReader, actualSize) + defer s2c.Close() + reader = etag.Wrap(s2c, actualReader) + size = -1 // Since compressed size is un-predictable. + } + + hashReader, err := hash.NewReader(reader, size, "", "", actualSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + rawReader := hashReader + pReader := NewPutObjReader(rawReader) + + // get encryption options + opts, err := putOpts(ctx, r, bucket, object, metadata) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + opts.MTime = info.ModTime() + + retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms) + if s3Err == ErrNone && retentionMode.Valid() { + metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode) + metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retentionDate.UTC().Format(iso8601TimeFormat) + } + + if s3Err == ErrNone && legalHold.Status.Valid() { + metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status) + } + + if s3Err != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) + return + } + + if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok { + metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String() + } + + if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() { + if s3Err = isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.ReplicateObjectAction); s3Err != ErrNone { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r)) + return + } + } + + var objectEncryptionKey crypto.ObjectKey + if objectAPI.IsEncryptionSupported() { + if _, ok := crypto.IsRequested(r.Header); ok && !HasSuffix(object, SlashSeparator) { // handle SSE requests + if crypto.SSECopy.IsRequested(r.Header) { + writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL, guessIsBrowserReq(r)) + return + } + + reader, objectEncryptionKey, err = EncryptRequest(hashReader, r, bucket, object, metadata) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + wantSize := int64(-1) + if size >= 0 { + info := ObjectInfo{Size: size} + wantSize = info.EncryptedSize() + } + + // do not try to verify encrypted content + hashReader, err = hash.NewReader(etag.Wrap(reader, hashReader), wantSize, "", "", actualSize) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + } + } + + // Ensure that metadata does not contain sensitive information + crypto.RemoveSensitiveEntries(metadata) + + // Create the object.. + objInfo, err := putObject(ctx, bucket, object, pReader, opts) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r)) + return + } + + if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate { + scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync) + } + + } + + untar(hreader, putObjectTar) + + w.Header()[xhttp.ETag] = []string{`"` + hex.EncodeToString(hreader.MD5Current()) + `"`} + writeSuccessResponseHeadersOnly(w) +} + /// Multipart objectAPIHandlers // NewMultipartUploadHandler - New multipart upload. diff --git a/cmd/untar.go b/cmd/untar.go new file mode 100644 index 000000000..e862ba034 --- /dev/null +++ b/cmd/untar.go @@ -0,0 +1,156 @@ +/* + * MinIO Cloud Storage, (C) 2021 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "archive/tar" + "bufio" + "bytes" + "compress/bzip2" + "fmt" + "io" + "os" + "path" + + "github.com/klauspost/compress/s2" + "github.com/klauspost/compress/zstd" + gzip "github.com/klauspost/pgzip" + "github.com/pierrec/lz4" +) + +func detect(r *bufio.Reader) format { + z, err := r.Peek(4) + if err != nil { + return formatUnknown + } + for _, f := range magicHeaders { + if bytes.Equal(f.header, z[:len(f.header)]) { + return f.f + } + } + return formatUnknown +} + +//go:generate stringer -type=format -trimprefix=format $GOFILE +type format int + +const ( + formatUnknown format = iota + formatGzip + formatZstd + formatLZ4 + formatS2 + formatBZ2 +) + +var magicHeaders = []struct { + header []byte + f format +}{ + { + header: []byte{0x1f, 0x8b, 8}, + f: formatGzip, + }, + { + // Zstd default header. + header: []byte{0x28, 0xb5, 0x2f, 0xfd}, + f: formatZstd, + }, + { + // Zstd skippable frame header. + header: []byte{0x2a, 0x4d, 0x18}, + f: formatZstd, + }, + { + // LZ4 + header: []byte{0x4, 0x22, 0x4d, 0x18}, + f: formatLZ4, + }, + { + // Snappy/S2 stream + header: []byte{0xff, 0x06, 0x00, 0x00}, + f: formatS2, + }, + { + header: []byte{0x42, 0x5a, 'h'}, + f: formatBZ2, + }, +} + +func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name string)) error { + bf := bufio.NewReader(r) + switch f := detect(bf); f { + case formatGzip: + gz, err := gzip.NewReader(bf) + if err != nil { + return err + } + defer gz.Close() + r = gz + case formatS2: + r = s2.NewReader(bf) + case formatZstd: + dec, err := zstd.NewReader(bf) + if err != nil { + return err + } + defer dec.Close() + r = dec + case formatBZ2: + r = bzip2.NewReader(bf) + case formatLZ4: + r = lz4.NewReader(bf) + case formatUnknown: + r = bf + default: + return fmt.Errorf("Unsupported format %s", f) + } + tarReader := tar.NewReader(r) + for { + header, err := tarReader.Next() + + switch { + + // if no more files are found return + case err == io.EOF: + return nil + + // return any other error + case err != nil: + return err + + // if the header is nil, just skip it (not sure how this happens) + case header == nil: + continue + } + + name := header.Name + if name == slashSeparator { + continue + } + + switch header.Typeflag { + case tar.TypeDir: // = directory + putObject(tarReader, header.FileInfo(), trimLeadingSlash(pathJoin(name, slashSeparator))) + case tar.TypeReg, tar.TypeChar, tar.TypeBlock, tar.TypeFifo, tar.TypeGNUSparse: // = regular + putObject(tarReader, header.FileInfo(), trimLeadingSlash(path.Clean(name))) + default: + // ignore symlink'ed + continue + } + } +}