mirror of
https://github.com/minio/minio.git
synced 2025-04-04 03:40:30 -04:00
xl: Disable rename2 in decommissioning/rebalance (#18964)
Always disable rename2 optimization in decom/rebalance
This commit is contained in:
parent
960d604013
commit
6ae97aedc9
@ -178,7 +178,10 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, latestMeta File
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
const xMinIOHealing = ReservedMetadataPrefix + "healing"
|
const (
|
||||||
|
xMinIOHealing = ReservedMetadataPrefix + "healing"
|
||||||
|
xMinIODataMov = ReservedMetadataPrefix + "data-mov"
|
||||||
|
)
|
||||||
|
|
||||||
// SetHealing marks object (version) as being healed.
|
// SetHealing marks object (version) as being healed.
|
||||||
// Note: this is to be used only from healObject
|
// Note: this is to be used only from healObject
|
||||||
@ -196,6 +199,21 @@ func (fi FileInfo) Healing() bool {
|
|||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetDataMov marks object (version) as being currently
|
||||||
|
// in movement, such as decommissioning or rebalance.
|
||||||
|
func (fi *FileInfo) SetDataMov() {
|
||||||
|
if fi.Metadata == nil {
|
||||||
|
fi.Metadata = make(map[string]string)
|
||||||
|
}
|
||||||
|
fi.Metadata[xMinIODataMov] = "true"
|
||||||
|
}
|
||||||
|
|
||||||
|
// DataMov returns true if object is being in movement
|
||||||
|
func (fi FileInfo) DataMov() bool {
|
||||||
|
_, ok := fi.Metadata[xMinIODataMov]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
// Heals an object by re-writing corrupt/missing erasure blocks.
|
// Heals an object by re-writing corrupt/missing erasure blocks.
|
||||||
func (er *erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
|
func (er *erasureObjects) healObject(ctx context.Context, bucket string, object string, versionID string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
|
||||||
dryRun := opts.DryRun
|
dryRun := opts.DryRun
|
||||||
|
@ -1244,6 +1244,10 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
|
|||||||
// Save the consolidated actual size.
|
// Save the consolidated actual size.
|
||||||
fi.Metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
|
fi.Metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
|
||||||
|
|
||||||
|
if opts.DataMovement {
|
||||||
|
fi.SetDataMov()
|
||||||
|
}
|
||||||
|
|
||||||
// Update all erasure metadata, make sure to not modify fields like
|
// Update all erasure metadata, make sure to not modify fields like
|
||||||
// checksum which are different on each disks.
|
// checksum which are different on each disks.
|
||||||
for index := range partsMetadata {
|
for index := range partsMetadata {
|
||||||
|
@ -1516,13 +1516,12 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
|
|||||||
partsMetadata[index].Metadata = userDefined
|
partsMetadata[index].Metadata = userDefined
|
||||||
partsMetadata[index].Size = n
|
partsMetadata[index].Size = n
|
||||||
partsMetadata[index].ModTime = modTime
|
partsMetadata[index].ModTime = modTime
|
||||||
}
|
if len(inlineBuffers) > 0 {
|
||||||
|
|
||||||
if len(inlineBuffers) > 0 {
|
|
||||||
// Set an additional header when data is inlined.
|
|
||||||
for index := range partsMetadata {
|
|
||||||
partsMetadata[index].SetInlineData()
|
partsMetadata[index].SetInlineData()
|
||||||
}
|
}
|
||||||
|
if opts.DataMovement {
|
||||||
|
partsMetadata[index].SetDataMov()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rename the successfully written temporary object to final location.
|
// Rename the successfully written temporary object to final location.
|
||||||
|
@ -653,7 +653,8 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, parts, ObjectOptions{
|
_, err = z.CompleteMultipartUpload(ctx, bucket, objInfo.Name, res.UploadID, parts, ObjectOptions{
|
||||||
MTime: objInfo.ModTime,
|
DataMovement: true,
|
||||||
|
MTime: objInfo.ModTime,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("decommissionObject: CompleteMultipartUpload() %w", err)
|
err = fmt.Errorf("decommissionObject: CompleteMultipartUpload() %w", err)
|
||||||
@ -665,11 +666,13 @@ func (z *erasureServerPools) decommissionObject(ctx context.Context, bucket stri
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
|
return fmt.Errorf("decommissionObject: hash.NewReader() %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = z.PutObject(ctx,
|
_, err = z.PutObject(ctx,
|
||||||
bucket,
|
bucket,
|
||||||
objInfo.Name,
|
objInfo.Name,
|
||||||
NewPutObjReader(hr),
|
NewPutObjReader(hr),
|
||||||
ObjectOptions{
|
ObjectOptions{
|
||||||
|
DataMovement: true,
|
||||||
VersionID: objInfo.VersionID,
|
VersionID: objInfo.VersionID,
|
||||||
MTime: objInfo.ModTime,
|
MTime: objInfo.ModTime,
|
||||||
UserDefined: objInfo.UserDefined,
|
UserDefined: objInfo.UserDefined,
|
||||||
|
@ -809,7 +809,8 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
|
_, err = z.CompleteMultipartUpload(ctx, bucket, oi.Name, res.UploadID, parts, ObjectOptions{
|
||||||
MTime: oi.ModTime,
|
DataMovement: true,
|
||||||
|
MTime: oi.ModTime,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
|
err = fmt.Errorf("rebalanceObject: CompleteMultipartUpload() %w", err)
|
||||||
@ -821,11 +822,13 @@ func (z *erasureServerPools) rebalanceObject(ctx context.Context, bucket string,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
|
return fmt.Errorf("rebalanceObject: hash.NewReader() %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = z.PutObject(ctx,
|
_, err = z.PutObject(ctx,
|
||||||
bucket,
|
bucket,
|
||||||
oi.Name,
|
oi.Name,
|
||||||
NewPutObjReader(hr),
|
NewPutObjReader(hr),
|
||||||
ObjectOptions{
|
ObjectOptions{
|
||||||
|
DataMovement: true,
|
||||||
VersionID: oi.VersionID,
|
VersionID: oi.VersionID,
|
||||||
MTime: oi.ModTime,
|
MTime: oi.ModTime,
|
||||||
UserDefined: oi.UserDefined,
|
UserDefined: oi.UserDefined,
|
||||||
|
@ -109,6 +109,8 @@ type ObjectOptions struct {
|
|||||||
// participating in a rebalance operation. Typically set for 'write' operations.
|
// participating in a rebalance operation. Typically set for 'write' operations.
|
||||||
SkipRebalancing bool
|
SkipRebalancing bool
|
||||||
|
|
||||||
|
DataMovement bool // indicates an going decommisionning or rebalacing
|
||||||
|
|
||||||
PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix
|
PrefixEnabledFn func(prefix string) bool // function which returns true if versioning is enabled on prefix
|
||||||
|
|
||||||
// IndexCB will return any index created but the compression.
|
// IndexCB will return any index created but the compression.
|
||||||
|
@ -1638,9 +1638,9 @@ func (x *xlMetaV2) AddVersion(fi FileInfo) error {
|
|||||||
if len(k) > len(ReservedMetadataPrefixLower) && strings.EqualFold(k[:len(ReservedMetadataPrefixLower)], ReservedMetadataPrefixLower) {
|
if len(k) > len(ReservedMetadataPrefixLower) && strings.EqualFold(k[:len(ReservedMetadataPrefixLower)], ReservedMetadataPrefixLower) {
|
||||||
// Skip tierFVID, tierFVMarker keys; it's used
|
// Skip tierFVID, tierFVMarker keys; it's used
|
||||||
// only for creating free-version.
|
// only for creating free-version.
|
||||||
// Skip xMinIOHealing, it's used only in RenameData
|
// Also skip xMinIOHealing, xMinIODataMov as used only in RenameData
|
||||||
switch k {
|
switch k {
|
||||||
case tierFVIDKey, tierFVMarkerKey, xMinIOHealing:
|
case tierFVIDKey, tierFVMarkerKey, xMinIOHealing, xMinIODataMov:
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2629,7 +2629,7 @@ func (s *xlStorage) RenameData(ctx context.Context, srcVolume, srcPath string, f
|
|||||||
}
|
}
|
||||||
diskHealthCheckOK(ctx, err)
|
diskHealthCheckOK(ctx, err)
|
||||||
|
|
||||||
if !fi.Versioned && !fi.Healing() {
|
if !fi.Versioned && !fi.DataMov() && !fi.Healing() {
|
||||||
// Use https://man7.org/linux/man-pages/man2/rename.2.html if possible on unversioned bucket.
|
// Use https://man7.org/linux/man-pages/man2/rename.2.html if possible on unversioned bucket.
|
||||||
if err := Rename2(pathutil.Join(srcVolumeDir, srcPath), pathutil.Join(dstVolumeDir, dstPath)); err == nil {
|
if err := Rename2(pathutil.Join(srcVolumeDir, srcPath), pathutil.Join(dstVolumeDir, dstPath)); err == nil {
|
||||||
return sign, nil
|
return sign, nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user