diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go index 2fa53820b..b00041a83 100644 --- a/cmd/erasure-healing_test.go +++ b/cmd/erasure-healing_test.go @@ -67,7 +67,7 @@ func TestHealing(t *testing.T) { } disk := er.getDisks()[0] - fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "") + fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } @@ -84,7 +84,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "") + fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } @@ -113,7 +113,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "") + fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false) if err != nil { t.Fatal(err) } diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index befa7e4f2..dca0212bc 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -113,9 +113,21 @@ func hashOrder(key string, cardinality int) []int { return nums } +// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir exists as well, +// otherwise returns errFileNotFound (or errFileVersionNotFound) +func getAllObjectFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) { + return readVersionFromDisks(ctx, disks, bucket, object, versionID, true) +} + // Reads all `xl.meta` metadata as a FileInfo slice. // Returns error slice indicating the failed metadata reads. func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string) ([]FileInfo, []error) { + return readVersionFromDisks(ctx, disks, bucket, object, versionID, false) +} + +// Reads all `xl.meta` metadata as a FileInfo slice and checks if the data dir +// exists as well, if checkDataDir is set to true. +func readVersionFromDisks(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, checkDataDir bool) ([]FileInfo, []error) { metadataArray := make([]FileInfo, len(disks)) g := errgroup.WithNErrs(len(disks)) @@ -126,7 +138,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve if disks[index] == nil { return errDiskNotFound } - metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID) + metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, checkDataDir) if err != nil { if err != errFileNotFound && err != errVolumeNotFound && err != errFileVersionNotFound { logger.GetReqInfo(ctx).AppendTags("disk", disks[index].String()) diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index 01f7676b5..27c9b8b73 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -43,7 +43,25 @@ func (er erasureObjects) getMultipartSHADir(bucket, object string) string { // checkUploadIDExists - verify if a given uploadID exists and is valid. func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) error { - _, _, _, err := er.getObjectFileInfo(ctx, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), ObjectOptions{}) + disks := er.getDisks() + + // Read metadata associated with the object from all disks. + metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, er.getUploadIDDir(bucket, object, uploadID), "") + + readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs) + if err != nil { + return err + } + + if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { + return toObjectErr(reducedErr, bucket, object) + } + + // List all online disks. + _, modTime := listOnlineDisks(disks, metaArr, errs) + + // Pick latest valid metadata. + _, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum) return err } @@ -110,7 +128,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto } for _, uploadIDDir := range uploadIDDirs { uploadIDPath := pathJoin(shaDir, uploadIDDir) - fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "") + fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false) if err != nil { continue } @@ -124,7 +142,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto return } for _, tmpDir := range tmpDirs { - fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "") + fi, err := disk.ReadVersion(ctx, minioMetaTmpBucket, tmpDir, "", false) if err != nil { continue } @@ -178,7 +196,7 @@ func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, objec if populatedUploadIds.Contains(uploadID) { continue } - fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "") + fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "", false) if err != nil { return result, toObjectErr(err, bucket, object) } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index 7a9224eb4..fe71dd134 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -355,7 +355,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s disks := er.getDisks() // Read metadata associated with the object from all disks. - metaArr, errs := readAllFileInfo(ctx, disks, bucket, object, opts.VersionID) + metaArr, errs := getAllObjectFileInfo(ctx, disks, bucket, object, opts.VersionID) readQuorum, _, err := objectQuorumFromMeta(ctx, er, metaArr, errs) if err != nil { diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 4146a5b4e..911ccc094 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -259,11 +259,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi return d.disk.DeleteVersion(ctx, volume, path, fi) } -func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) { if err := d.calcError(); err != nil { return FileInfo{}, err } - return d.disk.ReadVersion(ctx, volume, path, versionID) + return d.disk.ReadVersion(ctx, volume, path, versionID, checkDataDir) } func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, reader io.Reader) (err error) { diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index e6b056612..da3bc1591 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -59,7 +59,7 @@ type StorageAPI interface { DeleteVersion(ctx context.Context, volume, path string, fi FileInfo) error DeleteVersions(ctx context.Context, volume string, versions []FileInfo) []error WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error - ReadVersion(ctx context.Context, volume, path, versionID string) (FileInfo, error) + ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (FileInfo, error) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) error // File operations. diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 24a9062ca..82afc0ad8 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -366,11 +366,12 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP return err } -func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) { values := make(url.Values) values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) values.Set(storageRESTVersionID, versionID) + values.Set(storageRESTCheckDataDir, strconv.FormatBool(checkDataDir)) respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1) if err != nil { diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index b4badbc5b..6145eb2c3 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -17,7 +17,7 @@ package cmd const ( - storageRESTVersion = "v20" // Re-implementation of storage layer + storageRESTVersion = "v21" // Add checkDataDir in ReadVersion API storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -60,6 +60,7 @@ const ( storageRESTDirPath = "dir-path" storageRESTFilePath = "file-path" storageRESTVersionID = "version-id" + storageRESTCheckDataDir = "check-data-dir" storageRESTTotalVersions = "total-versions" storageRESTSrcVolume = "source-volume" storageRESTSrcPath = "source-path" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 582bee117..20293e6b7 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -326,8 +326,13 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re volume := vars[storageRESTVolume] filePath := vars[storageRESTFilePath] versionID := vars[storageRESTVersionID] + checkDataDir, err := strconv.ParseBool(vars[storageRESTCheckDataDir]) + if err != nil { + s.writeErrorResponse(w, err) + return + } - fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID) + fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, checkDataDir) if err != nil { s.writeErrorResponse(w, err) return @@ -925,7 +930,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerSets Endpoint subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler)). Queries(restQueries(storageRESTVolume, storageRESTFilePath)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler)). - Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID)...) + Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTVersionID, storageRESTCheckDataDir)...) subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler)). Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDataDir, storageRESTDstVolume, storageRESTDstPath)...) diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index b1640b8e3..248905acd 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -288,12 +288,12 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s return p.storage.WriteMetadata(ctx, volume, path, fi) } -func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) { if err = p.checkDiskStale(); err != nil { return fi, err } - return p.storage.ReadVersion(ctx, volume, path, versionID) + return p.storage.ReadVersion(ctx, volume, path, versionID, checkDataDir) } func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path string) (buf []byte, err error) { diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 2f2b2a685..8df3865da 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -1318,7 +1318,7 @@ func (s *xlStorage) renameLegacyMetadata(volume, path string) error { } // ReadVersion - reads metadata and returns FileInfo at path `xl.meta` -func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string) (fi FileInfo, err error) { +func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, checkDataDir bool) (fi FileInfo, err error) { buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile)) if err != nil { if err == errFileNotFound { @@ -1341,7 +1341,24 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str return fi, errFileNotFound } - return getFileInfo(buf, volume, path, versionID) + fi, err = getFileInfo(buf, volume, path, versionID) + if err != nil { + return fi, err + } + + if fi.DataDir != "" && checkDataDir { + if _, err = s.StatVol(ctx, pathJoin(volume, path, fi.DataDir, slashSeparator)); err != nil { + if err == errVolumeNotFound { + if versionID != "" { + return fi, errFileVersionNotFound + } + return fi, errFileNotFound + } + return fi, err + } + } + + return fi, nil } // ReadAll reads from r until an error or EOF and returns the data it read.