diff --git a/.github/workflows/multipart/migrate.sh b/.github/workflows/multipart/migrate.sh index 5d00cd5c0..534e512f6 100755 --- a/.github/workflows/multipart/migrate.sh +++ b/.github/workflows/multipart/migrate.sh @@ -5,11 +5,19 @@ set -x ## change working directory cd .github/workflows/multipart/ -docker-compose -f docker-compose-site1.yaml rm -s -f -docker-compose -f docker-compose-site2.yaml rm -s -f -for volume in $(docker volume ls -q | grep minio); do - docker volume rm ${volume} -done +function cleanup() { + docker-compose -f docker-compose-site1.yaml rm -s -f || true + docker-compose -f docker-compose-site2.yaml rm -s -f || true + for volume in $(docker volume ls -q | grep minio); do + docker volume rm ${volume} || true + done + + docker system prune -f || true + docker volume prune -f || true + docker volume rm $(docker volume ls -q -f dangling=true) || true +} + +cleanup if [ ! -f ./mc ]; then wget --quiet -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc && @@ -101,15 +109,7 @@ if [ $failed_count_site2 -ne 0 ]; then exit 1 fi -docker-compose -f docker-compose-site1.yaml rm -s -f -docker-compose -f docker-compose-site2.yaml rm -s -f -for volume in $(docker volume ls -q | grep minio); do - docker volume rm ${volume} -done - -docker system prune -f || true -docker volume prune -f || true -docker volume rm $(docker volume ls -q -f dangling=true) || true +cleanup ## change working directory cd ../../../ diff --git a/cmd/bucket-replication.go b/cmd/bucket-replication.go index 65b58812c..cd6edc6d9 100644 --- a/cmd/bucket-replication.go +++ b/cmd/bucket-replication.go @@ -639,19 +639,23 @@ func replicateDeleteToTarget(ctx context.Context, dobj DeletedObjectReplicationI IsReplicationReadyForDeleteMarker: true, }, }) + serr := ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName, dobj.VersionID) switch { - case isErrMethodNotAllowed(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)): + case isErrMethodNotAllowed(serr): // delete marker already replicated if dobj.VersionID == "" && rinfo.VersionPurgeStatus.Empty() { rinfo.ReplicationStatus = replication.Completed return } - case isErrObjectNotFound(ErrorRespToObjectError(err, dobj.Bucket, dobj.ObjectName)): + case isErrObjectNotFound(serr), isErrVersionNotFound(serr): // version being purged is already not found on target. if !rinfo.VersionPurgeStatus.Empty() { rinfo.VersionPurgeStatus = Complete return } + case isErrReadQuorum(serr), isErrWriteQuorum(serr): + // destination has some quorum issues, perform removeObject() anyways + // to complete the operation. default: if err != nil && minio.IsNetworkOrHostDown(err, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) { globalBucketTargetSys.markOffline(tgt.EndpointURL()) @@ -1383,7 +1387,6 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object rinfo.Duration = time.Since(startTime) }() - rAction = replicateAll oi, cerr := tgt.StatObject(ctx, tgt.Bucket, object, minio.StatObjectOptions{ VersionID: objInfo.VersionID, Internal: minio.AdvancedGetOptions{ @@ -1420,16 +1423,19 @@ func (ri ReplicateObjectInfo) replicateAll(ctx context.Context, objectAPI Object } return } - } - // if target returns error other than NoSuchKey, defer replication attempt - if cerr != nil { + } else { + // if target returns error other than NoSuchKey, defer replication attempt if minio.IsNetworkOrHostDown(cerr, true) && !globalBucketTargetSys.isOffline(tgt.EndpointURL()) { globalBucketTargetSys.markOffline(tgt.EndpointURL()) } - errResp := minio.ToErrorResponse(cerr) - switch errResp.Code { - case "NoSuchKey", "NoSuchVersion", "SlowDownRead": + serr := ErrorRespToObjectError(cerr, bucket, object, objInfo.VersionID) + switch { + case isErrMethodNotAllowed(serr): + rAction = replicateAll + case isErrObjectNotFound(serr), isErrVersionNotFound(serr): + rAction = replicateAll + case isErrReadQuorum(serr), isErrWriteQuorum(serr): rAction = replicateAll default: rinfo.Err = cerr diff --git a/cmd/erasure-healing-common_test.go b/cmd/erasure-healing-common_test.go index 0e87af5ef..2515fcf27 100644 --- a/cmd/erasure-healing-common_test.go +++ b/cmd/erasure-healing-common_test.go @@ -246,7 +246,7 @@ func TestListOnlineDisks(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) fi, err := getLatestFileInfo(ctx, partsMetadata, z.serverPools[0].sets[0].defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo %v", err) @@ -424,7 +424,7 @@ func TestListOnlineDisksSmallObjects(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true) + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", true, true) fi, err := getLatestFileInfo(ctx, partsMetadata, z.serverPools[0].sets[0].defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo %v", err) @@ -534,7 +534,7 @@ func TestDisksWithAllParts(t *testing.T) { t.Fatalf("Failed to putObject %v", err) } - _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + _, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) readQuorum := len(erasureDisks) / 2 if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { t.Fatalf("Failed to read xl meta data %v", reducedErr) @@ -542,7 +542,7 @@ func TestDisksWithAllParts(t *testing.T) { // Test 1: Test that all disks are returned without any failures with // unmodified meta data - partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + partsMetadata, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) if err != nil { t.Fatalf("Failed to read xl meta data %v", err) } diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go index 218d87a65..21106c0aa 100644 --- a/cmd/erasure-healing.go +++ b/cmd/erasure-healing.go @@ -409,7 +409,7 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object } // Re-read when we have lock... - partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true) + partsMetadata, errs := readAllFileInfo(ctx, storageDisks, bucket, object, versionID, true, true) if isAllNotFound(errs) { err := errFileNotFound if versionID != "" { @@ -1162,7 +1162,7 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version // Perform quick read without lock. // This allows to quickly check if all is ok or all are missing. - _, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false) + _, errs := readAllFileInfo(healCtx, storageDisks, bucket, object, versionID, false, false) if isAllNotFound(errs) { err := errFileNotFound if versionID != "" { diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go index 2a28dc79f..864604b87 100644 --- a/cmd/erasure-healing_test.go +++ b/cmd/erasure-healing_test.go @@ -266,7 +266,7 @@ func TestHealing(t *testing.T) { } disk := er.getDisks()[0] - fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false) + fileInfoPreHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -289,7 +289,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", false) + fileInfoPostHeal, err := disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -318,7 +318,7 @@ func TestHealing(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false) + fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -426,11 +426,11 @@ func TestHealingVersioned(t *testing.T) { } disk := er.getDisks()[0] - fileInfoPreHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, false) + fileInfoPreHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } - fileInfoPreHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, false) + fileInfoPreHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -453,11 +453,11 @@ func TestHealingVersioned(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, false) + fileInfoPostHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } - fileInfoPostHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, false) + fileInfoPostHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -489,7 +489,7 @@ func TestHealingVersioned(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal1, err = disk.ReadVersion(context.Background(), bucket, object, "", false) + fileInfoPostHeal1, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -499,7 +499,7 @@ func TestHealingVersioned(t *testing.T) { t.Fatal("HealObject failed") } - fileInfoPostHeal2, err = disk.ReadVersion(context.Background(), bucket, object, "", false) + fileInfoPostHeal2, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -637,7 +637,7 @@ func TestHealingDanglingObject(t *testing.T) { // Restore... setDisks(orgDisks[:4]...) - fileInfoPreHeal, err := disks[0].ReadVersion(context.Background(), bucket, object, "", false) + fileInfoPreHeal, err := disks[0].ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -654,7 +654,7 @@ func TestHealingDanglingObject(t *testing.T) { t.Fatal(err) } - fileInfoPostHeal, err := disks[0].ReadVersion(context.Background(), bucket, object, "", false) + fileInfoPostHeal, err := disks[0].ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -684,7 +684,7 @@ func TestHealingDanglingObject(t *testing.T) { setDisks(orgDisks[:4]...) disk := getDisk(0) - fileInfoPreHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false) + fileInfoPreHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -702,7 +702,7 @@ func TestHealingDanglingObject(t *testing.T) { } disk = getDisk(0) - fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false) + fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -733,7 +733,7 @@ func TestHealingDanglingObject(t *testing.T) { setDisks(orgDisks[:4]...) disk = getDisk(0) - fileInfoPreHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false) + fileInfoPreHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -751,7 +751,7 @@ func TestHealingDanglingObject(t *testing.T) { } disk = getDisk(0) - fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", false) + fileInfoPostHeal, err = disk.ReadVersion(context.Background(), bucket, object, "", ReadOptions{ReadData: false, Healing: true}) if err != nil { t.Fatal(err) } @@ -836,7 +836,7 @@ func TestHealCorrectQuorum(t *testing.T) { er := set.sets[0] erasureDisks := er.getDisks() - fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) nfi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if errors.Is(err, errFileNotFound) { continue @@ -858,12 +858,12 @@ func TestHealCorrectQuorum(t *testing.T) { t.Fatal(err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) if countErrs(errs, nil) != len(fileInfos) { t.Fatal("Expected all xl.meta healed, but partial heal detected") } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false, true) nfi, err = getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if errors.Is(err, errFileNotFound) { continue @@ -885,7 +885,7 @@ func TestHealCorrectQuorum(t *testing.T) { t.Fatal(err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, minioMetaBucket, cfgFile, "", false, true) if countErrs(errs, nil) != len(fileInfos) { t.Fatal("Expected all xl.meta healed, but partial heal detected") } @@ -970,7 +970,7 @@ func TestHealObjectCorruptedPools(t *testing.T) { t.Fatalf("Failed to heal object - %v", err) } - fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) fi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -998,7 +998,7 @@ func TestHealObjectCorruptedPools(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) nfi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -1029,7 +1029,7 @@ func TestHealObjectCorruptedPools(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) nfi, err = getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -1133,7 +1133,7 @@ func TestHealObjectCorruptedXLMeta(t *testing.T) { firstDisk := erasureDisks[0] // Test 1: Remove the object backend files from the first disk. - fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) fi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -1156,7 +1156,7 @@ func TestHealObjectCorruptedXLMeta(t *testing.T) { t.Errorf("Expected xl.meta file to be present but stat failed - %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) nfi1, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -1179,7 +1179,7 @@ func TestHealObjectCorruptedXLMeta(t *testing.T) { t.Errorf("Expected nil but received %v", err) } - fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + fileInfos, errs = readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) nfi2, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) @@ -1277,7 +1277,7 @@ func TestHealObjectCorruptedParts(t *testing.T) { firstDisk := erasureDisks[0] secondDisk := erasureDisks[1] - fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false) + fileInfos, errs := readAllFileInfo(ctx, erasureDisks, bucket, object, "", false, true) fi, err := getLatestFileInfo(ctx, fileInfos, er.defaultParityCount, errs) if err != nil { t.Fatalf("Failed to getLatestFileInfo - %v", err) diff --git a/cmd/erasure-metadata-utils.go b/cmd/erasure-metadata-utils.go index b401ac377..ef3bc3f0f 100644 --- a/cmd/erasure-metadata-utils.go +++ b/cmd/erasure-metadata-utils.go @@ -150,9 +150,14 @@ func hashOrder(key string, cardinality int) []int { // Reads all `xl.meta` metadata as a FileInfo slice. // Returns error slice indicating the failed metadata reads. -func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData bool) ([]FileInfo, []error) { +func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, versionID string, readData, healing bool) ([]FileInfo, []error) { metadataArray := make([]FileInfo, len(disks)) + opts := ReadOptions{ + ReadData: readData, + Healing: healing, + } + g := errgroup.WithNErrs(len(disks)) // Read `xl.meta` in parallel across disks. for index := range disks { @@ -161,7 +166,7 @@ func readAllFileInfo(ctx context.Context, disks []StorageAPI, bucket, object, ve if disks[index] == nil { return errDiskNotFound } - metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, readData) + metadataArray[index], err = disks[index].ReadVersion(ctx, bucket, object, versionID, opts) return err }, index) } diff --git a/cmd/erasure-multipart.go b/cmd/erasure-multipart.go index d82c46668..4ff65c0c4 100644 --- a/cmd/erasure-multipart.go +++ b/cmd/erasure-multipart.go @@ -72,7 +72,7 @@ func (er erasureObjects) checkUploadIDExists(ctx context.Context, bucket, object // Read metadata associated with the object from all disks. partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, - uploadIDPath, "", false) + uploadIDPath, "", false, false) readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, er.defaultParityCount) if err != nil { @@ -200,7 +200,7 @@ func (er erasureObjects) cleanupStaleUploadsOnDisk(ctx context.Context, disk Sto readDirFn(pathJoin(diskPath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error { readDirFn(pathJoin(diskPath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error { uploadIDPath := pathJoin(shaDir, uploadIDDir) - fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false) + fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", ReadOptions{}) if err != nil { return nil } diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index ff324c364..003792e22 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -91,7 +91,7 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d // Read metadata associated with the object from all disks. if srcOpts.VersionID != "" { - metaArr, errs = readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true) + metaArr, errs = readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true, false) } else { metaArr, errs = readAllXL(ctx, storageDisks, srcBucket, srcObject, true, false, true) } @@ -699,7 +699,7 @@ func (er erasureObjects) getObjectFileInfo(ctx context.Context, bucket, object s // Read metadata associated with the object from all disks. if opts.VersionID != "" { - metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData) + metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData, false) } else { metaArr, errs = readAllXL(ctx, disks, bucket, object, readData, opts.InclFreeVersions, true) } @@ -1859,7 +1859,7 @@ func (er erasureObjects) PutObjectMetadata(ctx context.Context, bucket, object s // Read metadata associated with the object from all disks. if opts.VersionID != "" { - metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) + metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false, false) } else { metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false, true) } @@ -1932,7 +1932,7 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin // Read metadata associated with the object from all disks. if opts.VersionID != "" { - metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false) + metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false, false) } else { metaArr, errs = readAllXL(ctx, disks, bucket, object, false, false, true) } diff --git a/cmd/erasure-object_test.go b/cmd/erasure-object_test.go index 4e411d050..3878eba60 100644 --- a/cmd/erasure-object_test.go +++ b/cmd/erasure-object_test.go @@ -908,7 +908,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "", false) + parts1, errs1 := readAllFileInfo(ctx, erasureDisks, bucket, object1, "", false, false) parts1SC := globalStorageClass // Object for test case 2 - No StorageClass defined, MetaData in PutObject requesting RRS Class @@ -920,7 +920,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "", false) + parts2, errs2 := readAllFileInfo(ctx, erasureDisks, bucket, object2, "", false, false) parts2SC := globalStorageClass // Object for test case 3 - No StorageClass defined, MetaData in PutObject requesting Standard Storage Class @@ -932,7 +932,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "", false) + parts3, errs3 := readAllFileInfo(ctx, erasureDisks, bucket, object3, "", false, false) parts3SC := globalStorageClass // Object for test case 4 - Standard StorageClass defined as Parity 6, MetaData in PutObject requesting Standard Storage Class @@ -950,7 +950,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "", false) + parts4, errs4 := readAllFileInfo(ctx, erasureDisks, bucket, object4, "", false, false) parts4SC := storageclass.Config{ Standard: storageclass.StorageClass{ Parity: 6, @@ -973,7 +973,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "", false) + parts5, errs5 := readAllFileInfo(ctx, erasureDisks, bucket, object5, "", false, false) parts5SC := globalStorageClass // Object for test case 6 - RRS StorageClass defined as Parity 2, MetaData in PutObject requesting Standard Storage Class @@ -994,7 +994,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "", false) + parts6, errs6 := readAllFileInfo(ctx, erasureDisks, bucket, object6, "", false, false) parts6SC := storageclass.Config{ RRS: storageclass.StorageClass{ Parity: 2, @@ -1017,7 +1017,7 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin t.Fatalf("Failed to putObject %v", err) } - parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "", false) + parts7, errs7 := readAllFileInfo(ctx, erasureDisks, bucket, object7, "", false, false) parts7SC := storageclass.Config{ Standard: storageclass.StorageClass{ Parity: 5, diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index b6ff0e04e..a4ba0a096 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -427,7 +427,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt continue } _, err := disk.ReadVersion(ctx, minioMetaBucket, - o.objectPath(0), "", false) + o.objectPath(0), "", ReadOptions{}) if err != nil { time.Sleep(retryDelay250) retries++ @@ -504,7 +504,7 @@ func (er *erasureObjects) streamMetadataParts(ctx context.Context, o listPathOpt continue } _, err := disk.ReadVersion(ctx, minioMetaBucket, - o.objectPath(partN), "", false) + o.objectPath(partN), "", ReadOptions{}) if err != nil { time.Sleep(retryDelay250) retries++ diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go index 1a32eeb28..9523aeda4 100644 --- a/cmd/naughty-disk_test.go +++ b/cmd/naughty-disk_test.go @@ -260,11 +260,11 @@ func (d *naughtyDisk) DeleteVersion(ctx context.Context, volume, path string, fi return d.disk.DeleteVersion(ctx, volume, path, fi, forceDelMarker) } -func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { +func (d *naughtyDisk) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { if err := d.calcError(); err != nil { return FileInfo{}, err } - return d.disk.ReadVersion(ctx, volume, path, versionID, readData) + return d.disk.ReadVersion(ctx, volume, path, versionID, opts) } func (d *naughtyDisk) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) { diff --git a/cmd/object-api-errors.go b/cmd/object-api-errors.go index e36887627..080979c20 100644 --- a/cmd/object-api-errors.go +++ b/cmd/object-api-errors.go @@ -686,6 +686,12 @@ func isErrReadQuorum(err error) bool { return errors.As(err, &rquorum) } +// isErrWriteQuorum check if the error type is InsufficentWriteQuorum +func isErrWriteQuorum(err error) bool { + var rquorum InsufficientWriteQuorum + return errors.As(err, &rquorum) +} + // isErrObjectNotFound - Check if error type is ObjectNotFound. func isErrObjectNotFound(err error) bool { var objNotFound ObjectNotFound diff --git a/cmd/object-api-utils_test.go b/cmd/object-api-utils_test.go index 65572577f..da726d5af 100644 --- a/cmd/object-api-utils_test.go +++ b/cmd/object-api-utils_test.go @@ -111,7 +111,7 @@ func testPathTraversalExploit(obj ObjectLayer, instanceType, bucketName string, z := obj.(*erasureServerPools) xl := z.serverPools[0].sets[0] erasureDisks := xl.getDisks() - parts, errs := readAllFileInfo(ctx, erasureDisks, bucketName, objectName, "", false) + parts, errs := readAllFileInfo(ctx, erasureDisks, bucketName, objectName, "", false, false) for i := range parts { if errs[i] == nil { if parts[i].Name == objectName { diff --git a/cmd/post-policy_test.go b/cmd/post-policy_test.go index c2bcd0ed4..a301491b1 100644 --- a/cmd/post-policy_test.go +++ b/cmd/post-policy_test.go @@ -164,7 +164,7 @@ func testPostPolicyReservedBucketExploit(obj ObjectLayer, instanceType string, d z := obj.(*erasureServerPools) xl := z.serverPools[0].sets[0] erasureDisks := xl.getDisks() - parts, errs := readAllFileInfo(ctx, erasureDisks, bucketName, objectName+"/upload.txt", "", false) + parts, errs := readAllFileInfo(ctx, erasureDisks, bucketName, objectName+"/upload.txt", "", false, false) for i := range parts { if errs[i] == nil { if parts[i].Name == objectName+"/upload.txt" { diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go index 1ac1cc2d7..36bd539a6 100644 --- a/cmd/storage-interface.go +++ b/cmd/storage-interface.go @@ -83,7 +83,7 @@ type StorageAPI interface { DeleteVersions(ctx context.Context, volume string, versions []FileInfoVersions) []error WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error - ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error) + ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (FileInfo, error) ReadXL(ctx context.Context, volume, path string, readData bool) (RawFileInfo, error) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (uint64, error) @@ -260,7 +260,7 @@ func (p *unrecognizedDisk) WriteMetadata(ctx context.Context, volume, path strin return errDiskNotFound } -func (p *unrecognizedDisk) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { +func (p *unrecognizedDisk) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { return fi, errDiskNotFound } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 9c4586d05..ec5fb8a0e 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -480,15 +480,16 @@ func readMsgpReaderPoolPut(r *msgp.Reader) { } } -func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { +func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { // Use websocket when not reading data. - if !readData { + if !opts.ReadData { resp, err := storageReadVersionHandler.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{ storageRESTDiskID: client.diskID, storageRESTVolume: volume, storageRESTFilePath: path, storageRESTVersionID: versionID, - storageRESTReadData: "false", + storageRESTReadData: strconv.FormatBool(opts.ReadData), + storageRESTHealing: strconv.FormatBool(opts.Healing), })) if err != nil { return fi, toStorageErr(err) @@ -500,7 +501,8 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) values.Set(storageRESTVersionID, versionID) - values.Set(storageRESTReadData, strconv.FormatBool(readData)) + values.Set(storageRESTReadData, strconv.FormatBool(opts.ReadData)) + values.Set(storageRESTHealing, strconv.FormatBool(opts.Healing)) respBody, err := client.call(ctx, storageRESTMethodReadVersion, values, nil, -1) if err != nil { diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go index 8e9beb1b1..66b8be8ee 100644 --- a/cmd/storage-rest-common.go +++ b/cmd/storage-rest-common.go @@ -20,7 +20,7 @@ package cmd //go:generate msgp -file $GOFILE -unexported const ( - storageRESTVersion = "v50" // Added DiskInfo metrics query + storageRESTVersion = "v51" // Added ReadVersions readOptions storageRESTVersionPrefix = SlashSeparator + storageRESTVersion storageRESTPrefix = minioReservedBucketPath + "/storage" ) @@ -56,6 +56,7 @@ const ( storageRESTFilePath = "file-path" storageRESTVersionID = "version-id" storageRESTReadData = "read-data" + storageRESTHealing = "healing" storageRESTTotalVersions = "total-versions" storageRESTSrcVolume = "source-volume" storageRESTSrcPath = "source-path" diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 59adf69f0..7408d8242 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -386,7 +386,12 @@ func (s *storageRESTServer) ReadVersionHandlerWS(params *grid.MSS) (*FileInfo, * return nil, grid.NewRemoteErr(err) } - fi, err := s.storage.ReadVersion(context.Background(), volume, filePath, versionID, readData) + healing, err := strconv.ParseBool(params.Get(storageRESTHealing)) + if err != nil { + return nil, grid.NewRemoteErr(err) + } + + fi, err := s.storage.ReadVersion(context.Background(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing}) if err != nil { return nil, grid.NewRemoteErr(err) } @@ -406,8 +411,12 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re s.writeErrorResponse(w, err) return } - - fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, readData) + healing, err := strconv.ParseBool(r.Form.Get(storageRESTHealing)) + if err != nil { + s.writeErrorResponse(w, err) + return + } + fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing}) if err != nil { s.writeErrorResponse(w, err) return diff --git a/cmd/utils.go b/cmd/utils.go index 516782bab..d8c5c9546 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -103,10 +103,10 @@ func ErrorRespToObjectError(err error, params ...string) error { if len(params) >= 1 { bucket = params[0] } - if len(params) == 2 { + if len(params) >= 2 { object = params[1] } - if len(params) == 3 { + if len(params) >= 3 { versionID = params[2] } @@ -122,6 +122,10 @@ func ErrorRespToObjectError(err error, params ...string) error { } switch minioErr.Code { + case "SlowDownWrite": + err = InsufficientWriteQuorum{Bucket: bucket, Object: object} + case "SlowDownRead": + err = InsufficientReadQuorum{Bucket: bucket, Object: object} case "PreconditionFailed": err = PreConditionFailed{} case "InvalidRange": diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go index bd66378f4..c80ea2ffc 100644 --- a/cmd/xl-storage-disk-id-check.go +++ b/cmd/xl-storage-disk-id-check.go @@ -608,7 +608,7 @@ func (p *xlStorageDiskIDCheck) WriteMetadata(ctx context.Context, volume, path s return w.Run(func() error { return p.storage.WriteMetadata(ctx, volume, path, fi) }) } -func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { +func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { ctx, done, err := p.TrackDiskHealth(ctx, storageMetricReadVersion, volume, path) if err != nil { return fi, err @@ -617,7 +617,7 @@ func (p *xlStorageDiskIDCheck) ReadVersion(ctx context.Context, volume, path, ve w := xioutil.NewDeadlineWorker(diskMaxTimeout) rerr := w.Run(func() error { - fi, err = p.storage.ReadVersion(ctx, volume, path, versionID, readData) + fi, err = p.storage.ReadVersion(ctx, volume, path, versionID, opts) return err }) if rerr != nil { diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go index 2531f233e..72a058f05 100644 --- a/cmd/xl-storage-format-v2.go +++ b/cmd/xl-storage-format-v2.go @@ -1186,6 +1186,8 @@ func (x *xlMetaV2) AppendTo(dst []byte) ([]byte, error) { return append(dst, x.data...), nil } +const emptyUUID = "00000000-0000-0000-0000-000000000000" + func (x *xlMetaV2) findVersionStr(key string) (idx int, ver *xlMetaV2Version, err error) { if key == nullVersionID { key = "" diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go index 82b79334f..5ade78e6e 100644 --- a/cmd/xl-storage.go +++ b/cmd/xl-storage.go @@ -1426,10 +1426,16 @@ func (s *xlStorage) ReadXL(ctx context.Context, volume, path string, readData bo }, err } +// ReadOptions optional inputs for ReadVersion +type ReadOptions struct { + ReadData bool + Healing bool +} + // ReadVersion - reads metadata and returns FileInfo at path `xl.meta` // for all objects less than `32KiB` this call returns data as well // along with metadata. -func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (fi FileInfo, err error) { +func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID string, opts ReadOptions) (fi FileInfo, err error) { volumeDir, err := s.getVolDir(volume) if err != nil { return fi, err @@ -1440,6 +1446,8 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str return fi, err } + readData := opts.ReadData + buf, dmTime, err := s.readRaw(ctx, volume, volumeDir, filePath, readData) if err != nil { if err == errFileNotFound { @@ -1473,9 +1481,9 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str return fi, nil } - // For overwritten objects without header we might have a conflict with - // data written later. - // Check the data path if there is a part with data. + // For overwritten objects without header we might have a + // conflict with data written later. Check the data path + // if there is a part with data. partPath := fmt.Sprintf("part.%d", fi.Parts[0].Number) dataPath := pathJoin(path, fi.DataDir, partPath) _, lerr := Lstat(pathJoin(volumeDir, dataPath)) @@ -1504,6 +1512,22 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str } } + if !skipAccessChecks(volume) && !opts.Healing && fi.TransitionStatus == "" && !fi.InlineData() && len(fi.Data) == 0 && fi.DataDir != "" && fi.DataDir != emptyUUID { + // Verify if the dataDir is present or not when the data + // is not inlined to make sure we return correct errors + // during HeadObject(). + + // Healing must not come here and return error, since healing + // deals with dataDirs directly, let healing fix things automatically. + if lerr := Access(pathJoin(volumeDir, path, fi.DataDir)); lerr != nil { + if os.IsNotExist(lerr) { + // Data dir is missing we must return errFileCorrupted + return FileInfo{}, errFileCorrupt + } + return FileInfo{}, osErrToFileErr(lerr) + } + } + return fi, nil } diff --git a/cmd/xl-storage_test.go b/cmd/xl-storage_test.go index c56cbbcad..88b0b9c7d 100644 --- a/cmd/xl-storage_test.go +++ b/cmd/xl-storage_test.go @@ -235,7 +235,7 @@ func TestXLStorageReadVersionLegacy(t *testing.T) { t.Fatalf("Unable to create a file \"as-file\", %s", err) } - fi, err := xlStorage.ReadVersion(context.Background(), "exists-legacy", "as-file", "", false) + fi, err := xlStorage.ReadVersion(context.Background(), "exists-legacy", "as-file", "", ReadOptions{}) if err != nil { t.Fatalf("Unable to read older 'xl.json' content: %s", err) } @@ -254,6 +254,7 @@ func TestXLStorageReadVersion(t *testing.T) { } xlMeta, _ := os.ReadFile("testdata/xl.meta") + fi, _ := getFileInfo(xlMeta, "exists", "as-file", "", false, true) // Create files for the test cases. if err = xlStorage.MakeVol(context.Background(), "exists"); err != nil { @@ -268,6 +269,9 @@ func TestXLStorageReadVersion(t *testing.T) { if err = xlStorage.AppendFile(context.Background(), "exists", "as-file-parent/xl.meta", xlMeta); err != nil { t.Fatalf("Unable to create a file \"as-file-parent\", %s", err) } + if err = xlStorage.MakeVol(context.Background(), "exists/as-file/"+fi.DataDir); err != nil { + t.Fatalf("Unable to create a dataDir %s, %s", fi.DataDir, err) + } // TestXLStoragecases to validate different conditions for ReadVersion API. testCases := []struct { @@ -321,7 +325,7 @@ func TestXLStorageReadVersion(t *testing.T) { // Run through all the test cases and validate for ReadVersion. for i, testCase := range testCases { - _, err = xlStorage.ReadVersion(context.Background(), testCase.volume, testCase.path, "", false) + _, err = xlStorage.ReadVersion(context.Background(), testCase.volume, testCase.path, "", ReadOptions{}) if err != testCase.err { t.Fatalf("TestXLStorage %d: Expected err \"%s\", got err \"%s\"", i+1, testCase.err, err) } @@ -1650,7 +1654,7 @@ func TestXLStorageDeleteVersion(t *testing.T) { for i := range versions { versions[i] = uuid.New().String() fi := FileInfo{ - Name: object, Volume: volume, VersionID: versions[i], ModTime: UTCNow(), DataDir: uuid.NewString(), Size: 10000, + Name: object, Volume: volume, VersionID: versions[i], ModTime: UTCNow(), DataDir: "", Size: 10000, Erasure: ErasureInfo{ Algorithm: erasureAlgorithm, DataBlocks: 4, @@ -1670,7 +1674,7 @@ func TestXLStorageDeleteVersion(t *testing.T) { t.Helper() for i := range versions { shouldExist := !deleted[i] - fi, err := xl.ReadVersion(ctx, volume, object, versions[i], false) + fi, err := xl.ReadVersion(ctx, volume, object, versions[i], ReadOptions{}) if shouldExist { if err != nil { t.Fatalf("Version %s should exist, but got err %v", versions[i], err) @@ -1717,7 +1721,7 @@ func TestXLStorageDeleteVersion(t *testing.T) { checkVerExist(t) // Meta should be deleted now... - fi, err := xl.ReadVersion(ctx, volume, object, "", false) + fi, err := xl.ReadVersion(ctx, volume, object, "", ReadOptions{}) if err != errFileNotFound { t.Fatalf("Object %s should not exist, but returned: %#v", object, fi) } diff --git a/docs/distributed/decom.sh b/docs/distributed/decom.sh index 5dba69195..659ef3f38 100755 --- a/docs/distributed/decom.sh +++ b/docs/distributed/decom.sh @@ -14,6 +14,7 @@ if [ ! -f ./mc ]; then fi export CI=true +export MINIO_SCANNER_SPEED=fastest (minio server /tmp/xl/{1...10}/disk{0...1} 2>&1 >/tmp/decom.log) & pid=$! @@ -64,7 +65,7 @@ export MC_HOST_mytier="http://minioadmin:minioadmin@localhost:9001/" ./mc ls -r myminio/bucket2/ >bucket2_ns.txt ./mc ls -r --versions myminio/bucket2/ >bucket2_ns_versions.txt -sleep 10 +sleep 30 ./mc ls -r --versions mytier/tiered/ >tiered_ns_versions.txt