Add concurrent Snowball extraction + options (#15836)

Klaus Post 2022-10-18 22:50:21 +02:00 committed by GitHub
parent 9fff315555
commit bd3dfad8b9
4 changed files with 131 additions and 13 deletions


@@ -2282,8 +2282,15 @@ func (api objectAPIHandlers) PutObjectExtractHandler(w http.ResponseWriter, r *h
 			}
 			return nil
 		}
+		var opts untarOptions
+		opts.ignoreDirs = strings.EqualFold(r.Header.Get(xhttp.MinIOSnowballIgnoreDirs), "true")
+		opts.ignoreErrs = strings.EqualFold(r.Header.Get(xhttp.MinIOSnowballIgnoreErrors), "true")
+		opts.prefixAll = r.Header.Get(xhttp.MinIOSnowballPrefix)
+		if opts.prefixAll != "" {
+			opts.prefixAll = trimLeadingSlash(pathJoin(opts.prefixAll, slashSeparator))
+		}
-		if err = untar(hreader, putObjectTar); err != nil {
+		if err = untar(ctx, hreader, putObjectTar, opts); err != nil {
 			apiErr := errorCodes.ToAPIErr(s3Err)
 			// If not set, convert or use BadRequest
 			if s3Err == ErrNone {
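Note on the prefix handling above: the header value is normalized so that it ends with exactly one "/" and never starts with one. A rough standalone illustration using stdlib equivalents of MinIO's pathJoin/trimLeadingSlash helpers (an approximation for clarity, not the internal code):

package main

import (
	"fmt"
	"path"
	"strings"
)

// normalizePrefix approximates trimLeadingSlash(pathJoin(prefix, slashSeparator)):
// clean the path, keep a single trailing slash, and drop any leading slash.
func normalizePrefix(p string) string {
	if p == "" {
		return ""
	}
	joined := path.Join(p, "/") + "/"       // cleaned, with exactly one trailing slash
	return strings.TrimPrefix(joined, "/")  // object names never start with "/"
}

func main() {
	fmt.Println(normalizePrefix("imports/2022"))   // "imports/2022/"
	fmt.Println(normalizePrefix("/photos/./a//b")) // "photos/a/b/"
}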


@@ -1052,6 +1052,13 @@ var poolBuf8k = sync.Pool{
 	},
 }
 
+var poolBuf128k = sync.Pool{
+	New: func() interface{} {
+		b := make([]byte, 128<<10)
+		return b
+	},
+}
+
 // waitForHTTPStream will wait for responses where
 // streamHTTPResponse has been used.
 // The returned reader contains the payload and must be closed if no error is returned.
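The new 128 KiB buffer pool above mirrors the existing poolBuf8k. For clarity, a standalone sketch of the Get/Put pattern such a pool serves (illustrative, not from the diff):

package main

import (
	"fmt"
	"sync"
)

var poolBuf128k = sync.Pool{
	New: func() interface{} {
		return make([]byte, 128<<10) // 128 KiB
	},
}

func main() {
	b := poolBuf128k.Get().([]byte)
	defer poolBuf128k.Put(b) // return the full-capacity slice for reuse

	payload := b[:1024] // slice down to the bytes actually needed
	fmt.Println(len(payload), cap(payload))
}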


@@ -22,16 +22,20 @@ import (
 	"bufio"
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
+	"io/fs"
 	"os"
 	"path"
 	"runtime"
+	"sync"
 
 	"github.com/cosnicolaou/pbzip2"
 	"github.com/klauspost/compress/s2"
 	"github.com/klauspost/compress/zstd"
 	gzip "github.com/klauspost/pgzip"
+	"github.com/minio/minio/internal/logger"
 	"github.com/pierrec/lz4"
 )
@@ -97,7 +101,36 @@ var magicHeaders = []struct {
 	},
 }
 
-func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name string) error) error {
+type untarOptions struct {
+	ignoreDirs bool
+	ignoreErrs bool
+	prefixAll  string
+}
+
+// disconnectReader will ensure that no reads can take place on
+// the upstream reader after close has been called.
+type disconnectReader struct {
+	r  io.Reader
+	mu sync.Mutex
+}
+
+func (d *disconnectReader) Read(p []byte) (n int, err error) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if d.r != nil {
+		return d.r.Read(p)
+	}
+	return 0, errors.New("reader closed")
+}
+
+func (d *disconnectReader) Close() error {
+	d.mu.Lock()
+	d.r = nil
+	d.mu.Unlock()
+	return nil
+}
+
+func untar(ctx context.Context, r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name string) error, o untarOptions) error {
 	bf := bufio.NewReader(r)
 	switch f := detect(bf); f {
 	case formatGzip:
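disconnectReader exists because the putObject callbacks may outlive the loop iteration that handed them a reader; once the extractor calls Close, a late Read must not drain bytes that belong to the next tar entry. A standalone sketch of that behavior (illustrative, it simply mirrors the type added above):

package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
)

type disconnectReader struct {
	r  io.Reader
	mu sync.Mutex
}

func (d *disconnectReader) Read(p []byte) (int, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.r != nil {
		return d.r.Read(p)
	}
	return 0, errors.New("reader closed")
}

func (d *disconnectReader) Close() error {
	d.mu.Lock()
	d.r = nil
	d.mu.Unlock()
	return nil
}

func main() {
	rc := &disconnectReader{r: strings.NewReader("hello")}
	buf := make([]byte, 2)
	n, _ := rc.Read(buf)
	fmt.Println(string(buf[:n])) // "he"

	rc.Close()
	_, err := rc.Read(buf)
	fmt.Println(err) // "reader closed"; a callback holding the reader can no longer touch the shared stream
}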
@@ -117,7 +150,7 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name
 		defer dec.Close()
 		r = dec
 	case formatBZ2:
-		ctx, cancel := context.WithCancel(context.Background())
+		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()
 		r = pbzip2.NewReader(ctx, bf, pbzip2.DecompressionOptions(
 			pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2),
@@ -131,17 +164,32 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name
 	}
 
 	tarReader := tar.NewReader(r)
 	n := 0
+	asyncWriters := make(chan struct{}, 16)
+	var wg sync.WaitGroup
+	var asyncErr error
+	var asyncErrMu sync.Mutex
 	for {
+		if !o.ignoreErrs {
+			asyncErrMu.Lock()
+			err := asyncErr
+			asyncErrMu.Unlock()
+			if err != nil {
+				return err
+			}
+		}
+
 		header, err := tarReader.Next()
 		switch {
 
 		// if no more files are found return
 		case err == io.EOF:
-			return nil
+			wg.Wait()
+			return asyncErr
 
 		// return any other error
 		case err != nil:
+			wg.Wait()
 			extra := ""
 			if n > 0 {
 				extra = fmt.Sprintf(" after %d successful object(s)", n)
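The machinery set up above (a buffered channel capped at 16 slots, a WaitGroup, and a mutex-guarded first error) drives the small-file upload path further down. A standalone sketch of the same bounded-concurrency pattern (illustrative values, not from the diff):

package main

import (
	"errors"
	"fmt"
	"sync"
)

func main() {
	asyncWriters := make(chan struct{}, 16) // at most 16 uploads in flight
	var wg sync.WaitGroup
	var asyncErr error
	var asyncErrMu sync.Mutex

	upload := func(i int) error {
		if i == 7 {
			return errors.New("simulated upload failure")
		}
		return nil
	}

	for i := 0; i < 100; i++ {
		asyncWriters <- struct{}{} // blocks once 16 workers are busy
		wg.Add(1)
		go func(i int) {
			defer func() {
				<-asyncWriters
				wg.Done()
			}()
			if err := upload(i); err != nil {
				asyncErrMu.Lock()
				if asyncErr == nil { // keep only the first error
					asyncErr = err
				}
				asyncErrMu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("first async error:", asyncErr)
}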
@@ -160,18 +208,66 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name
 		switch header.Typeflag {
 		case tar.TypeDir: // = directory
-			if err := putObject(tarReader, header.FileInfo(), trimLeadingSlash(pathJoin(name, slashSeparator))); err != nil {
-				return err
+			if o.ignoreDirs {
+				continue
 			}
-			n++
+			name = trimLeadingSlash(pathJoin(name, slashSeparator))
 		case tar.TypeReg, tar.TypeChar, tar.TypeBlock, tar.TypeFifo, tar.TypeGNUSparse: // = regular
-			if err := putObject(tarReader, header.FileInfo(), trimLeadingSlash(path.Clean(name))); err != nil {
-				return err
-			}
-			n++
+			name = trimLeadingSlash(path.Clean(name))
 		default:
 			// ignore symlink'ed
 			continue
 		}
+
+		if o.prefixAll != "" {
+			name = pathJoin(o.prefixAll, name)
+		}
+
+		// Do small files async
+		n++
+		if header.Size <= smallFileThreshold {
+			asyncWriters <- struct{}{}
+			b := poolBuf128k.Get().([]byte)
+			if cap(b) < int(header.Size) {
+				b = make([]byte, smallFileThreshold)
+			}
+			b = b[:header.Size]
+			if _, err := io.ReadFull(tarReader, b); err != nil {
+				return err
+			}
+			wg.Add(1)
+			go func(name string, fi fs.FileInfo, b []byte) {
+				rc := disconnectReader{r: bytes.NewReader(b)}
+				defer func() {
+					rc.Close()
+					<-asyncWriters
+					wg.Done()
+					poolBuf128k.Put(b)
+				}()
+				if err := putObject(&rc, fi, name); err != nil {
+					if o.ignoreErrs {
+						logger.LogIf(ctx, err)
+						return
+					}
+					asyncErrMu.Lock()
+					if asyncErr == nil {
+						asyncErr = err
+					}
+					asyncErrMu.Unlock()
+				}
+			}(name, header.FileInfo(), b)
+			continue
+		}
+
+		// Sync upload.
+		rc := disconnectReader{r: tarReader}
+		if err := putObject(&rc, header.FileInfo(), name); err != nil {
+			rc.Close()
+			if o.ignoreErrs {
+				logger.LogIf(ctx, err)
+				continue
+			}
+			return err
+		}
+		rc.Close()
 	}
 }


@@ -91,6 +91,14 @@ const (
 	AmzBucketReplicationStatus = "X-Amz-Replication-Status"
 
 	AmzSnowballExtract = "X-Amz-Meta-Snowball-Auto-Extract"
+
+	// MinIOSnowballIgnoreDirs will skip creating empty directory objects.
+	MinIOSnowballIgnoreDirs = "X-Amz-Meta-Minio-Snowball-Ignore-Dirs"
+	// MinIOSnowballIgnoreErrors will ignore recoverable errors, typically single files failing to upload.
+	// An error will be printed to console instead.
+	MinIOSnowballIgnoreErrors = "X-Amz-Meta-Minio-Snowball-Ignore-Errors"
+	// MinIOSnowballPrefix will apply this prefix (plus / at end) to all extracted objects
+	MinIOSnowballPrefix = "X-Amz-Meta-Minio-Snowball-Prefix"
 
 	// Object lock enabled
 	AmzObjectLockEnabled = "x-amz-bucket-object-lock-enabled"
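For reference, a client opts into extraction and these new behaviors by attaching the headers as user metadata on the tar upload. A minimal client-side sketch using minio-go v7 (endpoint, credentials, bucket, object, and prefix values are placeholders, not from this commit):

package main

import (
	"context"
	"log"
	"os"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	client, err := minio.New("localhost:9000", &minio.Options{
		Creds:  credentials.NewStaticV4("minioadmin", "minioadmin", ""),
		Secure: false,
	})
	if err != nil {
		log.Fatalln(err)
	}

	f, err := os.Open("backup.tar")
	if err != nil {
		log.Fatalln(err)
	}
	defer f.Close()
	st, err := f.Stat()
	if err != nil {
		log.Fatalln(err)
	}

	// User metadata is sent as request headers; these keys match the constants above.
	_, err = client.PutObject(context.Background(), "mybucket", "backup.tar", f, st.Size(),
		minio.PutObjectOptions{
			UserMetadata: map[string]string{
				"X-Amz-Meta-Snowball-Auto-Extract":        "true",         // extract instead of storing the tar
				"X-Amz-Meta-Minio-Snowball-Ignore-Dirs":   "true",         // skip empty directory objects
				"X-Amz-Meta-Minio-Snowball-Ignore-Errors": "true",         // log per-file failures instead of aborting
				"X-Amz-Meta-Minio-Snowball-Prefix":        "imports/2022", // prefix for all extracted objects
			},
		})
	if err != nil {
		log.Fatalln(err)
	}
}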