// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net/http"
	"os"
	"path"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/klauspost/readahead"
	"github.com/minio/madmin-go"
	"github.com/minio/minio-go/v7/pkg/s3utils"
	"github.com/minio/minio-go/v7/pkg/set"
	"github.com/minio/minio-go/v7/pkg/tags"
	"github.com/minio/minio/internal/bpool"
	"github.com/minio/minio/internal/bucket/lifecycle"
	"github.com/minio/minio/internal/bucket/object/lock"
	"github.com/minio/minio/internal/bucket/replication"
	"github.com/minio/minio/internal/event"
	"github.com/minio/minio/internal/hash"
	xhttp "github.com/minio/minio/internal/http"
	xioutil "github.com/minio/minio/internal/ioutil"
	"github.com/minio/minio/internal/logger"
	"github.com/minio/minio/internal/sync/errgroup"
	"github.com/minio/pkg/mimedb"
)

// erasureSingle implements the object layer on top of exactly one drive.
// Every operation in this file wraps the single `disk` in a one-element
// slice before calling the shared erasure helpers.
type erasureSingle struct {
	GatewayUnsupported

	// disk is the only drive backing this layer; endpoint is the value
	// returned by disk.Endpoint() at construction time.
	disk     StorageAPI
	endpoint Endpoint

	// Locker mutex map.
	nsMutex *nsLockMap

	// Byte pools used for temporary i/o buffers.
	bp *bpool.BytePoolCap

	// deletedCleanupSleeper paces removals performed by the trash
	// cleanup routine (see cleanupDeletedObjectsInner).
	deletedCleanupSleeper *dynamicSleeper

	// Shut down async operations
	shutdown context.CancelFunc

	// format is the drive format handed to the constructor.
	format *formatErasureV3
}

