fix: healing delete marker on versioned buckets (#10530)

Healing was not working correctly in the distributed mode because
errFileVersionNotFound was not properly converted in storage rest
client.

Besides, fixing the healing delete marker is not working as expected.
This commit is contained in:
Anis Elleuch 2020-09-21 23:16:16 +01:00 committed by GitHub
parent cd8d511d3d
commit 4c81201f95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 26 deletions

View File

@ -265,7 +265,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
} }
case errs[i] == errDiskNotFound, dataErrs[i] == errDiskNotFound: case errs[i] == errDiskNotFound, dataErrs[i] == errDiskNotFound:
driveState = madmin.DriveStateOffline driveState = madmin.DriveStateOffline
case errs[i] == errFileNotFound, errs[i] == errVolumeNotFound: case errs[i] == errFileNotFound, errs[i] == errFileVersionNotFound, errs[i] == errVolumeNotFound:
fallthrough fallthrough
case dataErrs[i] == errFileNotFound, dataErrs[i] == errFileVersionNotFound, dataErrs[i] == errVolumeNotFound: case dataErrs[i] == errFileNotFound, dataErrs[i] == errFileVersionNotFound, dataErrs[i] == errVolumeNotFound:
driveState = madmin.DriveStateMissing driveState = madmin.DriveStateMissing

View File

@ -75,6 +75,8 @@ func toStorageErr(err error) error {
return errVolumeExists return errVolumeExists
case errFileNotFound.Error(): case errFileNotFound.Error():
return errFileNotFound return errFileNotFound
case errFileVersionNotFound.Error():
return errFileVersionNotFound
case errFileNameTooLong.Error(): case errFileNameTooLong.Error():
return errFileNameTooLong return errFileNameTooLong
case errFileAccessDenied.Error(): case errFileAccessDenied.Error():

View File

@ -244,14 +244,25 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error {
} }
} }
dd, err := uuid.Parse(fi.DataDir) var dd uuid.UUID
if err != nil { if fi.DataDir != "" {
return err dd, err = uuid.Parse(fi.DataDir)
if err != nil {
return err
}
} }
ventry := xlMetaV2Version{ ventry := xlMetaV2Version{}
Type: ObjectType,
ObjectV2: &xlMetaV2Object{ if fi.Deleted {
ventry.Type = DeleteType
ventry.DeleteMarker = &xlMetaV2DeleteMarker{
VersionID: uv,
ModTime: fi.ModTime.UnixNano(),
}
} else {
ventry.Type = ObjectType
ventry.ObjectV2 = &xlMetaV2Object{
VersionID: uv, VersionID: uv,
DataDir: dd, DataDir: dd,
Size: fi.Size, Size: fi.Size,
@ -269,27 +280,27 @@ func (z *xlMetaV2) AddVersion(fi FileInfo) error {
PartActualSizes: make([]int64, len(fi.Parts)), PartActualSizes: make([]int64, len(fi.Parts)),
MetaSys: make(map[string][]byte), MetaSys: make(map[string][]byte),
MetaUser: make(map[string]string, len(fi.Metadata)), MetaUser: make(map[string]string, len(fi.Metadata)),
},
}
for i := range fi.Erasure.Distribution {
ventry.ObjectV2.ErasureDist[i] = uint8(fi.Erasure.Distribution[i])
}
for i := range fi.Parts {
ventry.ObjectV2.PartSizes[i] = fi.Parts[i].Size
if fi.Parts[i].ETag != "" {
ventry.ObjectV2.PartETags[i] = fi.Parts[i].ETag
} }
ventry.ObjectV2.PartNumbers[i] = fi.Parts[i].Number
ventry.ObjectV2.PartActualSizes[i] = fi.Parts[i].ActualSize
}
for k, v := range fi.Metadata { for i := range fi.Erasure.Distribution {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) { ventry.ObjectV2.ErasureDist[i] = uint8(fi.Erasure.Distribution[i])
ventry.ObjectV2.MetaSys[k] = []byte(v) }
} else {
ventry.ObjectV2.MetaUser[k] = v for i := range fi.Parts {
ventry.ObjectV2.PartSizes[i] = fi.Parts[i].Size
if fi.Parts[i].ETag != "" {
ventry.ObjectV2.PartETags[i] = fi.Parts[i].ETag
}
ventry.ObjectV2.PartNumbers[i] = fi.Parts[i].Number
ventry.ObjectV2.PartActualSizes[i] = fi.Parts[i].ActualSize
}
for k, v := range fi.Metadata {
if strings.HasPrefix(strings.ToLower(k), ReservedMetadataPrefixLower) {
ventry.ObjectV2.MetaSys[k] = []byte(v)
} else {
ventry.ObjectV2.MetaUser[k] = v
}
} }
} }