mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
do not use dataDir to reference inline data use versionID (#11942)
versionID is the one that needs to be preserved and as well as overwritten in case of replication, transition etc - dataDir is an ephemeral entity that changes during overwrites - make sure that versionID is used to save the object content. this would break things if you are already running the latest master, please wipe your current content and re-do your setup after this change.
This commit is contained in:
parent
f966fbc4a3
commit
204c610d84
@ -279,37 +279,6 @@ func pickValidFileInfo(ctx context.Context, metaArr []FileInfo, modTime time.Tim
|
||||
return findFileInfoInQuorum(ctx, metaArr, modTime, quorum)
|
||||
}
|
||||
|
||||
// Rename metadata content to destination location for each disk concurrently.
|
||||
func renameFileInfo(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBucket, dstEntry string, quorum int) ([]StorageAPI, error) {
|
||||
ignoredErr := []error{errFileNotFound}
|
||||
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
||||
// Rename file on all underlying storage disks.
|
||||
for index := range disks {
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
if disks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
if err := disks[index].RenameData(ctx, srcBucket, srcEntry, "", dstBucket, dstEntry); err != nil {
|
||||
if !IsErrIgnored(err, ignoredErr...) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}, index)
|
||||
}
|
||||
|
||||
// Wait for all renames to finish.
|
||||
errs := g.Wait()
|
||||
|
||||
// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
|
||||
// otherwise return failure. Cleanup successful renames.
|
||||
err := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum)
|
||||
return evalDisks(disks, errs), err
|
||||
}
|
||||
|
||||
// writeUniqueFileInfo - writes unique `xl.meta` content for each disk concurrently.
|
||||
func writeUniqueFileInfo(ctx context.Context, disks []StorageAPI, bucket, prefix string, files []FileInfo, quorum int) ([]StorageAPI, error) {
|
||||
g := errgroup.WithNErrs(len(disks))
|
||||
|
@ -124,28 +124,12 @@ func (er erasureObjects) CopyObject(ctx context.Context, srcBucket, srcObject, d
|
||||
}
|
||||
}
|
||||
|
||||
tempObj := mustGetUUID()
|
||||
|
||||
var online int
|
||||
// Cleanup in case of xl.meta writing failure
|
||||
defer func() {
|
||||
if online != len(onlineDisks) {
|
||||
er.deleteObject(context.Background(), minioMetaTmpBucket, tempObj, writeQuorum)
|
||||
}
|
||||
}()
|
||||
|
||||
// Write unique `xl.meta` for each disk.
|
||||
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
|
||||
return oi, toObjectErr(err, srcBucket, srcObject)
|
||||
// Write directly...
|
||||
if _, err = writeUniqueFileInfo(ctx, onlineDisks, srcBucket, srcObject, metaArr, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject)
|
||||
}
|
||||
|
||||
// Rename atomically `xl.meta` from tmp location to destination for each disk.
|
||||
if _, err = renameFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, srcBucket, srcObject, writeQuorum); err != nil {
|
||||
return oi, toObjectErr(err, srcBucket, srcObject)
|
||||
}
|
||||
|
||||
online = countOnlineDisks(onlineDisks)
|
||||
|
||||
return fi.ToObjectInfo(srcBucket, srcObject), nil
|
||||
}
|
||||
|
||||
@ -1261,15 +1245,9 @@ func (er erasureObjects) PutObjectTags(ctx context.Context, bucket, object strin
|
||||
}
|
||||
}
|
||||
|
||||
tempObj := mustGetUUID()
|
||||
|
||||
// Write unique `xl.meta` for each disk.
|
||||
if onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Atomically rename metadata from tmp location to destination for each disk.
|
||||
if _, err = renameFileInfo(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
|
||||
// Write directly...
|
||||
if _, err = writeUniqueFileInfo(ctx, onlineDisks, bucket, object, metaArr, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
@ -1324,15 +1302,9 @@ func (er erasureObjects) updateObjectMeta(ctx context.Context, bucket, object st
|
||||
metaArr[i].Metadata = fi.Metadata
|
||||
}
|
||||
|
||||
tempObj := mustGetUUID()
|
||||
|
||||
// Write unique `xl.meta` for each disk.
|
||||
if disks, err = writeUniqueFileInfo(ctx, disks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
// Atomically rename metadata from tmp location to destination for each disk.
|
||||
if _, err = renameFileInfo(ctx, disks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
|
||||
// Write directly...
|
||||
if _, err = writeUniqueFileInfo(ctx, disks, bucket, object, metaArr, writeQuorum); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return toObjectErr(err, bucket, object)
|
||||
}
|
||||
|
||||
|
@ -83,7 +83,17 @@ func getFileInfo(xlMetaBuf []byte, volume, path, versionID string, data bool) (F
|
||||
if !data || err != nil {
|
||||
return fi, err
|
||||
}
|
||||
fi.Data = xlMeta.data.find(fi.DataDir)
|
||||
versionID := fi.VersionID
|
||||
if versionID == "" {
|
||||
versionID = nullVersionID
|
||||
}
|
||||
fi.Data = xlMeta.data.find(versionID)
|
||||
if len(fi.Data) == 0 {
|
||||
// PR #11758 used DataDir, preserve it
|
||||
// for users who might have used master
|
||||
// branch
|
||||
fi.Data = xlMeta.data.find(fi.DataDir)
|
||||
}
|
||||
return fi, nil
|
||||
}
|
||||
|
||||
|
@ -702,9 +702,10 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error {
|
||||
ventry.ObjectV2.MetaUser[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// If asked to save data.
|
||||
if len(fi.Data) > 0 || fi.Size == 0 {
|
||||
z.data.replace(dd.String(), fi.Data)
|
||||
z.data.replace(fi.VersionID, fi.Data)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -88,50 +88,50 @@ func TestXLV2FormatData(t *testing.T) {
|
||||
t.Fatalf("want 1 entry, got %d", len(list))
|
||||
}
|
||||
|
||||
if !bytes.Equal(xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"), data) {
|
||||
t.Fatal("Find data returned", xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"))
|
||||
if !bytes.Equal(xl2.data.find("756100c6-b393-4981-928a-d49bbc164741"), data) {
|
||||
t.Fatal("Find data returned", xl2.data.find("756100c6-b393-4981-928a-d49bbc164741"))
|
||||
}
|
||||
if !bytes.Equal(xl2.data.find(fi.DataDir), data2) {
|
||||
t.Fatal("Find data returned", xl2.data.find(fi.DataDir))
|
||||
if !bytes.Equal(xl2.data.find(fi.VersionID), data2) {
|
||||
t.Fatal("Find data returned", xl2.data.find(fi.VersionID))
|
||||
}
|
||||
|
||||
// Remove entry
|
||||
xl2.data.remove(fi.DataDir)
|
||||
xl2.data.remove(fi.VersionID)
|
||||
failOnErr(xl2.data.validate())
|
||||
if xl2.data.find(fi.DataDir) != nil {
|
||||
t.Fatal("Data was not removed:", xl2.data.find(fi.DataDir))
|
||||
if xl2.data.find(fi.VersionID) != nil {
|
||||
t.Fatal("Data was not removed:", xl2.data.find(fi.VersionID))
|
||||
}
|
||||
if xl2.data.entries() != 1 {
|
||||
t.Fatal("want 1 entry, got", xl2.data.entries())
|
||||
}
|
||||
// Re-add
|
||||
xl2.data.replace(fi.DataDir, fi.Data)
|
||||
xl2.data.replace(fi.VersionID, fi.Data)
|
||||
failOnErr(xl2.data.validate())
|
||||
if xl2.data.entries() != 2 {
|
||||
t.Fatal("want 2 entries, got", xl2.data.entries())
|
||||
}
|
||||
|
||||
// Replace entry
|
||||
xl2.data.replace("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef", data2)
|
||||
xl2.data.replace("756100c6-b393-4981-928a-d49bbc164741", data2)
|
||||
failOnErr(xl2.data.validate())
|
||||
if xl2.data.entries() != 2 {
|
||||
t.Fatal("want 2 entries, got", xl2.data.entries())
|
||||
}
|
||||
if !bytes.Equal(xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"), data2) {
|
||||
t.Fatal("Find data returned", xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"))
|
||||
if !bytes.Equal(xl2.data.find("756100c6-b393-4981-928a-d49bbc164741"), data2) {
|
||||
t.Fatal("Find data returned", xl2.data.find("756100c6-b393-4981-928a-d49bbc164741"))
|
||||
}
|
||||
|
||||
if !xl2.data.rename("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef", "new-key") {
|
||||
if !xl2.data.rename("756100c6-b393-4981-928a-d49bbc164741", "new-key") {
|
||||
t.Fatal("old key was not found")
|
||||
}
|
||||
failOnErr(xl2.data.validate())
|
||||
if !bytes.Equal(xl2.data.find("new-key"), data2) {
|
||||
t.Fatal("Find data returned", xl2.data.find("bffea160-ca7f-465f-98bc-9b4f1c3ba1ef"))
|
||||
t.Fatal("Find data returned", xl2.data.find("756100c6-b393-4981-928a-d49bbc164741"))
|
||||
}
|
||||
if xl2.data.entries() != 2 {
|
||||
t.Fatal("want 2 entries, got", xl2.data.entries())
|
||||
}
|
||||
if !bytes.Equal(xl2.data.find(fi.DataDir), data2) {
|
||||
if !bytes.Equal(xl2.data.find(fi.VersionID), data2) {
|
||||
t.Fatal("Find data returned", xl2.data.find(fi.DataDir))
|
||||
}
|
||||
|
||||
|
@ -850,7 +850,16 @@ func (s *xlStorage) DeleteVersion(ctx context.Context, volume, path string, fi F
|
||||
// api call to mark object as deleted. When object is pending transition,
|
||||
// just update the metadata and avoid deleting data dir.
|
||||
if dataDir != "" && fi.TransitionStatus != lifecycle.TransitionPending {
|
||||
versionID := fi.VersionID
|
||||
if versionID == "" {
|
||||
versionID = nullVersionID
|
||||
}
|
||||
xlMeta.data.remove(versionID)
|
||||
// PR #11758 used DataDir, preserve it
|
||||
// for users who might have used master
|
||||
// branch
|
||||
xlMeta.data.remove(dataDir)
|
||||
|
||||
filePath := pathJoin(volumeDir, path, dataDir)
|
||||
if err = checkPathLength(filePath); err != nil {
|
||||
return err
|
||||
@ -1769,6 +1778,10 @@ func (s *xlStorage) Delete(ctx context.Context, volume string, path string, recu
|
||||
|
||||
// RenameData - rename source path to destination path atomically, metadata and data directory.
|
||||
func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir, dstVolume, dstPath string) (err error) {
|
||||
if dataDir == "" {
|
||||
return errInvalidArgument
|
||||
}
|
||||
|
||||
srcVolumeDir, err := s.getVolDir(srcVolume)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1801,15 +1814,11 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
|
||||
srcFilePath := pathutil.Join(srcVolumeDir, pathJoin(srcPath, xlStorageFormatFile))
|
||||
dstFilePath := pathutil.Join(dstVolumeDir, pathJoin(dstPath, xlStorageFormatFile))
|
||||
|
||||
var srcDataPath string
|
||||
var dstDataPath string
|
||||
if dataDir != "" {
|
||||
srcDataPath = retainSlash(pathJoin(srcVolumeDir, srcPath, dataDir))
|
||||
// make sure to always use path.Join here, do not use pathJoin as
|
||||
// it would additionally add `/` at the end and it comes in the
|
||||
// way of renameAll(), parentDir creation.
|
||||
dstDataPath = pathutil.Join(dstVolumeDir, dstPath, dataDir)
|
||||
}
|
||||
srcDataPath := retainSlash(pathJoin(srcVolumeDir, srcPath, dataDir))
|
||||
// make sure to always use path.Join here, do not use pathJoin as
|
||||
// it would additionally add `/` at the end and it comes in the
|
||||
// way of renameAll(), parentDir creation.
|
||||
dstDataPath := pathutil.Join(dstVolumeDir, dstPath, dataDir)
|
||||
|
||||
if err = checkPathLength(srcFilePath); err != nil {
|
||||
return err
|
||||
@ -1985,30 +1994,22 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath, dataDir,
|
||||
return errFileCorrupt
|
||||
}
|
||||
|
||||
// Commit data, if any
|
||||
if srcDataPath != "" {
|
||||
if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), dstBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = s.WriteAll(ctx, srcVolume, pathJoin(srcPath, xlStorageFormatFile), dstBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tmpuuid := mustGetUUID()
|
||||
renameAll(oldDstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
|
||||
tmpuuid = mustGetUUID()
|
||||
renameAll(dstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
|
||||
if err = renameAll(srcDataPath, dstDataPath); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
tmpuuid := mustGetUUID()
|
||||
renameAll(oldDstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
|
||||
tmpuuid = mustGetUUID()
|
||||
renameAll(dstDataPath, pathutil.Join(s.diskPath, minioMetaTmpDeletedBucket, tmpuuid))
|
||||
if err = renameAll(srcDataPath, dstDataPath); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
|
||||
// Commit meta-file
|
||||
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
} else {
|
||||
// Write meta-file directly, no data
|
||||
if err = s.WriteAll(ctx, dstVolume, pathJoin(dstPath, xlStorageFormatFile), dstBuf); err != nil {
|
||||
return err
|
||||
}
|
||||
// Commit meta-file
|
||||
if err = renameAll(srcFilePath, dstFilePath); err != nil {
|
||||
return osErrToFileErr(err)
|
||||
}
|
||||
|
||||
// Remove parent dir of the source file if empty
|
||||
|
Loading…
Reference in New Issue
Block a user