// newErasureSingle initializes the single drive object layer.
func newErasureSingle(ctx context.Context, storageDisk StorageAPI, format *formatErasureV3) (ObjectLayer, error) { // Number of buffers, max 2GB n := (2 * humanize.GiByte) / (blockSizeV2 * 2) // Initialize byte pool once for all sets, bpool size is set to // setCount * setDriveCount with each memory upto blockSizeV2. bp := bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2) // Initialize the erasure sets instance. s := &erasureSingle{ disk: storageDisk, endpoint: storageDisk.Endpoint(), format: format, nsMutex: newNSLock(false), bp: bp, deletedCleanupSleeper: newDynamicSleeper(10, 2*time.Second), } // start cleanup stale uploads go-routine. go s.cleanupStaleUploads(ctx) // start cleanup of deleted objects. go s.cleanupDeletedObjects(ctx) ctx, s.shutdown = context.WithCancel(ctx) go intDataUpdateTracker.start(ctx, s.endpoint.Path) return s, nil } // List all buckets from one of the set, we are not doing merge // sort here just for simplification. As per design it is assumed // that all buckets are present on all sets. func (es *erasureSingle) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) { var listBuckets []BucketInfo healBuckets := map[string]VolInfo{} // lists all unique buckets across drives. 
if err := listAllBuckets(ctx, []StorageAPI{es.disk}, healBuckets, 0); err != nil { return nil, err } for _, v := range healBuckets { listBuckets = append(listBuckets, BucketInfo(v)) } sort.Slice(listBuckets, func(i, j int) bool { return listBuckets[i].Name < listBuckets[j].Name }) for i := range listBuckets { meta, err := globalBucketMetadataSys.Get(listBuckets[i].Name) if err == nil { listBuckets[i].Created = meta.Created } } return listBuckets, nil } func (es *erasureSingle) cleanupStaleUploads(ctx context.Context) { timer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval()) defer timer.Stop() for { select { case <-ctx.Done(): return case <-timer.C: es.cleanupStaleUploadsOnDisk(ctx, es.disk, globalAPIConfig.getStaleUploadsExpiry()) // Reset for the next interval timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval()) } } } // cleanup ".trash/" folder every 5m minutes with sufficient sleep cycles, between each // deletes a dynamic sleeper is used with a factor of 10 ratio with max delay between // deletes to be 2 seconds. func (es *erasureSingle) cleanupDeletedObjects(ctx context.Context) { timer := time.NewTimer(globalAPIConfig.getDeleteCleanupInterval()) defer timer.Stop() for { select { case <-ctx.Done(): return case <-timer.C: es.cleanupDeletedObjectsInner(ctx) // Reset for the next interval timer.Reset(globalAPIConfig.getDeleteCleanupInterval()) } } } // NewNSLock - initialize a new namespace RWLocker instance. func (es *erasureSingle) NewNSLock(bucket string, objects ...string) RWLocker { return es.nsMutex.NewNSLock(nil, bucket, objects...) } // Shutdown function for object storage interface. func (es *erasureSingle) Shutdown(ctx context.Context) error { defer es.shutdown() // Add any object layer shutdown activities here. closeStorageDisks(es.disk) return nil } func (es *erasureSingle) BackendInfo() (b madmin.BackendInfo) { b.Type = madmin.Erasure scParity := 0 rrSCParity := 0 // Data blocks can vary per pool, but parity is same. 
for _, setDriveCount := range es.SetDriveCounts() { b.StandardSCData = append(b.StandardSCData, setDriveCount-scParity) b.RRSCData = append(b.RRSCData, setDriveCount-rrSCParity) } b.StandardSCParity = scParity b.RRSCParity = rrSCParity return } // StorageInfo - returns underlying storage statistics. func (es *erasureSingle) StorageInfo(ctx context.Context) (StorageInfo, []error) { disks := []StorageAPI{es.disk} endpoints := []Endpoint{es.endpoint} storageInfo, errs := getStorageInfo(disks, endpoints) storageInfo.Backend = es.BackendInfo() return storageInfo, errs } // LocalStorageInfo - returns underlying local storage statistics. func (es *erasureSingle) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) { disks := []StorageAPI{es.disk} endpoints := []Endpoint{es.endpoint} var localDisks []StorageAPI var localEndpoints []Endpoint for i, endpoint := range endpoints { if endpoint.IsLocal { localDisks = append(localDisks, disks[i]) localEndpoints = append(localEndpoints, endpoint) } } return getStorageInfo(localDisks, localEndpoints) } // Clean-up previously deleted objects. from .minio.sys/tmp/.trash/ func (es *erasureSingle) cleanupDeletedObjectsInner(ctx context.Context) { diskPath := es.disk.Endpoint().Path readDirFn(pathJoin(diskPath, minioMetaTmpDeletedBucket), func(ddir string, typ os.FileMode) error { wait := es.deletedCleanupSleeper.Timer(ctx) removeAll(pathJoin(diskPath, minioMetaTmpDeletedBucket, ddir)) wait() return nil }) } func (es *erasureSingle) renameAll(ctx context.Context, bucket, prefix string) { if es.disk != nil { es.disk.RenameFile(ctx, bucket, prefix, minioMetaTmpDeletedBucket, mustGetUUID()) } } type renameAllStorager interface { renameAll(ctx context.Context, bucket, prefix string) } // Bucket operations // MakeBucket - make a bucket. func (es *erasureSingle) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error { defer NSUpdated(bucket, slashSeparator) // Lock the bucket name before creating. 
lk := es.NewNSLock(minioMetaTmpBucket, bucket+".lck") lkctx, err := lk.GetLock(ctx, globalOperationTimeout) if err != nil { return err } ctx = lkctx.Context() defer lk.Unlock(lkctx.Cancel) // Verify if bucket is valid. if !isMinioMetaBucketName(bucket) { if err := s3utils.CheckValidBucketNameStrict(bucket); err != nil { return BucketNameInvalid{Bucket: bucket} } } if err := es.disk.MakeVol(ctx, bucket); err != nil { if opts.ForceCreate && errors.Is(err, errVolumeExists) { // No need to return error when force create was // requested. return nil } if !errors.Is(err, errVolumeExists) { logger.LogIf(ctx, err) } return toObjectErr(err, bucket) } // If it doesn't exist we get a new, so ignore errors meta := newBucketMetadata(bucket) if opts.LockEnabled { meta.VersioningConfigXML = enabledBucketVersioningConfig meta.ObjectLockConfigXML = enabledBucketObjectLockConfig } if opts.VersioningEnabled { meta.VersioningConfigXML = enabledBucketVersioningConfig } if err := meta.Save(context.Background(), es); err != nil { return toObjectErr(err, bucket) } globalBucketMetadataSys.Set(bucket, meta) return nil } // GetBucketInfo - returns BucketInfo for a bucket. func (es *erasureSingle) GetBucketInfo(ctx context.Context, bucket string) (bi BucketInfo, e error) { volInfo, err := es.disk.StatVol(ctx, bucket) if err != nil { return bi, toObjectErr(err, bucket) } return BucketInfo(volInfo), nil } // DeleteBucket - deletes a bucket. func (es *erasureSingle) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error { // Collect if all disks report volume not found. defer NSUpdated(bucket, slashSeparator) err := es.disk.DeleteVol(ctx, bucket, opts.Force) return toObjectErr(err, bucket) } // IsNotificationSupported returns whether bucket notification is applicable for this layer. func (es *erasureSingle) IsNotificationSupported() bool { return true } // IsListenSupported returns whether listen bucket notification is applicable for this layer. 
func (es *erasureSingle) IsListenSupported() bool { return true } // IsEncryptionSupported returns whether server side encryption is implemented for this layer. func (es *erasureSingle) IsEncryptionSupported() bool { return true } // IsCompressionSupported returns whether compression is applicable for this layer. func (es *erasureSingle) IsCompressionSupported() bool { return true } // IsTaggingSupported indicates whethes *erasureSingle implements tagging support. func (es *erasureSingle) IsTaggingSupported() bool { return true } // Object Operations // CopyObject - copy object source object to destination object. // if source object and destination object are same we only // update metadata. func (es *erasureSingle) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) { defer NSUpdated(dstBucket, dstObject) srcObject = encodeDirObject(srcObject) dstObject = encodeDirObject(dstObject) cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject)) if !dstOpts.NoLock { ns := es.NewNSLock(dstBucket, dstObject) lkctx, err := ns.GetLock(ctx, globalOperationTimeout) if err != nil { return ObjectInfo{}, err } ctx = lkctx.Context() defer ns.Unlock(lkctx.Cancel) dstOpts.NoLock = true } if cpSrcDstSame && srcInfo.metadataOnly { // Read metadata associated with the object from all disks. storageDisks := []StorageAPI{es.disk} var metaArr []FileInfo var errs []error // Read metadata associated with the object from all disks. if srcOpts.VersionID != "" { metaArr, errs = readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true) } else { metaArr, errs = readAllXL(ctx, storageDisks, srcBucket, srcObject, true) } readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, 0) if err != nil { return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject) } // List all online disks. 
onlineDisks, modTime := listOnlineDisks(storageDisks, metaArr, errs) // Pick latest valid metadata. fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum) if err != nil { return oi, toObjectErr(err, srcBucket, srcObject) } if fi.Deleted { if srcOpts.VersionID == "" { return oi, toObjectErr(errFileNotFound, srcBucket, srcObject) } return fi.ToObjectInfo(srcBucket, srcObject), toObjectErr(errMethodNotAllowed, srcBucket, srcObject) } filterOnlineDisksInplace(fi, metaArr, onlineDisks) versionID := srcInfo.VersionID if srcInfo.versionOnly { versionID = dstOpts.VersionID // preserve destination versionId if specified. if versionID == "" { versionID = mustGetUUID() fi.IsLatest = true // we are creating a new version so this is latest. } modTime = UTCNow() } // If the data is not inlined, we may end up incorrectly // inlining the data here, that leads to an inconsistent // situation where some objects are were not inlined // were now inlined, make sure to `nil` the Data such // that xl.meta is written as expected. if !fi.InlineData() { fi.Data = nil } fi.VersionID = versionID // set any new versionID we might have created fi.ModTime = modTime // set modTime for the new versionID if !dstOpts.MTime.IsZero() { modTime = dstOpts.MTime fi.ModTime = dstOpts.MTime } fi.Metadata = srcInfo.UserDefined srcInfo.UserDefined["etag"] = srcInfo.ETag // Update `xl.meta` content on each disks. for index := range metaArr { if metaArr[index].IsValid() { metaArr[index].ModTime = modTime metaArr[index].VersionID = versionID metaArr[index].Metadata = srcInfo.UserDefined if !metaArr[index].InlineData() { // If the data is not inlined, we may end up incorrectly // inlining the data here, that leads to an inconsistent // situation where some objects are were not inlined // were now inlined, make sure to `nil` the Data such // that xl.meta is written as expected. metaArr[index].Data = nil } } } // Write unique `xl.meta` for each disk. 
if _, err = writeUniqueFileInfo(ctx, onlineDisks, srcBucket, srcObject, metaArr, writeQuorum); err != nil { return oi, toObjectErr(err, srcBucket, srcObject) } return fi.ToObjectInfo(srcBucket, srcObject), nil } putOpts := ObjectOptions{ ServerSideEncryption: dstOpts.ServerSideEncryption, UserDefined: srcInfo.UserDefined, Versioned: dstOpts.Versioned, VersionID: dstOpts.VersionID, MTime: dstOpts.MTime, NoLock: true, } return es.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts) } // GetObjectNInfo - returns object info and an object // Read(Closer). When err != nil, the returned reader is always nil. func (es *erasureSingle) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) { if err = checkGetObjArgs(ctx, bucket, object); err != nil { return nil, err } object = encodeDirObject(object) var unlockOnDefer bool nsUnlocker := func() {} defer func() { if unlockOnDefer { nsUnlocker() } }() // Acquire lock if lockType != noLock { lock := es.NewNSLock(bucket, object) switch lockType { case writeLock: lkctx, err := lock.GetLock(ctx, globalOperationTimeout) if err != nil { return nil, err } ctx = lkctx.Context() nsUnlocker = func() { lock.Unlock(lkctx.Cancel) } case readLock: lkctx, err := lock.GetRLock(ctx, globalOperationTimeout) if err != nil { return nil, err } ctx = lkctx.Context() nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) } } unlockOnDefer = true } fi, metaArr, onlineDisks, err := es.getObjectFileInfo(ctx, bucket, object, opts, true) if err != nil { return nil, toObjectErr(err, bucket, object) } objInfo := fi.ToObjectInfo(bucket, object) if objInfo.DeleteMarker { if opts.VersionID == "" { return &GetObjectReader{ ObjInfo: objInfo, }, toObjectErr(errFileNotFound, bucket, object) } // Make sure to return object info to provide extra information. 
return &GetObjectReader{ ObjInfo: objInfo, }, toObjectErr(errMethodNotAllowed, bucket, object) } if objInfo.IsRemote() { gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts) if err != nil { return nil, err } unlockOnDefer = false return gr.WithCleanupFuncs(nsUnlocker), nil } fn, off, length, err := NewGetObjectReader(rs, objInfo, opts) if err != nil { return nil, err } unlockOnDefer = false pr, pw := xioutil.WaitPipe() go func() { pw.CloseWithError(es.getObjectWithFileInfo(ctx, bucket, object, off, length, pw, fi, metaArr, onlineDisks)) }() // Cleanup function to cause the go routine above to exit, in // case of incomplete read. pipeCloser := func() { pr.CloseWithError(nil) } return fn(pr, h, pipeCloser, nsUnlocker) } func (es *erasureSingle) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error { // Reorder online disks based on erasure distribution ordes. // Reorder parts metadata based on erasure distribution ordes. onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi) // For negative length read everything. if length < 0 { length = fi.Size - startOffset } // Reply back invalid range if the input offset and length fall out of range. if startOffset > fi.Size || startOffset+length > fi.Size { logger.LogIf(ctx, InvalidRange{startOffset, length, fi.Size}, logger.Application) return InvalidRange{startOffset, length, fi.Size} } // Get start part index and offset. partIndex, partOffset, err := fi.ObjectToPartOffset(ctx, startOffset) if err != nil { return InvalidRange{startOffset, length, fi.Size} } // Calculate endOffset according to length endOffset := startOffset if length > 0 { endOffset += length - 1 } // Get last part index to read given length. 
lastPartIndex, _, err := fi.ObjectToPartOffset(ctx, endOffset) if err != nil { return InvalidRange{startOffset, length, fi.Size} } var totalBytesRead int64 erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) if err != nil { return toObjectErr(err, bucket, object) } // once we have obtained a common FileInfo i.e latest, we should stick // to single dataDir to read the content to avoid reading from some other // dataDir that has stale FileInfo{} to ensure that we fail appropriately // during reads and expect the same dataDir everywhere. dataDir := fi.DataDir for ; partIndex <= lastPartIndex; partIndex++ { if length == totalBytesRead { break } partNumber := fi.Parts[partIndex].Number // Save the current part name and size. partSize := fi.Parts[partIndex].Size partLength := partSize - partOffset // partLength should be adjusted so that we don't write more data than what was requested. if partLength > (length - totalBytesRead) { partLength = length - totalBytesRead } tillOffset := erasure.ShardFileOffset(partOffset, partLength, partSize) // Get the checksums of the current part. 
readers := make([]io.ReaderAt, len(onlineDisks)) prefer := make([]bool, len(onlineDisks)) for index, disk := range onlineDisks { if disk == OfflineDisk { continue } if !metaArr[index].IsValid() { continue } checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber) partPath := pathJoin(object, dataDir, fmt.Sprintf("part.%d", partNumber)) readers[index] = newBitrotReader(disk, metaArr[index].Data, bucket, partPath, tillOffset, checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize()) // Prefer local disks prefer[index] = disk.Hostname() == "" } _, err = erasure.Decode(ctx, writer, readers, partOffset, partLength, partSize, prefer) // Note: we should not be defer'ing the following closeBitrotReaders() call as // we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time // we return from this function. closeBitrotReaders(readers) if err != nil { return toObjectErr(err, bucket, object) } for i, r := range readers { if r == nil { onlineDisks[i] = OfflineDisk } } // Track total bytes read from disk and written to the client. totalBytesRead += partLength // partOffset will be valid only for the first part, hence reset it to 0 for // the remaining parts. partOffset = 0 } // End of read all parts loop. // Return success. return nil } // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (es *erasureSingle) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) { if err = checkGetObjArgs(ctx, bucket, object); err != nil { return info, err } object = encodeDirObject(object) if !opts.NoLock { // Lock the object before reading. 
lk := es.NewNSLock(bucket, object) lkctx, err := lk.GetRLock(ctx, globalOperationTimeout) if err != nil { return ObjectInfo{}, err } ctx = lkctx.Context() defer lk.RUnlock(lkctx.Cancel) } return es.getObjectInfo(ctx, bucket, object, opts) } func (es *erasureSingle) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) { disks := []StorageAPI{es.disk} var errs []error // Read metadata associated with the object from all disks. metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData) readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, 0) if err != nil { return fi, nil, nil, toObjectErr(err, bucket, object) } if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { return fi, nil, nil, toObjectErr(reducedErr, bucket, object) } // List all online disks. onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs) // Pick latest valid metadata. fi, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum) if err != nil { return fi, nil, nil, err } filterOnlineDisksInplace(fi, metaArr, onlineDisks) return fi, metaArr, onlineDisks, nil } // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. func (es *erasureSingle) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) { fi, _, _, err := es.getObjectFileInfo(ctx, bucket, object, opts, false) if err != nil { return objInfo, toObjectErr(err, bucket, object) } objInfo = fi.ToObjectInfo(bucket, object) if fi.Deleted { if opts.VersionID == "" || opts.DeleteMarker { return objInfo, toObjectErr(errFileNotFound, bucket, object) } // Make sure to return object info to provide extra information. 
return objInfo, toObjectErr(errMethodNotAllowed, bucket, object) } return objInfo, nil } // getObjectInfoAndQuroum - wrapper for reading object metadata and constructs ObjectInfo, additionally returns write quorum for the object. func (es *erasureSingle) getObjectInfoAndQuorum(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, wquorum int, err error) { fi, _, _, err := es.getObjectFileInfo(ctx, bucket, object, opts, false) if err != nil { return objInfo, 1, toObjectErr(err, bucket, object) } wquorum = fi.Erasure.DataBlocks if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks { wquorum++ } objInfo = fi.ToObjectInfo(bucket, object) if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" { // Make sure to return object info to provide extra information. return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object) } if fi.Deleted { if opts.VersionID == "" || opts.DeleteMarker { return objInfo, wquorum, toObjectErr(errFileNotFound, bucket, object) } // Make sure to return object info to provide extra information. return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object) } return objInfo, wquorum, nil } func (es *erasureSingle) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { data := r.Reader // No metadata is set, allocate a new one. if opts.UserDefined == nil { opts.UserDefined = make(map[string]string) } storageDisks := []StorageAPI{es.disk} // Get parity and data drive count based on storage class metadata parityDrives := 0 dataDrives := len(storageDisks) - parityDrives // we now know the number of blocks this object needs for data and parity. // writeQuorum is dataBlocks + 1 writeQuorum := dataDrives if dataDrives == parityDrives { writeQuorum++ } // Validate input data size and it can never be less than zero. 
if data.Size() < -1 { logger.LogIf(ctx, errInvalidArgument, logger.Application) return ObjectInfo{}, toObjectErr(errInvalidArgument) } // Initialize parts metadata partsMetadata := make([]FileInfo, len(storageDisks)) fi := newFileInfo(pathJoin(minioMetaBucket, key), dataDrives, parityDrives) fi.DataDir = mustGetUUID() // Initialize erasure metadata. for index := range partsMetadata { partsMetadata[index] = fi } // Order disks according to erasure distribution var onlineDisks []StorageAPI onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi) erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) if err != nil { return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key) } // Fetch buffer for I/O, returns from the pool if not allocates a new one and returns. var buffer []byte switch size := data.Size(); { case size == 0: buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF case size >= fi.Erasure.BlockSize: buffer = es.bp.Get() defer es.bp.Put(buffer) case size < fi.Erasure.BlockSize: // No need to allocate fully blockSizeV1 buffer if the incoming data is smaller. 
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1)) } if len(buffer) > int(fi.Erasure.BlockSize) { buffer = buffer[:fi.Erasure.BlockSize] } shardFileSize := erasure.ShardFileSize(data.Size()) writers := make([]io.Writer, len(onlineDisks)) inlineBuffers := make([]*bytes.Buffer, len(onlineDisks)) for i, disk := range onlineDisks { if disk == nil { continue } if disk.IsOnline() { inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, shardFileSize)) writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) } } n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum) closeBitrotWriters(writers) if erasureErr != nil { return ObjectInfo{}, toObjectErr(erasureErr, minioMetaBucket, key) } // Should return IncompleteBody{} error when reader has fewer bytes // than specified in request header. if n < data.Size() { return ObjectInfo{}, IncompleteBody{Bucket: minioMetaBucket, Object: key} } for i, w := range writers { if w == nil { // Make sure to avoid writing to disks which we couldn't complete in erasure.Encode() onlineDisks[i] = nil continue } partsMetadata[i].Data = inlineBuffers[i].Bytes() partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize()) partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ PartNumber: 1, Algorithm: DefaultBitrotAlgorithm, Hash: bitrotWriterSum(w), }) } modTime := UTCNow() // Fill all the necessary metadata. // Update `xl.meta` content on each disks. for index := range partsMetadata { partsMetadata[index].Size = n partsMetadata[index].Fresh = true partsMetadata[index].ModTime = modTime partsMetadata[index].Metadata = opts.UserDefined } // Set an additional header when data is inlined. 
for index := range partsMetadata { partsMetadata[index].SetInlineData() } for i := 0; i < len(onlineDisks); i++ { if onlineDisks[i] != nil && onlineDisks[i].IsOnline() { // Object info is the same in all disks, so we can pick // the first meta from online disk fi = partsMetadata[i] break } } if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaBucket, key, partsMetadata, writeQuorum); err != nil { return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key) } return fi.ToObjectInfo(minioMetaBucket, key), nil } // PutObject - creates an object upon reading from the input stream // until EOF, erasure codes the data across all disk and additionally // writes `xl.meta` which carries the necessary metadata for future // object operations. func (es *erasureSingle) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { // Validate put object input args. if err := checkPutObjectArgs(ctx, bucket, object, es); err != nil { return ObjectInfo{}, err } object = encodeDirObject(object) if !isMinioMetaBucketName(bucket) && !hasSpaceFor(getDiskInfos(ctx, es.disk), data.Size()) { return ObjectInfo{}, toObjectErr(errDiskFull) } return es.putObject(ctx, bucket, object, data, opts) } // putObject wrapper for erasureObjects PutObject func (es *erasureSingle) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) { data := r.Reader // No metadata is set, allocate a new one. if opts.UserDefined == nil { opts.UserDefined = make(map[string]string) } storageDisks := []StorageAPI{es.disk} parityDrives := 0 dataDrives := len(storageDisks) - parityDrives // we now know the number of blocks this object needs for data and parity. // writeQuorum is dataBlocks + 1 writeQuorum := dataDrives if dataDrives == parityDrives { writeQuorum++ } // Validate input data size and it can never be less than zero. 
if data.Size() < -1 { logger.LogIf(ctx, errInvalidArgument, logger.Application) return ObjectInfo{}, toObjectErr(errInvalidArgument) } // Initialize parts metadata partsMetadata := make([]FileInfo, len(storageDisks)) fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives) fi.VersionID = opts.VersionID if opts.Versioned && fi.VersionID == "" { fi.VersionID = mustGetUUID() } fi.DataDir = mustGetUUID() uniqueID := mustGetUUID() tempObj := uniqueID // Initialize erasure metadata. for index := range partsMetadata { partsMetadata[index] = fi } // Order disks according to erasure distribution var onlineDisks []StorageAPI onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi) erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } // Fetch buffer for I/O, returns from the pool if not allocates a new one and returns. var buffer []byte switch size := data.Size(); { case size == 0: buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF case size == -1: if size := data.ActualSize(); size > 0 && size < fi.Erasure.BlockSize { buffer = make([]byte, data.ActualSize()+256, data.ActualSize()*2+512) } else { buffer = es.bp.Get() defer es.bp.Put(buffer) } case size >= fi.Erasure.BlockSize: buffer = es.bp.Get() defer es.bp.Put(buffer) case size < fi.Erasure.BlockSize: // No need to allocate fully blockSizeV1 buffer if the incoming data is smaller. buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1)) } if len(buffer) > int(fi.Erasure.BlockSize) { buffer = buffer[:fi.Erasure.BlockSize] } partName := "part.1" tempErasureObj := pathJoin(uniqueID, fi.DataDir, partName) // Delete temporary object in the event of failure. // If PutObject succeeded there would be no temporary // object to delete. 
var online int defer func() { if online != len(onlineDisks) { es.disk.RenameFile(context.Background(), minioMetaTmpBucket, tempObj, minioMetaTmpDeletedBucket, mustGetUUID()) } }() shardFileSize := erasure.ShardFileSize(data.Size()) writers := make([]io.Writer, len(onlineDisks)) var inlineBuffers []*bytes.Buffer if shardFileSize >= 0 { if !opts.Versioned && shardFileSize < smallFileThreshold { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) } else if shardFileSize < smallFileThreshold/8 { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) } } else { // If compressed, use actual size to determine. if sz := erasure.ShardFileSize(data.ActualSize()); sz > 0 { if !opts.Versioned && sz < smallFileThreshold { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) } else if sz < smallFileThreshold/8 { inlineBuffers = make([]*bytes.Buffer, len(onlineDisks)) } } } for i, disk := range onlineDisks { if disk == nil { continue } if !disk.IsOnline() { continue } if len(inlineBuffers) > 0 { sz := shardFileSize if sz < 0 { sz = data.ActualSize() } inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, sz)) writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize()) continue } writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, shardFileSize, DefaultBitrotAlgorithm, erasure.ShardSize()) } toEncode := io.Reader(data) if data.Size() > bigFileThreshold { // We use 2 buffers, so we always have a full buffer of input. 
bufA := es.bp.Get() bufB := es.bp.Get() defer es.bp.Put(bufA) defer es.bp.Put(bufB) ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]}) if err == nil { toEncode = ra defer ra.Close() } logger.LogIf(ctx, err) } n, erasureErr := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum) closeBitrotWriters(writers) if erasureErr != nil { return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj) } // Should return IncompleteBody{} error when reader has fewer bytes // than specified in request header. if n < data.Size() { return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object} } if !opts.NoLock { lk := es.NewNSLock(bucket, object) lkctx, err := lk.GetLock(ctx, globalOperationTimeout) if err != nil { return ObjectInfo{}, err } ctx = lkctx.Context() defer lk.Unlock(lkctx.Cancel) } for i, w := range writers { if w == nil { onlineDisks[i] = nil continue } if len(inlineBuffers) > 0 && inlineBuffers[i] != nil { partsMetadata[i].Data = inlineBuffers[i].Bytes() } else { partsMetadata[i].Data = nil } partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize()) partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ PartNumber: 1, Algorithm: DefaultBitrotAlgorithm, Hash: bitrotWriterSum(w), }) } if opts.UserDefined["etag"] == "" { opts.UserDefined["etag"] = r.MD5CurrentHexString() } // Guess content-type from the extension if possible. if opts.UserDefined["content-type"] == "" { opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object)) } modTime := opts.MTime if opts.MTime.IsZero() { modTime = UTCNow() } // Fill all the necessary metadata. // Update `xl.meta` content on each disks. for index := range partsMetadata { partsMetadata[index].Metadata = opts.UserDefined partsMetadata[index].Size = n partsMetadata[index].ModTime = modTime } if len(inlineBuffers) > 0 { // Set an additional header when data is inlined. 
for index := range partsMetadata { partsMetadata[index].SetInlineData() } } // Rename the successfully written temporary object to final location. if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum); err != nil { if errors.Is(err, errFileNotFound) { return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object) } logger.LogIf(ctx, err) return ObjectInfo{}, toObjectErr(err, bucket, object) } for i := 0; i < len(onlineDisks); i++ { if onlineDisks[i] != nil && onlineDisks[i].IsOnline() { // Object info is the same in all disks, so we can pick // the first meta from online disk fi = partsMetadata[i] break } } fi.ReplicationState = opts.PutReplicationState() online = countOnlineDisks(onlineDisks) // we are adding a new version to this object under the namespace lock, so this is the latest version. fi.IsLatest = true return fi.ToObjectInfo(bucket, object), nil } func (es *erasureSingle) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error { return es.disk.DeleteVersion(ctx, bucket, object, fi, forceDelMarker) } // DeleteObjects deletes objects/versions in bulk, this function will still automatically split objects list // into smaller bulks if some object names are found to be duplicated in the delete list, splitting // into smaller bulks will avoid holding twice the write lock of the duplicated object names. 
func (es *erasureSingle) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
	// errs and dobjects are index-aligned with the caller supplied 'objects'.
	errs := make([]error, len(objects))
	dobjects := make([]DeletedObject, len(objects))
	objSets := set.NewStringSet()
	for i := range errs {
		// Object names are rewritten in place to their encoded directory form.
		objects[i].ObjectName = encodeDirObject(objects[i].ObjectName)

		// NOTE(review): the validation result stored here is overwritten
		// unconditionally by the error reduction loop at the end of this
		// function - confirm that this is intended.
		errs[i] = checkDelObjArgs(ctx, bucket, objects[i].ObjectName)
		// The set collapses duplicate names so each name is locked once.
		objSets.Add(objects[i].ObjectName)
	}

	// Acquire a bulk write lock across 'objects'
	multiDeleteLock := es.NewNSLock(bucket, objSets.ToSlice()...)
	lkctx, err := multiDeleteLock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		// Lock acquisition failed, report the same error for every object.
		for i := range errs {
			errs[i] = err
		}
		return dobjects, errs
	}
	ctx = lkctx.Context()
	defer multiDeleteLock.Unlock(lkctx.Cancel)

	writeQuorums := make([]int, len(objects))
	storageDisks := []StorageAPI{es.disk}

	for i := range objects {
		// Single drive write quorum is '1'
		writeQuorums[i] = 1
	}

	// Group the requested versions by object name.
	versionsMap := make(map[string]FileInfoVersions, len(objects))
	for i := range objects {
		// Construct the FileInfo data that needs to be preserved on the disk.
		vr := FileInfo{
			Name:             objects[i].ObjectName,
			VersionID:        objects[i].VersionID,
			ReplicationState: objects[i].ReplicationState(),
			// save the index to set correct error at this index.
			Idx: i,
		}
		vr.SetTierFreeVersionID(mustGetUUID())
		// VersionID is not set means delete is not specific about
		// any version, look for if the bucket is versioned or not.
		if objects[i].VersionID == "" {
			if opts.Versioned || opts.VersionSuspended {
				// Bucket is versioned and no version was explicitly
				// mentioned for deletes, create a delete marker instead.
				vr.ModTime = UTCNow()
				vr.Deleted = true
				// Versioning suspended means that we add a `null` version
				// delete marker, if not add a new version for this delete
				// marker.
				if opts.Versioned {
					vr.VersionID = mustGetUUID()
				}
			}
		}
		// De-dup same object name to collect multiple versions for same object.
		v, ok := versionsMap[objects[i].ObjectName]
		if ok {
			v.Versions = append(v.Versions, vr)
		} else {
			v = FileInfoVersions{
				Name:     vr.Name,
				Versions: []FileInfo{vr},
			}
		}
		// Pre-populate the response entry for this index, the shape depends
		// on whether a delete marker is being created.
		if vr.Deleted {
			dobjects[i] = DeletedObject{
				DeleteMarker:          vr.Deleted,
				DeleteMarkerVersionID: vr.VersionID,
				DeleteMarkerMTime:     DeleteMarkerMTime{vr.ModTime},
				ObjectName:            vr.Name,
				ReplicationState:      vr.ReplicationState,
			}
		} else {
			dobjects[i] = DeletedObject{
				ObjectName:       vr.Name,
				VersionID:        vr.VersionID,
				ReplicationState: vr.ReplicationState,
			}
		}
		versionsMap[objects[i].ObjectName] = v
	}

	// Flatten the map. Map iteration order is unspecified, errors are mapped
	// back to the request positions further below through FileInfo.Idx.
	dedupVersions := make([]FileInfoVersions, 0, len(versionsMap))
	for _, version := range versionsMap {
		dedupVersions = append(dedupVersions, version)
	}

	// Initialize list of errors.
	delObjErrs := make([][]error, len(storageDisks))

	var wg sync.WaitGroup
	// Remove versions in bulk for each disk
	for index, disk := range storageDisks {
		wg.Add(1)
		go func(index int, disk StorageAPI) {
			defer wg.Done()
			delObjErrs[index] = make([]error, len(objects))
			if disk == nil {
				for i := range objects {
					delObjErrs[index][i] = errDiskNotFound
				}
				return
			}
			// The returned slice is indexed like dedupVersions (see the
			// dedupVersions[i] lookup below).
			errs := disk.DeleteVersions(ctx, bucket, dedupVersions)
			for i, err := range errs {
				if err == nil {
					continue
				}
				// The same error is applied to every version of that object.
				for _, v := range dedupVersions[i].Versions {
					if err == errFileNotFound || err == errFileVersionNotFound {
						if !dobjects[v.Idx].DeleteMarker {
							// Not delete marker, if not found, ok.
							continue
						}
					}
					delObjErrs[index][v.Idx] = err
				}
			}
		}(index, disk)
	}
	wg.Wait()

	// Reduce errors for each object
	for objIndex := range objects {
		diskErrs := make([]error, len(storageDisks))
		// Iterate over disks to fetch the error
		// of deleting of the current object
		for i := range delObjErrs {
			// delObjErrs[i] is not nil when disks[i] is also not nil
			if delObjErrs[i] != nil {
				diskErrs[i] = delObjErrs[i][objIndex]
			}
		}
		err := reduceWriteQuorumErrs(ctx, diskErrs, objectOpIgnoredErrs, writeQuorums[objIndex])
		if objects[objIndex].VersionID != "" {
			errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName, objects[objIndex].VersionID)
		} else {
			errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName)
		}

		// Deferred calls accumulate and run when the function returns. They
		// are registered after the Unlock defer above and therefore run
		// before the lock is released.
		defer NSUpdated(bucket, objects[objIndex].ObjectName)
	}

	return dobjects, errs
}

