mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
Check for abandoned data when healing (#16122)
This commit is contained in:
parent
1f1dcdce65
commit
cc1d8f0057
@ -47,6 +47,8 @@ const (
|
||||
scannerMetricILM
|
||||
scannerMetricCheckReplication
|
||||
scannerMetricYield
|
||||
scannerMetricCleanAbandoned
|
||||
scannerMetricApplyNonCurrent
|
||||
|
||||
// START Trace metrics:
|
||||
scannerMetricStartTrace
|
||||
|
@ -1015,6 +1015,8 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob
|
||||
if i.lifeCycle == nil {
|
||||
return fivs, nil
|
||||
}
|
||||
done := globalScannerMetrics.time(scannerMetricApplyNonCurrent)
|
||||
defer done()
|
||||
|
||||
_, days, lim := i.lifeCycle.NoncurrentVersionsExpirationLimit(lifecycle.ObjectOpts{Name: i.objectPath()})
|
||||
if lim == 0 || len(fivs) <= lim+1 { // fewer than lim _noncurrent_ versions
|
||||
@ -1071,6 +1073,17 @@ func (i *scannerItem) applyNewerNoncurrentVersionLimit(ctx context.Context, _ Ob
|
||||
// applyVersionActions will apply lifecycle checks on all versions of a scanned item. Returns versions that remain
|
||||
// after applying lifecycle checks configured.
|
||||
func (i *scannerItem) applyVersionActions(ctx context.Context, o ObjectLayer, fivs []FileInfo) ([]FileInfo, error) {
|
||||
if i.heal.enabled {
|
||||
if healDeleteDangling {
|
||||
done := globalScannerMetrics.time(scannerMetricCleanAbandoned)
|
||||
err := o.CheckAbandonedParts(ctx, i.bucket, i.objectPath(), madmin.HealOpts{Remove: healDeleteDangling})
|
||||
done()
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("unable to check object %s/%s for abandoned data: %w", i.bucket, i.objectPath(), err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return i.applyNewerNoncurrentVersionLimit(ctx, o, fivs)
|
||||
}
|
||||
|
||||
|
@ -40,6 +40,7 @@ type healingMetric uint8
|
||||
const (
|
||||
healingMetricBucket healingMetric = iota
|
||||
healingMetricObject
|
||||
healingMetricCheckAbandonedParts
|
||||
)
|
||||
|
||||
// AcceptableDelta returns 'true' if the fi.DiskMTime is under
|
||||
@ -93,7 +94,7 @@ func (er erasureObjects) healBucket(ctx context.Context, storageDisks []StorageA
|
||||
if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
healTrace(healingMetricBucket, startTime, bucket, "", "", opts, err, &res)
|
||||
healTrace(healingMetricBucket, startTime, bucket, "", "", &opts, err, &res)
|
||||
}()
|
||||
}
|
||||
|
||||
@ -294,7 +295,7 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, latestMeta File
|
||||
}
|
||||
|
||||
// Heals an object by re-writing corrupt/missing erasure blocks.
|
||||
func (er erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
|
||||
func (er *erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
|
||||
dryRun := opts.DryRun
|
||||
scanMode := opts.ScanMode
|
||||
|
||||
@ -304,7 +305,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
healTrace(healingMetricObject, startTime, bucket, object, versionID, opts, err, &result)
|
||||
healTrace(healingMetricObject, startTime, bucket, object, versionID, &opts, err, &result)
|
||||
}()
|
||||
}
|
||||
// Initialize heal result object
|
||||
@ -673,9 +674,45 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// checkAbandonedParts will check if an object has abandoned parts,
|
||||
// meaning data-dirs or inlined data that are no longer referenced by the xl.meta
|
||||
// Errors are generally ignored by this function.
|
||||
func (er *erasureObjects) checkAbandonedParts(ctx context.Context, bucket string, object string, opts madmin.HealOpts) (err error) {
|
||||
if !opts.Remove || opts.DryRun {
|
||||
return nil
|
||||
}
|
||||
if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
healTrace(healingMetricCheckAbandonedParts, startTime, bucket, object, "", nil, err, nil)
|
||||
}()
|
||||
}
|
||||
if !opts.NoLock {
|
||||
lk := er.NewNSLock(bucket, object)
|
||||
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx = lkctx.Context()
|
||||
defer lk.Unlock(lkctx.Cancel)
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for _, disk := range er.getDisks() {
|
||||
if disk != nil {
|
||||
wg.Add(1)
|
||||
go func(disk StorageAPI) {
|
||||
defer wg.Done()
|
||||
_ = disk.CleanAbandonedData(ctx, bucket, object)
|
||||
}(disk)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
// healObjectDir - heals object directory specifically, this special call
|
||||
// is needed since we do not have a special backend format for directories.
|
||||
func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool, remove bool) (hr madmin.HealResultItem, err error) {
|
||||
func (er *erasureObjects) healObjectDir(ctx context.Context, bucket, object string, dryRun bool, remove bool) (hr madmin.HealResultItem, err error) {
|
||||
storageDisks := er.getDisks()
|
||||
storageEndpoints := er.getEndpoints()
|
||||
|
||||
@ -766,7 +803,7 @@ func (er erasureObjects) healObjectDir(ctx context.Context, bucket, object strin
|
||||
|
||||
// Populates default heal result item entries with possible values when we are returning prematurely.
|
||||
// This is to ensure that in any circumstance we are not returning empty arrays with wrong values.
|
||||
func (er erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints []Endpoint, errs []error, bucket, object, versionID string) madmin.HealResultItem {
|
||||
func (er *erasureObjects) defaultHealResult(lfi FileInfo, storageDisks []StorageAPI, storageEndpoints []Endpoint, errs []error, bucket, object, versionID string) madmin.HealResultItem {
|
||||
// Initialize heal result object
|
||||
result := madmin.HealResultItem{
|
||||
Type: madmin.HealItemObject,
|
||||
@ -1005,16 +1042,18 @@ func (er erasureObjects) HealObject(ctx context.Context, bucket, object, version
|
||||
}
|
||||
|
||||
// healTrace sends healing results to trace output.
|
||||
func healTrace(funcName healingMetric, startTime time.Time, bucket, object, versionID string, opts madmin.HealOpts, err error, result *madmin.HealResultItem) {
|
||||
func healTrace(funcName healingMetric, startTime time.Time, bucket, object, versionID string, opts *madmin.HealOpts, err error, result *madmin.HealResultItem) {
|
||||
tr := madmin.TraceInfo{
|
||||
TraceType: madmin.TraceHealing,
|
||||
Time: startTime,
|
||||
NodeName: globalLocalNodeName,
|
||||
FuncName: "heal." + funcName.String(),
|
||||
Duration: time.Since(startTime),
|
||||
Message: fmt.Sprintf("dry:%v, rm:%v, recreate:%v mode:%v", opts.DryRun, opts.Remove, opts.Recreate, opts.ScanMode),
|
||||
Path: pathJoin(bucket, decodeDirObject(object)),
|
||||
}
|
||||
if opts != nil {
|
||||
tr.Message = fmt.Sprintf("dry:%v, rm:%v, recreate:%v mode:%v", opts.DryRun, opts.Remove, opts.Recreate, opts.ScanMode)
|
||||
}
|
||||
if versionID != "" && versionID != "null" {
|
||||
tr.Path += " v=" + versionID
|
||||
}
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
uuid2 "github.com/google/uuid"
|
||||
"github.com/minio/madmin-go"
|
||||
)
|
||||
|
||||
@ -276,6 +277,12 @@ func TestHealing(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Checking abandoned parts should do nothing
|
||||
err = er.checkAbandonedParts(ctx, bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = er.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -320,6 +327,214 @@ func TestHealing(t *testing.T) {
|
||||
t.Fatal("HealObject failed")
|
||||
}
|
||||
|
||||
uuid, _ := uuid2.NewRandom()
|
||||
for _, drive := range fsDirs {
|
||||
dir := path.Join(drive, bucket, object, uuid.String())
|
||||
err = os.MkdirAll(dir, os.ModePerm)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = os.WriteFile(pathJoin(dir, "part.1"), []byte("some data"), os.ModePerm)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// This should remove all the unreferenced parts.
|
||||
err = er.checkAbandonedParts(ctx, bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, drive := range fsDirs {
|
||||
dir := path.Join(drive, bucket, object, uuid.String())
|
||||
_, err := os.ReadFile(pathJoin(dir, "part.1"))
|
||||
if err == nil {
|
||||
t.Fatal("expected data dit to be cleaned up")
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the bucket - to simulate the case where bucket was
|
||||
// created when the disk was down.
|
||||
err = os.RemoveAll(path.Join(fsDirs[0], bucket))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// This would create the bucket.
|
||||
_, err = er.HealBucket(ctx, bucket, madmin.HealOpts{
|
||||
DryRun: false,
|
||||
Remove: false,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Stat the bucket to make sure that it was created.
|
||||
_, err = er.getDisks()[0].StatVol(context.Background(), bucket)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests both object and bucket healing.
|
||||
func TestHealingVersioned(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
obj, fsDirs, err := prepareErasure16(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer obj.Shutdown(context.Background())
|
||||
|
||||
// initialize the server and obtain the credentials and root.
|
||||
// credentials are necessary to sign the HTTP request.
|
||||
if err = newTestConfig(globalMinioDefaultRegion, obj); err != nil {
|
||||
t.Fatalf("Unable to initialize server config. %s", err)
|
||||
}
|
||||
|
||||
defer removeRoots(fsDirs)
|
||||
|
||||
z := obj.(*erasureServerPools)
|
||||
er := z.serverPools[0].sets[0]
|
||||
|
||||
// Create "bucket"
|
||||
err = obj.MakeBucketWithLocation(ctx, "bucket", MakeBucketOptions{VersioningEnabled: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
bucket := "bucket"
|
||||
object := "object"
|
||||
|
||||
data := make([]byte, 1*humanize.MiByte)
|
||||
length := int64(len(data))
|
||||
_, err = rand.Read(data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
oi1, err := obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), length, "", ""), ObjectOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// 2nd version.
|
||||
_, _ = rand.Read(data)
|
||||
oi2, err := obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), length, "", ""), ObjectOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
disk := er.getDisks()[0]
|
||||
fileInfoPreHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fileInfoPreHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Remove the object - to simulate the case where the disk was down when the object
|
||||
// was created.
|
||||
err = removeAll(pathJoin(disk.String(), bucket, object))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Checking abandoned parts should do nothing
|
||||
err = er.checkAbandonedParts(ctx, bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = er.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealNormalScan})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fileInfoPostHeal1, err := disk.ReadVersion(context.Background(), bucket, object, oi1.VersionID, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fileInfoPostHeal2, err := disk.ReadVersion(context.Background(), bucket, object, oi2.VersionID, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// After heal the meta file should be as expected.
|
||||
if !fileInfoPreHeal1.Equals(fileInfoPostHeal1) {
|
||||
t.Fatal("HealObject failed")
|
||||
}
|
||||
if !fileInfoPreHeal1.Equals(fileInfoPostHeal2) {
|
||||
t.Fatal("HealObject failed")
|
||||
}
|
||||
|
||||
err = os.RemoveAll(path.Join(fsDirs[0], bucket, object, "xl.meta"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Write xl.meta with different modtime to simulate the case where a disk had
|
||||
// gone down when an object was replaced by a new object.
|
||||
fileInfoOutDated := fileInfoPreHeal1
|
||||
fileInfoOutDated.ModTime = time.Now()
|
||||
err = disk.WriteMetadata(context.Background(), bucket, object, fileInfoOutDated)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = er.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealDeepScan})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fileInfoPostHeal1, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// After heal the meta file should be as expected.
|
||||
if !fileInfoPreHeal1.Equals(fileInfoPostHeal1) {
|
||||
t.Fatal("HealObject failed")
|
||||
}
|
||||
|
||||
fileInfoPostHeal2, err = disk.ReadVersion(context.Background(), bucket, object, "", false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// After heal the meta file should be as expected.
|
||||
if !fileInfoPreHeal2.Equals(fileInfoPostHeal2) {
|
||||
t.Fatal("HealObject failed")
|
||||
}
|
||||
|
||||
uuid, _ := uuid2.NewRandom()
|
||||
for _, drive := range fsDirs {
|
||||
dir := path.Join(drive, bucket, object, uuid.String())
|
||||
err = os.MkdirAll(dir, os.ModePerm)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = os.WriteFile(pathJoin(dir, "part.1"), []byte("some data"), os.ModePerm)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// This should remove all the unreferenced parts.
|
||||
err = er.checkAbandonedParts(ctx, bucket, object, madmin.HealOpts{ScanMode: madmin.HealNormalScan, Remove: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, drive := range fsDirs {
|
||||
dir := path.Join(drive, bucket, object, uuid.String())
|
||||
_, err := os.ReadFile(pathJoin(dir, "part.1"))
|
||||
if err == nil {
|
||||
t.Fatal("expected data dit to be cleaned up")
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the bucket - to simulate the case where bucket was
|
||||
// created when the disk was down.
|
||||
err = os.RemoveAll(path.Join(fsDirs[0], bucket))
|
||||
|
@ -2072,7 +2072,12 @@ func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix str
|
||||
if err != nil {
|
||||
return healObjectFn(bucket, entry.name, "")
|
||||
}
|
||||
|
||||
if opts.Remove && !opts.DryRun {
|
||||
err := z.CheckAbandonedParts(ctx, bucket, entry.name, opts)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("unable to check object %s/%s for abandoned data: %w", bucket, entry.name, err))
|
||||
}
|
||||
}
|
||||
for _, version := range fivs.Versions {
|
||||
err := healObjectFn(bucket, version.Name, version.VersionID)
|
||||
if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
@ -2424,3 +2429,30 @@ func (z *erasureServerPools) RestoreTransitionedObject(ctx context.Context, buck
|
||||
|
||||
return z.serverPools[idx].RestoreTransitionedObject(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) CheckAbandonedParts(ctx context.Context, bucket, object string, opts madmin.HealOpts) error {
|
||||
object = encodeDirObject(object)
|
||||
if z.SinglePool() {
|
||||
return z.serverPools[0].CheckAbandonedParts(ctx, bucket, object, opts)
|
||||
}
|
||||
errs := make([]error, len(z.serverPools))
|
||||
var wg sync.WaitGroup
|
||||
for idx, pool := range z.serverPools {
|
||||
if z.IsSuspended(idx) {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(idx int, pool *erasureSets) {
|
||||
defer wg.Done()
|
||||
err := pool.CheckAbandonedParts(ctx, bucket, object, opts)
|
||||
if err != nil && !isErrObjectNotFound(err) && !isErrVersionNotFound(err) {
|
||||
errs[idx] = err
|
||||
}
|
||||
}(idx, pool)
|
||||
}
|
||||
wg.Wait()
|
||||
for _, err := range errs {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1412,3 +1412,8 @@ func (s *erasureSets) TransitionObject(ctx context.Context, bucket, object strin
|
||||
func (s *erasureSets) RestoreTransitionedObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
|
||||
return s.getHashedSet(object).RestoreTransitionedObject(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
// CheckAbandonedParts - check object for abandoned parts.
|
||||
func (s *erasureSets) CheckAbandonedParts(ctx context.Context, bucket, object string, opts madmin.HealOpts) error {
|
||||
return s.getHashedSet(object).checkAbandonedParts(ctx, bucket, object, opts)
|
||||
}
|
||||
|
@ -342,7 +342,12 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
|
||||
|
||||
// erasureObjects layer needs object names to be encoded
|
||||
encodedEntryName := encodeDirObject(entry.name)
|
||||
|
||||
if healDeleteDangling {
|
||||
err := er.checkAbandonedParts(ctx, bucket, encodedEntryName, madmin.HealOpts{Remove: healDeleteDangling})
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("unable to check object %s/%s for abandoned data: %w", bucket, entry.name, err))
|
||||
}
|
||||
}
|
||||
for _, version := range fivs.Versions {
|
||||
if _, err := er.HealObject(ctx, bucket, encodedEntryName,
|
||||
version.VersionID, madmin.HealOpts{
|
||||
|
@ -10,11 +10,12 @@ func _() {
|
||||
var x [1]struct{}
|
||||
_ = x[healingMetricBucket-0]
|
||||
_ = x[healingMetricObject-1]
|
||||
_ = x[healingMetricCheckAbandonedParts-2]
|
||||
}
|
||||
|
||||
const _healingMetric_name = "BucketObject"
|
||||
const _healingMetric_name = "BucketObjectCheckAbandonedParts"
|
||||
|
||||
var _healingMetric_index = [...]uint8{0, 6, 12}
|
||||
var _healingMetric_index = [...]uint8{0, 6, 12, 31}
|
||||
|
||||
func (i healingMetric) String() string {
|
||||
if i >= healingMetric(len(_healingMetric_index)-1) {
|
||||
|
@ -309,3 +309,10 @@ func (d *naughtyDisk) ReadMultiple(ctx context.Context, req ReadMultipleReq, res
|
||||
}
|
||||
return d.disk.ReadMultiple(ctx, req, resp)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) CleanAbandonedData(ctx context.Context, volume string, path string) error {
|
||||
if err := d.calcError(); err != nil {
|
||||
return err
|
||||
}
|
||||
return d.disk.CleanAbandonedData(ctx, volume, path)
|
||||
}
|
||||
|
@ -249,6 +249,7 @@ type ObjectLayer interface {
|
||||
HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (madmin.HealResultItem, error)
|
||||
HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error)
|
||||
HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, fn HealObjectFn) error
|
||||
CheckAbandonedParts(ctx context.Context, bucket, object string, opts madmin.HealOpts) error
|
||||
|
||||
// Returns health of the backend
|
||||
Health(ctx context.Context, opts HealthOptions) HealthResult
|
||||
|
@ -18,18 +18,20 @@ func _() {
|
||||
_ = x[scannerMetricILM-7]
|
||||
_ = x[scannerMetricCheckReplication-8]
|
||||
_ = x[scannerMetricYield-9]
|
||||
_ = x[scannerMetricStartTrace-10]
|
||||
_ = x[scannerMetricScanObject-11]
|
||||
_ = x[scannerMetricLastRealtime-12]
|
||||
_ = x[scannerMetricScanFolder-13]
|
||||
_ = x[scannerMetricScanCycle-14]
|
||||
_ = x[scannerMetricScanBucketDisk-15]
|
||||
_ = x[scannerMetricLast-16]
|
||||
_ = x[scannerMetricCleanAbandoned-10]
|
||||
_ = x[scannerMetricApplyNonCurrent-11]
|
||||
_ = x[scannerMetricStartTrace-12]
|
||||
_ = x[scannerMetricScanObject-13]
|
||||
_ = x[scannerMetricLastRealtime-14]
|
||||
_ = x[scannerMetricScanFolder-15]
|
||||
_ = x[scannerMetricScanCycle-16]
|
||||
_ = x[scannerMetricScanBucketDisk-17]
|
||||
_ = x[scannerMetricLast-18]
|
||||
}
|
||||
|
||||
const _scannerMetric_name = "ReadMetadataCheckMissingSaveUsageApplyAllApplyVersionTierObjSweepHealCheckILMCheckReplicationYieldStartTraceScanObjectLastRealtimeScanFolderScanCycleScanBucketDiskLast"
|
||||
const _scannerMetric_name = "ReadMetadataCheckMissingSaveUsageApplyAllApplyVersionTierObjSweepHealCheckILMCheckReplicationYieldCleanAbandonedApplyNonCurrentStartTraceScanObjectLastRealtimeScanFolderScanCycleScanBucketDiskLast"
|
||||
|
||||
var _scannerMetric_index = [...]uint8{0, 12, 24, 33, 41, 53, 65, 74, 77, 93, 98, 108, 118, 130, 140, 149, 163, 167}
|
||||
var _scannerMetric_index = [...]uint8{0, 12, 24, 33, 41, 53, 65, 74, 77, 93, 98, 112, 127, 137, 147, 159, 169, 178, 192, 196}
|
||||
|
||||
func (i scannerMetric) String() string {
|
||||
if i >= scannerMetric(len(_scannerMetric_index)-1) {
|
||||
|
@ -99,6 +99,7 @@ type StorageAPI interface {
|
||||
VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error
|
||||
StatInfoFile(ctx context.Context, volume, path string, glob bool) (stat []StatInfo, err error)
|
||||
ReadMultiple(ctx context.Context, req ReadMultipleReq, resp chan<- ReadMultipleResp) error
|
||||
CleanAbandonedData(ctx context.Context, volume string, path string) error
|
||||
|
||||
// Write all data, syncs the data to disk.
|
||||
// Should be used for smaller payloads.
|
||||
@ -279,3 +280,7 @@ func (p *unrecognizedDisk) ReadMultiple(ctx context.Context, req ReadMultipleReq
|
||||
close(resp)
|
||||
return errDiskNotFound
|
||||
}
|
||||
|
||||
func (p *unrecognizedDisk) CleanAbandonedData(ctx context.Context, volume string, path string) error {
|
||||
return errDiskNotFound
|
||||
}
|
||||
|
@ -777,6 +777,24 @@ func (client *storageRESTClient) ReadMultiple(ctx context.Context, req ReadMulti
|
||||
}
|
||||
}
|
||||
|
||||
// CleanAbandonedData will read metadata of the object on disk
|
||||
// and delete any data directories and inline data that isn't referenced in metadata.
|
||||
func (client *storageRESTClient) CleanAbandonedData(ctx context.Context, volume string, path string) error {
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
values.Set(storageRESTFilePath, path)
|
||||
respBody, err := client.call(ctx, storageRESTMethodCleanAbandoned, values, nil, -1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer xhttp.DrainBody(respBody)
|
||||
respReader, err := waitForHTTPResponse(respBody)
|
||||
if err == nil {
|
||||
io.Copy(io.Discard, respReader)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Close - marks the client as closed.
|
||||
func (client *storageRESTClient) Close() error {
|
||||
client.restClient.Close()
|
||||
|
@ -54,6 +54,7 @@ const (
|
||||
storageRESTMethodWalkDir = "/walkdir"
|
||||
storageRESTMethodStatInfoFile = "/statfile"
|
||||
storageRESTMethodReadMultiple = "/readmultiple"
|
||||
storageRESTMethodCleanAbandoned = "/cleanabandoned"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -768,6 +768,19 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
}
|
||||
|
||||
// CleanAbandonedDataHandler - Clean unused data directories.
|
||||
func (s *storageRESTServer) CleanAbandonedDataHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
return
|
||||
}
|
||||
volume := r.Form.Get(storageRESTVolume)
|
||||
filePath := r.Form.Get(storageRESTFilePath)
|
||||
if volume == "" || filePath == "" {
|
||||
return // Ignore
|
||||
}
|
||||
keepHTTPResponseAlive(w)(s.storage.CleanAbandonedData(r.Context(), volume, filePath))
|
||||
}
|
||||
|
||||
// closeNotifier is itself a ReadCloser that will notify when either an error occurs or
|
||||
// the Close() function is called.
|
||||
type closeNotifier struct {
|
||||
@ -1379,6 +1392,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkDir).HandlerFunc(httpTraceHdrs(server.WalkDirHandler))
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(httpTraceHdrs(server.StatInfoFile))
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(httpTraceHdrs(server.ReadMultiple))
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(httpTraceHdrs(server.CleanAbandonedDataHandler))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -34,12 +34,13 @@ func _() {
|
||||
_ = x[storageMetricReadAll-23]
|
||||
_ = x[storageMetricStatInfoFile-24]
|
||||
_ = x[storageMetricReadMultiple-25]
|
||||
_ = x[storageMetricLast-26]
|
||||
_ = x[storageMetricDeleteAbandonedParts-26]
|
||||
_ = x[storageMetricLast-27]
|
||||
}
|
||||
|
||||
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleLast"
|
||||
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadXLReadAllStatInfoFileReadMultipleDeleteAbandonedPartsLast"
|
||||
|
||||
var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 258}
|
||||
var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 134, 148, 158, 166, 179, 192, 206, 217, 223, 230, 242, 254, 274, 278}
|
||||
|
||||
func (i storageMetric) String() string {
|
||||
if i >= storageMetric(len(_storageMetric_index)-1) {
|
||||
|
@ -65,6 +65,7 @@ const (
|
||||
storageMetricReadAll
|
||||
storageMetricStatInfoFile
|
||||
storageMetricReadMultiple
|
||||
storageMetricDeleteAbandonedParts
|
||||
|
||||
// .... add more
|
||||
|
||||
@ -526,6 +527,18 @@ func (p *xlStorageDiskIDCheck) ReadMultiple(ctx context.Context, req ReadMultipl
|
||||
return p.storage.ReadMultiple(ctx, req, resp)
|
||||
}
|
||||
|
||||
// CleanAbandonedData will read metadata of the object on disk
|
||||
// and delete any data directories and inline data that isn't referenced in metadata.
|
||||
func (p *xlStorageDiskIDCheck) CleanAbandonedData(ctx context.Context, volume string, path string) error {
|
||||
ctx, done, err := p.TrackDiskHealth(ctx, storageMetricDeleteAbandonedParts, volume, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer done(&err)
|
||||
|
||||
return p.storage.CleanAbandonedData(ctx, volume, path)
|
||||
}
|
||||
|
||||
func storageTrace(s storageMetric, startTime time.Time, duration time.Duration, path string) madmin.TraceInfo {
|
||||
return madmin.TraceInfo{
|
||||
TraceType: madmin.TraceStorage,
|
||||
|
@ -1191,6 +1191,40 @@ func (x *xlMetaV2) setIdx(idx int, ver xlMetaV2Version) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// getDataDirs will return all data directories in the metadata
|
||||
// as well as all version ids used for inline data.
|
||||
func (x *xlMetaV2) getDataDirs() ([]string, error) {
|
||||
dds := make([]string, len(x.versions)*2)
|
||||
for i, ver := range x.versions {
|
||||
if ver.header.Type == DeleteType {
|
||||
continue
|
||||
}
|
||||
|
||||
obj, err := x.getIdx(i)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch ver.header.Type {
|
||||
case ObjectType:
|
||||
if obj.ObjectV2 == nil {
|
||||
return nil, errors.New("obj.ObjectV2 unexpectedly nil")
|
||||
}
|
||||
dds = append(dds, uuid.UUID(obj.ObjectV2.DataDir).String())
|
||||
if obj.ObjectV2.VersionID == [16]byte{} {
|
||||
dds = append(dds, nullVersionID)
|
||||
} else {
|
||||
dds = append(dds, uuid.UUID(obj.ObjectV2.VersionID).String())
|
||||
}
|
||||
case LegacyType:
|
||||
if obj.ObjectV1 == nil {
|
||||
return nil, errors.New("obj.ObjectV1 unexpectedly nil")
|
||||
}
|
||||
dds = append(dds, obj.ObjectV1.DataDir)
|
||||
}
|
||||
}
|
||||
return dds, nil
|
||||
}
|
||||
|
||||
// sortByModTime will sort versions by modtime in descending order,
|
||||
// meaning index 0 will be latest version.
|
||||
func (x *xlMetaV2) sortByModTime() {
|
||||
|
@ -36,6 +36,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/google/uuid"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio/internal/bucket/lifecycle"
|
||||
@ -2708,3 +2709,101 @@ func (s *xlStorage) StatInfoFile(ctx context.Context, volume, path string, glob
|
||||
}
|
||||
return stat, nil
|
||||
}
|
||||
|
||||
// CleanAbandonedData will read metadata of the object on disk
|
||||
// and delete any data directories and inline data that isn't referenced in metadata.
|
||||
// Metadata itself is not modified, only inline data.
|
||||
func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path string) error {
|
||||
if volume == "" || path == "" {
|
||||
return nil // Ignore
|
||||
}
|
||||
|
||||
volumeDir, err := s.getVolDir(volume)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
baseDir := pathJoin(volumeDir, path+slashSeparator)
|
||||
metaPath := pathutil.Join(baseDir, xlStorageFormatFile)
|
||||
buf, _, err := s.readAllData(ctx, volumeDir, metaPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer metaDataPoolPut(buf)
|
||||
|
||||
if !isXL2V1Format(buf) {
|
||||
return nil
|
||||
}
|
||||
var xl xlMetaV2
|
||||
err = xl.LoadOrConvert(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
foundDirs := make(map[string]struct{}, len(xl.versions))
|
||||
err = readDirFn(baseDir, func(name string, typ os.FileMode) error {
|
||||
if !typ.IsDir() {
|
||||
return nil
|
||||
}
|
||||
// See if directory has a UUID name.
|
||||
base := filepath.Base(name)
|
||||
_, err := uuid.Parse(base)
|
||||
if err == nil {
|
||||
foundDirs[base] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wantDirs, err := xl.getDataDirs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete all directories we expect to be there.
|
||||
for _, dir := range wantDirs {
|
||||
delete(foundDirs, dir)
|
||||
}
|
||||
|
||||
// Delete excessive directories.
|
||||
// Do not abort on context errors.
|
||||
for dir := range foundDirs {
|
||||
toRemove := pathJoin(volumeDir, path, dir+SlashSeparator)
|
||||
err := s.deleteFile(volumeDir, toRemove, true, true)
|
||||
diskHealthCheckOK(ctx, err)
|
||||
}
|
||||
|
||||
// Do the same for inline data
|
||||
dirs, err := xl.data.list()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Clear and repopulate
|
||||
for k := range foundDirs {
|
||||
delete(foundDirs, k)
|
||||
}
|
||||
// Populate into map
|
||||
for _, k := range dirs {
|
||||
foundDirs[k] = struct{}{}
|
||||
}
|
||||
// Delete all directories we expect to be there.
|
||||
for _, dir := range wantDirs {
|
||||
delete(foundDirs, dir)
|
||||
}
|
||||
|
||||
// Delete excessive inline entries.
|
||||
if len(foundDirs) > 0 {
|
||||
// Convert to slice.
|
||||
dirs = dirs[:0]
|
||||
for dir := range foundDirs {
|
||||
dirs = append(dirs, dir)
|
||||
}
|
||||
if xl.data.remove(dirs...) {
|
||||
newBuf, err := xl.AppendTo(metaDataPoolGet())
|
||||
if err == nil {
|
||||
defer metaDataPoolPut(newBuf)
|
||||
return s.writeAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user