fix: inline data upon overwrites should be readable (#12369)

This PR fixes two bugs

- Remove fi.Data upon overwrite of objects from inlined-data to non-inlined-data
- Workaround for an existing bug on disk with latest releases to ignore fi.Data
  and instead read from the disk for non-inlined-data
- Addtionally add a reserved metadata header to indicate data is inlined for
  a given version.
This commit is contained in:
Harshavardhana
2021-05-25 16:33:06 -07:00
committed by GitHub
parent 4fd1378242
commit 4840974d7a
5 changed files with 60 additions and 56 deletions

View File

@@ -214,37 +214,6 @@ func (er erasureObjects) GetObjectNInfo(ctx context.Context, bucket, object stri
return fn(pr, h, opts.CheckPrecondFn, pipeCloser, nsUnlocker)
}
// GetObject - reads an object erasured coded across multiple
// disks. Supports additional parameters like offset and length
// which are synonymous with HTTP Range requests.
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (er erasureObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) (err error) {
// Lock the object before reading.
lk := er.NewNSLock(bucket, object)
lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
ctx = lkctx.Context()
defer lk.RUnlock(lkctx.Cancel)
// Start offset cannot be negative.
if startOffset < 0 {
logger.LogIf(ctx, errUnexpected, logger.Application)
return errUnexpected
}
// Writer cannot be nil.
if writer == nil {
logger.LogIf(ctx, errUnexpected)
return errUnexpected
}
return er.getObject(ctx, bucket, object, startOffset, length, writer, opts)
}
func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error {
// Reorder online disks based on erasure distribution order.
// Reorder parts metadata based on erasure distribution order.
@@ -284,6 +253,27 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
if err != nil {
return toObjectErr(err, bucket, object)
}
// This hack is needed to avoid a bug found when overwriting
// the inline data object with a un-inlined version, for the
// time being we need this as we have released inline-data
// version already and this bug is already present in newer
// releases.
//
// This mainly happens with objects < smallFileThreshold when
// they are overwritten with un-inlined objects >= smallFileThreshold,
// due to a bug in RenameData() the fi.Data is not niled leading to
// GetObject thinking that fi.Data is valid while fi.Size has
// changed already.
if _, ok := fi.Metadata[ReservedMetadataPrefixLower+"inline-data"]; ok {
shardFileSize := erasure.ShardFileSize(fi.Size)
if shardFileSize >= 0 && shardFileSize >= smallFileThreshold {
for i := range metaArr {
metaArr[i].Data = nil
}
}
}
var healOnce sync.Once
// once we have obtained a common FileInfo i.e latest, we should stick
@@ -373,23 +363,6 @@ func (er erasureObjects) getObjectWithFileInfo(ctx context.Context, bucket, obje
return nil
}
// getObject wrapper for erasure GetObject
func (er erasureObjects) getObject(ctx context.Context, bucket, object string, startOffset, length int64, writer io.Writer, opts ObjectOptions) error {
fi, metaArr, onlineDisks, err := er.getObjectFileInfo(ctx, bucket, object, opts, true)
if err != nil {
return toObjectErr(err, bucket, object)
}
if fi.Deleted {
if opts.VersionID == "" {
return toObjectErr(errFileNotFound, bucket, object)
}
// Make sure to return object info to provide extra information.
return toObjectErr(errMethodNotAllowed, bucket, object)
}
return er.getObjectWithFileInfo(ctx, bucket, object, startOffset, length, writer, fi, metaArr, onlineDisks)
}
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (er erasureObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
if !opts.NoLock {
@@ -797,6 +770,13 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
partsMetadata[index].ModTime = modTime
}
if len(inlineBuffers) > 0 {
// Set an additional header when data is inlined.
for index := range partsMetadata {
partsMetadata[index].Metadata[ReservedMetadataPrefixLower+"inline-data"] = "true"
}
}
// Rename the successfully written temporary object to final location.
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum); err != nil {
logger.LogIf(ctx, err)