// deletePrefix recursively deletes 'prefix' from the bucket and returns the
// error of that delete. The encoded directory form of the prefix is deleted as
// well through a deferred call whose result is discarded.
func (es *erasureSingle) deletePrefix(ctx context.Context, bucket, prefix string) error {
	dirPrefix := encodeDirObject(prefix)
	defer es.disk.Delete(ctx, bucket, dirPrefix, true)
	return es.disk.Delete(ctx, bucket, prefix, true)
}

// DeleteObject - deletes an object or one of its versions. Depending on the
// versioning and replication related fields in opts this either writes a
// delete marker or removes the requested version.
//
// The returned ObjectInfo describes the delete marker / deleted version on
// success. On most error paths it only carries the requested version id.
func (es *erasureSingle) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
	if err = checkDelObjArgs(ctx, bucket, object); err != nil {
		return objInfo, err
	}

	// Prefix deletes bypass locking, lifecycle and versioning handling below.
	if opts.DeletePrefix {
		return ObjectInfo{}, toObjectErr(es.deletePrefix(ctx, bucket, object), bucket, object)
	}

	object = encodeDirObject(object)
	var lc *lifecycle.Lifecycle
	var rcfg lock.Retention
	if opts.Expiration.Expire {
		// Check if the current bucket has a configured lifecycle policy
		lc, _ = globalLifecycleSys.Get(bucket)
		rcfg, _ = globalBucketObjectLockSys.Get(bucket)
	}

	// expiration attempted on a bucket with no lifecycle
	// rules shall be rejected.
	if lc == nil && opts.Expiration.Expire {
		if opts.VersionID != "" {
			return objInfo, VersionNotFound{
				Bucket:    bucket,
				Object:    object,
				VersionID: opts.VersionID,
			}
		}
		return objInfo, ObjectNotFound{
			Bucket: bucket,
			Object: object,
		}
	}

	// Acquire a write lock before deleting the object.
	lk := es.NewNSLock(bucket, object)
	lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
	if err != nil {
		return ObjectInfo{}, err
	}
	ctx = lkctx.Context()
	defer lk.Unlock(lkctx.Cancel)

	versionFound := true
	objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
	goi, writeQuorum, gerr := es.getObjectInfoAndQuorum(ctx, bucket, object, opts)
	if gerr != nil && goi.Name == "" {
		// A read quorum failure is reported as a write quorum failure since
		// this is a write operation.
		switch gerr.(type) {
		case InsufficientReadQuorum:
			return objInfo, InsufficientWriteQuorum{}
		}
		// For delete marker replication, versionID being replicated will not exist on disk
		if opts.DeleteMarker {
			versionFound = false
		} else {
			return objInfo, gerr
		}
	}

	if opts.Expiration.Expire {
		// lc is non-nil here, the nil case returned early above.
		action := evalActionFromLifecycle(ctx, *lc, rcfg, goi, false)
		var isErr bool
		// Only an expiry style action may proceed, "no action" and the
		// transition actions are rejected as not found.
		switch action {
		case lifecycle.NoneAction:
			isErr = true
		case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
			isErr = true
		}
		if isErr {
			if goi.VersionID != "" {
				return goi, VersionNotFound{
					Bucket:    bucket,
					Object:    object,
					VersionID: goi.VersionID,
				}
			}
			return goi, ObjectNotFound{
				Bucket: bucket,
				Object: object,
			}
		}
	}

	// From here on every return path triggers NSUpdated for this object.
	defer NSUpdated(bucket, object)

	var markDelete bool
	// Determine whether to mark object deleted for replication
	if goi.VersionID != "" {
		markDelete = true
	}

	// Default deleteMarker to true if object is under versioning
	deleteMarker := opts.Versioned

	if opts.VersionID != "" {
		// case where replica version needs to be deleted on target cluster
		if versionFound && opts.DeleteMarkerReplicationStatus() == replication.Replica {
			markDelete = false
		}
		// No replication state at all was supplied for this delete.
		if opts.VersionPurgeStatus().Empty() && opts.DeleteMarkerReplicationStatus().Empty() {
			markDelete = false
		}
		// The version purge has already completed.
		if opts.VersionPurgeStatus() == Complete {
			markDelete = false
		}

		// Version is found but we do not wish to create more delete markers
		// now, since VersionPurgeStatus() is already set, we can let the
		// lower layers decide this. This fixes a regression that was introduced
		// in PR #14555 where !VersionPurgeStatus.Empty() is automatically
		// considered as Delete marker true to avoid listing such objects by
		// regular ListObjects() calls. However for delete replication this
		// ends up being a problem because "upon" a successful delete this
		// ends up creating a new delete marker that is spurious and unnecessary.
		if versionFound {
			if !goi.VersionPurgeStatus.Empty() {
				deleteMarker = false
			} else if !goi.DeleteMarker { // implies a versioned delete of object
				deleteMarker = false
			}
		}
	}

	// Fall back to the current time when the caller did not supply one.
	modTime := opts.MTime
	if opts.MTime.IsZero() {
		modTime = UTCNow()
	}
	// The same tier free-version id is used by both delete paths below.
	fvID := mustGetUUID()
	if markDelete {
		if opts.Versioned || opts.VersionSuspended {
			if !deleteMarker {
				// versioning suspended means we add `null` version as
				// delete marker, if its not decided already.
				deleteMarker = opts.VersionSuspended && opts.VersionID == ""
			}
			fi := FileInfo{
				Name:             object,
				Deleted:          deleteMarker,
				MarkDeleted:      markDelete,
				ModTime:          modTime,
				ReplicationState: opts.DeleteReplication,
				TransitionStatus: opts.Transition.Status,
				ExpireRestored:   opts.Transition.ExpireRestored,
			}
			fi.SetTierFreeVersionID(fvID)
			if opts.Versioned {
				// A fresh version id is generated unless the caller
				// requested a specific one.
				fi.VersionID = mustGetUUID()
				if opts.VersionID != "" {
					fi.VersionID = opts.VersionID
				}
			}
			// versioning suspended means we add `null` version as
			// delete marker. Add delete marker, since we don't have
			// any version specified explicitly. Or if a particular
			// version id needs to be replicated.
			if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil {
				return objInfo, toObjectErr(err, bucket, object)
			}
			return fi.ToObjectInfo(bucket, object), nil
		}
	}

	// Delete the requested object version from the drive.
	dfi := FileInfo{
		Name:             object,
		VersionID:        opts.VersionID,
		MarkDeleted:      markDelete,
		Deleted:          deleteMarker,
		ModTime:          modTime,
		ReplicationState: opts.DeleteReplication,
		TransitionStatus: opts.Transition.Status,
		ExpireRestored:   opts.Transition.ExpireRestored,
	}
	dfi.SetTierFreeVersionID(fvID)
	if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, dfi, opts.DeleteMarker); err != nil {
		return objInfo, toObjectErr(err, bucket, object)
	}

	return ObjectInfo{
		Bucket:                     bucket,
		Name:                       object,
		VersionID:                  opts.VersionID,
		VersionPurgeStatusInternal: opts.DeleteReplication.VersionPurgeStatusInternal,
		ReplicationStatusInternal:  opts.DeleteReplication.ReplicationStatusInternal,
	}, nil
}

