mirror of
https://github.com/minio/minio.git
synced 2025-01-23 04:33:15 -05:00
verify RenameData() versions to be consistent (#15649)
xl.meta gets written and never rolled back, however we definitely need to validate the state that is persisted on the disk, if there are inconsistencies - more than write quorum we should return an error to the client - if write quorum was achieved however there are inconsistent xl.meta's we should simply trigger an MRF on them
This commit is contained in:
parent
c240da6568
commit
2d9b5a65f1
@ -625,7 +625,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
partsMetadata[i].Erasure.Index = i + 1
|
||||
|
||||
// Attempt a rename now from healed data to final location.
|
||||
if err = disk.RenameData(ctx, minioMetaTmpBucket, tmpID, partsMetadata[i], bucket, object); err != nil {
|
||||
if _, err = disk.RenameData(ctx, minioMetaTmpBucket, tmpID, partsMetadata[i], bucket, object); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return result, err
|
||||
}
|
||||
|
@ -27,6 +27,30 @@ import (
|
||||
"github.com/minio/minio/internal/sync/errgroup"
|
||||
)
|
||||
|
||||
// figure out the most commonVersions across disk that satisfies
|
||||
// the 'writeQuorum' this function returns '0' if quorum cannot
|
||||
// be achieved and disks have too many inconsistent versions.
|
||||
func reduceCommonVersions(diskVersions []uint64, writeQuorum int) (commonVersions uint64) {
|
||||
diskVersionsCount := make(map[uint64]int)
|
||||
for _, versions := range diskVersions {
|
||||
diskVersionsCount[versions]++
|
||||
}
|
||||
|
||||
max := 0
|
||||
for versions, count := range diskVersionsCount {
|
||||
if max < count {
|
||||
max = count
|
||||
commonVersions = versions
|
||||
}
|
||||
}
|
||||
|
||||
if max >= writeQuorum {
|
||||
return commonVersions
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
// Returns number of errors that occurred the most (incl. nil) and the
|
||||
// corresponding error value. NB When there is more than one error value that
|
||||
// occurs maximum number of times, the error value returned depends on how
|
||||
|
@ -712,6 +712,8 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
|
||||
for index := range disks {
|
||||
metadata[index].SetTierFreeVersionID(fvID)
|
||||
}
|
||||
|
||||
diskVersions := make([]uint64, len(disks))
|
||||
// Rename file on all underlying storage disks.
|
||||
for index := range disks {
|
||||
index := index
|
||||
@ -719,6 +721,7 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
|
||||
if disks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
|
||||
// Pick one FileInfo for a disk at index.
|
||||
fi := metadata[index]
|
||||
// Assign index when index is initialized
|
||||
@ -726,16 +729,36 @@ func renameData(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry str
|
||||
fi.Erasure.Index = index + 1
|
||||
}
|
||||
|
||||
if fi.IsValid() {
|
||||
return disks[index].RenameData(ctx, srcBucket, srcEntry, fi, dstBucket, dstEntry)
|
||||
if !fi.IsValid() {
|
||||
return errFileCorrupt
|
||||
}
|
||||
return errFileCorrupt
|
||||
sign, err := disks[index].RenameData(ctx, srcBucket, srcEntry, fi, dstBucket, dstEntry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
diskVersions[index] = sign
|
||||
return nil
|
||||
}, index)
|
||||
}
|
||||
|
||||
// Wait for all renames to finish.
|
||||
errs := g.Wait()
|
||||
|
||||
versions := reduceCommonVersions(diskVersions, writeQuorum)
|
||||
if versions == 0 {
|
||||
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
if err == nil {
|
||||
err = errErasureWriteQuorum
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for index, dversions := range diskVersions {
|
||||
if versions != dversions {
|
||||
errs[index] = errFileCorrupt
|
||||
}
|
||||
}
|
||||
|
||||
// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
|
||||
// otherwise return failure. Cleanup successful renames.
|
||||
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
|
@ -200,9 +200,9 @@ func (d *naughtyDisk) AppendFile(ctx context.Context, volume string, path string
|
||||
return d.disk.AppendFile(ctx, volume, path, buf)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) error {
|
||||
func (d *naughtyDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (uint64, error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
return d.disk.RenameData(ctx, srcVolume, srcPath, fi, dstVolume, dstPath)
|
||||
}
|
||||
|
@ -85,7 +85,7 @@ type StorageAPI interface {
|
||||
UpdateMetadata(ctx context.Context, volume, path string, fi FileInfo) error
|
||||
ReadVersion(ctx context.Context, volume, path, versionID string, readData bool) (FileInfo, error)
|
||||
ReadXL(ctx context.Context, volume, path string, readData bool) (RawFileInfo, error)
|
||||
RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) error
|
||||
RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (uint64, error)
|
||||
|
||||
// File operations.
|
||||
ListDir(ctx context.Context, volume, dirPath string, count int) ([]string, error)
|
||||
@ -216,8 +216,8 @@ func (p *unrecognizedDisk) RenameFile(ctx context.Context, srcVolume, srcPath, d
|
||||
return errDiskNotFound
|
||||
}
|
||||
|
||||
func (p *unrecognizedDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) error {
|
||||
return errDiskNotFound
|
||||
func (p *unrecognizedDisk) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (uint64, error) {
|
||||
return 0, errDiskNotFound
|
||||
}
|
||||
|
||||
func (p *unrecognizedDisk) CheckParts(ctx context.Context, volume string, path string, fi FileInfo) (err error) {
|
||||
|
@ -446,7 +446,7 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string,
|
||||
}
|
||||
|
||||
// RenameData - rename source path to destination path atomically, metadata and data file.
|
||||
func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (err error) {
|
||||
func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (sign uint64, err error) {
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTSrcVolume, srcVolume)
|
||||
values.Set(storageRESTSrcPath, srcPath)
|
||||
@ -455,13 +455,26 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
|
||||
|
||||
var reader bytes.Buffer
|
||||
if err = msgp.Encode(&reader, &fi); err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
respBody, err := client.call(ctx, storageRESTMethodRenameData, values, &reader, -1)
|
||||
defer xhttp.DrainBody(respBody)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return err
|
||||
respReader, err := waitForHTTPResponse(respBody)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
resp := &RenameDataResp{}
|
||||
if err = gob.NewDecoder(respReader).Decode(resp); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return resp.Signature, toStorageErr(resp.Err)
|
||||
}
|
||||
|
||||
// where we keep old *Readers
|
||||
|
@ -18,7 +18,7 @@
|
||||
package cmd
|
||||
|
||||
const (
|
||||
storageRESTVersion = "v48" // Added Checksums
|
||||
storageRESTVersion = "v49" // Added RenameData() to return versions
|
||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||
)
|
||||
|
@ -710,6 +710,12 @@ func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http
|
||||
encoder.Encode(dErrsResp)
|
||||
}
|
||||
|
||||
// RenameDataResp - RenameData()'s response.
|
||||
type RenameDataResp struct {
|
||||
Signature uint64
|
||||
Err error
|
||||
}
|
||||
|
||||
// RenameDataHandler - renames a meta object and data dir to destination.
|
||||
func (s *storageRESTServer) RenameDataHandler(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
@ -732,10 +738,20 @@ func (s *storageRESTServer) RenameDataHandler(w http.ResponseWriter, r *http.Req
|
||||
return
|
||||
}
|
||||
|
||||
err := s.storage.RenameData(r.Context(), srcVolume, srcFilePath, fi, dstVolume, dstFilePath)
|
||||
if err != nil {
|
||||
s.writeErrorResponse(w, err)
|
||||
setEventStreamHeaders(w)
|
||||
encoder := gob.NewEncoder(w)
|
||||
done := keepHTTPResponseAlive(w)
|
||||
|
||||
sign, err := s.storage.RenameData(r.Context(), srcVolume, srcFilePath, fi, dstVolume, dstFilePath)
|
||||
done(nil)
|
||||
|
||||
resp := &RenameDataResp{
|
||||
Signature: sign,
|
||||
}
|
||||
if err != nil {
|
||||
resp.Err = StorageErr(err.Error())
|
||||
}
|
||||
encoder.Encode(resp)
|
||||
}
|
||||
|
||||
// RenameFileHandler - rename a file.
|
||||
|
@ -363,10 +363,10 @@ func (p *xlStorageDiskIDCheck) RenameFile(ctx context.Context, srcVolume, srcPat
|
||||
return p.storage.RenameFile(ctx, srcVolume, srcPath, dstVolume, dstPath)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (err error) {
|
||||
func (p *xlStorageDiskIDCheck) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (sign uint64, err error) {
|
||||
ctx, done, err := p.TrackDiskHealth(ctx, storageMetricRenameData, srcPath, fi.DataDir, dstVolume, dstPath)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
defer done(&err)
|
||||
|
||||
|
@ -46,6 +46,7 @@ import (
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/pkg/console"
|
||||
"github.com/yargevad/filepathx"
|
||||
"github.com/zeebo/xxh3"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -2132,7 +2133,7 @@ func skipAccessChecks(volume string) (ok bool) {
|
||||
}
|
||||
|
||||
// RenameData - rename source path to destination path atomically, metadata and data directory.
|
||||
func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (err error) {
|
||||
func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, fi FileInfo, dstVolume, dstPath string) (sign uint64, err error) {
|
||||
defer func() {
|
||||
if err != nil && !contextCanceled(ctx) && !errors.Is(err, errFileNotFound) {
|
||||
// Only log these errors if context is not yet canceled.
|
||||
@ -2150,34 +2151,34 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
|
||||
srcVolumeDir, err := s.getVolDir(srcVolume)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
dstVolumeDir, err := s.getVolDir(dstVolume)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if !skipAccessChecks(srcVolume) {
|
||||
// Stat a volume entry.
|
||||
if err = Access(srcVolumeDir); err != nil {
|
||||
if osIsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
return 0, errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
return 0, errFaultyDisk
|
||||
}
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
if !skipAccessChecks(dstVolume) {
|
||||
if err = Access(dstVolumeDir); err != nil {
|
||||
if osIsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
return 0, errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return errFaultyDisk
|
||||
return 0, errFaultyDisk
|
||||
}
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
@ -2199,11 +2200,11 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
}
|
||||
|
||||
if err = checkPathLength(srcFilePath); err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if err = checkPathLength(dstFilePath); err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
dstBuf, err := xioutil.ReadFile(dstFilePath)
|
||||
@ -2215,20 +2216,20 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
if isSysErrNotDir(err) && runtime.GOOS != globalWindowsOSName {
|
||||
// NOTE: On windows the error happens at
|
||||
// next line and returns appropriate error.
|
||||
return errFileAccessDenied
|
||||
return 0, errFileAccessDenied
|
||||
}
|
||||
if !osIsNotExist(err) {
|
||||
return osErrToFileErr(err)
|
||||
return 0, osErrToFileErr(err)
|
||||
}
|
||||
// errFileNotFound comes here.
|
||||
err = s.renameLegacyMetadata(dstVolumeDir, dstPath)
|
||||
if err != nil && err != errFileNotFound {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
if err == nil {
|
||||
dstBuf, err = xioutil.ReadFile(dstFilePath)
|
||||
if err != nil && !osIsNotExist(err) {
|
||||
return osErrToFileErr(err)
|
||||
return 0, osErrToFileErr(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2272,7 +2273,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
currentDataPath := pathJoin(dstVolumeDir, dstPath)
|
||||
entries, err := readDirN(currentDataPath, 1)
|
||||
if err != nil && err != errFileNotFound {
|
||||
return osErrToFileErr(err)
|
||||
return 0, osErrToFileErr(err)
|
||||
}
|
||||
for _, entry := range entries {
|
||||
if entry == xlStorageFormatFile || strings.HasSuffix(entry, slashSeparator) {
|
||||
@ -2292,14 +2293,14 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
currentDataPath := pathJoin(dstVolumeDir, dstPath)
|
||||
entries, err := readDir(currentDataPath)
|
||||
if err != nil {
|
||||
return osErrToFileErr(err)
|
||||
return 0, osErrToFileErr(err)
|
||||
}
|
||||
|
||||
// legacy data dir means its old content, honor system umask.
|
||||
if err = mkdirAll(legacyDataPath, 0o777); err != nil {
|
||||
// any failed mkdir-calls delete them.
|
||||
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
|
||||
return osErrToFileErr(err)
|
||||
return 0, osErrToFileErr(err)
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
@ -2312,7 +2313,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
// Any failed rename calls un-roll previous transaction.
|
||||
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
|
||||
|
||||
return osErrToFileErr(err)
|
||||
return 0, osErrToFileErr(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2357,9 +2358,15 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
// Any failed rename calls un-roll previous transaction.
|
||||
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
|
||||
}
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var sbuf bytes.Buffer
|
||||
for _, ver := range xlMeta.versions {
|
||||
sbuf.Write(ver.header.Signature[:])
|
||||
}
|
||||
sign = xxh3.Hash(sbuf.Bytes())
|
||||
|
||||
dstBuf, err = xlMeta.AppendTo(metaDataPoolGet())
|
||||
defer metaDataPoolPut(dstBuf)
|
||||
if err != nil {
|
||||
@ -2368,7 +2375,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
// Any failed rename calls un-roll previous transaction.
|
||||
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
|
||||
}
|
||||
return errFileCorrupt
|
||||
return 0, errFileCorrupt
|
||||
}
|
||||
|
||||
if srcDataPath != "" {
|
||||
@ -2377,7 +2384,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
// Any failed rename calls un-roll previous transaction.
|
||||
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
|
||||
}
|
||||
return osErrToFileErr(err)
|
||||
return 0, osErrToFileErr(err)
|
||||
}
|
||||
diskHealthCheckOK(ctx, err)
|
||||
|
||||
@ -2396,7 +2403,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
|
||||
}
|
||||
s.deleteFile(dstVolumeDir, dstDataPath, false, false)
|
||||
return osErrToFileErr(err)
|
||||
return 0, osErrToFileErr(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -2407,7 +2414,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
|
||||
}
|
||||
s.deleteFile(dstVolumeDir, dstFilePath, false, false)
|
||||
return osErrToFileErr(err)
|
||||
return 0, osErrToFileErr(err)
|
||||
}
|
||||
|
||||
// additionally only purge older data at the end of the transaction of new data-dir
|
||||
@ -2424,7 +2431,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
s.deleteFile(dstVolumeDir, legacyDataPath, true, false)
|
||||
}
|
||||
s.deleteFile(dstVolumeDir, dstFilePath, false, false)
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
@ -2433,7 +2440,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
||||
// ideally all transaction should be complete.
|
||||
|
||||
Remove(pathutil.Dir(srcFilePath))
|
||||
return nil
|
||||
return sign, nil
|
||||
}
|
||||
|
||||
// RenameFile - rename source path to destination path atomically.
|
||||
|
2
go.mod
2
go.mod
@ -79,7 +79,7 @@ require (
|
||||
github.com/valyala/bytebufferpool v1.0.0
|
||||
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
|
||||
github.com/yargevad/filepathx v1.0.0
|
||||
github.com/zeebo/xxh3 v1.0.0
|
||||
github.com/zeebo/xxh3 v1.0.2
|
||||
go.etcd.io/etcd/api/v3 v3.5.4
|
||||
go.etcd.io/etcd/client/v3 v3.5.4
|
||||
go.uber.org/atomic v1.9.0
|
||||
|
5
go.sum
5
go.sum
@ -901,8 +901,9 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
|
||||
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
|
||||
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||
github.com/zeebo/xxh3 v1.0.0 h1:6eLPZCVXpsGnhv8RiWBEJs5kenm2W1CMwon19/l8ODc=
|
||||
github.com/zeebo/xxh3 v1.0.0/go.mod h1:8VHV24/3AZLn3b6Mlp/KuC33LWH687Wq6EnziEB+rsA=
|
||||
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
|
||||
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
|
||||
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
|
||||
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
|
||||
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
|
||||
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
|
||||
|
Loading…
x
Reference in New Issue
Block a user