mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
allow preserving legacyXLv1 with inline data format (#11951)
current master breaks this important requirement we need to preserve legacyXLv1 format, this is simply ignored and overwritten causing a myriad of issues by leaving stale files on the namespace etc. for now lets still use the two-phase approach of writing to `tmp` and then renaming the content to the actual namespace.
This commit is contained in:
parent
204c610d84
commit
434e5c0cfe
10
.github/workflows/go.yml
vendored
10
.github/workflows/go.yml
vendored
@ -12,7 +12,7 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
go-version: [1.16.x]
|
||||
os: [ubuntu-latest, windows-latest, macos-latest]
|
||||
os: [ubuntu-latest, windows-latest]
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- uses: actions/setup-node@v1
|
||||
@ -21,14 +21,6 @@ jobs:
|
||||
- uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: ${{ matrix.go-version }}
|
||||
- name: Build on ${{ matrix.os }}
|
||||
if: matrix.os == 'macos-latest'
|
||||
env:
|
||||
CGO_ENABLED: 0
|
||||
GO111MODULE: on
|
||||
run: |
|
||||
make
|
||||
make test-race
|
||||
- name: Build on ${{ matrix.os }}
|
||||
if: matrix.os == 'windows-latest'
|
||||
env:
|
||||
|
@ -376,9 +376,14 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
partsMetadata[i] = cleanFileInfo(latestMeta)
|
||||
}
|
||||
|
||||
dataDir := latestMeta.DataDir
|
||||
// source data dir shall be empty in case of XLV1
|
||||
// differentiate it with dstDataDir for readability
|
||||
// srcDataDir is the one used with newBitrotReader()
|
||||
// to read existing content.
|
||||
srcDataDir := latestMeta.DataDir
|
||||
dstDataDir := latestMeta.DataDir
|
||||
if latestMeta.XLV1 {
|
||||
dataDir = migrateDataDir
|
||||
dstDataDir = migrateDataDir
|
||||
}
|
||||
|
||||
var inlineBuffers []*bytes.Buffer
|
||||
@ -419,10 +424,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
continue
|
||||
}
|
||||
checksumInfo := copyPartsMetadata[i].Erasure.GetChecksumInfo(partNumber)
|
||||
partPath := pathJoin(object, dataDir, fmt.Sprintf("part.%d", partNumber))
|
||||
if latestMeta.XLV1 {
|
||||
partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber))
|
||||
}
|
||||
partPath := pathJoin(object, srcDataDir, fmt.Sprintf("part.%d", partNumber))
|
||||
readers[i] = newBitrotReader(disk, partsMetadata[i].Data, bucket, partPath, tillOffset, checksumAlgo, checksumInfo.Hash, erasure.ShardSize())
|
||||
}
|
||||
writers := make([]io.Writer, len(outDatedDisks))
|
||||
@ -430,7 +432,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
if disk == OfflineDisk {
|
||||
continue
|
||||
}
|
||||
partPath := pathJoin(tmpID, dataDir, fmt.Sprintf("part.%d", partNumber))
|
||||
partPath := pathJoin(tmpID, dstDataDir, fmt.Sprintf("part.%d", partNumber))
|
||||
if len(inlineBuffers) > 0 {
|
||||
inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, erasure.ShardFileSize(latestMeta.Size)))
|
||||
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
|
||||
@ -460,7 +462,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
continue
|
||||
}
|
||||
|
||||
partsMetadata[i].DataDir = dataDir
|
||||
partsMetadata[i].DataDir = dstDataDir
|
||||
partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize)
|
||||
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
|
||||
PartNumber: partNumber,
|
||||
@ -481,25 +483,6 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
|
||||
}
|
||||
}
|
||||
|
||||
if len(inlineBuffers) > 0 {
|
||||
// Write directly...
|
||||
if outDatedDisks, err = writeUniqueFileInfo(ctx, outDatedDisks, bucket, object, partsMetadata, diskCount(outDatedDisks)); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return result, toObjectErr(err, bucket, object)
|
||||
}
|
||||
result.ObjectSize = latestMeta.Size
|
||||
for _, disk := range outDatedDisks {
|
||||
if disk == OfflineDisk {
|
||||
continue
|
||||
}
|
||||
for i, v := range result.Before.Drives {
|
||||
if v.Endpoint == disk.String() {
|
||||
result.After.Drives[i].State = madmin.DriveStateOk
|
||||
}
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
defer er.deleteObject(context.Background(), minioMetaTmpBucket, tmpID, len(storageDisks)/2+1)
|
||||
|
||||
// Generate and write `xl.meta` generated from other disks.
|
||||
|
@ -279,6 +279,37 @@ func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Tim
|
||||
return findFileInfoInQuorum(ctx, metaArr, modTime, quorum)
|
||||
}
|
||||
|
||||
// Rename metadata content to destination location for each disk concurrently.
|
||||
func renameFileInfo(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, quorum int) ([]StorageAPI, error) {
|
||||
ignoredErr := []error{errFileNotFound}
|
||||
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
||||
// Rename file on all underlying storage disks.
|
||||
for index := range disks {
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
if disks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
if err := disks[index].RenameData(ctx, srcBucket, srcEntry, "", dstBucket, dstEntry); err != nil {
|
||||
if !IsErrIgnored(err, ignoredErr...) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}, index)
|
||||
}
|
||||
|
||||
// Wait for all renames to finish.
|
||||
errs := g.Wait()
|
||||
|
||||
// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
|
||||
// otherwise return failure. Cleanup successful renames.
|
||||
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum)
|
||||
return evalDisks(disks, errs), err
|
||||
}
|
||||
|
||||
// writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently.
|
||||
func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix string, files []FileInfo, quorum int) ([]StorageAPI, error) {
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
@ -124,12 +124,28 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
|
||||
}
|
||||
}
|
||||
|
||||
// Write directly...
|
||||
if _, err = writeUniqueFileInfo(ctx, onlineDisks, srcBucket, srcObject, metaArr, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject)
|
||||
tempObj := mustGetUUID()
|
||||
|
||||
var online int
|
||||
// Cleanup in case of xl.meta writing failure
|
||||
defer func() {
|
||||
if online != len(onlineDisks) {
|
||||
er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum)
|
||||
}
|
||||
}()
|
||||
|
||||
// Write unique `xl.meta` for each disk.
|
||||
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
|
||||
return oi, toObjectErr(err, srcBucket, srcObject)
|
||||
}
|
||||
|
||||
// Rename atomically `xl.meta` from tmp location to destination for each disk.
|
||||
if _, err = renameFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, writeQuorum); err != nil {
|
||||
return oi, toObjectErr(err, srcBucket, srcObject)
|
||||
}
|
||||
|
||||
online = countOnlineDisks(onlineDisks)
|
||||
|
||||
return fi.ToObjectInfo(srcBucket, srcObject), nil
|
||||
}
|
||||
|
||||
@ -806,23 +822,15 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
||||
}
|
||||
|
||||
// Write unique `xl.meta` for each disk.
|
||||
if len(inlineBuffers) == 0 {
|
||||
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Rename the successfully written temporary object to final location.
|
||||
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, fi.DataDir, bucket, object, writeQuorum, nil); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
} else {
|
||||
// Write directly...
|
||||
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, bucket, object, partsMetadata, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
// Rename the successfully written temporary object to final location.
|
||||
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, fi.DataDir, bucket, object, writeQuorum, nil); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Whether a disk was initially or becomes offline
|
||||
@ -1245,12 +1253,28 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
|
||||
}
|
||||
}
|
||||
|
||||
// Write directly...
|
||||
if _, err = writeUniqueFileInfo(ctx, onlineDisks, bucket, object, metaArr, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
tempObj := mustGetUUID()
|
||||
|
||||
var online int
|
||||
// Cleanup in case of xl.meta writing failure
|
||||
defer func() {
|
||||
if online != len(onlineDisks) {
|
||||
er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum)
|
||||
}
|
||||
}()
|
||||
|
||||
// Write unique `xl.meta` for each disk.
|
||||
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Atomically rename metadata from tmp location to destination for each disk.
|
||||
if onlineDisks, err = renameFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
online = countOnlineDisks(onlineDisks)
|
||||
|
||||
objInfo := fi.ToObjectInfo(bucket, object)
|
||||
objInfo.UserTags = tags
|
||||
|
||||
@ -1302,12 +1326,28 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st
|
||||
metaArr[i].Metadata = fi.Metadata
|
||||
}
|
||||
|
||||
// Write directly...
|
||||
if _, err = writeUniqueFileInfo(ctx, disks, bucket, object, metaArr, writeQuorum); err != nil {
|
||||
tempObj := mustGetUUID()
|
||||
|
||||
var online int
|
||||
// Cleanup in case of xl.meta writing failure
|
||||
defer func() {
|
||||
if online != len(disks) {
|
||||
er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum)
|
||||
}
|
||||
}()
|
||||
|
||||
// Write unique `xl.meta` for each disk.
|
||||
if disks, err = writeUniqueFileInfo(ctx, disks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Atomically rename metadata from tmp location to destination for each disk.
|
||||
if disks, err = renameFileInfo(ctx, disks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
online = countOnlineDisks(disks)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -64,11 +64,14 @@ const (
|
||||
|
||||
// Detects change in underlying disk.
|
||||
type xlStorageDiskIDCheck struct {
|
||||
storage StorageAPI
|
||||
diskID string
|
||||
|
||||
apiCalls [storageMetricLast]uint64
|
||||
// fields position optimized for memory please
|
||||
// do not re-order them, if you add new fields
|
||||
// please use `fieldalignment ./...` to check
|
||||
// if your changes are not causing any problems.
|
||||
storage StorageAPI
|
||||
apiLatencies [storageMetricLast]ewma.MovingAverage
|
||||
diskID string
|
||||
apiCalls [storageMetricLast]uint64
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) getMetrics() DiskMetrics {
|
||||
|
@ -1613,9 +1613,6 @@ func (s *xlStorage) CheckParts(ctx context.Context, volume string, path string,
|
||||
|
||||
for _, part := range fi.Parts {
|
||||
partPath := pathJoin(path, fi.DataDir, fmt.Sprintf("part.%d", part.Number))
|
||||
if fi.XLV1 {
|
||||
partPath = pathJoin(path, fmt.Sprintf("part.%d", part.Number))
|
||||
}
|
||||
filePath := pathJoin(volumeDir, partPath)
|
||||
if err = checkPathLength(filePath); err != nil {
|
||||
return err
|
||||
@ -1778,10 +1775,6 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, recu
|
||||
|
||||
// RenameData - rename source path to destination path atomically, metadata and data directory.
|
||||
func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) (err error) {
|
||||
if dataDir == "" {
|
||||
return errInvalidArgument
|
||||
}
|
||||
|
||||
srcVolumeDir, err := s.getVolDir(srcVolume)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1814,11 +1807,15 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
|
||||
srcFilePath := pathutil.Join(srcVolumeDir, pathJoin(srcPath, xlStorageFormatFile))
|
||||
dstFilePath := pathutil.Join(dstVolumeDir, pathJoin(dstPath, xlStorageFormatFile))
|
||||
|
||||
srcDataPath := retainSlash(pathJoin(srcVolumeDir, srcPath, dataDir))
|
||||
// make sure to always use path.Join here, do not use pathJoin as
|
||||
// it would additionally add `/` at the end and it comes in the
|
||||
// way of renameAll(), parentDir creation.
|
||||
dstDataPath := pathutil.Join(dstVolumeDir, dstPath, dataDir)
|
||||
var srcDataPath string
|
||||
var dstDataPath string
|
||||
if dataDir != "" {
|
||||
srcDataPath = retainSlash(pathJoin(srcVolumeDir, srcPath, dataDir))
|
||||
// make sure to always use path.Join here, do not use pathJoin as
|
||||
// it would additionally add `/` at the end and it comes in the
|
||||
// way of renameAll(), parentDir creation.
|
||||
dstDataPath = pathutil.Join(dstVolumeDir, dstPath, dataDir)
|
||||
}
|
||||
|
||||
if err = checkPathLength(srcFilePath); err != nil {
|
||||
return err
|
||||
@ -1994,22 +1991,33 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
|
||||
return errFileCorrupt
|
||||
}
|
||||
|
||||
if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), dstBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
if srcDataPath != "" {
|
||||
if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), dstBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tmpuuid := mustGetUUID()
|
||||
renameAll(oldDstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
|
||||
tmpuuid = mustGetUUID()
|
||||
renameAll(dstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
|
||||
if err = renameAll(srcDataPath, dstDataPath); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
if oldDstDataPath != "" {
|
||||
renameAll(oldDstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID()))
|
||||
}
|
||||
renameAll(dstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, mustGetUUID()))
|
||||
|
||||
// Commit meta-file
|
||||
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
||||
return osErrToFileErr(err)
|
||||
// renameAll only for objects that have xl.meta not saved inline.
|
||||
if len(fi.Data) == 0 && fi.Size > 0 {
|
||||
if err = renameAll(srcDataPath, dstDataPath); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Commit meta-file
|
||||
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
} else {
|
||||
// Write meta-file directly, no data
|
||||
if err = s.WriteAll(ctx, dstVolume, pathJoin(dstPath, xlStorageFormatFile), dstBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Remove parent dir of the source file if empty
|
||||
@ -2137,9 +2145,6 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
|
||||
for _, part := range fi.Parts {
|
||||
checksumInfo := erasure.GetChecksumInfo(part.Number)
|
||||
partPath := pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", part.Number))
|
||||
if fi.XLV1 {
|
||||
partPath = pathJoin(volumeDir, path, fmt.Sprintf("part.%d", part.Number))
|
||||
}
|
||||
if err := s.bitrotVerify(partPath,
|
||||
erasure.ShardFileSize(part.Size),
|
||||
checksumInfo.Algorithm,
|
||||
|
Loading…
Reference in New Issue
Block a user