// PutObjectMetadata - updates the metadata of an existing object version.
// opts.EvalMetadataFn, when set, is called with the current object info and
// may veto the update by returning an error. The (possibly modified) user
// defined metadata of that object info is then merged into the stored
// metadata. Delete markers are rejected with errMethodNotAllowed.
func (es *erasureSingle) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
	if !opts.NoLock {
		// Lock the object before updating metadata.
		lk := es.NewNSLock(bucket, object)
		lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
		if err != nil {
			return ObjectInfo{}, err
		}
		ctx = lkctx.Context()
		defer lk.Unlock(lkctx.Cancel)
	}

	disks := []StorageAPI{es.disk}

	var metaArr []FileInfo
	var errs []error

	// Read metadata associated with the object from all disks.
	metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)

	readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, 0)
	if err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}

	// List all online disks.
	onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs)

	// Pick latest valid metadata.
	fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
	if err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}
	if fi.Deleted {
		return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
	}

	filterOnlineDisksInplace(fi, metaArr, onlineDisks)

	// if version-id is not specified retention is supposed to be set on the latest object.
	if opts.VersionID == "" {
		opts.VersionID = fi.VersionID
	}

	objInfo := fi.ToObjectInfo(bucket, object)
	if opts.EvalMetadataFn != nil {
		if err := opts.EvalMetadataFn(objInfo); err != nil {
			return ObjectInfo{}, err
		}
	}
	// NOTE(review): assumes fi.Metadata is a non-nil map, assigning into a
	// nil map panics - confirm pickValidFileInfo always populates it.
	for k, v := range objInfo.UserDefined {
		fi.Metadata[k] = v
	}
	// NOTE(review): opts.MTime is applied as-is, there is no fallback to the
	// current time when it is zero (unlike DeleteObject) - confirm callers
	// always set it.
	fi.ModTime = opts.MTime
	fi.VersionID = opts.VersionID

	if err = es.updateObjectMeta(ctx, bucket, object, fi, onlineDisks...); err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}

	return fi.ToObjectInfo(bucket, object), nil
}

