fix: multi-pool setup make sure to acquire locks properly (#13280)

This was a regression introduced in '14bb969782'.
It has the potential to cause corruption when
there are concurrent overwrites attempting to update
the content on the namespace.

The regression created a situation where PutObject() and CopyObject()
compete for the same locks as NewMultipartUpload(), but no longer
compete for locks on the actual object with GetObject() and
DeleteObject(). Since these operations no longer compete,
concurrent I/O on a versioned bucket can lead to loss of versions.

This PR fixes the bug for multi-pool setups with replication,
where the lack of competing locks in a multi-pool setup causes
corruption of inlined data.

Instead, CompleteMultipartUpload holds the necessary
locks when finishing the transaction; since the pool of an
existing object is already known when scheduling the multipart
upload, NewMultipartUpload does not need to compete for locks
in this manner.
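
In outline, the fixed write path now takes the namespace lock on the
object itself and then resolves the destination pool with the new
getPoolIdxNoLock(), which assumes the caller already holds that lock.
A condensed sketch of the flow with error handling trimmed
(putObjectSketch is a hypothetical wrapper name; the real code is in
the PutObject hunk below):

	// Sketch only: condensed from the PutObject path in this PR, not the full implementation.
	func (z *erasureServerPools) putObjectSketch(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (ObjectInfo, error) {
		if !opts.NoLock {
			// Lock the object itself rather than a "newMultipartObject.lck"
			// entry under minioMetaMultipartBucket, so writers once again
			// compete with other operations on the same object.
			ns := z.NewNSLock(bucket, object)
			lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
			if err != nil {
				return ObjectInfo{}, err
			}
			ctx = lkctx.Context()
			defer ns.Unlock(lkctx.Cancel)
			opts.NoLock = true // the pool-level PutObject must not lock again
		}

		// Resolve the destination pool without re-locking; the namespace
		// lock acquired above is still held.
		idx, err := z.getPoolIdxNoLock(ctx, bucket, object, data.Size())
		if err != nil {
			return ObjectInfo{}, err
		}
		return z.serverPools[idx].PutObject(ctx, bucket, object, data, opts)
	}

CopyObject follows the same pattern on the destination bucket/object,
and NewMultipartUpload drops its lock entirely because
CompleteMultipartUpload holds the necessary locks when the
transaction is committed.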
Harshavardhana 2021-09-22 21:46:24 -07:00 committed by GitHub
parent f9b104f37b
commit 200caab82b
4 changed files with 47 additions and 34 deletions


@@ -443,7 +443,7 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin
 	}
 	objInfo = fi.ToObjectInfo(bucket, object)
-	if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" {
+	if opts.VersionID != "" && !fi.VersionPurgeStatus().Empty() {
 		// Make sure to return object info to provide extra information.
 		return objInfo, toObjectErr(errMethodNotAllowed, bucket, object)
 	}


@@ -339,6 +339,22 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj
 	return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{})
 }
 
+func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
+	idx, err = z.getPoolIdxExistingNoLock(ctx, bucket, object)
+	if err != nil && !isErrObjectNotFound(err) {
+		return idx, err
+	}
+
+	if isErrObjectNotFound(err) {
+		idx = z.getAvailablePoolIdx(ctx, bucket, object, size)
+		if idx < 0 {
+			return -1, toObjectErr(errDiskFull)
+		}
+	}
+
+	return idx, nil
+}
+
 // getPoolIdx returns the found previous object and its corresponding pool idx,
 // if none are found falls back to most available space pool.
 func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
@@ -791,7 +807,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
 		return z.serverPools[0].PutObject(ctx, bucket, object, data, opts)
 	}
 
 	if !opts.NoLock {
-		ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, "newMultipartObject.lck"))
+		ns := z.NewNSLock(bucket, object)
 		lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
 		if err != nil {
 			return ObjectInfo{}, err
@@ -801,7 +817,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
 		opts.NoLock = true
 	}
 
-	idx, err := z.getPoolIdx(ctx, bucket, object, data.Size())
+	idx, err := z.getPoolIdxNoLock(ctx, bucket, object, data.Size())
 	if err != nil {
 		return ObjectInfo{}, err
 	}
@@ -939,7 +955,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
 	cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
 
 	if !dstOpts.NoLock {
-		ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(dstBucket, dstObject, "newMultipartObject.lck"))
+		ns := z.NewNSLock(dstBucket, dstObject)
 		lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
 		if err != nil {
 			return ObjectInfo{}, err
@@ -949,7 +965,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
 		dstOpts.NoLock = true
 	}
 
-	poolIdx, err := z.getPoolIdx(ctx, dstBucket, dstObject, srcInfo.Size)
+	poolIdx, err := z.getPoolIdxNoLock(ctx, dstBucket, dstObject, srcInfo.Size)
 	if err != nil {
 		return objInfo, err
 	}
@@ -982,6 +998,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
 			Versioned: dstOpts.Versioned,
 			VersionID: dstOpts.VersionID,
 			MTime:     dstOpts.MTime,
+			NoLock:    true,
 		}
 
 		return z.serverPools[poolIdx].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
@@ -1162,14 +1179,6 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
 		return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts)
 	}
 
-	ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, "newMultipartObject.lck"))
-	lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
-	if err != nil {
-		return "", err
-	}
-	ctx = lkctx.Context()
-	defer ns.Unlock(lkctx.Cancel)
-
 	for idx, pool := range z.serverPools {
 		result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
 		if err != nil {
@@ -1183,6 +1192,8 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
 		}
 	}
 
+	// any parallel writes on the object will block for this poolIdx
+	// to return since this holds a read lock on the namespace.
 	idx, err := z.getPoolIdx(ctx, bucket, object, -1)
 	if err != nil {
 		return "", err


@@ -1492,9 +1492,9 @@ func readXLMetaNoData(r io.Reader, size int64) ([]byte, error) {
 			buf = append(buf, make([]byte, extra)...)
 			_, err := io.ReadFull(r, buf[has:])
 			if err != nil {
-				if err == io.EOF {
+				if errors.Is(err, io.EOF) {
 					// Returned if we read nothing.
-					return io.ErrUnexpectedEOF
+					return fmt.Errorf("readXLMetaNoData.readMore: %w", io.ErrUnexpectedEOF)
 				}
 				return fmt.Errorf("readXLMetaNoData.readMore: %w", err)
 			}


@@ -162,8 +162,6 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
 	// Create lock array to capture the successful lockers
 	locks := make([]string, len(restClnts))
 
-	log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
-
 	// Add total timeout
 	ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
 	defer cancel()
@@ -183,6 +181,8 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
 		}
 	}
 
+	log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v, quorum: %d, tolerance: %d, lockClients: %d\n", id, source, dm.Names, isReadLock, opts, quorum, tolerance, len(restClnts))
+
 	tolerance = len(restClnts) - quorum
 
 	for {
@@ -259,15 +259,16 @@ func forceUnlock(ctx context.Context, ds *Dsync, id string) {
 	restClnts, _ := ds.GetLockers()
 
+	args := LockArgs{
+		UID: id,
+	}
+
 	var wg sync.WaitGroup
 	for index, c := range restClnts {
 		wg.Add(1)
 		// Send refresh request to all nodes
 		go func(index int, c NetLocker) {
 			defer wg.Done()
-			args := LockArgs{
-				UID: id,
-			}
 			c.ForceUnlock(ctx, args)
 		}(index, c)
 	}
@@ -286,6 +287,10 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 	ch := make(chan refreshResult, len(restClnts))
 	var wg sync.WaitGroup
 
+	args := LockArgs{
+		UID: id,
+	}
+
 	for index, c := range restClnts {
 		wg.Add(1)
 		// Send refresh request to all nodes
@@ -297,10 +302,6 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 				return
 			}
 
-			args := LockArgs{
-				UID: id,
-			}
-
 			ctx, cancel := context.WithTimeout(ctx, drwMutexRefreshCallTimeout)
 			defer cancel()
@@ -382,6 +383,14 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 	ch := make(chan Granted, len(restClnts))
 	var wg sync.WaitGroup
 
+	args := LockArgs{
+		Owner:     owner,
+		UID:       id,
+		Resources: lockNames,
+		Source:    source,
+		Quorum:    quorum,
+	}
+
 	// Combined timeout for the lock attempt.
 	ctx, cancel := context.WithTimeout(ctx, drwMutexAcquireTimeout)
 	defer cancel()
@@ -398,14 +407,6 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 				return
 			}
 
-			args := LockArgs{
-				Owner:     owner,
-				UID:       id,
-				Resources: lockNames,
-				Source:    source,
-				Quorum:    quorum,
-			}
-
 			var locked bool
 			var err error
 			if isReadLock {
@@ -465,9 +466,10 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 		quorumLocked := checkQuorumLocked(locks, quorum) && locksFailed <= tolerance
 		if !quorumLocked {
-			log("Releasing all acquired locks now abandoned after quorum was not met\n")
+			log("dsync: Unable to acquire lock in quorum %#v\n", args)
+			// Release all acquired locks without quorum.
 			if !releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...) {
-				log("Unable to release acquired locks, stale locks might be present\n")
+				log("Unable to release acquired locks, these locks will expire automatically %#v\n", args)
 			}
 		}
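
The dsync hunks above also hoist the LockArgs construction out of the
per-locker goroutines: the arguments are identical for every locker,
so they can be built once before the fan-out and shared read-only,
and the new failure logs can print them. A self-contained sketch of
that pattern with toy stand-ins (lockArgs, locker and forceUnlockAll
are illustrative names, not the actual dsync types):

	// Sketch of the "build arguments once, fan out goroutines" pattern.
	package dsyncsketch

	import (
		"context"
		"log"
		"sync"
	)

	// lockArgs is a toy stand-in for dsync's LockArgs.
	type lockArgs struct {
		UID string
	}

	// locker is a toy stand-in for dsync's NetLocker.
	type locker interface {
		ForceUnlock(ctx context.Context, args lockArgs) error
	}

	// forceUnlockAll builds the arguments once and lets every goroutine
	// read the same value, mirroring the refactor in this PR.
	func forceUnlockAll(ctx context.Context, lockers []locker, uid string) {
		args := lockArgs{UID: uid}

		var wg sync.WaitGroup
		for _, c := range lockers {
			wg.Add(1)
			go func(c locker) {
				defer wg.Done()
				// args is never mutated, so concurrent reads are safe.
				if err := c.ForceUnlock(ctx, args); err != nil {
					log.Printf("force unlock failed: %v", err)
				}
			}(c)
		}
		wg.Wait()
	}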