mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
xl: Remove check-dir in ReadVersion (#11200)
The only purpose of check-dir flag in ReadVersion is to return 404 when an object has xl.meta but without data. This is causing an extract call to the disk which can be penalizing in case of busy system where disks receive many concurrent access.
This commit is contained in:
parent
aa85af4d1a
commit
677e80c0f8
@ -33,6 +33,7 @@ type parallelReader struct {
|
||||
readers []io.ReaderAt
|
||||
orgReaders []io.ReaderAt
|
||||
dataBlocks int
|
||||
errs []error
|
||||
offset int64
|
||||
shardSize int64
|
||||
shardFileSize int64
|
||||
@ -49,6 +50,7 @@ func newParallelReader(readers []io.ReaderAt, e Erasure, offset, totalLength int
|
||||
return ¶llelReader{
|
||||
readers: readers,
|
||||
orgReaders: readers,
|
||||
errs: make([]error, len(readers)),
|
||||
dataBlocks: e.dataBlocks,
|
||||
offset: (offset / e.blockSize) * e.ShardSize(),
|
||||
shardSize: e.ShardSize(),
|
||||
@ -169,6 +171,8 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
|
||||
// This will be communicated upstream.
|
||||
p.orgReaders[bufIdx] = nil
|
||||
p.readers[i] = nil
|
||||
p.errs[i] = err
|
||||
|
||||
// Since ReadAt returned error, trigger another read.
|
||||
readTriggerCh <- true
|
||||
return
|
||||
@ -191,7 +195,7 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
|
||||
return newBuf, nil
|
||||
}
|
||||
|
||||
return nil, errErasureReadQuorum
|
||||
return nil, reduceReadQuorumErrs(context.Background(), p.errs, objectOpIgnoredErrs, p.dataBlocks)
|
||||
}
|
||||
|
||||
type errDecodeHealRequired struct {
|
||||
|
@ -67,7 +67,7 @@ func TestHealing(t *testing.T) {
|
||||
}
|
||||
|
||||
disk := er.getDisks()[0]
|
||||
fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
|
||||
fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -84,7 +84,7 @@ func TestHealing(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
|
||||
fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -113,7 +113,7 @@ func TestHealing(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
|
||||
fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -113,21 +113,9 @@ func hashOrder(key string, cardinality int) []int {
|
||||
return nums
|
||||
}
|
||||
|
||||
// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir exists as well,
|
||||
// otherwise returns errFileNotFound (or errFileVersionNotFound)
|
||||
func getAllObjectFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
|
||||
return readVersionFromDisks(ctx, disks, bucket, object, versionID, true)
|
||||
}
|
||||
|
||||
// Reads all `xl.meta` metadata as a FileInfo slice.
|
||||
// Returns error slice indicating the failed metadata reads.
|
||||
func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) {
|
||||
return readVersionFromDisks(ctx, disks, bucket, object, versionID, false)
|
||||
}
|
||||
|
||||
// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir
|
||||
// exists as well, if checkDataDir is set to true.
|
||||
func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, checkDataDir bool) ([]FileInfo, []error) {
|
||||
metadataArray := make([]FileInfo, len(disks))
|
||||
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
@ -138,7 +126,7 @@ func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, objec
|
||||
if disks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, checkDataDir)
|
||||
metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID)
|
||||
if err != nil {
|
||||
if !IsErr(err, errFileNotFound, errVolumeNotFound, errFileVersionNotFound, errDiskNotFound) {
|
||||
logger.LogOnceIf(ctx, err, disks[index].String())
|
||||
|
@ -113,7 +113,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
||||
}
|
||||
for _, uploadIDDir := range uploadIDDirs {
|
||||
uploadIDPath := pathJoin(shaDir, uploadIDDir)
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false)
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "")
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@ -127,7 +127,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
|
||||
return
|
||||
}
|
||||
for _, tmpDir := range tmpDirs {
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false)
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "")
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@ -181,7 +181,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec
|
||||
if populatedUploadIds.Contains(uploadID) {
|
||||
continue
|
||||
}
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false)
|
||||
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "")
|
||||
if err != nil {
|
||||
return result, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
@ -368,7 +368,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
|
||||
disks := er.getDisks()
|
||||
|
||||
// Read metadata associated with the object from all disks.
|
||||
metaArr, errs := getAllObjectFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
||||
metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID)
|
||||
|
||||
readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs)
|
||||
if err != nil {
|
||||
|
@ -288,16 +288,38 @@ func TestGetObjectNoQuorum(t *testing.T) {
|
||||
bucket := "bucket"
|
||||
object := "object"
|
||||
opts := ObjectOptions{}
|
||||
|
||||
// Test use case 1: All disks are online, xl.meta are present, but data are missing
|
||||
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, disk := range xl.getDisks() {
|
||||
files, _ := disk.ListDir(ctx, bucket, object, -1)
|
||||
for _, file := range files {
|
||||
if file != "xl.meta" {
|
||||
disk.Delete(ctx, bucket, pathJoin(object, file), true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts)
|
||||
if err != toObjectErr(errFileNotFound, bucket, object) {
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
|
||||
}
|
||||
|
||||
// Test use case 2: Make 9 disks offline, which leaves less than quorum number of disks
|
||||
// in a 16 disk Erasure setup. The original disks are 'replaced' with
|
||||
// naughtyDisks that fail after 'f' successful StorageAPI method
|
||||
// invocations, where f - [0,2)
|
||||
|
||||
// Create "object" under "bucket".
|
||||
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Make 9 disks offline, which leaves less than quorum number of disks
|
||||
// in a 16 disk Erasure setup. The original disks are 'replaced' with
|
||||
// naughtyDisks that fail after 'f' successful StorageAPI method
|
||||
// invocations, where f - [0,2)
|
||||
for f := 0; f < 2; f++ {
|
||||
diskErrors := make(map[int]error)
|
||||
for i := 0; i <= f; i++ {
|
||||
@ -320,9 +342,84 @@ func TestGetObjectNoQuorum(t *testing.T) {
|
||||
// Fetch object from store.
|
||||
err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts)
|
||||
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
|
||||
t.Errorf("Expected putObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
|
||||
t.Errorf("Expected GetObject to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestHeadObjectNoQuorum(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create an instance of xl backend.
|
||||
obj, fsDirs, err := prepareErasure16(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Cleanup backend directories.
|
||||
defer obj.Shutdown(context.Background())
|
||||
defer removeRoots(fsDirs)
|
||||
|
||||
z := obj.(*erasureServerPools)
|
||||
xl := z.serverPools[0].sets[0]
|
||||
|
||||
// Create "bucket"
|
||||
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
bucket := "bucket"
|
||||
object := "object"
|
||||
opts := ObjectOptions{}
|
||||
|
||||
// Test use case 1: All disks are online, xl.meta are present, but data are missing
|
||||
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, disk := range xl.getDisks() {
|
||||
files, _ := disk.ListDir(ctx, bucket, object, -1)
|
||||
for _, file := range files {
|
||||
if file != "xl.meta" {
|
||||
disk.Delete(ctx, bucket, pathJoin(object, file), true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, err = xl.GetObjectInfo(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
t.Errorf("Expected StatObject to succeed if data dir are not found, but failed with %v", err)
|
||||
}
|
||||
|
||||
// Test use case 2: Make 9 disks offline, which leaves less than quorum number of disks
|
||||
// in a 16 disk Erasure setup. The original disks are 'replaced' with
|
||||
// naughtyDisks that fail after 'f' successful StorageAPI method
|
||||
// invocations, where f - [0,2)
|
||||
|
||||
// Create "object" under "bucket".
|
||||
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
erasureDisks := xl.getDisks()
|
||||
for i := range erasureDisks[:10] {
|
||||
erasureDisks[i] = nil
|
||||
}
|
||||
|
||||
z.serverPools[0].erasureDisksMu.Lock()
|
||||
xl.getDisks = func() []StorageAPI {
|
||||
return erasureDisks
|
||||
}
|
||||
z.serverPools[0].erasureDisksMu.Unlock()
|
||||
|
||||
// Fetch object from store.
|
||||
_, err = xl.GetObjectInfo(ctx, bucket, object, opts)
|
||||
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
|
||||
t.Errorf("Expected getObjectInfo to fail with %v, but failed with %v", toObjectErr(errErasureWriteQuorum, bucket, object), err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPutObjectNoQuorum(t *testing.T) {
|
||||
|
@ -388,7 +388,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
||||
continue
|
||||
}
|
||||
|
||||
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "", false)
|
||||
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(0), "")
|
||||
if err != nil {
|
||||
time.Sleep(retryDelay)
|
||||
retries++
|
||||
@ -463,7 +463,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
|
||||
continue
|
||||
}
|
||||
|
||||
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "", false)
|
||||
_, err := disks[0].ReadVersion(ctx, minioMetaBucket, o.objectPath(partN), "")
|
||||
if err != nil {
|
||||
time.Sleep(retryDelay)
|
||||
retries++
|
||||
|
@ -252,11 +252,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi
|
||||
return d.disk.DeleteVersion(ctx, volume, path, fi)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||
func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
return d.disk.ReadVersion(ctx, volume, path, versionID, checkDataDir)
|
||||
return d.disk.ReadVersion(ctx, volume, path, versionID)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
|
||||
|
@ -58,7 +58,7 @@ type StorageAPI interface {
|
||||
DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error
|
||||
DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error
|
||||
WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
|
||||
ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (FileInfo, error)
|
||||
ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error)
|
||||
RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error
|
||||
|
||||
// File operations.
|
||||
|
@ -396,12 +396,11 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
|
||||
return err
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||
func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
values.Set(storageRESTFilePath, path)
|
||||
values.Set(storageRESTVersionID, versionID)
|
||||
values.Set(storageRESTCheckDataDir, strconv.FormatBool(checkDataDir))
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1)
|
||||
if err != nil {
|
||||
|
@ -59,7 +59,6 @@ const (
|
||||
storageRESTDirPath = "dir-path"
|
||||
storageRESTFilePath = "file-path"
|
||||
storageRESTVersionID = "version-id"
|
||||
storageRESTCheckDataDir = "check-data-dir"
|
||||
storageRESTTotalVersions = "total-versions"
|
||||
storageRESTSrcVolume = "source-volume"
|
||||
storageRESTSrcPath = "source-path"
|
||||
|
@ -327,13 +327,7 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
|
||||
volume := vars[storageRESTVolume]
|
||||
filePath := vars[storageRESTFilePath]
|
||||
versionID := vars[storageRESTVersionID]
|
||||
checkDataDir, err := strconv.ParseBool(vars[storageRESTCheckDataDir])
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, checkDataDir)
|
||||
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
return
|
||||
@ -1043,7 +1037,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTCheckDataDir)...)
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)).
|
||||
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir,
|
||||
storageRESTDstVolume, storageRESTDstPath)...)
|
||||
|
@ -272,12 +272,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
|
||||
return p.storage.WriteMetadata(ctx, volume, path, fi)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||
func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
return fi, err
|
||||
}
|
||||
|
||||
return p.storage.ReadVersion(ctx, volume, path, versionID, checkDataDir)
|
||||
return p.storage.ReadVersion(ctx, volume, path, versionID)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) {
|
||||
|
@ -1123,7 +1123,7 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error {
|
||||
}
|
||||
|
||||
// ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
|
||||
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) {
|
||||
func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) {
|
||||
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
|
||||
if err != nil {
|
||||
if err == errFileNotFound {
|
||||
@ -1158,28 +1158,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
||||
return fi, errFileNotFound
|
||||
}
|
||||
|
||||
fi, err = getFileInfo(buf, volume, path, versionID)
|
||||
if err != nil {
|
||||
return fi, err
|
||||
}
|
||||
// skip checking data dir when object is transitioned - after transition, data dir will
|
||||
// be empty with just the metadata left behind.
|
||||
if fi.TransitionStatus != "" {
|
||||
checkDataDir = false
|
||||
}
|
||||
if fi.DataDir != "" && checkDataDir {
|
||||
if _, err = s.StatVol(ctx, pathJoin(volume, path, fi.DataDir, slashSeparator)); err != nil {
|
||||
if err == errVolumeNotFound {
|
||||
if versionID != "" {
|
||||
return fi, errFileVersionNotFound
|
||||
}
|
||||
return fi, errFileNotFound
|
||||
}
|
||||
return fi, err
|
||||
}
|
||||
}
|
||||
|
||||
return fi, nil
|
||||
return getFileInfo(buf, volume, path, versionID)
|
||||
}
|
||||
|
||||
// ReadAll reads from r until an error or EOF and returns the data it read.
|
||||
|
Loading…
Reference in New Issue
Block a user