minio/cmd/erasure-single-drive.go
Anis Elleuch b3edb25377
bloom: healObject to mark a path dirty only for dangling objects (#15458)
The path is marked dirty automatically when healObject() is called, which is
wrong. HealObject() is called during self-healing and this will lead to
an increase in the false positive result of the bloom filter.

Also move NSUpdated() from renameData() and call it directly in
CompleteMultipart and PutObject, this is not a functional change but
it will make it less prone to errors in the future.
2022-08-02 16:57:39 -07:00

3454 lines
107 KiB
Go

// Copyright (c) 2015-2022 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/dustin/go-humanize"
"github.com/klauspost/readahead"
"github.com/minio/madmin-go"
"github.com/minio/minio-go/v7/pkg/s3utils"
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio-go/v7/pkg/tags"
"github.com/minio/minio/internal/bpool"
"github.com/minio/minio/internal/bucket/lifecycle"
"github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
"github.com/minio/pkg/mimedb"
)
// erasureSingle - Implements single drive XL layer
type erasureSingle struct {
GatewayUnsupported
// disk is the single storage drive every operation of this layer goes to.
disk StorageAPI
// endpoint is the endpoint reported by disk when the layer was created.
endpoint Endpoint
// Locker mutex map.
nsMutex *nsLockMap
// Byte pools used for temporary i/o buffers.
bp *bpool.BytePoolCap
// deletedCleanupSleeper paces the removals done by cleanupDeletedObjectsInner.
deletedCleanupSleeper *dynamicSleeper
// Shut down async operations
shutdown context.CancelFunc
// format is the format document handed to newErasureSingle.
format *formatErasureV3
}
// newErasureSingle initializes the single drive object layer on top of
// storageDisk and starts its background routines (stale upload cleanup,
// trash cleanup and the data update tracker).
//
// All background routines run on a context derived from ctx that is
// canceled by Shutdown(), so none of them outlives the object layer or
// keeps touching the drive after it has been closed.
func newErasureSingle(ctx context.Context, storageDisk StorageAPI, format *formatErasureV3) (ObjectLayer, error) {
	// Number of buffers, max 2GB
	n := (2 * humanize.GiByte) / (blockSizeV2 * 2)

	// Byte pool shared by all temporary I/O buffers of this layer.
	bp := bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2)

	// Derive the cancelable context before starting any go-routine so
	// that Shutdown() stops every one of them, not just the last.
	ctx, cancel := context.WithCancel(ctx)

	// Initialize the single drive instance.
	s := &erasureSingle{
		disk:                  storageDisk,
		endpoint:              storageDisk.Endpoint(),
		format:                format,
		nsMutex:               newNSLock(false),
		bp:                    bp,
		deletedCleanupSleeper: newDynamicSleeper(10, 2*time.Second, false),
		shutdown:              cancel,
	}

	// start cleanup stale uploads go-routine.
	go s.cleanupStaleUploads(ctx)

	// start cleanup of deleted objects.
	go s.cleanupDeletedObjects(ctx)

	// start tracking of data updates for this drive.
	go intDataUpdateTracker.start(ctx, s.endpoint.Path)

	return s, nil
}
// ListBuckets returns the buckets present on the drive, sorted by name.
// When opts.Deleted is set, buckets recorded as deleted are included as
// well, with Deleted carrying the creation time of the deletion marker.
// The creation time is replaced by the one kept in the bucket metadata
// whenever that metadata can be loaded.
func (es *erasureSingle) ListBuckets(ctx context.Context, opts BucketOptions) (buckets []BucketInfo, err error) {
	// Buckets currently present on the drive.
	live := map[string]VolInfo{}
	if err := listAllBuckets(ctx, []StorageAPI{es.disk}, live, 0); err != nil {
		return nil, err
	}

	// Buckets marked as deleted, only looked up on request.
	removed := map[string]VolInfo{}
	if opts.Deleted {
		if err := listDeletedBuckets(ctx, []StorageAPI{es.disk}, removed, 0); err != nil {
			return nil, err
		}
	}

	var result []BucketInfo
	for _, vol := range live {
		info := BucketInfo{Name: vol.Name, Created: vol.Created}
		if del, found := removed[vol.Name]; found {
			info.Deleted = del.Created
		}
		result = append(result, info)
	}

	// Deleted buckets which do not exist anymore are reported on their own.
	for _, vol := range removed {
		if _, found := live[vol.Name]; found {
			continue
		}
		result = append(result, BucketInfo{Name: vol.Name, Deleted: vol.Created})
	}

	sort.Slice(result, func(a, b int) bool {
		return result[a].Name < result[b].Name
	})

	for idx := range result {
		if meta, err := globalBucketMetadataSys.Get(result[idx].Name); err == nil {
			result[idx].Created = meta.Created
		}
	}
	return result, nil
}
// cleanupStaleUploads removes stale multipart uploads from the drive
// each time the configured cleanup interval elapses, until ctx is done.
func (es *erasureSingle) cleanupStaleUploads(ctx context.Context) {
	t := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
	defer t.Stop()

	done := ctx.Done()
	for {
		select {
		case <-t.C:
			es.cleanupStaleUploadsOnDisk(ctx, es.disk, globalAPIConfig.getStaleUploadsExpiry())

			// Re-arm the timer, picking up any change of the interval.
			t.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
		case <-done:
			return
		}
	}
}
// cleanupDeletedObjects purges the ".trash/" folder each time the
// configured delete cleanup interval elapses, until ctx is done. The
// pacing between individual removals is handled by
// cleanupDeletedObjectsInner through the dynamic sleeper.
func (es *erasureSingle) cleanupDeletedObjects(ctx context.Context) {
	t := time.NewTimer(globalAPIConfig.getDeleteCleanupInterval())
	defer t.Stop()

	done := ctx.Done()
	for {
		select {
		case <-t.C:
			es.cleanupDeletedObjectsInner(ctx)

			// Re-arm the timer, picking up any change of the interval.
			t.Reset(globalAPIConfig.getDeleteCleanupInterval())
		case <-done:
			return
		}
	}
}
// NewNSLock - initialize a new namespace RWLocker instance.
// The locker is obtained from the layer's own lock map for the given
// bucket and object names; nil is passed as the first argument to it.
func (es *erasureSingle) NewNSLock(bucket string, objects ...string) RWLocker {
return es.nsMutex.NewNSLock(nil, bucket, objects...)
}
// Shutdown function for object storage interface.
// It closes the drive and, on return, invokes the cancel function stored
// in es.shutdown. Always returns nil.
func (es *erasureSingle) Shutdown(ctx context.Context) error {
// Deferred so the cancel function runs after the drive has been closed.
defer es.shutdown()
// Add any object layer shutdown activities here.
closeStorageDisks(es.disk)
return nil
}
// SetDriveCounts returns the drive count per set; this layer always
// reports a single set made of one drive.
func (es *erasureSingle) SetDriveCounts() []int {
return []int{1}
}
// BackendInfo describes the backend as an erasure backend with zero
// parity for both the standard and the reduced redundancy storage
// class, every drive of each set being reported as a data drive.
func (es *erasureSingle) BackendInfo() (b madmin.BackendInfo) {
	// A single drive cannot hold parity.
	const (
		standardParity = 0
		reducedParity  = 0
	)

	b.Type = madmin.Erasure
	b.StandardSCParity = standardParity
	b.RRSCParity = reducedParity

	for _, drives := range es.SetDriveCounts() {
		b.StandardSCData = append(b.StandardSCData, drives-standardParity)
		b.RRSCData = append(b.RRSCData, drives-reducedParity)
	}
	return b
}
// StorageInfo - returns the storage statistics of the underlying drive
// together with the backend description of this layer.
func (es *erasureSingle) StorageInfo(ctx context.Context) (StorageInfo, []error) {
	info, errs := getStorageInfo([]StorageAPI{es.disk}, []Endpoint{es.endpoint})
	info.Backend = es.BackendInfo()
	return info, errs
}
// LocalStorageInfo - returns the storage statistics of the drive when
// its endpoint is local; otherwise statistics are gathered over an empty
// set of drives.
func (es *erasureSingle) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) {
	if !es.endpoint.IsLocal {
		// Nothing local to report on.
		return getStorageInfo(nil, nil)
	}
	return getStorageInfo([]StorageAPI{es.disk}, []Endpoint{es.endpoint})
}
// cleanupDeletedObjectsInner removes every entry found under the trash
// folder (.minio.sys/tmp/.trash/) of the drive, pacing each removal with
// the deleted cleanup sleeper.
func (es *erasureSingle) cleanupDeletedObjectsInner(ctx context.Context) {
	root := es.disk.Endpoint().Path

	purge := func(entry string, _ os.FileMode) error {
		// The sleeper is armed before the removal and waited on after it.
		pause := es.deletedCleanupSleeper.Timer(ctx)
		removeAll(pathJoin(root, minioMetaTmpDeletedBucket, entry))
		pause()
		return nil
	}
	readDirFn(pathJoin(root, minioMetaTmpDeletedBucket), purge)
}
// renameAll moves bucket/prefix into the trash folder under a fresh
// unique name. It is a no-op when the layer has no drive; the result of
// the rename is not reported.
func (es *erasureSingle) renameAll(ctx context.Context, bucket, prefix string) {
	if es.disk == nil {
		return
	}
	es.disk.RenameFile(ctx, bucket, prefix, minioMetaTmpDeletedBucket, mustGetUUID())
}
// renameAllStorager is implemented by object layers able to move
// everything stored under bucket/prefix away in one call.
type renameAllStorager interface {
renameAll(ctx context.Context, bucket, prefix string)
}
// Bucket operations
// MakeBucketWithLocation - creates the bucket on the drive and persists
// its initial metadata.
//
// The bucket name is validated strictly unless it is an internal MinIO
// bucket. An already existing bucket is an error unless opts.ForceCreate
// is set. Versioning is enabled when either opts.LockEnabled or
// opts.VersioningEnabled is set; object locking only with opts.LockEnabled.
func (es *erasureSingle) MakeBucketWithLocation(ctx context.Context, bucket string, opts MakeBucketOptions) error {
	defer NSUpdated(bucket, slashSeparator)

	// Serialize concurrent creations of the same bucket name.
	lk := es.NewNSLock(minioMetaTmpBucket, bucket+".lck")
	lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return err
	}
	ctx = lkctx.Context()
	defer lk.Unlock(lkctx.Cancel)

	// Verify if bucket is valid.
	if !isMinioMetaBucketName(bucket) && s3utils.CheckValidBucketNameStrict(bucket) != nil {
		return BucketNameInvalid{Bucket: bucket}
	}

	if err := es.disk.MakeVol(ctx, bucket); err != nil {
		exists := errors.Is(err, errVolumeExists)
		switch {
		case exists && opts.ForceCreate:
			// No need to return error when force create was
			// requested.
			return nil
		case !exists:
			logger.LogIf(ctx, err)
		}
		return toObjectErr(err, bucket)
	}

	// Start from fresh metadata for the newly created bucket.
	meta := newBucketMetadata(bucket)
	meta.SetCreatedAt(opts.CreatedAt)
	if opts.LockEnabled || opts.VersioningEnabled {
		meta.VersioningConfigXML = enabledBucketVersioningConfig
	}
	if opts.LockEnabled {
		meta.ObjectLockConfigXML = enabledBucketObjectLockConfig
	}

	if err := meta.Save(context.Background(), es); err != nil {
		return toObjectErr(err, bucket)
	}

	globalBucketMetadataSys.Set(bucket, meta)
	return nil
}
// GetBucketInfo - returns BucketInfo for a bucket. When the bucket is
// missing and opts.Deleted is set, the deleted buckets area is consulted
// and, if the bucket is recorded there, its deletion time is returned.
func (es *erasureSingle) GetBucketInfo(ctx context.Context, bucket string, opts BucketOptions) (bi BucketInfo, e error) {
	volInfo, err := es.disk.StatVol(ctx, bucket)
	if err == nil {
		return BucketInfo{Name: volInfo.Name, Created: volInfo.Created}, nil
	}

	if opts.Deleted {
		marker := pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, bucket)
		dvi, derr := es.disk.StatVol(ctx, marker)
		if derr == nil {
			return BucketInfo{Name: bucket, Deleted: dvi.Created}, nil
		}
	}

	// Report the error of the original lookup.
	return bi, toObjectErr(err, bucket)
}
// DeleteBucket - deletes a bucket.
//
// The bucket metadata is purged only once the bucket is actually gone,
// i.e. when the volume was deleted or was already missing. A failed
// delete (for instance a non-empty bucket deleted without opts.Force)
// leaves the metadata of the still existing bucket untouched.
//
// When opts.SRDeleteOp is MarkDelete a deletion marker is recorded for
// site replication; a failure to record it is logged and does not alter
// the result of the delete itself.
func (es *erasureSingle) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
	defer NSUpdated(bucket, slashSeparator)

	err := es.disk.DeleteVol(ctx, bucket, opts.Force)
	if err == nil || errors.Is(err, errVolumeNotFound) {
		// Purge the entire bucket metadata entirely.
		deleteBucketMetadata(ctx, es, bucket)
		globalBucketMetadataSys.Remove(bucket)

		if opts.SRDeleteOp == MarkDelete {
			if merr := es.markDelete(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, deletedBucketsPrefix, bucket)); merr != nil {
				logger.LogIf(ctx, merr)
			}
		}
	}
	return toObjectErr(err, bucket)
}
// markDelete creates a vol entry in .minio.sys/buckets/.deleted until site replication
// syncs the delete to peers. An already existing entry is not an error.
func (es *erasureSingle) markDelete(ctx context.Context, bucket, prefix string) error {
	err := es.disk.MakeVol(ctx, pathJoin(bucket, prefix))
	if errors.Is(err, errVolumeExists) {
		// Already marked.
		return nil
	}
	return toObjectErr(err, bucket)
}
// purgeDelete force deletes the vol entry in .minio.sys/buckets/.deleted after site
// replication syncs the delete to peers OR on a new MakeBucket call.
func (es *erasureSingle) purgeDelete(ctx context.Context, bucket, prefix string) error {
	marker := pathJoin(bucket, prefix)
	return toObjectErr(es.disk.DeleteVol(ctx, marker, true), bucket)
}
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
// Always true for the single drive layer.
func (es *erasureSingle) IsNotificationSupported() bool {
return true
}
// IsListenSupported returns whether listen bucket notification is applicable for this layer.
// Always true for the single drive layer.
func (es *erasureSingle) IsListenSupported() bool {
return true
}
// IsEncryptionSupported returns whether server side encryption is implemented for this layer.
// Always true for the single drive layer.
func (es *erasureSingle) IsEncryptionSupported() bool {
return true
}
// IsCompressionSupported returns whether compression is applicable for this layer.
// Always true for the single drive layer.
func (es *erasureSingle) IsCompressionSupported() bool {
return true
}
// IsTaggingSupported indicates whether *erasureSingle implements tagging support.
// Always true for the single drive layer.
func (es *erasureSingle) IsTaggingSupported() bool {
return true
}
// Object Operations
// CopyObject - copies the source object to the destination object.
// If source and destination are the same object and only metadata is
// being replaced (srcInfo.metadataOnly), `xl.meta` is rewritten in place
// and no data is copied; in every other case the copy is performed as a
// regular PutObject reading from srcInfo.PutObjReader.
//
// Unless dstOpts.NoLock is set, a write lock on the destination object is
// held for the duration of the call.
func (es *erasureSingle) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) {
// NOTE(review): the arguments of a deferred call are evaluated here, i.e.
// before dstObject goes through encodeDirObject below - confirm that
// NSUpdated expects the unencoded name.
defer NSUpdated(dstBucket, dstObject)
srcObject = encodeDirObject(srcObject)
dstObject = encodeDirObject(dstObject)
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
if !dstOpts.NoLock {
ns := es.NewNSLock(dstBucket, dstObject)
lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer ns.Unlock(lkctx.Cancel)
// The lock is held from here on, nothing below must lock again.
dstOpts.NoLock = true
}
if cpSrcDstSame && srcInfo.metadataOnly {
// Metadata only update of the object onto itself.
storageDisks := []StorageAPI{es.disk}
var metaArr []FileInfo
var errs []error
// Read metadata associated with the object from the drive, either for
// the requested version or for the latest one.
if srcOpts.VersionID != "" {
metaArr, errs = readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true)
} else {
metaArr, errs = readAllXL(ctx, storageDisks, srcBucket, srcObject, true)
}
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, 0)
if err != nil {
return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject)
}
// List all online disks.
onlineDisks, modTime := listOnlineDisks(storageDisks, metaArr, errs)
// Pick latest valid metadata.
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
if err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}
if fi.Deleted {
// Delete markers cannot be copied: not found without an explicit
// version, method not allowed (with object info) otherwise.
if srcOpts.VersionID == "" {
return oi, toObjectErr(errFileNotFound, srcBucket, srcObject)
}
return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), toObjectErr(errMethodNotAllowed, srcBucket, srcObject)
}
filterOnlineDisksInplace(fi, metaArr, onlineDisks)
versionID := srcInfo.VersionID
if srcInfo.versionOnly {
versionID = dstOpts.VersionID
// preserve destination versionId if specified.
if versionID == "" {
versionID = mustGetUUID()
fi.IsLatest = true // we are creating a new version so this is latest.
}
modTime = UTCNow()
}
// If the data was not inlined, make sure to `nil` the Data so that
// rewriting xl.meta does not end up inlining data for an object
// that was stored without inlined data.
if !fi.InlineData() {
fi.Data = nil
}
fi.VersionID = versionID // set any new versionID we might have created
fi.ModTime = modTime // set modTime for the new versionID
if !dstOpts.MTime.IsZero() {
// An explicit modification time from the caller wins.
modTime = dstOpts.MTime
fi.ModTime = dstOpts.MTime
}
fi.Metadata = srcInfo.UserDefined
// NOTE(review): assumes srcInfo.UserDefined is non-nil, assigning into
// a nil map panics - confirm that callers always set it.
srcInfo.UserDefined["etag"] = srcInfo.ETag
// Update `xl.meta` content on each disks.
for index := range metaArr {
if metaArr[index].IsValid() {
metaArr[index].ModTime = modTime
metaArr[index].VersionID = versionID
metaArr[index].Metadata = srcInfo.UserDefined
if !metaArr[index].InlineData() {
// Same as above: never inline data for an object whose
// data was not inlined, `nil` the Data such that
// xl.meta is written as expected.
metaArr[index].Data = nil
}
}
}
// Write unique `xl.meta` for each disk.
if _, err = writeUniqueFileInfo(ctx, onlineDisks, srcBucket, srcObject, metaArr, writeQuorum); err != nil {
return oi, toObjectErr(err, srcBucket, srcObject)
}
return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), nil
}
// Regular copy: write the destination as a new object. NoLock is set
// as locking has been dealt with above.
putOpts := ObjectOptions{
ServerSideEncryption: dstOpts.ServerSideEncryption,
UserDefined: srcInfo.UserDefined,
Versioned: dstOpts.Versioned,
VersionID: dstOpts.VersionID,
MTime: dstOpts.MTime,
NoLock: true,
}
return es.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
}
// GetObjectNInfo - returns object info and an object
// Read(Closer). The returned reader is nil on error, except for delete
// markers: in that case a reader carrying only ObjInfo is returned along
// with the error, to provide extra information to the caller.
//
// The namespace lock selected by lockType is released on return when the
// call fails; on success the unlock function is handed over to the
// returned reader as one of its cleanup functions.
func (es *erasureSingle) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
return nil, err
}
object = encodeDirObject(object)
// unlockOnDefer tells whether the lock must be released on return; it
// is switched off once the unlock is handed over to the reader.
var unlockOnDefer bool
nsUnlocker := func() {}
defer func() {
if unlockOnDefer {
nsUnlocker()
}
}()
// Acquire lock
if lockType != noLock {
lock := es.NewNSLock(bucket, object)
switch lockType {
case writeLock:
lkctx, err := lock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
ctx = lkctx.Context()
nsUnlocker = func() { lock.Unlock(lkctx.Cancel) }
case readLock:
lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return nil, err
}
ctx = lkctx.Context()
nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) }
}
unlockOnDefer = true
}
fi, metaArr, onlineDisks, err := es.getObjectFileInfo(ctx, bucket, object, opts, true)
if err != nil {
return nil, toObjectErr(err, bucket, object)
}
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
if objInfo.DeleteMarker {
// Without an explicit version a delete marker reads as not found.
if opts.VersionID == "" {
return &GetObjectReader{
ObjInfo: objInfo,
}, toObjectErr(errFileNotFound, bucket, object)
}
// Make sure to return object info to provide extra information.
return &GetObjectReader{
ObjInfo: objInfo,
}, toObjectErr(errMethodNotAllowed, bucket, object)
}
if objInfo.IsRemote() {
// Remote (transitioned) objects are served by a dedicated reader.
gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts)
if err != nil {
return nil, err
}
// The reader releases the lock from now on.
unlockOnDefer = false
return gr.WithCleanupFuncs(nsUnlocker), nil
}
fn, off, length, err := NewGetObjectReader(rs, objInfo, opts)
if err != nil {
return nil, err
}
// The reader built by fn below releases the lock from now on.
unlockOnDefer = false
pr, pw := xioutil.WaitPipe()
// Feed the pipe from a go-routine; the result of the read is
// propagated to the reading side when the pipe is closed.
go func() {
pw.CloseWithError(es.getObjectWithFileInfo(ctx, bucket, object, off, length, pw, fi, metaArr, onlineDisks))
}()
// Cleanup function to cause the go routine above to exit, in
// case of incomplete read.
pipeCloser := func() {
pr.CloseWithError(nil)
}
return fn(pr, h, pipeCloser, nsUnlocker)
}
// getObjectWithFileInfo writes length bytes of the object described by
// fi, starting at startOffset, to writer. A negative length reads until
// the end of the object. InvalidRange is returned when the requested
// range does not fit into the object.
func (es *erasureSingle) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error {
// Reorder online disks based on erasure distribution order.
// Reorder parts metadata based on erasure distribution order.
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi)
// For negative length read everything.
if length < 0 {
length = fi.Size - startOffset
}
// Reply back invalid range if the input offset and length fall out of range.
if startOffset > fi.Size || startOffset+length > fi.Size {
logger.LogIf(ctx, InvalidRange{startOffset, length, fi.Size}, logger.Application)
return InvalidRange{startOffset, length, fi.Size}
}
// Get start part index and offset.
partIndex, partOffset, err := fi.ObjectToPartOffset(ctx, startOffset)
if err != nil {
return InvalidRange{startOffset, length, fi.Size}
}
// Calculate endOffset according to length; for a zero length read it
// stays at startOffset.
endOffset := startOffset
if length > 0 {
endOffset += length - 1
}
// Get last part index to read given length.
lastPartIndex, _, err := fi.ObjectToPartOffset(ctx, endOffset)
if err != nil {
return InvalidRange{startOffset, length, fi.Size}
}
var totalBytesRead int64
erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
if err != nil {
return toObjectErr(err, bucket, object)
}
// once we have obtained a common FileInfo i.e latest, we should stick
// to single dataDir to read the content to avoid reading from some other
// dataDir that has stale FileInfo{} to ensure that we fail appropriately
// during reads and expect the same dataDir everywhere.
dataDir := fi.DataDir
for ; partIndex <= lastPartIndex; partIndex++ {
// Everything requested has been delivered already.
if length == totalBytesRead {
break
}
partNumber := fi.Parts[partIndex].Number
// Save the current part name and size.
partSize := fi.Parts[partIndex].Size
partLength := partSize - partOffset
// partLength should be adjusted so that we don't write more data than what was requested.
if partLength > (length - totalBytesRead) {
partLength = length - totalBytesRead
}
tillOffset := erasure.ShardFileOffset(partOffset, partLength, partSize)
// Set up one bitrot verifying reader per usable disk for the current part.
readers := make([]io.ReaderAt, len(onlineDisks))
prefer := make([]bool, len(onlineDisks))
for index, disk := range onlineDisks {
if disk == OfflineDisk {
continue
}
if !metaArr[index].IsValid() {
continue
}
checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber)
partPath := pathJoin(object, dataDir, fmt.Sprintf("part.%d", partNumber))
readers[index] = newBitrotReader(disk, metaArr[index].Data, bucket, partPath, tillOffset,
checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
// Prefer local disks
prefer[index] = disk.Hostname() == ""
}
_, err = erasure.Decode(ctx, writer, readers, partOffset, partLength, partSize, prefer)
// Note: we should not be defer'ing the following closeBitrotReaders() call as
// we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time
// we return from this function.
closeBitrotReaders(readers)
if err != nil {
return toObjectErr(err, bucket, object)
}
// A disk left without a reader after decoding is treated as offline
// for the remaining parts.
for i, r := range readers {
if r == nil {
onlineDisks[i] = OfflineDisk
}
}
// Track total bytes read from disk and written to the client.
totalBytesRead += partLength
// partOffset will be valid only for the first part, hence reset it to 0 for
// the remaining parts.
partOffset = 0
} // End of read all parts loop.
// Return success.
return nil
}
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
// Unless opts.NoLock is set, the object is read locked while its
// metadata is being read.
func (es *erasureSingle) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
	if err = checkGetObjArgs(ctx, bucket, object); err != nil {
		return info, err
	}

	object = encodeDirObject(object)
	if opts.NoLock {
		// The caller takes care of locking.
		return es.getObjectInfo(ctx, bucket, object, opts)
	}

	// Lock the object before reading.
	rlk := es.NewNSLock(bucket, object)
	lkctx, err := rlk.GetRLock(ctx, globalOperationTimeout)
	if err != nil {
		return ObjectInfo{}, err
	}
	defer rlk.RUnlock(lkctx.Cancel)

	return es.getObjectInfo(lkctx.Context(), bucket, object, opts)
}
// getObjectFileInfo reads the metadata of the object (of the version in
// opts.VersionID, if any) from the drive and returns the valid FileInfo
// picked from it, along with the metadata read and the online disks.
// The object data is read as well when readData is set.
func (es *erasureSingle) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
	drives := []StorageAPI{es.disk}

	// Read metadata associated with the object from the drive.
	metaArr, readErrs := readAllFileInfo(ctx, drives, bucket, object, opts.VersionID, readData)

	readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, readErrs, 0)
	if err != nil {
		return fi, nil, nil, toObjectErr(err, bucket, object)
	}

	if err = reduceReadQuorumErrs(ctx, readErrs, objectOpIgnoredErrs, readQuorum); err != nil {
		return fi, nil, nil, toObjectErr(err, bucket, object)
	}

	// Work out which disks are online and the modification time to go by.
	onlineDisks, latest := listOnlineDisks(drives, metaArr, readErrs)

	// Pick latest valid metadata.
	if fi, err = pickValidFileInfo(ctx, metaArr, latest, readQuorum); err != nil {
		return fi, nil, nil, err
	}

	filterOnlineDisksInplace(fi, metaArr, onlineDisks)
	return fi, metaArr, onlineDisks, nil
}
// getObjectInfo - reads the object metadata (without data) and turns it
// into ObjectInfo. For delete markers the object info is returned
// together with an error: not found when no version was requested or
// opts.DeleteMarker is set, method not allowed otherwise.
func (es *erasureSingle) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
	fi, _, _, err := es.getObjectFileInfo(ctx, bucket, object, opts, false)
	if err != nil {
		return objInfo, toObjectErr(err, bucket, object)
	}

	objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
	switch {
	case !fi.Deleted:
		return objInfo, nil
	case opts.VersionID == "" || opts.DeleteMarker:
		return objInfo, toObjectErr(errFileNotFound, bucket, object)
	default:
		// Make sure to return object info to provide extra information.
		return objInfo, toObjectErr(errMethodNotAllowed, bucket, object)
	}
}
// getObjectInfoAndQuorum - reads the object metadata (without data) and
// turns it into ObjectInfo, additionally returning the write quorum of
// the object. The quorum is reported as 1 when the metadata cannot be read.
func (es *erasureSingle) getObjectInfoAndQuorum(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, wquorum int, err error) {
	fi, _, _, err := es.getObjectFileInfo(ctx, bucket, object, opts, false)
	if err != nil {
		return objInfo, 1, toObjectErr(err, bucket, object)
	}

	// Write quorum is the data block count, plus one when data and
	// parity counts are equal.
	wquorum = fi.Erasure.DataBlocks
	if wquorum == fi.Erasure.ParityBlocks {
		wquorum++
	}

	objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)

	// Make sure to return object info to provide extra information
	// in each of the error cases below.
	switch {
	case !fi.VersionPurgeStatus().Empty() && opts.VersionID != "":
		return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object)
	case !fi.Deleted:
		return objInfo, wquorum, nil
	case opts.VersionID == "" || opts.DeleteMarker:
		return objInfo, wquorum, toObjectErr(errFileNotFound, bucket, object)
	default:
		return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object)
	}
}
// putMetacacheObject stores the content of r under key in the MinIO
// meta bucket. The data is always kept inline in `xl.meta`: it is
// encoded into in-memory buffers and written together with the metadata.
//
// A size of -1 (unknown) is accepted: a pooled buffer is used for the
// I/O and the inline buffers grow as needed, rather than sizing
// allocations with the negative value, which would panic.
//
// IncompleteBody is returned when r delivers fewer bytes than announced.
func (es *erasureSingle) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
	data := r.Reader

	// No metadata is set, allocate a new one.
	if opts.UserDefined == nil {
		opts.UserDefined = make(map[string]string)
	}

	storageDisks := []StorageAPI{es.disk}

	// A single drive holds data only, there is no parity.
	parityDrives := 0
	dataDrives := len(storageDisks) - parityDrives

	// writeQuorum is dataBlocks, plus one when data and parity
	// counts are equal.
	writeQuorum := dataDrives
	if dataDrives == parityDrives {
		writeQuorum++
	}

	// Validate input data size, -1 (unknown) being the only
	// negative value allowed.
	if data.Size() < -1 {
		logger.LogIf(ctx, errInvalidArgument, logger.Application)
		return ObjectInfo{}, toObjectErr(errInvalidArgument)
	}

	// Initialize parts metadata
	partsMetadata := make([]FileInfo, len(storageDisks))

	fi := newFileInfo(pathJoin(minioMetaBucket, key), dataDrives, parityDrives)
	fi.DataDir = mustGetUUID()

	// Initialize erasure metadata.
	for index := range partsMetadata {
		partsMetadata[index] = fi
	}

	// Order disks according to erasure distribution
	var onlineDisks []StorageAPI
	onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi)

	erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
	if err != nil {
		return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key)
	}

	// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
	var buffer []byte
	switch size := data.Size(); {
	case size == 0:
		buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
	case size < 0 || size >= fi.Erasure.BlockSize:
		// Unknown or large size, use a full buffer from the pool.
		buffer = es.bp.Get()
		defer es.bp.Put(buffer)
	default:
		// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
		buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
	}

	if len(buffer) > int(fi.Erasure.BlockSize) {
		buffer = buffer[:fi.Erasure.BlockSize]
	}

	shardFileSize := erasure.ShardFileSize(data.Size())

	// The shard size is only used as a capacity hint; never pass a
	// negative value (unknown size) to make().
	inlineCap := shardFileSize
	if inlineCap < 0 {
		inlineCap = 0
	}

	writers := make([]io.Writer, len(onlineDisks))
	inlineBuffers := make([]*bytes.Buffer, len(onlineDisks))
	for i, disk := range onlineDisks {
		if disk == nil {
			continue
		}
		if disk.IsOnline() {
			inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, inlineCap))
			writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
		}
	}

	n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
	closeBitrotWriters(writers)
	if erasureErr != nil {
		return ObjectInfo{}, toObjectErr(erasureErr, minioMetaBucket, key)
	}

	// Should return IncompleteBody{} error when reader has fewer bytes
	// than specified in request header.
	if n < data.Size() {
		return ObjectInfo{}, IncompleteBody{Bucket: minioMetaBucket, Object: key}
	}

	var index []byte
	if opts.IndexCB != nil {
		index = opts.IndexCB()
	}

	modTime := UTCNow()

	for i, w := range writers {
		if w == nil {
			// Make sure to avoid writing to disks which we couldn't complete in erasure.Encode()
			onlineDisks[i] = nil
			continue
		}
		partsMetadata[i].Data = inlineBuffers[i].Bytes()
		partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize(), modTime, index)
		partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
			PartNumber: 1,
			Algorithm:  DefaultBitrotAlgorithm,
			Hash:       bitrotWriterSum(w),
		})
	}

	// Fill all the necessary metadata.
	// Update `xl.meta` content on each disks.
	for index := range partsMetadata {
		partsMetadata[index].Size = n
		partsMetadata[index].Fresh = true
		partsMetadata[index].ModTime = modTime
		partsMetadata[index].Metadata = opts.UserDefined
	}

	// Set an additional header when data is inlined.
	for index := range partsMetadata {
		partsMetadata[index].SetInlineData()
	}

	for i := 0; i < len(onlineDisks); i++ {
		if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
			// Object info is the same in all disks, so we can pick
			// the first meta from online disk
			fi = partsMetadata[i]
			break
		}
	}

	if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaBucket, key, partsMetadata, writeQuorum); err != nil {
		return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key)
	}

	return fi.ToObjectInfo(minioMetaBucket, key, opts.Versioned || opts.VersionSuspended), nil
}
// PutObject - creates an object upon reading from the input stream
// until EOF and writes `xl.meta` which carries the necessary metadata
// for future object operations. For buckets other than the internal
// MinIO ones, the write is refused with a disk full error when the
// drive has no space for data.Size().
func (es *erasureSingle) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
	// Validate put object input args.
	if err = checkPutObjectArgs(ctx, bucket, object, es); err != nil {
		return ObjectInfo{}, err
	}

	object = encodeDirObject(object)

	// Internal buckets are exempt from the free space check.
	if !isMinioMetaBucketName(bucket) {
		if !hasSpaceFor(getDiskInfos(ctx, es.disk), data.Size()) {
			return ObjectInfo{}, toObjectErr(errDiskFull)
		}
	}

	return es.putObject(ctx, bucket, object, data, opts)
}
// putObject erasure-encodes the stream in r and stores it as bucket/object on
// the single backing drive.
//
// The payload is first written either into in-memory buffers (small objects,
// which are later attached to the metadata as inline data) or into a temporary
// object under minioMetaTmpBucket. Only after the data has been encoded is the
// namespace lock taken (unless opts.NoLock is set), the metadata filled in and
// the temporary object moved to its final location through renameData.
//
// It returns the ObjectInfo of the newly written version. Errors of the
// individual steps are returned to the caller, mostly translated through
// toObjectErr; a reader that yields fewer bytes than data.Size() results in
// IncompleteBody.
func (es *erasureSingle) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
data := r.Reader
// No metadata is set, allocate a new one.
if opts.UserDefined == nil {
opts.UserDefined = make(map[string]string)
}
// Single drive setup: one data drive and no parity drives.
storageDisks := []StorageAPI{es.disk}
parityDrives := 0
dataDrives := len(storageDisks) - parityDrives
// we now know the number of blocks this object needs for data and parity.
// writeQuorum is dataDrives, plus one only when data and parity counts are equal.
writeQuorum := dataDrives
if dataDrives == parityDrives {
writeQuorum++
}
// Validate input data size and it can never be less than zero.
// (-1 is accepted; the code below handles it as a separate case.)
if data.Size() < -1 {
logger.LogIf(ctx, errInvalidArgument, logger.Application)
return ObjectInfo{}, toObjectErr(errInvalidArgument)
}
// Initialize parts metadata
partsMetadata := make([]FileInfo, len(storageDisks))
fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives)
fi.VersionID = opts.VersionID
// Versioned writes without an explicit version id get a fresh one.
if opts.Versioned && fi.VersionID == "" {
fi.VersionID = mustGetUUID()
}
fi.DataDir = mustGetUUID()
// uniqueID names the temporary object the data is staged under.
uniqueID := mustGetUUID()
tempObj := uniqueID
// Initialize erasure metadata.
for index := range partsMetadata {
partsMetadata[index] = fi
}
// Order disks according to erasure distribution
var onlineDisks []StorageAPI
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi)
erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
if err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
var buffer []byte
switch size := data.Size(); {
case size == 0:
buffer = make([]byte, 1) // Allocate at least a byte to reach EOF
case size == -1:
// Size is -1: fall back to the actual size to decide between a small
// private buffer and a pooled one.
if size := data.ActualSize(); size > 0 && size < fi.Erasure.BlockSize {
buffer = make([]byte, data.ActualSize()+256, data.ActualSize()*2+512)
} else {
buffer = es.bp.Get()
defer es.bp.Put(buffer)
}
case size >= fi.Erasure.BlockSize:
buffer = es.bp.Get()
defer es.bp.Put(buffer)
case size < fi.Erasure.BlockSize:
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
}
// Never hand the encoder more than one erasure block worth of buffer.
if len(buffer) > int(fi.Erasure.BlockSize) {
buffer = buffer[:fi.Erasure.BlockSize]
}
partName := "part.1"
tempErasureObj := pathJoin(uniqueID, fi.DataDir, partName)
// Delete temporary object in the event of failure.
// If PutObject succeeded there would be no temporary
// object to delete.
// online is only assigned right before the successful return, so it is
// still zero on every earlier return path.
var online int
defer func() {
if online != len(onlineDisks) {
es.disk.RenameFile(context.Background(), minioMetaTmpBucket, tempObj, minioMetaTmpDeletedBucket, mustGetUUID())
}
}()
shardFileSize := erasure.ShardFileSize(data.Size())
writers := make([]io.Writer, len(onlineDisks))
// inlineBuffers is allocated only for small objects: below
// smallFileThreshold when the write is not versioned, below
// smallFileThreshold/8 otherwise.
var inlineBuffers []*bytes.Buffer
if shardFileSize >= 0 {
if !opts.Versioned && shardFileSize < smallFileThreshold {
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
} else if shardFileSize < smallFileThreshold/8 {
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
}
} else {
// If compressed, use actual size to determine.
if sz := erasure.ShardFileSize(data.ActualSize()); sz > 0 {
if !opts.Versioned && sz < smallFileThreshold {
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
} else if sz < smallFileThreshold/8 {
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
}
}
}
// Create one bitrot writer per usable disk; entries for nil or offline
// disks stay nil.
for i, disk := range onlineDisks {
if disk == nil {
continue
}
if !disk.IsOnline() {
continue
}
if len(inlineBuffers) > 0 {
// Inline case: write into a memory buffer instead of the drive.
sz := shardFileSize
if sz < 0 {
sz = data.ActualSize()
}
inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, sz))
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
continue
}
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, shardFileSize, DefaultBitrotAlgorithm, erasure.ShardSize())
}
toEncode := io.Reader(data)
if data.Size() > bigFileThreshold {
// We use 2 buffers, so we always have a full buffer of input.
bufA := es.bp.Get()
bufB := es.bp.Get()
defer es.bp.Put(bufA)
defer es.bp.Put(bufB)
ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
// If the read-ahead reader cannot be created the plain reader is kept
// and the error is only handed to the logger.
if err == nil {
toEncode = ra
defer ra.Close()
}
logger.LogIf(ctx, err)
}
n, erasureErr := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum)
closeBitrotWriters(writers)
if erasureErr != nil {
return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj)
}
// Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header.
if n < data.Size() {
return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object}
}
// The namespace lock is acquired only now, after the payload has been
// encoded, and is held until this function returns.
if !opts.NoLock {
lk := es.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
}
// Optional index supplied by the caller through opts.IndexCB; it is
// recorded with the part below.
var index []byte
if opts.IndexCB != nil {
index = opts.IndexCB()
}
// Use the caller supplied modification time, or "now" when none is set.
modTime := opts.MTime
if opts.MTime.IsZero() {
modTime = UTCNow()
}
// Record part and checksum information for every disk that had a writer;
// disks without a writer are dropped from onlineDisks.
for i, w := range writers {
if w == nil {
onlineDisks[i] = nil
continue
}
if len(inlineBuffers) > 0 && inlineBuffers[i] != nil {
partsMetadata[i].Data = inlineBuffers[i].Bytes()
} else {
partsMetadata[i].Data = nil
}
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize(), modTime, index)
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
PartNumber: 1,
Algorithm: DefaultBitrotAlgorithm,
Hash: bitrotWriterSum(w),
})
}
// Fall back to the reader's current MD5 hex string when no etag was supplied.
if opts.UserDefined["etag"] == "" {
opts.UserDefined["etag"] = r.MD5CurrentHexString()
}
// Guess content-type from the extension if possible.
if opts.UserDefined["content-type"] == "" {
opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
}
// Fill all the necessary metadata.
// Update `xl.meta` content on each disks.
for index := range partsMetadata {
partsMetadata[index].Metadata = opts.UserDefined
partsMetadata[index].Size = n
partsMetadata[index].ModTime = modTime
}
if len(inlineBuffers) > 0 {
// Set an additional header when data is inlined.
for index := range partsMetadata {
partsMetadata[index].SetInlineData()
}
}
// Rename the successfully written temporary object to final location.
// errFileNotFound from the rename is reported as a write quorum failure.
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum); err != nil {
if errors.Is(err, errFileNotFound) {
return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object)
}
logger.LogIf(ctx, err)
return ObjectInfo{}, toObjectErr(err, bucket, object)
}
// Registered only after a successful rename, so NSUpdated is not called
// for failed writes.
defer NSUpdated(bucket, object)
for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
// Object info is the same in all disks, so we can pick
// the first meta from online disk
fi = partsMetadata[i]
break
}
}
fi.ReplicationState = opts.PutReplicationState()
// Setting online here is what lets the deferred cleanup above skip the
// temporary object when all disks are counted as online.
online = countOnlineDisks(onlineDisks)
// we are adding a new version to this object under the namespace lock, so this is the latest version.
fi.IsLatest = true
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
// deleteObjectVersion forwards the removal of the version described by fi to
// the single backing drive and returns that drive's result. writeQuorum is
// accepted but not consulted by this implementation.
func (es *erasureSingle) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error {
	drive := es.disk
	return drive.DeleteVersion(ctx, bucket, object, fi, forceDelMarker)
}
// DeleteObjects deletes objects/versions in bulk, this function will still automatically split objects list
// into smaller bulks if some object names are found to be duplicated in the delete list, splitting
// into smaller bulks will avoid holding twice the write lock of the duplicated object names.
//
// The two returned slices are index-aligned with objects: dobjects[i]
// describes what was requested for objects[i] (a delete marker or a plain
// delete) and errs[i] carries the outcome for that entry. If the bulk lock
// cannot be acquired every entry of errs is set to the lock error.
func (es *erasureSingle) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
errs := make([]error, len(objects))
dobjects := make([]DeletedObject, len(objects))
// objSets collects the distinct (encoded) object names to lock.
objSets := set.NewStringSet()
// NOTE(review): the checkDelObjArgs result stored in errs[i] here is
// overwritten for every index by the reduction loop at the end of this
// function — confirm that this is intended.
for i := range errs {
objects[i].ObjectName = encodeDirObject(objects[i].ObjectName)
errs[i] = checkDelObjArgs(ctx, bucket, objects[i].ObjectName)
objSets.Add(objects[i].ObjectName)
}
// Acquire a bulk write lock across 'objects'
multiDeleteLock := es.NewNSLock(bucket, objSets.ToSlice()...)
lkctx, err := multiDeleteLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
for i := range errs {
errs[i] = err
}
return dobjects, errs
}
ctx = lkctx.Context()
defer multiDeleteLock.Unlock(lkctx.Cancel)
writeQuorums := make([]int, len(objects))
storageDisks := []StorageAPI{es.disk}
for i := range objects {
// Single drive write quorum is '1'
writeQuorums[i] = 1
}
// versionsMap groups the requested versions by object name.
versionsMap := make(map[string]FileInfoVersions, len(objects))
for i := range objects {
// Construct the FileInfo data that needs to be preserved on the disk.
vr := FileInfo{
Name: objects[i].ObjectName,
VersionID: objects[i].VersionID,
ReplicationState: objects[i].ReplicationState(),
// save the index to set correct error at this index.
Idx: i,
}
vr.SetTierFreeVersionID(mustGetUUID())
// VersionID is not set means delete is not specific about
// any version, look for if the bucket is versioned or not.
if objects[i].VersionID == "" {
// MinIO extension to bucket version configuration
suspended := opts.VersionSuspended
versioned := opts.Versioned
// A per-prefix callback, when present, overrides opts.Versioned.
if opts.PrefixEnabledFn != nil {
versioned = opts.PrefixEnabledFn(objects[i].ObjectName)
}
if versioned || suspended {
// Bucket is versioned and no version was explicitly
// mentioned for deletes, create a delete marker instead.
vr.ModTime = UTCNow()
vr.Deleted = true
// Versioning suspended means that we add a `null` version
// delete marker, if not add a new version for this delete
// marker.
if versioned {
vr.VersionID = mustGetUUID()
}
}
}
// De-dup same object name to collect multiple versions for same object.
v, ok := versionsMap[objects[i].ObjectName]
if ok {
v.Versions = append(v.Versions, vr)
} else {
v = FileInfoVersions{
Name: vr.Name,
Versions: []FileInfo{vr},
}
}
// Pre-populate the response entry; it is returned as-is, errors are
// reported separately through errs.
if vr.Deleted {
dobjects[i] = DeletedObject{
DeleteMarker: vr.Deleted,
DeleteMarkerVersionID: vr.VersionID,
DeleteMarkerMTime: DeleteMarkerMTime{vr.ModTime},
ObjectName: vr.Name,
ReplicationState: vr.ReplicationState,
}
} else {
dobjects[i] = DeletedObject{
ObjectName: vr.Name,
VersionID: vr.VersionID,
ReplicationState: vr.ReplicationState,
}
}
versionsMap[objects[i].ObjectName] = v
}
// Flatten the map into the slice handed to the drive. Map iteration order
// is unspecified; positions are mapped back through FileInfo.Idx below.
dedupVersions := make([]FileInfoVersions, 0, len(versionsMap))
for _, version := range versionsMap {
dedupVersions = append(dedupVersions, version)
}
// Initialize list of errors.
delObjErrs := make([][]error, len(storageDisks))
var wg sync.WaitGroup
// Remove versions in bulk for each disk
for index, disk := range storageDisks {
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
delObjErrs[index] = make([]error, len(objects))
if disk == nil {
for i := range objects {
delObjErrs[index][i] = errDiskNotFound
}
return
}
// errs (shadowing the outer slice) is indexed like dedupVersions: one
// entry per distinct object name.
errs := disk.DeleteVersions(ctx, bucket, dedupVersions)
for i, err := range errs {
if err == nil {
continue
}
// Fan the per-name error out to every requested version of that name.
for _, v := range dedupVersions[i].Versions {
if err == errFileNotFound || err == errFileVersionNotFound {
if !dobjects[v.Idx].DeleteMarker {
// Not delete marker, if not found, ok.
continue
}
}
delObjErrs[index][v.Idx] = err
}
}
}(index, disk)
}
wg.Wait()
// Reduce errors for each object
for objIndex := range objects {
diskErrs := make([]error, len(storageDisks))
// Iterate over disks to fetch the error
// of deleting of the current object
for i := range delObjErrs {
// delObjErrs[i] is not nil when disks[i] is also not nil
if delObjErrs[i] != nil {
diskErrs[i] = delObjErrs[i][objIndex]
}
}
err := reduceWriteQuorumErrs(ctx, diskErrs, objectOpIgnoredErrs, writeQuorums[objIndex])
if objects[objIndex].VersionID != "" {
errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName, objects[objIndex].VersionID)
} else {
errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName)
}
// Deferred inside the loop: all NSUpdated calls run when the function
// returns, for every object regardless of its individual error.
defer NSUpdated(bucket, objects[objIndex].ObjectName)
}
return dobjects, errs
}
// deletePrefix issues two recursive, forced deletes on the backing drive: one
// for prefix as given and one for its encodeDirObject form. Only the result
// of the delete on the plain prefix is returned; the other one is deferred
// and its result is discarded.
func (es *erasureSingle) deletePrefix(ctx context.Context, bucket, prefix string) error {
	recursiveForce := DeleteOptions{
		Recursive: true,
		Force:     true,
	}

	// Runs once the delete below has returned.
	defer es.disk.Delete(ctx, bucket, encodeDirObject(prefix), recursiveForce)

	return es.disk.Delete(ctx, bucket, prefix, recursiveForce)
}
// DeleteObject - deletes an object, this call doesn't necessarily reply with
// any error as it is not necessary for the handler to reply back a
// response to the client request.
//
// Depending on opts this either removes a whole prefix (opts.DeletePrefix),
// writes a delete marker / marks a version as deleted, or removes the
// requested version from the drive. Lifecycle driven expiry
// (opts.Expiration.Expire) is only carried out when the bucket has a lifecycle
// configuration and the evaluated action is neither "none" nor a transition.
func (es *erasureSingle) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
if err = checkDelObjArgs(ctx, bucket, object); err != nil {
return objInfo, err
}
// Prefix deletes are handled separately, before the object name is encoded
// and without taking the namespace lock below.
if opts.DeletePrefix {
return ObjectInfo{}, toObjectErr(es.deletePrefix(ctx, bucket, object), bucket, object)
}
object = encodeDirObject(object)
var lc *lifecycle.Lifecycle
var rcfg lock.Retention
if opts.Expiration.Expire {
// Check if the current bucket has a configured lifecycle policy
lc, _ = globalLifecycleSys.Get(bucket)
rcfg, _ = globalBucketObjectLockSys.Get(bucket)
}
// expiration attempted on a bucket with no lifecycle
// rules shall be rejected.
if lc == nil && opts.Expiration.Expire {
if opts.VersionID != "" {
return objInfo, VersionNotFound{
Bucket: bucket,
Object: object,
VersionID: opts.VersionID,
}
}
return objInfo, ObjectNotFound{
Bucket: bucket,
Object: object,
}
}
// Acquire a write lock before deleting the object.
lk := es.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
if err != nil {
return ObjectInfo{}, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
versionFound := true
objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
goi, writeQuorum, gerr := es.getObjectInfoAndQuorum(ctx, bucket, object, opts)
// A lookup error is only acted upon when no object info came back with it.
if gerr != nil && goi.Name == "" {
switch gerr.(type) {
case InsufficientReadQuorum:
return objInfo, InsufficientWriteQuorum{}
}
// For delete marker replication, versionID being replicated will not exist on disk
if opts.DeleteMarker {
versionFound = false
} else {
return objInfo, gerr
}
}
if opts.Expiration.Expire {
// lc is non-nil here: the nil case returned above.
action := evalActionFromLifecycle(ctx, *lc, rcfg, goi, false)
var isErr bool
// No action and transition actions are not deletes; they are reported to
// the caller as "not found".
switch action {
case lifecycle.NoneAction:
isErr = true
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
isErr = true
}
if isErr {
if goi.VersionID != "" {
return goi, VersionNotFound{
Bucket: bucket,
Object: object,
VersionID: goi.VersionID,
}
}
return goi, ObjectNotFound{
Bucket: bucket,
Object: object,
}
}
}
// From here on every return path, including failures, ends with a call to
// NSUpdated.
defer NSUpdated(bucket, object)
var markDelete bool
// Determine whether to mark object deleted for replication
if goi.VersionID != "" {
markDelete = true
}
// Default deleteMarker to true if object is under versioning
deleteMarker := opts.Versioned
if opts.VersionID != "" {
// case where replica version needs to be deleted on target cluster
if versionFound && opts.DeleteMarkerReplicationStatus() == replication.Replica {
markDelete = false
}
// No purge status and no delete marker replication status requested.
if opts.VersionPurgeStatus().Empty() && opts.DeleteMarkerReplicationStatus().Empty() {
markDelete = false
}
// Version purge already reported as complete.
if opts.VersionPurgeStatus() == Complete {
markDelete = false
}
// Version is found but we do not wish to create more delete markers
// now, since VersionPurgeStatus() is already set, we can let the
// lower layers decide this. This fixes a regression that was introduced
// in PR #14555 where !VersionPurgeStatus.Empty() is automatically
// considered as Delete marker true to avoid listing such objects by
// regular ListObjects() calls. However for delete replication this
// ends up being a problem because "upon" a successful delete this
// ends up creating a new delete marker that is spurious and unnecessary.
if versionFound {
if !goi.VersionPurgeStatus.Empty() {
deleteMarker = false
} else if !goi.DeleteMarker { // implies a versioned delete of object
deleteMarker = false
}
}
}
// Use the caller supplied modification time, or "now" when none is set.
modTime := opts.MTime
if opts.MTime.IsZero() {
modTime = UTCNow()
}
// fvID is handed to SetTierFreeVersionID on whichever FileInfo is written below.
fvID := mustGetUUID()
if markDelete {
if opts.Versioned || opts.VersionSuspended {
if !deleteMarker {
// versioning suspended means we add `null` version as
// delete marker, if its not decided already.
deleteMarker = opts.VersionSuspended && opts.VersionID == ""
}
fi := FileInfo{
Name: object,
Deleted: deleteMarker,
MarkDeleted: markDelete,
ModTime: modTime,
ReplicationState: opts.DeleteReplication,
TransitionStatus: opts.Transition.Status,
ExpireRestored: opts.Transition.ExpireRestored,
}
fi.SetTierFreeVersionID(fvID)
// Under versioning use the requested version id, or a new one when none
// was given; otherwise VersionID stays empty.
if opts.Versioned {
fi.VersionID = mustGetUUID()
if opts.VersionID != "" {
fi.VersionID = opts.VersionID
}
}
// versioning suspended means we add `null` version as
// delete marker. Add delete marker, since we don't have
// any version specified explicitly. Or if a particular
// version id needs to be replicated.
if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
}
// Delete the object version on all disks.
dfi := FileInfo{
Name: object,
VersionID: opts.VersionID,
MarkDeleted: markDelete,
Deleted: deleteMarker,
ModTime: modTime,
ReplicationState: opts.DeleteReplication,
TransitionStatus: opts.Transition.Status,
ExpireRestored: opts.Transition.ExpireRestored,
}
dfi.SetTierFreeVersionID(fvID)
if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, dfi, opts.DeleteMarker); err != nil {
return objInfo, toObjectErr(err, bucket, object)
}
// Plain delete: report back the identifiers and the replication state
// that was requested.
return ObjectInfo{
Bucket: bucket,
Name: object,
VersionID: opts.VersionID,
VersionPurgeStatusInternal: opts.DeleteReplication.VersionPurgeStatusInternal,
ReplicationStatusInternal: opts.DeleteReplication.ReplicationStatusInternal,
}, nil
}
// PutObjectMetadata rewrites the stored metadata of an object version.
//
// The current metadata is read from the drive, optionally passed through
// opts.EvalMetadataFn, merged back into the file info together with
// opts.MTime and the effective version id, and persisted via updateObjectMeta.
// Unless opts.NoLock is set the namespace lock is held for the whole
// operation. Delete markers are rejected with errMethodNotAllowed.
func (es *erasureSingle) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
	if !opts.NoLock {
		// Lock the object before updating metadata.
		nsLock := es.NewNSLock(bucket, object)
		lockCtx, lockErr := nsLock.GetLock(ctx, globalOperationTimeout)
		if lockErr != nil {
			return ObjectInfo{}, lockErr
		}
		ctx = lockCtx.Context()
		defer nsLock.Unlock(lockCtx.Cancel)
	}

	drives := []StorageAPI{es.disk}

	// Read metadata associated with the object from all disks.
	metaArr, errs := readAllFileInfo(ctx, drives, bucket, object, opts.VersionID, false)

	readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, 0)
	if err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}

	// List all online disks.
	onlineDisks, modTime := listOnlineDisks(drives, metaArr, errs)

	// Pick latest valid metadata.
	fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
	if err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}
	if fi.Deleted {
		return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
	}

	filterOnlineDisksInplace(fi, metaArr, onlineDisks)

	// Without an explicit version id the update applies to the version
	// that was just read.
	if opts.VersionID == "" {
		opts.VersionID = fi.VersionID
	}

	withVersioning := opts.Versioned || opts.VersionSuspended

	current := fi.ToObjectInfo(bucket, object, withVersioning)
	if evalFn := opts.EvalMetadataFn; evalFn != nil {
		if evalErr := evalFn(current); evalErr != nil {
			return ObjectInfo{}, evalErr
		}
	}

	// Copy the user-defined entries of the object info into the file info.
	for key, value := range current.UserDefined {
		fi.Metadata[key] = value
	}
	fi.ModTime, fi.VersionID = opts.MTime, opts.VersionID

	if err = es.updateObjectMeta(ctx, bucket, object, fi, onlineDisks...); err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}

	return fi.ToObjectInfo(bucket, object, withVersioning), nil
}
// PutObjectTags - replace or add tags to an existing object.
//
// The object's metadata is read under the namespace lock, the tag string is
// stored under xhttp.AmzObjectTagging, the entries of opts.UserDefined are
// merged on top and the result is written back through updateObjectMeta.
// A delete marker is answered with errFileNotFound when no version id was
// requested and with errMethodNotAllowed otherwise.
func (es *erasureSingle) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
	// Lock the object before updating tags.
	nsLock := es.NewNSLock(bucket, object)
	lockCtx, err := nsLock.GetLock(ctx, globalOperationTimeout)
	if err != nil {
		return ObjectInfo{}, err
	}
	ctx = lockCtx.Context()
	defer nsLock.Unlock(lockCtx.Cancel)

	drives := []StorageAPI{es.disk}

	// Read metadata associated with the object from all disks: the exact
	// version when one is requested, the object's xl.meta otherwise.
	var (
		metaArr []FileInfo
		errs    []error
	)
	if opts.VersionID == "" {
		metaArr, errs = readAllXL(ctx, drives, bucket, object, false)
	} else {
		metaArr, errs = readAllFileInfo(ctx, drives, bucket, object, opts.VersionID, false)
	}

	readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, 0)
	if err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}

	// List all online disks.
	onlineDisks, modTime := listOnlineDisks(drives, metaArr, errs)

	// Pick latest valid metadata.
	fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
	if err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}

	if fi.Deleted {
		if opts.VersionID != "" {
			return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
		}
		return ObjectInfo{}, toObjectErr(errFileNotFound, bucket, object)
	}

	filterOnlineDisksInplace(fi, metaArr, onlineDisks)

	// Tags go in first so that opts.UserDefined can still override them.
	fi.Metadata[xhttp.AmzObjectTagging] = tags
	fi.ReplicationState = opts.PutReplicationState()
	for key, value := range opts.UserDefined {
		fi.Metadata[key] = value
	}

	if err = es.updateObjectMeta(ctx, bucket, object, fi, onlineDisks...); err != nil {
		return ObjectInfo{}, toObjectErr(err, bucket, object)
	}

	return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
