Mirror of https://github.com/minio/minio.git, synced 2024-12-24 22:25:54 -05:00
implement putMetacacheObject() optimizing List operations (#12903)
Removes unexpected features from the regular putObject() path that metacache writes do not need:
- increasing parity when disks are down; skipping this avoids a lot of DiskInfo() calls
- triggering MRF for metacache objects when disks are offline
- renaming from a temporary location into the actual namespace, which is not needed since metacache files are unique
This commit is contained in:
parent 92c94011f1
commit 54ab3a1d5b
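To make the parity and write-quorum reasoning in the new helper concrete before reading the diff, here is a minimal, self-contained sketch of the same calculation. The function name writeQuorumFor and the drive counts in main are illustrative only and are not part of the MinIO code below.

package main

import "fmt"

// writeQuorumFor mirrors the calculation in putMetacacheObject():
// data drives = total drives - parity drives, and the write quorum is
// the number of data drives, plus one when data and parity are equal.
func writeQuorumFor(totalDrives, parityDrives int) (dataDrives, writeQuorum int) {
	dataDrives = totalDrives - parityDrives
	writeQuorum = dataDrives
	if dataDrives == parityDrives {
		writeQuorum++
	}
	return dataDrives, writeQuorum
}

func main() {
	// Hypothetical erasure set: 16 drives with EC:4 parity.
	data, quorum := writeQuorumFor(16, 4)
	fmt.Println(data, quorum) // 12 12

	// Hypothetical erasure set: 8 drives with EC:4 parity (data == parity).
	data, quorum = writeQuorumFor(8, 4)
	fmt.Println(data, quorum) // 4 5
}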
@@ -582,6 +582,167 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc
 	return evalDisks(disks, errs), err
 }
 
+func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
+	data := r.Reader
+
+	// No metadata is set, allocate a new one.
+	if opts.UserDefined == nil {
+		opts.UserDefined = make(map[string]string)
+	}
+
+	storageDisks := er.getDisks()
+	// Get parity and data drive count based on storage class metadata
+	parityDrives := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
+	if parityDrives <= 0 {
+		parityDrives = er.defaultParityCount
+	}
+	dataDrives := len(storageDisks) - parityDrives
+
+	// we now know the number of blocks this object needs for data and parity.
+	// writeQuorum is dataBlocks + 1
+	writeQuorum := dataDrives
+	if dataDrives == parityDrives {
+		writeQuorum++
+	}
+
+	// Validate input data size and it can never be less than zero.
+	if data.Size() < -1 {
+		logger.LogIf(ctx, errInvalidArgument, logger.Application)
+		return ObjectInfo{}, toObjectErr(errInvalidArgument)
+	}
+
+	// Initialize parts metadata
+	partsMetadata := make([]FileInfo, len(storageDisks))
+
+	fi := newFileInfo(pathJoin(minioMetaBucket, key), dataDrives, parityDrives)
+	fi.DataDir = mustGetUUID()
+
+	// Initialize erasure metadata.
+	for index := range partsMetadata {
+		partsMetadata[index] = fi
+	}
+
+	// Order disks according to erasure distribution
+	var onlineDisks []StorageAPI
+	onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi)
+
+	erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
+	if err != nil {
+		return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key)
+	}
+
+	// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
+	var buffer []byte
+	switch size := data.Size(); {
+	case size == 0:
+		buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
+	case size >= fi.Erasure.BlockSize:
+		buffer = er.bp.Get()
+		defer er.bp.Put(buffer)
+	case size < fi.Erasure.BlockSize:
+		// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
+		buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
+	}
+
+	if len(buffer) > int(fi.Erasure.BlockSize) {
+		buffer = buffer[:fi.Erasure.BlockSize]
+	}
+
+	shardFileSize := erasure.ShardFileSize(data.Size())
+	writers := make([]io.Writer, len(onlineDisks))
+	inlineBuffers := make([]*bytes.Buffer, len(onlineDisks))
+	for i, disk := range onlineDisks {
+		if disk == nil {
+			continue
+		}
+
+		inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, shardFileSize))
+		writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
+	}
+
+	n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
+	closeBitrotWriters(writers)
+	if erasureErr != nil {
+		return ObjectInfo{}, toObjectErr(erasureErr, minioMetaBucket, key)
+	}
+
+	// Should return IncompleteBody{} error when reader has fewer bytes
+	// than specified in request header.
+	if n < data.Size() {
+		return ObjectInfo{}, IncompleteBody{Bucket: minioMetaBucket, Object: key}
+	}
+
+	for i, w := range writers {
+		if w == nil {
+			onlineDisks[i] = nil
+			continue
+		}
+		if len(inlineBuffers) > 0 && inlineBuffers[i] != nil {
+			partsMetadata[i].Data = inlineBuffers[i].Bytes()
+		} else {
+			partsMetadata[i].Data = nil
+		}
+		partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize())
+		partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
+			PartNumber: 1,
+			Algorithm:  DefaultBitrotAlgorithm,
+			Hash:       bitrotWriterSum(w),
+		})
+	}
+
+	modTime := UTCNow()
+
+	// Fill all the necessary metadata.
+	// Update `xl.meta` content on each disks.
+	for index := range partsMetadata {
+		partsMetadata[index].Metadata = opts.UserDefined
+		partsMetadata[index].Size = n
+		partsMetadata[index].ModTime = modTime
+	}
+
+	// Set an additional header when data is inlined.
+	for index := range partsMetadata {
+		partsMetadata[index].SetInlineData()
+	}
+
+	for i := 0; i < len(onlineDisks); i++ {
+		if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
+			// Object info is the same in all disks, so we can pick
+			// the first meta from online disk
+			fi = partsMetadata[i]
+			break
+		}
+	}
+
+	g := errgroup.WithNErrs(len(onlineDisks))
+
+	// Rename file on all underlying storage disks.
+	for index := range onlineDisks {
+		index := index
+		g.Go(func() error {
+			if onlineDisks[index] == nil {
+				return errDiskNotFound
+			}
+			// Pick one FileInfo for a disk at index.
+			fi := partsMetadata[index]
+			// Assign index when index is initialized
+			if fi.Erasure.Index == 0 {
+				fi.Erasure.Index = index + 1
+			}
+
+			if fi.IsValid() {
+				return onlineDisks[index].WriteMetadata(ctx, minioMetaBucket, key, fi)
+			}
+			return errFileCorrupt
+		}, index)
+	}
+
+	// Wait for all renames to finish.
+	errs := g.Wait()
+
+	return fi.ToObjectInfo(minioMetaBucket, key), reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
+}
+
 // PutObject - creates an object upon reading from the input stream
 // until EOF, erasure codes the data across all disk and additionally
 // writes `xl.meta` which carries the necessary metadata for future
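The tail of the new function fans the per-disk metadata writes out concurrently and then reduces the collected errors against the write quorum. As a rough, hypothetical illustration of that shape (plain goroutines and a simplified success count instead of MinIO's internal errgroup and reduceWriteQuorumErrs), consider:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// writeAllWithQuorum is a toy stand-in for the g.Go(...) / reduceWriteQuorumErrs
// pattern above: run every per-disk write concurrently, gather the errors, and
// succeed only when at least writeQuorum writes completed without error.
func writeAllWithQuorum(writes []func() error, writeQuorum int) error {
	errs := make([]error, len(writes))
	var wg sync.WaitGroup
	for i, write := range writes {
		wg.Add(1)
		go func(i int, write func() error) {
			defer wg.Done()
			errs[i] = write()
		}(i, write)
	}
	wg.Wait()

	ok := 0
	for _, err := range errs {
		if err == nil {
			ok++
		}
	}
	if ok < writeQuorum {
		return errors.New("write quorum not met")
	}
	return nil
}

func main() {
	// Four hypothetical disks, one of which fails; a quorum of three is still met.
	writes := []func() error{
		func() error { return nil },
		func() error { return nil },
		func() error { return errors.New("disk offline") },
		func() error { return nil },
	}
	fmt.Println(writeAllWithQuorum(writes, 3)) // <nil>
}

The real code routes the per-disk errors through reduceWriteQuorumErrs with objectOpIgnoredErrs, so a known set of benign errors is tolerated when deciding whether quorum was reached.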
@@ -648,9 +648,8 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
 	r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
 	logger.LogIf(ctx, err)
 	custom := b.headerKV()
-	_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{
+	_, err = er.putMetacacheObject(ctx, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{
 		UserDefined: custom,
-		NoLock:      true, // No need to hold namespace lock, each prefix caches uniquely.
 	})
 	if err != nil {
 		mc.setErr(err.Error())