[feat] Add targz transparent extract support (#11849)

This feature brings in support for auto extraction
of objects onto MinIO's namespace from an incoming
tar gzipped stream, the only expected metadata sent
by the client is to set `snowball-auto-extract`.

All the contents from the tar stream are saved as
folders and objects on the namespace.

fixes #8715
This commit is contained in:
Harshavardhana 2021-03-26 17:15:09 -07:00 committed by GitHub
parent df42b128db
commit d8bda2dd92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 469 additions and 1 deletions

View File

@ -178,9 +178,14 @@ func registerAPIRouter(router *mux.Router) {
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc( bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
collectAPIStats("putobjectlegalhold", maxClients(httpTraceAll(api.PutObjectLegalHoldHandler)))).Queries("legal-hold", "") collectAPIStats("putobjectlegalhold", maxClients(httpTraceAll(api.PutObjectLegalHoldHandler)))).Queries("legal-hold", "")
// PutObject with auto-extract support for zip
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(xhttp.AmzSnowballExtract, "true").HandlerFunc(
collectAPIStats("putobject", maxClients(httpTraceHdrs(api.PutObjectExtractHandler))))
// PutObject // PutObject
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc( bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
collectAPIStats("putobject", maxClients(httpTraceHdrs(api.PutObjectHandler)))) collectAPIStats("putobject", maxClients(httpTraceHdrs(api.PutObjectHandler))))
// DeleteObject // DeleteObject
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc( bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
collectAPIStats("deleteobject", maxClients(httpTraceAll(api.DeleteObjectHandler)))) collectAPIStats("deleteobject", maxClients(httpTraceAll(api.DeleteObjectHandler))))

28
cmd/format_string.go Normal file
View File

