return errors if dataDir is missing during HeadObject() (#18477)

Bonus: allow replication to attempt Deletes/Puts when
the remote returns quorum errors of some kind. This
ensures that MinIO can rewrite the namespace with the
latest version that exists on the source.
Harshavardhana 2023-11-20 21:33:47 -08:00 committed by GitHub
parent 51aa59a737
commit a4cfb5e1ed
24 changed files with 164 additions and 100 deletions
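
As a rough, self-contained illustration of the retry decision described in the commit message (a hedged sketch with hypothetical names, not MinIO's actual types or replication code), the idea is to classify the error returned by the remote and still attempt the Delete/Put when it is a quorum-style failure:

// Sketch only: stand-ins for the InsufficientReadQuorum/InsufficientWriteQuorum
// error types referenced in the diff below.
package main

import (
	"errors"
	"fmt"
)

type insufficientReadQuorum struct{ bucket, object string }

func (e insufficientReadQuorum) Error() string {
	return fmt.Sprintf("insufficient read quorum for %s/%s", e.bucket, e.object)
}

type insufficientWriteQuorum struct{ bucket, object string }

func (e insufficientWriteQuorum) Error() string {
	return fmt.Sprintf("insufficient write quorum for %s/%s", e.bucket, e.object)
}

// shouldAttemptReplication classifies the error seen while checking the object
// on the remote target and decides whether to proceed with the Delete/Put anyway.
func shouldAttemptReplication(statErr error) bool {
	if statErr == nil {
		return true // remote state is known, replicate normally
	}
	var rq insufficientReadQuorum
	var wq insufficientWriteQuorum
	if errors.As(statErr, &rq) || errors.As(statErr, &wq) {
		// The destination is degraded; attempt the operation anyway so the
		// namespace converges with the source's latest version.
		return true
	}
	// Any other error defers the replication attempt to a later retry.
	return false
}

func main() {
	fmt.Println(shouldAttemptReplication(insufficientReadQuorum{"bucket", "object"})) // true
	fmt.Println(shouldAttemptReplication(errors.New("connection refused")))           // false
}

In the actual change below, this classification is done by ErrorRespToObjectError together with the isErrReadQuorum/isErrWriteQuorum helpers.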


@@ -5,11 +5,19 @@ set -x
 ## change working directory
 cd .github/workflows/multipart/
 
-docker-compose -f docker-compose-site1.yaml rm -s -f
-docker-compose -f docker-compose-site2.yaml rm -s -f
-for volume in $(docker volume ls -q | grep minio); do
-    docker volume rm ${volume}
-done
+function cleanup() {
+    docker-compose -f docker-compose-site1.yaml rm -s -f || true
+    docker-compose -f docker-compose-site2.yaml rm -s -f || true
+    for volume in $(docker volume ls -q | grep minio); do
+        docker volume rm ${volume} || true
+    done
+
+    docker system prune -f || true
+    docker volume prune -f || true
+    docker volume rm $(docker volume ls -q -f dangling=true) || true
+}
+
+cleanup
 
 if [ ! -f ./mc ]; then
     wget --quiet -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc &&
@@ -101,15 +109,7 @@ if [ $failed_count_site2 -ne 0 ]; then
     exit 1
 fi
 
-docker-compose -f docker-compose-site1.yaml rm -s -f
-docker-compose -f docker-compose-site2.yaml rm -s -f
-for volume in $(docker volume ls -q | grep minio); do
-    docker volume rm ${volume}
-done
-docker system prune -f || true
-docker volume prune -f || true
-docker volume rm $(docker volume ls -q -f dangling=true) || true
+cleanup
 
 ## change working directory
 cd ../../../


@@ -639,19 +639,23 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI
             IsReplicationReadyForDeleteMarker: true,
         },
     })
+    serr := ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName, dobj.VersionID)
     switch {
-    case isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)):
+    case isErrMethodNotAllowed(serr):
         // delete marker already replicated
         if dobj.VersionID == "" && rinfo.VersionPurgeStatus.Empty() {
             rinfo.ReplicationStatus = replication.Completed
             return
         }
-    case isErrObjectNotFound(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)):
+    case isErrObjectNotFound(serr), isErrVersionNotFound(serr):
         // version being purged is already not found on target.
         if !rinfo.VersionPurgeStatus.Empty() {
             rinfo.VersionPurgeStatus = Complete
             return
         }
+    case isErrReadQuorum(serr), isErrWriteQuorum(serr):
+        // destination has some quorum issues, perform removeObject() anyways
+        // to complete the operation.
    default:
        if err != nil && minio.IsNetworkOrHostDown(err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
            globalBucketTargetSys.markOffline(tgt.EndpointURL())
@@ -1383,7 +1387,6 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
         rinfo.Duration = time.Since(startTime)
     }()
 
-    rAction = replicateAll
     oi, cerr := tgt.StatObject(ctx, tgt.Bucket, object, minio.StatObjectOptions{
         VersionID: objInfo.VersionID,
         Internal: minio.AdvancedGetOptions{
@@ -1420,16 +1423,19 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object
             }
             return
         }
-    }
-    // if target returns error other than NoSuchKey, defer replication attempt
-    if cerr != nil {
+    } else {
+        // if target returns error other than NoSuchKey, defer replication attempt
         if minio.IsNetworkOrHostDown(cerr, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) {
             globalBucketTargetSys.markOffline(tgt.EndpointURL())
         }
-        errResp := minio.ToErrorResponse(cerr)
-        switch errResp.Code {
-        case "NoSuchKey", "NoSuchVersion", "SlowDownRead":
+        serr := ErrorRespToObjectError(cerr, bucket, object, objInfo.VersionID)
+        switch {
+        case isErrMethodNotAllowed(serr):
+            rAction = replicateAll
+        case isErrObjectNotFound(serr), isErrVersionNotFound(serr):
+            rAction = replicateAll
+        case isErrReadQuorum(serr), isErrWriteQuorum(serr):
            rAction = replicateAll
        default:
            rinfo.Err = cerr


@@ -246,7 +246,7 @@ func TestListOnlineDisks(t *testing.T) {
         t.Fatalf("Failed to putObject %v", err)
     }
 
-    partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
     fi, err := getLatestFileInfo(ctx, partsMetadata, z.serverPools[0].sets[0].defaultParityCount, errs)
     if err != nil {
         t.Fatalf("Failed to getLatestFileInfo %v", err)
@@ -424,7 +424,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) {
         t.Fatalf("Failed to putObject %v", err)
     }
 
-    partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true)
+    partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true, true)
     fi, err := getLatestFileInfo(ctx, partsMetadata, z.serverPools[0].sets[0].defaultParityCount, errs)
     if err != nil {
         t.Fatalf("Failed to getLatestFileInfo %v", err)
@@ -534,7 +534,7 @@ func TestDisksWithAllParts(t *testing.T) {
         t.Fatalf("Failed to putObject %v", err)
     }
 
-    _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
     readQuorum := len(erasureDisks) / 2
     if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
         t.Fatalf("Failed to read xl meta data %v", reducedErr)
@@ -542,7 +542,7 @@ func TestDisksWithAllParts(t *testing.T) {
     // Test 1: Test that all disks are returned without any failures with
     // unmodified meta data
-    partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
     if err != nil {
         t.Fatalf("Failed to read xl meta data %v", err)
     }


@@ -409,7 +409,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
     }
 
     // Re-read when we have lock...
-    partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true)
+    partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true, true)
     if isAllNotFound(errs) {
         err := errFileNotFound
         if versionID != "" {
@@ -1162,7 +1162,7 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
     // Perform quick read without lock.
     // This allows to quickly check if all is ok or all are missing.
-    _, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false)
+    _, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false, false)
     if isAllNotFound(errs) {
         err := errFileNotFound
         if versionID != "" {


@@ -266,7 +266,7 @@ func TestHealing(t *testing.T) {
     }
 
     disk := er.getDisks()[0]
-    fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -289,7 +289,7 @@ func TestHealing(t *testing.T) {
         t.Fatal(err)
     }
 
-    fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -318,7 +318,7 @@ func TestHealing(t *testing.T) {
         t.Fatal(err)
     }
 
-    fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -426,11 +426,11 @@ func TestHealingVersioned(t *testing.T) {
     }
 
     disk := er.getDisks()[0]
-    fileInfoPreHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, false)
+    fileInfoPreHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
-    fileInfoPreHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, false)
+    fileInfoPreHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -453,11 +453,11 @@ func TestHealingVersioned(t *testing.T) {
         t.Fatal(err)
     }
 
-    fileInfoPostHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, false)
+    fileInfoPostHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
-    fileInfoPostHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, false)
+    fileInfoPostHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -489,7 +489,7 @@ func TestHealingVersioned(t *testing.T) {
         t.Fatal(err)
     }
 
-    fileInfoPostHeal1, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPostHeal1, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -499,7 +499,7 @@ func TestHealingVersioned(t *testing.T) {
         t.Fatal("HealObject failed")
     }
 
-    fileInfoPostHeal2, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPostHeal2, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -637,7 +637,7 @@ func TestHealingDanglingObject(t *testing.T) {
     // Restore...
     setDisks(orgDisks[:4]...)
 
-    fileInfoPreHeal, err := disks[0].ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPreHeal, err := disks[0].ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -654,7 +654,7 @@ func TestHealingDanglingObject(t *testing.T) {
         t.Fatal(err)
     }
 
-    fileInfoPostHeal, err := disks[0].ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPostHeal, err := disks[0].ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -684,7 +684,7 @@ func TestHealingDanglingObject(t *testing.T) {
     setDisks(orgDisks[:4]...)
 
     disk := getDisk(0)
-    fileInfoPreHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPreHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -702,7 +702,7 @@ func TestHealingDanglingObject(t *testing.T) {
     }
 
     disk = getDisk(0)
-    fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -733,7 +733,7 @@ func TestHealingDanglingObject(t *testing.T) {
     setDisks(orgDisks[:4]...)
 
     disk = getDisk(0)
-    fileInfoPreHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPreHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -751,7 +751,7 @@ func TestHealingDanglingObject(t *testing.T) {
     }
 
     disk = getDisk(0)
-    fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
+    fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true})
     if err != nil {
         t.Fatal(err)
     }
@@ -836,7 +836,7 @@ func TestHealCorrectQuorum(t *testing.T) {
         er := set.sets[0]
         erasureDisks := er.getDisks()
 
-        fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+        fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
         nfi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs)
         if errors.Is(err, errFileNotFound) {
             continue
@@ -858,12 +858,12 @@ func TestHealCorrectQuorum(t *testing.T) {
             t.Fatal(err)
         }
 
-        fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+        fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
         if countErrs(errs, nil) != len(fileInfos) {
             t.Fatal("Expected all xl.meta healed, but partial heal detected")
         }
 
-        fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false)
+        fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false, true)
         nfi, err = getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs)
         if errors.Is(err, errFileNotFound) {
             continue
@@ -885,7 +885,7 @@ func TestHealCorrectQuorum(t *testing.T) {
             t.Fatal(err)
         }
 
-        fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false)
+        fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false, true)
         if countErrs(errs, nil) != len(fileInfos) {
             t.Fatal("Expected all xl.meta healed, but partial heal detected")
         }
@@ -970,7 +970,7 @@ func TestHealObjectCorruptedPools(t *testing.T) {
            t.Fatalf("Failed to heal object - %v", err)
        }
 
-        fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+        fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
        fi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs)
        if err != nil {
            t.Fatalf("Failed to getLatestFileInfo - %v", err)
@@ -998,7 +998,7 @@ func TestHealObjectCorruptedPools(t *testing.T) {
            t.Errorf("Expected nil but received %v", err)
        }
 
-        fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+        fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
        nfi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs)
        if err != nil {
            t.Fatalf("Failed to getLatestFileInfo - %v", err)
@@ -1029,7 +1029,7 @@ func TestHealObjectCorruptedPools(t *testing.T) {
            t.Errorf("Expected nil but received %v", err)
        }
 
-        fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+        fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
        nfi, err = getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs)
        if err != nil {
            t.Fatalf("Failed to getLatestFileInfo - %v", err)
@@ -1133,7 +1133,7 @@ func TestHealObjectCorruptedXLMeta(t *testing.T) {
     firstDisk := erasureDisks[0]
 
     // Test 1: Remove the object backend files from the first disk.
-    fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
     fi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs)
     if err != nil {
         t.Fatalf("Failed to getLatestFileInfo - %v", err)
@@ -1156,7 +1156,7 @@ func TestHealObjectCorruptedXLMeta(t *testing.T) {
         t.Errorf("Expected xl.meta file to be present but stat failed - %v", err)
     }
 
-    fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
     nfi1, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs)
     if err != nil {
         t.Fatalf("Failed to getLatestFileInfo - %v", err)
@@ -1179,7 +1179,7 @@ func TestHealObjectCorruptedXLMeta(t *testing.T) {
         t.Errorf("Expected nil but received %v", err)
     }
 
-    fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
     nfi2, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs)
     if err != nil {
         t.Fatalf("Failed to getLatestFileInfo - %v", err)
@@ -1277,7 +1277,7 @@ func TestHealObjectCorruptedParts(t *testing.T) {
     firstDisk := erasureDisks[0]
     secondDisk := erasureDisks[1]
 
-    fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false)
+    fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true)
     fi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs)
     if err != nil {
         t.Fatalf("Failed to getLatestFileInfo - %v", err)


@@ -150,9 +150,14 @@ func hashOrder(key string, cardinality int) []int {
 // Reads all `xl.meta` metadata as a FileInfo slice.
 // Returns error slice indicating the failed metadata reads.
-func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData bool) ([]FileInfo, []error) {
+func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData, healing bool) ([]FileInfo, []error) {
     metadataArray := make([]FileInfo, len(disks))
 
+    opts := ReadOptions{
+        ReadData: readData,
+        Healing: healing,
+    }
+
     g := errgroup.WithNErrs(len(disks))
     // Read `xl.meta` in parallel across disks.
     for index := range disks {
@@ -161,7 +166,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve
             if disks[index] == nil {
                 return errDiskNotFound
             }
-            metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, readData)
+            metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, opts)
             return err
         }, index)
     }


@@ -72,7 +72,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object
     // Read metadata associated with the object from all disks.
     partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket,
-        uploadIDPath, "", false)
+        uploadIDPath, "", false, false)
 
     readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount)
     if err != nil {
@@ -200,7 +200,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto
     readDirFn(pathJoin(diskPath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error {
         readDirFn(pathJoin(diskPath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error {
             uploadIDPath := pathJoin(shaDir, uploadIDDir)
-            fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false)
+            fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{})
             if err != nil {
                 return nil
             }


@@ -91,7 +91,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
     // Read metadata associated with the object from all disks.
     if srcOpts.VersionID != "" {
-        metaArr, errs = readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true)
+        metaArr, errs = readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true, false)
     } else {
         metaArr, errs = readAllXL(ctx, storageDisks, srcBucket, srcObject, true, false, true)
     }
@@ -699,7 +699,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s
     // Read metadata associated with the object from all disks.
     if opts.VersionID != "" {
-        metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData)
+        metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData, false)
     } else {
         metaArr, errs = readAllXL(ctx, disks, bucket, object, readData, opts.InclFreeVersions, true)
     }
@@ -1859,7 +1859,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s
     // Read metadata associated with the object from all disks.
     if opts.VersionID != "" {
-        metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
+        metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false, false)
     } else {
         metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false, true)
     }
@@ -1932,7 +1932,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
     // Read metadata associated with the object from all disks.
     if opts.VersionID != "" {
-        metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
+        metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false, false)
     } else {
         metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false, true)
     }


@@ -908,7 +908,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }
 
-    parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "", false)
+    parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "", false, false)
     parts1SC := globalStorageClass
 
     // Object for test case 2 - No StorageClass defined, MetaData in PutObject requesting RRS Class
@@ -920,7 +920,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }
 
-    parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "", false)
+    parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "", false, false)
     parts2SC := globalStorageClass
 
     // Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class
@@ -932,7 +932,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }
 
-    parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "", false)
+    parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "", false, false)
     parts3SC := globalStorageClass
 
     // Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class
@@ -950,7 +950,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }
 
-    parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "", false)
+    parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "", false, false)
     parts4SC := storageclass.Config{
         Standard: storageclass.StorageClass{
             Parity: 6,
@@ -973,7 +973,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }
 
-    parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "", false)
+    parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "", false, false)
     parts5SC := globalStorageClass
 
     // Object for test case 6 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting Standard Storage Class
@@ -994,7 +994,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }
 
-    parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "", false)
+    parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "", false, false)
     parts6SC := storageclass.Config{
         RRS: storageclass.StorageClass{
             Parity: 2,
@@ -1017,7 +1017,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
         t.Fatalf("Failed to putObject %v", err)
     }
 
-    parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "", false)
+    parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "", false, false)
     parts7SC := storageclass.Config{
         Standard: storageclass.StorageClass{
             Parity: 5,


@@ -427,7 +427,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
                 continue
             }
             _, err := disk.ReadVersion(ctx, minioMetaBucket,
-                o.objectPath(0), "", false)
+                o.objectPath(0), "", ReadOptions{})
             if err != nil {
                 time.Sleep(retryDelay250)
                 retries++
@@ -504,7 +504,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt
                 continue
             }
             _, err := disk.ReadVersion(ctx, minioMetaBucket,
-                o.objectPath(partN), "", false)
+                o.objectPath(partN), "", ReadOptions{})
             if err != nil {
                 time.Sleep(retryDelay250)
                 retries++


@@ -260,11 +260,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi
     return d.disk.DeleteVersion(ctx, volume, path, fi, forceDelMarker)
 }
 
-func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
+func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) {
     if err := d.calcError(); err != nil {
         return FileInfo{}, err
     }
-    return d.disk.ReadVersion(ctx, volume, path, versionID, readData)
+    return d.disk.ReadVersion(ctx, volume, path, versionID, opts)
 }
 
 func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {


@@ -686,6 +686,12 @@ func isErrReadQuorum(err error) bool {
     return errors.As(err, &rquorum)
 }
 
+// isErrWriteQuorum check if the error type is InsufficentWriteQuorum
+func isErrWriteQuorum(err error) bool {
+    var rquorum InsufficientWriteQuorum
+    return errors.As(err, &rquorum)
+}
+
 // isErrObjectNotFound - Check if error type is ObjectNotFound.
 func isErrObjectNotFound(err error) bool {
     var objNotFound ObjectNotFound


@@ -111,7 +111,7 @@ func testPathTraversalExploit(obj ObjectLayer, instanceType, bucketName string,
     z := obj.(*erasureServerPools)
     xl := z.serverPools[0].sets[0]
     erasureDisks := xl.getDisks()
-    parts, errs := readAllFileInfo(ctx, erasureDisks, bucketName, objectName, "", false)
+    parts, errs := readAllFileInfo(ctx, erasureDisks, bucketName, objectName, "", false, false)
     for i := range parts {
         if errs[i] == nil {
             if parts[i].Name == objectName {


@@ -164,7 +164,7 @@ func testPostPolicyReservedBucketExploit(obj ObjectLayer, instanceType string, d
     z := obj.(*erasureServerPools)
     xl := z.serverPools[0].sets[0]
     erasureDisks := xl.getDisks()
-    parts, errs := readAllFileInfo(ctx, erasureDisks, bucketName, objectName+"/upload.txt", "", false)
+    parts, errs := readAllFileInfo(ctx, erasureDisks, bucketName, objectName+"/upload.txt", "", false, false)
     for i := range parts {
         if errs[i] == nil {
             if parts[i].Name == objectName+"/upload.txt" {


@@ -83,7 +83,7 @@ type StorageAPI interface {
     DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) []error
     WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error
     UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error
-    ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error)
+    ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (FileInfo, error)
     ReadXL(ctx context.Context, volume, path string, readData bool) (RawFileInfo, error)
     RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (uint64, error)
@@ -260,7 +260,7 @@ func (p *unrecognizedDisk) WriteMetadata(ctx context.Context, volume, path strin
     return errDiskNotFound
 }
 
-func (p *unrecognizedDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
+func (p *unrecognizedDisk) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) {
     return fi, errDiskNotFound
 }


@@ -480,15 +480,16 @@ func readMsgpReaderPoolPut(r *msgp.Reader) {
     }
 }
 
-func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
+func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) {
     // Use websocket when not reading data.
-    if !readData {
+    if !opts.ReadData {
         resp, err := storageReadVersionHandler.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{
             storageRESTDiskID: client.diskID,
             storageRESTVolume: volume,
             storageRESTFilePath: path,
             storageRESTVersionID: versionID,
-            storageRESTReadData: "false",
+            storageRESTReadData: strconv.FormatBool(opts.ReadData),
+            storageRESTHealing: strconv.FormatBool(opts.Healing),
         }))
         if err != nil {
             return fi, toStorageErr(err)
@@ -500,7 +501,8 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path,
     values.Set(storageRESTVolume, volume)
     values.Set(storageRESTFilePath, path)
     values.Set(storageRESTVersionID, versionID)
-    values.Set(storageRESTReadData, strconv.FormatBool(readData))
+    values.Set(storageRESTReadData, strconv.FormatBool(opts.ReadData))
+    values.Set(storageRESTHealing, strconv.FormatBool(opts.Healing))
 
     respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1)
     if err != nil {


@@ -20,7 +20,7 @@ package cmd
 //go:generate msgp -file $GOFILE -unexported
 
 const (
-    storageRESTVersion = "v50" // Added DiskInfo metrics query
+    storageRESTVersion = "v51" // Added ReadVersions readOptions
     storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
     storageRESTPrefix = minioReservedBucketPath + "/storage"
 )
@@ -56,6 +56,7 @@ const (
     storageRESTFilePath = "file-path"
     storageRESTVersionID = "version-id"
     storageRESTReadData = "read-data"
+    storageRESTHealing = "healing"
     storageRESTTotalVersions = "total-versions"
     storageRESTSrcVolume = "source-volume"
     storageRESTSrcPath = "source-path"


@@ -386,7 +386,12 @@ func (s *storageRESTServer) ReadVersionHandlerWS(params *grid.MSS) (*FileInfo, *
         return nil, grid.NewRemoteErr(err)
     }
 
-    fi, err := s.storage.ReadVersion(context.Background(), volume, filePath, versionID, readData)
+    healing, err := strconv.ParseBool(params.Get(storageRESTHealing))
+    if err != nil {
+        return nil, grid.NewRemoteErr(err)
+    }
+
+    fi, err := s.storage.ReadVersion(context.Background(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing})
     if err != nil {
         return nil, grid.NewRemoteErr(err)
     }
@@ -406,8 +411,12 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
         s.writeErrorResponse(w, err)
         return
     }
-
-    fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, readData)
+    healing, err := strconv.ParseBool(r.Form.Get(storageRESTHealing))
+    if err != nil {
+        s.writeErrorResponse(w, err)
+        return
+    }
+    fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing})
     if err != nil {
         s.writeErrorResponse(w, err)
         return


@@ -103,10 +103,10 @@ func ErrorRespToObjectError(err error, params ...string) error {
     if len(params) >= 1 {
         bucket = params[0]
     }
-    if len(params) == 2 {
+    if len(params) >= 2 {
         object = params[1]
     }
-    if len(params) == 3 {
+    if len(params) >= 3 {
         versionID = params[2]
     }
@@ -122,6 +122,10 @@ func ErrorRespToObjectError(err error, params ...string) error {
     }
 
     switch minioErr.Code {
+    case "SlowDownWrite":
+        err = InsufficientWriteQuorum{Bucket: bucket, Object: object}
+    case "SlowDownRead":
+        err = InsufficientReadQuorum{Bucket: bucket, Object: object}
     case "PreconditionFailed":
         err = PreConditionFailed{}
     case "InvalidRange":


@@ -608,7 +608,7 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s
     return w.Run(func() error { return p.storage.WriteMetadata(ctx, volume, path, fi) })
 }
 
-func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
+func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) {
     ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadVersion, volume, path)
     if err != nil {
         return fi, err
@@ -617,7 +617,7 @@ func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, ve
     w := xioutil.NewDeadlineWorker(diskMaxTimeout)
     rerr := w.Run(func() error {
-        fi, err = p.storage.ReadVersion(ctx, volume, path, versionID, readData)
+        fi, err = p.storage.ReadVersion(ctx, volume, path, versionID, opts)
         return err
     })
     if rerr != nil {


@@ -1186,6 +1186,8 @@ func (x *xlMetaV2) AppendTo(dst []byte) ([]byte, error) {
     return append(dst, x.data...), nil
 }
 
+const emptyUUID = "00000000-0000-0000-0000-000000000000"
+
 func (x *xlMetaV2) findVersionStr(key string) (idx int, ver *xlMetaV2Version, err error) {
     if key == nullVersionID {
         key = ""


@@ -1426,10 +1426,16 @@ func (s *xlStorage) ReadXL(ctx context.Context, volume, path string, readData bo
     }, err
 }
 
+// ReadOptions optional inputs for ReadVersion
+type ReadOptions struct {
+    ReadData bool
+    Healing bool
+}
+
 // ReadVersion - reads metadata and returns FileInfo at path `xl.meta`
 // for all objects less than `32KiB` this call returns data as well
 // along with metadata.
-func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) {
+func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) {
     volumeDir, err := s.getVolDir(volume)
     if err != nil {
         return fi, err
@@ -1440,6 +1446,8 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
         return fi, err
     }
 
+    readData := opts.ReadData
+
     buf, dmTime, err := s.readRaw(ctx, volume, volumeDir, filePath, readData)
     if err != nil {
         if err == errFileNotFound {
@@ -1473,9 +1481,9 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
             return fi, nil
         }
 
-        // For overwritten objects without header we might have a conflict with
-        // data written later.
-        // Check the data path if there is a part with data.
+        // For overwritten objects without header we might have a
+        // conflict with data written later. Check the data path
+        // if there is a part with data.
         partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number)
         dataPath := pathJoin(path, fi.DataDir, partPath)
         _, lerr := Lstat(pathJoin(volumeDir, dataPath))
@@ -1504,6 +1512,22 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
         }
     }
 
+    if !skipAccessChecks(volume) && !opts.Healing && fi.TransitionStatus == "" && !fi.InlineData() && len(fi.Data) == 0 && fi.DataDir != "" && fi.DataDir != emptyUUID {
+        // Verify if the dataDir is present or not when the data
+        // is not inlined to make sure we return correct errors
+        // during HeadObject().
+        // Healing must not come here and return error, since healing
+        // deals with dataDirs directly, let healing fix things automatically.
+        if lerr := Access(pathJoin(volumeDir, path, fi.DataDir)); lerr != nil {
+            if os.IsNotExist(lerr) {
+                // Data dir is missing we must return errFileCorrupted
+                return FileInfo{}, errFileCorrupt
+            }
+            return FileInfo{}, osErrToFileErr(lerr)
+        }
+    }
+
     return fi, nil
 }


@@ -235,7 +235,7 @@ func TestXLStorageReadVersionLegacy(t *testing.T) {
         t.Fatalf("Unable to create a file \"as-file\", %s", err)
     }
 
-    fi, err := xlStorage.ReadVersion(context.Background(), "exists-legacy", "as-file", "", false)
+    fi, err := xlStorage.ReadVersion(context.Background(), "exists-legacy", "as-file", "", ReadOptions{})
     if err != nil {
         t.Fatalf("Unable to read older 'xl.json' content: %s", err)
     }
@@ -254,6 +254,7 @@ func TestXLStorageReadVersion(t *testing.T) {
     }
 
     xlMeta, _ := os.ReadFile("testdata/xl.meta")
+    fi, _ := getFileInfo(xlMeta, "exists", "as-file", "", false, true)
 
     // Create files for the test cases.
     if err = xlStorage.MakeVol(context.Background(), "exists"); err != nil {
@@ -268,6 +269,9 @@ func TestXLStorageReadVersion(t *testing.T) {
     if err = xlStorage.AppendFile(context.Background(), "exists", "as-file-parent/xl.meta", xlMeta); err != nil {
         t.Fatalf("Unable to create a file \"as-file-parent\", %s", err)
     }
+    if err = xlStorage.MakeVol(context.Background(), "exists/as-file/"+fi.DataDir); err != nil {
+        t.Fatalf("Unable to create a dataDir %s, %s", fi.DataDir, err)
+    }
 
     // TestXLStoragecases to validate different conditions for ReadVersion API.
     testCases := []struct {
@@ -321,7 +325,7 @@ func TestXLStorageReadVersion(t *testing.T) {
     // Run through all the test cases and validate for ReadVersion.
     for i, testCase := range testCases {
-        _, err = xlStorage.ReadVersion(context.Background(), testCase.volume, testCase.path, "", false)
+        _, err = xlStorage.ReadVersion(context.Background(), testCase.volume, testCase.path, "", ReadOptions{})
         if err != testCase.err {
             t.Fatalf("TestXLStorage %d: Expected err \"%s\", got err \"%s\"", i+1, testCase.err, err)
         }
@@ -1650,7 +1654,7 @@ func TestXLStorageDeleteVersion(t *testing.T) {
     for i := range versions {
         versions[i] = uuid.New().String()
         fi := FileInfo{
-            Name: object, Volume: volume, VersionID: versions[i], ModTime: UTCNow(), DataDir: uuid.NewString(), Size: 10000,
+            Name: object, Volume: volume, VersionID: versions[i], ModTime: UTCNow(), DataDir: "", Size: 10000,
             Erasure: ErasureInfo{
                 Algorithm: erasureAlgorithm,
                 DataBlocks: 4,
@@ -1670,7 +1674,7 @@ func TestXLStorageDeleteVersion(t *testing.T) {
         t.Helper()
         for i := range versions {
             shouldExist := !deleted[i]
-            fi, err := xl.ReadVersion(ctx, volume, object, versions[i], false)
+            fi, err := xl.ReadVersion(ctx, volume, object, versions[i], ReadOptions{})
             if shouldExist {
                 if err != nil {
                     t.Fatalf("Version %s should exist, but got err %v", versions[i], err)
@@ -1717,7 +1721,7 @@ func TestXLStorageDeleteVersion(t *testing.T) {
     checkVerExist(t)
 
     // Meta should be deleted now...
-    fi, err := xl.ReadVersion(ctx, volume, object, "", false)
+    fi, err := xl.ReadVersion(ctx, volume, object, "", ReadOptions{})
     if err != errFileNotFound {
         t.Fatalf("Object %s should not exist, but returned: %#v", object, fi)
     }


@@ -14,6 +14,7 @@ if [ ! -f ./mc ]; then
 fi
 
 export CI=true
+export MINIO_SCANNER_SPEED=fastest
 
 (minio server /tmp/xl/{1...10}/disk{0...1} 2>&1 >/tmp/decom.log) &
 pid=$!
@@ -64,7 +65,7 @@ export MC_HOST_mytier="http://minioadmin:minioadmin@localhost:9001/"
 ./mc ls -r myminio/bucket2/ >bucket2_ns.txt
 ./mc ls -r --versions myminio/bucket2/ >bucket2_ns_versions.txt
 
-sleep 10
+sleep 30
 
 ./mc ls -r --versions mytier/tiered/ >tiered_ns_versions.txt