// PutObjectTags - replace or add tags to an existing object. The tags string
// is stored under the xhttp.AmzObjectTagging metadata key, entries from
// opts.UserDefined are merged into the stored metadata as well.
func (es *erasureSingle) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
	// Lock the object before updating tags.
	lk := es.NewNSLock(bucket, object)
	lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return ObjectInfo{}, err
	}
	ctx = lkctx.Context()
	defer lk.Unlock(lkctx.Cancel)

	disks := []StorageAPI{es.disk}

	var metaArr []FileInfo
	var errs []error

	// Read metadata associated with the object from all disks.
	if opts.VersionID != "" {
		metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
	} else {
		metaArr, errs = readAllXL(ctx, disks, bucket, object, false)
	}

	readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, 0)
	if err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}

	// List all online disks.
	onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs)

	// Pick latest valid metadata.
	fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
	if err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}
	if fi.Deleted {
		// Without an explicit version the delete marker is reported as a
		// missing object, with an explicit version as a disallowed method.
		if opts.VersionID == "" {
			return ObjectInfo{}, toObjectErr(errFileNotFound, bucket, object)
		}
		return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
	}

	filterOnlineDisksInplace(fi, metaArr, onlineDisks)

	// NOTE(review): assumes fi.Metadata is a non-nil map, assigning into a
	// nil map panics - confirm pickValidFileInfo always populates it.
	fi.Metadata[xhttp.AmzObjectTagging] = tags
	fi.ReplicationState = opts.PutReplicationState()
	for k, v := range opts.UserDefined {
		fi.Metadata[k] = v
	}

	if err = es.updateObjectMeta(ctx, bucket, object, fi, onlineDisks...); err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}

	return fi.ToObjectInfo(bucket, object), nil
}

// updateObjectMeta will update the metadata of a file on the given disks.
// It is a no-op returning nil when fi carries no metadata entries.
func (es *erasureSingle) updateObjectMeta(ctx context.Context, bucket, object string, fi FileInfo, onlineDisks ...StorageAPI) error {
	if len(fi.Metadata) == 0 {
		return nil
	}

	g := errgroup.WithNErrs(len(onlineDisks))

	// Start writing `xl.meta` to all disks in parallel.
	for index := range onlineDisks {
		// Per-iteration copy for the closure below.
		index := index
		g.Go(func() error {
			if onlineDisks[index] == nil {
				return errDiskNotFound
			}
			return onlineDisks[index].UpdateMetadata(ctx, bucket, object, fi)
		}, index)
	}

	// Wait for all the routines.
	mErrs := g.Wait()

	// The per-disk errors are reduced with a write quorum of 1.
	return reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, 1)
}

// DeleteObjectTags - delete object tags from an existing object, implemented
// as PutObjectTags with an empty tags string.
func (es *erasureSingle) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
	return es.PutObjectTags(ctx, bucket, object, "", opts)
}

// GetObjectTags - get object tags from an existing object. The tags are parsed
// from the UserTags field of the object info.
func (es *erasureSingle) GetObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (*tags.Tags, error) {
	// GetObjectInfo will return tag value as well
	oi, err := es.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		return nil, err
	}

	return tags.ParseObjectTags(oi.UserTags)
}

// TransitionObject - transition object content to target tier.
//
// The object data is streamed to the tier named in opts.Transition.Tier, then
// the transition details are recorded through deleteObjectVersion. An event
// and an audit log entry are emitted for both the success and the failure of
// that last step.
func (es *erasureSingle) TransitionObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
	tgtClient, err := globalTierConfigMgr.getDriver(opts.Transition.Tier)
	if err != nil {
		return err
	}

	// Acquire write lock before starting to transition the object.
	lk := es.NewNSLock(bucket, object)
	lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
	if err != nil {
		return err
	}
	ctx = lkctx.Context()
	defer lk.Unlock(lkctx.Cancel)

	fi, metaArr, onlineDisks, err := es.getObjectFileInfo(ctx, bucket, object, opts, true)
	if err != nil {
		return toObjectErr(err, bucket, object)
	}
	if fi.Deleted {
		if opts.VersionID == "" {
			return toObjectErr(errFileNotFound, bucket, object)
		}
		// An explicitly requested version that is a delete marker cannot be
		// transitioned.
		return toObjectErr(errMethodNotAllowed, bucket, object)
	}
	// verify that the object queued for transition is identical to that on disk.
	if !opts.MTime.Equal(fi.ModTime) || !strings.EqualFold(opts.Transition.ETag, extractETag(fi.Metadata)) {
		return toObjectErr(errFileNotFound, bucket, object)
	}
	// if object already transitioned, return
	if fi.TransitionStatus == lifecycle.TransitionComplete {
		return nil
	}
	defer NSUpdated(bucket, object)

	destObj, err := genTransitionObjName(bucket)
	if err != nil {
		return err
	}

	// The goroutine writes the object content into the pipe while the tier
	// client reads from the other end. Each side closes its end with the
	// error (if any) it ended with.
	pr, pw := xioutil.WaitPipe()
	go func() {
		err := es.getObjectWithFileInfo(ctx, bucket, object, 0, fi.Size, pw, fi, metaArr, onlineDisks)
		pw.CloseWithError(err)
	}()

	var rv remoteVersionID
	rv, err = tgtClient.Put(ctx, destObj, pr, fi.Size)
	pr.CloseWithError(err)
	if err != nil {
		logger.LogIf(ctx, fmt.Errorf("Unable to transition %s/%s(%s) to %s tier: %w", bucket, object, opts.VersionID, opts.Transition.Tier, err))
		return err
	}
	// Record where the content now lives.
	fi.TransitionStatus = lifecycle.TransitionComplete
	fi.TransitionedObjName = destObj
	fi.TransitionTier = opts.Transition.Tier
	fi.TransitionVersionID = string(rv)
	eventName := event.ObjectTransitionComplete

	// we now know the number of blocks this object needs for data and parity.
	// writeQuorum starts at the number of data blocks and is bumped by one
	// only when the data and parity block counts are equal.
	writeQuorum := fi.Erasure.DataBlocks
	if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks {
		writeQuorum++
	}

	// A failure here does not abort, it is reported through the event name
	// and returned to the caller after the event and audit log are emitted.
	if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, false); err != nil {
		eventName = event.ObjectTransitionFailed
	}

	objInfo := fi.ToObjectInfo(bucket, object)
	sendEvent(eventArgs{
		EventName:  eventName,
		BucketName: bucket,
		Object:     objInfo,
		Host:       "Internal: [ILM-Transition]",
	})
	auditLogLifecycle(ctx, objInfo, ILMTransition)
	return err
}

