mirror of
https://github.com/minio/minio.git
synced 2024-12-25 06:35:56 -05:00
14645142db
Erasure SD DeleteObjects() is only inheriting bucket versioning status from the handler layer. Add the missing versioning prefix evaluation for each object that will deleted.
3299 lines
102 KiB
Go
3299 lines
102 KiB
Go
// Copyright (c) 2015-2022 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/klauspost/readahead"
|
|
"github.com/minio/madmin-go"
|
|
"github.com/minio/minio-go/v7/pkg/s3utils"
|
|
"github.com/minio/minio-go/v7/pkg/set"
|
|
"github.com/minio/minio-go/v7/pkg/tags"
|
|
"github.com/minio/minio/internal/bpool"
|
|
"github.com/minio/minio/internal/bucket/lifecycle"
|
|
"github.com/minio/minio/internal/bucket/object/lock"
|
|
"github.com/minio/minio/internal/bucket/replication"
|
|
"github.com/minio/minio/internal/event"
|
|
"github.com/minio/minio/internal/hash"
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
xioutil "github.com/minio/minio/internal/ioutil"
|
|
"github.com/minio/minio/internal/logger"
|
|
"github.com/minio/minio/internal/sync/errgroup"
|
|
"github.com/minio/pkg/mimedb"
|
|
)
|
|
|
|
// erasureSingle - Implements single drive XL layer
|
|
type erasureSingle struct {
|
|
GatewayUnsupported
|
|
|
|
disk StorageAPI
|
|
|
|
endpoint Endpoint
|
|
|
|
// Locker mutex map.
|
|
nsMutex *nsLockMap
|
|
|
|
// Byte pools used for temporary i/o buffers.
|
|
bp *bpool.BytePoolCap
|
|
|
|
deletedCleanupSleeper *dynamicSleeper
|
|
|
|
// Shut down async operations
|
|
shutdown context.CancelFunc
|
|
|
|
format *formatErasureV3
|
|
}
|
|
|
|
// Initialize new set of erasure coded sets.
|
|
func newErasureSingle(ctx context.Context, storageDisk StorageAPI, format *formatErasureV3) (ObjectLayer, error) {
|
|
// Number of buffers, max 2GB
|
|
n := (2 * humanize.GiByte) / (blockSizeV2 * 2)
|
|
|
|
// Initialize byte pool once for all sets, bpool size is set to
|
|
// setCount * setDriveCount with each memory upto blockSizeV2.
|
|
bp := bpool.NewBytePoolCap(n, blockSizeV2, blockSizeV2*2)
|
|
|
|
// Initialize the erasure sets instance.
|
|
s := &erasureSingle{
|
|
disk: storageDisk,
|
|
endpoint: storageDisk.Endpoint(),
|
|
format: format,
|
|
nsMutex: newNSLock(false),
|
|
bp: bp,
|
|
deletedCleanupSleeper: newDynamicSleeper(10, 2*time.Second),
|
|
}
|
|
|
|
// start cleanup stale uploads go-routine.
|
|
go s.cleanupStaleUploads(ctx)
|
|
|
|
// start cleanup of deleted objects.
|
|
go s.cleanupDeletedObjects(ctx)
|
|
|
|
ctx, s.shutdown = context.WithCancel(ctx)
|
|
go intDataUpdateTracker.start(ctx, s.endpoint.Path)
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// List all buckets from one of the set, we are not doing merge
|
|
// sort here just for simplification. As per design it is assumed
|
|
// that all buckets are present on all sets.
|
|
func (es *erasureSingle) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) {
|
|
var listBuckets []BucketInfo
|
|
healBuckets := map[string]VolInfo{}
|
|
// lists all unique buckets across drives.
|
|
if err := listAllBuckets(ctx, []StorageAPI{es.disk}, healBuckets, 0); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, v := range healBuckets {
|
|
listBuckets = append(listBuckets, BucketInfo(v))
|
|
}
|
|
|
|
sort.Slice(listBuckets, func(i, j int) bool {
|
|
return listBuckets[i].Name < listBuckets[j].Name
|
|
})
|
|
|
|
for i := range listBuckets {
|
|
meta, err := globalBucketMetadataSys.Get(listBuckets[i].Name)
|
|
if err == nil {
|
|
listBuckets[i].Created = meta.Created
|
|
}
|
|
}
|
|
|
|
return listBuckets, nil
|
|
}
|
|
|
|
func (es *erasureSingle) cleanupStaleUploads(ctx context.Context) {
|
|
timer := time.NewTimer(globalAPIConfig.getStaleUploadsCleanupInterval())
|
|
defer timer.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-timer.C:
|
|
es.cleanupStaleUploadsOnDisk(ctx, es.disk, globalAPIConfig.getStaleUploadsExpiry())
|
|
|
|
// Reset for the next interval
|
|
timer.Reset(globalAPIConfig.getStaleUploadsCleanupInterval())
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanup ".trash/" folder every 5m minutes with sufficient sleep cycles, between each
|
|
// deletes a dynamic sleeper is used with a factor of 10 ratio with max delay between
|
|
// deletes to be 2 seconds.
|
|
func (es *erasureSingle) cleanupDeletedObjects(ctx context.Context) {
|
|
timer := time.NewTimer(globalAPIConfig.getDeleteCleanupInterval())
|
|
defer timer.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-timer.C:
|
|
es.cleanupDeletedObjectsInner(ctx)
|
|
// Reset for the next interval
|
|
timer.Reset(globalAPIConfig.getDeleteCleanupInterval())
|
|
}
|
|
}
|
|
}
|
|
|
|
// NewNSLock - initialize a new namespace RWLocker instance.
|
|
func (es *erasureSingle) NewNSLock(bucket string, objects ...string) RWLocker {
|
|
return es.nsMutex.NewNSLock(nil, bucket, objects...)
|
|
}
|
|
|
|
// Shutdown function for object storage interface.
|
|
func (es *erasureSingle) Shutdown(ctx context.Context) error {
|
|
defer es.shutdown()
|
|
|
|
// Add any object layer shutdown activities here.
|
|
closeStorageDisks(es.disk)
|
|
return nil
|
|
}
|
|
|
|
func (es *erasureSingle) BackendInfo() (b madmin.BackendInfo) {
|
|
b.Type = madmin.Erasure
|
|
|
|
scParity := 0
|
|
rrSCParity := 0
|
|
|
|
// Data blocks can vary per pool, but parity is same.
|
|
for _, setDriveCount := range es.SetDriveCounts() {
|
|
b.StandardSCData = append(b.StandardSCData, setDriveCount-scParity)
|
|
b.RRSCData = append(b.RRSCData, setDriveCount-rrSCParity)
|
|
}
|
|
|
|
b.StandardSCParity = scParity
|
|
b.RRSCParity = rrSCParity
|
|
return
|
|
}
|
|
|
|
// StorageInfo - returns underlying storage statistics.
|
|
func (es *erasureSingle) StorageInfo(ctx context.Context) (StorageInfo, []error) {
|
|
disks := []StorageAPI{es.disk}
|
|
endpoints := []Endpoint{es.endpoint}
|
|
|
|
storageInfo, errs := getStorageInfo(disks, endpoints)
|
|
storageInfo.Backend = es.BackendInfo()
|
|
return storageInfo, errs
|
|
}
|
|
|
|
// LocalStorageInfo - returns underlying local storage statistics.
|
|
func (es *erasureSingle) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) {
|
|
disks := []StorageAPI{es.disk}
|
|
endpoints := []Endpoint{es.endpoint}
|
|
|
|
var localDisks []StorageAPI
|
|
var localEndpoints []Endpoint
|
|
|
|
for i, endpoint := range endpoints {
|
|
if endpoint.IsLocal {
|
|
localDisks = append(localDisks, disks[i])
|
|
localEndpoints = append(localEndpoints, endpoint)
|
|
}
|
|
}
|
|
|
|
return getStorageInfo(localDisks, localEndpoints)
|
|
}
|
|
|
|
// Clean-up previously deleted objects. from .minio.sys/tmp/.trash/
|
|
func (es *erasureSingle) cleanupDeletedObjectsInner(ctx context.Context) {
|
|
diskPath := es.disk.Endpoint().Path
|
|
readDirFn(pathJoin(diskPath, minioMetaTmpDeletedBucket), func(ddir string, typ os.FileMode) error {
|
|
wait := es.deletedCleanupSleeper.Timer(ctx)
|
|
removeAll(pathJoin(diskPath, minioMetaTmpDeletedBucket, ddir))
|
|
wait()
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (es *erasureSingle) renameAll(ctx context.Context, bucket, prefix string) {
|
|
if es.disk != nil {
|
|
es.disk.RenameFile(ctx, bucket, prefix, minioMetaTmpDeletedBucket, mustGetUUID())
|
|
}
|
|
}
|
|
|
|
type renameAllStorager interface {
|
|
renameAll(ctx context.Context, bucket, prefix string)
|
|
}
|
|
|
|
// Bucket operations
|
|
// MakeBucket - make a bucket.
|
|
func (es *erasureSingle) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error {
|
|
defer NSUpdated(bucket, slashSeparator)
|
|
|
|
// Lock the bucket name before creating.
|
|
lk := es.NewNSLock(minioMetaTmpBucket, bucket+".lck")
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
|
|
// Verify if bucket is valid.
|
|
if !isMinioMetaBucketName(bucket) {
|
|
if err := s3utils.CheckValidBucketNameStrict(bucket); err != nil {
|
|
return BucketNameInvalid{Bucket: bucket}
|
|
}
|
|
}
|
|
|
|
if err := es.disk.MakeVol(ctx, bucket); err != nil {
|
|
if opts.ForceCreate && errors.Is(err, errVolumeExists) {
|
|
// No need to return error when force create was
|
|
// requested.
|
|
return nil
|
|
}
|
|
if !errors.Is(err, errVolumeExists) {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
return toObjectErr(err, bucket)
|
|
}
|
|
|
|
// If it doesn't exist we get a new, so ignore errors
|
|
meta := newBucketMetadata(bucket)
|
|
if opts.LockEnabled {
|
|
meta.VersioningConfigXML = enabledBucketVersioningConfig
|
|
meta.ObjectLockConfigXML = enabledBucketObjectLockConfig
|
|
}
|
|
|
|
if opts.VersioningEnabled {
|
|
meta.VersioningConfigXML = enabledBucketVersioningConfig
|
|
}
|
|
|
|
if err := meta.Save(context.Background(), es); err != nil {
|
|
return toObjectErr(err, bucket)
|
|
}
|
|
|
|
globalBucketMetadataSys.Set(bucket, meta)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetBucketInfo - returns BucketInfo for a bucket.
|
|
func (es *erasureSingle) GetBucketInfo(ctx context.Context, bucket string) (bi BucketInfo, e error) {
|
|
volInfo, err := es.disk.StatVol(ctx, bucket)
|
|
if err != nil {
|
|
return bi, toObjectErr(err, bucket)
|
|
}
|
|
return BucketInfo(volInfo), nil
|
|
}
|
|
|
|
// DeleteBucket - deletes a bucket.
|
|
func (es *erasureSingle) DeleteBucket(ctx context.Context, bucket string, opts DeleteBucketOptions) error {
|
|
// Collect if all disks report volume not found.
|
|
defer NSUpdated(bucket, slashSeparator)
|
|
|
|
err := es.disk.DeleteVol(ctx, bucket, opts.Force)
|
|
return toObjectErr(err, bucket)
|
|
}
|
|
|
|
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
|
|
func (es *erasureSingle) IsNotificationSupported() bool {
|
|
return true
|
|
}
|
|
|
|
// IsListenSupported returns whether listen bucket notification is applicable for this layer.
|
|
func (es *erasureSingle) IsListenSupported() bool {
|
|
return true
|
|
}
|
|
|
|
// IsEncryptionSupported returns whether server side encryption is implemented for this layer.
|
|
func (es *erasureSingle) IsEncryptionSupported() bool {
|
|
return true
|
|
}
|
|
|
|
// IsCompressionSupported returns whether compression is applicable for this layer.
|
|
func (es *erasureSingle) IsCompressionSupported() bool {
|
|
return true
|
|
}
|
|
|
|
// IsTaggingSupported indicates whethes *erasureSingle implements tagging support.
|
|
func (es *erasureSingle) IsTaggingSupported() bool {
|
|
return true
|
|
}
|
|
|
|
// Object Operations
|
|
|
|
// CopyObject - copy object source object to destination object.
|
|
// if source object and destination object are same we only
|
|
// update metadata.
|
|
func (es *erasureSingle) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (oi ObjectInfo, err error) {
|
|
defer NSUpdated(dstBucket, dstObject)
|
|
|
|
srcObject = encodeDirObject(srcObject)
|
|
dstObject = encodeDirObject(dstObject)
|
|
|
|
cpSrcDstSame := isStringEqual(pathJoin(srcBucket, srcObject), pathJoin(dstBucket, dstObject))
|
|
|
|
if !dstOpts.NoLock {
|
|
ns := es.NewNSLock(dstBucket, dstObject)
|
|
lkctx, err := ns.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return ObjectInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer ns.Unlock(lkctx.Cancel)
|
|
dstOpts.NoLock = true
|
|
}
|
|
|
|
if cpSrcDstSame && srcInfo.metadataOnly {
|
|
// Read metadata associated with the object from all disks.
|
|
storageDisks := []StorageAPI{es.disk}
|
|
|
|
var metaArr []FileInfo
|
|
var errs []error
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
if srcOpts.VersionID != "" {
|
|
metaArr, errs = readAllFileInfo(ctx, storageDisks, srcBucket, srcObject, srcOpts.VersionID, true)
|
|
} else {
|
|
metaArr, errs = readAllXL(ctx, storageDisks, srcBucket, srcObject, true)
|
|
}
|
|
|
|
readQuorum, writeQuorum, err := objectQuorumFromMeta(ctx, metaArr, errs, 0)
|
|
if err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, srcBucket, srcObject)
|
|
}
|
|
|
|
// List all online disks.
|
|
onlineDisks, modTime := listOnlineDisks(storageDisks, metaArr, errs)
|
|
|
|
// Pick latest valid metadata.
|
|
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
|
if err != nil {
|
|
return oi, toObjectErr(err, srcBucket, srcObject)
|
|
}
|
|
if fi.Deleted {
|
|
if srcOpts.VersionID == "" {
|
|
return oi, toObjectErr(errFileNotFound, srcBucket, srcObject)
|
|
}
|
|
return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), toObjectErr(errMethodNotAllowed, srcBucket, srcObject)
|
|
}
|
|
|
|
filterOnlineDisksInplace(fi, metaArr, onlineDisks)
|
|
|
|
versionID := srcInfo.VersionID
|
|
if srcInfo.versionOnly {
|
|
versionID = dstOpts.VersionID
|
|
// preserve destination versionId if specified.
|
|
if versionID == "" {
|
|
versionID = mustGetUUID()
|
|
fi.IsLatest = true // we are creating a new version so this is latest.
|
|
}
|
|
modTime = UTCNow()
|
|
}
|
|
|
|
// If the data is not inlined, we may end up incorrectly
|
|
// inlining the data here, that leads to an inconsistent
|
|
// situation where some objects are were not inlined
|
|
// were now inlined, make sure to `nil` the Data such
|
|
// that xl.meta is written as expected.
|
|
if !fi.InlineData() {
|
|
fi.Data = nil
|
|
}
|
|
|
|
fi.VersionID = versionID // set any new versionID we might have created
|
|
fi.ModTime = modTime // set modTime for the new versionID
|
|
if !dstOpts.MTime.IsZero() {
|
|
modTime = dstOpts.MTime
|
|
fi.ModTime = dstOpts.MTime
|
|
}
|
|
fi.Metadata = srcInfo.UserDefined
|
|
srcInfo.UserDefined["etag"] = srcInfo.ETag
|
|
|
|
// Update `xl.meta` content on each disks.
|
|
for index := range metaArr {
|
|
if metaArr[index].IsValid() {
|
|
metaArr[index].ModTime = modTime
|
|
metaArr[index].VersionID = versionID
|
|
metaArr[index].Metadata = srcInfo.UserDefined
|
|
if !metaArr[index].InlineData() {
|
|
// If the data is not inlined, we may end up incorrectly
|
|
// inlining the data here, that leads to an inconsistent
|
|
// situation where some objects are were not inlined
|
|
// were now inlined, make sure to `nil` the Data such
|
|
// that xl.meta is written as expected.
|
|
metaArr[index].Data = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Write unique `xl.meta` for each disk.
|
|
if _, err = writeUniqueFileInfo(ctx, onlineDisks, srcBucket, srcObject, metaArr, writeQuorum); err != nil {
|
|
return oi, toObjectErr(err, srcBucket, srcObject)
|
|
}
|
|
|
|
return fi.ToObjectInfo(srcBucket, srcObject, srcOpts.Versioned || srcOpts.VersionSuspended), nil
|
|
}
|
|
|
|
putOpts := ObjectOptions{
|
|
ServerSideEncryption: dstOpts.ServerSideEncryption,
|
|
UserDefined: srcInfo.UserDefined,
|
|
Versioned: dstOpts.Versioned,
|
|
VersionID: dstOpts.VersionID,
|
|
MTime: dstOpts.MTime,
|
|
NoLock: true,
|
|
}
|
|
|
|
return es.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
|
|
}
|
|
|
|
// GetObjectNInfo - returns object info and an object
|
|
// Read(Closer). When err != nil, the returned reader is always nil.
|
|
func (es *erasureSingle) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
|
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
object = encodeDirObject(object)
|
|
|
|
var unlockOnDefer bool
|
|
nsUnlocker := func() {}
|
|
defer func() {
|
|
if unlockOnDefer {
|
|
nsUnlocker()
|
|
}
|
|
}()
|
|
|
|
// Acquire lock
|
|
if lockType != noLock {
|
|
lock := es.NewNSLock(bucket, object)
|
|
switch lockType {
|
|
case writeLock:
|
|
lkctx, err := lock.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
nsUnlocker = func() { lock.Unlock(lkctx.Cancel) }
|
|
case readLock:
|
|
lkctx, err := lock.GetRLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
nsUnlocker = func() { lock.RUnlock(lkctx.Cancel) }
|
|
}
|
|
unlockOnDefer = true
|
|
}
|
|
|
|
fi, metaArr, onlineDisks, err := es.getObjectFileInfo(ctx, bucket, object, opts, true)
|
|
if err != nil {
|
|
return nil, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
|
|
if objInfo.DeleteMarker {
|
|
if opts.VersionID == "" {
|
|
return &GetObjectReader{
|
|
ObjInfo: objInfo,
|
|
}, toObjectErr(errFileNotFound, bucket, object)
|
|
}
|
|
// Make sure to return object info to provide extra information.
|
|
return &GetObjectReader{
|
|
ObjInfo: objInfo,
|
|
}, toObjectErr(errMethodNotAllowed, bucket, object)
|
|
}
|
|
if objInfo.IsRemote() {
|
|
gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, h, objInfo, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
unlockOnDefer = false
|
|
return gr.WithCleanupFuncs(nsUnlocker), nil
|
|
}
|
|
|
|
fn, off, length, err := NewGetObjectReader(rs, objInfo, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
unlockOnDefer = false
|
|
|
|
pr, pw := xioutil.WaitPipe()
|
|
go func() {
|
|
pw.CloseWithError(es.getObjectWithFileInfo(ctx, bucket, object, off, length, pw, fi, metaArr, onlineDisks))
|
|
}()
|
|
|
|
// Cleanup function to cause the go routine above to exit, in
|
|
// case of incomplete read.
|
|
pipeCloser := func() {
|
|
pr.CloseWithError(nil)
|
|
}
|
|
|
|
return fn(pr, h, pipeCloser, nsUnlocker)
|
|
}
|
|
|
|
func (es *erasureSingle) getObjectWithFileInfo(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI) error {
|
|
// Reorder online disks based on erasure distribution ordes.
|
|
// Reorder parts metadata based on erasure distribution ordes.
|
|
onlineDisks, metaArr = shuffleDisksAndPartsMetadataByIndex(onlineDisks, metaArr, fi)
|
|
|
|
// For negative length read everything.
|
|
if length < 0 {
|
|
length = fi.Size - startOffset
|
|
}
|
|
|
|
// Reply back invalid range if the input offset and length fall out of range.
|
|
if startOffset > fi.Size || startOffset+length > fi.Size {
|
|
logger.LogIf(ctx, InvalidRange{startOffset, length, fi.Size}, logger.Application)
|
|
return InvalidRange{startOffset, length, fi.Size}
|
|
}
|
|
|
|
// Get start part index and offset.
|
|
partIndex, partOffset, err := fi.ObjectToPartOffset(ctx, startOffset)
|
|
if err != nil {
|
|
return InvalidRange{startOffset, length, fi.Size}
|
|
}
|
|
|
|
// Calculate endOffset according to length
|
|
endOffset := startOffset
|
|
if length > 0 {
|
|
endOffset += length - 1
|
|
}
|
|
|
|
// Get last part index to read given length.
|
|
lastPartIndex, _, err := fi.ObjectToPartOffset(ctx, endOffset)
|
|
if err != nil {
|
|
return InvalidRange{startOffset, length, fi.Size}
|
|
}
|
|
|
|
var totalBytesRead int64
|
|
erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
|
if err != nil {
|
|
return toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// once we have obtained a common FileInfo i.e latest, we should stick
|
|
// to single dataDir to read the content to avoid reading from some other
|
|
// dataDir that has stale FileInfo{} to ensure that we fail appropriately
|
|
// during reads and expect the same dataDir everywhere.
|
|
dataDir := fi.DataDir
|
|
for ; partIndex <= lastPartIndex; partIndex++ {
|
|
if length == totalBytesRead {
|
|
break
|
|
}
|
|
|
|
partNumber := fi.Parts[partIndex].Number
|
|
|
|
// Save the current part name and size.
|
|
partSize := fi.Parts[partIndex].Size
|
|
|
|
partLength := partSize - partOffset
|
|
// partLength should be adjusted so that we don't write more data than what was requested.
|
|
if partLength > (length - totalBytesRead) {
|
|
partLength = length - totalBytesRead
|
|
}
|
|
|
|
tillOffset := erasure.ShardFileOffset(partOffset, partLength, partSize)
|
|
// Get the checksums of the current part.
|
|
readers := make([]io.ReaderAt, len(onlineDisks))
|
|
prefer := make([]bool, len(onlineDisks))
|
|
for index, disk := range onlineDisks {
|
|
if disk == OfflineDisk {
|
|
continue
|
|
}
|
|
if !metaArr[index].IsValid() {
|
|
continue
|
|
}
|
|
checksumInfo := metaArr[index].Erasure.GetChecksumInfo(partNumber)
|
|
partPath := pathJoin(object, dataDir, fmt.Sprintf("part.%d", partNumber))
|
|
readers[index] = newBitrotReader(disk, metaArr[index].Data, bucket, partPath, tillOffset,
|
|
checksumInfo.Algorithm, checksumInfo.Hash, erasure.ShardSize())
|
|
|
|
// Prefer local disks
|
|
prefer[index] = disk.Hostname() == ""
|
|
}
|
|
|
|
_, err = erasure.Decode(ctx, writer, readers, partOffset, partLength, partSize, prefer)
|
|
// Note: we should not be defer'ing the following closeBitrotReaders() call as
|
|
// we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time
|
|
// we return from this function.
|
|
closeBitrotReaders(readers)
|
|
if err != nil {
|
|
return toObjectErr(err, bucket, object)
|
|
}
|
|
for i, r := range readers {
|
|
if r == nil {
|
|
onlineDisks[i] = OfflineDisk
|
|
}
|
|
}
|
|
// Track total bytes read from disk and written to the client.
|
|
totalBytesRead += partLength
|
|
// partOffset will be valid only for the first part, hence reset it to 0 for
|
|
// the remaining parts.
|
|
partOffset = 0
|
|
} // End of read all parts loop.
|
|
// Return success.
|
|
return nil
|
|
}
|
|
|
|
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
|
|
func (es *erasureSingle) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (info ObjectInfo, err error) {
|
|
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
|
|
return info, err
|
|
}
|
|
|
|
object = encodeDirObject(object)
|
|
if !opts.NoLock {
|
|
// Lock the object before reading.
|
|
lk := es.NewNSLock(bucket, object)
|
|
lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return ObjectInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.RUnlock(lkctx.Cancel)
|
|
}
|
|
|
|
return es.getObjectInfo(ctx, bucket, object, opts)
|
|
}
|
|
|
|
func (es *erasureSingle) getObjectFileInfo(ctx context.Context, bucket, object string, opts ObjectOptions, readData bool) (fi FileInfo, metaArr []FileInfo, onlineDisks []StorageAPI, err error) {
|
|
disks := []StorageAPI{es.disk}
|
|
|
|
var errs []error
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, readData)
|
|
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, 0)
|
|
if err != nil {
|
|
return fi, nil, nil, toObjectErr(err, bucket, object)
|
|
}
|
|
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
|
return fi, nil, nil, toObjectErr(reducedErr, bucket, object)
|
|
}
|
|
|
|
// List all online disks.
|
|
onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs)
|
|
|
|
// Pick latest valid metadata.
|
|
fi, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
|
if err != nil {
|
|
return fi, nil, nil, err
|
|
}
|
|
|
|
filterOnlineDisksInplace(fi, metaArr, onlineDisks)
|
|
return fi, metaArr, onlineDisks, nil
|
|
}
|
|
|
|
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
|
|
func (es *erasureSingle) getObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
|
fi, _, _, err := es.getObjectFileInfo(ctx, bucket, object, opts, false)
|
|
if err != nil {
|
|
return objInfo, toObjectErr(err, bucket, object)
|
|
}
|
|
objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
|
|
if fi.Deleted {
|
|
if opts.VersionID == "" || opts.DeleteMarker {
|
|
return objInfo, toObjectErr(errFileNotFound, bucket, object)
|
|
}
|
|
// Make sure to return object info to provide extra information.
|
|
return objInfo, toObjectErr(errMethodNotAllowed, bucket, object)
|
|
}
|
|
|
|
return objInfo, nil
|
|
}
|
|
|
|
// getObjectInfoAndQuroum - wrapper for reading object metadata and constructs ObjectInfo, additionally returns write quorum for the object.
|
|
func (es *erasureSingle) getObjectInfoAndQuorum(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, wquorum int, err error) {
|
|
fi, _, _, err := es.getObjectFileInfo(ctx, bucket, object, opts, false)
|
|
if err != nil {
|
|
return objInfo, 1, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
wquorum = fi.Erasure.DataBlocks
|
|
if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks {
|
|
wquorum++
|
|
}
|
|
|
|
objInfo = fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
|
|
if !fi.VersionPurgeStatus().Empty() && opts.VersionID != "" {
|
|
// Make sure to return object info to provide extra information.
|
|
return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object)
|
|
}
|
|
|
|
if fi.Deleted {
|
|
if opts.VersionID == "" || opts.DeleteMarker {
|
|
return objInfo, wquorum, toObjectErr(errFileNotFound, bucket, object)
|
|
}
|
|
// Make sure to return object info to provide extra information.
|
|
return objInfo, wquorum, toObjectErr(errMethodNotAllowed, bucket, object)
|
|
}
|
|
|
|
return objInfo, wquorum, nil
|
|
}
|
|
|
|
func (es *erasureSingle) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
|
data := r.Reader
|
|
|
|
// No metadata is set, allocate a new one.
|
|
if opts.UserDefined == nil {
|
|
opts.UserDefined = make(map[string]string)
|
|
}
|
|
|
|
storageDisks := []StorageAPI{es.disk}
|
|
// Get parity and data drive count based on storage class metadata
|
|
parityDrives := 0
|
|
dataDrives := len(storageDisks) - parityDrives
|
|
|
|
// we now know the number of blocks this object needs for data and parity.
|
|
// writeQuorum is dataBlocks + 1
|
|
writeQuorum := dataDrives
|
|
if dataDrives == parityDrives {
|
|
writeQuorum++
|
|
}
|
|
|
|
// Validate input data size and it can never be less than zero.
|
|
if data.Size() < -1 {
|
|
logger.LogIf(ctx, errInvalidArgument, logger.Application)
|
|
return ObjectInfo{}, toObjectErr(errInvalidArgument)
|
|
}
|
|
|
|
// Initialize parts metadata
|
|
partsMetadata := make([]FileInfo, len(storageDisks))
|
|
|
|
fi := newFileInfo(pathJoin(minioMetaBucket, key), dataDrives, parityDrives)
|
|
fi.DataDir = mustGetUUID()
|
|
|
|
// Initialize erasure metadata.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index] = fi
|
|
}
|
|
|
|
// Order disks according to erasure distribution
|
|
var onlineDisks []StorageAPI
|
|
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi)
|
|
|
|
erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
|
if err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key)
|
|
}
|
|
|
|
// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
|
|
var buffer []byte
|
|
switch size := data.Size(); {
|
|
case size == 0:
|
|
buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
|
|
case size >= fi.Erasure.BlockSize:
|
|
buffer = es.bp.Get()
|
|
defer es.bp.Put(buffer)
|
|
case size < fi.Erasure.BlockSize:
|
|
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
|
|
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
|
|
}
|
|
|
|
if len(buffer) > int(fi.Erasure.BlockSize) {
|
|
buffer = buffer[:fi.Erasure.BlockSize]
|
|
}
|
|
|
|
shardFileSize := erasure.ShardFileSize(data.Size())
|
|
writers := make([]io.Writer, len(onlineDisks))
|
|
inlineBuffers := make([]*bytes.Buffer, len(onlineDisks))
|
|
for i, disk := range onlineDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
if disk.IsOnline() {
|
|
inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, shardFileSize))
|
|
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
|
|
}
|
|
}
|
|
|
|
n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
|
|
closeBitrotWriters(writers)
|
|
if erasureErr != nil {
|
|
return ObjectInfo{}, toObjectErr(erasureErr, minioMetaBucket, key)
|
|
}
|
|
|
|
// Should return IncompleteBody{} error when reader has fewer bytes
|
|
// than specified in request header.
|
|
if n < data.Size() {
|
|
return ObjectInfo{}, IncompleteBody{Bucket: minioMetaBucket, Object: key}
|
|
}
|
|
|
|
for i, w := range writers {
|
|
if w == nil {
|
|
// Make sure to avoid writing to disks which we couldn't complete in erasure.Encode()
|
|
onlineDisks[i] = nil
|
|
continue
|
|
}
|
|
partsMetadata[i].Data = inlineBuffers[i].Bytes()
|
|
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize())
|
|
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
|
|
PartNumber: 1,
|
|
Algorithm: DefaultBitrotAlgorithm,
|
|
Hash: bitrotWriterSum(w),
|
|
})
|
|
}
|
|
|
|
modTime := UTCNow()
|
|
|
|
// Fill all the necessary metadata.
|
|
// Update `xl.meta` content on each disks.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index].Size = n
|
|
partsMetadata[index].Fresh = true
|
|
partsMetadata[index].ModTime = modTime
|
|
partsMetadata[index].Metadata = opts.UserDefined
|
|
}
|
|
|
|
// Set an additional header when data is inlined.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index].SetInlineData()
|
|
}
|
|
|
|
for i := 0; i < len(onlineDisks); i++ {
|
|
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
|
|
// Object info is the same in all disks, so we can pick
|
|
// the first meta from online disk
|
|
fi = partsMetadata[i]
|
|
break
|
|
}
|
|
}
|
|
|
|
if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaBucket, key, partsMetadata, writeQuorum); err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key)
|
|
}
|
|
|
|
return fi.ToObjectInfo(minioMetaBucket, key, opts.Versioned || opts.VersionSuspended), nil
|
|
}
|
|
|
|
// PutObject - creates an object upon reading from the input stream
|
|
// until EOF, erasure codes the data across all disk and additionally
|
|
// writes `xl.meta` which carries the necessary metadata for future
|
|
// object operations.
|
|
func (es *erasureSingle) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
|
// Validate put object input args.
|
|
if err := checkPutObjectArgs(ctx, bucket, object, es); err != nil {
|
|
return ObjectInfo{}, err
|
|
}
|
|
|
|
object = encodeDirObject(object)
|
|
|
|
if !isMinioMetaBucketName(bucket) && !hasSpaceFor(getDiskInfos(ctx, es.disk), data.Size()) {
|
|
return ObjectInfo{}, toObjectErr(errDiskFull)
|
|
}
|
|
|
|
return es.putObject(ctx, bucket, object, data, opts)
|
|
}
|
|
|
|
// putObject wrapper for erasureObjects PutObject
|
|
func (es *erasureSingle) putObject(ctx context.Context, bucket string, object string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
|
data := r.Reader
|
|
|
|
// No metadata is set, allocate a new one.
|
|
if opts.UserDefined == nil {
|
|
opts.UserDefined = make(map[string]string)
|
|
}
|
|
|
|
storageDisks := []StorageAPI{es.disk}
|
|
parityDrives := 0
|
|
dataDrives := len(storageDisks) - parityDrives
|
|
|
|
// we now know the number of blocks this object needs for data and parity.
|
|
// writeQuorum is dataBlocks + 1
|
|
writeQuorum := dataDrives
|
|
if dataDrives == parityDrives {
|
|
writeQuorum++
|
|
}
|
|
|
|
// Validate input data size and it can never be less than zero.
|
|
if data.Size() < -1 {
|
|
logger.LogIf(ctx, errInvalidArgument, logger.Application)
|
|
return ObjectInfo{}, toObjectErr(errInvalidArgument)
|
|
}
|
|
|
|
// Initialize parts metadata
|
|
partsMetadata := make([]FileInfo, len(storageDisks))
|
|
|
|
fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives)
|
|
fi.VersionID = opts.VersionID
|
|
if opts.Versioned && fi.VersionID == "" {
|
|
fi.VersionID = mustGetUUID()
|
|
}
|
|
|
|
fi.DataDir = mustGetUUID()
|
|
uniqueID := mustGetUUID()
|
|
tempObj := uniqueID
|
|
|
|
// Initialize erasure metadata.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index] = fi
|
|
}
|
|
|
|
// Order disks according to erasure distribution
|
|
var onlineDisks []StorageAPI
|
|
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi)
|
|
|
|
erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
|
if err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
|
|
var buffer []byte
|
|
switch size := data.Size(); {
|
|
case size == 0:
|
|
buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
|
|
case size == -1:
|
|
if size := data.ActualSize(); size > 0 && size < fi.Erasure.BlockSize {
|
|
buffer = make([]byte, data.ActualSize()+256, data.ActualSize()*2+512)
|
|
} else {
|
|
buffer = es.bp.Get()
|
|
defer es.bp.Put(buffer)
|
|
}
|
|
case size >= fi.Erasure.BlockSize:
|
|
buffer = es.bp.Get()
|
|
defer es.bp.Put(buffer)
|
|
case size < fi.Erasure.BlockSize:
|
|
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
|
|
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
|
|
}
|
|
|
|
if len(buffer) > int(fi.Erasure.BlockSize) {
|
|
buffer = buffer[:fi.Erasure.BlockSize]
|
|
}
|
|
|
|
partName := "part.1"
|
|
tempErasureObj := pathJoin(uniqueID, fi.DataDir, partName)
|
|
|
|
// Delete temporary object in the event of failure.
|
|
// If PutObject succeeded there would be no temporary
|
|
// object to delete.
|
|
var online int
|
|
defer func() {
|
|
if online != len(onlineDisks) {
|
|
es.disk.RenameFile(context.Background(), minioMetaTmpBucket, tempObj, minioMetaTmpDeletedBucket, mustGetUUID())
|
|
}
|
|
}()
|
|
|
|
shardFileSize := erasure.ShardFileSize(data.Size())
|
|
writers := make([]io.Writer, len(onlineDisks))
|
|
var inlineBuffers []*bytes.Buffer
|
|
if shardFileSize >= 0 {
|
|
if !opts.Versioned && shardFileSize < smallFileThreshold {
|
|
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
|
|
} else if shardFileSize < smallFileThreshold/8 {
|
|
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
|
|
}
|
|
} else {
|
|
// If compressed, use actual size to determine.
|
|
if sz := erasure.ShardFileSize(data.ActualSize()); sz > 0 {
|
|
if !opts.Versioned && sz < smallFileThreshold {
|
|
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
|
|
} else if sz < smallFileThreshold/8 {
|
|
inlineBuffers = make([]*bytes.Buffer, len(onlineDisks))
|
|
}
|
|
}
|
|
}
|
|
for i, disk := range onlineDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
|
|
if !disk.IsOnline() {
|
|
continue
|
|
}
|
|
|
|
if len(inlineBuffers) > 0 {
|
|
sz := shardFileSize
|
|
if sz < 0 {
|
|
sz = data.ActualSize()
|
|
}
|
|
inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, sz))
|
|
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
|
|
continue
|
|
}
|
|
|
|
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tempErasureObj, shardFileSize, DefaultBitrotAlgorithm, erasure.ShardSize())
|
|
}
|
|
|
|
toEncode := io.Reader(data)
|
|
if data.Size() > bigFileThreshold {
|
|
// We use 2 buffers, so we always have a full buffer of input.
|
|
bufA := es.bp.Get()
|
|
bufB := es.bp.Get()
|
|
defer es.bp.Put(bufA)
|
|
defer es.bp.Put(bufB)
|
|
ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
|
|
if err == nil {
|
|
toEncode = ra
|
|
defer ra.Close()
|
|
}
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
n, erasureErr := erasure.Encode(ctx, toEncode, writers, buffer, writeQuorum)
|
|
closeBitrotWriters(writers)
|
|
if erasureErr != nil {
|
|
return ObjectInfo{}, toObjectErr(erasureErr, minioMetaTmpBucket, tempErasureObj)
|
|
}
|
|
|
|
// Should return IncompleteBody{} error when reader has fewer bytes
|
|
// than specified in request header.
|
|
if n < data.Size() {
|
|
return ObjectInfo{}, IncompleteBody{Bucket: bucket, Object: object}
|
|
}
|
|
|
|
if !opts.NoLock {
|
|
lk := es.NewNSLock(bucket, object)
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return ObjectInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
}
|
|
|
|
for i, w := range writers {
|
|
if w == nil {
|
|
onlineDisks[i] = nil
|
|
continue
|
|
}
|
|
if len(inlineBuffers) > 0 && inlineBuffers[i] != nil {
|
|
partsMetadata[i].Data = inlineBuffers[i].Bytes()
|
|
} else {
|
|
partsMetadata[i].Data = nil
|
|
}
|
|
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize())
|
|
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
|
|
PartNumber: 1,
|
|
Algorithm: DefaultBitrotAlgorithm,
|
|
Hash: bitrotWriterSum(w),
|
|
})
|
|
}
|
|
if opts.UserDefined["etag"] == "" {
|
|
opts.UserDefined["etag"] = r.MD5CurrentHexString()
|
|
}
|
|
|
|
// Guess content-type from the extension if possible.
|
|
if opts.UserDefined["content-type"] == "" {
|
|
opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
|
|
}
|
|
|
|
modTime := opts.MTime
|
|
if opts.MTime.IsZero() {
|
|
modTime = UTCNow()
|
|
}
|
|
|
|
// Fill all the necessary metadata.
|
|
// Update `xl.meta` content on each disks.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index].Metadata = opts.UserDefined
|
|
partsMetadata[index].Size = n
|
|
partsMetadata[index].ModTime = modTime
|
|
}
|
|
|
|
if len(inlineBuffers) > 0 {
|
|
// Set an additional header when data is inlined.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index].SetInlineData()
|
|
}
|
|
}
|
|
|
|
// Rename the successfully written temporary object to final location.
|
|
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaTmpBucket, tempObj, partsMetadata, bucket, object, writeQuorum); err != nil {
|
|
if errors.Is(err, errFileNotFound) {
|
|
return ObjectInfo{}, toObjectErr(errErasureWriteQuorum, bucket, object)
|
|
}
|
|
logger.LogIf(ctx, err)
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
for i := 0; i < len(onlineDisks); i++ {
|
|
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
|
|
// Object info is the same in all disks, so we can pick
|
|
// the first meta from online disk
|
|
fi = partsMetadata[i]
|
|
break
|
|
}
|
|
}
|
|
|
|
fi.ReplicationState = opts.PutReplicationState()
|
|
online = countOnlineDisks(onlineDisks)
|
|
|
|
// we are adding a new version to this object under the namespace lock, so this is the latest version.
|
|
fi.IsLatest = true
|
|
|
|
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
|
|
}
|
|
|
|
func (es *erasureSingle) deleteObjectVersion(ctx context.Context, bucket, object string, writeQuorum int, fi FileInfo, forceDelMarker bool) error {
|
|
return es.disk.DeleteVersion(ctx, bucket, object, fi, forceDelMarker)
|
|
}
|
|
|
|
// DeleteObjects deletes objects/versions in bulk, this function will still automatically split objects list
|
|
// into smaller bulks if some object names are found to be duplicated in the delete list, splitting
|
|
// into smaller bulks will avoid holding twice the write lock of the duplicated object names.
|
|
func (es *erasureSingle) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
|
|
errs := make([]error, len(objects))
|
|
dobjects := make([]DeletedObject, len(objects))
|
|
objSets := set.NewStringSet()
|
|
for i := range errs {
|
|
objects[i].ObjectName = encodeDirObject(objects[i].ObjectName)
|
|
|
|
errs[i] = checkDelObjArgs(ctx, bucket, objects[i].ObjectName)
|
|
objSets.Add(objects[i].ObjectName)
|
|
}
|
|
|
|
// Acquire a bulk write lock across 'objects'
|
|
multiDeleteLock := es.NewNSLock(bucket, objSets.ToSlice()...)
|
|
lkctx, err := multiDeleteLock.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
for i := range errs {
|
|
errs[i] = err
|
|
}
|
|
return dobjects, errs
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer multiDeleteLock.Unlock(lkctx.Cancel)
|
|
|
|
writeQuorums := make([]int, len(objects))
|
|
storageDisks := []StorageAPI{es.disk}
|
|
|
|
for i := range objects {
|
|
// Single drive write quorum is '1'
|
|
writeQuorums[i] = 1
|
|
}
|
|
|
|
versionsMap := make(map[string]FileInfoVersions, len(objects))
|
|
for i := range objects {
|
|
// Construct the FileInfo data that needs to be preserved on the disk.
|
|
vr := FileInfo{
|
|
Name: objects[i].ObjectName,
|
|
VersionID: objects[i].VersionID,
|
|
ReplicationState: objects[i].ReplicationState(),
|
|
// save the index to set correct error at this index.
|
|
Idx: i,
|
|
}
|
|
vr.SetTierFreeVersionID(mustGetUUID())
|
|
// VersionID is not set means delete is not specific about
|
|
// any version, look for if the bucket is versioned or not.
|
|
if objects[i].VersionID == "" {
|
|
// MinIO extension to bucket version configuration
|
|
suspended := opts.VersionSuspended
|
|
versioned := opts.Versioned
|
|
if opts.PrefixEnabledFn != nil {
|
|
versioned = opts.PrefixEnabledFn(objects[i].ObjectName)
|
|
}
|
|
|
|
if versioned || suspended {
|
|
// Bucket is versioned and no version was explicitly
|
|
// mentioned for deletes, create a delete marker instead.
|
|
vr.ModTime = UTCNow()
|
|
vr.Deleted = true
|
|
// Versioning suspended means that we add a `null` version
|
|
// delete marker, if not add a new version for this delete
|
|
// marker.
|
|
if versioned {
|
|
vr.VersionID = mustGetUUID()
|
|
}
|
|
}
|
|
}
|
|
// De-dup same object name to collect multiple versions for same object.
|
|
v, ok := versionsMap[objects[i].ObjectName]
|
|
if ok {
|
|
v.Versions = append(v.Versions, vr)
|
|
} else {
|
|
v = FileInfoVersions{
|
|
Name: vr.Name,
|
|
Versions: []FileInfo{vr},
|
|
}
|
|
}
|
|
if vr.Deleted {
|
|
dobjects[i] = DeletedObject{
|
|
DeleteMarker: vr.Deleted,
|
|
DeleteMarkerVersionID: vr.VersionID,
|
|
DeleteMarkerMTime: DeleteMarkerMTime{vr.ModTime},
|
|
ObjectName: vr.Name,
|
|
ReplicationState: vr.ReplicationState,
|
|
}
|
|
} else {
|
|
dobjects[i] = DeletedObject{
|
|
ObjectName: vr.Name,
|
|
VersionID: vr.VersionID,
|
|
ReplicationState: vr.ReplicationState,
|
|
}
|
|
}
|
|
versionsMap[objects[i].ObjectName] = v
|
|
}
|
|
|
|
dedupVersions := make([]FileInfoVersions, 0, len(versionsMap))
|
|
for _, version := range versionsMap {
|
|
dedupVersions = append(dedupVersions, version)
|
|
}
|
|
|
|
// Initialize list of errors.
|
|
delObjErrs := make([][]error, len(storageDisks))
|
|
|
|
var wg sync.WaitGroup
|
|
// Remove versions in bulk for each disk
|
|
for index, disk := range storageDisks {
|
|
wg.Add(1)
|
|
go func(index int, disk StorageAPI) {
|
|
defer wg.Done()
|
|
delObjErrs[index] = make([]error, len(objects))
|
|
if disk == nil {
|
|
for i := range objects {
|
|
delObjErrs[index][i] = errDiskNotFound
|
|
}
|
|
return
|
|
}
|
|
errs := disk.DeleteVersions(ctx, bucket, dedupVersions)
|
|
for i, err := range errs {
|
|
if err == nil {
|
|
continue
|
|
}
|
|
for _, v := range dedupVersions[i].Versions {
|
|
if err == errFileNotFound || err == errFileVersionNotFound {
|
|
if !dobjects[v.Idx].DeleteMarker {
|
|
// Not delete marker, if not found, ok.
|
|
continue
|
|
}
|
|
}
|
|
delObjErrs[index][v.Idx] = err
|
|
}
|
|
}
|
|
}(index, disk)
|
|
}
|
|
wg.Wait()
|
|
|
|
// Reduce errors for each object
|
|
for objIndex := range objects {
|
|
diskErrs := make([]error, len(storageDisks))
|
|
// Iterate over disks to fetch the error
|
|
// of deleting of the current object
|
|
for i := range delObjErrs {
|
|
// delObjErrs[i] is not nil when disks[i] is also not nil
|
|
if delObjErrs[i] != nil {
|
|
diskErrs[i] = delObjErrs[i][objIndex]
|
|
}
|
|
}
|
|
err := reduceWriteQuorumErrs(ctx, diskErrs, objectOpIgnoredErrs, writeQuorums[objIndex])
|
|
if objects[objIndex].VersionID != "" {
|
|
errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName, objects[objIndex].VersionID)
|
|
} else {
|
|
errs[objIndex] = toObjectErr(err, bucket, objects[objIndex].ObjectName)
|
|
}
|
|
|
|
defer NSUpdated(bucket, objects[objIndex].ObjectName)
|
|
}
|
|
|
|
return dobjects, errs
|
|
}
|
|
|
|
func (es *erasureSingle) deletePrefix(ctx context.Context, bucket, prefix string) error {
|
|
dirPrefix := encodeDirObject(prefix)
|
|
defer es.disk.Delete(ctx, bucket, dirPrefix, true)
|
|
return es.disk.Delete(ctx, bucket, prefix, true)
|
|
}
|
|
|
|
// DeleteObject - deletes an object, this call doesn't necessary reply
|
|
// any error as it is not necessary for the handler to reply back a
|
|
// response to the client request.
|
|
func (es *erasureSingle) DeleteObject(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
|
if err = checkDelObjArgs(ctx, bucket, object); err != nil {
|
|
return objInfo, err
|
|
}
|
|
|
|
if opts.DeletePrefix {
|
|
return ObjectInfo{}, toObjectErr(es.deletePrefix(ctx, bucket, object), bucket, object)
|
|
}
|
|
|
|
object = encodeDirObject(object)
|
|
var lc *lifecycle.Lifecycle
|
|
var rcfg lock.Retention
|
|
if opts.Expiration.Expire {
|
|
// Check if the current bucket has a configured lifecycle policy
|
|
lc, _ = globalLifecycleSys.Get(bucket)
|
|
rcfg, _ = globalBucketObjectLockSys.Get(bucket)
|
|
}
|
|
|
|
// expiration attempted on a bucket with no lifecycle
|
|
// rules shall be rejected.
|
|
if lc == nil && opts.Expiration.Expire {
|
|
if opts.VersionID != "" {
|
|
return objInfo, VersionNotFound{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
VersionID: opts.VersionID,
|
|
}
|
|
}
|
|
return objInfo, ObjectNotFound{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
}
|
|
}
|
|
|
|
// Acquire a write lock before deleting the object.
|
|
lk := es.NewNSLock(bucket, object)
|
|
lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
|
|
if err != nil {
|
|
return ObjectInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
|
|
versionFound := true
|
|
objInfo = ObjectInfo{VersionID: opts.VersionID} // version id needed in Delete API response.
|
|
goi, writeQuorum, gerr := es.getObjectInfoAndQuorum(ctx, bucket, object, opts)
|
|
if gerr != nil && goi.Name == "" {
|
|
switch gerr.(type) {
|
|
case InsufficientReadQuorum:
|
|
return objInfo, InsufficientWriteQuorum{}
|
|
}
|
|
// For delete marker replication, versionID being replicated will not exist on disk
|
|
if opts.DeleteMarker {
|
|
versionFound = false
|
|
} else {
|
|
return objInfo, gerr
|
|
}
|
|
}
|
|
|
|
if opts.Expiration.Expire {
|
|
action := evalActionFromLifecycle(ctx, *lc, rcfg, goi, false)
|
|
var isErr bool
|
|
switch action {
|
|
case lifecycle.NoneAction:
|
|
isErr = true
|
|
case lifecycle.TransitionAction, lifecycle.TransitionVersionAction:
|
|
isErr = true
|
|
}
|
|
if isErr {
|
|
if goi.VersionID != "" {
|
|
return goi, VersionNotFound{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
VersionID: goi.VersionID,
|
|
}
|
|
}
|
|
return goi, ObjectNotFound{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
}
|
|
}
|
|
}
|
|
|
|
defer NSUpdated(bucket, object)
|
|
|
|
var markDelete bool
|
|
// Determine whether to mark object deleted for replication
|
|
if goi.VersionID != "" {
|
|
markDelete = true
|
|
}
|
|
|
|
// Default deleteMarker to true if object is under versioning
|
|
deleteMarker := opts.Versioned
|
|
|
|
if opts.VersionID != "" {
|
|
// case where replica version needs to be deleted on target cluster
|
|
if versionFound && opts.DeleteMarkerReplicationStatus() == replication.Replica {
|
|
markDelete = false
|
|
}
|
|
if opts.VersionPurgeStatus().Empty() && opts.DeleteMarkerReplicationStatus().Empty() {
|
|
markDelete = false
|
|
}
|
|
if opts.VersionPurgeStatus() == Complete {
|
|
markDelete = false
|
|
}
|
|
|
|
// Version is found but we do not wish to create more delete markers
|
|
// now, since VersionPurgeStatus() is already set, we can let the
|
|
// lower layers decide this. This fixes a regression that was introduced
|
|
// in PR #14555 where !VersionPurgeStatus.Empty() is automatically
|
|
// considered as Delete marker true to avoid listing such objects by
|
|
// regular ListObjects() calls. However for delete replication this
|
|
// ends up being a problem because "upon" a successful delete this
|
|
// ends up creating a new delete marker that is spurious and unnecessary.
|
|
if versionFound {
|
|
if !goi.VersionPurgeStatus.Empty() {
|
|
deleteMarker = false
|
|
} else if !goi.DeleteMarker { // implies a versioned delete of object
|
|
deleteMarker = false
|
|
}
|
|
}
|
|
}
|
|
|
|
modTime := opts.MTime
|
|
if opts.MTime.IsZero() {
|
|
modTime = UTCNow()
|
|
}
|
|
fvID := mustGetUUID()
|
|
if markDelete {
|
|
if opts.Versioned || opts.VersionSuspended {
|
|
if !deleteMarker {
|
|
// versioning suspended means we add `null` version as
|
|
// delete marker, if its not decided already.
|
|
deleteMarker = opts.VersionSuspended && opts.VersionID == ""
|
|
}
|
|
fi := FileInfo{
|
|
Name: object,
|
|
Deleted: deleteMarker,
|
|
MarkDeleted: markDelete,
|
|
ModTime: modTime,
|
|
ReplicationState: opts.DeleteReplication,
|
|
TransitionStatus: opts.Transition.Status,
|
|
ExpireRestored: opts.Transition.ExpireRestored,
|
|
}
|
|
fi.SetTierFreeVersionID(fvID)
|
|
if opts.Versioned {
|
|
fi.VersionID = mustGetUUID()
|
|
if opts.VersionID != "" {
|
|
fi.VersionID = opts.VersionID
|
|
}
|
|
}
|
|
// versioning suspended means we add `null` version as
|
|
// delete marker. Add delete marker, since we don't have
|
|
// any version specified explicitly. Or if a particular
|
|
// version id needs to be replicated.
|
|
if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, opts.DeleteMarker); err != nil {
|
|
return objInfo, toObjectErr(err, bucket, object)
|
|
}
|
|
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
|
|
}
|
|
}
|
|
|
|
// Delete the object version on all disks.
|
|
dfi := FileInfo{
|
|
Name: object,
|
|
VersionID: opts.VersionID,
|
|
MarkDeleted: markDelete,
|
|
Deleted: deleteMarker,
|
|
ModTime: modTime,
|
|
ReplicationState: opts.DeleteReplication,
|
|
TransitionStatus: opts.Transition.Status,
|
|
ExpireRestored: opts.Transition.ExpireRestored,
|
|
}
|
|
dfi.SetTierFreeVersionID(fvID)
|
|
if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, dfi, opts.DeleteMarker); err != nil {
|
|
return objInfo, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
return ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: object,
|
|
VersionID: opts.VersionID,
|
|
VersionPurgeStatusInternal: opts.DeleteReplication.VersionPurgeStatusInternal,
|
|
ReplicationStatusInternal: opts.DeleteReplication.ReplicationStatusInternal,
|
|
}, nil
|
|
}
|
|
|
|
func (es *erasureSingle) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
|
if !opts.NoLock {
|
|
// Lock the object before updating metadata.
|
|
lk := es.NewNSLock(bucket, object)
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return ObjectInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
}
|
|
|
|
disks := []StorageAPI{es.disk}
|
|
|
|
var metaArr []FileInfo
|
|
var errs []error
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
|
|
|
|
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, 0)
|
|
if err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// List all online disks.
|
|
onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs)
|
|
|
|
// Pick latest valid metadata.
|
|
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
|
if err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
if fi.Deleted {
|
|
return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
|
|
}
|
|
|
|
filterOnlineDisksInplace(fi, metaArr, onlineDisks)
|
|
|
|
// if version-id is not specified retention is supposed to be set on the latest object.
|
|
if opts.VersionID == "" {
|
|
opts.VersionID = fi.VersionID
|
|
}
|
|
|
|
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
|
|
if opts.EvalMetadataFn != nil {
|
|
if err := opts.EvalMetadataFn(objInfo); err != nil {
|
|
return ObjectInfo{}, err
|
|
}
|
|
}
|
|
for k, v := range objInfo.UserDefined {
|
|
fi.Metadata[k] = v
|
|
}
|
|
fi.ModTime = opts.MTime
|
|
fi.VersionID = opts.VersionID
|
|
|
|
if err = es.updateObjectMeta(ctx, bucket, object, fi, onlineDisks...); err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
|
|
}
|
|
|
|
// PutObjectTags - replace or add tags to an existing object
|
|
func (es *erasureSingle) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) (ObjectInfo, error) {
|
|
// Lock the object before updating tags.
|
|
lk := es.NewNSLock(bucket, object)
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return ObjectInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
|
|
disks := []StorageAPI{es.disk}
|
|
|
|
var metaArr []FileInfo
|
|
var errs []error
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
if opts.VersionID != "" {
|
|
metaArr, errs = readAllFileInfo(ctx, disks, bucket, object, opts.VersionID, false)
|
|
} else {
|
|
metaArr, errs = readAllXL(ctx, disks, bucket, object, false)
|
|
}
|
|
|
|
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, 0)
|
|
if err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// List all online disks.
|
|
onlineDisks, modTime := listOnlineDisks(disks, metaArr, errs)
|
|
|
|
// Pick latest valid metadata.
|
|
fi, err := pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
|
if err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
if fi.Deleted {
|
|
if opts.VersionID == "" {
|
|
return ObjectInfo{}, toObjectErr(errFileNotFound, bucket, object)
|
|
}
|
|
return ObjectInfo{}, toObjectErr(errMethodNotAllowed, bucket, object)
|
|
}
|
|
|
|
filterOnlineDisksInplace(fi, metaArr, onlineDisks)
|
|
|
|
fi.Metadata[xhttp.AmzObjectTagging] = tags
|
|
fi.ReplicationState = opts.PutReplicationState()
|
|
for k, v := range opts.UserDefined {
|
|
fi.Metadata[k] = v
|
|
}
|
|
|
|
if err = es.updateObjectMeta(ctx, bucket, object, fi, onlineDisks...); err != nil {
|
|
return ObjectInfo{}, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
|
|
}
|
|
|
|
// updateObjectMeta will update the metadata of a file.
|
|
func (es *erasureSingle) updateObjectMeta(ctx context.Context, bucket, object string, fi FileInfo, onlineDisks ...StorageAPI) error {
|
|
if len(fi.Metadata) == 0 {
|
|
return nil
|
|
}
|
|
|
|
g := errgroup.WithNErrs(len(onlineDisks))
|
|
|
|
// Start writing `xl.meta` to all disks in parallel.
|
|
for index := range onlineDisks {
|
|
index := index
|
|
g.Go(func() error {
|
|
if onlineDisks[index] == nil {
|
|
return errDiskNotFound
|
|
}
|
|
return onlineDisks[index].UpdateMetadata(ctx, bucket, object, fi)
|
|
}, index)
|
|
}
|
|
|
|
// Wait for all the routines.
|
|
mErrs := g.Wait()
|
|
|
|
return reduceWriteQuorumErrs(ctx, mErrs, objectOpIgnoredErrs, 1)
|
|
}
|
|
|
|
// DeleteObjectTags - delete object tags from an existing object
|
|
func (es *erasureSingle) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
|
|
return es.PutObjectTags(ctx, bucket, object, "", opts)
|
|
}
|
|
|
|
// GetObjectTags - get object tags from an existing object
|
|
func (es *erasureSingle) GetObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (*tags.Tags, error) {
|
|
// GetObjectInfo will return tag value as well
|
|
oi, err := es.GetObjectInfo(ctx, bucket, object, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return tags.ParseObjectTags(oi.UserTags)
|
|
}
|
|
|
|
// TransitionObject - transition object content to target tier.
|
|
func (es *erasureSingle) TransitionObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
|
|
tgtClient, err := globalTierConfigMgr.getDriver(opts.Transition.Tier)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Acquire write lock before starting to transition the object.
|
|
lk := es.NewNSLock(bucket, object)
|
|
lkctx, err := lk.GetLock(ctx, globalDeleteOperationTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
|
|
fi, metaArr, onlineDisks, err := es.getObjectFileInfo(ctx, bucket, object, opts, true)
|
|
if err != nil {
|
|
return toObjectErr(err, bucket, object)
|
|
}
|
|
if fi.Deleted {
|
|
if opts.VersionID == "" {
|
|
return toObjectErr(errFileNotFound, bucket, object)
|
|
}
|
|
// Make sure to return object info to provide extra information.
|
|
return toObjectErr(errMethodNotAllowed, bucket, object)
|
|
}
|
|
// verify that the object queued for transition is identical to that on disk.
|
|
if !opts.MTime.Equal(fi.ModTime) || !strings.EqualFold(opts.Transition.ETag, extractETag(fi.Metadata)) {
|
|
return toObjectErr(errFileNotFound, bucket, object)
|
|
}
|
|
// if object already transitioned, return
|
|
if fi.TransitionStatus == lifecycle.TransitionComplete {
|
|
return nil
|
|
}
|
|
defer NSUpdated(bucket, object)
|
|
|
|
destObj, err := genTransitionObjName(bucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pr, pw := xioutil.WaitPipe()
|
|
go func() {
|
|
err := es.getObjectWithFileInfo(ctx, bucket, object, 0, fi.Size, pw, fi, metaArr, onlineDisks)
|
|
pw.CloseWithError(err)
|
|
}()
|
|
|
|
var rv remoteVersionID
|
|
rv, err = tgtClient.Put(ctx, destObj, pr, fi.Size)
|
|
pr.CloseWithError(err)
|
|
if err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to transition %s/%s(%s) to %s tier: %w", bucket, object, opts.VersionID, opts.Transition.Tier, err))
|
|
return err
|
|
}
|
|
fi.TransitionStatus = lifecycle.TransitionComplete
|
|
fi.TransitionedObjName = destObj
|
|
fi.TransitionTier = opts.Transition.Tier
|
|
fi.TransitionVersionID = string(rv)
|
|
eventName := event.ObjectTransitionComplete
|
|
|
|
// we now know the number of blocks this object needs for data and parity.
|
|
// writeQuorum is dataBlocks + 1
|
|
writeQuorum := fi.Erasure.DataBlocks
|
|
if fi.Erasure.DataBlocks == fi.Erasure.ParityBlocks {
|
|
writeQuorum++
|
|
}
|
|
|
|
if err = es.deleteObjectVersion(ctx, bucket, object, writeQuorum, fi, false); err != nil {
|
|
eventName = event.ObjectTransitionFailed
|
|
}
|
|
|
|
objInfo := fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
|
|
sendEvent(eventArgs{
|
|
EventName: eventName,
|
|
BucketName: bucket,
|
|
Object: objInfo,
|
|
Host: "Internal: [ILM-Transition]",
|
|
})
|
|
auditLogLifecycle(ctx, objInfo, ILMTransition)
|
|
return err
|
|
}
|
|
|
|
// RestoreTransitionedObject - restore transitioned object content locally on this cluster.
|
|
// This is similar to PostObjectRestore from AWS GLACIER
|
|
// storage class. When PostObjectRestore API is called, a temporary copy of the object
|
|
// is restored locally to the bucket on source cluster until the restore expiry date.
|
|
// The copy that was transitioned continues to reside in the transitioned tier.
|
|
func (es *erasureSingle) RestoreTransitionedObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
|
|
return es.restoreTransitionedObject(ctx, bucket, object, opts)
|
|
}
|
|
|
|
// update restore status header in the metadata
|
|
func (es *erasureSingle) updateRestoreMetadata(ctx context.Context, bucket, object string, objInfo ObjectInfo, opts ObjectOptions, rerr error) error {
|
|
oi := objInfo.Clone()
|
|
oi.metadataOnly = true // Perform only metadata updates.
|
|
|
|
if rerr == nil {
|
|
oi.UserDefined[xhttp.AmzRestore] = completedRestoreObj(opts.Transition.RestoreExpiry).String()
|
|
} else { // allow retry in the case of failure to restore
|
|
delete(oi.UserDefined, xhttp.AmzRestore)
|
|
}
|
|
if _, err := es.CopyObject(ctx, bucket, object, bucket, object, oi, ObjectOptions{
|
|
VersionID: oi.VersionID,
|
|
}, ObjectOptions{
|
|
VersionID: oi.VersionID,
|
|
}); err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("Unable to update transition restore metadata for %s/%s(%s): %s", bucket, object, oi.VersionID, err))
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// restoreTransitionedObject for multipart object chunks the file stream from remote tier into the same number of parts
|
|
// as in the xl.meta for this version and rehydrates the part.n into the fi.DataDir for this version as in the xl.meta
|
|
func (es *erasureSingle) restoreTransitionedObject(ctx context.Context, bucket string, object string, opts ObjectOptions) error {
|
|
setRestoreHeaderFn := func(oi ObjectInfo, rerr error) error {
|
|
es.updateRestoreMetadata(ctx, bucket, object, oi, opts, rerr)
|
|
return rerr
|
|
}
|
|
var oi ObjectInfo
|
|
// get the file info on disk for transitioned object
|
|
actualfi, _, _, err := es.getObjectFileInfo(ctx, bucket, object, opts, false)
|
|
if err != nil {
|
|
return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
|
|
}
|
|
|
|
oi = actualfi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended)
|
|
ropts := putRestoreOpts(bucket, object, opts.Transition.RestoreRequest, oi)
|
|
if len(oi.Parts) == 1 {
|
|
var rs *HTTPRangeSpec
|
|
gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts)
|
|
if err != nil {
|
|
return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
|
|
}
|
|
defer gr.Close()
|
|
hashReader, err := hash.NewReader(gr, gr.ObjInfo.Size, "", "", gr.ObjInfo.Size)
|
|
if err != nil {
|
|
return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
|
|
}
|
|
pReader := NewPutObjReader(hashReader)
|
|
ropts.UserDefined[xhttp.AmzRestore] = completedRestoreObj(opts.Transition.RestoreExpiry).String()
|
|
_, err = es.PutObject(ctx, bucket, object, pReader, ropts)
|
|
return setRestoreHeaderFn(oi, toObjectErr(err, bucket, object))
|
|
}
|
|
|
|
uploadID, err := es.NewMultipartUpload(ctx, bucket, object, ropts)
|
|
if err != nil {
|
|
return setRestoreHeaderFn(oi, err)
|
|
}
|
|
|
|
var uploadedParts []CompletePart
|
|
var rs *HTTPRangeSpec
|
|
// get reader from the warm backend - note that even in the case of encrypted objects, this stream is still encrypted.
|
|
gr, err := getTransitionedObjectReader(ctx, bucket, object, rs, http.Header{}, oi, opts)
|
|
if err != nil {
|
|
return setRestoreHeaderFn(oi, err)
|
|
}
|
|
defer gr.Close()
|
|
|
|
// rehydrate the parts back on disk as per the original xl.meta prior to transition
|
|
for _, partInfo := range oi.Parts {
|
|
hr, err := hash.NewReader(gr, partInfo.Size, "", "", partInfo.Size)
|
|
if err != nil {
|
|
return setRestoreHeaderFn(oi, err)
|
|
}
|
|
pInfo, err := es.PutObjectPart(ctx, bucket, object, uploadID, partInfo.Number, NewPutObjReader(hr), ObjectOptions{})
|
|
if err != nil {
|
|
return setRestoreHeaderFn(oi, err)
|
|
}
|
|
if pInfo.Size != partInfo.Size {
|
|
return setRestoreHeaderFn(oi, InvalidObjectState{Bucket: bucket, Object: object})
|
|
}
|
|
uploadedParts = append(uploadedParts, CompletePart{
|
|
PartNumber: pInfo.PartNumber,
|
|
ETag: pInfo.ETag,
|
|
})
|
|
}
|
|
_, err = es.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, ObjectOptions{
|
|
MTime: oi.ModTime,
|
|
})
|
|
return setRestoreHeaderFn(oi, err)
|
|
}
|
|
|
|
func (es *erasureSingle) getUploadIDDir(bucket, object, uploadID string) string {
|
|
return pathJoin(es.getMultipartSHADir(bucket, object), uploadID)
|
|
}
|
|
|
|
func (es *erasureSingle) getMultipartSHADir(bucket, object string) string {
|
|
return getSHA256Hash([]byte(pathJoin(bucket, object)))
|
|
}
|
|
|
|
// checkUploadIDExists - verify if a given uploadID exists and is valid.
|
|
func (es *erasureSingle) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) {
|
|
defer func() {
|
|
if err == errFileNotFound {
|
|
err = errUploadIDNotFound
|
|
}
|
|
}()
|
|
|
|
disks := []StorageAPI{es.disk}
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
metaArr, errs := readAllFileInfo(ctx, disks, minioMetaMultipartBucket, es.getUploadIDDir(bucket, object, uploadID), "", false)
|
|
|
|
readQuorum, _, err := objectQuorumFromMeta(ctx, metaArr, errs, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if reducedErr := reduceReadQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum); reducedErr != nil {
|
|
return reducedErr
|
|
}
|
|
|
|
// List all online disks.
|
|
_, modTime := listOnlineDisks(disks, metaArr, errs)
|
|
|
|
// Pick latest valid metadata.
|
|
_, err = pickValidFileInfo(ctx, metaArr, modTime, readQuorum)
|
|
return err
|
|
}
|
|
|
|
// Removes part given by partName belonging to a mulitpart upload from minioMetaBucket
|
|
func (es *erasureSingle) removeObjectPart(bucket, object, uploadID, dataDir string, partNumber int) {
|
|
uploadIDPath := es.getUploadIDDir(bucket, object, uploadID)
|
|
curpartPath := pathJoin(uploadIDPath, dataDir, fmt.Sprintf("part.%d", partNumber))
|
|
storageDisks := []StorageAPI{es.disk}
|
|
|
|
g := errgroup.WithNErrs(len(storageDisks))
|
|
for index, disk := range storageDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
index := index
|
|
g.Go(func() error {
|
|
// Ignoring failure to remove parts that weren't present in CompleteMultipartUpload
|
|
// requests. xl.meta is the authoritative source of truth on which parts constitute
|
|
// the object. The presence of parts that don't belong in the object doesn't affect correctness.
|
|
_ = storageDisks[index].Delete(context.TODO(), minioMetaMultipartBucket, curpartPath, false)
|
|
return nil
|
|
}, index)
|
|
}
|
|
g.Wait()
|
|
}
|
|
|
|
// Remove the old multipart uploads on the given disk.
|
|
func (es *erasureSingle) cleanupStaleUploadsOnDisk(ctx context.Context, disk StorageAPI, expiry time.Duration) {
|
|
now := time.Now()
|
|
diskPath := disk.Endpoint().Path
|
|
|
|
readDirFn(pathJoin(diskPath, minioMetaMultipartBucket), func(shaDir string, typ os.FileMode) error {
|
|
return readDirFn(pathJoin(diskPath, minioMetaMultipartBucket, shaDir), func(uploadIDDir string, typ os.FileMode) error {
|
|
uploadIDPath := pathJoin(shaDir, uploadIDDir)
|
|
fi, err := disk.ReadVersion(ctx, minioMetaMultipartBucket, uploadIDPath, "", false)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
wait := es.deletedCleanupSleeper.Timer(ctx)
|
|
if now.Sub(fi.ModTime) > expiry {
|
|
es.disk.RenameFile(context.Background(), minioMetaMultipartBucket, uploadIDPath, minioMetaTmpDeletedBucket, mustGetUUID())
|
|
}
|
|
wait()
|
|
return nil
|
|
})
|
|
})
|
|
|
|
readDirFn(pathJoin(diskPath, minioMetaTmpBucket), func(tmpDir string, typ os.FileMode) error {
|
|
if tmpDir == ".trash/" { // do not remove .trash/ here, it has its own routines
|
|
return nil
|
|
}
|
|
vi, err := disk.StatVol(ctx, pathJoin(minioMetaTmpBucket, tmpDir))
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
wait := es.deletedCleanupSleeper.Timer(ctx)
|
|
if now.Sub(vi.Created) > expiry {
|
|
disk.Delete(ctx, minioMetaTmpBucket, tmpDir, true)
|
|
}
|
|
wait()
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// ListMultipartUploads - lists all the pending multipart
|
|
// uploads for a particular object in a bucket.
|
|
//
|
|
// Implements minimal S3 compatible ListMultipartUploads API. We do
|
|
// not support prefix based listing, this is a deliberate attempt
|
|
// towards simplification of multipart APIs.
|
|
// The resulting ListMultipartsInfo structure is unmarshalled directly as XML.
|
|
func (es *erasureSingle) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
|
|
if err := checkListMultipartArgs(ctx, bucket, object, keyMarker, uploadIDMarker, delimiter, es); err != nil {
|
|
return ListMultipartsInfo{}, err
|
|
}
|
|
|
|
result.MaxUploads = maxUploads
|
|
result.KeyMarker = keyMarker
|
|
result.Prefix = object
|
|
result.Delimiter = delimiter
|
|
|
|
uploadIDs, err := es.disk.ListDir(ctx, minioMetaMultipartBucket, es.getMultipartSHADir(bucket, object), -1)
|
|
if err != nil {
|
|
if err == errFileNotFound {
|
|
return result, nil
|
|
}
|
|
logger.LogIf(ctx, err)
|
|
return result, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
for i := range uploadIDs {
|
|
uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
|
|
}
|
|
|
|
// S3 spec says uploadIDs should be sorted based on initiated time, we need
|
|
// to read the metadata entry.
|
|
var uploads []MultipartInfo
|
|
|
|
populatedUploadIds := set.NewStringSet()
|
|
|
|
for _, uploadID := range uploadIDs {
|
|
if populatedUploadIds.Contains(uploadID) {
|
|
continue
|
|
}
|
|
fi, err := es.disk.ReadVersion(ctx, minioMetaMultipartBucket, pathJoin(es.getUploadIDDir(bucket, object, uploadID)), "", false)
|
|
if err != nil {
|
|
return result, toObjectErr(err, bucket, object)
|
|
}
|
|
populatedUploadIds.Add(uploadID)
|
|
uploads = append(uploads, MultipartInfo{
|
|
Object: object,
|
|
UploadID: uploadID,
|
|
Initiated: fi.ModTime,
|
|
})
|
|
}
|
|
|
|
sort.Slice(uploads, func(i int, j int) bool {
|
|
return uploads[i].Initiated.Before(uploads[j].Initiated)
|
|
})
|
|
|
|
uploadIndex := 0
|
|
if uploadIDMarker != "" {
|
|
for uploadIndex < len(uploads) {
|
|
if uploads[uploadIndex].UploadID != uploadIDMarker {
|
|
uploadIndex++
|
|
continue
|
|
}
|
|
if uploads[uploadIndex].UploadID == uploadIDMarker {
|
|
uploadIndex++
|
|
break
|
|
}
|
|
uploadIndex++
|
|
}
|
|
}
|
|
for uploadIndex < len(uploads) {
|
|
result.Uploads = append(result.Uploads, uploads[uploadIndex])
|
|
result.NextUploadIDMarker = uploads[uploadIndex].UploadID
|
|
uploadIndex++
|
|
if len(result.Uploads) == maxUploads {
|
|
break
|
|
}
|
|
}
|
|
|
|
result.IsTruncated = uploadIndex < len(uploads)
|
|
|
|
if !result.IsTruncated {
|
|
result.NextKeyMarker = ""
|
|
result.NextUploadIDMarker = ""
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// newMultipartUpload - wrapper for initializing a new multipart
|
|
// request; returns a unique upload id.
|
|
//
|
|
// Internally this function creates 'uploads.json' associated for the
|
|
// incoming object at
|
|
// '.minio.sys/multipart/bucket/object/uploads.json' on all the
|
|
// disks. `uploads.json` carries metadata regarding on-going multipart
|
|
// operation(s) on the object.
|
|
func (es *erasureSingle) newMultipartUpload(ctx context.Context, bucket string, object string, opts ObjectOptions) (string, error) {
|
|
onlineDisks := []StorageAPI{es.disk}
|
|
parityDrives := 0
|
|
dataDrives := len(onlineDisks) - parityDrives
|
|
|
|
// we now know the number of blocks this object needs for data and parity.
|
|
// establish the writeQuorum using this data
|
|
writeQuorum := dataDrives
|
|
if dataDrives == parityDrives {
|
|
writeQuorum++
|
|
}
|
|
|
|
// Initialize parts metadata
|
|
partsMetadata := make([]FileInfo, len(onlineDisks))
|
|
|
|
fi := newFileInfo(pathJoin(bucket, object), dataDrives, parityDrives)
|
|
fi.VersionID = opts.VersionID
|
|
if opts.Versioned && fi.VersionID == "" {
|
|
fi.VersionID = mustGetUUID()
|
|
}
|
|
fi.DataDir = mustGetUUID()
|
|
|
|
// Initialize erasure metadata.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index] = fi
|
|
}
|
|
|
|
// Guess content-type from the extension if possible.
|
|
if opts.UserDefined["content-type"] == "" {
|
|
opts.UserDefined["content-type"] = mimedb.TypeByExtension(path.Ext(object))
|
|
}
|
|
|
|
modTime := opts.MTime
|
|
if opts.MTime.IsZero() {
|
|
modTime = UTCNow()
|
|
}
|
|
|
|
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(onlineDisks, partsMetadata, fi)
|
|
|
|
// Fill all the necessary metadata.
|
|
// Update `xl.meta` content on each disks.
|
|
for index := range partsMetadata {
|
|
partsMetadata[index].Fresh = true
|
|
partsMetadata[index].ModTime = modTime
|
|
partsMetadata[index].Metadata = opts.UserDefined
|
|
}
|
|
|
|
uploadID := mustGetUUID()
|
|
uploadIDPath := es.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
// Write updated `xl.meta` to all disks.
|
|
if _, err := writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
|
|
return "", toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
// Return success.
|
|
return uploadID, nil
|
|
}
|
|
|
|
// NewMultipartUpload - initialize a new multipart upload, returns a
|
|
// unique id. The unique id returned here is of UUID form, for each
|
|
// subsequent request each UUID is unique.
|
|
//
|
|
// Implements S3 compatible initiate multipart API.
|
|
func (es *erasureSingle) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) {
|
|
if err := checkNewMultipartArgs(ctx, bucket, object, es); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// No metadata is set, allocate a new one.
|
|
if opts.UserDefined == nil {
|
|
opts.UserDefined = make(map[string]string)
|
|
}
|
|
return es.newMultipartUpload(ctx, bucket, object, opts)
|
|
}
|
|
|
|
// CopyObjectPart - reads incoming stream and internally erasure codes
|
|
// them. This call is similar to put object part operation but the source
|
|
// data is read from an existing object.
|
|
//
|
|
// Implements S3 compatible Upload Part Copy API.
|
|
func (es *erasureSingle) CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (pi PartInfo, e error) {
|
|
partInfo, err := es.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, NewPutObjReader(srcInfo.Reader), dstOpts)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, dstBucket, dstObject)
|
|
}
|
|
|
|
// Success.
|
|
return partInfo, nil
|
|
}
|
|
|
|
// PutObjectPart - reads incoming stream and internally erasure codes
|
|
// them. This call is similar to single put operation but it is part
|
|
// of the multipart transaction.
|
|
//
|
|
// Implements S3 compatible Upload Part API.
|
|
func (es *erasureSingle) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *PutObjReader, opts ObjectOptions) (pi PartInfo, err error) {
|
|
if err := checkPutObjectPartArgs(ctx, bucket, object, es); err != nil {
|
|
return PartInfo{}, err
|
|
}
|
|
|
|
// Write lock for this part ID.
|
|
// Held throughout the operation.
|
|
partIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
|
|
plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return PartInfo{}, err
|
|
}
|
|
pctx := plkctx.Context()
|
|
defer partIDLock.Unlock(plkctx.Cancel)
|
|
|
|
// Read lock for upload id.
|
|
// Only held while reading the upload metadata.
|
|
uploadIDRLock := es.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
rlkctx, err := uploadIDRLock.GetRLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return PartInfo{}, err
|
|
}
|
|
rctx := rlkctx.Context()
|
|
defer func() {
|
|
if uploadIDRLock != nil {
|
|
uploadIDRLock.RUnlock(rlkctx.Cancel)
|
|
}
|
|
}()
|
|
|
|
data := r.Reader
|
|
// Validate input data size and it can never be less than zero.
|
|
if data.Size() < -1 {
|
|
logger.LogIf(rctx, errInvalidArgument, logger.Application)
|
|
return pi, toObjectErr(errInvalidArgument)
|
|
}
|
|
|
|
var partsMetadata []FileInfo
|
|
var errs []error
|
|
uploadIDPath := es.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
// Validates if upload ID exists.
|
|
if err = es.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
|
|
return pi, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
storageDisks := []StorageAPI{es.disk}
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
partsMetadata, errs = readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket,
|
|
uploadIDPath, "", false)
|
|
|
|
// Unlock upload id locks before, so others can get it.
|
|
uploadIDRLock.RUnlock(rlkctx.Cancel)
|
|
uploadIDRLock = nil
|
|
|
|
// get Quorum for this object
|
|
_, writeQuorum, err := objectQuorumFromMeta(pctx, partsMetadata, errs, 0)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
reducedErr := reduceWriteQuorumErrs(pctx, errs, objectOpIgnoredErrs, writeQuorum)
|
|
if reducedErr == errErasureWriteQuorum {
|
|
return pi, toObjectErr(reducedErr, bucket, object)
|
|
}
|
|
|
|
// List all online disks.
|
|
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
|
|
|
// Pick one from the first valid metadata.
|
|
fi, err := pickValidFileInfo(pctx, partsMetadata, modTime, writeQuorum)
|
|
if err != nil {
|
|
return pi, err
|
|
}
|
|
|
|
onlineDisks = shuffleDisks(onlineDisks, fi.Erasure.Distribution)
|
|
|
|
// Need a unique name for the part being written in minioMetaBucket to
|
|
// accommodate concurrent PutObjectPart requests
|
|
|
|
partSuffix := fmt.Sprintf("part.%d", partID)
|
|
tmpPart := mustGetUUID()
|
|
tmpPartPath := pathJoin(tmpPart, partSuffix)
|
|
|
|
// Delete the temporary object part. If PutObjectPart succeeds there would be nothing to delete.
|
|
var online int
|
|
defer func() {
|
|
if online != len(onlineDisks) {
|
|
es.disk.RenameFile(context.Background(), minioMetaTmpBucket, tmpPart, minioMetaTmpDeletedBucket, mustGetUUID())
|
|
}
|
|
}()
|
|
|
|
erasure, err := NewErasure(pctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
|
|
var buffer []byte
|
|
switch size := data.Size(); {
|
|
case size == 0:
|
|
buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
|
|
case size == -1:
|
|
if size := data.ActualSize(); size > 0 && size < fi.Erasure.BlockSize {
|
|
buffer = make([]byte, data.ActualSize()+256, data.ActualSize()*2+512)
|
|
} else {
|
|
buffer = es.bp.Get()
|
|
defer es.bp.Put(buffer)
|
|
}
|
|
case size >= fi.Erasure.BlockSize:
|
|
buffer = es.bp.Get()
|
|
defer es.bp.Put(buffer)
|
|
case size < fi.Erasure.BlockSize:
|
|
// No need to allocate fully fi.Erasure.BlockSize buffer if the incoming data is smalles.
|
|
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
|
|
}
|
|
|
|
if len(buffer) > int(fi.Erasure.BlockSize) {
|
|
buffer = buffer[:fi.Erasure.BlockSize]
|
|
}
|
|
writers := make([]io.Writer, len(onlineDisks))
|
|
for i, disk := range onlineDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, tmpPartPath, erasure.ShardFileSize(data.Size()), DefaultBitrotAlgorithm, erasure.ShardSize())
|
|
}
|
|
|
|
toEncode := io.Reader(data)
|
|
if data.Size() > bigFileThreshold {
|
|
// Add input readahead.
|
|
// We use 2 buffers, so we always have a full buffer of input.
|
|
bufA := es.bp.Get()
|
|
bufB := es.bp.Get()
|
|
defer es.bp.Put(bufA)
|
|
defer es.bp.Put(bufB)
|
|
ra, err := readahead.NewReaderBuffer(data, [][]byte{bufA[:fi.Erasure.BlockSize], bufB[:fi.Erasure.BlockSize]})
|
|
if err == nil {
|
|
toEncode = ra
|
|
defer ra.Close()
|
|
}
|
|
}
|
|
|
|
n, err := erasure.Encode(pctx, toEncode, writers, buffer, writeQuorum)
|
|
closeBitrotWriters(writers)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
// Should return IncompleteBody{} error when reader has fewer bytes
|
|
// than specified in request header.
|
|
if n < data.Size() {
|
|
return pi, IncompleteBody{Bucket: bucket, Object: object}
|
|
}
|
|
|
|
for i := range writers {
|
|
if writers[i] == nil {
|
|
onlineDisks[i] = nil
|
|
}
|
|
}
|
|
|
|
// Acquire write lock to update metadata.
|
|
uploadIDWLock := es.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
wlkctx, err := uploadIDWLock.GetLock(pctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return PartInfo{}, err
|
|
}
|
|
wctx := wlkctx.Context()
|
|
defer uploadIDWLock.Unlock(wlkctx.Cancel)
|
|
|
|
// Validates if upload ID exists.
|
|
if err = es.checkUploadIDExists(wctx, bucket, object, uploadID); err != nil {
|
|
return pi, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
// Rename temporary part file to its final location.
|
|
partPath := pathJoin(uploadIDPath, fi.DataDir, partSuffix)
|
|
onlineDisks, err = renamePart(wctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, writeQuorum)
|
|
if err != nil {
|
|
return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
|
|
}
|
|
|
|
// Read metadata again because it might be updated with parallel upload of another part.
|
|
partsMetadata, errs = readAllFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
|
reducedErr = reduceWriteQuorumErrs(wctx, errs, objectOpIgnoredErrs, writeQuorum)
|
|
if reducedErr == errErasureWriteQuorum {
|
|
return pi, toObjectErr(reducedErr, bucket, object)
|
|
}
|
|
|
|
// Get current highest version based on re-read partsMetadata.
|
|
onlineDisks, modTime = listOnlineDisks(onlineDisks, partsMetadata, errs)
|
|
|
|
// Pick one from the first valid metadata.
|
|
fi, err = pickValidFileInfo(wctx, partsMetadata, modTime, writeQuorum)
|
|
if err != nil {
|
|
return pi, err
|
|
}
|
|
|
|
// Once part is successfully committed, proceed with updating erasure metadata.
|
|
fi.ModTime = UTCNow()
|
|
|
|
md5hex := r.MD5CurrentHexString()
|
|
|
|
// Add the current part.
|
|
fi.AddObjectPart(partID, md5hex, n, data.ActualSize())
|
|
|
|
for i, disk := range onlineDisks {
|
|
if disk == OfflineDisk {
|
|
continue
|
|
}
|
|
partsMetadata[i].Size = fi.Size
|
|
partsMetadata[i].ModTime = fi.ModTime
|
|
partsMetadata[i].Parts = fi.Parts
|
|
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
|
|
PartNumber: partID,
|
|
Algorithm: DefaultBitrotAlgorithm,
|
|
Hash: bitrotWriterSum(writers[i]),
|
|
})
|
|
}
|
|
|
|
// Writes update `xl.meta` format for each disk.
|
|
if _, err = writeUniqueFileInfo(wctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum); err != nil {
|
|
return pi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
online = countOnlineDisks(onlineDisks)
|
|
|
|
// Return success.
|
|
return PartInfo{
|
|
PartNumber: partID,
|
|
ETag: md5hex,
|
|
LastModified: fi.ModTime,
|
|
Size: n,
|
|
ActualSize: data.ActualSize(),
|
|
}, nil
|
|
}
|
|
|
|
// GetMultipartInfo returns multipart metadata uploaded during newMultipartUpload, used
|
|
// by callers to verify object states
|
|
// - encrypted
|
|
// - compressed
|
|
func (es *erasureSingle) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
|
|
if err := checkListPartsArgs(ctx, bucket, object, es); err != nil {
|
|
return MultipartInfo{}, err
|
|
}
|
|
|
|
result := MultipartInfo{
|
|
Bucket: bucket,
|
|
Object: object,
|
|
UploadID: uploadID,
|
|
}
|
|
|
|
uploadIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return MultipartInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer uploadIDLock.RUnlock(lkctx.Cancel)
|
|
|
|
if err := es.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
|
return result, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
uploadIDPath := es.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
storageDisks := []StorageAPI{es.disk}
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, opts.VersionID, false)
|
|
|
|
// get Quorum for this object
|
|
readQuorum, _, err := objectQuorumFromMeta(ctx, partsMetadata, errs, 0)
|
|
if err != nil {
|
|
return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, readQuorum)
|
|
if reducedErr == errErasureReadQuorum {
|
|
return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
|
|
|
// Pick one from the first valid metadata.
|
|
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, readQuorum)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
|
|
result.UserDefined = cloneMSS(fi.Metadata)
|
|
return result, nil
|
|
}
|
|
|
|
// ListObjectParts - lists all previously uploaded parts for a given
|
|
// object and uploadID. Takes additional input of part-number-marker
|
|
// to indicate where the listing should begin from.
|
|
//
|
|
// Implements S3 compatible ListObjectParts API. The resulting
|
|
// ListPartsInfo structure is marshaled directly into XML and
|
|
// replied back to the client.
|
|
func (es *erasureSingle) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker, maxParts int, opts ObjectOptions) (result ListPartsInfo, err error) {
|
|
if err := checkListPartsArgs(ctx, bucket, object, es); err != nil {
|
|
return ListPartsInfo{}, err
|
|
}
|
|
|
|
uploadIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
lkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return ListPartsInfo{}, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer uploadIDLock.RUnlock(lkctx.Cancel)
|
|
|
|
if err := es.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
|
return result, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
uploadIDPath := es.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
storageDisks := []StorageAPI{es.disk}
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
partsMetadata, errs := readAllFileInfo(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
|
|
|
// get Quorum for this object
|
|
_, writeQuorum, err := objectQuorumFromMeta(ctx, partsMetadata, errs, 0)
|
|
if err != nil {
|
|
return result, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
reducedErr := reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
|
if reducedErr == errErasureWriteQuorum {
|
|
return result, toObjectErr(reducedErr, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
_, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
|
|
|
// Pick one from the first valid metadata.
|
|
fi, err := pickValidFileInfo(ctx, partsMetadata, modTime, writeQuorum)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
|
|
// Populate the result stub.
|
|
result.Bucket = bucket
|
|
result.Object = object
|
|
result.UploadID = uploadID
|
|
result.MaxParts = maxParts
|
|
result.PartNumberMarker = partNumberMarker
|
|
result.UserDefined = cloneMSS(fi.Metadata)
|
|
|
|
// For empty number of parts or maxParts as zero, return right here.
|
|
if len(fi.Parts) == 0 || maxParts == 0 {
|
|
return result, nil
|
|
}
|
|
|
|
// Limit output to maxPartsList.
|
|
if maxParts > maxPartsList {
|
|
maxParts = maxPartsList
|
|
}
|
|
|
|
// Only parts with higher part numbers will be listed.
|
|
partIdx := objectPartIndex(fi.Parts, partNumberMarker)
|
|
parts := fi.Parts
|
|
if partIdx != -1 {
|
|
parts = fi.Parts[partIdx+1:]
|
|
}
|
|
count := maxParts
|
|
for _, part := range parts {
|
|
result.Parts = append(result.Parts, PartInfo{
|
|
PartNumber: part.Number,
|
|
ETag: part.ETag,
|
|
LastModified: fi.ModTime,
|
|
Size: part.Size,
|
|
})
|
|
count--
|
|
if count == 0 {
|
|
break
|
|
}
|
|
}
|
|
// If listed entries are more than maxParts, we set IsTruncated as true.
|
|
if len(parts) > len(result.Parts) {
|
|
result.IsTruncated = true
|
|
// Make sure to fill next part number marker if IsTruncated is
|
|
// true for subsequent listing.
|
|
nextPartNumberMarker := result.Parts[len(result.Parts)-1].PartNumber
|
|
result.NextPartNumberMarker = nextPartNumberMarker
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// CompleteMultipartUpload - completes an ongoing multipart
|
|
// transaction after receiving all the parts indicated by the client.
|
|
// Returns an md5sum calculated by concatenating all the individual
|
|
// md5sums of all the parts.
|
|
//
|
|
// Implements S3 compatible Complete multipart API.
|
|
func (es *erasureSingle) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, parts []CompletePart, opts ObjectOptions) (oi ObjectInfo, err error) {
|
|
if err = checkCompleteMultipartArgs(ctx, bucket, object, es); err != nil {
|
|
return oi, err
|
|
}
|
|
|
|
// Hold read-locks to verify uploaded parts, also disallows
|
|
// parallel part uploads as well.
|
|
uploadIDLock := es.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
rlkctx, err := uploadIDLock.GetRLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
rctx := rlkctx.Context()
|
|
defer uploadIDLock.RUnlock(rlkctx.Cancel)
|
|
|
|
if err = es.checkUploadIDExists(rctx, bucket, object, uploadID); err != nil {
|
|
return oi, toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
uploadIDPath := es.getUploadIDDir(bucket, object, uploadID)
|
|
|
|
storageDisks := []StorageAPI{es.disk}
|
|
|
|
// Read metadata associated with the object from all disks.
|
|
partsMetadata, errs := readAllFileInfo(rctx, storageDisks, minioMetaMultipartBucket, uploadIDPath, "", false)
|
|
|
|
// get Quorum for this object
|
|
_, writeQuorum, err := objectQuorumFromMeta(rctx, partsMetadata, errs, 0)
|
|
if err != nil {
|
|
return oi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
reducedErr := reduceWriteQuorumErrs(rctx, errs, objectOpIgnoredErrs, writeQuorum)
|
|
if reducedErr == errErasureWriteQuorum {
|
|
return oi, toObjectErr(reducedErr, bucket, object)
|
|
}
|
|
|
|
onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs)
|
|
|
|
// Pick one from the first valid metadata.
|
|
fi, err := pickValidFileInfo(rctx, partsMetadata, modTime, writeQuorum)
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
|
|
// Calculate full object size.
|
|
var objectSize int64
|
|
|
|
// Calculate consolidated actual size.
|
|
var objectActualSize int64
|
|
|
|
// Order online disks in accordance with distribution order.
|
|
// Order parts metadata in accordance with distribution order.
|
|
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadataByIndex(onlineDisks, partsMetadata, fi)
|
|
|
|
// Save current erasure metadata for validation.
|
|
currentFI := fi
|
|
|
|
// Allocate parts similar to incoming slice.
|
|
fi.Parts = make([]ObjectPartInfo, len(parts))
|
|
|
|
// Validate each part and then commit to disk.
|
|
for i, part := range parts {
|
|
partIdx := objectPartIndex(currentFI.Parts, part.PartNumber)
|
|
// All parts should have same part number.
|
|
if partIdx == -1 {
|
|
invp := InvalidPart{
|
|
PartNumber: part.PartNumber,
|
|
GotETag: part.ETag,
|
|
}
|
|
return oi, invp
|
|
}
|
|
|
|
// ensure that part ETag is canonicalized to strip off extraneous quotes
|
|
part.ETag = canonicalizeETag(part.ETag)
|
|
if currentFI.Parts[partIdx].ETag != part.ETag {
|
|
invp := InvalidPart{
|
|
PartNumber: part.PartNumber,
|
|
ExpETag: currentFI.Parts[partIdx].ETag,
|
|
GotETag: part.ETag,
|
|
}
|
|
return oi, invp
|
|
}
|
|
|
|
// All parts except the last part has to be atleast 5MB.
|
|
if (i < len(parts)-1) && !isMinAllowedPartSize(currentFI.Parts[partIdx].ActualSize) {
|
|
return oi, PartTooSmall{
|
|
PartNumber: part.PartNumber,
|
|
PartSize: currentFI.Parts[partIdx].ActualSize,
|
|
PartETag: part.ETag,
|
|
}
|
|
}
|
|
|
|
// Save for total object size.
|
|
objectSize += currentFI.Parts[partIdx].Size
|
|
|
|
// Save the consolidated actual size.
|
|
objectActualSize += currentFI.Parts[partIdx].ActualSize
|
|
|
|
// Add incoming parts.
|
|
fi.Parts[i] = ObjectPartInfo{
|
|
Number: part.PartNumber,
|
|
Size: currentFI.Parts[partIdx].Size,
|
|
ActualSize: currentFI.Parts[partIdx].ActualSize,
|
|
}
|
|
}
|
|
|
|
// Save the final object size and modtime.
|
|
fi.Size = objectSize
|
|
fi.ModTime = opts.MTime
|
|
if opts.MTime.IsZero() {
|
|
fi.ModTime = UTCNow()
|
|
}
|
|
|
|
// Save successfully calculated md5sum.
|
|
fi.Metadata["etag"] = opts.UserDefined["etag"]
|
|
if fi.Metadata["etag"] == "" {
|
|
fi.Metadata["etag"] = getCompleteMultipartMD5(parts)
|
|
}
|
|
|
|
// Save the consolidated actual size.
|
|
fi.Metadata[ReservedMetadataPrefix+"actual-size"] = strconv.FormatInt(objectActualSize, 10)
|
|
|
|
// Update all erasure metadata, make sure to not modify fields like
|
|
// checksum which are different on each disks.
|
|
for index := range partsMetadata {
|
|
if partsMetadata[index].IsValid() {
|
|
partsMetadata[index].Size = fi.Size
|
|
partsMetadata[index].ModTime = fi.ModTime
|
|
partsMetadata[index].Metadata = fi.Metadata
|
|
partsMetadata[index].Parts = fi.Parts
|
|
}
|
|
}
|
|
|
|
// Hold namespace to complete the transaction
|
|
lk := es.NewNSLock(bucket, object)
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return oi, err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
|
|
// Write final `xl.meta` at uploadID location
|
|
onlineDisks, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, partsMetadata, writeQuorum)
|
|
if err != nil {
|
|
return oi, toObjectErr(err, minioMetaMultipartBucket, uploadIDPath)
|
|
}
|
|
|
|
// Remove parts that weren't present in CompleteMultipartUpload request.
|
|
for _, curpart := range currentFI.Parts {
|
|
if objectPartIndex(fi.Parts, curpart.Number) == -1 {
|
|
// Delete the missing part files. e.g,
|
|
// Request 1: NewMultipart
|
|
// Request 2: PutObjectPart 1
|
|
// Request 3: PutObjectPart 2
|
|
// Request 4: CompleteMultipartUpload --part 2
|
|
// N.B. 1st part is not present. This part should be removed from the storage.
|
|
es.removeObjectPart(bucket, object, uploadID, fi.DataDir, curpart.Number)
|
|
}
|
|
}
|
|
|
|
// Rename the multipart object to final location.
|
|
if onlineDisks, err = renameData(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath,
|
|
partsMetadata, bucket, object, writeQuorum); err != nil {
|
|
return oi, toObjectErr(err, bucket, object)
|
|
}
|
|
|
|
for i := 0; i < len(onlineDisks); i++ {
|
|
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
|
|
// Object info is the same in all disks, so we can pick
|
|
// the first meta from online disk
|
|
fi = partsMetadata[i]
|
|
break
|
|
}
|
|
}
|
|
|
|
// we are adding a new version to this object under the namespace lock, so this is the latest version.
|
|
fi.IsLatest = true
|
|
|
|
// Success, return object info.
|
|
return fi.ToObjectInfo(bucket, object, opts.Versioned || opts.VersionSuspended), nil
|
|
}
|
|
|
|
// AbortMultipartUpload - aborts an ongoing multipart operation
|
|
// signified by the input uploadID. This is an atomic operation
|
|
// doesn't require clients to initiate multiple such requests.
|
|
//
|
|
// All parts are purged from all disks and reference to the uploadID
|
|
// would be removed from the system, rollback is not possible on this
|
|
// operation.
|
|
func (es *erasureSingle) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (err error) {
|
|
if err = checkAbortMultipartArgs(ctx, bucket, object, es); err != nil {
|
|
return err
|
|
}
|
|
|
|
lk := es.NewNSLock(bucket, pathJoin(object, uploadID))
|
|
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ctx = lkctx.Context()
|
|
defer lk.Unlock(lkctx.Cancel)
|
|
|
|
// Validates if upload ID exists.
|
|
if err := es.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
|
|
return toObjectErr(err, bucket, object, uploadID)
|
|
}
|
|
|
|
// Cleanup all uploaded parts.
|
|
es.disk.RenameFile(ctx, minioMetaMultipartBucket, es.getUploadIDDir(bucket, object, uploadID), minioMetaTmpDeletedBucket, mustGetUUID())
|
|
|
|
// Successfully purged.
|
|
return nil
|
|
}
|
|
|
|
func (es *erasureSingle) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
|
|
var loi ListObjectsInfo
|
|
|
|
// Automatically remove the object/version is an expiry lifecycle rule can be applied
|
|
lc, _ := globalLifecycleSys.Get(bucket)
|
|
|
|
// Check if bucket is object locked.
|
|
rcfg, _ := globalBucketObjectLockSys.Get(bucket)
|
|
|
|
if len(prefix) > 0 && maxKeys == 1 && delimiter == "" && marker == "" {
|
|
// Optimization for certain applications like
|
|
// - Cohesity
|
|
// - Actifio, Splunk etc.
|
|
// which send ListObjects requests where the actual object
|
|
// itself is the prefix and max-keys=1 in such scenarios
|
|
// we can simply verify locally if such an object exists
|
|
// to avoid the need for ListObjects().
|
|
objInfo, err := es.GetObjectInfo(ctx, bucket, prefix, ObjectOptions{NoLock: true})
|
|
if err == nil {
|
|
if lc != nil {
|
|
action := evalActionFromLifecycle(ctx, *lc, rcfg, objInfo, false)
|
|
switch action {
|
|
case lifecycle.DeleteVersionAction, lifecycle.DeleteAction:
|
|
fallthrough
|
|
case lifecycle.DeleteRestoredAction, lifecycle.DeleteRestoredVersionAction:
|
|
return loi, nil
|
|
}
|
|
}
|
|
loi.Objects = append(loi.Objects, objInfo)
|
|
return loi, nil
|
|
}
|
|
}
|
|
|
|
opts := listPathOptions{
|
|
Bucket: bucket,
|
|
Prefix: prefix,
|
|
Separator: delimiter,
|
|
Limit: maxKeysPlusOne(maxKeys, marker != ""),
|
|
Marker: marker,
|
|
InclDeleted: false,
|
|
AskDisks: globalAPIConfig.getListQuorum(),
|
|
Lifecycle: lc,
|
|
Retention: rcfg,
|
|
}
|
|
|
|
merged, err := es.listPath(ctx, &opts)
|
|
if err != nil && err != io.EOF {
|
|
if !isErrBucketNotFound(err) {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
return loi, err
|
|
}
|
|
|
|
merged.forwardPast(opts.Marker)
|
|
defer merged.truncate(0) // Release when returning
|
|
|
|
// Default is recursive, if delimiter is set then list non recursive.
|
|
objects := merged.fileInfos(bucket, prefix, delimiter)
|
|
loi.IsTruncated = err == nil && len(objects) > 0
|
|
if maxKeys > 0 && len(objects) > maxKeys {
|
|
objects = objects[:maxKeys]
|
|
loi.IsTruncated = true
|
|
}
|
|
for _, obj := range objects {
|
|
if obj.IsDir && obj.ModTime.IsZero() && delimiter != "" {
|
|
loi.Prefixes = append(loi.Prefixes, obj.Name)
|
|
} else {
|
|
loi.Objects = append(loi.Objects, obj)
|
|
}
|
|
}
|
|
if loi.IsTruncated {
|
|
last := objects[len(objects)-1]
|
|
loi.NextMarker = opts.encodeMarker(last.Name)
|
|
}
|
|
return loi, nil
|
|
}
|
|
|
|
func (es *erasureSingle) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) {
|
|
marker := continuationToken
|
|
if marker == "" {
|
|
marker = startAfter
|
|
}
|
|
|
|
loi, err := es.ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
|
|
if err != nil {
|
|
return ListObjectsV2Info{}, err
|
|
}
|
|
|
|
listObjectsV2Info := ListObjectsV2Info{
|
|
IsTruncated: loi.IsTruncated,
|
|
ContinuationToken: continuationToken,
|
|
NextContinuationToken: loi.NextMarker,
|
|
Objects: loi.Objects,
|
|
Prefixes: loi.Prefixes,
|
|
}
|
|
return listObjectsV2Info, err
|
|
}
|
|
|
|
func (es *erasureSingle) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (ListObjectVersionsInfo, error) {
|
|
loi := ListObjectVersionsInfo{}
|
|
if marker == "" && versionMarker != "" {
|
|
return loi, NotImplemented{}
|
|
}
|
|
|
|
opts := listPathOptions{
|
|
Bucket: bucket,
|
|
Prefix: prefix,
|
|
Separator: delimiter,
|
|
Limit: maxKeysPlusOne(maxKeys, marker != ""),
|
|
Marker: marker,
|
|
InclDeleted: true,
|
|
AskDisks: "strict",
|
|
Versioned: true,
|
|
}
|
|
|
|
merged, err := es.listPath(ctx, &opts)
|
|
if err != nil && err != io.EOF {
|
|
return loi, err
|
|
}
|
|
defer merged.truncate(0) // Release when returning
|
|
if versionMarker == "" {
|
|
o := listPathOptions{Marker: marker}
|
|
// If we are not looking for a specific version skip it.
|
|
|
|
o.parseMarker()
|
|
merged.forwardPast(o.Marker)
|
|
}
|
|
objects := merged.fileInfoVersions(bucket, prefix, delimiter, versionMarker)
|
|
loi.IsTruncated = err == nil && len(objects) > 0
|
|
if maxKeys > 0 && len(objects) > maxKeys {
|
|
objects = objects[:maxKeys]
|
|
loi.IsTruncated = true
|
|
}
|
|
for _, obj := range objects {
|
|
if obj.IsDir && obj.ModTime.IsZero() && delimiter != "" {
|
|
loi.Prefixes = append(loi.Prefixes, obj.Name)
|
|
} else {
|
|
loi.Objects = append(loi.Objects, obj)
|
|
}
|
|
}
|
|
if loi.IsTruncated {
|
|
last := objects[len(objects)-1]
|
|
loi.NextMarker = opts.encodeMarker(last.Name)
|
|
loi.NextVersionIDMarker = last.VersionID
|
|
}
|
|
return loi, nil
|
|
}
|
|
|
|
// Walk a bucket, optionally prefix recursively, until we have returned
|
|
// all the content to objectInfo channel, it is callers responsibility
|
|
// to allocate a receive channel for ObjectInfo, upon any unhandled
|
|
// error walker returns error. Optionally if context.Done() is received
|
|
// then Walk() stops the walker.
|
|
func (es *erasureSingle) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
|
|
if err := checkListObjsArgs(ctx, bucket, prefix, "", es); err != nil {
|
|
// Upon error close the channel.
|
|
close(results)
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
go func() {
|
|
defer cancel()
|
|
defer close(results)
|
|
|
|
versioned := opts.Versioned || opts.VersionSuspended
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
loadEntry := func(entry metaCacheEntry) {
|
|
if entry.isDir() {
|
|
return
|
|
}
|
|
|
|
fivs, err := entry.fileInfoVersions(bucket)
|
|
if err != nil {
|
|
cancel()
|
|
return
|
|
}
|
|
if opts.WalkAscending {
|
|
for i := len(fivs.Versions) - 1; i >= 0; i-- {
|
|
version := fivs.Versions[i]
|
|
results <- version.ToObjectInfo(bucket, version.Name, versioned)
|
|
}
|
|
return
|
|
}
|
|
for _, version := range fivs.Versions {
|
|
results <- version.ToObjectInfo(bucket, version.Name, versioned)
|
|
}
|
|
}
|
|
|
|
// How to resolve partial results.
|
|
resolver := metadataResolutionParams{
|
|
dirQuorum: 1,
|
|
objQuorum: 1,
|
|
bucket: bucket,
|
|
}
|
|
|
|
path := baseDirFromPrefix(prefix)
|
|
filterPrefix := strings.Trim(strings.TrimPrefix(prefix, path), slashSeparator)
|
|
if path == prefix {
|
|
filterPrefix = ""
|
|
}
|
|
|
|
lopts := listPathRawOptions{
|
|
disks: []StorageAPI{es.disk},
|
|
bucket: bucket,
|
|
path: path,
|
|
filterPrefix: filterPrefix,
|
|
recursive: true,
|
|
forwardTo: "",
|
|
minDisks: 1,
|
|
reportNotFound: false,
|
|
agreed: loadEntry,
|
|
partial: func(entries metaCacheEntries, nAgreed int, errs []error) {
|
|
entry, ok := entries.resolve(&resolver)
|
|
if !ok {
|
|
// check if we can get one entry atleast
|
|
// proceed to heal nonetheless.
|
|
entry, _ = entries.firstFound()
|
|
}
|
|
|
|
loadEntry(*entry)
|
|
},
|
|
finished: nil,
|
|
}
|
|
|
|
if err := listPathRaw(ctx, lopts); err != nil {
|
|
logger.LogIf(ctx, fmt.Errorf("listPathRaw returned %w: opts(%#v)", err, lopts))
|
|
return
|
|
}
|
|
}()
|
|
wg.Wait()
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// nsScanner will start scanning buckets and send updated totals as they are traversed.
|
|
// Updates are sent on a regular basis and the caller *must* consume them.
|
|
func (es *erasureSingle) nsScanner(ctx context.Context, buckets []BucketInfo, bf *bloomFilter, wantCycle uint32, updates chan<- dataUsageCache, healScanMode madmin.HealScanMode) error {
|
|
if len(buckets) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Collect disks we can use.
|
|
disks := []StorageAPI{es.disk}
|
|
|
|
// Load bucket totals
|
|
oldCache := dataUsageCache{}
|
|
if err := oldCache.load(ctx, es, dataUsageCacheName); err != nil {
|
|
return err
|
|
}
|
|
|
|
// New cache..
|
|
cache := dataUsageCache{
|
|
Info: dataUsageCacheInfo{
|
|
Name: dataUsageRoot,
|
|
NextCycle: oldCache.Info.NextCycle,
|
|
},
|
|
Cache: make(map[string]dataUsageEntry, len(oldCache.Cache)),
|
|
}
|
|
bloom := bf.bytes()
|
|
|
|
// Put all buckets into channel.
|
|
bucketCh := make(chan BucketInfo, len(buckets))
|
|
// Add new buckets first
|
|
for _, b := range buckets {
|
|
if oldCache.find(b.Name) == nil {
|
|
bucketCh <- b
|
|
}
|
|
}
|
|
|
|
// Add existing buckets.
|
|
for _, b := range buckets {
|
|
e := oldCache.find(b.Name)
|
|
if e != nil {
|
|
cache.replace(b.Name, dataUsageRoot, *e)
|
|
bucketCh <- b
|
|
}
|
|
}
|
|
|
|
close(bucketCh)
|
|
bucketResults := make(chan dataUsageEntryInfo, len(disks))
|
|
|
|
// Start async collector/saver.
|
|
// This goroutine owns the cache.
|
|
var saverWg sync.WaitGroup
|
|
saverWg.Add(1)
|
|
go func() {
|
|
// Add jitter to the update time so multiple sets don't sync up.
|
|
updateTime := 30*time.Second + time.Duration(float64(10*time.Second)*rand.Float64())
|
|
t := time.NewTicker(updateTime)
|
|
defer t.Stop()
|
|
defer saverWg.Done()
|
|
var lastSave time.Time
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
// Return without saving.
|
|
return
|
|
case <-t.C:
|
|
if cache.Info.LastUpdate.Equal(lastSave) {
|
|
continue
|
|
}
|
|
logger.LogIf(ctx, cache.save(ctx, es, dataUsageCacheName))
|
|
updates <- cache.clone()
|
|
lastSave = cache.Info.LastUpdate
|
|
case v, ok := <-bucketResults:
|
|
if !ok {
|
|
// Save final state...
|
|
cache.Info.NextCycle = wantCycle
|
|
cache.Info.LastUpdate = time.Now()
|
|
logger.LogIf(ctx, cache.save(ctx, es, dataUsageCacheName))
|
|
updates <- cache
|
|
return
|
|
}
|
|
cache.replace(v.Name, v.Parent, v.Entry)
|
|
cache.Info.LastUpdate = time.Now()
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Shuffle disks to ensure a total randomness of bucket/disk association to ensure
|
|
// that objects that are not present in all disks are accounted and ILM applied.
|
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
r.Shuffle(len(disks), func(i, j int) { disks[i], disks[j] = disks[j], disks[i] })
|
|
|
|
// Start one scanner per disk
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(disks))
|
|
for i := range disks {
|
|
go func(i int) {
|
|
defer wg.Done()
|
|
disk := disks[i]
|
|
|
|
for bucket := range bucketCh {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Load cache for bucket
|
|
cacheName := pathJoin(bucket.Name, dataUsageCacheName)
|
|
cache := dataUsageCache{}
|
|
logger.LogIf(ctx, cache.load(ctx, es, cacheName))
|
|
if cache.Info.Name == "" {
|
|
cache.Info.Name = bucket.Name
|
|
}
|
|
cache.Info.BloomFilter = bloom
|
|
cache.Info.SkipHealing = true
|
|
cache.Info.NextCycle = wantCycle
|
|
if cache.Info.Name != bucket.Name {
|
|
logger.LogIf(ctx, fmt.Errorf("cache name mismatch: %s != %s", cache.Info.Name, bucket.Name))
|
|
cache.Info = dataUsageCacheInfo{
|
|
Name: bucket.Name,
|
|
LastUpdate: time.Time{},
|
|
NextCycle: wantCycle,
|
|
}
|
|
}
|
|
// Collect updates.
|
|
updates := make(chan dataUsageEntry, 1)
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func(name string) {
|
|
defer wg.Done()
|
|
for update := range updates {
|
|
bucketResults <- dataUsageEntryInfo{
|
|
Name: name,
|
|
Parent: dataUsageRoot,
|
|
Entry: update,
|
|
}
|
|
}
|
|
}(cache.Info.Name)
|
|
// Calc usage
|
|
before := cache.Info.LastUpdate
|
|
var err error
|
|
cache, err = disk.NSScanner(ctx, cache, updates, healScanMode)
|
|
cache.Info.BloomFilter = nil
|
|
if err != nil {
|
|
if !cache.Info.LastUpdate.IsZero() && cache.Info.LastUpdate.After(before) {
|
|
logger.LogIf(ctx, cache.save(ctx, es, cacheName))
|
|
} else {
|
|
logger.LogIf(ctx, err)
|
|
}
|
|
// This ensures that we don't close
|
|
// bucketResults channel while the
|
|
// updates-collector goroutine still
|
|
// holds a reference to this.
|
|
wg.Wait()
|
|
continue
|
|
}
|
|
|
|
wg.Wait()
|
|
var root dataUsageEntry
|
|
if r := cache.root(); r != nil {
|
|
root = cache.flatten(*r)
|
|
}
|
|
t := time.Now()
|
|
bucketResults <- dataUsageEntryInfo{
|
|
Name: cache.Info.Name,
|
|
Parent: dataUsageRoot,
|
|
Entry: root,
|
|
}
|
|
// We want to avoid synchronizing up all writes in case
|
|
// the results are piled up.
|
|
time.Sleep(time.Duration(float64(time.Since(t)) * rand.Float64()))
|
|
// Save cache
|
|
logger.LogIf(ctx, cache.save(ctx, es, cacheName))
|
|
}
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
close(bucketResults)
|
|
saverWg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (es *erasureSingle) NSScanner(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo, wantCycle uint32, healScanMode madmin.HealScanMode) error {
|
|
// Updates must be closed before we return.
|
|
defer close(updates)
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
var wg sync.WaitGroup
|
|
var mu sync.Mutex
|
|
results := make([]dataUsageCache, 1)
|
|
var firstErr error
|
|
|
|
allBuckets, err := es.ListBuckets(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(allBuckets) == 0 {
|
|
updates <- DataUsageInfo{} // no buckets found update data usage to reflect latest state
|
|
return nil
|
|
}
|
|
|
|
// Scanner latest allBuckets first.
|
|
sort.Slice(allBuckets, func(i, j int) bool {
|
|
return allBuckets[i].Created.After(allBuckets[j].Created)
|
|
})
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
updates := make(chan dataUsageCache, 1)
|
|
defer close(updates)
|
|
// Start update collector.
|
|
go func() {
|
|
defer wg.Done()
|
|
for info := range updates {
|
|
mu.Lock()
|
|
results[0] = info
|
|
mu.Unlock()
|
|
}
|
|
}()
|
|
|
|
// Start scanner. Blocks until done.
|
|
err := es.nsScanner(ctx, allBuckets, bf, wantCycle, updates, healScanMode)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
mu.Lock()
|
|
if firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
// Cancel remaining...
|
|
cancel()
|
|
mu.Unlock()
|
|
return
|
|
}
|
|
}()
|
|
|
|
updateCloser := make(chan chan struct{})
|
|
go func() {
|
|
updateTicker := time.NewTicker(30 * time.Second)
|
|
defer updateTicker.Stop()
|
|
var lastUpdate time.Time
|
|
|
|
// We need to merge since we will get the same buckets from each pool.
|
|
// Therefore to get the exact bucket sizes we must merge before we can convert.
|
|
var allMerged dataUsageCache
|
|
|
|
update := func() {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
|
|
allMerged = dataUsageCache{Info: dataUsageCacheInfo{Name: dataUsageRoot}}
|
|
for _, info := range results {
|
|
if info.Info.LastUpdate.IsZero() {
|
|
// Not filled yet.
|
|
return
|
|
}
|
|
allMerged.merge(info)
|
|
}
|
|
if allMerged.root() != nil && allMerged.Info.LastUpdate.After(lastUpdate) {
|
|
updates <- allMerged.dui(allMerged.Info.Name, allBuckets)
|
|
lastUpdate = allMerged.Info.LastUpdate
|
|
}
|
|
}
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case v := <-updateCloser:
|
|
update()
|
|
close(v)
|
|
return
|
|
case <-updateTicker.C:
|
|
update()
|
|
}
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
ch := make(chan struct{})
|
|
select {
|
|
case updateCloser <- ch:
|
|
<-ch
|
|
case <-ctx.Done():
|
|
if firstErr == nil {
|
|
firstErr = ctx.Err()
|
|
}
|
|
}
|
|
return firstErr
|
|
}
|