@ -0,0 +1,28 @@
// Code generated by "stringer -type=format -trimprefix=format untar.go"; DO NOT EDIT.
package cmd
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[formatUnknown-0]
_ = x[formatGzip-1]
_ = x[formatZstd-2]
_ = x[formatLZ4-3]
_ = x[formatS2-4]
_ = x[formatBZ2-5]
}
const _format_name = "UnknownGzipZstdLZ4S2BZ2"
var _format_index = [...]uint8{0, 7, 11, 15, 18, 20, 23}
func (i format) String() string {
if i < 0 || i >= format(len(_format_index)-1) {
return "format(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _format_name[_format_index[i]:_format_index[i+1]]
}

View File

@ -88,6 +88,8 @@ const (
AmzObjectLockLegalHold = "X-Amz-Object-Lock-Legal-Hold" AmzObjectLockLegalHold = "X-Amz-Object-Lock-Legal-Hold"
AmzObjectLockBypassGovernance = "X-Amz-Bypass-Governance-Retention" AmzObjectLockBypassGovernance = "X-Amz-Bypass-Governance-Retention"
AmzBucketReplicationStatus = "X-Amz-Replication-Status" AmzBucketReplicationStatus = "X-Amz-Replication-Status"
AmzSnowballExtract = "X-Amz-Meta-Snowball-Auto-Extract"
// Multipart parts count // Multipart parts count
AmzMpPartsCount = "x-amz-mp-parts-count" AmzMpPartsCount = "x-amz-mp-parts-count"

View File

@ -19,17 +19,18 @@ package cmd
import ( import (
"bufio" "bufio"
"context" "context"
"encoding/hex"
"encoding/xml" "encoding/xml"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"os"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@ -113,6 +114,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL, guessIsBrowserReq(r)) writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrBadRequest), r.URL, guessIsBrowserReq(r))
return return
} }
vars := mux.Vars(r) vars := mux.Vars(r)
bucket := vars["bucket"] bucket := vars["bucket"]
object, err := unescapePath(vars["object"]) object, err := unescapePath(vars["object"])
@ -1672,6 +1674,281 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
}) })
} }
// PutObjectExtractHandler - PUT Object extract is an extended API
// based off from AWS Snowball feature to auto extract compressed
// stream will be extracted in the same directory it is stored in
// and the folder structures will be built out accordingly.
func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "PutObjectExtract")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL, guessIsBrowserReq(r))
return
}
if crypto.S3KMS.IsRequested(r.Header) { // SSE-KMS is not supported
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
return
}
if _, ok := crypto.IsRequested(r.Header); !objectAPI.IsEncryptionSupported() && ok {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
return
}
vars := mux.Vars(r)
bucket := vars["bucket"]
object, err := unescapePath(vars["object"])
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// X-Amz-Copy-Source shouldn't be set for this call.
if _, ok := r.Header[xhttp.AmzCopySource]; ok {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL, guessIsBrowserReq(r))
return
}
// Validate storage class metadata if present
sc := r.Header.Get(xhttp.AmzStorageClass)
if sc != "" {
if !storageclass.IsValid(sc) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidStorageClass), r.URL, guessIsBrowserReq(r))
return
}
}
clientETag, err := etag.FromContentMD5(r.Header)
if err != nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidDigest), r.URL, guessIsBrowserReq(r))
return
}
/// if Content-Length is unknown/missing, deny the request
size := r.ContentLength
rAuthType := getRequestAuthType(r)
if rAuthType == authTypeStreamingSigned {
if sizeStr, ok := r.Header[xhttp.AmzDecodedContentLength]; ok {
if sizeStr[0] == "" {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL, guessIsBrowserReq(r))
return
}
size, err = strconv.ParseInt(sizeStr[0], 10, 64)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
}
if size == -1 {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrMissingContentLength), r.URL, guessIsBrowserReq(r))
return
}
/// maximum Upload size for objects in a single operation
if isMaxObjectSize(size) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrEntityTooLarge), r.URL, guessIsBrowserReq(r))
return
}
var (
md5hex = clientETag.String()
sha256hex = ""
reader io.Reader = r.Body
s3Err APIErrorCode
putObject = objectAPI.PutObject
)
// Check if put is allowed
if s3Err = isPutActionAllowed(ctx, rAuthType, bucket, object, r, iampolicy.PutObjectAction); s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
switch rAuthType {
case authTypeStreamingSigned:
// Initialize stream signature verifier.
reader, s3Err = newSignV4ChunkedReader(r)
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
case authTypeSignedV2, authTypePresignedV2:
s3Err = isReqAuthenticatedV2(r)
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
case authTypePresigned, authTypeSigned:
if s3Err = reqSignatureV4Verify(r, globalServerRegion, serviceS3); s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
if !skipContentSha256Cksum(r) {
sha256hex = getContentSha256Cksum(r, serviceS3)
}
}
hreader, err := hash.NewReader(reader, size, md5hex, sha256hex, size)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if err := enforceBucketQuota(ctx, bucket, size); err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Check if bucket encryption is enabled
_, err = globalBucketSSEConfigSys.Get(bucket)
// This request header needs to be set prior to setting ObjectOptions
if (globalAutoEncryption || err == nil) && !crypto.SSEC.IsRequested(r.Header) {
r.Header.Set(xhttp.AmzServerSideEncryption, xhttp.AmzEncryptionAES)
}
retPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectRetentionAction)
holdPerms := isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.PutObjectLegalHoldAction)
if api.CacheAPI() != nil {
putObject = api.CacheAPI().PutObject
}
getObjectInfo := objectAPI.GetObjectInfo
if api.CacheAPI() != nil {
getObjectInfo = api.CacheAPI().GetObjectInfo
}
putObjectTar := func(reader io.Reader, info os.FileInfo, object string) {
size := info.Size()
metadata := map[string]string{
xhttp.AmzStorageClass: sc,
}
actualSize := size
if objectAPI.IsCompressionSupported() && isCompressible(r.Header, object) && size > 0 {
// Storing the compression metadata.
metadata[ReservedMetadataPrefix+"compression"] = compressionAlgorithmV2
metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(size, 10)
actualReader, err := hash.NewReader(reader, size, "", "", actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
// Set compression metrics.
s2c := newS2CompressReader(actualReader, actualSize)
defer s2c.Close()
reader = etag.Wrap(s2c, actualReader)
size = -1 // Since compressed size is un-predictable.
}
hashReader, err := hash.NewReader(reader, size, "", "", actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
rawReader := hashReader
pReader := NewPutObjReader(rawReader)
// get encryption options
opts, err := putOpts(ctx, r, bucket, object, metadata)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
opts.MTime = info.ModTime()
retentionMode, retentionDate, legalHold, s3Err := checkPutObjectLockAllowed(ctx, r, bucket, object, getObjectInfo, retPerms, holdPerms)
if s3Err == ErrNone && retentionMode.Valid() {
metadata[strings.ToLower(xhttp.AmzObjectLockMode)] = string(retentionMode)
metadata[strings.ToLower(xhttp.AmzObjectLockRetainUntilDate)] = retentionDate.UTC().Format(iso8601TimeFormat)
}
if s3Err == ErrNone && legalHold.Status.Valid() {
metadata[strings.ToLower(xhttp.AmzObjectLockLegalHold)] = string(legalHold.Status)
}
if s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
if ok, _ := mustReplicate(ctx, r, bucket, object, metadata, ""); ok {
metadata[xhttp.AmzBucketReplicationStatus] = replication.Pending.String()
}
if r.Header.Get(xhttp.AmzBucketReplicationStatus) == replication.Replica.String() {
if s3Err = isPutActionAllowed(ctx, getRequestAuthType(r), bucket, object, r, iampolicy.ReplicateObjectAction); s3Err != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Err), r.URL, guessIsBrowserReq(r))
return
}
}
var objectEncryptionKey crypto.ObjectKey
if objectAPI.IsEncryptionSupported() {
if _, ok := crypto.IsRequested(r.Header); ok && !HasSuffix(object, SlashSeparator) { // handle SSE requests
if crypto.SSECopy.IsRequested(r.Header) {
writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL, guessIsBrowserReq(r))
return
}
reader, objectEncryptionKey, err = EncryptRequest(hashReader, r, bucket, object, metadata)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
wantSize := int64(-1)
if size >= 0 {
info := ObjectInfo{Size: size}
wantSize = info.EncryptedSize()
}
// do not try to verify encrypted content
hashReader, err = hash.NewReader(etag.Wrap(reader, hashReader), wantSize, "", "", actualSize)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
pReader, err = pReader.WithEncryption(hashReader, &objectEncryptionKey)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
}
}
// Ensure that metadata does not contain sensitive information
crypto.RemoveSensitiveEntries(metadata)
// Create the object..
objInfo, err := putObject(ctx, bucket, object, pReader, opts)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
return
}
if replicate, sync := mustReplicate(ctx, r, bucket, object, metadata, ""); replicate {
scheduleReplication(ctx, objInfo.Clone(), objectAPI, sync)
}
}
untar(hreader, putObjectTar)
w.Header()[xhttp.ETag] = []string{`"` + hex.EncodeToString(hreader.MD5Current()) + `"`}
writeSuccessResponseHeadersOnly(w)
}
/// Multipart objectAPIHandlers /// Multipart objectAPIHandlers
// NewMultipartUploadHandler - New multipart upload. // NewMultipartUploadHandler - New multipart upload.

