mirror of https://github.com/minio/minio.git
make renameData() more defensive during overwrites (#19548)
Upon any error in renameData(), we now preserve the existing dataDir in some form for recoverability in unusual situations such as out-of-disk-space errors. An undo now reverts xl.meta from xl.meta.bkp during overwrites on flaky setups (for example, stretch clusters with regular packet drops), where defensive code is needed to avoid dangling objects. Bonus: instead of running list-and-heal, a versions disparity now returns the actual version UUIDs to heal; this is currently limited to objects with 100 or fewer disparate versions. Bonus: save N-depth syscalls by skipping the parent directories upon overwrites and versioned updates.
This commit is contained in:
parent ee1047bd52
commit 9693c382a8
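
A condensed sketch of the overwrite flow after this change, stitched together from the hunks below (error handling and unrelated arguments are elided, so read it as an outline rather than an exact call site):

	// 1. Rename the new data into place. Each disk reports a version
	//    signature and, on overwrite, the old dataDir it preserved.
	onlineDisks, versions, oldDataDir, err := renameData(ctx, onlineDisks,
		minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum)
	if err != nil {
		// Failed quorum: each disk undoes its write; DeleteOptions{UndoWrite,
		// OldDataDir} restores xl.meta from xl.meta.bkp.
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}

	// 2. Second phase: only after quorum success is the preserved old
	//    dataDir purged from the online disks.
	if err = er.commitRenameDataDir(ctx, bucket, object, oldDataDir, onlineDisks); err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}

	// 3. If disks disagree on versions, queue only the disparate version
	//    UUIDs for healing instead of listing and healing the whole object.
	if len(versions) > 0 {
		globalMRFState.addPartialOp(partialOperation{
			bucket:   bucket,
			object:   object,
			versions: versions,
			queued:   time.Now(),
		})
	}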
@@ -24,10 +24,7 @@ if [ ! -f ./mc ]; then
 	chmod +x mc
 fi
 
-(
-	cd ./docs/debugging/s3-check-md5
-	go install -v
-)
+go install -v github.com/minio/minio/docs/debugging/s3-check-md5@latest
 
 export RELEASE=RELEASE.2023-08-29T23-07-35Z
 
@@ -87,10 +87,7 @@ function verify_rewrite() {
 		exit 1
 	fi
 
-	(
-		cd ./docs/debugging/s3-check-md5
-		go install -v
-	)
+	go install -v github.com/minio/minio/docs/debugging/s3-check-md5@latest
 
 	if ! s3-check-md5 \
 		-debug \
@@ -19,6 +19,7 @@ package cmd
 
 import (
 	"context"
+	"encoding/binary"
 	"errors"
 	"hash/crc32"
 
@@ -26,14 +27,17 @@ import (
 )
 
 // figure out the most commonVersions across disk that satisfies
-// the 'writeQuorum' this function returns '0' if quorum cannot
+// the 'writeQuorum' this function returns "" if quorum cannot
 // be achieved and disks have too many inconsistent versions.
-func reduceCommonVersions(diskVersions []uint64, writeQuorum int) (commonVersions uint64) {
+func reduceCommonVersions(diskVersions [][]byte, writeQuorum int) (versions []byte) {
 	diskVersionsCount := make(map[uint64]int)
 	for _, versions := range diskVersions {
-		diskVersionsCount[versions]++
+		if len(versions) > 0 {
+			diskVersionsCount[binary.BigEndian.Uint64(versions)]++
+		}
 	}
 
+	var commonVersions uint64
 	max := 0
 	for versions, count := range diskVersionsCount {
 		if max < count {
@@ -43,10 +47,38 @@ func reduceCommonVersions(diskVersions []uint64, writeQuorum int) (commonVersion
 		}
 	}
 
 	if max >= writeQuorum {
-		return commonVersions
+		for _, versions := range diskVersions {
+			if binary.BigEndian.Uint64(versions) == commonVersions {
+				return versions
+			}
+		}
 	}
 
-	return 0
+	return []byte{}
+}
+
+// figure out the most commonVersions across disk that satisfies
+// the 'writeQuorum' this function returns '0' if quorum cannot
+// be achieved and disks have too many inconsistent versions.
+func reduceCommonDataDir(dataDirs []string, writeQuorum int) (dataDir string) {
+	dataDirsCount := make(map[string]int)
+	for _, ddir := range dataDirs {
+		dataDirsCount[ddir]++
+	}
+
+	max := 0
+	for ddir, count := range dataDirsCount {
+		if max < count {
+			max = count
+			dataDir = ddir
+		}
+	}
+
+	if max >= writeQuorum {
+		return dataDir
+	}
+
+	return ""
 }
 
 // Returns number of errors that occurred the most (incl. nil) and the
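
The reduction added above is a plain majority count with a quorum floor. A quick usage illustration with hypothetical inputs:

	dataDirs := []string{"dir-a", "dir-a", "dir-a", "dir-b"} // one disk disagrees
	fmt.Println(reduceCommonDataDir(dataDirs, 3)) // "dir-a": seen by 3 disks, quorum met
	fmt.Println(reduceCommonDataDir(dataDirs, 4)) // "": no single dataDir reaches quorum

reduceCommonVersions works the same way, except the byte-slice signatures are bucketed by their first eight bytes (binary.BigEndian.Uint64) before counting.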
@@ -1311,24 +1311,28 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
 	}()
 
 	// Rename the multipart object to final location.
-	onlineDisks, versionsDisparity, err := renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
+	onlineDisks, versions, oldDataDir, err := renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
 		partsMetadata, bucket, object, writeQuorum)
 	if err != nil {
 		return oi, toObjectErr(err, bucket, object)
 	}
 
-	if !opts.Speedtest && versionsDisparity {
+	if err = er.commitRenameDataDir(ctx, bucket, object, oldDataDir, onlineDisks); err != nil {
+		return ObjectInfo{}, toObjectErr(err, bucket, object)
+	}
+
+	if !opts.Speedtest && len(versions) > 0 {
 		globalMRFState.addPartialOp(partialOperation{
 			bucket:    bucket,
 			object:    object,
 			queued:    time.Now(),
-			allVersions: true,
+			versions:  versions,
 			setIndex:  er.setIndex,
 			poolIndex: er.poolIndex,
 		})
 	}
 
-	if !opts.Speedtest && !versionsDisparity {
+	if !opts.Speedtest && len(versions) == 0 {
 		// Check if there is any offline disk and add it to the MRF list
 		for _, disk := range onlineDisks {
 			if disk != nil && disk.IsOnline() {
@@ -48,7 +48,6 @@ import (
 	"github.com/minio/minio/internal/logger"
 	"github.com/minio/pkg/v2/mimedb"
 	"github.com/minio/pkg/v2/sync/errgroup"
-	"github.com/minio/pkg/v2/wildcard"
 )
 
 // list all errors which can be ignored in object operations.
@@ -1006,7 +1005,7 @@ func (er erasureObjects) getObjectInfoAndQuorum(ctx context.Context, bucket, obj
 }
 
 // Similar to rename but renames data from srcEntry to dstEntry at dataDir
-func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry string, metadata []FileInfo, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, bool, error) {
+func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry string, metadata []FileInfo, dstBucket, dstEntry string, writeQuorum int) ([]StorageAPI, []byte, string, error) {
 	g := errgroup.WithNErrs(len(disks))
 
 	fvID := mustGetUUID()
@@ -1014,7 +1013,8 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
 		metadata[index].SetTierFreeVersionID(fvID)
 	}
 
-	diskVersions := make([]uint64, len(disks))
+	diskVersions := make([][]byte, len(disks))
+	dataDirs := make([]string, len(disks))
 	// Rename file on all underlying storage disks.
 	for index := range disks {
 		index := index
@@ -1033,11 +1033,12 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
 			if !fi.IsValid() {
 				return errFileCorrupt
 			}
-			sign, err := disks[index].RenameData(ctx, srcBucket, srcEntry, fi, dstBucket, dstEntry, RenameOptions{})
+			resp, err := disks[index].RenameData(ctx, srcBucket, srcEntry, fi, dstBucket, dstEntry, RenameOptions{})
 			if err != nil {
 				return err
 			}
-			diskVersions[index] = sign
+			diskVersions[index] = resp.Sign
+			dataDirs[index] = resp.OldDataDir
 			return nil
 		}, index)
 	}
@@ -1045,8 +1046,6 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
 	// Wait for all renames to finish.
 	errs := g.Wait()
 
-	var versionsDisparity bool
-
 	err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
 	if err != nil {
 		dg := errgroup.WithNErrs(len(disks))
|
||||||
// caller this dangling object will be now scheduled to be removed
|
// caller this dangling object will be now scheduled to be removed
|
||||||
// via active healing.
|
// via active healing.
|
||||||
dg.Go(func() error {
|
dg.Go(func() error {
|
||||||
return disks[index].DeleteVersion(context.Background(), dstBucket, dstEntry, metadata[index], false, DeleteOptions{UndoWrite: true})
|
return disks[index].DeleteVersion(context.Background(), dstBucket, dstEntry, metadata[index], false, DeleteOptions{
|
||||||
|
UndoWrite: true,
|
||||||
|
OldDataDir: dataDirs[index],
|
||||||
|
})
|
||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
dg.Wait()
|
dg.Wait()
|
||||||
}
|
}
|
||||||
|
var dataDir string
|
||||||
|
var versions []byte
|
||||||
if err == nil {
|
if err == nil {
|
||||||
versions := reduceCommonVersions(diskVersions, writeQuorum)
|
versions = reduceCommonVersions(diskVersions, writeQuorum)
|
||||||
for index, dversions := range diskVersions {
|
for index, dversions := range diskVersions {
|
||||||
if errs[index] != nil {
|
if errs[index] != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if versions != dversions {
|
if !bytes.Equal(dversions, versions) {
|
||||||
versionsDisparity = true
|
if len(dversions) > len(versions) {
|
||||||
|
versions = dversions
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
dataDir = reduceCommonDataDir(dataDirs, writeQuorum)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
|
// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
|
||||||
// otherwise return failure.
|
// otherwise return failure.
|
||||||
return evalDisks(disks, errs), versionsDisparity, err
|
return evalDisks(disks, errs), versions, dataDir, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
|
@@ -1229,44 +1236,6 @@ func (er erasureObjects) PutObject(ctx context.Context, bucket string, object st
 	return er.putObject(ctx, bucket, object, data, opts)
 }
 
-// Heal up to two versions of one object when there is disparity between disks
-func healObjectVersionsDisparity(bucket string, entry metaCacheEntry, scanMode madmin.HealScanMode) error {
-	if entry.isDir() {
-		return nil
-	}
-	// We might land at .metacache, .trash, .multipart
-	// no need to heal them skip, only when bucket
-	// is '.minio.sys'
-	if bucket == minioMetaBucket {
-		if wildcard.Match("buckets/*/.metacache/*", entry.name) {
-			return nil
-		}
-		if wildcard.Match("tmp/*", entry.name) {
-			return nil
-		}
-		if wildcard.Match("multipart/*", entry.name) {
-			return nil
-		}
-		if wildcard.Match("tmp-old/*", entry.name) {
-			return nil
-		}
-	}
-
-	fivs, err := entry.fileInfoVersions(bucket)
-	if err != nil {
-		healObject(bucket, entry.name, "", madmin.HealDeepScan)
-		return err
-	}
-
-	if len(fivs.Versions) <= 2 {
-		for _, version := range fivs.Versions {
-			healObject(bucket, entry.name, version.VersionID, scanMode)
-		}
-	}
-
-	return nil
-}
-
 // putObject wrapper for erasureObjects PutObject
 func (er erasureObjects) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
 	if !opts.NoAuditLog {
@@ -1547,7 +1516,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 	}
 
 	// Rename the successfully written temporary object to final location.
-	onlineDisks, versionsDisparity, err := renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum)
+	onlineDisks, versions, oldDataDir, err := renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum)
 	if err != nil {
 		if errors.Is(err, errFileNotFound) {
 			return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object)
@@ -1555,6 +1524,10 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 		return ObjectInfo{}, toObjectErr(err, bucket, object)
 	}
 
+	if err = er.commitRenameDataDir(ctx, bucket, object, oldDataDir, onlineDisks); err != nil {
+		return ObjectInfo{}, toObjectErr(err, bucket, object)
+	}
+
 	for i := 0; i < len(onlineDisks); i++ {
 		if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
 			// Object info is the same in all disks, so we can pick
@@ -1569,7 +1542,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 	// When there is versions disparity we are healing
 	// the content implicitly for all versions, we can
 	// avoid triggering another MRF heal for offline drives.
-	if !versionsDisparity {
+	if len(versions) == 0 {
 		// Whether a disk was initially or becomes offline
 		// during this upload, send it to the MRF list.
 		for i := 0; i < len(onlineDisks); i++ {
@@ -1582,12 +1555,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
 		}
 	} else {
 		globalMRFState.addPartialOp(partialOperation{
 			bucket:    bucket,
 			object:    object,
 			queued:    time.Now(),
-			allVersions: true,
+			versions:  versions,
 			setIndex:  er.setIndex,
 			poolIndex: er.poolIndex,
 		})
 	}
 }
@@ -1797,6 +1770,30 @@ func (er erasureObjects) DeleteObjects(ctx context.Context, bucket string, objec
 	return dobjects, errs
 }
 
+func (er erasureObjects) commitRenameDataDir(ctx context.Context, bucket, object, dataDir string, onlineDisks []StorageAPI) error {
+	if dataDir == "" {
+		return nil
+	}
+	g := errgroup.WithNErrs(len(onlineDisks))
+	for index := range onlineDisks {
+		index := index
+		g.Go(func() error {
+			if onlineDisks[index] == nil {
+				return nil
+			}
+			return onlineDisks[index].Delete(ctx, bucket, pathJoin(object, dataDir), DeleteOptions{
+				Recursive: true,
+			})
+		}, index)
+	}
+	for _, err := range g.Wait() {
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (er erasureObjects) deletePrefix(ctx context.Context, bucket, prefix string) error {
 	disks := er.getDisks()
 	g := errgroup.WithNErrs(len(disks))
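
commitRenameDataDir is the second phase of the rename: the preserved old dataDir is deleted only after the first phase reached write quorum, so a crash in between leaves the previous data recoverable. It reuses the fan-out pattern (MinIO's internal errgroup with one error slot per disk) that appears throughout this change; a minimal sketch of that pattern with placeholder work:

	g := errgroup.WithNErrs(len(disks))
	for index := range disks {
		index := index // capture the loop variable for the closure
		g.Go(func() error {
			if disks[index] == nil {
				return errDiskNotFound
			}
			return nil // per-disk work goes here
		}, index)
	}
	// Wait returns one error per disk, in disk order.
	for _, err := range g.Wait() {
		_ = err // inspect per-disk results
	}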
@@ -530,16 +530,7 @@ func healObject(bucket, object, versionID string, scan madmin.HealScanMode) erro
 	// Get background heal sequence to send elements to heal
 	bgSeq, ok := globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID)
 	if ok {
-		return bgSeq.queueHealTask(healSource{
-			bucket:    bucket,
-			object:    object,
-			versionID: versionID,
-			noWait:    true, // do not block callers.
-			opts: &madmin.HealOpts{
-				Remove:   healDeleteDangling, // if found dangling purge it.
-				ScanMode: scan,
-			},
-		}, madmin.HealItemObject)
+		return bgSeq.healObject(bucket, object, versionID, scan)
 	}
 	return nil
 }
cmd/mrf.go (12 changes)
@@ -21,6 +21,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/minio/madmin-go/v3"
 	"github.com/minio/pkg/v2/wildcard"
 )
@@ -35,7 +36,7 @@ type partialOperation struct {
 	bucket              string
 	object              string
 	versionID           string
-	allVersions         bool
+	versions            []byte
 	setIndex, poolIndex int
 	queued              time.Time
 	scanMode            madmin.HealScanMode
@@ -111,8 +112,13 @@ func (m *mrfState) healRoutine(z *erasureServerPools) {
 		if u.object == "" {
 			healBucket(u.bucket, scan)
 		} else {
-			if u.allVersions {
-				z.serverPools[u.poolIndex].sets[u.setIndex].listAndHeal(u.bucket, u.object, u.scanMode, healObjectVersionsDisparity)
+			if len(u.versions) > 0 {
+				vers := len(u.versions) / 16
+				if vers > 0 {
+					for i := 0; i < vers; i++ {
+						healObject(u.bucket, u.object, uuid.UUID(u.versions[16*i:]).String(), scan)
+					}
+				}
 			} else {
 				healObject(u.bucket, u.object, u.versionID, scan)
 			}
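
The versions payload is raw 16-byte version UUIDs concatenated back to back, which is why the loop above steps in strides of 16. A self-contained sketch of both directions (packVersions is hypothetical; the unpack loop mirrors the heal routine, and the slice-to-array conversion needs Go 1.20+):

	package main

	import (
		"fmt"

		"github.com/google/uuid"
	)

	// packVersions concatenates raw 16-byte UUIDs, the shape carried by
	// partialOperation.versions.
	func packVersions(ids ...uuid.UUID) []byte {
		out := make([]byte, 0, 16*len(ids))
		for _, id := range ids {
			out = append(out, id[:]...)
		}
		return out
	}

	func main() {
		versions := packVersions(uuid.New(), uuid.New())
		for i := 0; i < len(versions)/16; i++ {
			// Same conversion the heal loop uses: 16 bytes per version ID.
			fmt.Println(uuid.UUID(versions[16*i : 16*i+16]).String())
		}
	}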
@@ -201,9 +201,9 @@ func (d *naughtyDisk) AppendFile(ctx context.Context, volume string, path string
 	return d.disk.AppendFile(ctx, volume, path, buf)
 }
 
-func (d *naughtyDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (uint64, error) {
+func (d *naughtyDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (RenameDataResp, error) {
 	if err := d.calcError(); err != nil {
-		return 0, err
+		return RenameDataResp{}, err
 	}
 	return d.disk.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath, opts)
 }
@@ -33,6 +33,8 @@ type DeleteOptions struct {
 	Recursive bool `msg:"r"`
 	Immediate bool `msg:"i"`
 	UndoWrite bool `msg:"u"`
+	// OldDataDir of the previous object
+	OldDataDir string `msg:"o,omitempty"` // old data dir used only when to revert a rename()
 }
 
 // BaseOptions represents common options for all Storage API calls
@@ -490,8 +492,14 @@ type WriteAllHandlerParams struct {
 }
 
 // RenameDataResp - RenameData()'s response.
+// Provides information about the final state of Rename()
+//   - on xl.meta (array of versions) on disk to check for version disparity
+//   - on rewrite dataDir on disk that must be additionally purged
+//     only after as a 2-phase call, allowing the older dataDir to
+//     hang-around in-case we need some form of recovery.
 type RenameDataResp struct {
-	Signature uint64 `msg:"sig"`
+	Sign       []byte
+	OldDataDir string // contains '<uuid>', it is designed to be passed as value to Delete(bucket, pathJoin(object, dataDir))
 }
 
 // LocalDiskIDs - GetLocalIDs response.
@@ -514,6 +514,12 @@ func (z *DeleteOptions) DecodeMsg(dc *msgp.Reader) (err error) {
 				err = msgp.WrapError(err, "UndoWrite")
 				return
 			}
+		case "o":
+			z.OldDataDir, err = dc.ReadString()
+			if err != nil {
+				err = msgp.WrapError(err, "OldDataDir")
+				return
+			}
 		default:
 			err = dc.Skip()
 			if err != nil {
@@ -527,9 +533,24 @@ func (z *DeleteOptions) DecodeMsg(dc *msgp.Reader) (err error) {
 
 // EncodeMsg implements msgp.Encodable
 func (z *DeleteOptions) EncodeMsg(en *msgp.Writer) (err error) {
-	// map header, size 4
+	// check for omitted fields
+	zb0001Len := uint32(5)
+	var zb0001Mask uint8 /* 5 bits */
+	_ = zb0001Mask
+	if z.OldDataDir == "" {
+		zb0001Len--
+		zb0001Mask |= 0x10
+	}
+	// variable map header, size zb0001Len
+	err = en.Append(0x80 | uint8(zb0001Len))
+	if err != nil {
+		return
+	}
+	if zb0001Len == 0 {
+		return
+	}
 	// write "BaseOptions"
-	err = en.Append(0x84, 0xab, 0x42, 0x61, 0x73, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73)
+	err = en.Append(0xab, 0x42, 0x61, 0x73, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73)
 	if err != nil {
 		return
 	}
@@ -569,15 +590,39 @@ func (z *DeleteOptions) EncodeMsg(en *msgp.Writer) (err error) {
 		err = msgp.WrapError(err, "UndoWrite")
 		return
 	}
+	if (zb0001Mask & 0x10) == 0 { // if not omitted
+		// write "o"
+		err = en.Append(0xa1, 0x6f)
+		if err != nil {
+			return
+		}
+		err = en.WriteString(z.OldDataDir)
+		if err != nil {
+			err = msgp.WrapError(err, "OldDataDir")
+			return
+		}
+	}
 	return
 }
 
 // MarshalMsg implements msgp.Marshaler
 func (z *DeleteOptions) MarshalMsg(b []byte) (o []byte, err error) {
 	o = msgp.Require(b, z.Msgsize())
-	// map header, size 4
+	// check for omitted fields
+	zb0001Len := uint32(5)
+	var zb0001Mask uint8 /* 5 bits */
+	_ = zb0001Mask
+	if z.OldDataDir == "" {
+		zb0001Len--
+		zb0001Mask |= 0x10
+	}
+	// variable map header, size zb0001Len
+	o = append(o, 0x80|uint8(zb0001Len))
+	if zb0001Len == 0 {
+		return
+	}
 	// string "BaseOptions"
-	o = append(o, 0x84, 0xab, 0x42, 0x61, 0x73, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73)
+	o = append(o, 0xab, 0x42, 0x61, 0x73, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73)
 	// map header, size 0
 	_ = z.BaseOptions
 	o = append(o, 0x80)
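
The switch from the fixed `0x84` map header to a computed one is what makes the new "o" field optional on the wire. In MessagePack a fixmap header stores the entry count in its low nibble, so the computed header byte is just (a minimal sketch outside the generated code):

	oldDataDir := "" // empty means "o" is omitted
	n := uint32(5)   // BaseOptions, "r", "i", "u", "o"
	if oldDataDir == "" {
		n--
	}
	header := byte(0x80 | uint8(n)) // 0x85 with "o" present, 0x84 without
	_ = header

Decoders that predate the field skip it through their default branch (dc.Skip / msgp.Skip), so the change stays wire-compatible in both directions.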
@@ -590,6 +635,11 @@ func (z *DeleteOptions) MarshalMsg(b []byte) (o []byte, err error) {
 	// string "u"
 	o = append(o, 0xa1, 0x75)
 	o = msgp.AppendBool(o, z.UndoWrite)
+	if (zb0001Mask & 0x10) == 0 { // if not omitted
+		// string "o"
+		o = append(o, 0xa1, 0x6f)
+		o = msgp.AppendString(o, z.OldDataDir)
+	}
 	return
 }
 
@@ -652,6 +702,12 @@ func (z *DeleteOptions) UnmarshalMsg(bts []byte) (o []byte, err error) {
 				err = msgp.WrapError(err, "UndoWrite")
 				return
 			}
+		case "o":
+			z.OldDataDir, bts, err = msgp.ReadStringBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "OldDataDir")
+				return
+			}
 		default:
 			bts, err = msgp.Skip(bts)
 			if err != nil {
@@ -666,7 +722,7 @@ func (z *DeleteOptions) UnmarshalMsg(bts []byte) (o []byte, err error) {
 
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
 func (z *DeleteOptions) Msgsize() (s int) {
-	s = 1 + 12 + 1 + 2 + msgp.BoolSize + 2 + msgp.BoolSize + 2 + msgp.BoolSize
+	s = 1 + 12 + 1 + 2 + msgp.BoolSize + 2 + msgp.BoolSize + 2 + msgp.BoolSize + 2 + msgp.StringPrefixSize + len(z.OldDataDir)
 	return
 }
 
@@ -4751,10 +4807,16 @@ func (z *RenameDataResp) DecodeMsg(dc *msgp.Reader) (err error) {
 			return
 		}
 		switch msgp.UnsafeString(field) {
-		case "sig":
-			z.Signature, err = dc.ReadUint64()
+		case "Sign":
+			z.Sign, err = dc.ReadBytes(z.Sign)
 			if err != nil {
-				err = msgp.WrapError(err, "Signature")
+				err = msgp.WrapError(err, "Sign")
+				return
+			}
+		case "OldDataDir":
+			z.OldDataDir, err = dc.ReadString()
+			if err != nil {
+				err = msgp.WrapError(err, "OldDataDir")
 				return
 			}
 		default:
@@ -4769,28 +4831,41 @@ func (z *RenameDataResp) DecodeMsg(dc *msgp.Reader) (err error) {
 }
 
 // EncodeMsg implements msgp.Encodable
-func (z RenameDataResp) EncodeMsg(en *msgp.Writer) (err error) {
-	// map header, size 1
-	// write "sig"
-	err = en.Append(0x81, 0xa3, 0x73, 0x69, 0x67)
+func (z *RenameDataResp) EncodeMsg(en *msgp.Writer) (err error) {
+	// map header, size 2
+	// write "Sign"
+	err = en.Append(0x82, 0xa4, 0x53, 0x69, 0x67, 0x6e)
 	if err != nil {
 		return
 	}
-	err = en.WriteUint64(z.Signature)
+	err = en.WriteBytes(z.Sign)
 	if err != nil {
-		err = msgp.WrapError(err, "Signature")
+		err = msgp.WrapError(err, "Sign")
+		return
+	}
+	// write "OldDataDir"
+	err = en.Append(0xaa, 0x4f, 0x6c, 0x64, 0x44, 0x61, 0x74, 0x61, 0x44, 0x69, 0x72)
+	if err != nil {
+		return
+	}
+	err = en.WriteString(z.OldDataDir)
+	if err != nil {
+		err = msgp.WrapError(err, "OldDataDir")
 		return
 	}
 	return
 }
 
 // MarshalMsg implements msgp.Marshaler
-func (z RenameDataResp) MarshalMsg(b []byte) (o []byte, err error) {
+func (z *RenameDataResp) MarshalMsg(b []byte) (o []byte, err error) {
 	o = msgp.Require(b, z.Msgsize())
-	// map header, size 1
-	// string "sig"
-	o = append(o, 0x81, 0xa3, 0x73, 0x69, 0x67)
-	o = msgp.AppendUint64(o, z.Signature)
+	// map header, size 2
+	// string "Sign"
+	o = append(o, 0x82, 0xa4, 0x53, 0x69, 0x67, 0x6e)
+	o = msgp.AppendBytes(o, z.Sign)
+	// string "OldDataDir"
+	o = append(o, 0xaa, 0x4f, 0x6c, 0x64, 0x44, 0x61, 0x74, 0x61, 0x44, 0x69, 0x72)
+	o = msgp.AppendString(o, z.OldDataDir)
 	return
 }
 
@@ -4812,10 +4887,16 @@ func (z *RenameDataResp) UnmarshalMsg(bts []byte) (o []byte, err error) {
 			return
 		}
 		switch msgp.UnsafeString(field) {
-		case "sig":
-			z.Signature, bts, err = msgp.ReadUint64Bytes(bts)
+		case "Sign":
+			z.Sign, bts, err = msgp.ReadBytesBytes(bts, z.Sign)
 			if err != nil {
-				err = msgp.WrapError(err, "Signature")
+				err = msgp.WrapError(err, "Sign")
+				return
+			}
+		case "OldDataDir":
+			z.OldDataDir, bts, err = msgp.ReadStringBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "OldDataDir")
 				return
 			}
 		default:
@@ -4831,8 +4912,8 @@ func (z *RenameDataResp) UnmarshalMsg(bts []byte) (o []byte, err error) {
 }
 
 // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
-func (z RenameDataResp) Msgsize() (s int) {
-	s = 1 + 4 + msgp.Uint64Size
+func (z *RenameDataResp) Msgsize() (s int) {
+	s = 1 + 5 + msgp.BytesPrefixSize + len(z.Sign) + 11 + msgp.StringPrefixSize + len(z.OldDataDir)
 	return
 }
 
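
The revised upper bound can be read off term by term; my annotation of the generated expression:

	// 1                       fixmap header byte (field count)
	// + 5                     "Sign" key: 1-byte fixstr header + 4 characters
	// + msgp.BytesPrefixSize  worst-case bin prefix for the Sign payload
	// + len(z.Sign)           the signature bytes themselves
	// + 11                    "OldDataDir" key: 1-byte header + 10 characters
	// + msgp.StringPrefixSize worst-case str prefix
	// + len(z.OldDataDir)     the dataDir string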
@@ -85,7 +85,7 @@ type StorageAPI interface {
 	UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo, opts UpdateMetadataOpts) error
 	ReadVersion(ctx context.Context, origvolume, volume, path, versionID string, opts ReadOptions) (FileInfo, error)
 	ReadXL(ctx context.Context, volume, path string, readData bool) (RawFileInfo, error)
-	RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (uint64, error)
+	RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (RenameDataResp, error)
 
 	// File operations.
 	ListDir(ctx context.Context, origvolume, volume, dirPath string, count int) ([]string, error)
@@ -440,7 +440,9 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string,
 }
 
 // RenameData - rename source path to destination path atomically, metadata and data file.
-func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (sign uint64, err error) {
+func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo,
+	dstVolume, dstPath string, opts RenameOptions,
+) (res RenameDataResp, err error) {
 	params := RenameDataHandlerParams{
 		DiskID:    *client.diskID.Load(),
 		SrcVolume: srcVolume,
@@ -457,11 +459,11 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
 		resp, err = storageRenameDataInlineRPC.Call(ctx, client.gridConn, &RenameDataInlineHandlerParams{params})
 	}
 	if err != nil {
-		return 0, toStorageErr(err)
+		return res, toStorageErr(err)
 	}
 
 	defer storageRenameDataRPC.PutResponse(resp)
-	return resp.Signature, nil
+	return *resp, nil
 }
 
 // where we keep old *Readers
@@ -67,7 +67,7 @@ var (
 	storageWriteAllRPC         = grid.NewSingleHandler[*WriteAllHandlerParams, grid.NoPayload](grid.HandlerWriteAll, func() *WriteAllHandlerParams { return &WriteAllHandlerParams{} }, grid.NewNoPayload)
 	storageReadVersionRPC      = grid.NewSingleHandler[*grid.MSS, *FileInfo](grid.HandlerReadVersion, grid.NewMSS, func() *FileInfo { return &FileInfo{} })
 	storageReadXLRPC           = grid.NewSingleHandler[*grid.MSS, *RawFileInfo](grid.HandlerReadXL, grid.NewMSS, func() *RawFileInfo { return &RawFileInfo{} })
-	storageRenameDataRPC       = grid.NewSingleHandler[*RenameDataHandlerParams, *RenameDataResp](grid.HandlerRenameData, func() *RenameDataHandlerParams { return &RenameDataHandlerParams{} }, func() *RenameDataResp { return &RenameDataResp{} })
+	storageRenameDataRPC       = grid.NewSingleHandler[*RenameDataHandlerParams, *RenameDataResp](grid.HandlerRenameData2, func() *RenameDataHandlerParams { return &RenameDataHandlerParams{} }, func() *RenameDataResp { return &RenameDataResp{} })
 	storageRenameDataInlineRPC = grid.NewSingleHandler[*RenameDataInlineHandlerParams, *RenameDataResp](grid.HandlerRenameDataInline, newRenameDataInlineHandlerParams, func() *RenameDataResp { return &RenameDataResp{} }).AllowCallRequestPool(false)
 	storageRenameFileRPC       = grid.NewSingleHandler[*RenameFileHandlerParams, grid.NoPayload](grid.HandlerRenameFile, func() *RenameFileHandlerParams { return &RenameFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true)
 	storageStatVolRPC          = grid.NewSingleHandler[*grid.MSS, *VolInfo](grid.HandlerStatVol, grid.NewMSS, func() *VolInfo { return &VolInfo{} })
@@ -695,10 +695,8 @@ func (s *storageRESTServer) RenameDataHandler(p *RenameDataHandlerParams) (*Rena
 		return nil, grid.NewRemoteErr(errDiskNotFound)
 	}
 
-	sign, err := s.getStorage().RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath, p.Opts)
-	return &RenameDataResp{
-		Signature: sign,
-	}, grid.NewRemoteErr(err)
+	resp, err := s.getStorage().RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath, p.Opts)
+	return &resp, grid.NewRemoteErr(err)
 }
 
 // RenameDataInlineHandler - renames a meta object and data dir to destination.
@@ -461,10 +461,10 @@ func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPat
 	return w.Run(func() error { return p.storage.RenameFile(ctx, srcVolume, srcPath, dstVolume, dstPath) })
 }
 
-func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (sign uint64, err error) {
+func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (res RenameDataResp, err error) {
 	ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenameData, srcPath, fi.DataDir, dstVolume, dstPath)
 	if err != nil {
-		return 0, err
+		return res, err
 	}
 	defer func() {
 		if err == nil && !skipAccessChecks(dstVolume) {
@@ -472,11 +472,12 @@ func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPat
 		}
 		done(&err)
 	}()
+
 	// Copy inline data to a new buffer to function with deadlines.
 	if len(fi.Data) > 0 {
 		fi.Data = append(grid.GetByteBufferCap(len(fi.Data))[:0], fi.Data...)
 	}
-	return xioutil.WithDeadline[uint64](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (result uint64, err error) {
+	return xioutil.WithDeadline[RenameDataResp](ctx, globalDriveConfig.GetMaxTimeout(), func(ctx context.Context) (res RenameDataResp, err error) {
 		if len(fi.Data) > 0 {
 			defer grid.PutByteBuffer(fi.Data)
 		}
@@ -29,6 +29,7 @@ import (
 	pathutil "path"
 	"path/filepath"
 	"runtime"
+	"slices"
 	"strconv"
 	"strings"
 	"sync"
@@ -48,7 +49,6 @@ import (
 	xioutil "github.com/minio/minio/internal/ioutil"
 	"github.com/minio/minio/internal/logger"
 	"github.com/pkg/xattr"
-	"github.com/zeebo/xxh3"
 )
 
 const (
@@ -65,6 +65,9 @@ const (
 
 	// XL metadata file carries per object metadata.
 	xlStorageFormatFile = "xl.meta"
+
+	// XL metadata file backup file carries previous per object metadata.
+	xlStorageFormatFileBackup = "xl.meta.bkp"
 )
 
 var alignedBuf []byte
@@ -1366,6 +1369,10 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
 		return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf)
 	}
 
+	if opts.UndoWrite && opts.OldDataDir != "" {
+		return renameAll(pathJoin(filePath, opts.OldDataDir, xlStorageFormatFileBackup), pathJoin(filePath, xlStorageFormatFile), filePath)
+	}
+
 	return s.deleteFile(volumeDir, pathJoin(volumeDir, path, xlStorageFormatFile), true, false)
 }
 
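
This is the undo path referenced by DeleteOptions.UndoWrite: the xl.meta.bkp written into the old dataDir during the overwrite is renamed back over xl.meta, reverting the object in a single operation. A standard-library illustration of the same idea (layout is illustrative, not MinIO's exact on-disk schema; MinIO's renameAll helper adds its own parent-directory handling around the same core rename):

	package main

	import (
		"os"
		"path/filepath"
	)

	// restoreBackup reverts an overwritten metadata file from its backup
	// with a single rename, which is atomic on POSIX filesystems.
	func restoreBackup(objectDir, oldDataDir string) error {
		backup := filepath.Join(objectDir, oldDataDir, "xl.meta.bkp")
		return os.Rename(backup, filepath.Join(objectDir, "xl.meta"))
	}

	func main() {
		_ = restoreBackup("/data/bucket/object", "old-datadir-uuid")
	}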
@@ -1375,12 +1382,21 @@ func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi
 		return errInvalidArgument
 	}
 
+	volumeDir, err := s.getVolDir(volume)
+	if err != nil {
+		return err
+	}
+
+	// Validate file path length, before reading.
+	filePath := pathJoin(volumeDir, path)
+	if err = checkPathLength(filePath); err != nil {
+		return err
+	}
+
 	buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
 	if err != nil {
-		if err == errFileNotFound {
-			if fi.VersionID != "" {
-				return errFileVersionNotFound
-			}
+		if err == errFileNotFound && fi.VersionID != "" {
+			return errFileVersionNotFound
 		}
 		return err
 	}
@@ -1405,7 +1421,7 @@ func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi
 	}
 	defer metaDataPoolPut(wbuf)
 
-	return s.writeAll(ctx, volume, pathJoin(path, xlStorageFormatFile), wbuf, !opts.NoPersistence)
+	return s.writeAll(ctx, volume, pathJoin(path, xlStorageFormatFile), wbuf, !opts.NoPersistence, volumeDir)
 }
 
 // WriteMetadata - writes FileInfo metadata for path at `xl.meta`
@@ -1438,7 +1454,7 @@ func (s *xlStorage) WriteMetadata(ctx context.Context, origvolume, volume, path
 		// this is currently used by
 		// - emphemeral objects such as objects created during listObjects() calls
 		// - newMultipartUpload() call..
-		return s.writeAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf, false)
+		return s.writeAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf, false, "")
 	}
 
 	buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
@@ -1936,14 +1952,17 @@ func (s *xlStorage) openFileDirect(path string, mode int) (f *os.File, err error
 	return w, nil
 }
 
-func (s *xlStorage) openFileSync(filePath string, mode int) (f *os.File, err error) {
-	return s.openFile(filePath, mode|writeMode)
+func (s *xlStorage) openFileSync(filePath string, mode int, skipParent string) (f *os.File, err error) {
+	return s.openFile(filePath, mode|writeMode, skipParent)
 }
 
-func (s *xlStorage) openFile(filePath string, mode int) (f *os.File, err error) {
+func (s *xlStorage) openFile(filePath string, mode int, skipParent string) (f *os.File, err error) {
+	if skipParent == "" {
+		skipParent = s.drivePath
+	}
 	// Create top level directories if they don't exist.
 	// with mode 0777 mkdir honors system umask.
-	if err = mkdirAll(pathutil.Dir(filePath), 0o777, s.drivePath); err != nil {
+	if err = mkdirAll(pathutil.Dir(filePath), 0o777, skipParent); err != nil {
 		return nil, osErrToFileErr(err)
 	}
 
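
Here skipParent narrows how far up the tree mkdirAll has to walk: everything at or above skipParent is assumed to already exist. A hedged standalone sketch of that contract (mkdirAllUpTo is hypothetical, not MinIO's mkdirAll):

	package main

	import (
		"os"
		"path/filepath"
	)

	// mkdirAllUpTo creates only the directories between stopAt and dir,
	// assuming stopAt already exists. Passing the object's own parent as
	// stopAt on an overwrite ends the recursion immediately, saving one
	// syscall per path component of deeply nested object names.
	func mkdirAllUpTo(dir, stopAt string) error {
		if dir == stopAt || dir == filepath.Dir(dir) {
			return nil // reached the assumed-existing base or the filesystem root
		}
		if err := mkdirAllUpTo(filepath.Dir(dir), stopAt); err != nil {
			return err
		}
		if err := os.Mkdir(dir, 0o777); err != nil && !os.IsExist(err) {
			return err
		}
		return nil
	}

	func main() {
		_ = mkdirAllUpTo("/tmp/base/a/b/c", "/tmp/base")
	}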
@@ -2095,18 +2114,22 @@ func (s *xlStorage) CreateFile(ctx context.Context, origvolume, volume, path str
 		}
 	}()
 
-	return s.writeAllDirect(ctx, filePath, fileSize, r, os.O_CREATE|os.O_WRONLY|os.O_EXCL)
+	return s.writeAllDirect(ctx, filePath, fileSize, r, os.O_CREATE|os.O_WRONLY|os.O_EXCL, volumeDir)
 }
 
-func (s *xlStorage) writeAllDirect(ctx context.Context, filePath string, fileSize int64, r io.Reader, flags int) (err error) {
+func (s *xlStorage) writeAllDirect(ctx context.Context, filePath string, fileSize int64, r io.Reader, flags int, skipParent string) (err error) {
 	if contextCanceled(ctx) {
 		return ctx.Err()
 	}
 
+	if skipParent == "" {
+		skipParent = s.drivePath
+	}
+
 	// Create top level directories if they don't exist.
 	// with mode 0777 mkdir honors system umask.
 	parentFilePath := pathutil.Dir(filePath)
-	if err = mkdirAll(parentFilePath, 0o777, s.drivePath); err != nil {
+	if err = mkdirAll(parentFilePath, 0o777, skipParent); err != nil {
 		return osErrToFileErr(err)
 	}
 
@@ -2168,7 +2191,7 @@ func (s *xlStorage) writeAllDirect(ctx context.Context, filePath string, fileSiz
 	return w.Close()
 }
 
-func (s *xlStorage) writeAll(ctx context.Context, volume string, path string, b []byte, sync bool) (err error) {
+func (s *xlStorage) writeAll(ctx context.Context, volume string, path string, b []byte, sync bool, skipParent string) (err error) {
 	if contextCanceled(ctx) {
 		return ctx.Err()
 	}
@@ -2194,11 +2217,11 @@ func (s *xlStorage) writeAll(ctx context.Context, volume string, path string, b
 		// This is an optimization mainly to ensure faster I/O.
 		if len(b) > xioutil.DirectioAlignSize {
 			r := bytes.NewReader(b)
-			return s.writeAllDirect(ctx, filePath, r.Size(), r, flags)
+			return s.writeAllDirect(ctx, filePath, r.Size(), r, flags, skipParent)
 		}
-		w, err = s.openFileSync(filePath, flags)
+		w, err = s.openFileSync(filePath, flags, skipParent)
 	} else {
-		w, err = s.openFile(filePath, flags)
+		w, err = s.openFile(filePath, flags, skipParent)
 	}
 	if err != nil {
 		return err
@@ -2235,7 +2258,12 @@ func (s *xlStorage) WriteAll(ctx context.Context, volume string, path string, b
 		s.Unlock()
 	}
 
-	return s.writeAll(ctx, volume, path, b, true)
+	volumeDir, err := s.getVolDir(volume)
+	if err != nil {
+		return err
+	}
+
+	return s.writeAll(ctx, volume, path, b, true, volumeDir)
 }
 
 // AppendFile - append a byte array at path, if file doesn't exist at
@@ -2261,7 +2289,7 @@ func (s *xlStorage) AppendFile(ctx context.Context, volume string, path string,
 	var w *os.File
 	// Create file if not found. Not doing O_DIRECT here to avoid the code that does buffer aligned writes.
 	// AppendFile() is only used by healing code to heal objects written in old format.
-	w, err = s.openFileSync(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY)
+	w, err = s.openFileSync(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, volumeDir)
 	if err != nil {
 		return err
 	}
@@ -2425,7 +2453,7 @@ func skipAccessChecks(volume string) (ok bool) {
 }
 
 // RenameData - rename source path to destination path atomically, metadata and data directory.
-func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (sign uint64, err error) {
+func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string, opts RenameOptions) (res RenameDataResp, err error) {
 	defer func() {
 		ignoredErrs := []error{
 			errFileNotFound,
@@ -2451,24 +2479,24 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 
 	srcVolumeDir, err := s.getVolDir(srcVolume)
 	if err != nil {
-		return 0, err
+		return res, err
 	}
 
 	dstVolumeDir, err := s.getVolDir(dstVolume)
 	if err != nil {
-		return 0, err
+		return res, err
 	}
 
 	if !skipAccessChecks(srcVolume) {
 		// Stat a volume entry.
 		if err = Access(srcVolumeDir); err != nil {
-			return 0, convertAccessError(err, errVolumeAccessDenied)
+			return res, convertAccessError(err, errVolumeAccessDenied)
 		}
 	}
 
 	if !skipAccessChecks(dstVolume) {
 		if err = Access(dstVolumeDir); err != nil {
-			return 0, convertAccessError(err, errVolumeAccessDenied)
+			return res, convertAccessError(err, errVolumeAccessDenied)
 		}
 	}
 
@@ -2490,13 +2518,17 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 	}
 
 	if err = checkPathLength(srcFilePath); err != nil {
-		return 0, err
+		return res, err
 	}
 
 	if err = checkPathLength(dstFilePath); err != nil {
-		return 0, err
+		return res, err
 	}
 
+	s.RLock()
+	formatLegacy := s.formatLegacy
+	s.RUnlock()
+
 	dstBuf, err := xioutil.ReadFile(dstFilePath)
 	if err != nil {
 		// handle situations when dstFilePath is 'file'
@@ -2506,20 +2538,22 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 		if isSysErrNotDir(err) && runtime.GOOS != globalWindowsOSName {
 			// NOTE: On windows the error happens at
 			// next line and returns appropriate error.
-			return 0, errFileAccessDenied
+			return res, errFileAccessDenied
 		}
 		if !osIsNotExist(err) {
-			return 0, osErrToFileErr(err)
+			return res, osErrToFileErr(err)
 		}
-		// errFileNotFound comes here.
-		err = s.renameLegacyMetadata(dstVolumeDir, dstPath)
-		if err != nil && err != errFileNotFound {
-			return 0, err
-		}
-		if err == nil {
-			dstBuf, err = xioutil.ReadFile(dstFilePath)
-			if err != nil && !osIsNotExist(err) {
-				return 0, osErrToFileErr(err)
+		if formatLegacy {
+			// errFileNotFound comes here.
+			err = s.renameLegacyMetadata(dstVolumeDir, dstPath)
+			if err != nil && err != errFileNotFound {
+				return res, err
+			}
+			if err == nil {
+				dstBuf, err = xioutil.ReadFile(dstFilePath)
+				if err != nil && !osIsNotExist(err) {
+					return res, osErrToFileErr(err)
+				}
 			}
 		}
 	}
@@ -2548,9 +2582,6 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 			}
 		}
 	} else {
-		s.RLock()
-		formatLegacy := s.formatLegacy
-		s.RUnlock()
 		// It is possible that some drives may not have `xl.meta` file
 		// in such scenarios verify if at least `part.1` files exist
 		// to verify for legacy version.
|
@ -2562,7 +2593,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||||
currentDataPath := pathJoin(dstVolumeDir, dstPath)
|
currentDataPath := pathJoin(dstVolumeDir, dstPath)
|
||||||
entries, err := readDirN(currentDataPath, 1)
|
entries, err := readDirN(currentDataPath, 1)
|
||||||
if err != nil && err != errFileNotFound {
|
if err != nil && err != errFileNotFound {
|
||||||
return 0, osErrToFileErr(err)
|
return res, osErrToFileErr(err)
|
||||||
}
|
}
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
if entry == xlStorageFormatFile || strings.HasSuffix(entry, slashSeparator) {
|
if entry == xlStorageFormatFile || strings.HasSuffix(entry, slashSeparator) {
|
||||||
|
@@ -2576,64 +2607,58 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 		}
 	}
 
-	legacyDataPath := pathJoin(dstVolumeDir, dstPath, legacyDataDir)
-	if legacyPreserved {
-		// Preserve all the legacy data, could be slow, but at max there can be 10,000 parts.
-		currentDataPath := pathJoin(dstVolumeDir, dstPath)
-		entries, err := readDir(currentDataPath)
-		if err != nil {
-			return 0, osErrToFileErr(err)
-		}
-
-		// legacy data dir means its old content, honor system umask.
-		if err = mkdirAll(legacyDataPath, 0o777, dstVolumeDir); err != nil {
-			// any failed mkdir-calls delete them.
-			s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
-			return 0, osErrToFileErr(err)
-		}
-
-		for _, entry := range entries {
-			// Skip xl.meta renames further, also ignore any directories such as `legacyDataDir`
-			if entry == xlStorageFormatFile || strings.HasSuffix(entry, slashSeparator) {
-				continue
-			}
-
-			if err = Rename(pathJoin(currentDataPath, entry), pathJoin(legacyDataPath, entry)); err != nil {
-				// Any failed rename calls un-roll previous transaction.
-				s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
-
-				return 0, osErrToFileErr(err)
+	var legacyDataPath string
+	if formatLegacy {
+		legacyDataPath = pathJoin(dstVolumeDir, dstPath, legacyDataDir)
+		if legacyPreserved {
+			// Preserve all the legacy data, could be slow, but at max there can be 10,000 parts.
+			currentDataPath := pathJoin(dstVolumeDir, dstPath)
+			entries, err := readDir(currentDataPath)
+			if err != nil {
+				return res, osErrToFileErr(err)
+			}
+
+			// legacy data dir means its old content, honor system umask.
+			if err = mkdirAll(legacyDataPath, 0o777, dstVolumeDir); err != nil {
+				// any failed mkdir-calls delete them.
+				s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
+				return res, osErrToFileErr(err)
+			}
+
+			for _, entry := range entries {
+				// Skip xl.meta renames further, also ignore any directories such as `legacyDataDir`
+				if entry == xlStorageFormatFile || strings.HasSuffix(entry, slashSeparator) {
+					continue
+				}
+
+				if err = Rename(pathJoin(currentDataPath, entry), pathJoin(legacyDataPath, entry)); err != nil {
+					// Any failed rename calls un-roll previous transaction.
+					s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
+
+					return res, osErrToFileErr(err)
+				}
 			}
 		}
 	}
 
-	var oldDstDataPath, reqVID string
+	// Set skipParent to skip mkdirAll() calls for deeply nested objects
+	// - if its an overwrite
+	// - if its a versioned object
+	//
+	// This can potentially reduce syscalls by strings.Split(path, "/")
+	// times relative to the object name.
+	skipParent := dstVolumeDir
+	if len(dstBuf) > 0 {
+		skipParent = pathutil.Dir(dstFilePath)
+	}
+
+	var reqVID string
 	if fi.VersionID == "" {
 		reqVID = nullVersionID
 	} else {
 		reqVID = fi.VersionID
 	}
 
-	// Replace the data of null version or any other existing version-id
-	_, ver, err := xlMeta.findVersionStr(reqVID)
-	if err == nil {
-		dataDir := ver.getDataDir()
-		if dataDir != "" && (xlMeta.SharedDataDirCountStr(reqVID, dataDir) == 0) {
-			// Purge the destination path as we are not preserving anything
-			// versioned object was not requested.
-			oldDstDataPath = pathJoin(dstVolumeDir, dstPath, dataDir)
-			// if old destination path is same as new destination path
-			// there is nothing to purge, this is true in case of healing
-			// avoid setting oldDstDataPath at that point.
-			if oldDstDataPath == dstDataPath {
-				oldDstDataPath = ""
-			} else {
-				xlMeta.data.remove(reqVID, dataDir)
-			}
-		}
-	}
-
 	// Empty fi.VersionID indicates that versioning is either
 	// suspended or disabled on this bucket. RenameData will replace
 	// the 'null' version. We add a free-version to track its tiered
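The skipParent optimization rests on a simple observation: on an overwrite (len(dstBuf) > 0) the destination's parent directories already exist, so directory creation can stop as soon as the walk reaches a parent known to exist. A minimal sketch of that idea, using a hypothetical mkdirBelow helper rather than MinIO's mkdirAll:

    import (
    	"errors"
    	"io/fs"
    	"os"
    	"path/filepath"
    )

    // mkdirBelow creates path and any missing parents, but assumes that
    // knownParent (and everything above it) already exists, so it never
    // issues a syscall for those components.
    func mkdirBelow(path, knownParent string, perm os.FileMode) error {
    	if path == knownParent || path == filepath.Dir(path) {
    		return nil // reached the known-existing parent (or the root)
    	}
    	// Create parents top-down before the leaf.
    	if err := mkdirBelow(filepath.Dir(path), knownParent, perm); err != nil {
    		return err
    	}
    	if err := os.Mkdir(path, perm); err != nil && !errors.Is(err, fs.ErrExist) {
    		return err
    	}
    	return nil
    }

Passing a deeper known parent (MinIO's skipParent) saves one mkdir/stat per path segment; for an object nested N directories deep, that is N avoided syscalls on every overwrite or versioned update.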
@@ -2651,74 +2676,115 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 	}
 
 	// indicates if RenameData() is called by healing.
-	// healing doesn't preserve the dataDir as 'legacy'
-	healing := fi.XLV1 && fi.DataDir != legacyDataDir
+	healing := fi.Healing()
+
+	// Replace the data of null version or any other existing version-id
+	_, ver, err := xlMeta.findVersionStr(reqVID)
+	if err == nil {
+		dataDir := ver.getDataDir()
+		if dataDir != "" && (xlMeta.SharedDataDirCountStr(reqVID, dataDir) == 0) {
+			// Purge the destination path as we are not preserving anything
+			// versioned object was not requested.
+			res.OldDataDir = dataDir
+			if healing {
+				// if old destination path is same as new destination path
+				// there is nothing to purge, this is true in case of healing
+				// avoid setting OldDataDir at that point.
+				res.OldDataDir = ""
+			} else {
+				xlMeta.data.remove(reqVID, dataDir)
+			}
+		}
+	}
 
 	if err = xlMeta.AddVersion(fi); err != nil {
 		if legacyPreserved {
 			// Any failed rename calls un-roll previous transaction.
 			s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
 		}
-		return 0, err
+		return res, err
 	}
 
-	var sbuf bytes.Buffer
-	for _, ver := range xlMeta.versions {
-		sbuf.Write(ver.header.Signature[:])
+	if len(xlMeta.versions) <= 100 {
+		// any number of versions beyond this is excessive
+		// avoid healing such objects in this manner, let
+		// it heal during the regular scanner cycle.
+		dst := []byte{}
+		for _, ver := range xlMeta.versions {
+			dst = slices.Grow(dst, 16)
+			dst = append(dst, ver.header.VersionID[:]...)
+		}
+		res.Sign = dst
 	}
-	sign = xxh3.Hash(sbuf.Bytes())
 
-	dstBuf, err = xlMeta.AppendTo(metaDataPoolGet())
-	defer metaDataPoolPut(dstBuf)
+	newDstBuf, err := xlMeta.AppendTo(metaDataPoolGet())
+	defer metaDataPoolPut(newDstBuf)
 	if err != nil {
 		if legacyPreserved {
 			s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
 		}
-		return 0, errFileCorrupt
+		return res, errFileCorrupt
 	}
 
-	if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), dstBuf); err != nil {
+	if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), newDstBuf); err != nil {
 		if legacyPreserved {
 			s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
 		}
-		return 0, osErrToFileErr(err)
+		return res, osErrToFileErr(err)
 	}
 	diskHealthCheckOK(ctx, err)
 
-	if srcDataPath != "" && len(fi.Data) == 0 && fi.Size > 0 {
-		// renameAll only for objects that have xl.meta not saved inline.
-		s.moveToTrash(dstDataPath, true, false)
+	notInline := srcDataPath != "" && len(fi.Data) == 0 && fi.Size > 0
+	if notInline {
 		if healing {
+			// renameAll only for objects that have xl.meta not saved inline.
+			// this must be done in healing only, otherwise it is expected
+			// that for fresh PutObject() call dstDataPath can never exist.
+			// if its an overwrite then the caller deletes the DataDir
+			// in a separate RPC call.
+			s.moveToTrash(dstDataPath, true, false)
+
 			// If we are healing we should purge any legacyDataPath content,
 			// that was previously preserved during PutObject() call
 			// on a versioned bucket.
 			s.moveToTrash(legacyDataPath, true, false)
 		}
-		if err = renameAll(srcDataPath, dstDataPath, dstVolumeDir); err != nil {
+		if err = renameAll(srcDataPath, dstDataPath, skipParent); err != nil {
 			if legacyPreserved {
 				// Any failed rename calls un-roll previous transaction.
 				s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
 			}
+			// if its a partial rename() do not attempt to delete recursively.
 			s.deleteFile(dstVolumeDir, dstDataPath, false, false)
-			return 0, osErrToFileErr(err)
+			return res, osErrToFileErr(err)
 		}
 	}
 
+	// When we are not inlined and there is no oldDataDir present
+	// we backup existing xl.meta -> xl.meta.bkp - this is done to
+	// ensure for some reason we didn't get enough quorum we can
+	// revert this back to original xl.meta and preserve the older dataDir.
+	if notInline && res.OldDataDir != "" {
+		// preserve current xl.meta inside the oldDataDir.
+		if err = s.writeAll(ctx, dstVolume, pathJoin(dstPath, res.OldDataDir, xlStorageFormatFileBackup), dstBuf, true, skipParent); err != nil {
+			if legacyPreserved {
+				s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
+			}
+			return res, osErrToFileErr(err)
+		}
+		diskHealthCheckOK(ctx, err)
+	}
+
 	// Commit meta-file
-	if err = renameAll(srcFilePath, dstFilePath, dstVolumeDir); err != nil {
+	if err = renameAll(srcFilePath, dstFilePath, skipParent); err != nil {
 		if legacyPreserved {
 			// Any failed rename calls un-roll previous transaction.
 			s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
 		}
+		// if its a partial rename() do not attempt to delete recursively.
+		// this can be healed since all parts are available.
 		s.deleteFile(dstVolumeDir, dstDataPath, false, false)
-		return 0, osErrToFileErr(err)
+		return res, osErrToFileErr(err)
 	}
 
-	// additionally only purge older data at the end of the transaction of new data-dir
-	// movement, this is to ensure that previous data references can co-exist for
-	// any recoverability.
-	if oldDstDataPath != "" {
-		s.moveToTrash(oldDstDataPath, true, false)
-	}
-
 	if srcVolume != minioMetaMultipartBucket {
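res.Sign now carries the object's concatenated version IDs back to the caller, which lets the erasure layer spot which disks disagree without listing and healing everything. A rough sketch of how such per-disk signatures could be compared, using a hypothetical helper that is not part of the MinIO codebase:

    // disparateDisks returns the indexes of disks whose signature differs
    // from the signature agreed upon by at least writeQuorum disks; those
    // are the only disks that need healing for this object.
    func disparateDisks(signs [][]byte, writeQuorum int) (heal []int) {
    	count := make(map[string]int)
    	for _, s := range signs {
    		count[string(s)]++
    	}
    	best, bestN := "", 0
    	for s, n := range count {
    		if n > bestN {
    			best, bestN = s, n
    		}
    	}
    	if bestN < writeQuorum {
    		return nil // no quorum agreement; needs a full heal instead
    	}
    	for i, s := range signs {
    		if string(s) != best {
    			heal = append(heal, i)
    		}
    	}
    	return heal
    }

With the signature limited to objects of at most 100 versions, the comparison stays cheap; anything more disparate falls back to the regular scanner-driven heal.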
@@ -2729,7 +2795,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
 	} else {
 		s.deleteFile(srcVolumeDir, pathutil.Dir(srcFilePath), true, false)
 	}
-	return sign, nil
+	return res, nil
 }
 
 // RenameFile - rename source path to destination path atomically.
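The xl.meta.bkp write above is the commit's recoverability hook: before the new metadata lands, the current xl.meta is preserved inside the old data directory so a failed quorum can be undone. Reduced to its essence, with hypothetical paths and none of MinIO's locking or trash handling, the transaction looks roughly like this:

    import "os"

    // overwriteWithUndo backs up the current metadata file, writes the new
    // one, and hands back an undo that restores the backup on quorum failure.
    func overwriteWithUndo(metaPath, backupPath string, newMeta []byte) (undo func() error, err error) {
    	old, err := os.ReadFile(metaPath)
    	if err != nil {
    		return nil, err
    	}
    	if err = os.WriteFile(backupPath, old, 0o644); err != nil {
    		return nil, err
    	}
    	if err = os.WriteFile(metaPath, newMeta, 0o644); err != nil {
    		return nil, err
    	}
    	return func() error { return os.Rename(backupPath, metaPath) }, nil
    }

In the change itself, the backup lives inside the old data directory (res.OldDataDir), so the backup and the superseded data are reclaimed together once the overwrite is known to be durable.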
|
@ -3047,8 +3113,11 @@ func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path
|
||||||
// Do not abort on context errors.
|
// Do not abort on context errors.
|
||||||
for dir := range foundDirs {
|
for dir := range foundDirs {
|
||||||
toRemove := pathJoin(volumeDir, path, dir+SlashSeparator)
|
toRemove := pathJoin(volumeDir, path, dir+SlashSeparator)
|
||||||
err := s.deleteFile(volumeDir, toRemove, true, true)
|
err = s.deleteFile(volumeDir, toRemove, true, true)
|
||||||
diskHealthCheckOK(ctx, err)
|
diskHealthCheckOK(ctx, err)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do the same for inline data
|
// Do the same for inline data
|
||||||
|
@@ -3056,32 +3125,38 @@ func (s *xlStorage) CleanAbandonedData(ctx context.Context, volume string, path
 	if err != nil {
 		return err
 	}
 
 	// Clear and repopulate
 	for k := range foundDirs {
 		delete(foundDirs, k)
 	}
 
 	// Populate into map
 	for _, k := range dirs {
 		foundDirs[k] = struct{}{}
 	}
 
 	// Delete all directories we expect to be there.
 	for _, dir := range wantDirs {
 		delete(foundDirs, dir)
 	}
 
+	// Nothing to delete
+	if len(foundDirs) == 0 {
+		return nil
+	}
+
 	// Delete excessive inline entries.
-	if len(foundDirs) > 0 {
-		// Convert to slice.
-		dirs = dirs[:0]
-		for dir := range foundDirs {
-			dirs = append(dirs, dir)
-		}
-		if xl.data.remove(dirs...) {
-			newBuf, err := xl.AppendTo(metaDataPoolGet())
-			if err == nil {
-				defer metaDataPoolPut(newBuf)
-				return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf)
-			}
-		}
-	}
+	// Convert to slice.
+	dirs = dirs[:0]
+	for dir := range foundDirs {
+		dirs = append(dirs, dir)
+	}
+	if xl.data.remove(dirs...) {
+		newBuf, err := xl.AppendTo(metaDataPoolGet())
+		if err == nil {
+			defer metaDataPoolPut(newBuf)
+			return s.WriteAll(ctx, volume, pathJoin(path, xlStorageFormatFile), newBuf)
+		}
+	}
 	return nil
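The clear-and-repopulate passes above reuse one foundDirs map and one dirs slice across both the on-disk and inline scans, trading a little bookkeeping for zero reallocation. The idiom in isolation, as a sketch:

    // resetAndFill empties a set in place and refills it; the delete loop
    // is recognized by the Go compiler and lowered to a single map clear.
    func resetAndFill(set map[string]struct{}, keys []string) {
    	for k := range set {
    		delete(set, k)
    	}
    	for _, k := range keys {
    		set[k] = struct{}{}
    	}
    }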
@@ -43,10 +43,7 @@ unset MINIO_KMS_KES_KEY_FILE
 unset MINIO_KMS_KES_ENDPOINT
 unset MINIO_KMS_KES_KEY_NAME
 
-(
-	cd ./docs/debugging/s3-check-md5
-	go install -v
-)
+go install -v github.com/minio/minio/docs/debugging/s3-check-md5@latest
 
 wget -q -O mc https://dl.minio.io/client/mc/release/linux-amd64/mc &&
 	chmod +x mc
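The same cleanup is applied to each of the verification scripts that follow: the subshell-and-cd dance is replaced by a single module-aware go install with an explicit @latest version suffix, which resolves the tool through the module system from any working directory instead of depending on the layout of the local checkout.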
@@ -147,10 +147,7 @@ if [ $ret -ne 0 ]; then
 	exit 1
 fi
 
-(
-	cd ./docs/debugging/s3-check-md5
-	go install -v
-)
+go install -v github.com/minio/minio/docs/debugging/s3-check-md5@latest
 
 s3-check-md5 -versions -access-key minioadmin -secret-key minioadmin -endpoint http://127.0.0.1:9001/ -bucket versioned
 
@@ -158,10 +158,7 @@ if [ $ret -ne 0 ]; then
 	exit 1
 fi
 
-(
-	cd ./docs/debugging/s3-check-md5
-	go install -v
-)
+go install -v github.com/minio/minio/docs/debugging/s3-check-md5@latest
 
 s3-check-md5 -versions -access-key minioadmin -secret-key minioadmin -endpoint http://127.0.0.1:9001/ -bucket versioned
 
@@ -144,10 +144,7 @@ if [ "${expected_checksum}" != "${got_checksum}" ]; then
 	exit 1
 fi
 
-(
-	cd ./docs/debugging/s3-check-md5
-	go install -v
-)
+go install -v github.com/minio/minio/docs/debugging/s3-check-md5@latest
 
 s3-check-md5 -versions -access-key minioadmin -secret-key minioadmin -endpoint http://127.0.0.1:9001/ -bucket versioned
 
@@ -210,10 +210,7 @@ if [ "${expected_checksum}" != "${got_checksum}" ]; then
 	exit 1
 fi
 
-(
-	cd ./docs/debugging/s3-check-md5
-	go install -v
-)
+go install -v github.com/minio/minio/docs/debugging/s3-check-md5@latest
 
 s3-check-md5 -versions -access-key minioadmin -secret-key minioadmin -endpoint http://127.0.0.1:9001/ -bucket bucket2
 s3-check-md5 -versions -access-key minioadmin -secret-key minioadmin -endpoint http://127.0.0.1:9001/ -bucket versioned
@@ -111,6 +111,7 @@ const (
 	HandlerWriteAll
 	HandlerListBuckets
 	HandlerRenameDataInline
+	HandlerRenameData2
 
 	// Add more above here ^^^
 	// If all handlers are used, the type of Handler can be changed.
@@ -189,6 +190,8 @@ var handlerPrefixes = [handlerLast]string{
 	HandlerConsoleLog:       peerPrefix,
 	HandlerListDir:          storagePrefix,
 	HandlerListBuckets:      peerPrefixS3,
+	HandlerRenameDataInline: storagePrefix,
+	HandlerRenameData2:      storagePrefix,
 }
 
 const (
@@ -81,14 +81,15 @@ func _() {
 	_ = x[HandlerWriteAll-70]
 	_ = x[HandlerListBuckets-71]
 	_ = x[HandlerRenameDataInline-72]
-	_ = x[handlerTest-73]
-	_ = x[handlerTest2-74]
-	_ = x[handlerLast-75]
+	_ = x[HandlerRenameData2-73]
+	_ = x[handlerTest-74]
+	_ = x[handlerTest2-75]
+	_ = x[handlerLast-76]
 }
 
-const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBucketGetMetricsGetResourceMetricsGetMemInfoGetProcInfoGetOSInfoGetPartitionsGetNetInfoGetCPUsServerInfoGetSysConfigGetSysServicesGetSysErrorsGetAllBucketStatsGetBucketStatsGetSRMetricsGetPeerMetricsGetMetacacheListingUpdateMetacacheListingGetPeerBucketMetricsStorageInfoConsoleLogListDirGetLocksBackgroundHealStatusGetLastDayTierStatsSignalServiceGetBandwidthWriteAllListBucketsRenameDataInlinehandlerTesthandlerTest2handlerLast"
+const _HandlerID_name = "handlerInvalidLockLockLockRLockLockUnlockLockRUnlockLockRefreshLockForceUnlockWalkDirStatVolDiskInfoNSScannerReadXLReadVersionDeleteFileDeleteVersionUpdateMetadataWriteMetadataCheckPartsRenameDataRenameFileReadAllServerVerifyTraceListenDeleteBucketMetadataLoadBucketMetadataReloadSiteReplicationConfigReloadPoolMetaStopRebalanceLoadRebalanceMetaLoadTransitionTierConfigDeletePolicyLoadPolicyLoadPolicyMappingDeleteServiceAccountLoadServiceAccountDeleteUserLoadUserLoadGroupHealBucketMakeBucketHeadBucketDeleteBucketGetMetricsGetResourceMetricsGetMemInfoGetProcInfoGetOSInfoGetPartitionsGetNetInfoGetCPUsServerInfoGetSysConfigGetSysServicesGetSysErrorsGetAllBucketStatsGetBucketStatsGetSRMetricsGetPeerMetricsGetMetacacheListingUpdateMetacacheListingGetPeerBucketMetricsStorageInfoConsoleLogListDirGetLocksBackgroundHealStatusGetLastDayTierStatsSignalServiceGetBandwidthWriteAllListBucketsRenameDataInlineRenameData2handlerTesthandlerTest2handlerLast"
 
-var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 525, 543, 553, 564, 573, 586, 596, 603, 613, 625, 639, 651, 668, 682, 694, 708, 727, 749, 769, 780, 790, 797, 805, 825, 844, 857, 869, 877, 888, 904, 915, 927, 938}
+var _HandlerID_index = [...]uint16{0, 14, 22, 31, 41, 52, 63, 78, 85, 92, 100, 109, 115, 126, 136, 149, 163, 176, 186, 196, 206, 213, 225, 230, 236, 256, 274, 301, 315, 328, 345, 369, 381, 391, 408, 428, 446, 456, 464, 473, 483, 493, 503, 515, 525, 543, 553, 564, 573, 586, 596, 603, 613, 625, 639, 651, 668, 682, 694, 708, 727, 749, 769, 780, 790, 797, 805, 825, 844, 857, 869, 877, 888, 904, 915, 926, 938, 949}
 
 func (i HandlerID) String() string {
 	if i >= HandlerID(len(_HandlerID_index)-1) {
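The last two hunks are regenerated stringer output: the `_ = x[HandlerRenameData2-73]` lines are compile-time guards that break the build if the enum values ever shift, and String() slices one long name string by cumulative offsets. The scheme in miniature, with made-up names:

    const _name = "FooBarBazz"

    var _index = [...]uint8{0, 3, 6, 10}

    // nameOf returns the name of value i by slicing between adjacent offsets,
    // e.g. nameOf(1) == _name[3:6] == "Bar".
    func nameOf(i int) string {
    	if i < 0 || i >= len(_index)-1 {
    		return "unknown"
    	}
    	return _name[_index[i]:_index[i+1]]
    }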