diff --git a/cmd/bucket-lifecycle.go b/cmd/bucket-lifecycle.go index 05f229301..5f5771961 100644 --- a/cmd/bucket-lifecycle.go +++ b/cmd/bucket-lifecycle.go @@ -342,7 +342,7 @@ func getTransitionedObjectReader(ctx context.Context, bucket, object string, rs closer := func() { reader.Close() } - return fn(reader, h, opts.CheckPrecondFn, closer) + return fn(reader, h, closer) } // RestoreRequestType represents type of restore. diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 6799fdb26..eeda5dd91 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -1122,7 +1122,7 @@ func proxyGetToReplicationTarget(ctx context.Context, bucket, object string, rs } closeReader := func() { obj.Close() } - reader, err := fn(obj, h, opts.CheckPrecondFn, closeReader) + reader, err := fn(obj, h, closeReader) if err != nil { return nil, false } diff --git a/cmd/disk-cache-backend.go b/cmd/disk-cache-backend.go index c68732cbb..92d5d1f24 100644 --- a/cmd/disk-cache-backend.go +++ b/cmd/disk-cache-backend.go @@ -963,7 +963,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang // case of incomplete read. pipeCloser := func() { pr.CloseWithError(nil) } - gr, gerr := fn(pr, h, opts.CheckPrecondFn, pipeCloser) + gr, gerr := fn(pr, h, pipeCloser) if gerr != nil { return gr, numHits, gerr } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 2a736f936..e804f9c93 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -212,7 +212,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri pr.CloseWithError(nil) } - return fn(pr, h, opts.CheckPrecondFn, pipeCloser, nsUnlocker) + return fn(pr, h, pipeCloser, nsUnlocker) } func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error { diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go index cdb479afb..e5a86b941 100644 --- a/cmd/erasure-server-pool.go +++ b/cmd/erasure-server-pool.go @@ -617,114 +617,44 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object unlockOnDefer = true } - lockType = noLock // do not take locks at lower levels checkPrecondFn := opts.CheckPrecondFn - opts.CheckPrecondFn = nil - results := make([]struct { - zIdx int - gr *GetObjectReader - err error - }, len(z.serverPools)) - - var wg sync.WaitGroup - for i, pool := range z.serverPools { - wg.Add(1) - go func(i int, pool *erasureSets) { - defer wg.Done() - results[i].zIdx = i - results[i].gr, results[i].err = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) - }(i, pool) - } - wg.Wait() - - // Sort the objInfos such that we always serve latest - // this is a defensive change to handle any duplicate - // content that may have been created, we always serve - // the latest object. - sort.Slice(results, func(i, j int) bool { - var mtimeI, mtimeJ time.Time - - if results[i].gr != nil { - mtimeI = results[i].gr.ObjInfo.ModTime - } - if results[j].gr != nil { - mtimeJ = results[j].gr.ObjInfo.ModTime - } - // On tiebreaks, choose the earliest. - if mtimeI.Equal(mtimeJ) { - return results[i].zIdx < results[j].zIdx - } - return mtimeI.After(mtimeJ) - }) - - var found = -1 - for i, res := range results { - if res.err == nil { - // Select earliest result - found = i - break - } - if !isErrObjectNotFound(res.err) && !isErrVersionNotFound(res.err) { - for _, res := range results { - res.gr.Close() + opts.CheckPrecondFn = nil // do not need to apply pre-conditions at lower layer. + opts.NoLock = true // no locks needed at lower levels for getObjectInfo() + objInfo, zIdx, err := z.getLatestObjectInfoWithIdx(ctx, bucket, object, opts) + if err != nil { + if objInfo.DeleteMarker { + if opts.VersionID == "" { + return &GetObjectReader{ + ObjInfo: objInfo, + }, toObjectErr(errFileNotFound, bucket, object) } - return nil, res.err + // Make sure to return object info to provide extra information. + return &GetObjectReader{ + ObjInfo: objInfo, + }, toObjectErr(errMethodNotAllowed, bucket, object) } + return nil, err } - if found < 0 { - object = decodeDirObject(object) - if opts.VersionID != "" { - return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} - } - return gr, ObjectNotFound{Bucket: bucket, Object: object} - } - - // Check preconditions. - if checkPrecondFn != nil && checkPrecondFn(results[found].gr.ObjInfo) { - for _, res := range results { - res.gr.Close() - } + // check preconditions before reading the stream. + if checkPrecondFn != nil && checkPrecondFn(objInfo) { return nil, PreConditionFailed{} } - // Close all others - for i, res := range results { - if i == found { - continue - } - res.gr.Close() - } - - return results[found].gr, nil + lockType = noLock // do not take locks at lower levels for GetObjectNInfo() + return z.serverPools[zIdx].GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts) } -func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { - if err = checkGetObjArgs(ctx, bucket, object); err != nil { - return objInfo, err - } - +// getLatestObjectInfoWithIdx returns the objectInfo of the latest object from multiple pools (this function +// is present in-case there were duplicate writes to both pools, this function also returns the +// additional index where the latest object exists, that is used to start the GetObject stream. +func (z *erasureServerPools) getLatestObjectInfoWithIdx(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, int, error) { object = encodeDirObject(object) - - if z.SinglePool() { - return z.serverPools[0].GetObjectInfo(ctx, bucket, object, opts) - } - - // Lock the object before reading. - lk := z.NewNSLock(bucket, object) - lkctx, err := lk.GetRLock(ctx, globalOperationTimeout) - if err != nil { - return ObjectInfo{}, err - } - ctx = lkctx.Context() - defer lk.RUnlock(lkctx.Cancel) - results := make([]struct { zIdx int oi ObjectInfo err error }, len(z.serverPools)) - opts.NoLock = true // avoid taking locks at lower levels for multi-pool setups. var wg sync.WaitGroup for i, pool := range z.serverPools { wg.Add(1) @@ -748,24 +678,52 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s } return a.oi.ModTime.After(b.oi.ModTime) }) + for _, res := range results { - // Return first found. err := res.err if err == nil { - return res.oi, nil + return res.oi, res.zIdx, nil } if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) { // some errors such as MethodNotAllowed for delete marker // should be returned upwards. - return res.oi, err + return res.oi, res.zIdx, err } } object = decodeDirObject(object) if opts.VersionID != "" { - return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} + return ObjectInfo{}, -1, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID} } - return objInfo, ObjectNotFound{Bucket: bucket, Object: object} + return ObjectInfo{}, -1, ObjectNotFound{Bucket: bucket, Object: object} +} + +func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { + if err = checkGetObjArgs(ctx, bucket, object); err != nil { + return objInfo, err + } + + object = encodeDirObject(object) + + if z.SinglePool() { + return z.serverPools[0].GetObjectInfo(ctx, bucket, object, opts) + } + + if !opts.NoLock { + opts.NoLock = true // avoid taking locks at lower levels for multi-pool setups. + + // Lock the object before reading. + lk := z.NewNSLock(bucket, object) + lkctx, err := lk.GetRLock(ctx, globalOperationTimeout) + if err != nil { + return ObjectInfo{}, err + } + ctx = lkctx.Context() + defer lk.RUnlock(lkctx.Cancel) + } + + objInfo, _, err = z.getLatestObjectInfoWithIdx(ctx, bucket, object, opts) + return objInfo, err } // PutObject - writes an object to least used erasure pool. diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 8e7e86fac..8a855469b 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -806,7 +806,7 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string, return nil, err } - return objReaderFn(reader, h, opts.CheckPrecondFn, closeFn, rwPoolUnlocker, nsUnlocker) + return objReaderFn(reader, h, closeFn, rwPoolUnlocker, nsUnlocker) } // Create a new fs.json file, if the existing one is corrupt. Should happen very rarely. diff --git a/cmd/gateway/s3/gateway-s3-sse.go b/cmd/gateway/s3/gateway-s3-sse.go index a16a8f80f..e0d4048c7 100644 --- a/cmd/gateway/s3/gateway-s3-sse.go +++ b/cmd/gateway/s3/gateway-s3-sse.go @@ -345,7 +345,7 @@ func (l *s3EncObjects) GetObjectNInfo(ctx context.Context, bucket, object string // Setup cleanup function to cause the above go-routine to // exit in case of partial read pipeCloser := func() { pr.Close() } - return fn(pr, h, o.CheckPrecondFn, pipeCloser) + return fn(pr, h, pipeCloser) } // GetObjectInfo reads object info and replies back ObjectInfo diff --git a/cmd/gateway/s3/gateway-s3.go b/cmd/gateway/s3/gateway-s3.go index 8e3065fdf..493551e8d 100644 --- a/cmd/gateway/s3/gateway-s3.go +++ b/cmd/gateway/s3/gateway-s3.go @@ -404,7 +404,7 @@ func (l *s3Objects) GetObjectNInfo(ctx context.Context, bucket, object string, r // Setup cleanup function to cause the above go-routine to // exit in case of partial read pipeCloser := func() { pr.Close() } - return fn(pr, h, opts.CheckPrecondFn, pipeCloser) + return fn(pr, h, pipeCloser) } // GetObject reads an object from S3. Supports additional diff --git a/cmd/object-api-utils.go b/cmd/object-api-utils.go index 4478976b0..4929a0f54 100644 --- a/cmd/object-api-utils.go +++ b/cmd/object-api-utils.go @@ -582,7 +582,7 @@ func NewGetObjectReaderFromReader(r io.Reader, oi ObjectInfo, opts ObjectOptions // GetObjectReader and an error. Request headers are passed to provide // encryption parameters. cleanupFns allow cleanup funcs to be // registered for calling after usage of the reader. -type ObjReaderFn func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cleanupFns ...func()) (r *GetObjectReader, err error) +type ObjReaderFn func(inputReader io.Reader, h http.Header, cleanupFns ...func()) (r *GetObjectReader, err error) // NewGetObjectReader creates a new GetObjectReader. The cleanUpFns // are called on Close() in FIFO order as passed in ObjReadFn(). NOTE: It is @@ -591,6 +591,10 @@ type ObjReaderFn func(inputReader io.Reader, h http.Header, pcfn CheckPreconditi func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) ( fn ObjReaderFn, off, length int64, err error) { + if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) { + return nil, 0, 0, PreConditionFailed{} + } + if rs == nil && opts.PartNumber > 0 { rs = partNumberToRangeSpec(oi, opts.PartNumber) } @@ -648,21 +652,15 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) ( return nil, 0, 0, errInvalidRange } } - fn = func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) { - if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) { - for _, cFn := range cFns { - cFn() - } - return nil, PreConditionFailed{} - } + fn = func(inputReader io.Reader, h http.Header, cFns ...func()) (r *GetObjectReader, err error) { if isEncrypted { copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != "" // Attach decrypter on inputReader inputReader, err = DecryptBlocksRequestR(inputReader, h, 0, firstPart, oi, copySource) if err != nil { // Call the cleanup funcs - for _, cFn := range cFns { - cFn() + for i := len(cFns) - 1; i >= 0; i-- { + cFns[i]() } return nil, err } @@ -674,8 +672,8 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) ( if decOff > 0 { if err = s2Reader.Skip(decOff); err != nil { // Call the cleanup funcs - for _, cFn := range cFns { - cFn() + for i := len(cFns) - 1; i >= 0; i-- { + cFns[i]() } return nil, err } @@ -727,7 +725,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) ( // a reader that returns the desired range of // encrypted bytes. The header parameter is used to // provide encryption parameters. - fn = func(inputReader io.Reader, h http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) { + fn = func(inputReader io.Reader, h http.Header, cFns ...func()) (r *GetObjectReader, err error) { copySource := h.Get(xhttp.AmzServerSideEncryptionCopyCustomerAlgorithm) != "" // Attach decrypter on inputReader @@ -741,14 +739,6 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) ( return nil, err } - if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) { - // Call the cleanup funcs - for i := len(cFns) - 1; i >= 0; i-- { - cFns[i]() - } - return nil, PreConditionFailed{} - } - oi.ETag = getDecryptedETag(h, oi, false) // Apply the skipLen and limit on the @@ -770,14 +760,7 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) ( if err != nil { return nil, 0, 0, err } - fn = func(inputReader io.Reader, _ http.Header, pcfn CheckPreconditionFn, cFns ...func()) (r *GetObjectReader, err error) { - if opts.CheckPrecondFn != nil && opts.CheckPrecondFn(oi) { - // Call the cleanup funcs - for i := len(cFns) - 1; i >= 0; i-- { - cFns[i]() - } - return nil, PreConditionFailed{} - } + fn = func(inputReader io.Reader, _ http.Header, cFns ...func()) (r *GetObjectReader, err error) { r = &GetObjectReader{ ObjInfo: oi, Reader: inputReader,