mirror of
https://github.com/minio/minio.git
synced 2025-04-21 19:14:39 -04:00
fs: listObjects shouldn't take FS locks while listing (#10248)
This commit is contained in:
parent
900eebb9a4
commit
79ed7ce451
55
cmd/fs-v1.go
55
cmd/fs-v1.go
@ -906,6 +906,56 @@ func (fs *FSObjects) defaultFsJSON(object string) fsMetaV1 {
|
|||||||
return fsMeta
|
return fsMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fs *FSObjects) getObjectInfoNoFSLock(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) {
|
||||||
|
fsMeta := fsMetaV1{}
|
||||||
|
if HasSuffix(object, SlashSeparator) {
|
||||||
|
fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object))
|
||||||
|
if err != nil {
|
||||||
|
return oi, err
|
||||||
|
}
|
||||||
|
return fsMeta.ToObjectInfo(bucket, object, fi), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fsMetaPath := pathJoin(fs.fsPath, minioMetaBucket, bucketMetaPrefix, bucket, object, fs.metaJSONFile)
|
||||||
|
// Read `fs.json` to perhaps contend with
|
||||||
|
// parallel Put() operations.
|
||||||
|
|
||||||
|
rc, _, err := fsOpenFile(ctx, fsMetaPath, 0)
|
||||||
|
if err == nil {
|
||||||
|
fsMetaBuf, rerr := ioutil.ReadAll(rc)
|
||||||
|
rc.Close()
|
||||||
|
if rerr == nil {
|
||||||
|
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||||
|
if rerr = json.Unmarshal(fsMetaBuf, &fsMeta); rerr != nil {
|
||||||
|
// For any error to read fsMeta, set default ETag and proceed.
|
||||||
|
fsMeta = fs.defaultFsJSON(object)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// For any error to read fsMeta, set default ETag and proceed.
|
||||||
|
fsMeta = fs.defaultFsJSON(object)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return a default etag and content-type based on the object's extension.
|
||||||
|
if err == errFileNotFound {
|
||||||
|
fsMeta = fs.defaultFsJSON(object)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ignore if `fs.json` is not available, this is true for pre-existing data.
|
||||||
|
if err != nil && err != errFileNotFound {
|
||||||
|
logger.LogIf(ctx, err)
|
||||||
|
return oi, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stat the file to get file size.
|
||||||
|
fi, err := fsStatFile(ctx, pathJoin(fs.fsPath, bucket, object))
|
||||||
|
if err != nil {
|
||||||
|
return oi, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return fsMeta.ToObjectInfo(bucket, object, fi), nil
|
||||||
|
}
|
||||||
|
|
||||||
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
|
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
|
||||||
func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) {
|
func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) {
|
||||||
fsMeta := fsMetaV1{}
|
fsMeta := fsMetaV1{}
|
||||||
@ -1391,14 +1441,13 @@ func (fs *FSObjects) ListObjectVersions(ctx context.Context, bucket, prefix, mar
|
|||||||
// ListObjects - list all objects at prefix upto maxKeys., optionally delimited by '/'. Maintains the list pool
|
// ListObjects - list all objects at prefix upto maxKeys., optionally delimited by '/'. Maintains the list pool
|
||||||
// state for future re-entrant list requests.
|
// state for future re-entrant list requests.
|
||||||
func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
|
func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) {
|
||||||
|
|
||||||
atomic.AddInt64(&fs.activeIOCount, 1)
|
atomic.AddInt64(&fs.activeIOCount, 1)
|
||||||
defer func() {
|
defer func() {
|
||||||
atomic.AddInt64(&fs.activeIOCount, -1)
|
atomic.AddInt64(&fs.activeIOCount, -1)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return listObjects(ctx, fs, bucket, prefix, marker, delimiter, maxKeys, fs.listPool,
|
return listObjects(ctx, fs, bucket, prefix, marker, delimiter, maxKeys, fs.listPool,
|
||||||
fs.listDirFactory(), fs.getObjectInfo, fs.getObjectInfo)
|
fs.listDirFactory(), fs.getObjectInfoNoFSLock, fs.getObjectInfoNoFSLock)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObjectTags - get object tags from an existing object
|
// GetObjectTags - get object tags from an existing object
|
||||||
@ -1495,7 +1544,7 @@ func (fs *FSObjects) HealBucket(ctx context.Context, bucket string, dryRun, remo
|
|||||||
// error walker returns error. Optionally if context.Done() is received
|
// error walker returns error. Optionally if context.Done() is received
|
||||||
// then Walk() stops the walker.
|
// then Walk() stops the walker.
|
||||||
func (fs *FSObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
|
func (fs *FSObjects) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
|
||||||
return fsWalk(ctx, fs, bucket, prefix, fs.listDirFactory(), results, fs.getObjectInfo, fs.getObjectInfo)
|
return fsWalk(ctx, fs, bucket, prefix, fs.listDirFactory(), results, fs.getObjectInfoNoFSLock, fs.getObjectInfoNoFSLock)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HealObjects - no-op for fs. Valid only for Erasure.
|
// HealObjects - no-op for fs. Valid only for Erasure.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user