fix: serve always only the latest objects (#12487)

Due to a historic bug, it is possible that
some objects exist on multiple pools;
rely on ModTime to return the correct pool.
Harshavardhana 2021-06-10 23:07:16 -07:00 committed by GitHub
parent b154581b65
commit 0385ecbf34
2 changed files with 81 additions and 11 deletions
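
The gist of the change, as a minimal standalone sketch (poolResult and latestPool below are illustrative stand-ins, not MinIO's actual types): look the object up on every pool, sort the per-pool results by ModTime so the newest copy wins, and serve from that pool; pools where the object is missing sort last. The same idea is applied three times in the diff below, in getPoolIdxExisting, GetObjectNInfo and GetObjectInfo.

// Simplified illustration only; not MinIO code.
package main

import (
	"fmt"
	"sort"
	"time"
)

// poolResult captures one pool's answer to an object lookup.
type poolResult struct {
	PoolIndex int       // which pool answered
	ModTime   time.Time // zero value if the object was not found
	Found     bool
}

// latestPool sorts the per-pool results newest-first and returns the index
// of the pool holding the most recent copy, or -1 if no pool has the object.
func latestPool(results []poolResult) int {
	sort.Slice(results, func(i, j int) bool {
		return results[i].ModTime.After(results[j].ModTime)
	})
	for _, r := range results {
		if r.Found {
			return r.PoolIndex
		}
	}
	return -1
}

func main() {
	now := time.Now()
	results := []poolResult{
		{PoolIndex: 0, ModTime: now.Add(-time.Hour), Found: true}, // stale duplicate
		{PoolIndex: 1, ModTime: now, Found: true},                 // latest copy
		{PoolIndex: 2, Found: false},                              // not on this pool
	}
	fmt.Println("serve from pool", latestPool(results)) // serve from pool 1
}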


@@ -234,6 +234,13 @@ func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, b
 	return serverPools
 }
 
+// poolObjInfo represents the state of an object per pool
+type poolObjInfo struct {
+	PoolIndex int
+	ObjInfo   ObjectInfo
+	Err       error
+}
+
 // getPoolIdxExisting returns the (first) found object pool index containing an object.
 // If the object exists, but the latest version is a delete marker, the index with it is still returned.
 // If the object does not exist ObjectNotFound error is returned.
@@ -244,35 +251,49 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj
 		return 0, nil
 	}
 
-	errs := make([]error, len(z.serverPools))
-	objInfos := make([]ObjectInfo, len(z.serverPools))
+	poolObjInfos := make([]poolObjInfo, len(z.serverPools))
 
 	var wg sync.WaitGroup
 	for i, pool := range z.serverPools {
 		wg.Add(1)
 		go func(i int, pool *erasureSets) {
 			defer wg.Done()
-			objInfos[i], errs[i] = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
+			// remember the pool index, we may sort the slice original index might be lost.
+			pinfo := poolObjInfo{
+				PoolIndex: i,
+			}
+			pinfo.ObjInfo, pinfo.Err = pool.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
+			poolObjInfos[i] = pinfo
 		}(i, pool)
 	}
 	wg.Wait()
 
+	// Sort the objInfos such that we always serve latest
+	// this is a defensive change to handle any duplicate
+	// content that may have been created, we always serve
+	// the latest object.
+	sort.Slice(poolObjInfos, func(i, j int) bool {
+		mtime1 := poolObjInfos[i].ObjInfo.ModTime
+		mtime2 := poolObjInfos[j].ObjInfo.ModTime
+		return mtime1.After(mtime2)
+	})
+
-	for i, err := range errs {
-		if err != nil && !isErrObjectNotFound(err) {
-			return -1, err
+	for _, pinfo := range poolObjInfos {
+		if pinfo.Err != nil && !isErrObjectNotFound(pinfo.Err) {
+			return -1, pinfo.Err
 		}
-		if isErrObjectNotFound(err) {
+		if isErrObjectNotFound(pinfo.Err) {
 			// No object exists or its a delete marker,
 			// check objInfo to confirm.
-			if objInfos[i].DeleteMarker && objInfos[i].Name != "" {
-				return i, nil
+			if pinfo.ObjInfo.DeleteMarker && pinfo.ObjInfo.Name != "" {
+				return pinfo.PoolIndex, nil
 			}
 
 			// objInfo is not valid, truly the object doesn't
 			// exist proceed to next pool.
 			continue
 		}
-		return i, nil
+		return pinfo.PoolIndex, nil
 	}
 
 	return -1, toObjectErr(errFileNotFound, bucket, object)
@@ -600,6 +621,9 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
 	grs := make([]*GetObjectReader, len(z.serverPools))
 	lockType = noLock // do not take locks at lower levels
 
+	checkPrecondFn := opts.CheckPrecondFn
+	opts.CheckPrecondFn = nil
+
 	var wg sync.WaitGroup
 	for i, pool := range z.serverPools {
 		wg.Add(1)
@@ -610,6 +634,26 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
 	}
 	wg.Wait()
 
+	// Sort the objInfos such that we always serve latest
+	// this is a defensive change to handle any duplicate
+	// content that may have been created, we always serve
+	// the latest object.
+	less := func(i, j int) bool {
+		var (
+			mtime1 time.Time
+			mtime2 time.Time
+		)
+		if grs[i] != nil {
+			mtime1 = grs[i].ObjInfo.ModTime
+		}
+		if grs[j] != nil {
+			mtime2 = grs[j].ObjInfo.ModTime
+		}
+		return mtime1.After(mtime2)
+	}
+	sort.Slice(errs, less)
+	sort.Slice(grs, less)
+
 	var found = -1
 	for i, err := range errs {
 		if err == nil {
@@ -627,6 +671,18 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
 	}
 
 	if found >= 0 {
+		if checkPrecondFn != nil && checkPrecondFn(grs[found].ObjInfo) {
+			for _, grr := range grs {
+				grr.Close()
+			}
+			return nil, PreConditionFailed{}
+		}
+		for i, grr := range grs {
+			if i == found {
+				continue
+			}
+			grr.Close()
+		}
 		return grs[found], nil
 	}
@@ -659,7 +715,6 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
 	errs := make([]error, len(z.serverPools))
 	objInfos := make([]ObjectInfo, len(z.serverPools))
 
 	opts.NoLock = true // avoid taking locks at lower levels for multi-pool setups.
-
 	var wg sync.WaitGroup
 	for i, pool := range z.serverPools {
@@ -671,6 +726,18 @@ func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object s
 	}
 	wg.Wait()
 
+	// Sort the objInfos such that we always serve latest
+	// this is a defensive change to handle any duplicate
+	// content that may have been created, we always serve
+	// the latest object.
+	less := func(i, j int) bool {
+		mtime1 := objInfos[i].ModTime
+		mtime2 := objInfos[j].ModTime
+		return mtime1.After(mtime2)
+	}
+	sort.Slice(errs, less)
+	sort.Slice(objInfos, less)
+
 	var found = -1
 	for i, err := range errs {
 		if err == nil {


@@ -792,6 +792,9 @@ func NewGetObjectReader(rs *HTTPRangeSpec, oi ObjectInfo, opts ObjectOptions) (
 // Close - calls the cleanup actions in reverse order
 func (g *GetObjectReader) Close() error {
+	if g == nil {
+		return nil
+	}
 	// sync.Once is used here to ensure that Close() is
 	// idempotent.
 	g.once.Do(func() {