From 54ab3a1d5b6226cec1db5bec837bf7d183e873b7 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 9 Aug 2021 06:58:54 -0700 Subject: [PATCH] implement putMetacacheObject() optimizing List operations (#12903) removes unexpected features from regular putObject() such as - increasing parity when disks are down, avoids a lot of DiskInfo() calls. - triggering MRF for metacache objects if disks are offline - avoiding renames from temporary location to actual namespace, not needed since metacache files are unique. --- cmd/erasure-object.go | 161 ++++++++++++++++++++++++++++++++++++++++++ cmd/metacache-set.go | 3 +- 2 files changed, 162 insertions(+), 2 deletions(-) diff --git a/cmd/erasure-object.go b/cmd/erasure-object.go index f964ca2a0..cdec61556 100644 --- a/cmd/erasure-object.go +++ b/cmd/erasure-object.go @@ -582,6 +582,167 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc return evalDisks(disks, errs), err } +func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { + data := r.Reader + + // No metadata is set, allocate a new one. + if opts.UserDefined == nil { + opts.UserDefined = make(map[string]string) + } + + storageDisks := er.getDisks() + // Get parity and data drive count based on storage class metadata + parityDrives := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass]) + if parityDrives <= 0 { + parityDrives = er.defaultParityCount + } + dataDrives := len(storageDisks) - parityDrives + + // We now know the number of blocks this object needs for data and parity. + // writeQuorum is dataDrives, plus one when dataDrives equals parityDrives. + writeQuorum := dataDrives + if dataDrives == parityDrives { + writeQuorum++ + } + + // Validate input data size, it can never be less than -1. 
+ if data.Size() < -1 { + logger.LogIf(ctx, errInvalidArgument, logger.Application) + return ObjectInfo{}, toObjectErr(errInvalidArgument) + } + + // Initialize parts metadata + partsMetadata := make([]FileInfo, len(storageDisks)) + + fi := newFileInfo(pathJoin(minioMetaBucket, key), dataDrives, parityDrives) + fi.DataDir = mustGetUUID() + + // Initialize erasure metadata. + for index := range partsMetadata { + partsMetadata[index] = fi + } + + // Order disks according to erasure distribution + var onlineDisks []StorageAPI + onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi) + + erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) + if err != nil { + return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key) + } + + // Fetch buffer for I/O: reuse one from the pool, otherwise allocate a new one. + var buffer []byte + switch size := data.Size(); { + case size == 0: + buffer = make([]byte, 1) // Allocate at least a byte to reach EOF + case size >= fi.Erasure.BlockSize: + buffer = er.bp.Get() + defer er.bp.Put(buffer) + case size < fi.Erasure.BlockSize: + // No need to allocate a full blockSizeV1 buffer if the incoming data is smaller. 
+ buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1)) + } + + if len(buffer) > int(fi.Erasure.BlockSize) { + buffer = buffer[:fi.Erasure.BlockSize] + } + + shardFileSize := erasure.ShardFileSize(data.Size()) + writers := make([]io.Writer, len(onlineDisks)) + inlineBuffers := make([]*bytes.Buffer, len(onlineDisks)) + for i, disk := range onlineDisks { + if disk == nil { + continue + } + + inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, shardFileSize)) + writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) + } + + n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum) + closeBitrotWriters(writers) + if erasureErr != nil { + return ObjectInfo{}, toObjectErr(erasureErr, minioMetaBucket, key) + } + + // Should return IncompleteBody{} error when reader has fewer bytes + // than specified in request header. + if n < data.Size() { + return ObjectInfo{}, IncompleteBody{Bucket: minioMetaBucket, Object: key} + } + + for i, w := range writers { + if w == nil { + onlineDisks[i] = nil + continue + } + if len(inlineBuffers) > 0 && inlineBuffers[i] != nil { + partsMetadata[i].Data = inlineBuffers[i].Bytes() + } else { + partsMetadata[i].Data = nil + } + partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize()) + partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ + PartNumber: 1, + Algorithm: DefaultBitrotAlgorithm, + Hash: bitrotWriterSum(w), + }) + } + + modTime := UTCNow() + + // Fill all the necessary metadata. + // Update `xl.meta` content on each disk. + for index := range partsMetadata { + partsMetadata[index].Metadata = opts.UserDefined + partsMetadata[index].Size = n + partsMetadata[index].ModTime = modTime + } + + // Set an additional header when data is inlined. 
+ for index := range partsMetadata { + partsMetadata[index].SetInlineData() + } + + for i := 0; i < len(onlineDisks); i++ { + if onlineDisks[i] != nil && onlineDisks[i].IsOnline() { + // Object info is the same in all disks, so we can pick + // the first meta from online disk + fi = partsMetadata[i] + break + } + } + + g := errgroup.WithNErrs(len(onlineDisks)) + + // Write metadata on all underlying storage disks. + for index := range onlineDisks { + index := index + g.Go(func() error { + if onlineDisks[index] == nil { + return errDiskNotFound + } + // Pick one FileInfo for a disk at index. + fi := partsMetadata[index] + // Assign index when index is uninitialized + if fi.Erasure.Index == 0 { + fi.Erasure.Index = index + 1 + } + + if fi.IsValid() { + return onlineDisks[index].WriteMetadata(ctx, minioMetaBucket, key, fi) + } + return errFileCorrupt + }, index) + } + + // Wait for all writes to finish. + errs := g.Wait() + + return fi.ToObjectInfo(minioMetaBucket, key), reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) +} + // PutObject - creates an object upon reading from the input stream // until EOF, erasure codes the data across all disk and additionally // writes `xl.meta` which carries the necessary metadata for future diff --git a/cmd/metacache-set.go b/cmd/metacache-set.go index 6286f81e7..ef2099ed1 100644 --- a/cmd/metacache-set.go +++ b/cmd/metacache-set.go @@ -648,9 +648,8 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data))) logger.LogIf(ctx, err) custom := b.headerKV() - _, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{ + _, err = er.putMetacacheObject(ctx, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{ UserDefined: custom, - NoLock: true, // No need to hold namespace lock, each prefix caches uniquely. }) if err != nil { mc.setErr(err.Error())