mirror of
https://github.com/minio/minio.git
synced 2025-04-21 02:54:24 -04:00
fix: improve NewObjectReader implementation for careful cleanup usage (#12199)
cleanup functions should never be cleaned before the reader is instantiated, this type of design leads to situations where order of lockers and places for them to use becomes confusing. Allow WithCleanupFuncs() if the caller wishes to add cleanupFns to be run upon close() or an error during initialization of the reader. Also make sure streams are closed before we unlock the resources, this allows for ordered cleanup of resources.
This commit is contained in:
parent
3524c00090
commit
0d3ddf7286
@ -325,6 +325,7 @@ func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("transition storage class not configured")
|
return nil, fmt.Errorf("transition storage class not configured")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn, off, length, err := NewGetObjectReader(rs, oi, opts)
|
fn, off, length, err := NewGetObjectReader(rs, oi, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, ErrorRespToObjectError(err, bucket, object)
|
return nil, ErrorRespToObjectError(err, bucket, object)
|
||||||
|
@ -936,16 +936,16 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang
|
|||||||
objInfo.Size = rngInfo.Size
|
objInfo.Size = rngInfo.Size
|
||||||
rs = nil
|
rs = nil
|
||||||
}
|
}
|
||||||
var nsUnlocker = func() {}
|
|
||||||
// For a directory, we need to send an reader that returns no bytes.
|
// For a directory, we need to send an reader that returns no bytes.
|
||||||
if HasSuffix(object, SlashSeparator) {
|
if HasSuffix(object, SlashSeparator) {
|
||||||
// The lock taken above is released when
|
// The lock taken above is released when
|
||||||
// objReader.Close() is called by the caller.
|
// objReader.Close() is called by the caller.
|
||||||
gr, gerr := NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts, nsUnlocker)
|
gr, gerr := NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts)
|
||||||
return gr, numHits, gerr
|
return gr, numHits, gerr
|
||||||
}
|
}
|
||||||
|
|
||||||
fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts, nsUnlocker)
|
fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts)
|
||||||
if nErr != nil {
|
if nErr != nil {
|
||||||
return nil, numHits, nErr
|
return nil, numHits, nErr
|
||||||
}
|
}
|
||||||
|
@ -186,25 +186,30 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
|
|||||||
if objInfo.TransitionStatus == lifecycle.TransitionComplete {
|
if objInfo.TransitionStatus == lifecycle.TransitionComplete {
|
||||||
// If transitioned, stream from transition tier unless object is restored locally or restore date is past.
|
// If transitioned, stream from transition tier unless object is restored locally or restore date is past.
|
||||||
if onDisk := isRestoredObjectOnDisk(objInfo.UserDefined); !onDisk {
|
if onDisk := isRestoredObjectOnDisk(objInfo.UserDefined); !onDisk {
|
||||||
return getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts)
|
gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts)
|
||||||
}
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
unlockOnDefer = false
|
unlockOnDefer = false
|
||||||
fn, off, length, nErr := NewGetObjectReader(rs, objInfo, opts, nsUnlocker)
|
return gr.WithCleanupFuncs(nsUnlocker), nil
|
||||||
if nErr != nil {
|
|
||||||
return nil, nErr
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn, off, length, err := NewGetObjectReader(rs, objInfo, opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
unlockOnDefer = false
|
||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
go func() {
|
go func() {
|
||||||
err := er.getObjectWithFileInfo(ctx, bucket, object, off, length, pw, fi, metaArr, onlineDisks)
|
pw.CloseWithError(er.getObjectWithFileInfo(ctx, bucket, object, off, length, pw, fi, metaArr, onlineDisks))
|
||||||
pw.CloseWithError(err)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Cleanup function to cause the go routine above to exit, in
|
// Cleanup function to cause the go routine above to exit, in
|
||||||
// case of incomplete read.
|
// case of incomplete read.
|
||||||
pipeCloser := func() { pr.Close() }
|
pipeCloser := func() { pr.Close() }
|
||||||
|
|
||||||
return fn(pr, h, opts.CheckPrecondFn, pipeCloser)
|
return fn(pr, h, opts.CheckPrecondFn, pipeCloser, nsUnlocker)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObject - reads an object erasured coded across multiple
|
// GetObject - reads an object erasured coded across multiple
|
||||||
|
@ -748,8 +748,10 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
|
|||||||
rwPoolUnlocker = func() { fs.rwPool.Close(fsMetaPath) }
|
rwPoolUnlocker = func() { fs.rwPool.Close(fsMetaPath) }
|
||||||
}
|
}
|
||||||
|
|
||||||
objReaderFn, off, length, err := NewGetObjectReader(rs, objInfo, opts, nsUnlocker, rwPoolUnlocker)
|
objReaderFn, off, length, err := NewGetObjectReader(rs, objInfo, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
rwPoolUnlocker()
|
||||||
|
nsUnlocker()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -777,7 +779,7 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return objReaderFn(reader, h, opts.CheckPrecondFn, closeFn)
|
return objReaderFn(reader, h, opts.CheckPrecondFn, closeFn, rwPoolUnlocker, nsUnlocker)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getObject - wrapper for GetObject
|
// getObject - wrapper for GetObject
|
||||||
|
@ -554,14 +554,20 @@ func getCompressedOffsets(objectInfo ObjectInfo, offset int64) (compressedOffset
|
|||||||
// GetObjectReader is a type that wraps a reader with a lock to
|
// GetObjectReader is a type that wraps a reader with a lock to
|
||||||
// provide a ReadCloser interface that unlocks on Close()
|
// provide a ReadCloser interface that unlocks on Close()
|
||||||
type GetObjectReader struct {
|
type GetObjectReader struct {
|
||||||
|
io.Reader
|
||||||
ObjInfo ObjectInfo
|
ObjInfo ObjectInfo
|
||||||
pReader io.Reader
|
|
||||||
|
|
||||||
cleanUpFns []func()
|
cleanUpFns []func()
|
||||||
opts ObjectOptions
|
opts ObjectOptions
|
||||||
once sync.Once
|
once sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithCleanupFuncs sets additional cleanup functions to be called when closing
|
||||||
|
// the GetObjectReader.
|
||||||
|
func (g *GetObjectReader) WithCleanupFuncs(fns ...func()) *GetObjectReader {
|
||||||
|
g.cleanUpFns = append(g.cleanUpFns, fns...)
|
||||||
|
return g
|
||||||
|
}
|
||||||
|
|
||||||
// NewGetObjectReaderFromReader sets up a GetObjectReader with a given
|
// NewGetObjectReaderFromReader sets up a GetObjectReader with a given
|
||||||
// reader. This ignores any object properties.
|
// reader. This ignores any object properties.
|
||||||
func NewGetObjectReaderFromReader(r io.Reader, oi ObjectInfo, opts ObjectOptions, cleanupFns ...func()) (*GetObjectReader, error) {
|
func NewGetObjectReaderFromReader(r io.Reader, oi ObjectInfo, opts ObjectOptions, cleanupFns ...func()) (*GetObjectReader, error) {
|
||||||
@ -574,7 +580,7 @@ func NewGetObjectReaderFromReader(r io.Reader, oi ObjectInfo, opts ObjectOptions
|
|||||||
}
|
}
|
||||||
return &GetObjectReader{
|
return &GetObjectReader{
|
||||||
ObjInfo: oi,
|
ObjInfo: oi,
|
||||||
pReader: r,
|
Reader: r,
|
||||||
cleanUpFns: cleanupFns,
|
cleanUpFns: cleanupFns,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
}, nil
|
}, nil
|
||||||
@ -587,26 +593,16 @@ func NewGetObjectReaderFromReader(r io.Reader, oi ObjectInfo, opts ObjectOptions
|
|||||||
type ObjReaderFn func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cleanupFns ...func()) (r *GetObjectReader, err error)
|
type ObjReaderFn func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cleanupFns ...func()) (r *GetObjectReader, err error)
|
||||||
|
|
||||||
// NewGetObjectReader creates a new GetObjectReader. The cleanUpFns
|
// NewGetObjectReader creates a new GetObjectReader. The cleanUpFns
|
||||||
// are called on Close() in reverse order as passed here. NOTE: It is
|
// are called on Close() in FIFO order as passed in ObjReadFn(). NOTE: It is
|
||||||
// assumed that clean up functions do not panic (otherwise, they may
|
// assumed that clean up functions do not panic (otherwise, they may
|
||||||
// not all run!).
|
// not all run!).
|
||||||
func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cleanUpFns ...func()) (
|
func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
|
||||||
fn ObjReaderFn, off, length int64, err error) {
|
fn ObjReaderFn, off, length int64, err error) {
|
||||||
|
|
||||||
if rs == nil && opts.PartNumber > 0 {
|
if rs == nil && opts.PartNumber > 0 {
|
||||||
rs = partNumberToRangeSpec(oi, opts.PartNumber)
|
rs = partNumberToRangeSpec(oi, opts.PartNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call the clean-up functions immediately in case of exit
|
|
||||||
// with error
|
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
for i := len(cleanUpFns) - 1; i >= 0; i-- {
|
|
||||||
cleanUpFns[i]()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
_, isEncrypted := crypto.IsEncrypted(oi.UserDefined)
|
_, isEncrypted := crypto.IsEncrypted(oi.UserDefined)
|
||||||
isCompressed, err := oi.IsCompressedOK()
|
isCompressed, err := oi.IsCompressedOK()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -658,11 +654,9 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn = func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) {
|
fn = func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) {
|
||||||
cFns = append(cleanUpFns, cFns...)
|
|
||||||
if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) {
|
if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) {
|
||||||
// Call the cleanup funcs
|
for _, cFn := range cFns {
|
||||||
for i := len(cFns) - 1; i >= 0; i-- {
|
cFn()
|
||||||
cFns[i]()
|
|
||||||
}
|
}
|
||||||
return nil, PreConditionFailed{}
|
return nil, PreConditionFailed{}
|
||||||
}
|
}
|
||||||
@ -672,8 +666,8 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
|||||||
inputReader, err = DecryptBlocksRequestR(inputReader, h, 0, firstPart, oi, copySource)
|
inputReader, err = DecryptBlocksRequestR(inputReader, h, 0, firstPart, oi, copySource)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Call the cleanup funcs
|
// Call the cleanup funcs
|
||||||
for i := len(cFns) - 1; i >= 0; i-- {
|
for _, cFn := range cFns {
|
||||||
cFns[i]()
|
cFn()
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -685,8 +679,8 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
|||||||
if decOff > 0 {
|
if decOff > 0 {
|
||||||
if err = s2Reader.Skip(decOff); err != nil {
|
if err = s2Reader.Skip(decOff); err != nil {
|
||||||
// Call the cleanup funcs
|
// Call the cleanup funcs
|
||||||
for i := len(cFns) - 1; i >= 0; i-- {
|
for _, cFn := range cFns {
|
||||||
cFns[i]()
|
cFn()
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -697,9 +691,9 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
|||||||
rah, err := readahead.NewReaderSize(decReader, compReadAheadBuffers, compReadAheadBufSize)
|
rah, err := readahead.NewReaderSize(decReader, compReadAheadBuffers, compReadAheadBufSize)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
decReader = rah
|
decReader = rah
|
||||||
cFns = append(cFns, func() {
|
cFns = append([]func(){func() {
|
||||||
rah.Close()
|
rah.Close()
|
||||||
})
|
}}, cFns...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
oi.Size = decLength
|
oi.Size = decLength
|
||||||
@ -707,7 +701,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
|||||||
// Assemble the GetObjectReader
|
// Assemble the GetObjectReader
|
||||||
r = &GetObjectReader{
|
r = &GetObjectReader{
|
||||||
ObjInfo: oi,
|
ObjInfo: oi,
|
||||||
pReader: decReader,
|
Reader: decReader,
|
||||||
cleanUpFns: cFns,
|
cleanUpFns: cFns,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
}
|
}
|
||||||
@ -741,7 +735,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
|||||||
fn = func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) {
|
fn = func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) {
|
||||||
copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != ""
|
copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != ""
|
||||||
|
|
||||||
cFns = append(cleanUpFns, cFns...)
|
|
||||||
// Attach decrypter on inputReader
|
// Attach decrypter on inputReader
|
||||||
var decReader io.Reader
|
var decReader io.Reader
|
||||||
decReader, err = DecryptBlocksRequestR(inputReader, h, seqNumber, partStart, oi, copySource)
|
decReader, err = DecryptBlocksRequestR(inputReader, h, seqNumber, partStart, oi, copySource)
|
||||||
@ -770,7 +763,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
|||||||
// Assemble the GetObjectReader
|
// Assemble the GetObjectReader
|
||||||
r = &GetObjectReader{
|
r = &GetObjectReader{
|
||||||
ObjInfo: oi,
|
ObjInfo: oi,
|
||||||
pReader: decReader,
|
Reader: decReader,
|
||||||
cleanUpFns: cFns,
|
cleanUpFns: cFns,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
}
|
}
|
||||||
@ -783,7 +776,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
|||||||
return nil, 0, 0, err
|
return nil, 0, 0, err
|
||||||
}
|
}
|
||||||
fn = func(inputReader io.Reader, _ http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) {
|
fn = func(inputReader io.Reader, _ http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) {
|
||||||
cFns = append(cleanUpFns, cFns...)
|
|
||||||
if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) {
|
if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) {
|
||||||
// Call the cleanup funcs
|
// Call the cleanup funcs
|
||||||
for i := len(cFns) - 1; i >= 0; i-- {
|
for i := len(cFns) - 1; i >= 0; i-- {
|
||||||
@ -793,7 +785,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions, cl
|
|||||||
}
|
}
|
||||||
r = &GetObjectReader{
|
r = &GetObjectReader{
|
||||||
ObjInfo: oi,
|
ObjInfo: oi,
|
||||||
pReader: inputReader,
|
Reader: inputReader,
|
||||||
cleanUpFns: cFns,
|
cleanUpFns: cFns,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
}
|
}
|
||||||
@ -815,11 +807,6 @@ func (g *GetObjectReader) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read - to implement Reader interface.
|
|
||||||
func (g *GetObjectReader) Read(p []byte) (n int, err error) {
|
|
||||||
return g.pReader.Read(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
//SealMD5CurrFn seals md5sum with object encryption key and returns sealed
|
//SealMD5CurrFn seals md5sum with object encryption key and returns sealed
|
||||||
// md5sum
|
// md5sum
|
||||||
type SealMD5CurrFn func([]byte) []byte
|
type SealMD5CurrFn func([]byte) []byte
|
||||||
|
Loading…
x
Reference in New Issue
Block a user