// updateObjectMeta persists fi's metadata for bucket/object on every disk in
// onlineDisks, in parallel. It is a no-op when fi carries no metadata. A nil
// disk entry contributes errDiskNotFound; the collected errors are reduced
// with a write quorum of 1.
func (es *erasureSingle) updateObjectMeta(ctx context.Context, bucket, object string, fi FileInfo, onlineDisks ...StorageAPI) error {
	// Nothing to write.
	if len(fi.Metadata) == 0 {
		return nil
	}

	group := errgroup.WithNErrs(len(onlineDisks))

	// Start writing `xl.meta` to all disks in parallel.
	for i := range onlineDisks {
		i := i
		writeMeta := func() error {
			disk := onlineDisks[i]
			if disk == nil {
				return errDiskNotFound
			}
			return disk.UpdateMetadata(ctx, bucket, object, fi)
		}
		group.Go(writeMeta, i)
	}

	// Wait for all the routines, then fold their results into one error.
	return reduceWriteQuorumErrs(ctx, group.Wait(), objectOpIgnoredErrs, 1)
}
// DeleteObjectTags - delete object tags from an existing object.
// Implemented by storing an empty tag string through PutObjectTags.
func (es *erasureSingle) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
	const noTags = ""
	return es.PutObjectTags(ctx, bucket, object, noTags, opts)
}
// GetObjectTags - get object tags from an existing object.
// The tags are parsed from the UserTags field returned by GetObjectInfo; a
// failure of that lookup is returned unchanged.
func (es *erasureSingle) GetObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (*tags.Tags, error) {
	objInfo, err := es.GetObjectInfo(ctx, bucket, object, opts)
	if err == nil {
		return tags.ParseObjectTags(objInfo.UserTags)
	}
	return nil, err
}
// TransitionObject - transition object content to target tier.
//
// The object is streamed to the tier named in opts.Transition.Tier while the
// namespace lock is held. The request is only honoured when opts.MTime and
// opts.Transition.ETag match what is stored on disk. On success the file info
// is marked as transitioned and handed to deleteObjectVersion; an event and an
// audit log entry are emitted for both the success and the failure of that
// last step.
func (es *erasureSingle) TransitionObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
// Resolve the client for the target tier before taking any lock.
tgtClient, err := globalTierConfigMgr.getDriver(opts.Transition.Tier)
if err != nil {
return err
}
// Acquire write lock before starting to transition the object.
lk := es.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
if err != nil {
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
fi, metaArr, onlineDisks, err := es.getObjectFileInfo(ctx, bucket, object, opts, true)
if err != nil {
return toObjectErr(err, bucket, object)
}
// Delete markers cannot be transitioned.
if fi.Deleted {
if opts.VersionID == "" {
return toObjectErr(errFileNotFound, bucket, object)
}
// Make sure to return object info to provide extra information.
return toObjectErr(errMethodNotAllowed, bucket, object)
}
// verify that the object queued for transition is identical to that on disk.
if !opts.MTime.Equal(fi.ModTime) || !strings.EqualFold(opts.Transition.ETag, extractETag(fi.Metadata)) {
return toObjectErr(errFileNotFound, bucket, object)
}
// if object already transitioned, return
if fi.TransitionStatus == lifecycle.TransitionComplete {
return nil
}
// Registered before the upload starts, so NSUpdated also runs when the
// transition fails further down.
defer NSUpdated(bucket, object)
destObj, err := genTransitionObjName(bucket)
if err != nil {
return err
}
// Connect the local reader and the remote writer through a pipe: the
// goroutine feeds the object's bytes into pw while Put below consumes pr.
pr, pw := xioutil.WaitPipe()
go func() {
err := es.getObjectWithFileInfo(ctx, bucket, object, 0, fi.Size, pw, fi, metaArr, onlineDisks)
pw.CloseWithError(err)
}()
var rv remoteVersionID
rv, err = tgtClient.Put(ctx, destObj, pr, fi.Size)
// Close the read side with Put's result (nil on success).
pr.CloseWithError(err)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to transition %s/%s(%s) to %s tier: %w", bucket, object, opts.VersionID, opts.Transition.Tier, err))
return err
}
// Record where the data now lives.
fi.TransitionStatus = lifecycle.TransitionComplete
fi.TransitionedObjName = destObj
fi.TransitionTier = opts.Transition.Tier
fi.TransitionVersionID = string(rv)
eventName := event.ObjectTransitionComplete
// we now know the number of blocks this object needs for data and parity.
// writeQuorum is dataBlocks, plus one only when data and parity block
// counts are equal.
writeQuorum := fi.Erasure.DataBlocks
if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks {
writeQuorum++
}
// NOTE(review): presumably DeleteVersion uses the transition fields set
// above to drop the local data while keeping the metadata — confirm
// against the StorageAPI implementation.
if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, false); err != nil {
eventName = event.ObjectTransitionFailed
}
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
sendEvent(eventArgs{
EventName: eventName,
BucketName: bucket,
Object: objInfo,
Host: "Internal: [ILM-Transition]",
})
auditLogLifecycle(ctx, objInfo, ILMTransition)
// err is the result of deleteObjectVersion (nil on success).
return err
}
// RestoreTransitionedObject - restore transitioned object content locally on this cluster.
// This is similar to PostObjectRestore from AWS GLACIER
// storage class. When PostObjectRestore API is called, a temporary copy of the object
// is restored locally to the bucket on source cluster until the restore expiry date.
// The copy that was transitioned continues to reside in the transitioned tier.
//
// The work itself is done by restoreTransitionedObject; its error is returned
// unchanged.
func (es *erasureSingle) RestoreTransitionedObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
	err := es.restoreTransitionedObject(ctx, bucket, object, opts)
	return err
}
// updateRestoreMetadata updates the restore status header in the object's
// metadata after a restore attempt.
//
// When rerr is nil the xhttp.AmzRestore entry is set to the completed-restore
// value derived from opts.Transition.RestoreExpiry; otherwise the entry is
// removed so that the restore can be retried. The change is written with a
// metadata-only CopyObject of the object onto itself. A failure of that copy
// is logged and returned.
func (es *erasureSingle) updateRestoreMetadata(ctx context.Context, bucket, object string, objInfo ObjectInfo, opts ObjectOptions, rerr error) error {
	oi := objInfo.Clone()
	oi.metadataOnly = true // Perform only metadata updates.

	if rerr == nil {
		// Writing to a nil map panics; make sure there is one to write to.
		if oi.UserDefined == nil {
			oi.UserDefined = make(map[string]string)
		}
		oi.UserDefined[xhttp.AmzRestore] = completedRestoreObj(opts.Transition.RestoreExpiry).String()
	} else { // allow retry in the case of failure to restore
		delete(oi.UserDefined, xhttp.AmzRestore)
	}

	if _, err := es.CopyObject(ctx, bucket, object, bucket, object, oi, ObjectOptions{
		VersionID: oi.VersionID,
	}, ObjectOptions{
		VersionID: oi.VersionID,
	}); err != nil {
		// Wrap with %w (as TransitionObject does) so the cause stays
		// reachable through errors.Is / errors.As on the logged error.
		logger.LogIf(ctx, fmt.Errorf("Unable to update transition restore metadata for %s/%s(%s): %w", bucket, object, oi.VersionID, err))
		return err
	}
	return nil
}
// restoreTransitionedObject brings a transitioned object back from its remote
// tier and rewrites it locally.
//
// An object with exactly one part is restored with a single PutObject. Any
// other object is restored as a multipart upload: the remote stream is cut
// into the same parts as recorded for this version, each part is uploaded in
// turn, and the upload is then completed. Every exit path records the outcome
// through updateRestoreMetadata and returns the restore error itself.
func (es *erasureSingle) restoreTransitionedObject(ctx context.Context, bucket string, object string, opts ObjectOptions) error {
	// finish stores the restore outcome in the object's metadata and hands
	// rerr back untouched; the result of the metadata update is dropped.
	finish := func(info ObjectInfo, rerr error) error {
		es.updateRestoreMetadata(ctx, bucket, object, info, opts, rerr)
		return rerr
	}

	var oi ObjectInfo

	// get the file info on disk for transitioned object
	actualfi, _, _, err := es.getObjectFileInfo(ctx, bucket, object, opts, false)
	if err != nil {
		return finish(oi, toObjectErr(err, bucket, object))
	}

	oi = actualfi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
	ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi)

	// rs stays nil: no byte range is requested from the remote tier.
	var rs *HTTPRangeSpec

	if len(oi.Parts) == 1 {
		gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts)
		if err != nil {
			return finish(oi, toObjectErr(err, bucket, object))
		}
		defer gr.Close()

		size := gr.ObjInfo.Size
		hashReader, err := hash.NewReader(gr, size, "", "", size)
		if err != nil {
			return finish(oi, toObjectErr(err, bucket, object))
		}

		ropts.UserDefined[xhttp.AmzRestore] = completedRestoreObj(opts.Transition.RestoreExpiry).String()
		_, err = es.PutObject(ctx, bucket, object, NewPutObjReader(hashReader), ropts)
		return finish(oi, toObjectErr(err, bucket, object))
	}

	uploadID, err := es.NewMultipartUpload(ctx, bucket, object, ropts)
	if err != nil {
		return finish(oi, err)
	}

	// get reader from the warm backend - note that even in the case of encrypted objects, this stream is still encrypted.
	gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts)
	if err != nil {
		return finish(oi, err)
	}
	defer gr.Close()

	// rehydrate the parts back on disk as per the original xl.meta prior to transition
	var uploadedParts []CompletePart
	for i := range oi.Parts {
		part := oi.Parts[i]

		hr, err := hash.NewReader(gr, part.Size, "", "", part.Size)
		if err != nil {
			return finish(oi, err)
		}

		pInfo, err := es.PutObjectPart(ctx, bucket, object, uploadID, part.Number, NewPutObjReader(hr), ObjectOptions{})
		if err != nil {
			return finish(oi, err)
		}

		// The uploaded part must have the size recorded for it.
		if pInfo.Size != part.Size {
			return finish(oi, InvalidObjectState{Bucket: bucket, Object: object})
		}

		uploadedParts = append(uploadedParts, CompletePart{
			PartNumber: pInfo.PartNumber,
			ETag:       pInfo.ETag,
		})
	}

	completeOpts := ObjectOptions{
		MTime: oi.ModTime,
	}
	_, err = es.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, completeOpts)
	return finish(oi, err)
}
// getUploadIDDir returns the path of an upload: the uploadID joined onto the
// directory that getMultipartSHADir yields for bucket/object.
func (es *erasureSingle) getUploadIDDir(bucket, object, uploadID string) string {
	shaDir := es.getMultipartSHADir(bucket, object)
	return pathJoin(shaDir, uploadID)
}
// getMultipartSHADir returns the getSHA256Hash value of the joined
// bucket/object path; it is used as the directory name for the uploads of
// that object.
func (es *erasureSingle) getMultipartSHADir(bucket, object string) string {
	objectPath := pathJoin(bucket, object)
	return getSHA256Hash([]byte(objectPath))
}
// checkUploadIDExists - verify if a given uploadID exists and is valid.
//
// It reads the metadata stored for the upload under minioMetaMultipartBucket
// and returns nil when valid metadata can be picked. An errFileNotFound
// result, wrapped or not, is reported as errUploadIDNotFound; every other
// error is returned as is.
func (es *erasureSingle) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) {
	// Translate "file not found" on the way out. errors.Is is used (as in
	// putObject) so that a wrapped sentinel is recognised as well.
	defer func() {
		if errors.Is(err, errFileNotFound) {
			err = errUploadIDNotFound
		}
	}()

	disks := []StorageAPI{es.disk}

	// Read metadata associated with the object from all disks.
	metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, es.getUploadIDDir(bucket, object, uploadID), "", false)

	// err below is the named result, so the deferred translation sees it.
	readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, 0)
	if err != nil {
		return err
	}

	if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
		return reducedErr
	}

	// List all online disks.
	_, modTime := listOnlineDisks(disks, metaArr, errs)

	// Pick latest valid metadata.
	_, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
	return err
}
// removeObjectPart deletes the file "part.<partNumber>" that belongs to the
// given multipart upload from minioMetaMultipartBucket. The delete is
// best-effort: its result is ignored and nothing is reported to the caller.
func (es *erasureSingle) removeObjectPart(bucket, object, uploadID, dataDir string, partNumber int) {
	partFile := fmt.Sprintf("part.%d", partNumber)
	partPath := pathJoin(es.getUploadIDDir(bucket, object, uploadID), dataDir, partFile)

	drives := []StorageAPI{es.disk}
	group := errgroup.WithNErrs(len(drives))

	for i := range drives {
		if drives[i] == nil {
			continue
		}
		i := i
		group.Go(func() error {
			// Ignoring failure to remove parts that weren't present in CompleteMultipartUpload
			// requests. xl.meta is the authoritative source of truth on which parts constitute
			// the object. The presence of parts that don't belong in the object doesn't affect correctness.
			_ = drives[i].Delete(context.TODO(), minioMetaMultipartBucket, partPath, DeleteOptions{
				Recursive: false,
				Force:     false,
			})
			return nil
		}, i)
	}

	group.Wait()
}
// cleanupStaleUploadsOnDisk removes old multipart uploads and old temporary
// entries on the given disk.
//
// Two passes are made over the disk's directory tree:
//   - every upload directory below minioMetaMultipartBucket whose metadata
//     ModTime is older than expiry is renamed into minioMetaTmpDeletedBucket;
//   - every entry below minioMetaTmpBucket (except ".trash/") whose Created
//     time is older than expiry is deleted recursively.
//
// Entries that cannot be read are skipped. Each inspected entry is bracketed
// by es.deletedCleanupSleeper's timer.
func (es *erasureSingle) cleanupStaleUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) {
	startedAt := time.Now()
	root := disk.Endpoint().Path

	// expired reports whether t lies more than expiry before startedAt.
	expired := func(t time.Time) bool {
		return startedAt.Sub(t) > expiry
	}

	// Pass 1: stale multipart uploads, laid out as <shaDir>/<uploadIDDir>.
	readDirFn(pathJoin(root, minioMetaMultipartBucket), func(shaDir string, _ os.FileMode) error {
		return readDirFn(pathJoin(root, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, _ os.FileMode) error {
			uploadPath := pathJoin(shaDir, uploadIDDir)

			meta, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadPath, "", false)
			if err != nil {
				return nil
			}

			done := es.deletedCleanupSleeper.Timer(ctx)
			if expired(meta.ModTime) {
				// NOTE(review): the rename goes through es.disk while the read
				// above uses the disk argument — confirm both always refer to
				// the same drive.
				es.disk.RenameFile(context.Background(), minioMetaMultipartBucket, uploadPath, minioMetaTmpDeletedBucket, mustGetUUID())
			}
			done()
			return nil
		})
	})

	// Pass 2: stale entries of the temporary bucket.
	readDirFn(pathJoin(root, minioMetaTmpBucket), func(tmpDir string, _ os.FileMode) error {
		if tmpDir == ".trash/" { // do not remove .trash/ here, it has its own routines
			return nil
		}

		vol, err := disk.StatVol(ctx, pathJoin(minioMetaTmpBucket, tmpDir))
		if err != nil {
			return nil
		}

		done := es.deletedCleanupSleeper.Timer(ctx)
		if expired(vol.Created) {
			disk.Delete(ctx, minioMetaTmpBucket, tmpDir, DeleteOptions{
				Recursive: true,
				Force:     false,
			})
		}
		done()
		return nil
	})
}
// ListMultipartUploads - lists all the pending multipart
// uploads for a particular object in a bucket.
//
// Implements minimal S3 compatible ListMultipartUploads API. We do
// not support prefix based listing, this is a deliberate attempt
// towards simplification of multipart APIs.
// The resulting ListMultipartsInfo structure is marshaled directly as XML.
//
// Uploads are returned sorted by their initiated time (the ModTime of the
// upload metadata). When uploadIDMarker is set, listing resumes right after
// the upload carrying that ID; at most maxUploads entries are returned and
// IsTruncated/NextUploadIDMarker are set when more remain. A missing
// multipart directory for the object yields an empty, non-error result.
func (es *erasureSingle) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
	if err := checkListMultipartArgs(ctx, bucket, object, keyMarker, uploadIDMarker, delimiter, es); err != nil {
		return ListMultipartsInfo{}, err
	}

	result.MaxUploads = maxUploads
	result.KeyMarker = keyMarker
	result.Prefix = object
	result.Delimiter = delimiter

	uploadIDs, err := es.disk.ListDir(ctx, minioMetaMultipartBucket, es.getMultipartSHADir(bucket, object), -1)
	if err != nil {
		if err == errFileNotFound {
			// No upload was ever started for this object.
			return result, nil
		}
		logger.LogIf(ctx, err)
		return result, toObjectErr(err, bucket, object)
	}

	for i := range uploadIDs {
		uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
	}

	// S3 spec says uploadIDs should be sorted based on initiated time, we need
	// to read the metadata entry.
	uploads := make([]MultipartInfo, 0, len(uploadIDs))
	populatedUploadIds := set.NewStringSet()
	for _, uploadID := range uploadIDs {
		if populatedUploadIds.Contains(uploadID) {
			continue
		}
		fi, err := es.disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(es.getUploadIDDir(bucket, object, uploadID)), "", false)
		if err != nil {
			if err == errFileNotFound {
				// The upload was completed or aborted between ListDir and
				// ReadVersion; it is no longer pending, so leave it out
				// instead of failing the whole listing.
				continue
			}
			return result, toObjectErr(err, bucket, object)
		}
		populatedUploadIds.Add(uploadID)
		uploads = append(uploads, MultipartInfo{
			Object:    object,
			UploadID:  uploadID,
			Initiated: fi.ModTime,
		})
	}

	sort.Slice(uploads, func(i int, j int) bool {
		return uploads[i].Initiated.Before(uploads[j].Initiated)
	})

	// Position just past the marker. If the marker is not found, the index
	// ends at len(uploads) and nothing is listed.
	uploadIndex := 0
	if uploadIDMarker != "" {
		for uploadIndex < len(uploads) {
			found := uploads[uploadIndex].UploadID == uploadIDMarker
			uploadIndex++
			if found {
				break
			}
		}
	}

	for uploadIndex < len(uploads) {
		result.Uploads = append(result.Uploads, uploads[uploadIndex])
		result.NextUploadIDMarker = uploads[uploadIndex].UploadID
		uploadIndex++
		if len(result.Uploads) == maxUploads {
			break
		}
	}

	result.IsTruncated = uploadIndex < len(uploads)
	if !result.IsTruncated {
		// Markers are only meaningful when there is more to fetch.
		result.NextKeyMarker = ""
		result.NextUploadIDMarker = ""
	}

	return result, nil
}
// newMultipartUpload registers a new multipart upload for bucket/object and
// returns its freshly generated upload ID.
//
// It builds the FileInfo for the upload (version ID, data dir, modification
// time and user-defined metadata) and writes it as `xl.meta` under the
// upload ID directory inside minioMetaMultipartBucket.
//
// opts.UserDefined must be non-nil: a guessed "content-type" entry is stored
// into it when the caller supplied none.
func (es *erasureSingle) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (string, error) {
	disks := []StorageAPI{es.disk}

	// No parity on this layer; every listed drive holds data.
	var parity int
	data := len(disks) - parity

	// Write quorum follows the data drive count and is bumped by one only
	// when data and parity counts are equal.
	quorum := data
	if data == parity {
		quorum++
	}

	fi := newFileInfo(pathJoin(bucket, object), data, parity)
	fi.VersionID = opts.VersionID
	if fi.VersionID == "" && opts.Versioned {
		fi.VersionID = mustGetUUID()
	}
	fi.DataDir = mustGetUUID()

	// Guess content-type from the extension if possible.
	if ct := opts.UserDefined["content-type"]; ct == "" {
		opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
	}

	mtime := opts.MTime
	if mtime.IsZero() {
		mtime = UTCNow()
	}

	// Seed one metadata entry per disk, then order both slices together.
	metas := make([]FileInfo, len(disks))
	for i := range metas {
		metas[i] = fi
	}
	disks, metas = shuffleDisksAndPartsMetadata(disks, metas, fi)

	// Fill in the fields that go into the on-disk `xl.meta`.
	for i := range metas {
		metas[i].Fresh = true
		metas[i].ModTime = mtime
		metas[i].Metadata = opts.UserDefined
	}

	uploadID := mustGetUUID()
	uploadDir := es.getUploadIDDir(bucket, object, uploadID)

	// Persist `xl.meta` for the new upload.
	_, err := writeUniqueFileInfo(ctx, disks, minioMetaMultipartBucket, uploadDir, metas, quorum)
	if err != nil {
		return "", toObjectErr(err, minioMetaMultipartBucket, uploadDir)
	}

	return uploadID, nil
}
// NewMultipartUpload - starts a new multipart upload for bucket/object and
// returns the upload ID generated for it.
//
// Arguments are validated first; a nil opts.UserDefined is replaced by an
// empty map before the upload is registered.
//
// Implements S3 compatible initiate multipart API.
func (es *erasureSingle) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) {
	err := checkNewMultipartArgs(ctx, bucket, object, es)
	if err != nil {
		return "", err
	}

	// The upload metadata is written into this map, so it must exist.
	if opts.UserDefined == nil {
		opts.UserDefined = map[string]string{}
	}

	return es.newMultipartUpload(ctx, bucket, object, opts)
}
// CopyObjectPart - stores a part of a multipart upload whose data comes from
// an existing object instead of the request body.
//
// The source data is taken from srcInfo.Reader and handed to PutObjectPart
// for the destination bucket/object/uploadID/partID with dstOpts.
// startOffset, length, srcBucket, srcObject and srcOpts are not used here.
//
// Implements S3 compatible Upload Part Copy API.
func (es *erasureSingle) CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (pi PartInfo, e error) {
	src := NewPutObjReader(srcInfo.Reader)

	copied, err := es.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, src, dstOpts)
	if err == nil {
		return copied, nil
	}

	// On failure the zero PartInfo is returned with the translated error.
	return pi, toObjectErr(err, dstBucket, dstObject)
}
// PutObjectPart - reads incoming stream and internally erasure codes
// it. This call is similar to single put operation but it is part
// of the multipart transaction.
//
// The part is first written to a uniquely named temporary location in
// minioMetaTmpBucket, then renamed below the upload ID directory in
// minioMetaMultipartBucket, and finally the upload's `xl.meta` is rewritten
// with the new part added.
//
// Locking: a write lock on the part ID is held for the whole call; a read
// lock on the upload ID is held only while the upload metadata is read; a
// write lock on the upload ID is taken for the rename and metadata update.
//
// Returns the PartInfo of the stored part. IncompleteBody is returned when
// the reader delivered fewer bytes than data.Size() announced.
//
// Implements S3 compatible Upload Part API.
func (es *erasureSingle) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
if err := checkPutObjectPartArgs(ctx, bucket, object, es); err != nil {
return PartInfo{}, err
}
// Write lock for this part ID.
// Held throughout the operation.
partIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
pctx := plkctx.Context()
defer partIDLock.Unlock(plkctx.Cancel)
// Read lock for upload id.
// Only held while reading the upload metadata.
uploadIDRLock := es.NewNSLock(bucket, pathJoin(object, uploadID))
rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
rctx := rlkctx.Context()
// The read lock is released explicitly further down and the variable set
// to nil; this deferred release only covers the early-return paths.
defer func() {
if uploadIDRLock != nil {
uploadIDRLock.RUnlock(rlkctx.Cancel)
}
}()
data := r.Reader
// Validate input data size: anything below -1 is rejected.
// NOTE(review): -1 presumably means "size unknown" - the buffer selection
// below handles it as a separate case; confirm against the reader type.
if data.Size() < -1 {
logger.LogIf(rctx, errInvalidArgument, logger.Application)
return pi, toObjectErr(errInvalidArgument)
}
var partsMetadata []FileInfo
var errs []error
uploadIDPath := es.getUploadIDDir(bucket, object, uploadID)
// Validates if upload ID exists.
if err = es.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
return pi, toObjectErr(err, bucket, object, uploadID)
}
storageDisks := []StorageAPI{es.disk}
// Read metadata associated with the object from all disks.
partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket,
uploadIDPath, "", false)
// Unlock upload id locks before, so others can get it.
uploadIDRLock.RUnlock(rlkctx.Cancel)
uploadIDRLock = nil
// get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, 0)
if err != nil {
return pi, toObjectErr(err, bucket, object)
}
reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum)
if reducedErr == errErasureWriteQuorum {
return pi, toObjectErr(reducedErr, bucket, object)
}
// List all online disks.
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, writeQuorum)
if err != nil {
return pi, err
}
onlineDisks = shuffleDisks(onlineDisks, fi.Erasure.Distribution)
// Need a unique name for the part being written in minioMetaBucket to
// accommodate concurrent PutObjectPart requests
partSuffix := fmt.Sprintf("part.%d", partID)
tmpPart := mustGetUUID()
tmpPartPath := pathJoin(tmpPart, partSuffix)
// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
// `online` stays zero until the very end of a successful run, so every
// early return moves the temporary directory to minioMetaTmpDeletedBucket.
var online int
defer func() {
if online != len(onlineDisks) {
es.disk.RenameFile(context.Background(), minioMetaTmpBucket, tmpPart, minioMetaTmpDeletedBucket, mustGetUUID())
}
}()
erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
if err != nil {
return pi, toObjectErr(err, bucket, object)
}
// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
var buffer []byte
switch size := data.Size(); {
case size == 0:
buffer = make([]byte, 1) // Allocate at least a byte to reach EOF
case size == -1:
// Size is not known up front: size the buffer from ActualSize when that is
// positive and below one block, otherwise use a pooled buffer.
if size := data.ActualSize(); size > 0 && size < fi.Erasure.BlockSize {
buffer = make([]byte, data.ActualSize()+256, data.ActualSize()*2+512)
} else {
buffer = es.bp.Get()
defer es.bp.Put(buffer)
}
case size >= fi.Erasure.BlockSize:
buffer = es.bp.Get()
defer es.bp.Put(buffer)
case size < fi.Erasure.BlockSize:
// No need to allocate a full fi.Erasure.BlockSize buffer if the incoming data is smaller.
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
}
// Never hand the encoder more than one block worth of buffer.
if len(buffer) > int(fi.Erasure.BlockSize) {
buffer = buffer[:fi.Erasure.BlockSize]
}
// One bitrot writer per online disk; slots of missing disks stay nil.
writers := make([]io.Writer, len(onlineDisks))
for i, disk := range onlineDisks {
if disk == nil {
continue
}
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
}
toEncode := io.Reader(data)
if data.Size() > bigFileThreshold {
// Add input readahead.
// We use 2 buffers, so we always have a full buffer of input.
bufA := es.bp.Get()
bufB := es.bp.Get()
defer es.bp.Put(bufA)
defer es.bp.Put(bufB)
ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
// If the readahead reader cannot be created, encoding falls back to
// reading from data directly.
if err == nil {
toEncode = ra
defer ra.Close()
}
}
n, err := erasure.Encode(pctx, toEncode, writers, buffer, writeQuorum)
closeBitrotWriters(writers)
if err != nil {
return pi, toObjectErr(err, bucket, object)
}
// Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header.
if n < data.Size() {
return pi, IncompleteBody{Bucket: bucket, Object: object}
}
// A nil writer after encoding marks its disk as unusable from here on.
for i := range writers {
if writers[i] == nil {
onlineDisks[i] = nil
}
}
// Acquire write lock to update metadata.
uploadIDWLock := es.NewNSLock(bucket, pathJoin(object, uploadID))
wlkctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout)
if err != nil {
return PartInfo{}, err
}
wctx := wlkctx.Context()
defer uploadIDWLock.Unlock(wlkctx.Cancel)
// Validates if upload ID exists.
// Checked again because the upload ID lock was released while the part
// data was being written.
if err = es.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
return pi, toObjectErr(err, bucket, object, uploadID)
}
// Rename temporary part file to its final location.
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
onlineDisks, err = renamePart(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum)
if err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
}
// Read metadata again because it might be updated with parallel upload of another part.
partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum)
if reducedErr == errErasureWriteQuorum {
return pi, toObjectErr(reducedErr, bucket, object)
}
// Get current highest version based on re-read partsMetadata.
onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum)
if err != nil {
return pi, err
}
// Once part is successfully committed, proceed with updating erasure metadata.
fi.ModTime = UTCNow()
md5hex := r.MD5CurrentHexString()
// Optional index bytes supplied by the caller through opts.IndexCB.
var index []byte
if opts.IndexCB != nil {
index = opts.IndexCB()
}
// Add the current part.
fi.AddObjectPart(partID, md5hex, n, data.ActualSize(), fi.ModTime, index)
// Copy the updated size, mod time and part list to each online disk's
// metadata and record the bitrot checksum its writer produced.
for i, disk := range onlineDisks {
if disk == OfflineDisk {
continue
}
partsMetadata[i].Size = fi.Size
partsMetadata[i].ModTime = fi.ModTime
partsMetadata[i].Parts = fi.Parts
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
PartNumber: partID,
Algorithm: DefaultBitrotAlgorithm,
Hash: bitrotWriterSum(writers[i]),
})
}
// Writes update `xl.meta` format for each disk.
if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
}
// Feeds the deferred cleanup above: when this equals len(onlineDisks) the
// temporary directory is left alone.
online = countOnlineDisks(onlineDisks)
// Return success.
return PartInfo{
PartNumber: partID,
ETag: md5hex,
LastModified: fi.ModTime,
Size: n,
ActualSize: data.ActualSize(),
}, nil
}
// GetMultipartInfo returns multipart metadata uploaded during newMultipartUpload, used
// by callers to verify object states
//   - encrypted
//   - compressed
//
// The upload metadata is read under a read lock on the upload ID. On success
// the result carries bucket, object, uploadID and a copy of the stored
// user-defined metadata. Errors raised before the lock is acquired return a
// zero MultipartInfo; later errors return the partially filled result.
func (es *erasureSingle) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
	if err := checkListPartsArgs(ctx, bucket, object, es); err != nil {
		return MultipartInfo{}, err
	}

	result := MultipartInfo{
		Bucket:   bucket,
		Object:   object,
		UploadID: uploadID,
	}

	uploadIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID))
	lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
	if err != nil {
		return MultipartInfo{}, err
	}
	ctx = lkctx.Context()
	defer uploadIDLock.RUnlock(lkctx.Cancel)

	if err := es.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
		return result, toObjectErr(err, bucket, object, uploadID)
	}

	uploadIDPath := es.getUploadIDDir(bucket, object, uploadID)
	storageDisks := []StorageAPI{es.disk}

	// Read metadata associated with the object from all disks.
	partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID, false)

	// get Quorum for this object
	readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, 0)
	if err != nil {
		return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
	}

	// This is a read path checked against the read quorum, so the errors
	// must be reduced with the read-quorum reducer: the write-quorum one
	// reports a different sentinel and the comparison below would never
	// match.
	reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum)
	if reducedErr == errErasureReadQuorum {
		return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
	}

	_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)

	// Pick one from the first valid metadata.
	fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, readQuorum)
	if err != nil {
		return result, err
	}

	// Hand out a copy so callers cannot mutate the stored metadata map.
	result.UserDefined = cloneMSS(fi.Metadata)
	return result, nil
}
// ListObjectParts - lists the parts uploaded so far for the given object and
// uploadID, starting after partNumberMarker when that part number exists in
// the upload metadata.
//
// At most maxParts entries (capped at maxPartsList) are returned; when more
// remain, IsTruncated is set and NextPartNumberMarker carries the number of
// the last part returned. The upload metadata is read under a read lock on
// the upload ID.
//
// Implements S3 compatible ListObjectParts API. The resulting
// ListPartsInfo structure is marshaled directly into XML and
// replied back to the client.
func (es *erasureSingle) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
	if argErr := checkListPartsArgs(ctx, bucket, object, es); argErr != nil {
		return ListPartsInfo{}, argErr
	}

	lock := es.NewNSLock(bucket, pathJoin(object, uploadID))
	lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
	if err != nil {
		return ListPartsInfo{}, err
	}
	ctx = lkctx.Context()
	defer lock.RUnlock(lkctx.Cancel)

	if existsErr := es.checkUploadIDExists(ctx, bucket, object, uploadID); existsErr != nil {
		return result, toObjectErr(existsErr, bucket, object, uploadID)
	}

	uploadDir := es.getUploadIDDir(bucket, object, uploadID)
	disks := []StorageAPI{es.disk}

	// Load the upload metadata and make sure enough of it is usable.
	metas, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, uploadDir, "", false)

	_, quorum, err := objectQuorumFromMeta(ctx, metas, errs, 0)
	if err != nil {
		return result, toObjectErr(err, minioMetaMultipartBucket, uploadDir)
	}

	if reduced := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, quorum); reduced == errErasureWriteQuorum {
		return result, toObjectErr(reduced, minioMetaMultipartBucket, uploadDir)
	}

	_, latest := listOnlineDisks(disks, metas, errs)

	// Pick one from the first valid metadata.
	fi, err := pickValidFileInfo(ctx, metas, latest, quorum)
	if err != nil {
		return result, err
	}

	// Fields that are reported regardless of how many parts follow.
	result.Bucket = bucket
	result.Object = object
	result.UploadID = uploadID
	result.MaxParts = maxParts
	result.PartNumberMarker = partNumberMarker
	result.UserDefined = cloneMSS(fi.Metadata)

	// Nothing to list when no part was uploaded or none was asked for.
	if maxParts == 0 || len(fi.Parts) == 0 {
		return result, nil
	}

	// Limit output to maxPartsList.
	if maxParts > maxPartsList {
		maxParts = maxPartsList
	}

	// Start right after the marker part when it is known; otherwise list
	// from the first part.
	remaining := fi.Parts
	if at := objectPartIndex(fi.Parts, partNumberMarker); at != -1 {
		remaining = fi.Parts[at+1:]
	}

	for i := range remaining {
		result.Parts = append(result.Parts, PartInfo{
			PartNumber:   remaining[i].Number,
			ETag:         remaining[i].ETag,
			LastModified: fi.ModTime,
			Size:         remaining[i].Size,
		})
		if i+1 == maxParts {
			break
		}
	}

	// More parts are left than were returned: flag it and hand out the
	// marker for the follow-up request.
	if len(result.Parts) < len(remaining) {
		result.IsTruncated = true
		result.NextPartNumberMarker = result.Parts[len(result.Parts)-1].PartNumber
	}

	return result, nil
}
// CompleteMultipartUpload - completes an ongoing multipart
// transaction after receiving all the parts indicated by the client.
// Returns an md5sum calculated by concatenating all the individual
// md5sums of all the parts.
//
// Every requested part is validated against the upload metadata: an unknown
// part number or a mismatching ETag yields InvalidPart, and any part other
// than the last one that is below the minimum allowed size yields
// PartTooSmall. On success the final `xl.meta` is written at the upload ID
// location, parts not named in the request are removed, and the upload is
// renamed to bucket/object under the object's namespace lock.
//
// Implements S3 compatible Complete multipart API.
func (es *erasureSingle) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
if err = checkCompleteMultipartArgs(ctx, bucket, object, es); err != nil {
return oi, err
}
// Hold read-locks to verify uploaded parts, also disallows
// parallel part uploads as well.
uploadIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID))
rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
rctx := rlkctx.Context()
defer uploadIDLock.RUnlock(rlkctx.Cancel)
if err = es.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
return oi, toObjectErr(err, bucket, object, uploadID)
}
uploadIDPath := es.getUploadIDDir(bucket, object, uploadID)
storageDisks := []StorageAPI{es.disk}
// Read metadata associated with the object from all disks.
partsMetadata, errs := readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
// get Quorum for this object
_, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, 0)
if err != nil {
return oi, toObjectErr(err, bucket, object)
}
reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum)
if reducedErr == errErasureWriteQuorum {
return oi, toObjectErr(reducedErr, bucket, object)
}
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
// Pick one from the first valid metadata.
fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, writeQuorum)
if err != nil {
return oi, err
}
// Calculate full object size.
var objectSize int64
// Calculate consolidated actual size.
var objectActualSize int64
// Order online disks in accordance with distribution order.
// Order parts metadata in accordance with distribution order.
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadataByIndex(onlineDisks, partsMetadata, fi)
// Save current erasure metadata for validation.
currentFI := fi
// Allocate parts similar to incoming slice.
fi.Parts = make([]ObjectPartInfo, len(parts))
// Validate each part and then commit to disk.
for i, part := range parts {
partIdx := objectPartIndex(currentFI.Parts, part.PartNumber)
// Every requested part number must exist in the upload metadata.
if partIdx == -1 {
invp := InvalidPart{
PartNumber: part.PartNumber,
GotETag: part.ETag,
}
return oi, invp
}
// ensure that part ETag is canonicalized to strip off extraneous quotes
part.ETag = canonicalizeETag(part.ETag)
if currentFI.Parts[partIdx].ETag != part.ETag {
invp := InvalidPart{
PartNumber: part.PartNumber,
ExpETag: currentFI.Parts[partIdx].ETag,
GotETag: part.ETag,
}
return oi, invp
}
// All parts except the last part have to meet the minimum allowed part size.
if (i < len(parts)-1) && !isMinAllowedPartSize(currentFI.Parts[partIdx].ActualSize) {
return oi, PartTooSmall{
PartNumber: part.PartNumber,
PartSize: currentFI.Parts[partIdx].ActualSize,
PartETag: part.ETag,
}
}
// Save for total object size.
objectSize += currentFI.Parts[partIdx].Size
// Save the consolidated actual size.
objectActualSize += currentFI.Parts[partIdx].ActualSize
// Add incoming parts.
fi.Parts[i] = ObjectPartInfo{
Number: part.PartNumber,
Size: currentFI.Parts[partIdx].Size,
ActualSize: currentFI.Parts[partIdx].ActualSize,
Index: currentFI.Parts[partIdx].Index,
}
}
// Save the final object size and modtime.
fi.Size = objectSize
fi.ModTime = opts.MTime
if opts.MTime.IsZero() {
fi.ModTime = UTCNow()
}
// Save successfully calculated md5sum.
// A caller-provided etag in opts.UserDefined takes precedence over the
// one computed from the parts.
fi.Metadata["etag"] = opts.UserDefined["etag"]
if fi.Metadata["etag"] == "" {
fi.Metadata["etag"] = getCompleteMultipartMD5(parts)
}
// Save the consolidated actual size.
fi.Metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
// Update all erasure metadata, make sure to not modify fields like
// checksum which are different on each disks.
for index := range partsMetadata {
if partsMetadata[index].IsValid() {
partsMetadata[index].Size = fi.Size
partsMetadata[index].ModTime = fi.ModTime
partsMetadata[index].Metadata = fi.Metadata
partsMetadata[index].Parts = fi.Parts
}
}
// Hold namespace to complete the transaction
lk := es.NewNSLock(bucket, object)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return oi, err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
// Write final `xl.meta` at uploadID location
onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum)
if err != nil {
return oi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
}
// Remove parts that weren't present in CompleteMultipartUpload request.
for _, curpart := range currentFI.Parts {
if objectPartIndex(fi.Parts, curpart.Number) == -1 {
// Delete the missing part files. e.g,
// Request 1: NewMultipart
// Request 2: PutObjectPart 1
// Request 3: PutObjectPart 2
// Request 4: CompleteMultipartUpload --part 2
// N.B. 1st part is not present. This part should be removed from the storage.
es.removeObjectPart(bucket, object, uploadID, fi.DataDir, curpart.Number)
}
}
// Rename the multipart object to final location.
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
partsMetadata, bucket, object, writeQuorum); err != nil {
return oi, toObjectErr(err, bucket, object)
}
// Registered only after the rename succeeded; runs when the function returns.
defer NSUpdated(bucket, object)
for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
// Object info is the same in all disks, so we can pick
// the first meta from online disk
fi = partsMetadata[i]
break
}
}
// we are adding a new version to this object under the namespace lock, so this is the latest version.
fi.IsLatest = true
// Success, return object info.
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
}
// AbortMultipartUpload - aborts the multipart upload identified by uploadID.
//
// Under a write lock on the upload ID, the upload is verified to exist and
// its directory is renamed out of minioMetaMultipartBucket into
// minioMetaTmpDeletedBucket under a fresh UUID. The result of that rename is
// not inspected; once the upload ID has been validated the call reports
// success. There is no rollback for this operation.
func (es *erasureSingle) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
	err = checkAbortMultipartArgs(ctx, bucket, object, es)
	if err != nil {
		return err
	}

	uploadIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID))
	lkctx, lockErr := uploadIDLock.GetLock(ctx, globalOperationTimeout)
	if lockErr != nil {
		return lockErr
	}
	ctx = lkctx.Context()
	defer uploadIDLock.Unlock(lkctx.Cancel)

	// Validates if upload ID exists.
	if existsErr := es.checkUploadIDExists(ctx, bucket, object, uploadID); existsErr != nil {
		return toObjectErr(existsErr, bucket, object, uploadID)
	}

	// Move the whole upload directory, parts included, out of the way.
	uploadDir := es.getUploadIDDir(bucket, object, uploadID)
	es.disk.RenameFile(ctx, minioMetaMultipartBucket, uploadDir, minioMetaTmpDeletedBucket, mustGetUUID())

	return nil
}
// ListObjects lists objects and common prefixes in bucket that match prefix,
// starting after marker and returning at most maxKeys entries.
//
// When the request looks like a probe for a single object (non-empty prefix,
// maxKeys of 1, no delimiter, no marker) the object is looked up directly;
// it is reported unless the bucket lifecycle evaluates to one of the delete
// actions for it, in which case an empty listing is returned. If the lookup
// fails, the regular listing path is used.
func (es *erasureSingle) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
	var loi ListObjectsInfo

	// Lifecycle and object-lock configuration of the bucket; lookup errors
	// are ignored and simply leave the respective value unset.
	lc, _ := globalLifecycleSys.Get(bucket)
	rcfg, _ := globalBucketObjectLockSys.Get(bucket)

	// Optimization for certain applications like
	// - Cohesity
	// - Actifio, Splunk etc.
	// which send ListObjects requests where the actual object
	// itself is the prefix and max-keys=1 in such scenarios
	// we can simply verify locally if such an object exists
	// to avoid the need for ListObjects().
	singleObjectProbe := len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == ""
	if singleObjectProbe {
		if objInfo, err := es.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true}); err == nil {
			if lc != nil {
				switch evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false) {
				case lifecycle.DeleteVersionAction, lifecycle.DeleteAction,
					lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
					return loi, nil
				}
			}
			loi.Objects = append(loi.Objects, objInfo)
			return loi, nil
		}
	}

	opts := listPathOptions{
		Bucket:      bucket,
		Prefix:      prefix,
		Separator:   delimiter,
		Limit:       maxKeysPlusOne(maxKeys, marker != ""),
		Marker:      marker,
		InclDeleted: false,
		AskDisks:    globalAPIConfig.getListQuorum(),
		Lifecycle:   lc,
		Retention:   rcfg,
	}

	merged, err := es.listPath(ctx, &opts)
	if err != nil && err != io.EOF {
		// A missing bucket is an expected condition and is not logged.
		if !isErrBucketNotFound(err) {
			logger.LogIf(ctx, err)
		}
		return loi, err
	}

	merged.forwardPast(opts.Marker)
	defer merged.truncate(0) // Release when returning

	// Default is recursive, if delimiter is set then list non recursive.
	objects := merged.fileInfos(bucket, prefix, delimiter)

	// io.EOF from listPath means the listing is complete; only a nil error
	// with results can indicate more to come.
	loi.IsTruncated = err == nil && len(objects) > 0
	if maxKeys > 0 && len(objects) > maxKeys {
		objects = objects[:maxKeys]
		loi.IsTruncated = true
	}

	for _, obj := range objects {
		isPrefix := delimiter != "" && obj.IsDir && obj.ModTime.IsZero()
		if !isPrefix {
			loi.Objects = append(loi.Objects, obj)
			continue
		}
		loi.Prefixes = append(loi.Prefixes, obj.Name)
	}

	if loi.IsTruncated {
		loi.NextMarker = opts.encodeMarker(objects[len(objects)-1].Name)
	}
	return loi, nil
}
// ListObjectsV2 lists objects through ListObjects and repackages the result
// in V2 form.
//
// The listing starts after continuationToken, or after startAfter when no
// continuation token is given. The marker ListObjects reports as next is
// returned as NextContinuationToken. fetchOwner is not used here.
func (es *erasureSingle) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) {
	// The continuation token wins over startAfter.
	from := startAfter
	if continuationToken != "" {
		from = continuationToken
	}

	v1, err := es.ListObjects(ctx, bucket, prefix, from, delimiter, maxKeys)
	if err != nil {
		return ListObjectsV2Info{}, err
	}

	return ListObjectsV2Info{
		IsTruncated:           v1.IsTruncated,
		ContinuationToken:     continuationToken,
		NextContinuationToken: v1.NextMarker,
		Objects:               v1.Objects,
		Prefixes:              v1.Prefixes,
	}, nil
}
// ListObjectVersions lists object versions (delete markers included) and
// common prefixes in bucket that match prefix, returning at most maxKeys
// entries.
//
// A versionMarker without a marker is rejected with NotImplemented. When
// the result is truncated, NextMarker and NextVersionIDMarker identify the
// last entry returned.
func (es *erasureSingle) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (ListObjectVersionsInfo, error) {
	var loi ListObjectVersionsInfo

	if versionMarker != "" && marker == "" {
		return loi, NotImplemented{}
	}

	opts := listPathOptions{
		Bucket:      bucket,
		Prefix:      prefix,
		Separator:   delimiter,
		Limit:       maxKeysPlusOne(maxKeys, marker != ""),
		Marker:      marker,
		InclDeleted: true,
		AskDisks:    "strict",
		Versioned:   true,
	}

	merged, err := es.listPath(ctx, &opts)
	if err != nil && err != io.EOF {
		return loi, err
	}
	defer merged.truncate(0) // Release when returning

	// If we are not looking for a specific version, skip past the marker
	// entry entirely.
	if versionMarker == "" {
		markerOnly := listPathOptions{Marker: marker}
		markerOnly.parseMarker()
		merged.forwardPast(markerOnly.Marker)
	}

	versions := merged.fileInfoVersions(bucket, prefix, delimiter, versionMarker)

	// io.EOF from listPath means the listing is complete; only a nil error
	// with results can indicate more to come.
	loi.IsTruncated = err == nil && len(versions) > 0
	if maxKeys > 0 && len(versions) > maxKeys {
		versions = versions[:maxKeys]
		loi.IsTruncated = true
	}

	for _, v := range versions {
		isPrefix := delimiter != "" && v.IsDir && v.ModTime.IsZero()
		if !isPrefix {
			loi.Objects = append(loi.Objects, v)
			continue
		}
		loi.Prefixes = append(loi.Prefixes, v.Name)
	}

	if loi.IsTruncated {
		tail := versions[len(versions)-1]
		loi.NextMarker = opts.encodeMarker(tail.Name)
		loi.NextVersionIDMarker = tail.VersionID
	}
	return loi, nil
}
// Walk a bucket, optionally prefix recursively, until we have returned
// all the content to objectInfo channel, it is callers responsibility
// to allocate a receive channel for ObjectInfo, upon any unhandled
// error walker returns error. Optionally if context.Done() is received
// then Walk() stops the walker.
//
// Walk itself only validates the arguments; the listing runs in a background
// goroutine. The results channel is always closed by Walk: immediately when
// validation fails, otherwise once the walker finishes. Errors raised while
// listing are logged, not returned. Every send on results also watches the
// context, so a consumer that stops reading and cancels ctx does not leave
// the walker blocked forever.
func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
	if err := checkListObjsArgs(ctx, bucket, prefix, "", es); err != nil {
		// Upon error close the channel.
		close(results)
		return err
	}

	ctx, cancel := context.WithCancel(ctx)
	go func() {
		defer cancel()
		defer close(results)

		versioned := opts.Versioned || opts.VersionSuspended

		// send delivers one result, giving up as soon as ctx is done.
		// It reports whether the value was delivered.
		send := func(info ObjectInfo) bool {
			select {
			case <-ctx.Done():
				return false
			case results <- info:
				return true
			}
		}

		loadEntry := func(entry metaCacheEntry) {
			if entry.isDir() {
				return
			}

			fivs, err := entry.fileInfoVersions(bucket)
			if err != nil {
				// Unreadable entry: stop the whole walk.
				cancel()
				return
			}

			if opts.WalkAscending {
				// Emit the versions of this entry in reverse slice order.
				for i := len(fivs.Versions) - 1; i >= 0; i-- {
					version := fivs.Versions[i]
					if !send(version.ToObjectInfo(bucket, version.Name, versioned)) {
						return
					}
				}
				return
			}

			for _, version := range fivs.Versions {
				if !send(version.ToObjectInfo(bucket, version.Name, versioned)) {
					return
				}
			}
		}

		// How to resolve partial results.
		resolver := metadataResolutionParams{
			dirQuorum: 1,
			objQuorum: 1,
			bucket:    bucket,
		}

		basePath := baseDirFromPrefix(prefix)
		filterPrefix := strings.Trim(strings.TrimPrefix(prefix, basePath), slashSeparator)
		if basePath == prefix {
			filterPrefix = ""
		}

		lopts := listPathRawOptions{
			disks:          []StorageAPI{es.disk},
			bucket:         bucket,
			path:           basePath,
			filterPrefix:   filterPrefix,
			recursive:      true,
			forwardTo:      "",
			minDisks:       1,
			reportNotFound: false,
			agreed:         loadEntry,
			partial: func(entries metaCacheEntries, _ []error) {
				entry, ok := entries.resolve(&resolver)
				if !ok {
					// Could not resolve; fall back to the first entry found.
					entry, _ = entries.firstFound()
				}
				// NOTE(review): firstFound presumably returns nil when no
				// entry is usable - guard the dereference below.
				if entry == nil {
					return
				}
				loadEntry(*entry)
			},
			finished: nil,
		}

		if err := listPathRaw(ctx, lopts); err != nil {
			logger.LogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
			return
		}
	}()

	return nil
}
// Health - reports the health of the object layer for a single drive.
//
// If the drive's DiskInfo call fails, the zero HealthResult is returned.
// Otherwise WriteQuorum is 1 and Healthy is true, except for maintenance
// checks (opts.Maintenance), which always report unhealthy because a single
// drive cannot be put under maintenance.
func (es *erasureSingle) Health(ctx context.Context, opts HealthOptions) HealthResult {
	if _, err := es.disk.DiskInfo(ctx); err != nil {
		return HealthResult{}
	}

	return HealthResult{
		Healthy:     !opts.Maintenance,
		WriteQuorum: 1,
	}
}
// ReadHealth - reports whether the object layer is healthy for reads. It is
// the Healthy flag of Health called with default (non-maintenance) options.
func (es *erasureSingle) ReadHealth(ctx context.Context) bool {
	return es.Health(ctx, HealthOptions{}).Healthy
}
// nsScanner will start scanning buckets and send updated totals as they are traversed.
// Updates are sent on a regular basis and the caller *must* consume them.
//
// It returns nil immediately when buckets is empty, and returns the error from
// loading the previously saved root usage cache. Failures while scanning a
// bucket or saving a cache are only logged; the function then returns nil.
// Sends on updates are blocking, so an unread channel stalls the saver goroutine.
func (es *erasureSingle) nsScanner(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, wantCycle uint32, updates chan<- dataUsageCache, healScanMode madmin.HealScanMode) error {
if len(buckets) == 0 {
return nil
}
// Collect disks we can use.
// Only the single drive is listed, so every per-disk structure below has size one.
disks := []StorageAPI{es.disk}
// Load bucket totals
oldCache := dataUsageCache{}
if err := oldCache.load(ctx, es, dataUsageCacheName); err != nil {
return err
}
// New cache..
// Starts empty, carrying over only NextCycle from the previous cache.
cache := dataUsageCache{
Info: dataUsageCacheInfo{
Name: dataUsageRoot,
NextCycle: oldCache.Info.NextCycle,
},
Cache: make(map[string]dataUsageEntry, len(oldCache.Cache)),
}
bloom := bf.bytes()
// Put all buckets into channel.
// The buffer holds every bucket, so the sends below never block.
bucketCh := make(chan BucketInfo, len(buckets))
// Add new buckets first
for _, b := range buckets {
if oldCache.find(b.Name) == nil {
bucketCh <- b
}
}
// Add existing buckets.
// Their previous totals are copied into the new cache before they are rescanned.
for _, b := range buckets {
e := oldCache.find(b.Name)
if e != nil {
cache.replace(b.Name, dataUsageRoot, *e)
bucketCh <- b
}
}
// Closing lets the worker's range loop end once all buckets are consumed.
close(bucketCh)
bucketResults := make(chan dataUsageEntryInfo, len(disks))
// Start async collector/saver.
// This goroutine owns the cache.
var saverWg sync.WaitGroup
saverWg.Add(1)
go func() {
// Add jitter to the update time so multiple sets don't sync up.
// The resulting interval lies between 30 and 40 seconds.
updateTime := 30*time.Second + time.Duration(float64(10*time.Second)*rand.Float64())
t := time.NewTicker(updateTime)
defer t.Stop()
defer saverWg.Done()
var lastSave time.Time
for {
select {
case <-ctx.Done():
// Return without saving.
return
case <-t.C:
// Periodic flush: skip when nothing changed since the last save.
if cache.Info.LastUpdate.Equal(lastSave) {
continue
}
logger.LogIf(ctx, cache.save(ctx, es, dataUsageCacheName))
// A clone is sent because this goroutine keeps mutating cache.
updates <- cache.clone()
lastSave = cache.Info.LastUpdate
case v, ok := <-bucketResults:
if !ok {
// Save final state...
// bucketResults was closed: record the requested cycle, persist
// and hand the cache itself over, as it is no longer modified here.
cache.Info.NextCycle = wantCycle
cache.Info.LastUpdate = time.Now()
logger.LogIf(ctx, cache.save(ctx, es, dataUsageCacheName))
updates <- cache
return
}
// Fold the reported bucket entry into the root cache.
cache.replace(v.Name, v.Parent, v.Entry)
cache.Info.LastUpdate = time.Now()
}
}
}()
// Shuffle disks to ensure a total randomness of bucket/disk association to ensure
// that objects that are not present in all disks are accounted and ILM applied.
// With the single entry in disks the shuffle has nothing to reorder.
r := rand.New(rand.NewSource(time.Now().UnixNano()))
r.Shuffle(len(disks), func(i, j int) { disks[i], disks[j] = disks[j], disks[i] })
// Start one scanner per disk
var wg sync.WaitGroup
wg.Add(len(disks))
for i := range disks {
go func(i int) {
defer wg.Done()
disk := disks[i]
for bucket := range bucketCh {
// Non-blocking cancellation check before starting on the next bucket.
select {
case <-ctx.Done():
return
default:
}
// Load cache for bucket
// NOTE: this cache shadows the root cache owned by the saver goroutine.
cacheName := pathJoin(bucket.Name, dataUsageCacheName)
cache := dataUsageCache{}
// A load failure is only logged; scanning proceeds with whatever cache holds.
logger.LogIf(ctx, cache.load(ctx, es, cacheName))
if cache.Info.Name == "" {
cache.Info.Name = bucket.Name
}
cache.Info.BloomFilter = bloom
cache.Info.SkipHealing = true
cache.Info.NextCycle = wantCycle
if cache.Info.Name != bucket.Name {
logger.LogIf(ctx, fmt.Errorf("cache name mismatch: %s != %s", cache.Info.Name, bucket.Name))
// NOTE(review): replacing Info here also drops the BloomFilter and
// SkipHealing values assigned just above - confirm this is intended.
cache.Info = dataUsageCacheInfo{
Name: bucket.Name,
LastUpdate: time.Time{},
NextCycle: wantCycle,
}
}
// Collect updates.
// updates and wg below shadow the outer ones; they are scoped to this bucket.
updates := make(chan dataUsageEntry, 1)
var wg sync.WaitGroup
wg.Add(1)
go func(name string) {
defer wg.Done()
// Forward every intermediate entry to the saver under the bucket's name.
// Presumably disk.NSScanner closes updates when it returns, which ends
// this loop - that is not visible here, verify against the StorageAPI.
for update := range updates {
bucketResults <- dataUsageEntryInfo{
Name: name,
Parent: dataUsageRoot,
Entry: update,
}
}
}(cache.Info.Name)
// Calc usage
before := cache.Info.LastUpdate
var err error
cache, err = disk.NSScanner(ctx, cache, updates, healScanMode)
// Clear the bloom filter reference before the cache is saved below.
cache.Info.BloomFilter = nil
if err != nil {
// Keep partial progress if the scan advanced the cache, otherwise log the error.
if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) {
logger.LogIf(ctx, cache.save(ctx, es, cacheName))
} else {
logger.LogIf(ctx, err)
}
// This ensures that we don't close
// bucketResults channel while the
// updates-collector goroutine still
// holds a reference to this.
wg.Wait()
continue
}
wg.Wait()
// Report the flattened root entry of the bucket; a zero entry is sent
// when the cache has no root.
var root dataUsageEntry
if r := cache.root(); r != nil {
root = cache.flatten(*r)
}
t := time.Now()
bucketResults <- dataUsageEntryInfo{
Name: cache.Info.Name,
Parent: dataUsageRoot,
Entry: root,
}
// We want to avoid synchronizing up all writes in case
// the results are piled up.
// Sleeps for a random fraction of the time the send above took.
time.Sleep(time.Duration(float64(time.Since(t)) * rand.Float64()))
// Save cache
logger.LogIf(ctx, cache.save(ctx, es, cacheName))
}
}(i)
}
// Wait for the workers, then close bucketResults so the saver writes the
// final state, and wait for the saver to finish.
wg.Wait()
close(bucketResults)
saverWg.Wait()
return nil
}
// NSScanner scans all buckets and publishes aggregated DataUsageInfo values
// on updates, which is always closed before the function returns.
// When there are no buckets a single empty DataUsageInfo is sent.
// It returns the ListBuckets error, the first error reported by nsScanner,
// or ctx.Err() when the context ended before the final update was handed over.
func (es *erasureSingle) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32, healScanMode madmin.HealScanMode) error {
// Updates must be closed before we return.
defer close(updates)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
// mu guards results and firstErr, which are shared between the goroutines below.
var mu sync.Mutex
// Single slot: the latest cache received from nsScanner.
results := make([]dataUsageCache, 1)
var firstErr error
allBuckets, err := es.ListBuckets(ctx, BucketOptions{})
if err != nil {
return err
}
if len(allBuckets) == 0 {
updates <- DataUsageInfo{} // no buckets found update data usage to reflect latest state
return nil
}
// Scanner latest allBuckets first.
// Sorted by creation time, newest bucket first.
sort.Slice(allBuckets, func(i, j int) bool {
return allBuckets[i].Created.After(allBuckets[j].Created)
})
wg.Add(1)
go func() {
// NOTE: this updates shadows the function parameter; it carries raw
// caches from nsScanner to the collector below.
updates := make(chan dataUsageCache, 1)
// The deferred close ends the collector's range loop, which releases wg.
defer close(updates)
// Start update collector.
go func() {
defer wg.Done()
for info := range updates {
mu.Lock()
results[0] = info
mu.Unlock()
}
}()
// Start scanner. Blocks until done.
err := es.nsScanner(ctx, allBuckets, bf, wantCycle, updates, healScanMode)
if err != nil {
logger.LogIf(ctx, err)
mu.Lock()
if firstErr == nil {
firstErr = err
}
// Cancel remaining...
cancel()
mu.Unlock()
return
}
}()
// updateCloser asks the publisher goroutine for one last update; the channel
// sent through it is closed by the publisher as acknowledgement.
updateCloser := make(chan chan struct{})
go func() {
updateTicker := time.NewTicker(30 * time.Second)
defer updateTicker.Stop()
var lastUpdate time.Time
// We need to merge since we will get the same buckets from each pool.
// Therefore to get the exact bucket sizes we must merge before we can convert.
// NOTE(review): results has a single slot in this implementation, so the
// merge covers exactly one cache.
var allMerged dataUsageCache
// update publishes the merged usage if it is newer than the last one published.
update := func() {
mu.Lock()
defer mu.Unlock()
allMerged = dataUsageCache{Info: dataUsageCacheInfo{Name: dataUsageRoot}}
for _, info := range results {
if info.Info.LastUpdate.IsZero() {
// Not filled yet.
return
}
allMerged.merge(info)
}
if allMerged.root() != nil && allMerged.Info.LastUpdate.After(lastUpdate) {
// The send happens while mu is held.
updates <- allMerged.dui(allMerged.Info.Name, allBuckets)
lastUpdate = allMerged.Info.LastUpdate
}
}
for {
select {
case <-ctx.Done():
return
case v := <-updateCloser:
// Final update requested: publish, acknowledge and stop.
update()
close(v)
return
case <-updateTicker.C:
update()
}
}
}()
// Wait until the collector has drained everything nsScanner produced.
wg.Wait()
ch := make(chan struct{})
select {
case updateCloser <- ch:
// Block until the publisher has sent the final update.
<-ch
case <-ctx.Done():
// The publisher may already have exited on cancellation; report the
// context error unless a scanner error was recorded.
if firstErr == nil {
firstErr = ctx.Err()
}
}
return firstErr
}
// GetRawData hands every file matching the given raw path to the callback fn.
// A failure to stat the path is returned, as is errFileNotFound when nothing
// matches. Files that cannot be opened for reading are skipped silently, and
// the first error returned by the callback stops the iteration and is returned.
// For now only direct file paths are supported.
func (es *erasureSingle) GetRawData(ctx context.Context, volume, file string, fn func(r io.Reader, host string, disk string, filename string, info StatInfo) error) error {
	stats, err := es.disk.StatInfoFile(ctx, volume, file, true)
	if err != nil {
		return err
	}
	if len(stats) == 0 {
		return errFileNotFound
	}

	for _, si := range stats {
		var rc io.ReadCloser
		if si.Dir {
			// Directories carry no content, hand out an empty reader.
			rc = io.NopCloser(bytes.NewBuffer([]byte{}))
		} else {
			stream, rerr := es.disk.ReadFileStream(ctx, volume, si.Name, 0, si.Size)
			if rerr != nil {
				// Read errors are ignored by contract.
				continue
			}
			rc = stream
		}

		// Keep disk path instead of ID, to ensure that the downloaded zip file can be
		// easily automated with `minio server hostname{1...n}/disk{1...m}`.
		cbErr := fn(rc, es.disk.Hostname(), es.disk.Endpoint().Path, pathJoin(volume, si.Name), si)
		rc.Close()
		if cbErr != nil {
			return cbErr
		}
	}

	return nil
}