make sure to set Versioned field to ensure rename2 is not called (#18141)

without this the rename2() can rename the previous dataDir
causing issues for different versions of the object, only
latest version is preserved due to this bug.

Added healing code to ensure recovery of such content.
This commit is contained in:
Harshavardhana 2023-09-29 09:08:24 -07:00 committed by GitHub
parent dd8547e51c
commit c34bdc33fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 8 deletions

View File

@ -563,11 +563,26 @@ func (er *erasureObjects) healObject(ctx context.Context, bucket string, object
} }
if !latestMeta.XLV1 && !latestMeta.Deleted && !recreate && disksToHealCount > latestMeta.Erasure.ParityBlocks { if !latestMeta.XLV1 && !latestMeta.Deleted && !recreate && disksToHealCount > latestMeta.Erasure.ParityBlocks {
// When disk to heal count is greater than parity blocks we should simply error out. // Allow for dangling deletes, on versions that have DataDir missing etc.
err := fmt.Errorf("(%d > %d) more drives are expected to heal than parity, returned errors: %v (dataErrs %v) -> %s/%s(%s)", disksToHealCount, latestMeta.Erasure.ParityBlocks, errs, dataErrs, bucket, object, versionID) // this would end up restoring the correct readable versions.
logger.LogOnceIf(ctx, err, "heal-object-count-gt-parity") m, err := er.deleteIfDangling(ctx, bucket, object, partsMetadata, errs, dataErrs, ObjectOptions{
return er.defaultHealResult(latestMeta, storageDisks, storageEndpoints, errs, VersionID: versionID,
bucket, object, versionID), err })
errs = make([]error, len(errs))
for i := range errs {
errs[i] = err
}
if err == nil {
// Dangling object successfully purged, size is '0'
m.Size = 0
}
// Generate file/version not found with default heal result
err = errFileNotFound
if versionID != "" {
err = errFileVersionNotFound
}
return er.defaultHealResult(m, storageDisks, storageEndpoints,
errs, bucket, object, versionID), err
} }
cleanFileInfo := func(fi FileInfo) FileInfo { cleanFileInfo := func(fi FileInfo) FileInfo {
@ -1077,6 +1092,22 @@ func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (valid
} }
if !validMeta.IsValid() { if !validMeta.IsValid() {
// validMeta is invalid because notFoundPartsErrs is
// greater than parity blocks, thus invalidating the FileInfo{}
// every dataErrs[i], metaArr[i] is an empty FileInfo{}
dataBlocks := (len(ndataErrs) + 1) / 2
if notFoundPartsErrs > dataBlocks {
// Not using parity to ensure that we do not delete
// any valid content, if any is recoverable. But if
// notFoundDataDirs are already greater than the data
// blocks all bets are off and it is safe to purge.
//
// This is purely a defensive code, ideally parityBlocks
// is sufficient, however we can't know that since we
// do have the FileInfo{}.
return validMeta, true
}
// We have no idea what this file is, leave it as is. // We have no idea what this file is, leave it as is.
return validMeta, false return validMeta, false
} }

View File

@ -496,13 +496,16 @@ func (er erasureObjects) deleteIfDangling(ctx context.Context, bucket, object st
m, ok := isObjectDangling(metaArr, errs, dataErrs) m, ok := isObjectDangling(metaArr, errs, dataErrs)
if ok { if ok {
tags := make(map[string]interface{}, 4) tags := make(map[string]interface{}, 4)
tags["size"] = m.Size
tags["set"] = er.setIndex tags["set"] = er.setIndex
tags["pool"] = er.poolIndex tags["pool"] = er.poolIndex
tags["merrs"] = errors.Join(errs...) tags["merrs"] = errors.Join(errs...)
tags["derrs"] = errors.Join(dataErrs...) tags["derrs"] = errors.Join(dataErrs...)
tags["mtime"] = m.ModTime.Format(http.TimeFormat) if m.IsValid() {
tags["parity"] = m.Erasure.ParityBlocks tags["size"] = m.Size
tags["mtime"] = m.ModTime.Format(http.TimeFormat)
tags["parity"] = m.Erasure.ParityBlocks
}
if cok { if cok {
tags["caller"] = fmt.Sprintf("%s:%d", file, line) tags["caller"] = fmt.Sprintf("%s:%d", file, line)
} }

View File

@ -955,6 +955,8 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return return
} }
opts.Versioned = versioned
opts.VersionSuspended = suspended
// First, we compute the ETag of the multipart object. // First, we compute the ETag of the multipart object.
// The ETag of a multi-part object is always: // The ETag of a multi-part object is always: