mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: inlined objects don't need to honor long locks (#17039)
This commit is contained in:
parent
18515a4e3b
commit
6825bd7e75
@ -694,7 +694,7 @@ func (r *BatchJobReplicateV1) ReplicateToTarget(ctx context.Context, api ObjectL
|
||||
Versioned: versioned,
|
||||
VersionSuspended: versionSuspended,
|
||||
}
|
||||
rd, err := api.GetObjectNInfo(ctx, srcBucket, srcObject, nil, http.Header{}, readLock, opts)
|
||||
rd, err := api.GetObjectNInfo(ctx, srcBucket, srcObject, nil, http.Header{}, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1085,7 +1085,7 @@ func (ri ReplicateObjectInfo) replicateObject(ctx context.Context, objectAPI Obj
|
||||
versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
|
||||
versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)
|
||||
|
||||
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
|
||||
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, ObjectOptions{
|
||||
VersionID: objInfo.VersionID,
|
||||
Versioned: versioned,
|
||||
VersionSuspended: versionSuspended,
|
||||
@ -1233,7 +1233,7 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
|
||||
versioned := globalBucketVersioningSys.PrefixEnabled(bucket, object)
|
||||
versionSuspended := globalBucketVersioningSys.PrefixSuspended(bucket, object)
|
||||
|
||||
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, readLock, ObjectOptions{
|
||||
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, nil, http.Header{}, ObjectOptions{
|
||||
VersionID: objInfo.VersionID,
|
||||
Versioned: versioned,
|
||||
VersionSuspended: versionSuspended,
|
||||
|
@ -30,12 +30,7 @@ import (
|
||||
var errConfigNotFound = errors.New("config file not found")
|
||||
|
||||
func readConfigWithMetadata(ctx context.Context, store objectIO, configFile string, opts ObjectOptions) ([]byte, ObjectInfo, error) {
|
||||
lockType := readLock
|
||||
if opts.NoLock {
|
||||
lockType = noLock // erasureObjects.GetObjectNInfo honors lockType argument but not opts.NoLock.
|
||||
}
|
||||
|
||||
r, err := store.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, lockType, opts)
|
||||
r, err := store.GetObjectNInfo(ctx, minioMetaBucket, configFile, nil, http.Header{}, opts)
|
||||
if err != nil {
|
||||
if isErrObjectNotFound(err) {
|
||||
return nil, ObjectInfo{}, errConfigNotFound
|
||||
|
@ -873,7 +873,7 @@ func (d *dataUsageCache) merge(other dataUsageCache) {
|
||||
}
|
||||
|
||||
type objectIO interface {
|
||||
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (reader *GetObjectReader, err error)
|
||||
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (reader *GetObjectReader, err error)
|
||||
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
}
|
||||
|
||||
@ -889,7 +889,7 @@ func (d *dataUsageCache) load(ctx context.Context, store objectIO, name string)
|
||||
// Caches are read+written without locks,
|
||||
retries := 0
|
||||
for retries < 5 {
|
||||
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, noLock, ObjectOptions{NoLock: true})
|
||||
r, err := store.GetObjectNInfo(ctx, dataUsageBucket, name, nil, http.Header{}, ObjectOptions{NoLock: true})
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case ObjectNotFound, BucketNotFound:
|
||||
|
@ -80,7 +80,7 @@ type CacheStorageInfo struct {
|
||||
// CacheObjectLayer implements primitives for cache object API layer.
|
||||
type CacheObjectLayer interface {
|
||||
// Object operations.
|
||||
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
|
||||
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error)
|
||||
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error)
|
||||
DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error)
|
||||
@ -117,7 +117,7 @@ type cacheObjects struct {
|
||||
// Cache stats
|
||||
cacheStats *CacheStats
|
||||
|
||||
InnerGetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
|
||||
InnerGetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error)
|
||||
InnerGetObjectInfoFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
InnerDeleteObjectFn func(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
InnerPutObjectFn func(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
@ -231,16 +231,16 @@ func (c *cacheObjects) incCacheStats(size int64) {
|
||||
c.cacheStats.incBytesServed(size)
|
||||
}
|
||||
|
||||
func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
if c.isCacheExclude(bucket, object) || c.skipCache() {
|
||||
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
|
||||
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)
|
||||
}
|
||||
var cc *cacheControl
|
||||
var cacheObjSize int64
|
||||
// fetch diskCache if object is currently cached or nearest available cache drive
|
||||
dcache, err := c.getCacheToLoc(ctx, bucket, object)
|
||||
if err != nil {
|
||||
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
|
||||
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)
|
||||
}
|
||||
|
||||
cacheReader, numCacheHits, cacheErr := dcache.Get(ctx, bucket, object, rs, h, opts)
|
||||
@ -269,7 +269,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
if cc != nil && cc.noStore {
|
||||
cacheReader.Close()
|
||||
c.cacheStats.incMiss()
|
||||
bReader, err := c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
|
||||
bReader, err := c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)
|
||||
bReader.ObjInfo.CacheLookupStatus = CacheHit
|
||||
bReader.ObjInfo.CacheStatus = CacheMiss
|
||||
return bReader, err
|
||||
@ -304,7 +304,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
cacheReader.Close()
|
||||
}
|
||||
c.cacheStats.incMiss()
|
||||
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
|
||||
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)
|
||||
}
|
||||
// skip cache for objects with locks
|
||||
objRetention := objectlock.GetObjectRetentionMeta(objInfo.UserDefined)
|
||||
@ -314,7 +314,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
cacheReader.Close()
|
||||
}
|
||||
c.cacheStats.incMiss()
|
||||
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
|
||||
return c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)
|
||||
}
|
||||
if cacheErr == nil {
|
||||
// if ETag matches for stale cache entry, serve from cache
|
||||
@ -332,7 +332,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
// Reaching here implies cache miss
|
||||
c.cacheStats.incMiss()
|
||||
|
||||
bkReader, bkErr := c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, lockType, opts)
|
||||
bkReader, bkErr := c.InnerGetObjectNInfoFn(ctx, bucket, object, rs, h, opts)
|
||||
|
||||
if bkErr != nil {
|
||||
return bkReader, bkErr
|
||||
@ -359,7 +359,7 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
|
||||
// if range caching is disabled, download entire object.
|
||||
rs = nil
|
||||
// fill cache in the background for range GET requests
|
||||
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, rs, h, lockType, opts)
|
||||
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, rs, h, opts)
|
||||
if bErr != nil {
|
||||
return
|
||||
}
|
||||
@ -713,7 +713,7 @@ func (c *cacheObjects) PutObject(ctx context.Context, bucket, object string, r *
|
||||
if err == nil {
|
||||
go func() {
|
||||
// fill cache in the background
|
||||
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
|
||||
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, ObjectOptions{})
|
||||
if bErr != nil {
|
||||
return
|
||||
}
|
||||
@ -857,8 +857,8 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
|
||||
InnerGetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
|
||||
},
|
||||
InnerGetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
||||
InnerGetObjectNInfoFn: func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
return newObjectLayerFn().GetObjectNInfo(ctx, bucket, object, rs, h, opts)
|
||||
},
|
||||
InnerDeleteObjectFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
||||
return newObjectLayerFn().DeleteObject(ctx, bucket, object, opts)
|
||||
@ -1143,7 +1143,7 @@ func (c *cacheObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject,
|
||||
End: startOffset + length,
|
||||
}
|
||||
// fill cache in the background
|
||||
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, srcBucket, srcObject, rs, http.Header{}, readLock, ObjectOptions{})
|
||||
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, srcBucket, srcObject, rs, http.Header{}, ObjectOptions{})
|
||||
if bErr != nil {
|
||||
return
|
||||
}
|
||||
@ -1176,7 +1176,7 @@ func (c *cacheObjects) CompleteMultipartUpload(ctx context.Context, bucket, obje
|
||||
_, err := dcache.CompleteMultipartUpload(bgContext(ctx), bucket, object, uploadID, uploadedParts, oi, opts)
|
||||
if err != nil {
|
||||
// fill cache in the background
|
||||
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, readLock, ObjectOptions{})
|
||||
bReader, bErr := c.InnerGetObjectNInfoFn(GlobalContext, bucket, object, nil, http.Header{}, ObjectOptions{})
|
||||
if bErr != nil {
|
||||
return
|
||||
}
|
||||
|
@ -1634,7 +1634,7 @@ func TestHealLastDataShard(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
firstGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, noLock, ObjectOptions{})
|
||||
firstGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, ObjectOptions{NoLock: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -1664,7 +1664,7 @@ func TestHealLastDataShard(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
secondGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, noLock, ObjectOptions{})
|
||||
secondGr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, ObjectOptions{NoLock: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -234,6 +234,7 @@ func (fi FileInfo) ToObjectInfo(bucket, object string, versioned bool) ObjectInf
|
||||
}
|
||||
}
|
||||
objInfo.Checksum = fi.Checksum
|
||||
objInfo.Inlined = fi.InlineData()
|
||||
// Success.
|
||||
return objInfo
|
||||
}
|
||||
|
@ -183,7 +183,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
|
||||
|
||||
// GetObjectNInfo - returns object info and an object
|
||||
// Read(Closer). When err != nil, the returned reader is always nil.
|
||||
func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
auditObjectErasureSet(ctx, object, &er)
|
||||
|
||||
// This is a special call attempted first to check for SOS-API calls.
|
||||
@ -203,25 +203,26 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
|
||||
}()
|
||||
|
||||
// Acquire lock
|
||||
if lockType != noLock {
|
||||
if !opts.NoLock {
|
||||
lock := er.NewNSLock(bucket, object)
|
||||
switch lockType {
|
||||
case writeLock:
|
||||
lkctx, err := lock.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
nsUnlocker = func() { lock.Unlock(lkctx) }
|
||||
case readLock:
|
||||
lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
nsUnlocker = func() { lock.RUnlock(lkctx) }
|
||||
}
|
||||
|
||||
// Release lock when the metadata is verified, and reader
|
||||
// is ready to be read.
|
||||
//
|
||||
// This is possible to be lock free because
|
||||
// - xl.meta for inlined objects has already read the data
|
||||
// into memory, any mutation on xl.meta subsequently is
|
||||
// inconsequential to the overall read operation.
|
||||
// - xl.meta metadata is still verified for quorum under lock()
|
||||
// however writing the response doesn't need to serialize
|
||||
// concurrent writers
|
||||
unlockOnDefer = true
|
||||
nsUnlocker = func() { lock.RUnlock(lkctx) }
|
||||
}
|
||||
|
||||
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
|
||||
@ -265,6 +266,7 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
|
||||
ObjInfo: objInfo,
|
||||
}, toObjectErr(errMethodNotAllowed, bucket, object)
|
||||
}
|
||||
|
||||
if objInfo.IsRemote() {
|
||||
gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts)
|
||||
if err != nil {
|
||||
@ -278,7 +280,10 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
unlockOnDefer = false
|
||||
|
||||
if unlockOnDefer {
|
||||
unlockOnDefer = fi.InlineData()
|
||||
}
|
||||
|
||||
pr, pw := xioutil.WaitPipe()
|
||||
go func() {
|
||||
@ -291,9 +296,13 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
|
||||
pr.CloseWithError(nil)
|
||||
}
|
||||
|
||||
if !unlockOnDefer {
|
||||
return fn(pr, h, pipeCloser, nsUnlocker)
|
||||
}
|
||||
|
||||
return fn(pr, h, pipeCloser)
|
||||
}
|
||||
|
||||
func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error {
|
||||
// Reorder online disks based on erasure distribution order.
|
||||
// Reorder parts metadata based on erasure distribution order.
|
||||
|
@ -532,7 +532,7 @@ func TestGetObjectNoQuorum(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, opts)
|
||||
gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, opts)
|
||||
if err != nil {
|
||||
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
|
||||
@ -577,7 +577,7 @@ func TestGetObjectNoQuorum(t *testing.T) {
|
||||
}
|
||||
z.serverPools[0].erasureDisksMu.Unlock()
|
||||
// Fetch object from store.
|
||||
gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, opts)
|
||||
gr, err := xl.GetObjectNInfo(ctx, bucket, object, nil, nil, opts)
|
||||
if err != nil {
|
||||
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureReadQuorum, bucket, object), err)
|
||||
@ -831,7 +831,7 @@ func TestPutObjectSmallInlineData(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
gr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, ObjectOptions{})
|
||||
gr, err := obj.GetObjectNInfo(ctx, bucket, object, nil, nil, ObjectOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected GetObject to succeed, but failed with %v", err)
|
||||
}
|
||||
@ -855,7 +855,7 @@ func TestPutObjectSmallInlineData(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
gr, err = obj.GetObjectNInfo(ctx, bucket, object, nil, nil, readLock, ObjectOptions{})
|
||||
gr, err = obj.GetObjectNInfo(ctx, bucket, object, nil, nil, ObjectOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected GetObject to succeed, but failed with %v", err)
|
||||
}
|
||||
@ -1099,7 +1099,7 @@ func TestGetObjectInlineNotInline(t *testing.T) {
|
||||
}
|
||||
|
||||
// Try to read the object and check its md5sum
|
||||
gr, err := objLayer.GetObjectNInfo(ctx, "testbucket", "file", nil, nil, readLock, ObjectOptions{})
|
||||
gr, err := objLayer.GetObjectNInfo(ctx, "testbucket", "file", nil, nil, ObjectOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected GetObject to succeed, but failed with %v", err)
|
||||
}
|
||||
@ -1193,7 +1193,7 @@ func TestGetObjectWithOutdatedDisks(t *testing.T) {
|
||||
sets.erasureDisksMu.Lock()
|
||||
xl.getDisks = func() []StorageAPI { return origErasureDisks }
|
||||
sets.erasureDisksMu.Unlock()
|
||||
gr, err := z.GetObjectNInfo(ctx, testCase.bucket, testCase.object, nil, nil, readLock, ObjectOptions{VersionID: got.VersionID})
|
||||
gr, err := z.GetObjectNInfo(ctx, testCase.bucket, testCase.object, nil, nil, ObjectOptions{VersionID: got.VersionID})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected GetObject to succeed, but failed with %v", err)
|
||||
}
|
||||
|
@ -838,10 +838,10 @@ func (z *erasureServerPools) decommissionPool(ctx context.Context, idx int, pool
|
||||
encodeDirObject(version.Name),
|
||||
nil,
|
||||
http.Header{},
|
||||
noLock, // all mutations are blocked reads are safe without locks.
|
||||
ObjectOptions{
|
||||
VersionID: version.VersionID,
|
||||
NoDecryption: true,
|
||||
NoLock: true,
|
||||
})
|
||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||
// object deleted by the application, nothing to do here we move on.
|
||||
|
@ -550,10 +550,10 @@ func (z *erasureServerPools) rebalanceBucket(ctx context.Context, bucket string,
|
||||
encodeDirObject(version.Name),
|
||||
nil,
|
||||
http.Header{},
|
||||
noLock, // all mutations are blocked reads are safe without locks.
|
||||
ObjectOptions{
|
||||
VersionID: version.VersionID,
|
||||
NoDecryption: true,
|
||||
NoLock: true,
|
||||
})
|
||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||
// object deleted by the application, nothing to do here we move on.
|
||||
|
@ -773,7 +773,7 @@ func (z *erasureServerPools) MakeBucket(ctx context.Context, bucket string, opts
|
||||
return nil
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -781,7 +781,7 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
|
||||
object = encodeDirObject(object)
|
||||
|
||||
if z.SinglePool() {
|
||||
return z.serverPools[0].GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
||||
return z.serverPools[0].GetObjectNInfo(ctx, bucket, object, rs, h, opts)
|
||||
}
|
||||
|
||||
var unlockOnDefer bool
|
||||
@ -793,24 +793,14 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
|
||||
}()
|
||||
|
||||
// Acquire lock
|
||||
if lockType != noLock {
|
||||
if !opts.NoLock {
|
||||
lock := z.NewNSLock(bucket, object)
|
||||
switch lockType {
|
||||
case writeLock:
|
||||
lkctx, err := lock.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
nsUnlocker = func() { lock.Unlock(lkctx) }
|
||||
case readLock:
|
||||
lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
nsUnlocker = func() { lock.RUnlock(lkctx) }
|
||||
}
|
||||
unlockOnDefer = true
|
||||
}
|
||||
|
||||
@ -838,14 +828,17 @@ func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object
|
||||
return nil, PreConditionFailed{}
|
||||
}
|
||||
|
||||
lockType = noLock // do not take locks at lower levels for GetObjectNInfo()
|
||||
gr, err = z.serverPools[zIdx].GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
||||
opts.NoLock = true
|
||||
gr, err = z.serverPools[zIdx].GetObjectNInfo(ctx, bucket, object, rs, h, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if unlockOnDefer {
|
||||
unlockOnDefer = false
|
||||
unlockOnDefer = gr.ObjInfo.Inlined
|
||||
}
|
||||
|
||||
if !unlockOnDefer {
|
||||
return gr.WithCleanupFuncs(nsUnlocker), nil
|
||||
}
|
||||
return gr, nil
|
||||
|
@ -756,9 +756,9 @@ func listDeletedBuckets(ctx context.Context, storageDisks []StorageAPI, delBucke
|
||||
// --- Object Operations ---
|
||||
|
||||
// GetObjectNInfo - returns object info and locked object ReadCloser
|
||||
func (s *erasureSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
func (s *erasureSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||
set := s.getHashedSet(object)
|
||||
return set.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
||||
return set.GetObjectNInfo(ctx, bucket, object, rs, h, opts)
|
||||
}
|
||||
|
||||
// PutObject - writes an object to hashedSet based on the object name.
|
||||
|
@ -193,6 +193,9 @@ type ObjectInfo struct {
|
||||
// Checksums added on upload.
|
||||
// Encoded, maybe encrypted.
|
||||
Checksum []byte
|
||||
|
||||
// Inlined
|
||||
Inlined bool
|
||||
}
|
||||
|
||||
// ArchiveInfo returns any saved zip archive meta information.
|
||||
|
@ -179,15 +179,6 @@ func (o *ObjectOptions) PutReplicationState() (r ReplicationState) {
|
||||
return
|
||||
}
|
||||
|
||||
// LockType represents required locking for ObjectLayer operations
|
||||
type LockType int
|
||||
|
||||
const (
|
||||
noLock LockType = iota
|
||||
readLock
|
||||
writeLock
|
||||
)
|
||||
|
||||
// ObjectLayer implements primitives for object API layer.
|
||||
type ObjectLayer interface {
|
||||
// Locking operations on object.
|
||||
@ -214,12 +205,12 @@ type ObjectLayer interface {
|
||||
// Object operations.
|
||||
|
||||
// GetObjectNInfo returns a GetObjectReader that satisfies the
|
||||
// ReadCloser interface. The Close method unlocks the object
|
||||
// after reading, so it must always be called after usage.
|
||||
// ReadCloser interface. The Close method runs any cleanup
|
||||
// functions, so it must always be called after reading till EOF
|
||||
//
|
||||
// IMPORTANTLY, when implementations return err != nil, this
|
||||
// function MUST NOT return a non-nil ReadCloser.
|
||||
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (reader *GetObjectReader, err error)
|
||||
GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (reader *GetObjectReader, err error)
|
||||
GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error)
|
||||
@ -272,7 +263,7 @@ func GetObject(ctx context.Context, api ObjectLayer, bucket, object string, star
|
||||
}
|
||||
Range := &HTTPRangeSpec{Start: startOffset, End: startOffset + length}
|
||||
|
||||
reader, err := api.GetObjectNInfo(ctx, bucket, object, Range, header, readLock, opts)
|
||||
reader, err := api.GetObjectNInfo(ctx, bucket, object, Range, header, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -238,7 +238,8 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
|
||||
Start: offset,
|
||||
End: -1,
|
||||
}
|
||||
return getObjectNInfo(ctx, bucket, object, rs, r.Header, noLock, opts)
|
||||
opts.NoLock = true
|
||||
return getObjectNInfo(ctx, bucket, object, rs, r.Header, opts)
|
||||
},
|
||||
actualSize,
|
||||
)
|
||||
@ -402,7 +403,7 @@ func (api objectAPIHandlers) getObjectHandler(ctx context.Context, objectAPI Obj
|
||||
}
|
||||
|
||||
var proxy proxyResult
|
||||
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts)
|
||||
gr, err := getObjectNInfo(ctx, bucket, object, rs, r.Header, opts)
|
||||
if err != nil {
|
||||
var (
|
||||
reader *GetObjectReader
|
||||
@ -1098,19 +1099,12 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
return checkCopyObjectPreconditions(ctx, w, r, o)
|
||||
}
|
||||
getOpts.CheckPrecondFn = checkCopyPrecondFn
|
||||
|
||||
// FIXME: a possible race exists between a parallel
|
||||
// GetObject v/s CopyObject with metadata updates, ideally
|
||||
// we should be holding write lock here but it is not
|
||||
// possible due to other constraints such as knowing
|
||||
// the type of source content etc.
|
||||
lock := noLock
|
||||
if !cpSrcDstSame {
|
||||
lock = readLock
|
||||
if cpSrcDstSame {
|
||||
getOpts.NoLock = true
|
||||
}
|
||||
|
||||
var rs *HTTPRangeSpec
|
||||
gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, lock, getOpts)
|
||||
gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, getOpts)
|
||||
if err != nil {
|
||||
if isErrPreconditionFailed(err) {
|
||||
return
|
||||
|
@ -1268,7 +1268,7 @@ func testAPIPutObjectStreamSigV4Handler(obj ObjectLayer, instanceType, bucketNam
|
||||
t.Fatalf("Test %d: %s: ContentEncoding is set to \"%s\" which is unexpected, expected \"%s\"", i+1, instanceType, objInfo.ContentEncoding, expectedContentEncoding)
|
||||
}
|
||||
buffer := new(bytes.Buffer)
|
||||
r, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.objectName, nil, nil, readLock, opts)
|
||||
r, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.objectName, nil, nil, opts)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: %s: Failed to fetch the copied object: <ERROR> %s", i+1, instanceType, err)
|
||||
}
|
||||
@ -1559,7 +1559,7 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a
|
||||
if testCase.expectedRespStatus == http.StatusOK {
|
||||
buffer := new(bytes.Buffer)
|
||||
// Fetch the object to check whether the content is same as the one uploaded via PutObject.
|
||||
gr, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.objectName, nil, nil, readLock, opts)
|
||||
gr, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.objectName, nil, nil, opts)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: %s: Failed to fetch the copied object: <ERROR> %s", i, instanceType, err)
|
||||
}
|
||||
@ -1608,7 +1608,7 @@ func testAPIPutObjectHandler(obj ObjectLayer, instanceType, bucketName string, a
|
||||
if testCase.expectedRespStatus == http.StatusOK {
|
||||
buffer := new(bytes.Buffer)
|
||||
// Fetch the object to check whether the content is same as the one uploaded via PutObject.
|
||||
gr, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.objectName, nil, nil, readLock, opts)
|
||||
gr, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.objectName, nil, nil, opts)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: %s: Failed to fetch the copied object: <ERROR> %s", i, instanceType, err)
|
||||
}
|
||||
@ -1769,7 +1769,7 @@ func testAPICopyObjectPartHandlerSanity(obj ObjectLayer, instanceType, bucketNam
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
r, err := obj.GetObjectNInfo(context.Background(), bucketName, testObject, nil, nil, readLock, ObjectOptions{})
|
||||
r, err := obj.GetObjectNInfo(context.Background(), bucketName, testObject, nil, nil, ObjectOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Test: %s reading completed file failed: <ERROR> %v", instanceType, err)
|
||||
}
|
||||
@ -2485,7 +2485,7 @@ func testAPICopyObjectHandler(obj ObjectLayer, instanceType, bucketName string,
|
||||
// Note that this goes directly to the file system,
|
||||
// so encryption/compression may interfere at some point.
|
||||
buffers[0].Reset()
|
||||
r, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.newObjectName, nil, nil, readLock, opts)
|
||||
r, err := obj.GetObjectNInfo(context.Background(), testCase.bucketName, testCase.newObjectName, nil, nil, opts)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: %s reading completed file failed: <ERROR> %v", i, instanceType, err)
|
||||
}
|
||||
|
@ -353,7 +353,7 @@ func (api objectAPIHandlers) CopyObjectPartHandler(w http.ResponseWriter, r *htt
|
||||
return false
|
||||
}
|
||||
getOpts.CheckPrecondFn = checkCopyPartPrecondFn
|
||||
gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, readLock, getOpts)
|
||||
gr, err := getObjectNInfo(ctx, srcBucket, srcObject, rs, r.Header, getOpts)
|
||||
if err != nil {
|
||||
if isErrPreconditionFailed(err) {
|
||||
return
|
||||
|
@ -188,7 +188,7 @@ func (api objectAPIHandlers) getObjectInArchiveFileHandler(ctx context.Context,
|
||||
end = zipObjInfo.Size
|
||||
}
|
||||
rs := &HTTPRangeSpec{Start: file.Offset, End: end}
|
||||
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, zipPath, rs, nil, readLock, opts)
|
||||
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, zipPath, rs, nil, opts)
|
||||
if err != nil {
|
||||
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
|
||||
return
|
||||
@ -331,7 +331,7 @@ func getFilesListFromZIPObject(ctx context.Context, objectAPI ObjectLayer, bucke
|
||||
var objSize int64
|
||||
for {
|
||||
rs := &HTTPRangeSpec{IsSuffixLength: true, Start: int64(-size)}
|
||||
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, rs, nil, readLock, opts)
|
||||
gr, err := objectAPI.GetObjectNInfo(ctx, bucket, object, rs, nil, opts)
|
||||
if err != nil {
|
||||
return nil, ObjectInfo{}, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user