From cc1d8f005737269c3f3b9e7808334f5c8e974193 Mon Sep 17 00:00:00 2001
From: Klaus Post
Date: Mon, 28 Nov 2022 19:20:55 +0100
Subject: [PATCH] Check for abandoned data when healing (#16122)

---
 cmd/data-scanner-metric.go      |   2 +
 cmd/data-scanner.go             |  13 ++
 cmd/erasure-healing.go          |  53 ++++++--
 cmd/erasure-healing_test.go     | 215 ++++++++++++++++++++++++++++++++
 cmd/erasure-server-pool.go      |  34 ++++-
 cmd/erasure-sets.go             |   5 +
 cmd/global-heal.go              |   7 +-
 cmd/healingmetric_string.go     |   5 +-
 cmd/naughty-disk_test.go        |   7 ++
 cmd/object-api-interface.go     |   1 +
 cmd/scannermetric_string.go     |  20 +--
 cmd/storage-interface.go        |   5 +
 cmd/storage-rest-client.go      |  18 +++
 cmd/storage-rest-common.go      |   1 +
 cmd/storage-rest-server.go      |  14 +++
 cmd/storagemetric_string.go     |   7 +-
 cmd/xl-storage-disk-id-check.go |  13 ++
 cmd/xl-storage-format-v2.go     |  34 +++++
 cmd/xl-storage.go               |  99 +++++++++++++++
 19 files changed, 530 insertions(+), 23 deletions(-)

diff --git a/cmd/data-scanner-metric.go b/cmd/data-scanner-metric.go
index 0c43f5acc..3fac10c26 100644
--- a/cmd/data-scanner-metric.go
+++ b/cmd/data-scanner-metric.go
@@ -47,6 +47,8 @@ const (
 	scannerMetricILM
 	scannerMetricCheckReplication
 	scannerMetricYield
+	scannerMetricCleanAbandoned
+	scannerMetricApplyNonCurrent
 
 	// START Trace metrics:
 	scannerMetricStartTrace
diff --git a/cmd/data-scanner.go b/cmd/data-scanner.go
index a04aa6134..7158c36cf 100644
--- a/cmd/data-scanner.go
+++ b/cmd/data-scanner.go
@@ -1015,6 +1015,8 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob
 	if i.lifeCycle == nil {
 		return fivs, nil
 	}
+	done := globalScannerMetrics.time(scannerMetricApplyNonCurrent)
+	defer done()
 
 	_, days, lim := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()})
 	if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions
@@ -1071,6 +1073,17 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob
 // applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain
 // after applying lifecycle checks configured.
 func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]FileInfo, error) {
+	if i.heal.enabled {
+		if healDeleteDangling {
+			done := globalScannerMetrics.time(scannerMetricCleanAbandoned)
+			err := o.CheckAbandonedParts(ctx, i.bucket, i.objectPath(), madmin.HealOpts{Remove: healDeleteDangling})
+			done()
+			if err != nil {
+				logger.LogIf(ctx, fmt.Errorf("unable to check object %s/%s for abandoned data: %w", i.bucket, i.objectPath(), err))
+			}
+		}
+	}
+
 	return i.applyNewerNoncurrentVersionLimit(ctx, o, fivs)
 }
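Both scanner hooks above lean on the same closure-based timing idiom: `globalScannerMetrics.time(metric)` starts the clock and hands back a `done` function that records the elapsed time when called (or deferred). Below is a minimal, self-contained sketch of that idiom; the `metricTimer` type and its fields are illustrative stand-ins, not MinIO's actual implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// metricTimer mimics the pattern behind globalScannerMetrics.time():
// calling time() starts the clock and returns a closure that, when
// invoked, records the elapsed duration. Call sites stay one line,
// plus a deferred (or explicit) done().
type metricTimer struct {
	totalNS atomic.Int64 // accumulated time, in nanoseconds
	count   atomic.Int64 // number of completed operations
}

func (m *metricTimer) time() func() {
	start := time.Now()
	return func() {
		m.totalNS.Add(int64(time.Since(start)))
		m.count.Add(1)
	}
}

func main() {
	var cleanAbandoned metricTimer

	// Explicit done() around a single call, as in applyVersionActions.
	done := cleanAbandoned.time()
	time.Sleep(10 * time.Millisecond) // stand-in for CheckAbandonedParts
	done()

	fmt.Println("ops:", cleanAbandoned.count.Load(),
		"total:", time.Duration(cleanAbandoned.totalNS.Load()))
}
```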
diff --git a/cmd/erasure-healing.go b/cmd/erasure-healing.go
index 5f0d68953..960ed0b7a 100644
--- a/cmd/erasure-healing.go
+++ b/cmd/erasure-healing.go
@@ -40,6 +40,7 @@ type healingMetric uint8
 const (
 	healingMetricBucket healingMetric = iota
 	healingMetricObject
+	healingMetricCheckAbandonedParts
 )
 
 // AcceptableDelta returns 'true' if the fi.DiskMTime is under
@@ -93,7 +94,7 @@ func (er erasureObjects) healBucket(ctx context.Context, storageDisks []StorageA
 	if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
 		startTime := time.Now()
 		defer func() {
-			healTrace(healingMetricBucket, startTime, bucket, "", "", opts, err, &res)
+			healTrace(healingMetricBucket, startTime, bucket, "", "", &opts, err, &res)
 		}()
 	}
 
@@ -294,7 +295,7 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, latestMeta File
 }
 
 // Heals an object by re-writing corrupt/missing erasure blocks.
-func (er erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
+func (er *erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
 	dryRun := opts.DryRun
 	scanMode := opts.ScanMode
 
@@ -304,7 +305,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 	if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
 		startTime := time.Now()
 		defer func() {
-			healTrace(healingMetricObject, startTime, bucket, object, versionID, opts, err, &result)
+			healTrace(healingMetricObject, startTime, bucket, object, versionID, &opts, err, &result)
 		}()
 	}
 	// Initialize heal result object
@@ -673,9 +674,45 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
 	return result, nil
 }
 
+// checkAbandonedParts will check if an object has abandoned parts,
+// meaning data-dirs or inlined data that are no longer referenced by the xl.meta
+// Errors are generally ignored by this function.
+func (er *erasureObjects) checkAbandonedParts(ctx context.Context, bucket string, object string, opts madmin.HealOpts) (err error) {
+	if !opts.Remove || opts.DryRun {
+		return nil
+	}
+	if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
+		startTime := time.Now()
+		defer func() {
+			healTrace(healingMetricCheckAbandonedParts, startTime, bucket, object, "", nil, err, nil)
+		}()
+	}
+	if !opts.NoLock {
+		lk := er.NewNSLock(bucket, object)
+		lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
+		if err != nil {
+			return err
+		}
+		ctx = lkctx.Context()
+		defer lk.Unlock(lkctx.Cancel)
+	}
+	var wg sync.WaitGroup
+	for _, disk := range er.getDisks() {
+		if disk != nil {
+			wg.Add(1)
+			go func(disk StorageAPI) {
+				defer wg.Done()
+				_ = disk.CleanAbandonedData(ctx, bucket, object)
+			}(disk)
+		}
+	}
+	wg.Wait()
+	return nil
+}
+
 // healObjectDir - heals object directory specifically, this special call
 // is needed since we do not have a special backend format for directories.
-func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool, remove bool) (hr madmin.HealResultItem, err error) {
+func (er *erasureObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool, remove bool) (hr madmin.HealResultItem, err error) {
 	storageDisks := er.getDisks()
 	storageEndpoints := er.getEndpoints()
 
@@ -766,7 +803,7 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin
 
 // Populates default heal result item entries with possible values when we are returning prematurely.
 // This is to ensure that in any circumstance we are not returning empty arrays with wrong values.
-func (er erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints []Endpoint, errs []error, bucket, object, versionID string) madmin.HealResultItem {
+func (er *erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints []Endpoint, errs []error, bucket, object, versionID string) madmin.HealResultItem {
 	// Initialize heal result object
 	result := madmin.HealResultItem{
 		Type: madmin.HealItemObject,
@@ -1005,16 +1042,18 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
 }
 
 // healTrace sends healing results to trace output.
-func healTrace(funcName healingMetric, startTime time.Time, bucket, object, versionID string, opts madmin.HealOpts, err error, result *madmin.HealResultItem) {
+func healTrace(funcName healingMetric, startTime time.Time, bucket, object, versionID string, opts *madmin.HealOpts, err error, result *madmin.HealResultItem) {
 	tr := madmin.TraceInfo{
 		TraceType: madmin.TraceHealing,
 		Time:      startTime,
 		NodeName:  globalLocalNodeName,
 		FuncName:  "heal." + funcName.String(),
 		Duration:  time.Since(startTime),
-		Message:   fmt.Sprintf("dry:%v, rm:%v, recreate:%v mode:%v", opts.DryRun, opts.Remove, opts.Recreate, opts.ScanMode),
 		Path:      pathJoin(bucket, decodeDirObject(object)),
 	}
+	if opts != nil {
+		tr.Message = fmt.Sprintf("dry:%v, rm:%v, recreate:%v mode:%v", opts.DryRun, opts.Remove, opts.Recreate, opts.ScanMode)
+	}
 	if versionID != "" && versionID != "null" {
 		tr.Path += " v=" + versionID
 	}
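The new `checkAbandonedParts` fans out one goroutine per online disk and deliberately discards per-disk errors: the cleanup is best-effort, and a later scanner cycle will retry. Below is a runnable sketch of that fan-out shape; the `cleaner` interface and `fakeDisk` are hypothetical stand-ins for the much larger `StorageAPI`.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// cleaner is a stand-in for the one StorageAPI method used here.
type cleaner interface {
	CleanAbandonedData(ctx context.Context, volume, path string) error
}

type fakeDisk struct{ id int }

func (d *fakeDisk) CleanAbandonedData(ctx context.Context, volume, path string) error {
	fmt.Printf("disk %d: cleaning %s/%s\n", d.id, volume, path)
	return nil
}

// cleanAll mirrors the fan-out in checkAbandonedParts: one goroutine per
// online disk, errors deliberately dropped because the operation is
// best-effort and will be retried by the next scanner cycle.
func cleanAll(ctx context.Context, disks []cleaner, volume, path string) {
	var wg sync.WaitGroup
	for _, disk := range disks {
		if disk == nil { // offline disk, skip
			continue
		}
		wg.Add(1)
		go func(d cleaner) {
			defer wg.Done()
			_ = d.CleanAbandonedData(ctx, volume, path)
		}(disk)
	}
	wg.Wait()
}

func main() {
	disks := []cleaner{&fakeDisk{1}, nil, &fakeDisk{3}}
	cleanAll(context.Background(), disks, "bucket", "object")
}
```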
diff --git a/cmd/erasure-healing_test.go b/cmd/erasure-healing_test.go
index 82387cbaf..115f3ce58 100644
--- a/cmd/erasure-healing_test.go
+++ b/cmd/erasure-healing_test.go
@@ -31,6 +31,7 @@ import (
 	"time"
 
 	"github.com/dustin/go-humanize"
+	uuid2 "github.com/google/uuid"
 	"github.com/minio/madmin-go"
 )
 
@@ -276,6 +277,12 @@ func TestHealing(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	// Checking abandoned parts should do nothing
+	err = er.checkAbandonedParts(ctx, bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: true})
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	_, err = er.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
 	if err != nil {
 		t.Fatal(err)
@@ -320,6 +327,214 @@ func TestHealing(t *testing.T) {
 		t.Fatal("HealObject failed")
 	}
 
+	uuid, _ := uuid2.NewRandom()
+	for _, drive := range fsDirs {
+		dir := path.Join(drive, bucket, object, uuid.String())
+		err = os.MkdirAll(dir, os.ModePerm)
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = os.WriteFile(pathJoin(dir, "part.1"), []byte("some data"), os.ModePerm)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// This should remove all the unreferenced parts.
+	err = er.checkAbandonedParts(ctx, bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: true})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, drive := range fsDirs {
+		dir := path.Join(drive, bucket, object, uuid.String())
+		_, err := os.ReadFile(pathJoin(dir, "part.1"))
+		if err == nil {
+			t.Fatal("expected data dir to be cleaned up")
+		}
+	}
+
+	// Remove the bucket - to simulate the case where bucket was
+	// created when the disk was down.
+	err = os.RemoveAll(path.Join(fsDirs[0], bucket))
+	if err != nil {
+		t.Fatal(err)
+	}
+	// This would create the bucket.
+	_, err = er.HealBucket(ctx, bucket, madmin.HealOpts{
+		DryRun: false,
+		Remove: false,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Stat the bucket to make sure that it was created.
+	_, err = er.getDisks()[0].StatVol(context.Background(), bucket)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Tests both object and bucket healing.
+func TestHealingVersioned(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	obj, fsDirs, err := prepareErasure16(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer obj.Shutdown(context.Background())
+
+	// initialize the server and obtain the credentials and root.
+	// credentials are necessary to sign the HTTP request.
+	if err = newTestConfig(globalMinioDefaultRegion, obj); err != nil {
+		t.Fatalf("Unable to initialize server config. %s", err)
+	}
+
+	defer removeRoots(fsDirs)
+
+	z := obj.(*erasureServerPools)
+	er := z.serverPools[0].sets[0]
+
+	// Create "bucket"
+	err = obj.MakeBucketWithLocation(ctx, "bucket", MakeBucketOptions{VersioningEnabled: true})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	bucket := "bucket"
+	object := "object"
+
+	data := make([]byte, 1*humanize.MiByte)
+	length := int64(len(data))
+	_, err = rand.Read(data)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	oi1, err := obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), length, "", ""), ObjectOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// 2nd version.
+	_, _ = rand.Read(data)
+	oi2, err := obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), length, "", ""), ObjectOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	disk := er.getDisks()[0]
+	fileInfoPreHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+	fileInfoPreHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Remove the object - to simulate the case where the disk was down when the object
+	// was created.
+	err = removeAll(pathJoin(disk.String(), bucket, object))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Checking abandoned parts should do nothing
+	err = er.checkAbandonedParts(ctx, bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: true})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = er.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	fileInfoPostHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+	fileInfoPostHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// After heal the meta file should be as expected.
+	if !fileInfoPreHeal1.Equals(fileInfoPostHeal1) {
+		t.Fatal("HealObject failed")
+	}
+	if !fileInfoPreHeal2.Equals(fileInfoPostHeal2) {
+		t.Fatal("HealObject failed")
+	}
+
+	err = os.RemoveAll(path.Join(fsDirs[0], bucket, object, "xl.meta"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Write xl.meta with different modtime to simulate the case where a disk had
+	// gone down when an object was replaced by a new object.
+	fileInfoOutDated := fileInfoPreHeal1
+	fileInfoOutDated.ModTime = time.Now()
+	err = disk.WriteMetadata(context.Background(), bucket, object, fileInfoOutDated)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, err = er.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealDeepScan})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	fileInfoPostHeal1, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// After heal the meta file should be as expected.
+	if !fileInfoPreHeal1.Equals(fileInfoPostHeal1) {
+		t.Fatal("HealObject failed")
+	}
+
+	fileInfoPostHeal2, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// After heal the meta file should be as expected.
+	if !fileInfoPreHeal2.Equals(fileInfoPostHeal2) {
+		t.Fatal("HealObject failed")
+	}
+
+	uuid, _ := uuid2.NewRandom()
+	for _, drive := range fsDirs {
+		dir := path.Join(drive, bucket, object, uuid.String())
+		err = os.MkdirAll(dir, os.ModePerm)
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = os.WriteFile(pathJoin(dir, "part.1"), []byte("some data"), os.ModePerm)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// This should remove all the unreferenced parts.
+	err = er.checkAbandonedParts(ctx, bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: true})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, drive := range fsDirs {
+		dir := path.Join(drive, bucket, object, uuid.String())
+		_, err := os.ReadFile(pathJoin(dir, "part.1"))
+		if err == nil {
+			t.Fatal("expected data dir to be cleaned up")
+		}
+	}
+
 	// Remove the bucket - to simulate the case where bucket was
 	// created when the disk was down.
 	err = os.RemoveAll(path.Join(fsDirs[0], bucket))
diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 41cb25827..6c659572b 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -2072,7 +2072,12 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
 			if err != nil {
 				return healObjectFn(bucket, entry.name, "")
 			}
-
+			if opts.Remove && !opts.DryRun {
+				err := z.CheckAbandonedParts(ctx, bucket, entry.name, opts)
+				if err != nil {
+					logger.LogIf(ctx, fmt.Errorf("unable to check object %s/%s for abandoned data: %w", bucket, entry.name, err))
+				}
+			}
 			for _, version := range fivs.Versions {
 				err := healObjectFn(bucket, version.Name, version.VersionID)
 				if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
@@ -2424,3 +2429,32 @@ func (z *erasureServerPools) RestoreTransitionedObject(ctx context.Context, buck
 
 	return z.serverPools[idx].RestoreTransitionedObject(ctx, bucket, object, opts)
 }
+
+func (z *erasureServerPools) CheckAbandonedParts(ctx context.Context, bucket, object string, opts madmin.HealOpts) error {
+	object = encodeDirObject(object)
+	if z.SinglePool() {
+		return z.serverPools[0].CheckAbandonedParts(ctx, bucket, object, opts)
+	}
+	errs := make([]error, len(z.serverPools))
+	var wg sync.WaitGroup
+	for idx, pool := range z.serverPools {
+		if z.IsSuspended(idx) {
+			continue
+		}
+		wg.Add(1)
+		go func(idx int, pool *erasureSets) {
+			defer wg.Done()
+			err := pool.CheckAbandonedParts(ctx, bucket, object, opts)
+			if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
+				errs[idx] = err
+			}
+		}(idx, pool)
+	}
+	wg.Wait()
+	for _, err := range errs {
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/cmd/erasure-sets.go b/cmd/erasure-sets.go
index 17f53a68b..8c9d1c392 100644
--- a/cmd/erasure-sets.go
+++ b/cmd/erasure-sets.go
@@ -1412,3 +1412,8 @@ func (s *erasureSets) TransitionObject(ctx context.Context, bucket, object strin
 func (s *erasureSets) RestoreTransitionedObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
 	return s.getHashedSet(object).RestoreTransitionedObject(ctx, bucket, object, opts)
 }
+
+// CheckAbandonedParts - check object for abandoned parts.
+func (s *erasureSets) CheckAbandonedParts(ctx context.Context, bucket, object string, opts madmin.HealOpts) error {
+	return s.getHashedSet(object).checkAbandonedParts(ctx, bucket, object, opts)
+}
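At the pool layer the same fan-out returns an error instead of swallowing it: each goroutine writes into its own slot of a pre-sized `errs` slice (no mutex needed, since every goroutine owns exactly one index), and the first non-nil entry is returned after `wg.Wait()`. A generic sketch of that collect-then-report shape, with the hypothetical helper `firstError`:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// firstError runs fn over indexes 0..n-1 concurrently, collecting
// results into a per-index slice, then returns the first non-nil
// error. This mirrors the per-pool fan-out in CheckAbandonedParts.
func firstError(n int, fn func(i int) error) error {
	errs := make([]error, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			errs[i] = fn(i) // each goroutine owns slot i
		}(i)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	err := firstError(3, func(i int) error {
		if i == 1 {
			return errors.New("pool 1 failed")
		}
		return nil
	})
	fmt.Println(err) // pool 1 failed
}
```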
diff --git a/cmd/global-heal.go b/cmd/global-heal.go
index 43b38ba73..f483a1f78 100644
--- a/cmd/global-heal.go
+++ b/cmd/global-heal.go
@@ -342,7 +342,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
 
 			// erasureObjects layer needs object names to be encoded
 			encodedEntryName := encodeDirObject(entry.name)
-
+			if healDeleteDangling {
+				err := er.checkAbandonedParts(ctx, bucket, encodedEntryName, madmin.HealOpts{Remove: healDeleteDangling})
+				if err != nil {
+					logger.LogIf(ctx, fmt.Errorf("unable to check object %s/%s for abandoned data: %w", bucket, entry.name, err))
+				}
+			}
 			for _, version := range fivs.Versions {
 				if _, err := er.HealObject(ctx, bucket, encodedEntryName,
 					version.VersionID, madmin.HealOpts{
diff --git a/cmd/healingmetric_string.go b/cmd/healingmetric_string.go
index d940d67ef..012fc7aa9 100644
--- a/cmd/healingmetric_string.go
+++ b/cmd/healingmetric_string.go
@@ -10,11 +10,12 @@ func _() {
 	var x [1]struct{}
 	_ = x[healingMetricBucket-0]
 	_ = x[healingMetricObject-1]
+	_ = x[healingMetricCheckAbandonedParts-2]
 }
 
-const _healingMetric_name = "BucketObject"
+const _healingMetric_name = "BucketObjectCheckAbandonedParts"
 
-var _healingMetric_index = [...]uint8{0, 6, 12}
+var _healingMetric_index = [...]uint8{0, 6, 12, 31}
 
 func (i healingMetric) String() string {
 	if i >= healingMetric(len(_healingMetric_index)-1) {
diff --git a/cmd/naughty-disk_test.go b/cmd/naughty-disk_test.go
index 017c6ce36..84a2b31eb 100644
--- a/cmd/naughty-disk_test.go
+++ b/cmd/naughty-disk_test.go
@@ -309,3 +309,10 @@ func (d *naughtyDisk) ReadMultiple(ctx context.Context, req ReadMultipleReq, res
 	}
 	return d.disk.ReadMultiple(ctx, req, resp)
 }
+
+func (d *naughtyDisk) CleanAbandonedData(ctx context.Context, volume string, path string) error {
+	if err := d.calcError(); err != nil {
+		return err
+	}
+	return d.disk.CleanAbandonedData(ctx, volume, path)
+}
diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go
index 0236e63fd..2dc275809 100644
--- a/cmd/object-api-interface.go
+++ b/cmd/object-api-interface.go
@@ -249,6 +249,7 @@ type ObjectLayer interface {
 	HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error)
 	HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error)
 	HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn HealObjectFn) error
+	CheckAbandonedParts(ctx context.Context, bucket, object string, opts madmin.HealOpts) error
 
 	// Returns health of the backend
 	Health(ctx context.Context, opts HealthOptions) HealthResult
diff --git a/cmd/scannermetric_string.go b/cmd/scannermetric_string.go
index 7d9743803..4f69dea55 100644
--- a/cmd/scannermetric_string.go
+++ b/cmd/scannermetric_string.go
@@ -18,18 +18,20 @@ func _() {
 	_ = x[scannerMetricILM-7]
 	_ = x[scannerMetricCheckReplication-8]
 	_ = x[scannerMetricYield-9]
-	_ = x[scannerMetricStartTrace-10]
-	_ = x[scannerMetricScanObject-11]
-	_ = x[scannerMetricLastRealtime-12]
-	_ = x[scannerMetricScanFolder-13]
-	_ = x[scannerMetricScanCycle-14]
-	_ = x[scannerMetricScanBucketDisk-15]
-	_ = x[scannerMetricLast-16]
+	_ = x[scannerMetricCleanAbandoned-10]
+	_ = x[scannerMetricApplyNonCurrent-11]
+	_ = x[scannerMetricStartTrace-12]
+	_ = x[scannerMetricScanObject-13]
+	_ = x[scannerMetricLastRealtime-14]
+	_ = x[scannerMetricScanFolder-15]
+	_ = x[scannerMetricScanCycle-16]
+	_ = x[scannerMetricScanBucketDisk-17]
+	_ = x[scannerMetricLast-18]
 }
 
-const _scannerMetric_name = "ReadMetadataCheckMissingSaveUsageApplyAllApplyVersionTierObjSweepHealCheckILMCheckReplicationYieldStartTraceScanObjectLastRealtimeScanFolderScanCycleScanBucketDiskLast"
+const _scannerMetric_name = "ReadMetadataCheckMissingSaveUsageApplyAllApplyVersionTierObjSweepHealCheckILMCheckReplicationYieldCleanAbandonedApplyNonCurrentStartTraceScanObjectLastRealtimeScanFolderScanCycleScanBucketDiskLast"
 
-var _scannerMetric_index = [...]uint8{0, 12, 24, 33, 41, 53, 65, 74, 77, 93, 98, 108, 118, 130, 140, 149, 163, 167}
+var _scannerMetric_index = [...]uint8{0, 12, 24, 33, 41, 53, 65, 74, 77, 93, 98, 112, 127, 137, 147, 159, 169, 178, 192, 196}
 
 func (i scannerMetric) String() string {
 	if i >= scannerMetric(len(_scannerMetric_index)-1) {
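The `*_string.go` files touched by this patch are output of the `stringer` tool, which packs all enum names into one string plus an index table; inserting a constant mid-enum shifts every later offset, which is why these files are regenerated rather than edited by hand. Here is a hand-rolled, runnable equivalent of what gets generated for `healingMetric`, using the exact name string and index table from the diff above:

```go
package main

import "fmt"

// healingMetric and the two lookup tables below replicate the
// stringer-generated code from cmd/healingmetric_string.go.
type healingMetric uint8

const (
	healingMetricBucket healingMetric = iota
	healingMetricObject
	healingMetricCheckAbandonedParts
)

const _healingMetric_name = "BucketObjectCheckAbandonedParts"

var _healingMetric_index = [...]uint8{0, 6, 12, 31}

func (i healingMetric) String() string {
	if i >= healingMetric(len(_healingMetric_index)-1) {
		return fmt.Sprintf("healingMetric(%d)", i)
	}
	// Slice the packed name string between two adjacent offsets,
	// e.g. [6:12] == "Object".
	return _healingMetric_name[_healingMetric_index[i]:_healingMetric_index[i+1]]
}

func main() {
	fmt.Println(healingMetricCheckAbandonedParts) // CheckAbandonedParts
}
```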
diff --git a/cmd/storage-interface.go b/cmd/storage-interface.go
index 85573148e..950884a33 100644
--- a/cmd/storage-interface.go
+++ b/cmd/storage-interface.go
@@ -99,6 +99,7 @@ type StorageAPI interface {
 	VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error
 	StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error)
 	ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error
+	CleanAbandonedData(ctx context.Context, volume string, path string) error
 
 	// Write all data, syncs the data to disk.
 	// Should be used for smaller payloads.
@@ -279,3 +280,7 @@ func (p *unrecognizedDisk) ReadMultiple(ctx context.Context, req ReadMultipleReq
 	close(resp)
 	return errDiskNotFound
 }
+
+func (p *unrecognizedDisk) CleanAbandonedData(ctx context.Context, volume string, path string) error {
+	return errDiskNotFound
+}
diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go
index 0792c1c11..646a1bd30 100644
--- a/cmd/storage-rest-client.go
+++ b/cmd/storage-rest-client.go
@@ -777,6 +777,24 @@ func (client *storageRESTClient) ReadMultiple(ctx context.Context, req ReadMulti
 	}
 }
 
+// CleanAbandonedData will read metadata of the object on disk
+// and delete any data directories and inline data that isn't referenced in metadata.
+func (client *storageRESTClient) CleanAbandonedData(ctx context.Context, volume string, path string) error {
+	values := make(url.Values)
+	values.Set(storageRESTVolume, volume)
+	values.Set(storageRESTFilePath, path)
+	respBody, err := client.call(ctx, storageRESTMethodCleanAbandoned, values, nil, -1)
+	if err != nil {
+		return err
+	}
+	defer xhttp.DrainBody(respBody)
+	respReader, err := waitForHTTPResponse(respBody)
+	if err == nil {
+		io.Copy(io.Discard, respReader)
+	}
+	return err
+}
+
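The client above follows the storage REST transport's usual shape: parameters travel as query values, the response body is drained so the connection can be reused, and the error is recovered from the framed response. Below is a loose, illustrative sketch of that request/handler shape in plain `net/http`; the route and parameter names here are made up, and MinIO's real transport additionally uses versioned paths, authentication, and keep-alive framing for slow calls.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/cleanabandoned", func(w http.ResponseWriter, r *http.Request) {
		volume := r.FormValue("volume")
		filePath := r.FormValue("file-path")
		if volume == "" || filePath == "" {
			return // ignore incomplete requests, as the real handler does
		}
		fmt.Fprintf(w, "cleaned %s/%s", volume, filePath)
	})
	srv := httptest.NewServer(mux)
	defer srv.Close()

	// Client side: parameters as query values, body drained after use.
	values := url.Values{}
	values.Set("volume", "bucket")
	values.Set("file-path", "object")
	resp, err := http.Get(srv.URL + "/cleanabandoned?" + values.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body) // drain so the connection is reused
	fmt.Println(string(body))
}
```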
 // Close - marks the client as closed.
 func (client *storageRESTClient) Close() error {
 	client.restClient.Close()
diff --git a/cmd/storage-rest-common.go b/cmd/storage-rest-common.go
index b6326e317..c6c149454 100644
--- a/cmd/storage-rest-common.go
+++ b/cmd/storage-rest-common.go
@@ -54,6 +54,7 @@ const (
 	storageRESTMethodWalkDir      = "/walkdir"
 	storageRESTMethodStatInfoFile = "/statfile"
 	storageRESTMethodReadMultiple = "/readmultiple"
+	storageRESTMethodCleanAbandoned = "/cleanabandoned"
 )
 
 const (
diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go
index 7eccbe318..9a555606d 100644
--- a/cmd/storage-rest-server.go
+++ b/cmd/storage-rest-server.go
@@ -768,6 +768,19 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req
 	}
 }
 
+// CleanAbandonedDataHandler - Clean unused data directories.
+func (s *storageRESTServer) CleanAbandonedDataHandler(w http.ResponseWriter, r *http.Request) {
+	if !s.IsValid(w, r) {
+		return
+	}
+	volume := r.Form.Get(storageRESTVolume)
+	filePath := r.Form.Get(storageRESTFilePath)
+	if volume == "" || filePath == "" {
+		return // Ignore
+	}
+	keepHTTPResponseAlive(w)(s.storage.CleanAbandonedData(r.Context(), volume, filePath))
+}
+
 // closeNotifier is itself a ReadCloser that will notify when either an error occurs or
 // the Close() function is called.
 type closeNotifier struct {
@@ -1379,6 +1392,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
 			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkDir).HandlerFunc(httpTraceHdrs(server.WalkDirHandler))
 			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(httpTraceHdrs(server.StatInfoFile))
 			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(httpTraceHdrs(server.ReadMultiple))
+			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(httpTraceHdrs(server.CleanAbandonedDataHandler))
 		}
 	}
 }
diff --git a/cmd/storagemetric_string.go b/cmd/storagemetric_string.go
index a01cf54d4..eb1bb34ce 100644
--- a/cmd/storagemetric_string.go
+++ b/cmd/storagemetric_string.go
@@ -34,12 +34,13 @@ func _() {
 	_ = x[storageMetricReadAll-23]
 	_ = x[storageMetricStatInfoFile-24]
 	_ = x[storageMetricReadMultiple-25]
-	_ = x[storageMetricLast-26]
+	_ = x[storageMetricDeleteAbandonedParts-26]
+	_ = x[storageMetricLast-27]
 }
 
-const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleLast"
+const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleDeleteAbandonedPartsLast"
 
-var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 258}
+var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 274, 278}
 
 func (i storageMetric) String() string {
 	if i >= storageMetric(len(_storageMetric_index)-1) {
diff --git a/cmd/xl-storage-disk-id-check.go b/cmd/xl-storage-disk-id-check.go
index ac8b12879..520e6ff90 100644
--- a/cmd/xl-storage-disk-id-check.go
+++ b/cmd/xl-storage-disk-id-check.go
@@ -65,6 +65,7 @@ const (
 	storageMetricReadAll
 	storageMetricStatInfoFile
 	storageMetricReadMultiple
+	storageMetricDeleteAbandonedParts
 
 	// .... add more
 
@@ -526,6 +527,18 @@ func (p *xlStorageDiskIDCheck) ReadMultiple(ctx context.Context, req ReadMultipl
 	return p.storage.ReadMultiple(ctx, req, resp)
 }
 
+// CleanAbandonedData will read metadata of the object on disk
+// and delete any data directories and inline data that isn't referenced in metadata.
+func (p *xlStorageDiskIDCheck) CleanAbandonedData(ctx context.Context, volume string, path string) error {
+	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteAbandonedParts, volume, path)
+	if err != nil {
+		return err
+	}
+	defer done(&err)
+
+	return p.storage.CleanAbandonedData(ctx, volume, path)
+}
+
 func storageTrace(s storageMetric, startTime time.Time, duration time.Duration, path string) madmin.TraceInfo {
 	return madmin.TraceInfo{
 		TraceType: madmin.TraceStorage,
diff --git a/cmd/xl-storage-format-v2.go b/cmd/xl-storage-format-v2.go
index 1cf14b59c..5f62a2f0f 100644
--- a/cmd/xl-storage-format-v2.go
+++ b/cmd/xl-storage-format-v2.go
@@ -1191,6 +1191,40 @@ func (x *xlMetaV2) setIdx(idx int, ver xlMetaV2Version) (err error) {
 	return nil
 }
 
+// getDataDirs will return all data directories in the metadata
+// as well as all version ids used for inline data.
+func (x *xlMetaV2) getDataDirs() ([]string, error) {
+	dds := make([]string, 0, len(x.versions)*2)
+	for i, ver := range x.versions {
+		if ver.header.Type == DeleteType {
+			continue
+		}
+
+		obj, err := x.getIdx(i)
+		if err != nil {
+			return nil, err
+		}
+		switch ver.header.Type {
+		case ObjectType:
+			if obj.ObjectV2 == nil {
+				return nil, errors.New("obj.ObjectV2 unexpectedly nil")
+			}
+			dds = append(dds, uuid.UUID(obj.ObjectV2.DataDir).String())
+			if obj.ObjectV2.VersionID == [16]byte{} {
+				dds = append(dds, nullVersionID)
+			} else {
+				dds = append(dds, uuid.UUID(obj.ObjectV2.VersionID).String())
+			}
+		case LegacyType:
+			if obj.ObjectV1 == nil {
+				return nil, errors.New("obj.ObjectV1 unexpectedly nil")
+			}
+			dds = append(dds, obj.ObjectV1.DataDir)
+		}
+	}
+	return dds, nil
+}
+
 // sortByModTime will sort versions by modtime in descending order,
 // meaning index 0 will be latest version.
 func (x *xlMetaV2) sortByModTime() {
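Note that `getDataDirs` is an append-only accumulator, so it must be allocated with zero length and non-zero capacity: `make([]string, n)` would pre-fill n empty strings that `append` then grows past. A two-minute demonstration of the difference:

```go
package main

import "fmt"

func main() {
	// make([]string, n) pre-fills the slice with n empty strings, so a
	// subsequent append adds AFTER those, leaving "" entries up front.
	wrong := make([]string, 2)
	wrong = append(wrong, "a", "b")
	fmt.Printf("%q\n", wrong) // ["" "" "a" "b"]

	// make([]string, 0, n) allocates capacity without length, which is
	// what an append-only accumulator like getDataDirs needs.
	right := make([]string, 0, 2)
	right = append(right, "a", "b")
	fmt.Printf("%q\n", right) // ["a" "b"]
}
```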
diff --git a/cmd/xl-storage.go b/cmd/xl-storage.go
index 648ecdb9d..164f4d4fc 100644
--- a/cmd/xl-storage.go
+++ b/cmd/xl-storage.go
@@ -36,6 +36,7 @@ import (
 	"time"
 
 	"github.com/dustin/go-humanize"
+	"github.com/google/uuid"
 	jsoniter "github.com/json-iterator/go"
 	"github.com/minio/madmin-go"
 	"github.com/minio/minio/internal/bucket/lifecycle"
@@ -2708,3 +2709,101 @@ func (s *xlStorage) StatInfoFile(ctx context.Context, volume, path string, glob
 	}
 	return stat, nil
 }
+
+// CleanAbandonedData will read metadata of the object on disk
+// and delete any data directories and inline data that isn't referenced in metadata.
+// Metadata itself is not modified, only inline data.
+func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path string) error {
+	if volume == "" || path == "" {
+		return nil // Ignore
+	}
+
+	volumeDir, err := s.getVolDir(volume)
+	if err != nil {
+		return err
+	}
+	baseDir := pathJoin(volumeDir, path+slashSeparator)
+	metaPath := pathutil.Join(baseDir, xlStorageFormatFile)
+	buf, _, err := s.readAllData(ctx, volumeDir, metaPath)
+	if err != nil {
+		return err
+	}
+	defer metaDataPoolPut(buf)
+
+	if !isXL2V1Format(buf) {
+		return nil
+	}
+	var xl xlMetaV2
+	err = xl.LoadOrConvert(buf)
+	if err != nil {
+		return err
+	}
+	foundDirs := make(map[string]struct{}, len(xl.versions))
+	err = readDirFn(baseDir, func(name string, typ os.FileMode) error {
+		if !typ.IsDir() {
+			return nil
+		}
+		// See if directory has a UUID name.
+		base := filepath.Base(name)
+		_, err := uuid.Parse(base)
+		if err == nil {
+			foundDirs[base] = struct{}{}
+		}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	wantDirs, err := xl.getDataDirs()
+	if err != nil {
+		return err
+	}
+
+	// Delete all directories we expect to be there.
+	for _, dir := range wantDirs {
+		delete(foundDirs, dir)
+	}
+
+	// Delete excessive directories.
+	// Do not abort on context errors.
+	for dir := range foundDirs {
+		toRemove := pathJoin(volumeDir, path, dir+SlashSeparator)
+		err := s.deleteFile(volumeDir, toRemove, true, true)
+		diskHealthCheckOK(ctx, err)
+	}
+
+	// Do the same for inline data
+	dirs, err := xl.data.list()
+	if err != nil {
+		return err
+	}
+	// Clear and repopulate
+	for k := range foundDirs {
+		delete(foundDirs, k)
+	}
+	// Populate into map
+	for _, k := range dirs {
+		foundDirs[k] = struct{}{}
+	}
+	// Delete all directories we expect to be there.
+	for _, dir := range wantDirs {
+		delete(foundDirs, dir)
+	}
+
+	// Delete excessive inline entries.
+	if len(foundDirs) > 0 {
+		// Convert to slice.
+		dirs = dirs[:0]
+		for dir := range foundDirs {
+			dirs = append(dirs, dir)
+		}
+		if xl.data.remove(dirs...) {
+			newBuf, err := xl.AppendTo(metaDataPoolGet())
+			if err == nil {
+				defer metaDataPoolPut(newBuf)
+				return s.writeAll(ctx, volume, pathJoin(path, xlStorageFormatFile), newBuf, false)
+			}
+		}
+	}
+	return nil
+}
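Putting the pieces together, the disk-level cleanup is a mark-and-sweep: collect the UUID-named directories actually present under the object, subtract the set referenced by `xl.meta`, and best-effort delete the remainder (then repeat the same subtraction for inline data entries). A self-contained sketch of that core loop follows; `sweepAbandoned` and its `referenced` parameter are hypothetical, standing in for `CleanAbandonedData` plus `getDataDirs`.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/google/uuid"
)

// sweepAbandoned deletes every UUID-named directory under an object's
// path that is not referenced by metadata. In the real code the
// `referenced` set comes from xl.meta via getDataDirs; here it is
// simply a parameter.
func sweepAbandoned(objDir string, referenced map[string]bool) error {
	entries, err := os.ReadDir(objDir)
	if err != nil {
		return err
	}
	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		if _, err := uuid.Parse(e.Name()); err != nil {
			continue // not a data dir
		}
		if referenced[e.Name()] {
			continue // still in use
		}
		// Best-effort delete, mirroring the patch's deleteFile calls.
		if err := os.RemoveAll(filepath.Join(objDir, e.Name())); err != nil {
			fmt.Println("sweep:", err)
		}
	}
	return nil
}

func main() {
	dir, _ := os.MkdirTemp("", "sweep")
	defer os.RemoveAll(dir)

	keep := uuid.NewString()
	drop := uuid.NewString()
	os.Mkdir(filepath.Join(dir, keep), 0o755)
	os.Mkdir(filepath.Join(dir, drop), 0o755)

	_ = sweepAbandoned(dir, map[string]bool{keep: true})

	entries, _ := os.ReadDir(dir)
	for _, e := range entries {
		fmt.Println("remaining:", e.Name()) // only the referenced dir
	}
}
```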