From 200caab82b622a0ec6c9776ee9b555aedbc9b320 Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Wed, 22 Sep 2021 21:46:24 -0700
Subject: [PATCH] fix: multi-pool setup make sure acquire locks properly (#13280)

This was a regression introduced in '14bb969782'; it has the potential
to cause corruption when there are concurrent overwrites attempting to
update the content on the same namespace.

That change made PutObject() and CopyObject() compete properly for the
same locks as NewMultipartUpload(); however, it ended up turning off the
competing locks on the actual object taken by GetObject() and
DeleteObject(). Since those calls no longer compete, concurrent I/O on a
versioned bucket can lead to loss of versions.

This PR fixes this bug in multi-pool setups with replication, where the
lack of competing locks causes corruption of inlined data.

Instead, CompleteMultipartUpload holds the necessary locks when
finishing the transaction; because the exact location (the pool id of
the existing object) is known when scheduling the multipart upload, it
does not need to compete in this manner.
---
 cmd/erasure-object.go       |  2 +-
 cmd/erasure-server-pool.go  | 35 +++++++++++++++++++++-----------
 cmd/xl-storage-format-v2.go |  4 ++--
 internal/dsync/drwmutex.go  | 40 +++++++++++++++++++------------------
 4 files changed, 47 insertions(+), 34 deletions(-)

diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go
index c8af4aa97..61abb6438 100644
--- a/cmd/erasure-object.go
+++ b/cmd/erasure-object.go
@@ -443,7 +443,7 @@ func (er erasureObjects) getObjectInfo(ctx context.Context, bucket, object strin
 	}
 
 	objInfo = fi.ToObjectInfo(bucket, object)
-	if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" {
+	if opts.VersionID != "" && !fi.VersionPurgeStatus().Empty() {
 		// Make sure to return object info to provide extra information.
 		return objInfo, toObjectErr(errMethodNotAllowed, bucket, object)
 	}
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index d69f591a8..e5f9d8c71 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -339,6 +339,22 @@ func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, obj
 	return z.getPoolIdxExistingWithOpts(ctx, bucket, object, ObjectOptions{})
 }
 
+func (z *erasureServerPools) getPoolIdxNoLock(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
+	idx, err = z.getPoolIdxExistingNoLock(ctx, bucket, object)
+	if err != nil && !isErrObjectNotFound(err) {
+		return idx, err
+	}
+
+	if isErrObjectNotFound(err) {
+		idx = z.getAvailablePoolIdx(ctx, bucket, object, size)
+		if idx < 0 {
+			return -1, toObjectErr(errDiskFull)
+		}
+	}
+
+	return idx, nil
+}
+
 // getPoolIdx returns the found previous object and its corresponding pool idx,
 // if none are found falls back to most available space pool.
 func (z *erasureServerPools) getPoolIdx(ctx context.Context, bucket, object string, size int64) (idx int, err error) {
@@ -791,7 +807,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
 		return z.serverPools[0].PutObject(ctx, bucket, object, data, opts)
 	}
 	if !opts.NoLock {
-		ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, "newMultipartObject.lck"))
+		ns := z.NewNSLock(bucket, object)
 		lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
 		if err != nil {
 			return ObjectInfo{}, err
@@ -801,7 +817,7 @@ func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, objec
 		opts.NoLock = true
 	}
 
-	idx, err := z.getPoolIdx(ctx, bucket, object, data.Size())
+	idx, err := z.getPoolIdxNoLock(ctx, bucket, object, data.Size())
 	if err != nil {
 		return ObjectInfo{}, err
 	}
@@ -939,7 +955,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
 	cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
 
 	if !dstOpts.NoLock {
-		ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(dstBucket, dstObject, "newMultipartObject.lck"))
+		ns := z.NewNSLock(dstBucket, dstObject)
 		lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
 		if err != nil {
 			return ObjectInfo{}, err
@@ -949,7 +965,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
 		dstOpts.NoLock = true
 	}
 
-	poolIdx, err := z.getPoolIdx(ctx, dstBucket, dstObject, srcInfo.Size)
+	poolIdx, err := z.getPoolIdxNoLock(ctx, dstBucket, dstObject, srcInfo.Size)
 	if err != nil {
 		return objInfo, err
 	}
@@ -982,6 +998,7 @@ func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObjec
 			Versioned: dstOpts.Versioned,
 			VersionID: dstOpts.VersionID,
 			MTime:     dstOpts.MTime,
+			NoLock:    true,
 		}
 
 		return z.serverPools[poolIdx].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
@@ -1162,14 +1179,6 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
 		return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts)
 	}
 
-	ns := z.NewNSLock(minioMetaMultipartBucket, pathJoin(bucket, object, "newMultipartObject.lck"))
-	lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
-	if err != nil {
-		return "", err
-	}
-	ctx = lkctx.Context()
-	defer ns.Unlock(lkctx.Cancel)
-
 	for idx, pool := range z.serverPools {
 		result, err := pool.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
 		if err != nil {
@@ -1183,6 +1192,8 @@ func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, obj
 		}
 	}
 
+	// any parallel writes on the object will block for this poolIdx
+	// to return since this holds a read lock on the namespace.
 	idx, err := z.getPoolIdx(ctx, bucket, object, -1)
 	if err != nil {
 		return "", err
diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go
index 4defc9bcb..78ab0187d 100644
--- a/cmd/xl-storage-format-v2.go
+++ b/cmd/xl-storage-format-v2.go
@@ -1492,9 +1492,9 @@ func readXLMetaNoData(r io.Reader, size int64) ([]byte, error) {
 			buf = append(buf, make([]byte, extra)...)
 			_, err := io.ReadFull(r, buf[has:])
 			if err != nil {
-				if err == io.EOF {
+				if errors.Is(err, io.EOF) {
 					// Returned if we read nothing.
-					return io.ErrUnexpectedEOF
+					return fmt.Errorf("readXLMetaNoData.readMore: %w", io.ErrUnexpectedEOF)
 				}
 				return fmt.Errorf("readXLMetaNoData.readMore: %w", err)
 			}
diff --git a/internal/dsync/drwmutex.go b/internal/dsync/drwmutex.go
index fb3ff5764..1ab507846 100644
--- a/internal/dsync/drwmutex.go
+++ b/internal/dsync/drwmutex.go
@@ -162,8 +162,6 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
 	// Create lock array to capture the successful lockers
 	locks := make([]string, len(restClnts))
 
-	log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v\n", id, source, dm.Names, isReadLock, opts)
-
 	// Add total timeout
 	ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
 	defer cancel()
@@ -183,6 +181,8 @@ func (dm *DRWMutex) lockBlocking(ctx context.Context, lockLossCallback func(), i
 		}
 	}
 
+	log("lockBlocking %s/%s for %#v: lockType readLock(%t), additional opts: %#v, quorum: %d, tolerance: %d, lockClients: %d\n", id, source, dm.Names, isReadLock, opts, quorum, tolerance, len(restClnts))
+
 	tolerance = len(restClnts) - quorum
 
 	for {
@@ -259,15 +259,16 @@ func forceUnlock(ctx context.Context, ds *Dsync, id string) {
 
 	restClnts, _ := ds.GetLockers()
 
+	args := LockArgs{
+		UID: id,
+	}
+
 	var wg sync.WaitGroup
 	for index, c := range restClnts {
 		wg.Add(1)
 		// Send refresh request to all nodes
 		go func(index int, c NetLocker) {
 			defer wg.Done()
-			args := LockArgs{
-				UID: id,
-			}
 			c.ForceUnlock(ctx, args)
 		}(index, c)
 	}
@@ -286,6 +287,10 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 	ch := make(chan refreshResult, len(restClnts))
 	var wg sync.WaitGroup
 
+	args := LockArgs{
+		UID: id,
+	}
+
 	for index, c := range restClnts {
 		wg.Add(1)
 		// Send refresh request to all nodes
@@ -297,10 +302,6 @@ func refresh(ctx context.Context, ds *Dsync, id, source string, quorum int) (boo
 				return
 			}
 
-			args := LockArgs{
-				UID: id,
-			}
-
 			ctx, cancel := context.WithTimeout(ctx, drwMutexRefreshCallTimeout)
 			defer cancel()
 
@@ -382,6 +383,14 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 	ch := make(chan Granted, len(restClnts))
 	var wg sync.WaitGroup
 
+	args := LockArgs{
+		Owner:     owner,
+		UID:       id,
+		Resources: lockNames,
+		Source:    source,
+		Quorum:    quorum,
+	}
+
 	// Combined timeout for the lock attempt.
 	ctx, cancel := context.WithTimeout(ctx, drwMutexAcquireTimeout)
 	defer cancel()
@@ -398,14 +407,6 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 			return
 		}
 
-		args := LockArgs{
-			Owner:     owner,
-			UID:       id,
-			Resources: lockNames,
-			Source:    source,
-			Quorum:    quorum,
-		}
-
 		var locked bool
 		var err error
 		if isReadLock {
@@ -465,9 +466,10 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
 	quorumLocked := checkQuorumLocked(locks, quorum) && locksFailed <= tolerance
 	if !quorumLocked {
-		log("Releasing all acquired locks now abandoned after quorum was not met\n")
+		log("dsync: Unable to acquire lock in quorum %#v\n", args)
+		// Release all acquired locks without quorum.
 		if !releaseAll(ds, tolerance, owner, locks, isReadLock, restClnts, lockNames...) {
-			log("Unable to release acquired locks, stale locks might be present\n")
+			log("Unable to release acquired locks, these locks will expire automatically %#v\n", args)
 		}
 	}
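
Reviewer note: the program below is a minimal, self-contained Go sketch (not MinIO code; namespaceLock, pool, pickPool and putObject are all hypothetical names) of the ordering the patch restores in PutObject()/CopyObject(): take one namespace lock keyed on bucket/object first, then resolve the target pool and write without taking further locks, so that concurrent overwrites of the same object serialize on the same lock. pickPool mirrors the getPoolIdxNoLock fallback of "pool that already holds the object first, otherwise the pool with the most available space".

package main

import (
	"errors"
	"fmt"
	"sync"
)

// namespaceLock is a toy stand-in for z.NewNSLock(bucket, object): one mutex
// per bucket/object key, shared by every writer of that object. (Hypothetical;
// the real NSLock is a distributed lock held with quorum across lock servers.)
type namespaceLock struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func (n *namespaceLock) forObject(bucket, object string) *sync.Mutex {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.locks == nil {
		n.locks = make(map[string]*sync.Mutex)
	}
	key := bucket + "/" + object
	if n.locks[key] == nil {
		n.locks[key] = &sync.Mutex{}
	}
	return n.locks[key]
}

// pool is a toy stand-in for an erasure server pool.
type pool struct {
	freeBytes int64
	objects   map[string][]byte
}

// pickPool mirrors the getPoolIdxNoLock fallback: reuse the pool that already
// holds the object; otherwise pick the pool with the most available space.
func pickPool(pools []*pool, bucket, object string) (int, error) {
	key := bucket + "/" + object
	for i, p := range pools {
		if _, ok := p.objects[key]; ok {
			return i, nil
		}
	}
	best, bestFree := -1, int64(-1)
	for i, p := range pools {
		if p.freeBytes > bestFree {
			best, bestFree = i, p.freeBytes
		}
	}
	if best < 0 {
		return -1, errors.New("no pool with available space")
	}
	return best, nil
}

// putObject takes the per-object lock first, then resolves the pool and
// writes; the pool lookup itself takes no further locks.
func putObject(ns *namespaceLock, pools []*pool, bucket, object string, data []byte) error {
	lk := ns.forObject(bucket, object)
	lk.Lock()
	defer lk.Unlock()

	idx, err := pickPool(pools, bucket, object)
	if err != nil {
		return err
	}
	pools[idx].objects[bucket+"/"+object] = data
	return nil
}

func main() {
	pools := []*pool{
		{freeBytes: 100, objects: map[string][]byte{}},
		{freeBytes: 500, objects: map[string][]byte{}},
	}
	ns := &namespaceLock{}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// All four overwrites target the same key and contend on one lock.
			_ = putObject(ns, pools, "bucket", "object", []byte(fmt.Sprintf("v%d", i)))
		}(i)
	}
	wg.Wait()
	fmt.Println("stored in pool 1:", string(pools[1].objects["bucket/object"]))
}

The sketch only illustrates lock ordering versus pool resolution with an in-process mutex; it says nothing about quorum, replication, or versioning.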
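The cmd/xl-storage-format-v2.go hunk makes two related changes: it checks io.EOF with errors.Is instead of ==, and it wraps io.ErrUnexpectedEOF with %w so the caller keeps the readXLMetaNoData.readMore context. The tiny standalone example below (not MinIO code; readMore is a hypothetical stand-in) shows the behavior both changes rely on: once a sentinel error is wrapped, a direct == comparison no longer matches it, while errors.Is still walks the wrap chain.

package main

import (
	"errors"
	"fmt"
	"io"
)

func readMore() error {
	// Wrap the sentinel so callers keep the call-site context in the message.
	return fmt.Errorf("readMore: %w", io.ErrUnexpectedEOF)
}

func main() {
	err := readMore()

	fmt.Println(err == io.ErrUnexpectedEOF)          // false: the wrapper is a distinct value
	fmt.Println(errors.Is(err, io.ErrUnexpectedEOF)) // true: errors.Is unwraps the chain
}

Wrapping with %w keeps the extra context in the error string while still letting callers detect the underlying sentinel with errors.Is.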