156
cmd/untar.go Normal file
View File

@ -0,0 +1,156 @@
/*
* MinIO Cloud Storage, (C) 2021 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"archive/tar"
"bufio"
"bytes"
"compress/bzip2"
"fmt"
"io"
"os"
"path"
"github.com/klauspost/compress/s2"
"github.com/klauspost/compress/zstd"
gzip "github.com/klauspost/pgzip"
"github.com/pierrec/lz4"
)
func detect(r *bufio.Reader) format {
z, err := r.Peek(4)
if err != nil {
return formatUnknown
}
for _, f := range magicHeaders {
if bytes.Equal(f.header, z[:len(f.header)]) {
return f.f
}
}
return formatUnknown
}
//go:generate stringer -type=format -trimprefix=format $GOFILE
type format int
const (
formatUnknown format = iota
formatGzip
formatZstd
formatLZ4
formatS2
formatBZ2
)
var magicHeaders = []struct {
header []byte
f format
}{
{
header: []byte{0x1f, 0x8b, 8},
f: formatGzip,
},
{
// Zstd default header.
header: []byte{0x28, 0xb5, 0x2f, 0xfd},
f: formatZstd,
},
{
// Zstd skippable frame header.
header: []byte{0x2a, 0x4d, 0x18},
f: formatZstd,
},
{
// LZ4
header: []byte{0x4, 0x22, 0x4d, 0x18},
f: formatLZ4,
},
{
// Snappy/S2 stream
header: []byte{0xff, 0x06, 0x00, 0x00},
f: formatS2,
},
{
header: []byte{0x42, 0x5a, 'h'},
f: formatBZ2,
},
}
func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name string)) error {
bf := bufio.NewReader(r)
switch f := detect(bf); f {
case formatGzip:
gz, err := gzip.NewReader(bf)
if err != nil {
return err
}
defer gz.Close()
r = gz
case formatS2:
r = s2.NewReader(bf)
case formatZstd:
dec, err := zstd.NewReader(bf)
if err != nil {
return err
}
defer dec.Close()
r = dec
case formatBZ2:
r = bzip2.NewReader(bf)
case formatLZ4:
r = lz4.NewReader(bf)
case formatUnknown:
r = bf
default:
return fmt.Errorf("Unsupported format %s", f)
}
tarReader := tar.NewReader(r)
for {
header, err := tarReader.Next()
switch {
// if no more files are found return
case err == io.EOF:
return nil
// return any other error
case err != nil:
return err
// if the header is nil, just skip it (not sure how this happens)
case header == nil:
continue
}
name := header.Name
if name == slashSeparator {
continue
}
switch header.Typeflag {
case tar.TypeDir: // = directory
putObject(tarReader, header.FileInfo(), trimLeadingSlash(pathJoin(name, slashSeparator)))
case tar.TypeReg, tar.TypeChar, tar.TypeBlock, tar.TypeFifo, tar.TypeGNUSparse: // = regular
putObject(tarReader, header.FileInfo(), trimLeadingSlash(path.Clean(name)))
default:
// ignore symlink'ed
continue
}
}
}