Put object client disconnect (#7824)
Fail putObject and postpolicy in case the client prematurely disconnects. Use the request's context to cancel lock requests on client disconnects.
committed by kannappanr
parent edbd8709ec
commit 338e9a9be9
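The core of the change is easy to see in isolation: the handler wraps r.Body in a reader that checks the request's context before every Read, and the lock constructors now accept that same context so lock waiters are released when the client goes away. Below is a minimal, self-contained sketch of the first half of that pattern; disconnectReader mirrors the detectDisconnect type added in this commit, while putHandler and the /put route are hypothetical stand-ins for MinIO's real handlers, shown only to demonstrate how net/http closes r.Context().Done() when the peer drops.

package main

import (
	"io"
	"net/http"
)

// disconnectReader mirrors detectDisconnect from this commit: every Read
// first checks whether the request's context has been cancelled (i.e. the
// client hung up) and, if so, fails immediately instead of blocking.
type disconnectReader struct {
	io.ReadCloser
	cancelCh <-chan struct{}
}

func (d *disconnectReader) Read(p []byte) (int, error) {
	select {
	case <-d.cancelCh:
		// The client went away mid-upload; abort the read.
		return 0, io.ErrUnexpectedEOF
	default:
		return d.ReadCloser.Read(p)
	}
}

// putHandler is a hypothetical stand-in for MinIO's PutObjectHandler.
func putHandler(w http.ResponseWriter, r *http.Request) {
	// The same wrapping this commit adds to PutObjectHandler and
	// PostPolicyBucketHandler.
	r.Body = &disconnectReader{r.Body, r.Context().Done()}

	// Draining the body now returns io.ErrUnexpectedEOF as soon as the
	// client disconnects, so the server can bail out and release locks.
	if _, err := io.Copy(io.Discard, r.Body); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	http.HandleFunc("/put", putHandler)
	http.ListenAndServe(":8080", nil)
}

Because net/http cancels the request context the moment the client drops, an upload fails fast with io.ErrUnexpectedEOF instead of blocking on a dead connection; the hunks below rely on exactly that behavior.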
@@ -498,6 +498,9 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
 	bucket := mux.Vars(r)["bucket"]
 
+	// To detect if the client has disconnected.
+	r.Body = &detectDisconnect{r.Body, r.Context().Done()}
+
 	// Require Content-Length to be set in the request
 	size := r.ContentLength
 	if size < 0 {
@@ -2440,7 +2440,7 @@ func migrateConfigToMinioSys(objAPI ObjectLayer) (err error) {
 	// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
 	// and configFile, take a transaction lock to avoid data race between readConfig()
 	// and saveConfig().
-	objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
+	objLock := globalNSMutex.NewNSLock(context.Background(), minioMetaBucket, transactionConfigFile)
 	if err = objLock.GetLock(globalOperationTimeout); err != nil {
 		return err
 	}
@@ -2492,7 +2492,7 @@ func migrateMinioSysConfig(objAPI ObjectLayer) error {
 	// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
 	// and configFile, take a transaction lock to avoid data race between readConfig()
 	// and saveConfig().
-	objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
+	objLock := globalNSMutex.NewNSLock(context.Background(), minioMetaBucket, transactionConfigFile)
 	if err := objLock.GetLock(globalOperationTimeout); err != nil {
 		return err
 	}
@@ -55,7 +55,7 @@ func sweepRound(ctx context.Context, objAPI ObjectLayer) error {
 	zeroDynamicTimeout := newDynamicTimeout(zeroDuration, zeroDuration)
 
 	// General lock so we avoid parallel daily sweep by different instances.
-	sweepLock := globalNSMutex.NewNSLock("system", "daily-sweep")
+	sweepLock := globalNSMutex.NewNSLock(ctx, "system", "daily-sweep")
 	if err := sweepLock.GetLock(zeroDynamicTimeout); err != nil {
 		return err
 	}
@@ -303,7 +303,7 @@ func (cfs *cacheFSObjects) PutObject(ctx context.Context, bucket string, object
 	data := r.Reader
 	fs := cfs.FSObjects
 	// Lock the object.
-	objectLock := fs.nsMutex.NewNSLock(bucket, object)
+	objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
 	if err := objectLock.GetLock(globalObjectTimeout); err != nil {
 		return objInfo, err
 	}
@@ -489,7 +489,7 @@ func (cfs *cacheFSObjects) NewMultipartUpload(ctx context.Context, bucket, objec
 // moveBucketToTrash clears cacheFSObjects of bucket contents and moves it to trash folder.
 func (cfs *cacheFSObjects) moveBucketToTrash(ctx context.Context, bucket string) (err error) {
 	fs := cfs.FSObjects
-	bucketLock := fs.nsMutex.NewNSLock(bucket, "")
+	bucketLock := fs.nsMutex.NewNSLock(ctx, bucket, "")
 	if err = bucketLock.GetLock(globalObjectTimeout); err != nil {
 		return err
 	}
@@ -491,7 +491,7 @@ func formatXLGetDeploymentID(refFormat *formatXLV3, formats []*formatXLV3) (stri
 func formatXLFixDeploymentID(ctx context.Context, endpoints EndpointList, storageDisks []StorageAPI, refFormat *formatXLV3) (err error) {
 	// Acquire lock on format.json
 	mutex := newNSLock(globalIsDistXL)
-	formatLock := mutex.NewNSLock(minioMetaBucket, formatConfigFile)
+	formatLock := mutex.NewNSLock(ctx, minioMetaBucket, formatConfigFile)
 	if err = formatLock.GetLock(globalHealingTimeout); err != nil {
 		return err
 	}
@@ -634,7 +634,7 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 	}
 
 	// Hold write lock on the object.
-	destLock := fs.nsMutex.NewNSLock(bucket, object)
+	destLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
 	if err = destLock.GetLock(globalObjectTimeout); err != nil {
 		return oi, err
 	}
cmd/fs-v1.go (20 changed lines)
@@ -286,7 +286,7 @@ func (fs *FSObjects) statBucketDir(ctx context.Context, bucket string) (os.FileI
 // MakeBucketWithLocation - create a new bucket, returns if it
 // already exists.
 func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
-	bucketLock := fs.nsMutex.NewNSLock(bucket, "")
+	bucketLock := fs.nsMutex.NewNSLock(ctx, bucket, "")
 	if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
 		return err
 	}
@@ -309,7 +309,7 @@ func (fs *FSObjects) MakeBucketWithLocation(ctx context.Context, bucket, locatio
 
 // GetBucketInfo - fetch bucket metadata info.
 func (fs *FSObjects) GetBucketInfo(ctx context.Context, bucket string) (bi BucketInfo, e error) {
-	bucketLock := fs.nsMutex.NewNSLock(bucket, "")
+	bucketLock := fs.nsMutex.NewNSLock(ctx, bucket, "")
 	if e := bucketLock.GetRLock(globalObjectTimeout); e != nil {
 		return bi, e
 	}
@@ -372,7 +372,7 @@ func (fs *FSObjects) ListBuckets(ctx context.Context) ([]BucketInfo, error) {
 // DeleteBucket - delete a bucket and all the metadata associated
 // with the bucket including pending multipart, object metadata.
 func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string) error {
-	bucketLock := fs.nsMutex.NewNSLock(bucket, "")
+	bucketLock := fs.nsMutex.NewNSLock(ctx, bucket, "")
 	if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
 		logger.LogIf(ctx, err)
 		return err
@@ -408,7 +408,7 @@ func (fs *FSObjects) DeleteBucket(ctx context.Context, bucket string) error {
 func (fs *FSObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, e error) {
 	cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
 	if !cpSrcDstSame {
-		objectDWLock := fs.nsMutex.NewNSLock(dstBucket, dstObject)
+		objectDWLock := fs.nsMutex.NewNSLock(ctx, dstBucket, dstObject)
 		if err := objectDWLock.GetLock(globalObjectTimeout); err != nil {
 			return oi, err
 		}
@@ -480,7 +480,7 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
 
 	if lockType != noLock {
 		// Lock the object before reading.
-		lock := fs.nsMutex.NewNSLock(bucket, object)
+		lock := fs.nsMutex.NewNSLock(ctx, bucket, object)
 		switch lockType {
 		case writeLock:
 			if err = lock.GetLock(globalObjectTimeout); err != nil {
@@ -567,7 +567,7 @@ func (fs *FSObjects) GetObject(ctx context.Context, bucket, object string, offse
 	}
 
 	// Lock the object before reading.
-	objectLock := fs.nsMutex.NewNSLock(bucket, object)
+	objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
 	if err := objectLock.GetRLock(globalObjectTimeout); err != nil {
 		logger.LogIf(ctx, err)
 		return err
@@ -739,7 +739,7 @@ func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (
 // getObjectInfoWithLock - reads object metadata and replies back ObjectInfo.
 func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) {
 	// Lock the object before reading.
-	objectLock := fs.nsMutex.NewNSLock(bucket, object)
+	objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
 	if err := objectLock.GetRLock(globalObjectTimeout); err != nil {
 		return oi, err
 	}
@@ -764,7 +764,7 @@ func (fs *FSObjects) getObjectInfoWithLock(ctx context.Context, bucket, object s
 func (fs *FSObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, e error) {
 	oi, err := fs.getObjectInfoWithLock(ctx, bucket, object)
 	if err == errCorruptedFormat || err == io.EOF {
-		objectLock := fs.nsMutex.NewNSLock(bucket, object)
+		objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
 		if err = objectLock.GetLock(globalObjectTimeout); err != nil {
 			return oi, toObjectErr(err, bucket, object)
 		}
@@ -810,7 +810,7 @@ func (fs *FSObjects) PutObject(ctx context.Context, bucket string, object string
 		return ObjectInfo{}, err
 	}
 	// Lock the object.
-	objectLock := fs.nsMutex.NewNSLock(bucket, object)
+	objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
 	if err := objectLock.GetLock(globalObjectTimeout); err != nil {
 		logger.LogIf(ctx, err)
 		return objInfo, err
@@ -965,7 +965,7 @@ func (fs *FSObjects) DeleteObjects(ctx context.Context, bucket string, objects [
 // and there are no rollbacks supported.
 func (fs *FSObjects) DeleteObject(ctx context.Context, bucket, object string) error {
 	// Acquire a write lock before deleting the object.
-	objectLock := fs.nsMutex.NewNSLock(bucket, object)
+	objectLock := fs.nsMutex.NewNSLock(ctx, bucket, object)
 	if err := objectLock.GetLock(globalOperationTimeout); err != nil {
 		return err
 	}
@@ -21,7 +21,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/minio/dsync"
+	"github.com/minio/dsync/v2"
 )
 
 // lockRequesterInfo stores various info from the client for each lock that is requested.
@@ -28,7 +28,7 @@ import (
 
 	"net/url"
 
-	"github.com/minio/dsync"
+	"github.com/minio/dsync/v2"
 	"github.com/minio/minio/cmd/http"
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/cmd/rest"
@@ -19,7 +19,7 @@ package cmd
 import (
 	"testing"
 
-	"github.com/minio/dsync"
+	"github.com/minio/dsync/v2"
 	xnet "github.com/minio/minio/pkg/net"
 )
@@ -25,7 +25,7 @@ import (
 	"time"
 
 	"github.com/gorilla/mux"
-	"github.com/minio/dsync"
+	"github.com/minio/dsync/v2"
 	"github.com/minio/minio/cmd/logger"
 	xnet "github.com/minio/minio/pkg/net"
 )
@@ -27,7 +27,7 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/minio/dsync"
+	"github.com/minio/dsync/v2"
 	"github.com/minio/lsync"
 	"github.com/minio/minio-go/v6/pkg/set"
 	"github.com/minio/minio/cmd/logger"
@@ -127,7 +127,7 @@ type nsLockMap struct {
 }
 
 // Lock the namespace resource.
-func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock bool, timeout time.Duration) (locked bool) {
+func (n *nsLockMap) lock(ctx context.Context, volume, path string, lockSource, opsID string, readLock bool, timeout time.Duration) (locked bool) {
 	var nsLk *nsLock
 
 	n.lockMapMutex.Lock()
@@ -135,7 +135,7 @@ func (n *nsLockMap) lock(volume, path string, lockSource, opsID string, readLock
 	nsLk, found := n.lockMap[param]
 	if !found {
 		n.lockMap[param] = &nsLock{
-			LRWMutex: &lsync.LRWMutex{},
+			LRWMutex: lsync.NewLRWMutex(ctx),
 			ref:      1,
 		}
 		nsLk = n.lockMap[param]
@@ -199,7 +199,7 @@ func (n *nsLockMap) Lock(volume, path, opsID string, timeout time.Duration) (loc
 	readLock := false // This is a write lock.
 
 	lockSource := getSource() // Useful for debugging
-	return n.lock(volume, path, lockSource, opsID, readLock, timeout)
+	return n.lock(context.Background(), volume, path, lockSource, opsID, readLock, timeout)
 }
 
 // Unlock - unlocks any previously acquired write locks.
@@ -213,7 +213,7 @@ func (n *nsLockMap) RLock(volume, path, opsID string, timeout time.Duration) (lo
 	readLock := true
 
 	lockSource := getSource() // Useful for debugging
-	return n.lock(volume, path, lockSource, opsID, readLock, timeout)
+	return n.lock(context.Background(), volume, path, lockSource, opsID, readLock, timeout)
 }
 
 // RUnlock - unlocks any previously acquired read locks.
@@ -241,7 +241,7 @@ func (n *nsLockMap) ForceUnlock(volume, path string) {
 	// are blocking can now proceed as normal and any new locks will also
 	// participate normally.
 	if n.isDistXL { // For distributed mode, broadcast ForceUnlock message.
-		dsync.NewDRWMutex(pathJoin(volume, path), globalDsync).ForceUnlock()
+		dsync.NewDRWMutex(context.Background(), pathJoin(volume, path), globalDsync).ForceUnlock()
 	}
 
 	// Remove lock from the map.
@@ -291,6 +291,7 @@ func (di *distLockInstance) RUnlock() {
 
 // localLockInstance - frontend/top-level interface for namespace locks.
 type localLockInstance struct {
+	ctx                 context.Context
 	ns                  *nsLockMap
 	volume, path, opsID string
 }
@@ -298,12 +299,12 @@ type localLockInstance struct {
 // NewNSLock - returns a lock instance for a given volume and
 // path. The returned lockInstance object encapsulates the nsLockMap,
 // volume, path and operation ID.
-func (n *nsLockMap) NewNSLock(volume, path string) RWLocker {
+func (n *nsLockMap) NewNSLock(ctx context.Context, volume, path string) RWLocker {
 	opsID := mustGetUUID()
 	if n.isDistXL {
-		return &distLockInstance{dsync.NewDRWMutex(pathJoin(volume, path), globalDsync), volume, path, opsID}
+		return &distLockInstance{dsync.NewDRWMutex(ctx, pathJoin(volume, path), globalDsync), volume, path, opsID}
 	}
-	return &localLockInstance{n, volume, path, opsID}
+	return &localLockInstance{ctx, n, volume, path, opsID}
 }
 
 // Lock - block until write lock is taken or timeout has occurred.
@@ -311,7 +312,7 @@ func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error
 	lockSource := getSource()
 	start := UTCNow()
 	readLock := false
-	if !li.ns.lock(li.volume, li.path, lockSource, li.opsID, readLock, timeout.Timeout()) {
+	if !li.ns.lock(li.ctx, li.volume, li.path, lockSource, li.opsID, readLock, timeout.Timeout()) {
 		timeout.LogFailure()
 		return OperationTimedOut{Path: li.path}
 	}
@@ -330,7 +331,7 @@ func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr erro
 	lockSource := getSource()
 	start := UTCNow()
 	readLock := true
-	if !li.ns.lock(li.volume, li.path, lockSource, li.opsID, readLock, timeout.Timeout()) {
+	if !li.ns.lock(li.ctx, li.volume, li.path, lockSource, li.opsID, readLock, timeout.Timeout()) {
 		timeout.LogFailure()
 		return OperationTimedOut{Path: li.path}
 	}
@@ -17,6 +17,7 @@
 package cmd
 
 import (
+	"context"
 	"testing"
 	"time"
 )
@@ -31,7 +32,7 @@ func TestGetSource(t *testing.T) {
 	currentSource := func() string { return getSource() }
 	gotSource := currentSource()
 	// Hard coded line number, 32, in the "expectedSource" value
-	expectedSource := "[namespace-lock_test.go:32:TestGetSource()]"
+	expectedSource := "[namespace-lock_test.go:33:TestGetSource()]"
 	if gotSource != expectedSource {
 		t.Errorf("expected : %s, got : %s", expectedSource, gotSource)
 	}
@@ -204,7 +205,7 @@ func TestNamespaceForceUnlockTest(t *testing.T) {
 	isDistXL := false
 	initNSLock(isDistXL)
 	// Create lock.
-	lock := globalNSMutex.NewNSLock("bucket", "object")
+	lock := globalNSMutex.NewNSLock(context.Background(), "bucket", "object")
 	if lock.GetLock(newDynamicTimeout(60*time.Second, time.Second)) != nil {
 		t.Fatalf("Failed to get lock")
 	}
@@ -215,7 +216,7 @@ func TestNamespaceForceUnlockTest(t *testing.T) {
 
 	go func() {
 		// Try to claim lock again.
-		anotherLock := globalNSMutex.NewNSLock("bucket", "object")
+		anotherLock := globalNSMutex.NewNSLock(context.Background(), "bucket", "object")
 		if anotherLock.GetLock(newDynamicTimeout(60*time.Second, time.Second)) != nil {
 			t.Errorf("Failed to get lock")
 			return
@@ -602,7 +602,7 @@ func (sys *NotificationSys) initListeners(ctx context.Context, objAPI ObjectLaye
 	// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
 	// and configFile, take a transaction lock to avoid data race between readConfig()
 	// and saveConfig().
-	objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
+	objLock := globalNSMutex.NewNSLock(ctx, minioMetaBucket, transactionConfigFile)
 	if err := objLock.GetRLock(globalOperationTimeout); err != nil {
 		return err
 	}
@@ -1058,7 +1058,7 @@ func SaveListener(objAPI ObjectLayer, bucketName string, eventNames []event.Name
 	// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
 	// and configFile, take a transaction lock to avoid data race between readConfig()
 	// and saveConfig().
-	objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
+	objLock := globalNSMutex.NewNSLock(ctx, minioMetaBucket, transactionConfigFile)
 	if err := objLock.GetLock(globalOperationTimeout); err != nil {
 		return err
 	}
@@ -1109,7 +1109,7 @@ func RemoveListener(objAPI ObjectLayer, bucketName string, targetID event.Target
 	// As object layer's GetObject() and PutObject() take respective lock on minioMetaBucket
 	// and configFile, take a transaction lock to avoid data race between readConfig()
 	// and saveConfig().
-	objLock := globalNSMutex.NewNSLock(minioMetaBucket, transactionConfigFile)
+	objLock := globalNSMutex.NewNSLock(ctx, minioMetaBucket, transactionConfigFile)
 	if err := objLock.GetLock(globalOperationTimeout); err != nil {
 		return err
 	}
@@ -793,3 +793,18 @@ func (cr *snappyCompressReader) Read(p []byte) (int, error) {
 	}
 	return n, err
 }
+
+// Returns error if the cancelCh has been closed (indicating that S3 client has disconnected)
+type detectDisconnect struct {
+	io.ReadCloser
+	cancelCh <-chan struct{}
+}
+
+func (d *detectDisconnect) Read(p []byte) (int, error) {
+	select {
+	case <-d.cancelCh:
+		return 0, io.ErrUnexpectedEOF
+	default:
+		return d.ReadCloser.Read(p)
+	}
+}
@@ -1064,6 +1064,9 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
 	bucket := vars["bucket"]
 	object := vars["object"]
 
+	// To detect if the client has disconnected.
+	r.Body = &detectDisconnect{r.Body, r.Context().Done()}
+
 	// X-Amz-Copy-Source shouldn't be set for this call.
 	if _, ok := r.Header["X-Amz-Copy-Source"]; ok {
 		writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrInvalidCopySource), r.URL, guessIsBrowserReq(r))
@@ -27,7 +27,7 @@ import (
 	"syscall"
 
 	"github.com/minio/cli"
-	"github.com/minio/dsync"
+	"github.com/minio/dsync/v2"
 	xhttp "github.com/minio/minio/cmd/http"
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/certs"
@@ -433,7 +433,7 @@ func newContext(r *http.Request, w http.ResponseWriter, api string) context.Cont
 		BucketName: bucket,
 		ObjectName: object,
 	}
-	return logger.SetReqInfo(context.Background(), reqInfo)
+	return logger.SetReqInfo(r.Context(), reqInfo)
 }
 
 // isNetworkOrHostDown - if there was a network error or if the host is down.
@@ -686,7 +686,7 @@ func (s *xlSets) CopyObject(ctx context.Context, srcBucket, srcObject, destBucke
 	}
 
 	if !cpSrcDstSame {
-		objectDWLock := destSet.nsMutex.NewNSLock(destBucket, destObject)
+		objectDWLock := destSet.nsMutex.NewNSLock(ctx, destBucket, destObject)
 		if err := objectDWLock.GetLock(globalObjectTimeout); err != nil {
 			return objInfo, err
 		}
@@ -1157,7 +1157,7 @@ func formatsToDrivesInfo(endpoints EndpointList, formats []*formatXLV3, sErrs []
 // healing in a distributed setup.
 func (s *xlSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
 	// Acquire lock on format.json
-	formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(minioMetaBucket, formatConfigFile)
+	formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(ctx, minioMetaBucket, formatConfigFile)
 	if err = formatLock.GetRLock(globalHealingTimeout); err != nil {
 		return err
 	}
@@ -1278,7 +1278,7 @@ func markRootDisksAsDown(storageDisks []StorageAPI) {
 // coded data in it.
 func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
 	// Acquire lock on format.json
-	formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(minioMetaBucket, formatConfigFile)
+	formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(ctx, minioMetaBucket, formatConfigFile)
 	if err = formatLock.GetLock(globalHealingTimeout); err != nil {
 		return madmin.HealResultItem{}, err
 	}
@@ -1431,7 +1431,7 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
 
 // HealBucket - heals inconsistent buckets and bucket metadata on all sets.
 func (s *xlSets) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (result madmin.HealResultItem, err error) {
-	bucketLock := globalNSMutex.NewNSLock(bucket, "")
+	bucketLock := globalNSMutex.NewNSLock(ctx, bucket, "")
 	if err := bucketLock.GetLock(globalHealingTimeout); err != nil {
 		return result, err
 	}
@@ -151,7 +151,7 @@ func (xl xlObjects) getBucketInfo(ctx context.Context, bucketName string) (bucke
 
 // GetBucketInfo - returns BucketInfo for a bucket.
 func (xl xlObjects) GetBucketInfo(ctx context.Context, bucket string) (bi BucketInfo, e error) {
-	bucketLock := xl.nsMutex.NewNSLock(bucket, "")
+	bucketLock := xl.nsMutex.NewNSLock(ctx, bucket, "")
 	if e := bucketLock.GetRLock(globalObjectTimeout); e != nil {
 		return bi, e
 	}
@@ -237,7 +237,7 @@ func deleteDanglingBucket(ctx context.Context, storageDisks []StorageAPI, dErrs
 
 // DeleteBucket - deletes a bucket.
 func (xl xlObjects) DeleteBucket(ctx context.Context, bucket string) error {
-	bucketLock := xl.nsMutex.NewNSLock(bucket, "")
+	bucketLock := xl.nsMutex.NewNSLock(ctx, bucket, "")
 	if err := bucketLock.GetLock(globalObjectTimeout); err != nil {
 		return err
 	}
@@ -717,7 +717,7 @@ func (xl xlObjects) HealObject(ctx context.Context, bucket, object string, dryRu
 	}
 
 	// Lock the object before healing.
-	objectLock := xl.nsMutex.NewNSLock(bucket, object)
+	objectLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
 	if lerr := objectLock.GetRLock(globalHealingTimeout); lerr != nil {
 		return defaultHealResult(latestXLMeta, storageDisks, errs, bucket, object), lerr
 	}
@@ -258,7 +258,7 @@ func (xl xlObjects) NewMultipartUpload(ctx context.Context, bucket, object strin
 func (xl xlObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (pi PartInfo, e error) {
 	// Hold read locks on source object only if we are
 	// going to read data from source object.
-	objectSRLock := xl.nsMutex.NewNSLock(srcBucket, srcObject)
+	objectSRLock := xl.nsMutex.NewNSLock(ctx, srcBucket, srcObject)
 	if err := objectSRLock.GetRLock(globalObjectTimeout); err != nil {
 		return pi, err
 	}
@@ -300,7 +300,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
 	uploadIDLockPath := xl.getUploadIDLockPath(bucket, object, uploadID)
 
 	// pre-check upload id lock.
-	preUploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDLockPath)
+	preUploadIDLock := xl.nsMutex.NewNSLock(ctx, minioMetaMultipartBucket, uploadIDLockPath)
 	if err := preUploadIDLock.GetRLock(globalOperationTimeout); err != nil {
 		return pi, err
 	}
@@ -398,7 +398,7 @@ func (xl xlObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
 	}
 
 	// post-upload check (write) lock
-	postUploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDLockPath)
+	postUploadIDLock := xl.nsMutex.NewNSLock(ctx, minioMetaMultipartBucket, uploadIDLockPath)
 	if err = postUploadIDLock.GetLock(globalOperationTimeout); err != nil {
 		return pi, err
 	}
@@ -492,7 +492,7 @@ func (xl xlObjects) ListObjectParts(ctx context.Context, bucket, object, uploadI
 	}
 	// Hold lock so that there is no competing
 	// abort-multipart-upload or complete-multipart-upload.
-	uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket,
+	uploadIDLock := xl.nsMutex.NewNSLock(ctx, minioMetaMultipartBucket,
 		xl.getUploadIDLockPath(bucket, object, uploadID))
 	if err := uploadIDLock.GetLock(globalListingTimeout); err != nil {
 		return result, err
@@ -596,7 +596,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 		return oi, err
 	}
 	// Hold write lock on the object.
-	destLock := xl.nsMutex.NewNSLock(bucket, object)
+	destLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
 	if err := destLock.GetLock(globalObjectTimeout); err != nil {
 		return oi, err
 	}
@@ -611,7 +611,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 	//
 	// 2) no one does a parallel complete-multipart-upload on this
 	// multipart upload
-	uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDLockPath)
+	uploadIDLock := xl.nsMutex.NewNSLock(ctx, minioMetaMultipartBucket, uploadIDLockPath)
 	if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
 		return oi, err
 	}
@@ -815,7 +815,7 @@ func (xl xlObjects) AbortMultipartUpload(ctx context.Context, bucket, object, up
 	uploadIDLockPath := xl.getUploadIDLockPath(bucket, object, uploadID)
 	// Hold lock so that there is no competing
 	// complete-multipart-upload or put-object-part.
-	uploadIDLock := xl.nsMutex.NewNSLock(minioMetaMultipartBucket, uploadIDLockPath)
+	uploadIDLock := xl.nsMutex.NewNSLock(ctx, minioMetaMultipartBucket, uploadIDLockPath)
 	if err := uploadIDLock.GetLock(globalOperationTimeout); err != nil {
 		return err
 	}
@@ -125,7 +125,7 @@ func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, r
 
 	// Acquire lock
 	if lockType != noLock {
-		lock := xl.nsMutex.NewNSLock(bucket, object)
+		lock := xl.nsMutex.NewNSLock(ctx, bucket, object)
 		switch lockType {
 		case writeLock:
 			if err = lock.GetLock(globalObjectTimeout); err != nil {
@@ -188,7 +188,7 @@ func (xl xlObjects) GetObjectNInfo(ctx context.Context, bucket, object string, r
 // length indicates the total length of the object.
 func (xl xlObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
 	// Lock the object before reading.
-	objectLock := xl.nsMutex.NewNSLock(bucket, object)
+	objectLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
 	if err := objectLock.GetRLock(globalObjectTimeout); err != nil {
 		return err
 	}
@@ -369,7 +369,7 @@ func (xl xlObjects) getObjectInfoDir(ctx context.Context, bucket, object string)
 // GetObjectInfo - reads object metadata and replies back ObjectInfo.
 func (xl xlObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (oi ObjectInfo, e error) {
 	// Lock the object before reading.
-	objectLock := xl.nsMutex.NewNSLock(bucket, object)
+	objectLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
 	if err := objectLock.GetRLock(globalObjectTimeout); err != nil {
 		return oi, err
 	}
@@ -504,7 +504,7 @@ func (xl xlObjects) PutObject(ctx context.Context, bucket string, object string,
 	}
 
 	// Lock the object.
-	objectLock := xl.nsMutex.NewNSLock(bucket, object)
+	objectLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
 	if err := objectLock.GetLock(globalObjectTimeout); err != nil {
 		return objInfo, err
 	}
@@ -857,7 +857,7 @@ func (xl xlObjects) deleteObjects(ctx context.Context, bucket string, objects []
 			continue
 		}
 		// Acquire a write lock before deleting the object.
-		objectLocks[i] = xl.nsMutex.NewNSLock(bucket, object)
+		objectLocks[i] = xl.nsMutex.NewNSLock(ctx, bucket, object)
 		if errs[i] = objectLocks[i].GetLock(globalOperationTimeout); errs[i] != nil {
 			continue
 		}
@@ -961,7 +961,7 @@ func (xl xlObjects) DeleteObjects(ctx context.Context, bucket string, objects []
 // response to the client request.
 func (xl xlObjects) DeleteObject(ctx context.Context, bucket, object string) (err error) {
 	// Acquire a write lock before deleting the object.
-	objectLock := xl.nsMutex.NewNSLock(bucket, object)
+	objectLock := xl.nsMutex.NewNSLock(ctx, bucket, object)
 	if perr := objectLock.GetLock(globalOperationTimeout); perr != nil {
 		return perr
 	}