fix: apply pre-conditions first on object metadata (#12545)

This change in error flow complies with AWS S3 behavior
for applications depending on specific error conditions.

fixes #12543
This commit is contained in:
Harshavardhana
2021-06-24 09:44:00 -07:00
committed by GitHub
parent 9bf1ac0bb6
commit 41caf89cf4
9 changed files with 74 additions and 133 deletions

View File

@@ -617,114 +617,44 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
unlockOnDefer = true
}
lockType = noLock // do not take locks at lower levels
checkPrecondFn := opts.CheckPrecondFn
opts.CheckPrecondFn = nil
results := make([]struct {
zIdx int
gr *GetObjectReader
err error
}, len(z.serverPools))
var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
go func(i int, pool *erasureSets) {
defer wg.Done()
results[i].zIdx = i
results[i].gr, results[i].err = pool.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
}(i, pool)
}
wg.Wait()
// Sort the objInfos such that we always serve latest
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
sort.Slice(results, func(i, j int) bool {
var mtimeI, mtimeJ time.Time
if results[i].gr != nil {
mtimeI = results[i].gr.ObjInfo.ModTime
}
if results[j].gr != nil {
mtimeJ = results[j].gr.ObjInfo.ModTime
}
// On tiebreaks, choose the earliest.
if mtimeI.Equal(mtimeJ) {
return results[i].zIdx < results[j].zIdx
}
return mtimeI.After(mtimeJ)
})
var found = -1
for i, res := range results {
if res.err == nil {
// Select earliest result
found = i
break
}
if !isErrObjectNotFound(res.err) && !isErrVersionNotFound(res.err) {
for _, res := range results {
res.gr.Close()
opts.CheckPrecondFn = nil // do not need to apply pre-conditions at lower layer.
opts.NoLock = true // no locks needed at lower levels for getObjectInfo()
objInfo, zIdx, err := z.getLatestObjectInfoWithIdx(ctx, bucket, object, opts)
if err != nil {
if objInfo.DeleteMarker {
if opts.VersionID == "" {
return &GetObjectReader{
ObjInfo: objInfo,
}, toObjectErr(errFileNotFound, bucket, object)
}
return nil, res.err
// Make sure to return object info to provide extra information.
return &GetObjectReader{
ObjInfo: objInfo,
}, toObjectErr(errMethodNotAllowed, bucket, object)
}
return nil, err
}
if found < 0 {
object = decodeDirObject(object)
if opts.VersionID != "" {
return gr, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
}
return gr, ObjectNotFound{Bucket: bucket, Object: object}
}
// Check preconditions.
if checkPrecondFn != nil && checkPrecondFn(results[found].gr.ObjInfo) {
for _, res := range results {
res.gr.Close()
}
// check preconditions before reading the stream.
if checkPrecondFn != nil && checkPrecondFn(objInfo) {
return nil, PreConditionFailed{}
}
// Close all others
for i, res := range results {
if i == found {
continue
}
res.gr.Close()
}
return results[found].gr, nil
lockType = noLock // do not take locks at lower levels for GetObjectNInfo()
return z.serverPools[zIdx].GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
}
func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
return objInfo, err
}
// getLatestObjectInfoWithIdx returns the objectInfo of the latest object from multiple pools (this function
// is present in-case there were duplicate writes to both pools, this function also returns the
// additional index where the latest object exists, that is used to start the GetObject stream.
func (z *erasureServerPools) getLatestObjectInfoWithIdx(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, int, error) {
object = encodeDirObject(object)
if z.SinglePool() {
return z.serverPools[0].GetObjectInfo(ctx, bucket, object, opts)
}
// Lock the object before reading.
lk := z.NewNSLock(bucket, object)
lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.RUnlock(lkctx.Cancel)
results := make([]struct {
zIdx int
oi ObjectInfo
err error
}, len(z.serverPools))
opts.NoLock = true // avoid taking locks at lower levels for multi-pool setups.
var wg sync.WaitGroup
for i, pool := range z.serverPools {
wg.Add(1)
@@ -748,24 +678,52 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
}
return a.oi.ModTime.After(b.oi.ModTime)
})
for _, res := range results {
// Return first found.
err := res.err
if err == nil {
return res.oi, nil
return res.oi, res.zIdx, nil
}
if !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
// some errors such as MethodNotAllowed for delete marker
// should be returned upwards.
return res.oi, err
return res.oi, res.zIdx, err
}
}
object = decodeDirObject(object)
if opts.VersionID != "" {
return objInfo, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
return ObjectInfo{}, -1, VersionNotFound{Bucket: bucket, Object: object, VersionID: opts.VersionID}
}
return objInfo, ObjectNotFound{Bucket: bucket, Object: object}
return ObjectInfo{}, -1, ObjectNotFound{Bucket: bucket, Object: object}
}
func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
return objInfo, err
}
object = encodeDirObject(object)
if z.SinglePool() {
return z.serverPools[0].GetObjectInfo(ctx, bucket, object, opts)
}
if !opts.NoLock {
opts.NoLock = true // avoid taking locks at lower levels for multi-pool setups.
// Lock the object before reading.
lk := z.NewNSLock(bucket, object)
lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.RUnlock(lkctx.Cancel)
}
objInfo, _, err = z.getLatestObjectInfoWithIdx(ctx, bucket, object, opts)
return objInfo, err
}
// PutObject - writes an object to least used erasure pool.