diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go
index 99adf5689..ae2cbd2a7 100644
--- a/cmd/object-handlers.go
+++ b/cmd/object-handlers.go
@@ -2282,8 +2282,15 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
 		}
 		return nil
 	}
+	var opts untarOptions
+	opts.ignoreDirs = strings.EqualFold(r.Header.Get(xhttp.MinIOSnowballIgnoreDirs), "true")
+	opts.ignoreErrs = strings.EqualFold(r.Header.Get(xhttp.MinIOSnowballIgnoreErrors), "true")
+	opts.prefixAll = r.Header.Get(xhttp.MinIOSnowballPrefix)
+	if opts.prefixAll != "" {
+		opts.prefixAll = trimLeadingSlash(pathJoin(opts.prefixAll, slashSeparator))
+	}
 
-	if err = untar(hreader, putObjectTar); err != nil {
+	if err = untar(ctx, hreader, putObjectTar, opts); err != nil {
 		apiErr := errorCodes.ToAPIErr(s3Err)
 		// If not set, convert or use BadRequest
 		if s3Err == ErrNone {
diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go
index 74e69ab0a..9c33595e3 100644
--- a/cmd/storage-rest-server.go
+++ b/cmd/storage-rest-server.go
@@ -1052,6 +1052,13 @@ var poolBuf8k = sync.Pool{
 	},
 }
 
+var poolBuf128k = sync.Pool{
+	New: func() interface{} {
+		b := make([]byte, 128<<10)
+		return b
+	},
+}
+
 // waitForHTTPStream will wait for responses where
 // streamHTTPResponse has been used.
 // The returned reader contains the payload and must be closed if no error is returned.
diff --git a/cmd/untar.go b/cmd/untar.go
index dd489fb8b..3af5fbdb0 100644
--- a/cmd/untar.go
+++ b/cmd/untar.go
@@ -22,16 +22,20 @@ import (
 	"bufio"
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
+	"io/fs"
 	"os"
 	"path"
 	"runtime"
+	"sync"
 
 	"github.com/cosnicolaou/pbzip2"
 	"github.com/klauspost/compress/s2"
 	"github.com/klauspost/compress/zstd"
 	gzip "github.com/klauspost/pgzip"
+	"github.com/minio/minio/internal/logger"
 	"github.com/pierrec/lz4"
 )
 
@@ -97,7 +101,36 @@ var magicHeaders = []struct {
 	},
 }
 
-func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name string) error) error {
+type untarOptions struct {
+	ignoreDirs bool
+	ignoreErrs bool
+	prefixAll  string
+}
+
+// disconnectReader will ensure that no reads can take place on
+// the upstream reader after close has been called.
+type disconnectReader struct {
+	r  io.Reader
+	mu sync.Mutex
+}
+
+func (d *disconnectReader) Read(p []byte) (n int, err error) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if d.r != nil {
+		return d.r.Read(p)
+	}
+	return 0, errors.New("reader closed")
+}
+
+func (d *disconnectReader) Close() error {
+	d.mu.Lock()
+	d.r = nil
+	d.mu.Unlock()
+	return nil
+}
+
+func untar(ctx context.Context, r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name string) error, o untarOptions) error {
 	bf := bufio.NewReader(r)
 	switch f := detect(bf); f {
 	case formatGzip:
@@ -117,7 +150,7 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name
 		defer dec.Close()
 		r = dec
 	case formatBZ2:
-		ctx, cancel := context.WithCancel(context.Background())
+		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()
 		r = pbzip2.NewReader(ctx, bf, pbzip2.DecompressionOptions(
 			pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2),
@@ -131,17 +164,32 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name
 	}
 	tarReader := tar.NewReader(r)
 	n := 0
-	for {
-		header, err := tarReader.Next()
+	asyncWriters := make(chan struct{}, 16)
+	var wg sync.WaitGroup
+	var asyncErr error
+	var asyncErrMu sync.Mutex
+	for {
+		if !o.ignoreErrs {
+			asyncErrMu.Lock()
+			err := asyncErr
+			asyncErrMu.Unlock()
+			if err != nil {
+				return err
+			}
+		}
+
+		header, err := tarReader.Next()
 
 		switch {
 
 		// if no more files are found return
 		case err == io.EOF:
-			return nil
+			wg.Wait()
+			return asyncErr
 
 		// return any other error
 		case err != nil:
+			wg.Wait()
 			extra := ""
 			if n > 0 {
 				extra = fmt.Sprintf(" after %d successful object(s)", n)
@@ -160,18 +208,66 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name
 
 		switch header.Typeflag {
 		case tar.TypeDir: // = directory
-			if err := putObject(tarReader, header.FileInfo(), trimLeadingSlash(pathJoin(name, slashSeparator))); err != nil {
-				return err
+			if o.ignoreDirs {
+				continue
 			}
-			n++
+			name = trimLeadingSlash(pathJoin(name, slashSeparator))
 		case tar.TypeReg, tar.TypeChar, tar.TypeBlock, tar.TypeFifo, tar.TypeGNUSparse: // = regular
-			if err := putObject(tarReader, header.FileInfo(), trimLeadingSlash(path.Clean(name))); err != nil {
-				return err
-			}
-			n++
+			name = trimLeadingSlash(path.Clean(name))
 		default:
 			// ignore symlink'ed
 			continue
 		}
+		if o.prefixAll != "" {
+			name = pathJoin(o.prefixAll, name)
+		}
+
+		// Do small files async
+		n++
+		if header.Size <= smallFileThreshold {
+			asyncWriters <- struct{}{}
+			b := poolBuf128k.Get().([]byte)
+			if cap(b) < int(header.Size) {
+				b = make([]byte, smallFileThreshold)
+			}
+			b = b[:header.Size]
+			if _, err := io.ReadFull(tarReader, b); err != nil {
+				return err
+			}
+			wg.Add(1)
+			go func(name string, fi fs.FileInfo, b []byte) {
+				rc := disconnectReader{r: bytes.NewReader(b)}
+				defer func() {
+					rc.Close()
+					<-asyncWriters
+					wg.Done()
+					poolBuf128k.Put(b)
+				}()
+				if err := putObject(&rc, fi, name); err != nil {
+					if o.ignoreErrs {
+						logger.LogIf(ctx, err)
+						return
+					}
+					asyncErrMu.Lock()
+					if asyncErr == nil {
+						asyncErr = err
+					}
+					asyncErrMu.Unlock()
+				}
+			}(name, header.FileInfo(), b)
+			continue
+		}
+
+		// Larger files are uploaded synchronously.
+		rc := disconnectReader{r: tarReader}
+		if err := putObject(&rc, header.FileInfo(), name); err != nil {
+			rc.Close()
+			if o.ignoreErrs {
+				logger.LogIf(ctx, err)
+				continue
+			}
+			return err
+		}
+		rc.Close()
 	}
 }
diff --git a/internal/http/headers.go b/internal/http/headers.go
index 5ce24d198..68b1f9a7a 100644
--- a/internal/http/headers.go
+++ b/internal/http/headers.go
@@ -91,6 +91,14 @@ const (
 
 	AmzBucketReplicationStatus = "X-Amz-Replication-Status"
 	AmzSnowballExtract         = "X-Amz-Meta-Snowball-Auto-Extract"
+	// MinIOSnowballIgnoreDirs will skip creating empty directory objects.
+	MinIOSnowballIgnoreDirs = "X-Amz-Meta-Minio-Snowball-Ignore-Dirs"
+	// MinIOSnowballIgnoreErrors will ignore recoverable errors, typically single files failing to upload.
+	// An error will be printed to the console instead.
+	MinIOSnowballIgnoreErrors = "X-Amz-Meta-Minio-Snowball-Ignore-Errors"
+	// MinIOSnowballPrefix will apply this prefix (plus "/" at the end) to all extracted objects.
+	MinIOSnowballPrefix = "X-Amz-Meta-Minio-Snowball-Prefix"
+
 	// Object lock enabled
 	AmzObjectLockEnabled = "x-amz-bucket-object-lock-enabled"
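
For reviewers, a minimal client-side sketch of how the new headers could be exercised. Everything here is an assumption for illustration (the endpoint, bucket, and archive.tar are made up, and SigV4 request signing is omitted, so this presumes a pre-signed PUT URL that covers these metadata headers):

package main

import (
	"log"
	"net/http"
	"os"
)

func main() {
	f, err := os.Open("archive.tar") // hypothetical local tarball
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Hypothetical pre-signed PUT URL for the target bucket/object.
	url := "https://minio.example.com/mybucket/archive.tar?X-Amz-Signature=..."

	req, err := http.NewRequest(http.MethodPut, url, f)
	if err != nil {
		log.Fatal(err)
	}
	// Existing trigger for server-side extraction.
	req.Header.Set("X-Amz-Meta-Snowball-Auto-Extract", "true")
	// New options added in this change.
	req.Header.Set("X-Amz-Meta-Minio-Snowball-Ignore-Dirs", "true")    // skip empty directory objects
	req.Header.Set("X-Amz-Meta-Minio-Snowball-Ignore-Errors", "true")  // log per-file failures and continue
	req.Header.Set("X-Amz-Meta-Minio-Snowball-Prefix", "imports/2023") // server normalizes this to "imports/2023/"

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println("status:", resp.Status)
}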
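
And a test-style sketch of driving the new untar signature directly from inside package cmd. The extractAll helper and its discarding callback are assumptions, not part of this change; a real caller streams each entry into PutObject the way putObjectTar does. Note that the callback may now be invoked concurrently for entries at or below smallFileThreshold, so it must be safe for concurrent use:

// extractAll is a hypothetical helper illustrating the new signature:
// untar now takes a context and untarOptions in addition to the callback.
func extractAll(ctx context.Context, archive io.Reader) error {
	cb := func(r io.Reader, fi os.FileInfo, name string) error {
		// A real callback would PUT the object; here we just drain it.
		sz, err := io.Copy(io.Discard, r)
		fmt.Printf("extracted %s (%d bytes)\n", name, sz)
		return err
	}
	return untar(ctx, archive, cb, untarOptions{
		ignoreDirs: true,            // mirrors X-Amz-Meta-Minio-Snowball-Ignore-Dirs
		ignoreErrs: false,           // fail on the first upload error
		prefixAll:  "imports/2023/", // handler-normalized form of the Snowball-Prefix header
	})
}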