// RestoreTransitionedObject - restore transitioned object content locally on this cluster.
// This is similar to PostObjectRestore from AWS GLACIER
// storage class. When PostObjectRestore API is called, a temporary copy of the object
// is restored locally to the bucket on source cluster until the restore expiry date.
// The copy that was transitioned continues to reside in the transitioned tier.
func (es *erasureSingle) RestoreTransitionedObject(ctx context.Context, bucket, object string, opts ObjectOptions) error { return es.restoreTransitionedObject(ctx, bucket, object, opts) } // update restore status header in the metadata func (es *erasureSingle) updateRestoreMetadata(ctx context.Context, bucket, object string, objInfo ObjectInfo, opts ObjectOptions, rerr error) error { oi := objInfo.Clone() oi.metadataOnly = true // Perform only metadata updates. if rerr == nil { oi.UserDefined[xhttp.AmzRestore] = completedRestoreObj(opts.Transition.RestoreExpiry).String() } else { // allow retry in the case of failure to restore delete(oi.UserDefined, xhttp.AmzRestore) } if _, err := es.CopyObject(ctx, bucket, object, bucket, object, oi, ObjectOptions{ VersionID: oi.VersionID, }, ObjectOptions{ VersionID: oi.VersionID, }); err != nil { logger.LogIf(ctx, fmt.Errorf("Unable to update transition restore metadata for %s/%s(%s): %s", bucket, object, oi.VersionID, err)) return err } return nil } // restoreTransitionedObject for multipart object chunks the file stream from remote tier into the same number of parts // as in the xl.meta for this version and rehydrates the part.n into the fi.DataDir for this version as in the xl.meta func (es *erasureSingle) restoreTransitionedObject(ctx context.Context, bucket string, object string, opts ObjectOptions) error { setRestoreHeaderFn := func(oi ObjectInfo, rerr error) error { es.updateRestoreMetadata(ctx, bucket, object, oi, opts, rerr) return rerr } var oi ObjectInfo // get the file info on disk for transitioned object actualfi, _, _, err := es.getObjectFileInfo(ctx, bucket, object, opts, false) if err != nil { return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object)) } oi = actualfi.ToObjectInfo(bucket, object) ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi) if len(oi.Parts) == 1 { var rs *HTTPRangeSpec gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts) if 
err != nil { return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object)) } defer gr.Close() hashReader, err := hash.NewReader(gr, gr.ObjInfo.Size, "", "", gr.ObjInfo.Size) if err != nil { return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object)) } pReader := NewPutObjReader(hashReader) ropts.UserDefined[xhttp.AmzRestore] = completedRestoreObj(opts.Transition.RestoreExpiry).String() _, err = es.PutObject(ctx, bucket, object, pReader, ropts) return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object)) } uploadID, err := es.NewMultipartUpload(ctx, bucket, object, ropts) if err != nil { return setRestoreHeaderFn(oi, err) } var uploadedParts []CompletePart var rs *HTTPRangeSpec // get reader from the warm backend - note that even in the case of encrypted objects, this stream is still encrypted. gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts) if err != nil { return setRestoreHeaderFn(oi, err) } defer gr.Close() // rehydrate the parts back on disk as per the original xl.meta prior to transition for _, partInfo := range oi.Parts { hr, err := hash.NewReader(gr, partInfo.Size, "", "", partInfo.Size) if err != nil { return setRestoreHeaderFn(oi, err) } pInfo, err := es.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, NewPutObjReader(hr), ObjectOptions{}) if err != nil { return setRestoreHeaderFn(oi, err) } if pInfo.Size != partInfo.Size { return setRestoreHeaderFn(oi, InvalidObjectState{Bucket: bucket, Object: object}) } uploadedParts = append(uploadedParts, CompletePart{ PartNumber: pInfo.PartNumber, ETag: pInfo.ETag, }) } _, err = es.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, ObjectOptions{ MTime: oi.ModTime, }) return setRestoreHeaderFn(oi, err) } func (es *erasureSingle) getUploadIDDir(bucket, object, uploadID string) string { return pathJoin(es.getMultipartSHADir(bucket, object), uploadID) } func (es *erasureSingle) getMultipartSHADir(bucket, object string) string { return 
getSHA256Hash([]byte(pathJoin(bucket, object))) } // checkUploadIDExists - verify if a given uploadID exists and is valid. func (es *erasureSingle) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) { defer func() { if err == errFileNotFound { err = errUploadIDNotFound } }() disks := []StorageAPI{es.disk} // Read metadata associated with the object from all disks. metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, es.getUploadIDDir(bucket, object, uploadID), "", false) readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, 0) if err != nil { return err } if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil { return reducedErr } // List all online disks. _, modTime := listOnlineDisks(disks, metaArr, errs) // Pick latest valid metadata. _, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum) return err } // Removes part given by partName belonging to a mulitpart upload from minioMetaBucket func (es *erasureSingle) removeObjectPart(bucket, object, uploadID, dataDir string, partNumber int) { uploadIDPath := es.getUploadIDDir(bucket, object, uploadID) curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber)) storageDisks := []StorageAPI{es.disk} g := errgroup.WithNErrs(len(storageDisks)) for index, disk := range storageDisks { if disk == nil { continue } index := index g.Go(func() error { // Ignoring failure to remove parts that weren't present in CompleteMultipartUpload // requests. xl.meta is the authoritative source of truth on which parts constitute // the object. The presence of parts that don't belong in the object doesn't affect correctness. _ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath, false) return nil }, index) } g.Wait() } // Remove the old multipart uploads on the given disk. 
func (es *erasureSingle) cleanupStaleUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) { now := time.Now() diskPath := disk.Endpoint().Path readDirFn(pathJoin(diskPath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error { return readDirFn(pathJoin(diskPath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error { uploadIDPath := pathJoin(shaDir, uploadIDDir) fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false) if err != nil { return nil } wait := es.deletedCleanupSleeper.Timer(ctx) if now.Sub(fi.ModTime) > expiry { es.disk.RenameFile(context.Background(), minioMetaMultipartBucket, uploadIDPath, minioMetaTmpDeletedBucket, mustGetUUID()) } wait() return nil }) }) readDirFn(pathJoin(diskPath, minioMetaTmpBucket), func(tmpDir string, typ os.FileMode) error { if tmpDir == ".trash/" { // do not remove .trash/ here, it has its own routines return nil } vi, err := disk.StatVol(ctx, pathJoin(minioMetaTmpBucket, tmpDir)) if err != nil { return nil } wait := es.deletedCleanupSleeper.Timer(ctx) if now.Sub(vi.Created) > expiry { disk.Delete(ctx, minioMetaTmpBucket, tmpDir, true) } wait() return nil }) } // ListMultipartUploads - lists all the pending multipart // uploads for a particular object in a bucket. // // Implements minimal S3 compatible ListMultipartUploads API. We do // not support prefix based listing, this is a deliberate attempt // towards simplification of multipart APIs. // The resulting ListMultipartsInfo structure is unmarshalled directly as XML. 
func (es *erasureSingle) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) { if err := checkListMultipartArgs(ctx, bucket, object, keyMarker, uploadIDMarker, delimiter, es); err != nil { return ListMultipartsInfo{}, err } result.MaxUploads = maxUploads result.KeyMarker = keyMarker result.Prefix = object result.Delimiter = delimiter uploadIDs, err := es.disk.ListDir(ctx, minioMetaMultipartBucket, es.getMultipartSHADir(bucket, object), -1) if err != nil { if err == errFileNotFound { return result, nil } logger.LogIf(ctx, err) return result, toObjectErr(err, bucket, object) } for i := range uploadIDs { uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator) } // S3 spec says uploadIDs should be sorted based on initiated time, we need // to read the metadata entry. var uploads []MultipartInfo populatedUploadIds := set.NewStringSet() for _, uploadID := range uploadIDs { if populatedUploadIds.Contains(uploadID) { continue } fi, err := es.disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(es.getUploadIDDir(bucket, object, uploadID)), "", false) if err != nil { return result, toObjectErr(err, bucket, object) } populatedUploadIds.Add(uploadID) uploads = append(uploads, MultipartInfo{ Object: object, UploadID: uploadID, Initiated: fi.ModTime, }) } sort.Slice(uploads, func(i int, j int) bool { return uploads[i].Initiated.Before(uploads[j].Initiated) }) uploadIndex := 0 if uploadIDMarker != "" { for uploadIndex < len(uploads) { if uploads[uploadIndex].UploadID != uploadIDMarker { uploadIndex++ continue } if uploads[uploadIndex].UploadID == uploadIDMarker { uploadIndex++ break } uploadIndex++ } } for uploadIndex < len(uploads) { result.Uploads = append(result.Uploads, uploads[uploadIndex]) result.NextUploadIDMarker = uploads[uploadIndex].UploadID uploadIndex++ if len(result.Uploads) == maxUploads { break } } result.IsTruncated = uploadIndex < len(uploads) if 
!result.IsTruncated { result.NextKeyMarker = "" result.NextUploadIDMarker = "" } return result, nil } // newMultipartUpload - wrapper for initializing a new multipart // request; returns a unique upload id. // // Internally this function creates 'uploads.json' associated for the // incoming object at // '.minio.sys/multipart/bucket/object/uploads.json' on all the // disks. `uploads.json` carries metadata regarding on-going multipart // operation(s) on the object. func (es *erasureSingle) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (string, error) { onlineDisks := []StorageAPI{es.disk} parityDrives := 0 dataDrives := len(onlineDisks) - parityDrives // we now know the number of blocks this object needs for data and parity. // establish the writeQuorum using this data writeQuorum := dataDrives if dataDrives == parityDrives { writeQuorum++ } // Initialize parts metadata partsMetadata := make([]FileInfo, len(onlineDisks)) fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives) fi.VersionID = opts.VersionID if opts.Versioned && fi.VersionID == "" { fi.VersionID = mustGetUUID() } fi.DataDir = mustGetUUID() // Initialize erasure metadata. for index := range partsMetadata { partsMetadata[index] = fi } // Guess content-type from the extension if possible. if opts.UserDefined["content-type"] == "" { opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object)) } modTime := opts.MTime if opts.MTime.IsZero() { modTime = UTCNow() } onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(onlineDisks, partsMetadata, fi) // Fill all the necessary metadata. // Update `xl.meta` content on each disks. for index := range partsMetadata { partsMetadata[index].Fresh = true partsMetadata[index].ModTime = modTime partsMetadata[index].Metadata = opts.UserDefined } uploadID := mustGetUUID() uploadIDPath := es.getUploadIDDir(bucket, object, uploadID) // Write updated `xl.meta` to all disks. 
if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) } // Return success. return uploadID, nil } // NewMultipartUpload - initialize a new multipart upload, returns a // unique id. The unique id returned here is of UUID form, for each // subsequent request each UUID is unique. // // Implements S3 compatible initiate multipart API. func (es *erasureSingle) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) { if err := checkNewMultipartArgs(ctx, bucket, object, es); err != nil { return "", err } // No metadata is set, allocate a new one. if opts.UserDefined == nil { opts.UserDefined = make(map[string]string) } return es.newMultipartUpload(ctx, bucket, object, opts) } // CopyObjectPart - reads incoming stream and internally erasure codes // them. This call is similar to put object part operation but the source // data is read from an existing object. // // Implements S3 compatible Upload Part Copy API. func (es *erasureSingle) CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (pi PartInfo, e error) { partInfo, err := es.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, NewPutObjReader(srcInfo.Reader), dstOpts) if err != nil { return pi, toObjectErr(err, dstBucket, dstObject) } // Success. return partInfo, nil } // PutObjectPart - reads incoming stream and internally erasure codes // them. This call is similar to single put operation but it is part // of the multipart transaction. // // Implements S3 compatible Upload Part API. 
func (es *erasureSingle) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) { if err := checkPutObjectPartArgs(ctx, bucket, object, es); err != nil { return PartInfo{}, err } // Write lock for this part ID. // Held throughout the operation. partIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID))) plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout) if err != nil { return PartInfo{}, err } pctx := plkctx.Context() defer partIDLock.Unlock(plkctx.Cancel) // Read lock for upload id. // Only held while reading the upload metadata. uploadIDRLock := es.NewNSLock(bucket, pathJoin(object, uploadID)) rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout) if err != nil { return PartInfo{}, err } rctx := rlkctx.Context() defer func() { if uploadIDRLock != nil { uploadIDRLock.RUnlock(rlkctx.Cancel) } }() data := r.Reader // Validate input data size and it can never be less than zero. if data.Size() < -1 { logger.LogIf(rctx, errInvalidArgument, logger.Application) return pi, toObjectErr(errInvalidArgument) } var partsMetadata []FileInfo var errs []error uploadIDPath := es.getUploadIDDir(bucket, object, uploadID) // Validates if upload ID exists. if err = es.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil { return pi, toObjectErr(err, bucket, object, uploadID) } storageDisks := []StorageAPI{es.disk} // Read metadata associated with the object from all disks. partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) // Unlock upload id locks before, so others can get it. 
uploadIDRLock.RUnlock(rlkctx.Cancel) uploadIDRLock = nil // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, 0) if err != nil { return pi, toObjectErr(err, bucket, object) } reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return pi, toObjectErr(reducedErr, bucket, object) } // List all online disks. onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, writeQuorum) if err != nil { return pi, err } onlineDisks = shuffleDisks(onlineDisks, fi.Erasure.Distribution) // Need a unique name for the part being written in minioMetaBucket to // accommodate concurrent PutObjectPart requests partSuffix := fmt.Sprintf("part.%d", partID) tmpPart := mustGetUUID() tmpPartPath := pathJoin(tmpPart, partSuffix) // Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete. var online int defer func() { if online != len(onlineDisks) { es.disk.RenameFile(context.Background(), minioMetaTmpBucket, tmpPart, minioMetaTmpDeletedBucket, mustGetUUID()) } }() erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize) if err != nil { return pi, toObjectErr(err, bucket, object) } // Fetch buffer for I/O, returns from the pool if not allocates a new one and returns. 
var buffer []byte switch size := data.Size(); { case size == 0: buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF case size == -1: if size := data.ActualSize(); size > 0 && size < fi.Erasure.BlockSize { buffer = make([]byte, data.ActualSize()+256, data.ActualSize()*2+512) } else { buffer = es.bp.Get() defer es.bp.Put(buffer) } case size >= fi.Erasure.BlockSize: buffer = es.bp.Get() defer es.bp.Put(buffer) case size < fi.Erasure.BlockSize: // No need to allocate fully fi.Erasure.BlockSize buffer if the incoming data is smalles. buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1)) } if len(buffer) > int(fi.Erasure.BlockSize) { buffer = buffer[:fi.Erasure.BlockSize] } writers := make([]io.Writer, len(onlineDisks)) for i, disk := range onlineDisks { if disk == nil { continue } writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize()) } toEncode := io.Reader(data) if data.Size() > bigFileThreshold { // Add input readahead. // We use 2 buffers, so we always have a full buffer of input. bufA := es.bp.Get() bufB := es.bp.Get() defer es.bp.Put(bufA) defer es.bp.Put(bufB) ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]}) if err == nil { toEncode = ra defer ra.Close() } } n, err := erasure.Encode(pctx, toEncode, writers, buffer, writeQuorum) closeBitrotWriters(writers) if err != nil { return pi, toObjectErr(err, bucket, object) } // Should return IncompleteBody{} error when reader has fewer bytes // than specified in request header. if n < data.Size() { return pi, IncompleteBody{Bucket: bucket, Object: object} } for i := range writers { if writers[i] == nil { onlineDisks[i] = nil } } // Acquire write lock to update metadata. 
uploadIDWLock := es.NewNSLock(bucket, pathJoin(object, uploadID)) wlkctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout) if err != nil { return PartInfo{}, err } wctx := wlkctx.Context() defer uploadIDWLock.Unlock(wlkctx.Cancel) // Validates if upload ID exists. if err = es.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil { return pi, toObjectErr(err, bucket, object, uploadID) } // Rename temporary part file to its final location. partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix) onlineDisks, err = renamePart(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum) if err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, partPath) } // Read metadata again because it might be updated with parallel upload of another part. partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false) reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return pi, toObjectErr(reducedErr, bucket, object) } // Get current highest version based on re-read partsMetadata. onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs) // Pick one from the first valid metadata. fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum) if err != nil { return pi, err } // Once part is successfully committed, proceed with updating erasure metadata. fi.ModTime = UTCNow() md5hex := r.MD5CurrentHexString() // Add the current part. fi.AddObjectPart(partID, md5hex, n, data.ActualSize()) for i, disk := range onlineDisks { if disk == OfflineDisk { continue } partsMetadata[i].Size = fi.Size partsMetadata[i].ModTime = fi.ModTime partsMetadata[i].Parts = fi.Parts partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ PartNumber: partID, Algorithm: DefaultBitrotAlgorithm, Hash: bitrotWriterSum(writers[i]), }) } // Writes update `xl.meta` format for each disk. 
if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil { return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) } online = countOnlineDisks(onlineDisks) // Return success. return PartInfo{ PartNumber: partID, ETag: md5hex, LastModified: fi.ModTime, Size: n, ActualSize: data.ActualSize(), }, nil } // GetMultipartInfo returns multipart metadata uploaded during newMultipartUpload, used // by callers to verify object states // - encrypted // - compressed func (es *erasureSingle) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) { if err := checkListPartsArgs(ctx, bucket, object, es); err != nil { return MultipartInfo{}, err } result := MultipartInfo{ Bucket: bucket, Object: object, UploadID: uploadID, } uploadIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID)) lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) if err != nil { return MultipartInfo{}, err } ctx = lkctx.Context() defer uploadIDLock.RUnlock(lkctx.Cancel) if err := es.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return result, toObjectErr(err, bucket, object, uploadID) } uploadIDPath := es.getUploadIDDir(bucket, object, uploadID) storageDisks := []StorageAPI{es.disk} // Read metadata associated with the object from all disks. 
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID, false) // get Quorum for this object readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, 0) if err != nil { return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) } reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum) if reducedErr == errErasureReadQuorum { return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath) } _, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, readQuorum) if err != nil { return result, err } result.UserDefined = cloneMSS(fi.Metadata) return result, nil } // ListObjectParts - lists all previously uploaded parts for a given // object and uploadID. Takes additional input of part-number-marker // to indicate where the listing should begin from. // // Implements S3 compatible ListObjectParts API. The resulting // ListPartsInfo structure is marshaled directly into XML and // replied back to the client. func (es *erasureSingle) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) { if err := checkListPartsArgs(ctx, bucket, object, es); err != nil { return ListPartsInfo{}, err } uploadIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID)) lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) if err != nil { return ListPartsInfo{}, err } ctx = lkctx.Context() defer uploadIDLock.RUnlock(lkctx.Cancel) if err := es.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return result, toObjectErr(err, bucket, object, uploadID) } uploadIDPath := es.getUploadIDDir(bucket, object, uploadID) storageDisks := []StorageAPI{es.disk} // Read metadata associated with the object from all disks. 
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, 0) if err != nil { return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) } reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath) } _, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum) if err != nil { return result, err } // Populate the result stub. result.Bucket = bucket result.Object = object result.UploadID = uploadID result.MaxParts = maxParts result.PartNumberMarker = partNumberMarker result.UserDefined = cloneMSS(fi.Metadata) // For empty number of parts or maxParts as zero, return right here. if len(fi.Parts) == 0 || maxParts == 0 { return result, nil } // Limit output to maxPartsList. if maxParts > maxPartsList { maxParts = maxPartsList } // Only parts with higher part numbers will be listed. partIdx := objectPartIndex(fi.Parts, partNumberMarker) parts := fi.Parts if partIdx != -1 { parts = fi.Parts[partIdx+1:] } count := maxParts for _, part := range parts { result.Parts = append(result.Parts, PartInfo{ PartNumber: part.Number, ETag: part.ETag, LastModified: fi.ModTime, Size: part.Size, }) count-- if count == 0 { break } } // If listed entries are more than maxParts, we set IsTruncated as true. if len(parts) > len(result.Parts) { result.IsTruncated = true // Make sure to fill next part number marker if IsTruncated is // true for subsequent listing. 
nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber result.NextPartNumberMarker = nextPartNumberMarker } return result, nil } // CompleteMultipartUpload - completes an ongoing multipart // transaction after receiving all the parts indicated by the client. // Returns an md5sum calculated by concatenating all the individual // md5sums of all the parts. // // Implements S3 compatible Complete multipart API. func (es *erasureSingle) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) { if err = checkCompleteMultipartArgs(ctx, bucket, object, es); err != nil { return oi, err } // Hold read-locks to verify uploaded parts, also disallows // parallel part uploads as well. uploadIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID)) rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout) if err != nil { return oi, err } rctx := rlkctx.Context() defer uploadIDLock.RUnlock(rlkctx.Cancel) if err = es.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil { return oi, toObjectErr(err, bucket, object, uploadID) } uploadIDPath := es.getUploadIDDir(bucket, object, uploadID) storageDisks := []StorageAPI{es.disk} // Read metadata associated with the object from all disks. partsMetadata, errs := readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, 0) if err != nil { return oi, toObjectErr(err, bucket, object) } reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum) if reducedErr == errErasureWriteQuorum { return oi, toObjectErr(reducedErr, bucket, object) } onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) // Pick one from the first valid metadata. 
fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, writeQuorum) if err != nil { return oi, err } // Calculate full object size. var objectSize int64 // Calculate consolidated actual size. var objectActualSize int64 // Order online disks in accordance with distribution order. // Order parts metadata in accordance with distribution order. onlineDisks, partsMetadata = shuffleDisksAndPartsMetadataByIndex(onlineDisks, partsMetadata, fi) // Save current erasure metadata for validation. currentFI := fi // Allocate parts similar to incoming slice. fi.Parts = make([]ObjectPartInfo, len(parts)) // Validate each part and then commit to disk. for i, part := range parts { partIdx := objectPartIndex(currentFI.Parts, part.PartNumber) // All parts should have same part number. if partIdx == -1 { invp := InvalidPart{ PartNumber: part.PartNumber, GotETag: part.ETag, } return oi, invp } // ensure that part ETag is canonicalized to strip off extraneous quotes part.ETag = canonicalizeETag(part.ETag) if currentFI.Parts[partIdx].ETag != part.ETag { invp := InvalidPart{ PartNumber: part.PartNumber, ExpETag: currentFI.Parts[partIdx].ETag, GotETag: part.ETag, } return oi, invp } // All parts except the last part has to be atleast 5MB. if (i < len(parts)-1) && !isMinAllowedPartSize(currentFI.Parts[partIdx].ActualSize) { return oi, PartTooSmall{ PartNumber: part.PartNumber, PartSize: currentFI.Parts[partIdx].ActualSize, PartETag: part.ETag, } } // Save for total object size. objectSize += currentFI.Parts[partIdx].Size // Save the consolidated actual size. objectActualSize += currentFI.Parts[partIdx].ActualSize // Add incoming parts. fi.Parts[i] = ObjectPartInfo{ Number: part.PartNumber, Size: currentFI.Parts[partIdx].Size, ActualSize: currentFI.Parts[partIdx].ActualSize, } } // Save the final object size and modtime. fi.Size = objectSize fi.ModTime = opts.MTime if opts.MTime.IsZero() { fi.ModTime = UTCNow() } // Save successfully calculated md5sum. 
fi.Metadata["etag"] = opts.UserDefined["etag"] if fi.Metadata["etag"] == "" { fi.Metadata["etag"] = getCompleteMultipartMD5(parts) } // Save the consolidated actual size. fi.Metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10) // Update all erasure metadata, make sure to not modify fields like // checksum which are different on each disks. for index := range partsMetadata { if partsMetadata[index].IsValid() { partsMetadata[index].Size = fi.Size partsMetadata[index].ModTime = fi.ModTime partsMetadata[index].Metadata = fi.Metadata partsMetadata[index].Parts = fi.Parts } } // Hold namespace to complete the transaction lk := es.NewNSLock(bucket, object) lkctx, err := lk.GetLock(ctx, globalOperationTimeout) if err != nil { return oi, err } ctx = lkctx.Context() defer lk.Unlock(lkctx.Cancel) // Write final `xl.meta` at uploadID location onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum) if err != nil { return oi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath) } // Remove parts that weren't present in CompleteMultipartUpload request. for _, curpart := range currentFI.Parts { if objectPartIndex(fi.Parts, curpart.Number) == -1 { // Delete the missing part files. e.g, // Request 1: NewMultipart // Request 2: PutObjectPart 1 // Request 3: PutObjectPart 2 // Request 4: CompleteMultipartUpload --part 2 // N.B. 1st part is not present. This part should be removed from the storage. es.removeObjectPart(bucket, object, uploadID, fi.DataDir, curpart.Number) } } // Rename the multipart object to final location. 
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, bucket, object, writeQuorum); err != nil { return oi, toObjectErr(err, bucket, object) } for i := 0; i < len(onlineDisks); i++ { if onlineDisks[i] != nil && onlineDisks[i].IsOnline() { // Object info is the same in all disks, so we can pick // the first meta from online disk fi = partsMetadata[i] break } } // we are adding a new version to this object under the namespace lock, so this is the latest version. fi.IsLatest = true // Success, return object info. return fi.ToObjectInfo(bucket, object), nil } // AbortMultipartUpload - aborts an ongoing multipart operation // signified by the input uploadID. This is an atomic operation // doesn't require clients to initiate multiple such requests. // // All parts are purged from all disks and reference to the uploadID // would be removed from the system, rollback is not possible on this // operation. func (es *erasureSingle) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) { if err = checkAbortMultipartArgs(ctx, bucket, object, es); err != nil { return err } lk := es.NewNSLock(bucket, pathJoin(object, uploadID)) lkctx, err := lk.GetLock(ctx, globalOperationTimeout) if err != nil { return err } ctx = lkctx.Context() defer lk.Unlock(lkctx.Cancel) // Validates if upload ID exists. if err := es.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil { return toObjectErr(err, bucket, object, uploadID) } // Cleanup all uploaded parts. es.disk.RenameFile(ctx, minioMetaMultipartBucket, es.getUploadIDDir(bucket, object, uploadID), minioMetaTmpDeletedBucket, mustGetUUID()) // Successfully purged. 
return nil } func (es *erasureSingle) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) { var loi ListObjectsInfo // Automatically remove the object/version is an expiry lifecycle rule can be applied lc, _ := globalLifecycleSys.Get(bucket) // Check if bucket is object locked. rcfg, _ := globalBucketObjectLockSys.Get(bucket) if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" { // Optimization for certain applications like // - Cohesity // - Actifio, Splunk etc. // which send ListObjects requests where the actual object // itself is the prefix and max-keys=1 in such scenarios // we can simply verify locally if such an object exists // to avoid the need for ListObjects(). objInfo, err := es.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true}) if err == nil { if lc != nil { action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false) switch action { case lifecycle.DeleteVersionAction, lifecycle.DeleteAction: fallthrough case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction: return loi, nil } } loi.Objects = append(loi.Objects, objInfo) return loi, nil } } opts := listPathOptions{ Bucket: bucket, Prefix: prefix, Separator: delimiter, Limit: maxKeysPlusOne(maxKeys, marker != ""), Marker: marker, InclDeleted: false, AskDisks: globalAPIConfig.getListQuorum(), Lifecycle: lc, Retention: rcfg, } merged, err := es.listPath(ctx, &opts) if err != nil && err != io.EOF { if !isErrBucketNotFound(err) { logger.LogIf(ctx, err) } return loi, err } merged.forwardPast(opts.Marker) defer merged.truncate(0) // Release when returning // Default is recursive, if delimiter is set then list non recursive. 
objects := merged.fileInfos(bucket, prefix, delimiter) loi.IsTruncated = err == nil && len(objects) > 0 if maxKeys > 0 && len(objects) > maxKeys { objects = objects[:maxKeys] loi.IsTruncated = true } for _, obj := range objects { if obj.IsDir && obj.ModTime.IsZero() && delimiter != "" { loi.Prefixes = append(loi.Prefixes, obj.Name) } else { loi.Objects = append(loi.Objects, obj) } } if loi.IsTruncated { last := objects[len(objects)-1] loi.NextMarker = opts.encodeMarker(last.Name) } return loi, nil } func (es *erasureSingle) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) { marker := continuationToken if marker == "" { marker = startAfter } loi, err := es.ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys) if err != nil { return ListObjectsV2Info{}, err } listObjectsV2Info := ListObjectsV2Info{ IsTruncated: loi.IsTruncated, ContinuationToken: continuationToken, NextContinuationToken: loi.NextMarker, Objects: loi.Objects, Prefixes: loi.Prefixes, } return listObjectsV2Info, err } func (es *erasureSingle) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (ListObjectVersionsInfo, error) { loi := ListObjectVersionsInfo{} if marker == "" && versionMarker != "" { return loi, NotImplemented{} } opts := listPathOptions{ Bucket: bucket, Prefix: prefix, Separator: delimiter, Limit: maxKeysPlusOne(maxKeys, marker != ""), Marker: marker, InclDeleted: true, AskDisks: "strict", Versioned: true, } merged, err := es.listPath(ctx, &opts) if err != nil && err != io.EOF { return loi, err } defer merged.truncate(0) // Release when returning if versionMarker == "" { o := listPathOptions{Marker: marker} // If we are not looking for a specific version skip it. 
o.parseMarker() merged.forwardPast(o.Marker) } objects := merged.fileInfoVersions(bucket, prefix, delimiter, versionMarker) loi.IsTruncated = err == nil && len(objects) > 0 if maxKeys > 0 && len(objects) > maxKeys { objects = objects[:maxKeys] loi.IsTruncated = true } for _, obj := range objects { if obj.IsDir && obj.ModTime.IsZero() && delimiter != "" { loi.Prefixes = append(loi.Prefixes, obj.Name) } else { loi.Objects = append(loi.Objects, obj) } } if loi.IsTruncated { last := objects[len(objects)-1] loi.NextMarker = opts.encodeMarker(last.Name) loi.NextVersionIDMarker = last.VersionID } return loi, nil } // Walk a bucket, optionally prefix recursively, until we have returned // all the content to objectInfo channel, it is callers responsibility // to allocate a receive channel for ObjectInfo, upon any unhandled // error walker returns error. Optionally if context.Done() is received // then Walk() stops the walker. func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error { if err := checkListObjsArgs(ctx, bucket, prefix, "", es); err != nil { // Upon error close the channel. close(results) return err } ctx, cancel := context.WithCancel(ctx) go func() { defer cancel() defer close(results) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() loadEntry := func(entry metaCacheEntry) { if entry.isDir() { return } fivs, err := entry.fileInfoVersions(bucket) if err != nil { cancel() return } if opts.WalkAscending { for i := len(fivs.Versions) - 1; i >= 0; i-- { version := fivs.Versions[i] results <- version.ToObjectInfo(bucket, version.Name) } return } for _, version := range fivs.Versions { results <- version.ToObjectInfo(bucket, version.Name) } } // How to resolve partial results. 
resolver := metadataResolutionParams{ dirQuorum: 1, objQuorum: 1, bucket: bucket, } path := baseDirFromPrefix(prefix) filterPrefix := strings.Trim(strings.TrimPrefix(prefix, path), slashSeparator) if path == prefix { filterPrefix = "" } lopts := listPathRawOptions{ disks: []StorageAPI{es.disk}, bucket: bucket, path: path, filterPrefix: filterPrefix, recursive: true, forwardTo: "", minDisks: 1, reportNotFound: false, agreed: loadEntry, partial: func(entries metaCacheEntries, nAgreed int, errs []error) { entry, ok := entries.resolve(&resolver) if !ok { // check if we can get one entry atleast // proceed to heal nonetheless. entry, _ = entries.firstFound() } loadEntry(*entry) }, finished: nil, } if err := listPathRaw(ctx, lopts); err != nil { logger.LogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts)) return } }() wg.Wait() }() return nil } // nsScanner will start scanning buckets and send updated totals as they are traversed. // Updates are sent on a regular basis and the caller *must* consume them. func (es *erasureSingle) nsScanner(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, wantCycle uint32, updates chan<- dataUsageCache, healScanMode madmin.HealScanMode) error { if len(buckets) == 0 { return nil } // Collect disks we can use. disks := []StorageAPI{es.disk} // Load bucket totals oldCache := dataUsageCache{} if err := oldCache.load(ctx, es, dataUsageCacheName); err != nil { return err } // New cache.. cache := dataUsageCache{ Info: dataUsageCacheInfo{ Name: dataUsageRoot, NextCycle: oldCache.Info.NextCycle, }, Cache: make(map[string]dataUsageEntry, len(oldCache.Cache)), } bloom := bf.bytes() // Put all buckets into channel. bucketCh := make(chan BucketInfo, len(buckets)) // Add new buckets first for _, b := range buckets { if oldCache.find(b.Name) == nil { bucketCh <- b } } // Add existing buckets. 
for _, b := range buckets { e := oldCache.find(b.Name) if e != nil { cache.replace(b.Name, dataUsageRoot, *e) bucketCh <- b } } close(bucketCh) bucketResults := make(chan dataUsageEntryInfo, len(disks)) // Start async collector/saver. // This goroutine owns the cache. var saverWg sync.WaitGroup saverWg.Add(1) go func() { // Add jitter to the update time so multiple sets don't sync up. updateTime := 30*time.Second + time.Duration(float64(10*time.Second)*rand.Float64()) t := time.NewTicker(updateTime) defer t.Stop() defer saverWg.Done() var lastSave time.Time for { select { case <-ctx.Done(): // Return without saving. return case <-t.C: if cache.Info.LastUpdate.Equal(lastSave) { continue } logger.LogIf(ctx, cache.save(ctx, es, dataUsageCacheName)) updates <- cache.clone() lastSave = cache.Info.LastUpdate case v, ok := <-bucketResults: if !ok { // Save final state... cache.Info.NextCycle = wantCycle cache.Info.LastUpdate = time.Now() logger.LogIf(ctx, cache.save(ctx, es, dataUsageCacheName)) updates <- cache return } cache.replace(v.Name, v.Parent, v.Entry) cache.Info.LastUpdate = time.Now() } } }() // Shuffle disks to ensure a total randomness of bucket/disk association to ensure // that objects that are not present in all disks are accounted and ILM applied. 
r := rand.New(rand.NewSource(time.Now().UnixNano())) r.Shuffle(len(disks), func(i, j int) { disks[i], disks[j] = disks[j], disks[i] }) // Start one scanner per disk var wg sync.WaitGroup wg.Add(len(disks)) for i := range disks { go func(i int) { defer wg.Done() disk := disks[i] for bucket := range bucketCh { select { case <-ctx.Done(): return default: } // Load cache for bucket cacheName := pathJoin(bucket.Name, dataUsageCacheName) cache := dataUsageCache{} logger.LogIf(ctx, cache.load(ctx, es, cacheName)) if cache.Info.Name == "" { cache.Info.Name = bucket.Name } cache.Info.BloomFilter = bloom cache.Info.SkipHealing = true cache.Info.NextCycle = wantCycle if cache.Info.Name != bucket.Name { logger.LogIf(ctx, fmt.Errorf("cache name mismatch: %s != %s", cache.Info.Name, bucket.Name)) cache.Info = dataUsageCacheInfo{ Name: bucket.Name, LastUpdate: time.Time{}, NextCycle: wantCycle, } } // Collect updates. updates := make(chan dataUsageEntry, 1) var wg sync.WaitGroup wg.Add(1) go func(name string) { defer wg.Done() for update := range updates { bucketResults <- dataUsageEntryInfo{ Name: name, Parent: dataUsageRoot, Entry: update, } } }(cache.Info.Name) // Calc usage before := cache.Info.LastUpdate var err error cache, err = disk.NSScanner(ctx, cache, updates, healScanMode) cache.Info.BloomFilter = nil if err != nil { if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) { logger.LogIf(ctx, cache.save(ctx, es, cacheName)) } else { logger.LogIf(ctx, err) } // This ensures that we don't close // bucketResults channel while the // updates-collector goroutine still // holds a reference to this. wg.Wait() continue } wg.Wait() var root dataUsageEntry if r := cache.root(); r != nil { root = cache.flatten(*r) } t := time.Now() bucketResults <- dataUsageEntryInfo{ Name: cache.Info.Name, Parent: dataUsageRoot, Entry: root, } // We want to avoid synchronizing up all writes in case // the results are piled up. 
time.Sleep(time.Duration(float64(time.Since(t)) * rand.Float64())) // Save cache logger.LogIf(ctx, cache.save(ctx, es, cacheName)) } }(i) } wg.Wait() close(bucketResults) saverWg.Wait() return nil } func (es *erasureSingle) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32, healScanMode madmin.HealScanMode) error { // Updates must be closed before we return. defer close(updates) ctx, cancel := context.WithCancel(ctx) defer cancel() var wg sync.WaitGroup var mu sync.Mutex results := make([]dataUsageCache, 1) var firstErr error allBuckets, err := es.ListBuckets(ctx) if err != nil { return err } if len(allBuckets) == 0 { updates <- DataUsageInfo{} // no buckets found update data usage to reflect latest state return nil } // Scanner latest allBuckets first. sort.Slice(allBuckets, func(i, j int) bool { return allBuckets[i].Created.After(allBuckets[j].Created) }) wg.Add(1) go func() { updates := make(chan dataUsageCache, 1) defer close(updates) // Start update collector. go func() { defer wg.Done() for info := range updates { mu.Lock() results[0] = info mu.Unlock() } }() // Start scanner. Blocks until done. err := es.nsScanner(ctx, allBuckets, bf, wantCycle, updates, healScanMode) if err != nil { logger.LogIf(ctx, err) mu.Lock() if firstErr == nil { firstErr = err } // Cancel remaining... cancel() mu.Unlock() return } }() updateCloser := make(chan chan struct{}) go func() { updateTicker := time.NewTicker(30 * time.Second) defer updateTicker.Stop() var lastUpdate time.Time // We need to merge since we will get the same buckets from each pool. // Therefore to get the exact bucket sizes we must merge before we can convert. var allMerged dataUsageCache update := func() { mu.Lock() defer mu.Unlock() allMerged = dataUsageCache{Info: dataUsageCacheInfo{Name: dataUsageRoot}} for _, info := range results { if info.Info.LastUpdate.IsZero() { // Not filled yet. 
return } allMerged.merge(info) } if allMerged.root() != nil && allMerged.Info.LastUpdate.After(lastUpdate) { updates <- allMerged.dui(allMerged.Info.Name, allBuckets) lastUpdate = allMerged.Info.LastUpdate } } for { select { case <-ctx.Done(): return case v := <-updateCloser: update() close(v) return case <-updateTicker.C: update() } } }() wg.Wait() ch := make(chan struct{}) select { case updateCloser <- ch: <-ch case <-ctx.Done(): if firstErr == nil { firstErr = ctx.Err() } } return firstErr }