2021-04-18 12:41:13 -07:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2016-06-01 16:43:31 -07:00
2016-08-18 16:23:42 -07:00
package cmd
2016-05-20 20:48:47 -07:00
import (
2021-03-30 02:00:55 +02:00
"bytes"
2018-03-14 12:01:47 -07:00
"context"
2021-01-11 22:36:51 -08:00
"errors"
2020-03-03 03:29:30 +03:00
"fmt"
2016-05-20 20:48:47 -07:00
"io"
2018-09-20 19:22:09 -07:00
"net/http"
2016-05-20 20:48:47 -07:00
"path"
2022-10-27 09:05:24 -07:00
"runtime"
2021-05-27 13:38:04 -07:00
"strconv"
2020-11-12 12:12:09 -08:00
"strings"
2020-03-11 08:56:36 -07:00
"sync"
2016-05-20 20:48:47 -07:00
2022-01-14 10:01:25 -08:00
"github.com/klauspost/readahead"
2022-12-06 13:46:50 -08:00
"github.com/minio/madmin-go/v2"
2020-07-14 17:38:05 +01:00
"github.com/minio/minio-go/v7/pkg/tags"
2021-06-01 14:59:40 -07:00
"github.com/minio/minio/internal/bucket/lifecycle"
2022-04-11 13:25:32 -07:00
"github.com/minio/minio/internal/bucket/object/lock"
2021-06-01 14:59:40 -07:00
"github.com/minio/minio/internal/bucket/replication"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/hash"
xhttp "github.com/minio/minio/internal/http"
xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
2021-05-28 15:17:01 -07:00
"github.com/minio/pkg/mimedb"
2022-09-07 07:25:39 -07:00
"github.com/minio/pkg/wildcard"
2021-07-08 01:04:37 -07:00
uatomic "go.uber.org/atomic"
2016-05-20 20:48:47 -07:00
)
2016-11-20 16:57:12 -08:00
// list all errors which can be ignored in object operations.
2020-07-24 13:16:11 -07:00
var objectOpIgnoredErrs = append ( baseIgnoredErrs , errDiskAccessDenied , errUnformattedDisk )
2016-11-20 16:57:12 -08:00
2021-11-16 09:28:29 -08:00
// Object Operations
2016-05-20 20:48:47 -07:00
2021-03-24 14:19:52 -07:00
func countOnlineDisks ( onlineDisks [ ] StorageAPI ) ( online int ) {
for _ , onlineDisk := range onlineDisks {
if onlineDisk != nil && onlineDisk . IsOnline ( ) {
online ++
}
}
return online
}
2016-12-26 16:29:26 -08:00
// CopyObject - copy object source object to destination object.
// if source object and destination object are same we only
// update metadata.
2021-03-04 03:36:43 +01:00
func ( er erasureObjects ) CopyObject ( ctx context . Context , srcBucket , srcObject , dstBucket , dstObject string , srcInfo ObjectInfo , srcOpts , dstOpts ObjectOptions ) ( oi ObjectInfo , err error ) {
2022-05-04 08:45:27 +01:00
auditObjectErasureSet ( ctx , dstObject , & er )
2020-08-03 16:21:10 -07:00
// This call shouldn't be used for anything other than metadata updates or adding self referential versions.
2020-05-28 14:36:38 -07:00
if ! srcInfo . metadataOnly {
return oi , NotImplemented { }
}
2020-10-28 00:09:15 -07:00
2021-05-17 08:25:48 -07:00
defer NSUpdated ( dstBucket , dstObject )
2021-04-19 10:30:42 -07:00
if ! dstOpts . NoLock {
lk := er . NewNSLock ( dstBucket , dstObject )
2021-04-29 20:55:21 -07:00
lkctx , err := lk . GetLock ( ctx , globalOperationTimeout )
2021-04-19 10:30:42 -07:00
if err != nil {
return oi , err
}
2021-04-29 20:55:21 -07:00
ctx = lkctx . Context ( )
defer lk . Unlock ( lkctx . Cancel )
2020-09-15 20:44:48 -07:00
}
2020-05-28 14:36:38 -07:00
// Read metadata associated with the object from all disks.
2020-06-12 20:04:01 -07:00
storageDisks := er . getDisks ( )
2016-12-26 16:29:26 -08:00
2022-04-20 12:49:05 -07:00
var metaArr [ ] FileInfo
var errs [ ] error
// Read metadata associated with the object from all disks.
if srcOpts . VersionID != "" {
metaArr , errs = readAllFileInfo ( ctx , storageDisks , srcBucket , srcObject , srcOpts . VersionID , true )
} else {
metaArr , errs = readAllXL ( ctx , storageDisks , srcBucket , srcObject , true )
}
2021-01-16 12:08:02 -08:00
readQuorum , writeQuorum , err := objectQuorumFromMeta ( ctx , metaArr , errs , er . defaultParityCount )
2020-05-28 14:36:38 -07:00
if err != nil {
2022-04-20 12:49:05 -07:00
if errors . Is ( err , errErasureReadQuorum ) && ! strings . HasPrefix ( srcBucket , minioMetaBucket ) {
_ , derr := er . deleteIfDangling ( ctx , srcBucket , srcObject , metaArr , errs , nil , srcOpts )
if derr != nil {
err = derr
}
}
return ObjectInfo { } , toObjectErr ( err , srcBucket , srcObject )
2020-05-28 14:36:38 -07:00
}
2016-12-26 16:29:26 -08:00
2020-05-28 14:36:38 -07:00
// List all online disks.
2021-11-21 10:41:30 -08:00
onlineDisks , modTime := listOnlineDisks ( storageDisks , metaArr , errs )
2016-12-26 16:29:26 -08:00
2020-05-28 14:36:38 -07:00
// Pick latest valid metadata.
2021-11-21 10:41:30 -08:00
fi , err := pickValidFileInfo ( ctx , metaArr , modTime , readQuorum )
2020-05-28 14:36:38 -07:00
if err != nil {
return oi , toObjectErr ( err , srcBucket , srcObject )
}
2020-06-12 20:04:01 -07:00
if fi . Deleted {
if srcOpts . VersionID == "" {
return oi , toObjectErr ( errFileNotFound , srcBucket , srcObject )
}
2022-05-31 02:57:57 -07:00
return fi . ToObjectInfo ( srcBucket , srcObject , srcOpts . Versioned || srcOpts . VersionSuspended ) , toObjectErr ( errMethodNotAllowed , srcBucket , srcObject )
2020-05-28 14:36:38 -07:00
}
2018-03-02 17:24:02 -08:00
2021-11-21 10:41:30 -08:00
filterOnlineDisksInplace ( fi , metaArr , onlineDisks )
2020-08-03 16:21:10 -07:00
versionID := srcInfo . VersionID
if srcInfo . versionOnly {
versionID = dstOpts . VersionID
// preserve destination versionId if specified.
if versionID == "" {
versionID = mustGetUUID ( )
2022-04-20 10:22:05 -07:00
fi . IsLatest = true // we are creating a new version so this is latest.
2020-08-03 16:21:10 -07:00
}
modTime = UTCNow ( )
}
2022-04-20 10:22:05 -07:00
2020-08-03 16:21:10 -07:00
fi . VersionID = versionID // set any new versionID we might have created
fi . ModTime = modTime // set modTime for the new versionID
2020-11-19 11:50:22 -08:00
if ! dstOpts . MTime . IsZero ( ) {
modTime = dstOpts . MTime
fi . ModTime = dstOpts . MTime
}
2021-01-29 14:49:18 -08:00
fi . Metadata = srcInfo . UserDefined
2020-09-15 20:44:48 -07:00
srcInfo . UserDefined [ "etag" ] = srcInfo . ETag
2022-05-24 17:27:45 -07:00
inlineData := fi . InlineData ( )
freeVersionID := fi . TierFreeVersionID ( )
freeVersionMarker := fi . TierFreeVersion ( )
2020-06-12 20:04:01 -07:00
// Update `xl.meta` content on each disks.
for index := range metaArr {
2020-11-20 09:10:48 -08:00
if metaArr [ index ] . IsValid ( ) {
metaArr [ index ] . ModTime = modTime
metaArr [ index ] . VersionID = versionID
2022-04-20 10:22:05 -07:00
if ! metaArr [ index ] . InlineData ( ) {
// If the data is not inlined, we may end up incorrectly
// inlining the data here, that leads to an inconsistent
// situation where some objects are were not inlined
// were now inlined, make sure to `nil` the Data such
// that xl.meta is written as expected.
metaArr [ index ] . Data = nil
}
2022-05-24 17:27:45 -07:00
metaArr [ index ] . Metadata = srcInfo . UserDefined
// Preserve existing values
if inlineData {
metaArr [ index ] . SetInlineData ( )
}
if freeVersionID != "" {
metaArr [ index ] . SetTierFreeVersionID ( freeVersionID )
}
if freeVersionMarker {
metaArr [ index ] . SetTierFreeVersion ( )
}
2020-11-20 09:10:48 -08:00
}
2020-06-12 20:04:01 -07:00
}
2016-12-26 16:29:26 -08:00
2021-04-01 22:12:03 -07:00
// Write unique `xl.meta` for each disk.
2021-04-20 10:44:39 -07:00
if _ , err = writeUniqueFileInfo ( ctx , onlineDisks , srcBucket , srcObject , metaArr , writeQuorum ) ; err != nil {
2021-04-01 22:12:03 -07:00
return oi , toObjectErr ( err , srcBucket , srcObject )
2016-12-26 16:29:26 -08:00
}
2022-05-31 02:57:57 -07:00
return fi . ToObjectInfo ( srcBucket , srcObject , srcOpts . Versioned || srcOpts . VersionSuspended ) , nil
2016-12-26 16:29:26 -08:00
}
2018-09-20 19:22:09 -07:00
// GetObjectNInfo - returns object info and an object
// Read(Closer). When err != nil, the returned reader is always nil.
2020-06-12 20:04:01 -07:00
func ( er erasureObjects ) GetObjectNInfo ( ctx context . Context , bucket , object string , rs * HTTPRangeSpec , h http . Header , lockType LockType , opts ObjectOptions ) ( gr * GetObjectReader , err error ) {
2022-05-04 08:45:27 +01:00
auditObjectErasureSet ( ctx , object , & er )
2020-09-14 15:57:13 -07:00
var unlockOnDefer bool
2022-01-02 09:15:06 -08:00
nsUnlocker := func ( ) { }
2020-09-14 15:57:13 -07:00
defer func ( ) {
if unlockOnDefer {
nsUnlocker ( )
}
} ( )
// Acquire lock
if lockType != noLock {
2020-11-04 08:25:42 -08:00
lock := er . NewNSLock ( bucket , object )
2020-09-14 15:57:13 -07:00
switch lockType {
case writeLock :
2021-04-29 20:55:21 -07:00
lkctx , err := lock . GetLock ( ctx , globalOperationTimeout )
2021-03-04 03:36:43 +01:00
if err != nil {
2020-09-14 15:57:13 -07:00
return nil , err
}
2021-04-29 20:55:21 -07:00
ctx = lkctx . Context ( )
nsUnlocker = func ( ) { lock . Unlock ( lkctx . Cancel ) }
2020-09-14 15:57:13 -07:00
case readLock :
2021-04-29 20:55:21 -07:00
lkctx , err := lock . GetRLock ( ctx , globalOperationTimeout )
2021-03-04 03:36:43 +01:00
if err != nil {
2020-09-14 15:57:13 -07:00
return nil , err
}
2021-04-29 20:55:21 -07:00
ctx = lkctx . Context ( )
nsUnlocker = func ( ) { lock . RUnlock ( lkctx . Cancel ) }
2020-09-14 15:57:13 -07:00
}
unlockOnDefer = true
}
2021-01-07 19:27:31 -08:00
fi , metaArr , onlineDisks , err := er . getObjectFileInfo ( ctx , bucket , object , opts , true )
2018-09-20 19:22:09 -07:00
if err != nil {
return nil , toObjectErr ( err , bucket , object )
}
2021-12-21 10:08:26 -08:00
if ! fi . DataShardFixed ( ) {
diskMTime := pickValidDiskTimeWithQuorum ( metaArr , fi . Erasure . DataBlocks )
if ! diskMTime . Equal ( timeSentinel ) && ! diskMTime . IsZero ( ) {
for index := range onlineDisks {
if onlineDisks [ index ] == OfflineDisk {
continue
}
if ! metaArr [ index ] . IsValid ( ) {
continue
}
2021-12-22 11:43:01 -08:00
if ! metaArr [ index ] . AcceptableDelta ( diskMTime , shardDiskTimeDelta ) {
2021-12-21 10:08:26 -08:00
// If disk mTime mismatches it is considered outdated
// https://github.com/minio/minio/pull/13803
//
// This check only is active if we could find maximally
// occurring disk mtimes that are somewhat same across
// the quorum. Allowing to skip those shards which we
// might think are wrong.
onlineDisks [ index ] = OfflineDisk
}
}
}
}
2022-05-31 02:57:57 -07:00
objInfo := fi . ToObjectInfo ( bucket , object , opts . Versioned || opts . VersionSuspended )
2020-07-02 16:17:27 -07:00
if objInfo . DeleteMarker {
if opts . VersionID == "" {
return & GetObjectReader {
ObjInfo : objInfo ,
} , toObjectErr ( errFileNotFound , bucket , object )
}
// Make sure to return object info to provide extra information.
return & GetObjectReader {
ObjInfo : objInfo ,
} , toObjectErr ( errMethodNotAllowed , bucket , object )
}
2021-05-04 08:40:42 -07:00
if objInfo . IsRemote ( ) {
gr , err := getTransitionedObjectReader ( ctx , bucket , object , rs , h , objInfo , opts )
if err != nil {
return nil , err
2020-11-12 12:12:09 -08:00
}
2021-05-04 08:40:42 -07:00
unlockOnDefer = false
return gr . WithCleanupFuncs ( nsUnlocker ) , nil
2020-11-12 12:12:09 -08:00
}
2021-04-30 18:37:58 -07:00
fn , off , length , err := NewGetObjectReader ( rs , objInfo , opts )
if err != nil {
return nil , err
2018-09-20 19:22:09 -07:00
}
2021-04-30 18:37:58 -07:00
unlockOnDefer = false
2021-05-11 09:18:37 -07:00
pr , pw := xioutil . WaitPipe ( )
2018-09-20 19:22:09 -07:00
go func ( ) {
2021-04-30 18:37:58 -07:00
pw . CloseWithError ( er . getObjectWithFileInfo ( ctx , bucket , object , off , length , pw , fi , metaArr , onlineDisks ) )
2018-09-20 19:22:09 -07:00
} ( )
2020-04-20 22:01:59 -07:00
2018-09-21 11:42:06 -07:00
// Cleanup function to cause the go routine above to exit, in
// case of incomplete read.
2021-05-11 09:18:37 -07:00
pipeCloser := func ( ) {
pr . CloseWithError ( nil )
}
2018-09-20 19:22:09 -07:00
2021-06-24 09:44:00 -07:00
return fn ( pr , h , pipeCloser , nsUnlocker )
2018-09-20 19:22:09 -07:00
}
2020-10-28 09:18:35 -07:00
func ( er erasureObjects ) getObjectWithFileInfo ( ctx context . Context , bucket , object string , startOffset int64 , length int64 , writer io . Writer , fi FileInfo , metaArr [ ] FileInfo , onlineDisks [ ] StorageAPI ) error {
2020-10-28 00:09:15 -07:00
// Reorder online disks based on erasure distribution order.
2016-07-24 22:49:27 -07:00
// Reorder parts metadata based on erasure distribution order.
2021-03-15 20:03:13 -07:00
onlineDisks , metaArr = shuffleDisksAndPartsMetadataByIndex ( onlineDisks , metaArr , fi )
2016-05-25 16:42:31 -07:00
2016-12-21 11:29:32 -08:00
// For negative length read everything.
if length < 0 {
2020-06-12 20:04:01 -07:00
length = fi . Size - startOffset
2016-07-08 07:46:49 -07:00
}
2016-12-21 11:29:32 -08:00
// Reply back invalid range if the input offset and length fall out of range.
2020-06-12 20:04:01 -07:00
if startOffset > fi . Size || startOffset + length > fi . Size {
logger . LogIf ( ctx , InvalidRange { startOffset , length , fi . Size } , logger . Application )
return InvalidRange { startOffset , length , fi . Size }
2016-07-08 07:46:49 -07:00
}
2016-06-20 02:05:26 +05:30
// Get start part index and offset.
2020-06-12 20:04:01 -07:00
partIndex , partOffset , err := fi . ObjectToPartOffset ( ctx , startOffset )
2016-05-20 20:48:47 -07:00
if err != nil {
2020-06-12 20:04:01 -07:00
return InvalidRange { startOffset , length , fi . Size }
2016-05-20 20:48:47 -07:00
}
2016-05-31 20:23:31 -07:00
2017-01-27 19:51:02 +01:00
// Calculate endOffset according to length
endOffset := startOffset
if length > 0 {
endOffset += length - 1
}
2016-06-20 02:05:26 +05:30
// Get last part index to read given length.
2020-06-12 20:04:01 -07:00
lastPartIndex , _ , err := fi . ObjectToPartOffset ( ctx , endOffset )
2016-06-20 02:05:26 +05:30
if err != nil {
2020-06-12 20:04:01 -07:00
return InvalidRange { startOffset , length , fi . Size }
2016-06-20 02:05:26 +05:30
}
2017-02-24 09:20:40 -08:00
var totalBytesRead int64
2020-06-12 20:04:01 -07:00
erasure , err := NewErasure ( ctx , fi . Erasure . DataBlocks , fi . Erasure . ParityBlocks , fi . Erasure . BlockSize )
2017-08-14 18:08:42 -07:00
if err != nil {
return toObjectErr ( err , bucket , object )
}
2021-05-25 16:33:06 -07:00
Prefer local disks when fetching data blocks (#9563)
If the requested server is part of the set this will always read
from the local disk, even if the disk contains a parity shard.
In default setup there is a 50% chance that at least
one shard that otherwise would have been fetched remotely
will be read locally instead.
It basically trades RPC call overhead for reed-solomon.
On distributed localhost this seems to be fairly break-even,
with a very small gain in throughput and latency.
However on networked servers this should be a bigger
1MB objects, before:
```
Operation: GET. Concurrency: 32. Hosts: 4.
Requests considered: 76257:
* Avg: 25ms 50%: 24ms 90%: 32ms 99%: 42ms Fastest: 7ms Slowest: 67ms
* First Byte: Average: 23ms, Median: 22ms, Best: 5ms, Worst: 65ms
Throughput:
* Average: 1213.68 MiB/s, 1272.63 obj/s (59.948s, starting 14:45:44 CEST)
```
After:
```
Operation: GET. Concurrency: 32. Hosts: 4.
Requests considered: 78845:
* Avg: 24ms 50%: 24ms 90%: 31ms 99%: 39ms Fastest: 8ms Slowest: 62ms
* First Byte: Average: 22ms, Median: 21ms, Best: 6ms, Worst: 57ms
Throughput:
* Average: 1255.11 MiB/s, 1316.08 obj/s (59.938s, starting 14:43:58 CEST)
```
Bonus fix: Only ask for heal once on an object.
2020-05-26 16:47:23 -07:00
var healOnce sync . Once
2020-11-12 12:12:09 -08:00
2021-05-14 16:50:47 -07:00
// once we have obtained a common FileInfo i.e latest, we should stick
// to single dataDir to read the content to avoid reading from some other
// dataDir that has stale FileInfo{} to ensure that we fail appropriately
// during reads and expect the same dataDir everywhere.
dataDir := fi . DataDir
2016-06-20 02:05:26 +05:30
for ; partIndex <= lastPartIndex ; partIndex ++ {
2016-06-22 03:04:11 +05:30
if length == totalBytesRead {
break
}
2020-03-03 03:29:30 +03:00
2020-06-12 20:04:01 -07:00
partNumber := fi . Parts [ partIndex ] . Number
2020-03-03 03:29:30 +03:00
2016-05-31 20:23:31 -07:00
// Save the current part name and size.
2020-06-12 20:04:01 -07:00
partSize := fi . Parts [ partIndex ] . Size
2016-06-22 21:35:03 +05:30
2018-08-06 15:14:08 -07:00
partLength := partSize - partOffset
// partLength should be adjusted so that we don't write more data than what was requested.
if partLength > ( length - totalBytesRead ) {
partLength = length - totalBytesRead
2016-06-20 02:05:26 +05:30
}
2016-05-31 20:23:31 -07:00
2020-06-12 20:04:01 -07:00
tillOffset := erasure . ShardFileOffset ( partOffset , partLength , partSize )
2016-07-16 21:05:30 +05:30
// Get the checksums of the current part.
2019-01-17 04:58:18 -08:00
readers := make ( [ ] io . ReaderAt , len ( onlineDisks ) )
Prefer local disks when fetching data blocks (#9563)
If the requested server is part of the set this will always read
from the local disk, even if the disk contains a parity shard.
In default setup there is a 50% chance that at least
one shard that otherwise would have been fetched remotely
will be read locally instead.
It basically trades RPC call overhead for reed-solomon.
On distributed localhost this seems to be fairly break-even,
with a very small gain in throughput and latency.
However on networked servers this should be a bigger
1MB objects, before:
```
Operation: GET. Concurrency: 32. Hosts: 4.
Requests considered: 76257:
* Avg: 25ms 50%: 24ms 90%: 32ms 99%: 42ms Fastest: 7ms Slowest: 67ms
* First Byte: Average: 23ms, Median: 22ms, Best: 5ms, Worst: 65ms
Throughput:
* Average: 1213.68 MiB/s, 1272.63 obj/s (59.948s, starting 14:45:44 CEST)
```
After:
```
Operation: GET. Concurrency: 32. Hosts: 4.
Requests considered: 78845:
* Avg: 24ms 50%: 24ms 90%: 31ms 99%: 39ms Fastest: 8ms Slowest: 62ms
* First Byte: Average: 22ms, Median: 21ms, Best: 6ms, Worst: 57ms
Throughput:
* Average: 1255.11 MiB/s, 1316.08 obj/s (59.938s, starting 14:43:58 CEST)
```
Bonus fix: Only ask for heal once on an object.
2020-05-26 16:47:23 -07:00
prefer := make ( [ ] bool , len ( onlineDisks ) )
2018-08-06 15:14:08 -07:00
for index , disk := range onlineDisks {
2017-08-14 18:08:42 -07:00
if disk == OfflineDisk {
2016-07-24 22:49:27 -07:00
continue
}
2020-11-20 09:10:48 -08:00
if ! metaArr [ index ] . IsValid ( ) {
continue
}
2020-03-03 03:29:30 +03:00
checksumInfo := metaArr [ index ] . Erasure . GetChecksumInfo ( partNumber )
2021-05-14 16:50:47 -07:00
partPath := pathJoin ( object , dataDir , fmt . Sprintf ( "part.%d" , partNumber ) )
readers [ index ] = newBitrotReader ( disk , metaArr [ index ] . Data , bucket , partPath , tillOffset ,
2020-03-03 03:29:30 +03:00
checksumInfo . Algorithm , checksumInfo . Hash , erasure . ShardSize ( ) )
Prefer local disks when fetching data blocks (#9563)
If the requested server is part of the set this will always read
from the local disk, even if the disk contains a parity shard.
In default setup there is a 50% chance that at least
one shard that otherwise would have been fetched remotely
will be read locally instead.
It basically trades RPC call overhead for reed-solomon.
On distributed localhost this seems to be fairly break-even,
with a very small gain in throughput and latency.
However on networked servers this should be a bigger
1MB objects, before:
```
Operation: GET. Concurrency: 32. Hosts: 4.
Requests considered: 76257:
* Avg: 25ms 50%: 24ms 90%: 32ms 99%: 42ms Fastest: 7ms Slowest: 67ms
* First Byte: Average: 23ms, Median: 22ms, Best: 5ms, Worst: 65ms
Throughput:
* Average: 1213.68 MiB/s, 1272.63 obj/s (59.948s, starting 14:45:44 CEST)
```
After:
```
Operation: GET. Concurrency: 32. Hosts: 4.
Requests considered: 78845:
* Avg: 24ms 50%: 24ms 90%: 31ms 99%: 39ms Fastest: 8ms Slowest: 62ms
* First Byte: Average: 22ms, Median: 21ms, Best: 6ms, Worst: 57ms
Throughput:
* Average: 1255.11 MiB/s, 1316.08 obj/s (59.938s, starting 14:43:58 CEST)
```
Bonus fix: Only ask for heal once on an object.
2020-05-26 16:47:23 -07:00
// Prefer local disks
prefer [ index ] = disk . Hostname ( ) == ""
2016-07-16 21:05:30 +05:30
}
2021-01-27 10:21:14 -08:00
written , err := erasure . Decode ( ctx , writer , readers , partOffset , partLength , partSize , prefer )
2020-06-12 20:04:01 -07:00
// Note: we should not be defer'ing the following closeBitrotReaders() call as
// we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time
2019-01-17 04:58:18 -08:00
// we return from this function.
closeBitrotReaders ( readers )
2016-05-31 20:23:31 -07:00
if err != nil {
2021-01-27 10:21:14 -08:00
// If we have successfully written all the content that was asked
// by the client, but we still see an error - this would mean
// that we have some parts or data blocks missing or corrupted
// - attempt a heal to successfully heal them for future calls.
if written == partLength {
var scan madmin . HealScanMode
2021-10-13 19:49:14 -07:00
switch {
case errors . Is ( err , errFileNotFound ) :
2021-01-27 10:21:14 -08:00
scan = madmin . HealNormalScan
2021-10-13 19:49:14 -07:00
case errors . Is ( err , errFileCorrupt ) :
2021-01-27 10:21:14 -08:00
scan = madmin . HealDeepScan
}
2021-10-13 19:49:14 -07:00
switch scan {
case madmin . HealNormalScan , madmin . HealDeepScan :
2021-01-27 10:21:14 -08:00
healOnce . Do ( func ( ) {
2021-02-17 10:18:12 -08:00
if _ , healing := er . getOnlineDisksWithHealing ( ) ; ! healing {
go healObject ( bucket , object , fi . VersionID , scan )
}
2021-01-27 10:21:14 -08:00
} )
2021-10-13 19:49:14 -07:00
// Healing is triggered and we have written
// successfully the content to client for
// the specific part, we should `nil` this error
// and proceed forward, instead of throwing errors.
err = nil
2021-01-27 10:21:14 -08:00
}
2020-04-01 19:14:00 +00:00
}
if err != nil {
return toObjectErr ( err , bucket , object )
}
2016-05-31 20:23:31 -07:00
}
2019-01-17 04:58:18 -08:00
for i , r := range readers {
2018-08-06 15:14:08 -07:00
if r == nil {
onlineDisks [ i ] = OfflineDisk
}
}
2016-07-28 02:20:34 -07:00
// Track total bytes read from disk and written to the client.
2018-08-06 15:14:08 -07:00
totalBytesRead += partLength
2016-06-22 21:35:03 +05:30
// partOffset will be valid only for the first part, hence reset it to 0 for
// the remaining parts.
2016-05-31 20:23:31 -07:00
partOffset = 0
2016-06-01 16:43:31 -07:00
} // End of read all parts loop.
2016-05-31 20:23:31 -07:00
// Return success.
2016-05-28 15:13:15 -07:00
return nil
2016-05-20 20:48:47 -07:00
}
2016-06-01 16:43:31 -07:00
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
2020-06-12 20:04:01 -07:00
func ( er erasureObjects ) GetObjectInfo ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( info ObjectInfo , err error ) {
2022-05-04 08:45:27 +01:00
auditObjectErasureSet ( ctx , object , & er )
2021-02-16 02:43:47 -08:00
if ! opts . NoLock {
// Lock the object before reading.
lk := er . NewNSLock ( bucket , object )
2021-04-29 20:55:21 -07:00
lkctx , err := lk . GetRLock ( ctx , globalOperationTimeout )
2021-03-04 03:36:43 +01:00
if err != nil {
2021-02-16 02:43:47 -08:00
return ObjectInfo { } , err
}
2021-04-29 20:55:21 -07:00
ctx = lkctx . Context ( )
defer lk . RUnlock ( lkctx . Cancel )
2020-09-14 15:57:13 -07:00
}
2020-07-02 16:17:27 -07:00
return er . getObjectInfo ( ctx , bucket , object , opts )
2016-05-20 20:48:47 -07:00
}
2022-10-27 09:05:24 -07:00
func auditDanglingObjectDeletion ( ctx context . Context , bucket , object , versionID string , tags map [ string ] interface { } ) {
2022-10-24 19:35:07 +01:00
if len ( logger . AuditTargets ( ) ) == 0 {
return
}
opts := AuditLogOptions {
Event : "DeleteDanglingObject" ,
Bucket : bucket ,
Object : object ,
VersionID : versionID ,
Tags : tags ,
}
auditLogInternal ( ctx , opts )
}
2022-02-08 20:08:23 -08:00
func ( er erasureObjects ) deleteIfDangling ( ctx context . Context , bucket , object string , metaArr [ ] FileInfo , errs [ ] error , dataErrs [ ] error , opts ObjectOptions ) ( FileInfo , error ) {
2022-10-27 09:05:24 -07:00
_ , file , line , cok := runtime . Caller ( 1 )
2022-02-08 20:08:23 -08:00
var err error
m , ok := isObjectDangling ( metaArr , errs , dataErrs )
if ok {
2022-10-27 09:05:24 -07:00
tags := make ( map [ string ] interface { } , 4 )
tags [ "set" ] = er . setIndex
tags [ "pool" ] = er . poolIndex
tags [ "parity" ] = m . Erasure . ParityBlocks
if cok {
tags [ "caller" ] = fmt . Sprintf ( "%s:%d" , file , line )
}
defer auditDanglingObjectDeletion ( ctx , bucket , object , m . VersionID , tags )
2022-10-24 19:35:07 +01:00
2022-02-08 20:08:23 -08:00
err = errFileNotFound
if opts . VersionID != "" {
err = errFileVersionNotFound
}
defer NSUpdated ( bucket , object )
2022-08-18 16:41:59 -07:00
fi := FileInfo {
VersionID : m . VersionID ,
}
2022-02-08 20:08:23 -08:00
if opts . VersionID != "" {
2022-08-18 16:41:59 -07:00
fi . VersionID = opts . VersionID
}
disks := er . getDisks ( )
g := errgroup . WithNErrs ( len ( disks ) )
for index := range disks {
index := index
g . Go ( func ( ) error {
if disks [ index ] == nil {
return errDiskNotFound
}
return disks [ index ] . DeleteVersion ( ctx , bucket , object , fi , false )
} , index )
2022-02-08 20:08:23 -08:00
}
2022-08-18 16:41:59 -07:00
g . Wait ( )
2022-02-08 20:08:23 -08:00
}
return m , err
}
2022-04-20 12:49:05 -07:00
func readAllXL ( ctx context . Context , disks [ ] StorageAPI , bucket , object string , readData bool ) ( [ ] FileInfo , [ ] error ) {
metadataArray := make ( [ ] * xlMetaV2 , len ( disks ) )
metaFileInfos := make ( [ ] FileInfo , len ( metadataArray ) )
metadataShallowVersions := make ( [ ] [ ] xlMetaV2ShallowVersion , len ( disks ) )
g := errgroup . WithNErrs ( len ( disks ) )
// Read `xl.meta` in parallel across disks.
for index := range disks {
index := index
g . Go ( func ( ) ( err error ) {
if disks [ index ] == nil {
return errDiskNotFound
}
rf , err := disks [ index ] . ReadXL ( ctx , bucket , object , readData )
if err != nil {
return err
}
var xl xlMetaV2
if err = xl . LoadOrConvert ( rf . Buf ) ; err != nil {
return err
}
metadataArray [ index ] = & xl
metaFileInfos [ index ] = FileInfo {
DiskMTime : rf . DiskMTime ,
}
return nil
} , index )
}
2022-09-02 12:47:17 -07:00
ignoredErrs := [ ] error {
errFileNotFound ,
2022-11-08 22:29:05 +01:00
errFileNameTooLong ,
2022-09-02 12:47:17 -07:00
errVolumeNotFound ,
errFileVersionNotFound ,
errDiskNotFound ,
}
2022-04-20 12:49:05 -07:00
errs := g . Wait ( )
2022-06-06 15:15:11 -07:00
for index , err := range errs {
2022-06-06 17:33:41 -07:00
if err == nil {
continue
}
2022-09-02 12:47:17 -07:00
if bucket == minioMetaBucket {
// minioMetaBucket "reads" for .metacache are not written with O_SYNC
// so there is a potential for them to not fully committed to stable
// storage leading to unexpected EOFs. Allow these failures to
// be ignored since the caller already ignores them in streamMetadataParts()
ignoredErrs = append ( ignoredErrs , io . ErrUnexpectedEOF , io . EOF )
}
if ! IsErr ( err , ignoredErrs ... ) {
2022-06-06 15:15:11 -07:00
logger . LogOnceIf ( ctx , fmt . Errorf ( "Drive %s, path (%s/%s) returned an error (%w)" ,
disks [ index ] , bucket , object , err ) ,
disks [ index ] . String ( ) )
}
}
2022-04-20 12:49:05 -07:00
for index := range metadataArray {
if metadataArray [ index ] != nil {
metadataShallowVersions [ index ] = metadataArray [ index ] . versions
}
}
readQuorum := ( len ( disks ) + 1 ) / 2
2022-05-30 12:43:54 -07:00
meta := & xlMetaV2 { versions : mergeXLV2Versions ( readQuorum , false , 1 , metadataShallowVersions ... ) }
lfi , err := meta . ToFileInfo ( bucket , object , "" )
if err != nil {
for i := range errs {
if errs [ i ] == nil {
errs [ i ] = err
}
}
return metaFileInfos , errs
}
if ! lfi . IsValid ( ) {
for i := range errs {
if errs [ i ] == nil {
errs [ i ] = errCorruptedFormat
}
}
return metaFileInfos , errs
}
2022-07-29 10:03:53 -07:00
versionID := lfi . VersionID
if versionID == "" {
versionID = nullVersionID
}
2022-04-20 12:49:05 -07:00
for index := range metadataArray {
if metadataArray [ index ] == nil {
continue
}
// make sure to preserve this for diskmtime based healing bugfix.
diskMTime := metaFileInfos [ index ] . DiskMTime
2022-07-29 10:03:53 -07:00
metaFileInfos [ index ] , errs [ index ] = metadataArray [ index ] . ToFileInfo ( bucket , object , versionID )
2022-05-30 12:43:54 -07:00
if errs [ index ] != nil {
continue
}
2022-07-29 10:03:53 -07:00
if readData {
metaFileInfos [ index ] . Data = metadataArray [ index ] . data . find ( versionID )
2022-04-20 12:49:05 -07:00
}
2022-07-29 10:03:53 -07:00
metaFileInfos [ index ] . DiskMTime = diskMTime
2022-04-20 12:49:05 -07:00
}
// Return all the metadata.
return metaFileInfos , errs
}
2021-01-07 19:27:31 -08:00
func ( er erasureObjects ) getObjectFileInfo ( ctx context . Context , bucket , object string , opts ObjectOptions , readData bool ) ( fi FileInfo , metaArr [ ] FileInfo , onlineDisks [ ] StorageAPI , err error ) {
2020-06-12 20:04:01 -07:00
disks := er . getDisks ( )
2018-10-19 11:00:09 -07:00
2022-04-20 12:49:05 -07:00
var errs [ ] error
2018-10-19 11:00:09 -07:00
// Read metadata associated with the object from all disks.
2022-04-20 12:49:05 -07:00
if opts . VersionID != "" {
metaArr , errs = readAllFileInfo ( ctx , disks , bucket , object , opts . VersionID , readData )
} else {
metaArr , errs = readAllXL ( ctx , disks , bucket , object , readData )
}
2018-10-19 11:00:09 -07:00
2021-01-16 12:08:02 -08:00
readQuorum , _ , err := objectQuorumFromMeta ( ctx , metaArr , errs , er . defaultParityCount )
2019-02-05 17:58:48 -08:00
if err != nil {
2022-02-08 20:08:23 -08:00
if errors . Is ( err , errErasureReadQuorum ) && ! strings . HasPrefix ( bucket , minioMetaBucket ) {
_ , derr := er . deleteIfDangling ( ctx , bucket , object , metaArr , errs , nil , opts )
if derr != nil {
err = derr
}
}
return fi , nil , nil , toObjectErr ( err , bucket , object )
2018-02-01 10:47:49 -08:00
}
2020-05-27 16:14:26 -07:00
if reducedErr := reduceReadQuorumErrs ( ctx , errs , objectOpIgnoredErrs , readQuorum ) ; reducedErr != nil {
2022-01-10 09:07:49 -08:00
if errors . Is ( reducedErr , errErasureReadQuorum ) && ! strings . HasPrefix ( bucket , minioMetaBucket ) {
2022-12-05 20:18:50 +01:00
er . deleteIfDangling ( ctx , bucket , object , metaArr , errs , nil , opts )
2020-11-05 11:48:55 -08:00
}
2020-06-12 20:04:01 -07:00
return fi , nil , nil , toObjectErr ( reducedErr , bucket , object )
2020-05-27 16:14:26 -07:00
}
2020-06-12 20:04:01 -07:00
2020-05-27 16:14:26 -07:00
// List all online disks.
2021-11-21 10:41:30 -08:00
onlineDisks , modTime := listOnlineDisks ( disks , metaArr , errs )
2018-02-01 10:47:49 -08:00
// Pick latest valid metadata.
2021-11-21 10:41:30 -08:00
fi , err = pickValidFileInfo ( ctx , metaArr , modTime , readQuorum )
2016-05-25 01:33:39 -07:00
if err != nil {
2020-06-12 20:04:01 -07:00
return fi , nil , nil , err
2016-05-20 20:48:47 -07:00
}
2021-01-27 10:21:14 -08:00
2021-11-21 10:41:30 -08:00
filterOnlineDisksInplace ( fi , metaArr , onlineDisks )
2021-04-29 09:54:16 -07:00
// if one of the disk is offline, return right here no need
// to attempt a heal on the object.
if countErrs ( errs , errDiskNotFound ) > 0 {
return fi , metaArr , onlineDisks , nil
}
2021-01-27 10:21:14 -08:00
var missingBlocks int
for i , err := range errs {
if err != nil && errors . Is ( err , errFileNotFound ) {
missingBlocks ++
continue
}
2021-11-21 10:41:30 -08:00
if metaArr [ i ] . IsValid ( ) && metaArr [ i ] . ModTime . Equal ( fi . ModTime ) {
2021-01-27 10:21:14 -08:00
continue
}
2022-05-30 12:43:54 -07:00
metaArr [ i ] = FileInfo { }
onlineDisks [ i ] = nil
2021-01-27 10:21:14 -08:00
missingBlocks ++
}
// if missing metadata can be reconstructed, attempt to reconstruct.
2021-07-26 08:01:41 -07:00
// additionally do not heal delete markers inline, let them be
// healed upon regular heal process.
if ! fi . Deleted && missingBlocks > 0 && missingBlocks < readQuorum {
2021-02-17 10:18:12 -08:00
if _ , healing := er . getOnlineDisksWithHealing ( ) ; ! healing {
go healObject ( bucket , object , fi . VersionID , madmin . HealNormalScan )
}
2021-01-27 10:21:14 -08:00
}
2020-06-12 20:04:01 -07:00
return fi , metaArr , onlineDisks , nil
2020-05-27 16:14:26 -07:00
}
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
2020-06-12 20:04:01 -07:00
func ( er erasureObjects ) getObjectInfo ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2021-01-07 19:27:31 -08:00
fi , _ , _ , err := er . getObjectFileInfo ( ctx , bucket , object , opts , false )
2020-05-27 16:14:26 -07:00
if err != nil {
2020-07-02 16:17:27 -07:00
return objInfo , toObjectErr ( err , bucket , object )
2020-05-27 16:14:26 -07:00
}
2022-05-31 02:57:57 -07:00
objInfo = fi . ToObjectInfo ( bucket , object , opts . Versioned || opts . VersionSuspended )
2020-06-12 20:04:01 -07:00
if fi . Deleted {
2020-11-19 18:43:58 -08:00
if opts . VersionID == "" || opts . DeleteMarker {
2020-06-12 20:04:01 -07:00
return objInfo , toObjectErr ( errFileNotFound , bucket , object )
}
// Make sure to return object info to provide extra information.
2020-06-17 08:33:14 -07:00
return objInfo , toObjectErr ( errMethodNotAllowed , bucket , object )
2020-06-12 20:04:01 -07:00
}
2020-11-21 23:48:50 -08:00
return objInfo , nil
2016-05-20 20:48:47 -07:00
}
2021-09-01 08:57:42 -07:00
// getObjectInfoAndQuroum - wrapper for reading object metadata and constructs ObjectInfo, additionally returns write quorum for the object.
func ( er erasureObjects ) getObjectInfoAndQuorum ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( objInfo ObjectInfo , wquorum int , err error ) {
fi , _ , _ , err := er . getObjectFileInfo ( ctx , bucket , object , opts , false )
if err != nil {
2021-12-28 12:41:52 -08:00
return objInfo , er . defaultWQuorum ( ) , toObjectErr ( err , bucket , object )
2021-09-01 08:57:42 -07:00
}
2022-09-15 12:43:49 -07:00
wquorum = fi . WriteQuorum ( er . defaultWQuorum ( ) )
2021-09-01 08:57:42 -07:00
2022-05-31 02:57:57 -07:00
objInfo = fi . ToObjectInfo ( bucket , object , opts . Versioned || opts . VersionSuspended )
2021-09-18 16:31:35 -04:00
if ! fi . VersionPurgeStatus ( ) . Empty ( ) && opts . VersionID != "" {
2021-09-01 08:57:42 -07:00
// Make sure to return object info to provide extra information.
return objInfo , wquorum , toObjectErr ( errMethodNotAllowed , bucket , object )
}
if fi . Deleted {
if opts . VersionID == "" || opts . DeleteMarker {
return objInfo , wquorum , toObjectErr ( errFileNotFound , bucket , object )
}
// Make sure to return object info to provide extra information.
return objInfo , wquorum , toObjectErr ( errMethodNotAllowed , bucket , object )
}
return objInfo , wquorum , nil
}
2020-06-12 20:04:01 -07:00
// Similar to rename but renames data from srcEntry to dstEntry at dataDir
2021-04-20 10:44:39 -07:00
func renameData ( ctx context . Context , disks [ ] StorageAPI , srcBucket , srcEntry string , metadata [ ] FileInfo , dstBucket , dstEntry string , writeQuorum int ) ( [ ] StorageAPI , error ) {
2020-06-12 20:04:01 -07:00
g := errgroup . WithNErrs ( len ( disks ) )
2021-06-30 19:32:07 -07:00
fvID := mustGetUUID ( )
for index := range disks {
metadata [ index ] . SetTierFreeVersionID ( fvID )
}
2022-09-05 16:51:37 -07:00
diskVersions := make ( [ ] uint64 , len ( disks ) )
2020-06-12 20:04:01 -07:00
// Rename file on all underlying storage disks.
for index := range disks {
index := index
g . Go ( func ( ) error {
if disks [ index ] == nil {
return errDiskNotFound
}
2022-09-05 16:51:37 -07:00
2021-04-20 10:44:39 -07:00
// Pick one FileInfo for a disk at index.
fi := metadata [ index ]
// Assign index when index is initialized
if fi . Erasure . Index == 0 {
fi . Erasure . Index = index + 1
2020-06-12 20:04:01 -07:00
}
2021-06-30 19:32:07 -07:00
2022-09-05 16:51:37 -07:00
if ! fi . IsValid ( ) {
return errFileCorrupt
2021-04-20 10:44:39 -07:00
}
2022-09-05 16:51:37 -07:00
sign , err := disks [ index ] . RenameData ( ctx , srcBucket , srcEntry , fi , dstBucket , dstEntry )
if err != nil {
return err
}
diskVersions [ index ] = sign
return nil
2020-06-12 20:04:01 -07:00
} , index )
}
// Wait for all renames to finish.
errs := g . Wait ( )
2022-09-07 07:25:39 -07:00
err := reduceWriteQuorumErrs ( ctx , errs , objectOpIgnoredErrs , writeQuorum )
if err == nil {
versions := reduceCommonVersions ( diskVersions , writeQuorum )
for index , dversions := range diskVersions {
if versions != dversions {
errs [ index ] = errFileNeedsHealing
}
2022-09-05 16:51:37 -07:00
}
2022-09-07 07:25:39 -07:00
err = reduceWriteQuorumErrs ( ctx , errs , objectOpIgnoredErrs , writeQuorum )
2022-09-05 16:51:37 -07:00
}
2022-02-23 11:59:13 -08:00
// We can safely allow RenameData errors up to len(er.getDisks()) - writeQuorum
2022-09-07 07:25:39 -07:00
// otherwise return failure.
2020-06-12 20:04:01 -07:00
return evalDisks ( disks , errs ) , err
}
2021-08-09 06:58:54 -07:00
func ( er erasureObjects ) putMetacacheObject ( ctx context . Context , key string , r * PutObjReader , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
data := r . Reader
// No metadata is set, allocate a new one.
if opts . UserDefined == nil {
opts . UserDefined = make ( map [ string ] string )
}
storageDisks := er . getDisks ( )
// Get parity and data drive count based on storage class metadata
parityDrives := globalStorageClass . GetParityForSC ( opts . UserDefined [ xhttp . AmzStorageClass ] )
2022-06-27 20:22:18 -07:00
if parityDrives < 0 {
2021-08-09 06:58:54 -07:00
parityDrives = er . defaultParityCount
}
dataDrives := len ( storageDisks ) - parityDrives
// we now know the number of blocks this object needs for data and parity.
// writeQuorum is dataBlocks + 1
writeQuorum := dataDrives
if dataDrives == parityDrives {
writeQuorum ++
}
// Validate input data size and it can never be less than zero.
if data . Size ( ) < - 1 {
logger . LogIf ( ctx , errInvalidArgument , logger . Application )
return ObjectInfo { } , toObjectErr ( errInvalidArgument )
}
// Initialize parts metadata
partsMetadata := make ( [ ] FileInfo , len ( storageDisks ) )
fi := newFileInfo ( pathJoin ( minioMetaBucket , key ) , dataDrives , parityDrives )
fi . DataDir = mustGetUUID ( )
// Initialize erasure metadata.
for index := range partsMetadata {
partsMetadata [ index ] = fi
}
// Order disks according to erasure distribution
var onlineDisks [ ] StorageAPI
onlineDisks , partsMetadata = shuffleDisksAndPartsMetadata ( storageDisks , partsMetadata , fi )
erasure , err := NewErasure ( ctx , fi . Erasure . DataBlocks , fi . Erasure . ParityBlocks , fi . Erasure . BlockSize )
if err != nil {
return ObjectInfo { } , toObjectErr ( err , minioMetaBucket , key )
}
// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
var buffer [ ] byte
switch size := data . Size ( ) ; {
case size == 0 :
buffer = make ( [ ] byte , 1 ) // Allocate atleast a byte to reach EOF
case size >= fi . Erasure . BlockSize :
buffer = er . bp . Get ( )
defer er . bp . Put ( buffer )
case size < fi . Erasure . BlockSize :
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
buffer = make ( [ ] byte , size , 2 * size + int64 ( fi . Erasure . ParityBlocks + fi . Erasure . DataBlocks - 1 ) )
}
if len ( buffer ) > int ( fi . Erasure . BlockSize ) {
buffer = buffer [ : fi . Erasure . BlockSize ]
}
shardFileSize := erasure . ShardFileSize ( data . Size ( ) )
writers := make ( [ ] io . Writer , len ( onlineDisks ) )
inlineBuffers := make ( [ ] * bytes . Buffer , len ( onlineDisks ) )
for i , disk := range onlineDisks {
if disk == nil {
continue
}
2022-01-12 18:49:01 -08:00
if disk . IsOnline ( ) {
inlineBuffers [ i ] = bytes . NewBuffer ( make ( [ ] byte , 0 , shardFileSize ) )
writers [ i ] = newStreamingBitrotWriterBuffer ( inlineBuffers [ i ] , DefaultBitrotAlgorithm , erasure . ShardSize ( ) )
}
2021-08-09 06:58:54 -07:00
}
n , erasureErr := erasure . Encode ( ctx , data , writers , buffer , writeQuorum )
closeBitrotWriters ( writers )
if erasureErr != nil {
return ObjectInfo { } , toObjectErr ( erasureErr , minioMetaBucket , key )
}
// Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header.
if n < data . Size ( ) {
return ObjectInfo { } , IncompleteBody { Bucket : minioMetaBucket , Object : key }
}
2022-07-11 17:30:56 -07:00
var index [ ] byte
if opts . IndexCB != nil {
index = opts . IndexCB ( )
}
2021-08-09 06:58:54 -07:00
2022-07-19 18:56:24 -07:00
modTime := UTCNow ( )
2021-08-09 06:58:54 -07:00
for i , w := range writers {
if w == nil {
2022-01-12 18:49:01 -08:00
// Make sure to avoid writing to disks which we couldn't complete in erasure.Encode()
2021-08-09 06:58:54 -07:00
onlineDisks [ i ] = nil
continue
}
2021-08-10 11:12:22 -07:00
partsMetadata [ i ] . Data = inlineBuffers [ i ] . Bytes ( )
2022-08-30 01:57:16 +02:00
partsMetadata [ i ] . AddObjectPart ( 1 , "" , n , data . ActualSize ( ) , modTime , index , nil )
2021-08-09 06:58:54 -07:00
partsMetadata [ i ] . Erasure . AddChecksumInfo ( ChecksumInfo {
PartNumber : 1 ,
Algorithm : DefaultBitrotAlgorithm ,
Hash : bitrotWriterSum ( w ) ,
} )
}
// Fill all the necessary metadata.
// Update `xl.meta` content on each disks.
for index := range partsMetadata {
partsMetadata [ index ] . Size = n
2021-08-10 11:12:22 -07:00
partsMetadata [ index ] . Fresh = true
2021-08-09 06:58:54 -07:00
partsMetadata [ index ] . ModTime = modTime
2021-08-10 11:12:22 -07:00
partsMetadata [ index ] . Metadata = opts . UserDefined
2021-08-09 06:58:54 -07:00
}
// Set an additional header when data is inlined.
for index := range partsMetadata {
partsMetadata [ index ] . SetInlineData ( )
}
for i := 0 ; i < len ( onlineDisks ) ; i ++ {
if onlineDisks [ i ] != nil && onlineDisks [ i ] . IsOnline ( ) {
// Object info is the same in all disks, so we can pick
// the first meta from online disk
fi = partsMetadata [ i ]
break
}
}
2021-08-10 11:12:22 -07:00
if _ , err = writeUniqueFileInfo ( ctx , onlineDisks , minioMetaBucket , key , partsMetadata , writeQuorum ) ; err != nil {
return ObjectInfo { } , toObjectErr ( err , minioMetaBucket , key )
2021-08-09 06:58:54 -07:00
}
2022-05-31 02:57:57 -07:00
return fi . ToObjectInfo ( minioMetaBucket , key , opts . Versioned || opts . VersionSuspended ) , nil
2021-08-09 06:58:54 -07:00
}
2016-06-01 16:43:31 -07:00
// PutObject - creates an object upon reading from the input stream
// until EOF, erasure codes the data across all disk and additionally
2020-06-12 20:04:01 -07:00
// writes `xl.meta` which carries the necessary metadata for future
2016-06-01 16:43:31 -07:00
// object operations.
2020-06-12 20:04:01 -07:00
func ( er erasureObjects ) PutObject ( ctx context . Context , bucket string , object string , data * PutObjReader , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
return er . putObject ( ctx , bucket , object , data , opts )
2018-01-12 20:34:52 -08:00
}
2020-06-12 20:04:01 -07:00
// putObject wrapper for erasureObjects PutObject
func ( er erasureObjects ) putObject ( ctx context . Context , bucket string , object string , r * PutObjReader , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2022-05-04 08:45:27 +01:00
auditObjectErasureSet ( ctx , object , & er )
2022-09-14 18:44:04 -07:00
if opts . CheckPrecondFn != nil {
obj , err := er . getObjectInfo ( ctx , bucket , object , opts )
2022-11-26 14:43:32 -08:00
if err != nil && ! isErrVersionNotFound ( err ) {
2022-09-14 18:44:04 -07:00
return objInfo , err
}
if opts . CheckPrecondFn ( obj ) {
return objInfo , PreConditionFailed { }
}
}
2018-11-14 17:36:41 -08:00
data := r . Reader
2022-05-05 04:14:41 -07:00
userDefined := cloneMSS ( opts . UserDefined )
2018-01-29 18:43:13 -08:00
2020-06-12 20:04:01 -07:00
storageDisks := er . getDisks ( )
2019-10-06 22:50:24 -07:00
2021-03-09 19:19:47 +01:00
parityDrives := len ( storageDisks ) / 2
if ! opts . MaxParity {
// Get parity and data drive count based on storage class metadata
2022-05-05 04:14:41 -07:00
parityDrives = globalStorageClass . GetParityForSC ( userDefined [ xhttp . AmzStorageClass ] )
2022-06-27 20:22:18 -07:00
if parityDrives < 0 {
2021-03-09 19:19:47 +01:00
parityDrives = er . defaultParityCount
}
2021-05-27 20:38:09 +02:00
// If we have offline disks upgrade the number of erasure codes for this object.
2021-05-27 13:38:04 -07:00
parityOrig := parityDrives
2021-07-08 01:04:37 -07:00
atomicParityDrives := uatomic . NewInt64 ( 0 )
// Start with current parityDrives
atomicParityDrives . Store ( int64 ( parityDrives ) )
var wg sync . WaitGroup
2021-05-27 20:38:09 +02:00
for _ , disk := range storageDisks {
if disk == nil {
2021-07-08 01:04:37 -07:00
atomicParityDrives . Inc ( )
2021-06-04 18:38:19 +02:00
continue
2021-05-27 20:38:09 +02:00
}
2021-07-08 01:04:37 -07:00
if ! disk . IsOnline ( ) {
atomicParityDrives . Inc ( )
continue
2021-05-27 20:38:09 +02:00
}
2021-07-08 01:04:37 -07:00
wg . Add ( 1 )
go func ( disk StorageAPI ) {
defer wg . Done ( )
di , err := disk . DiskInfo ( ctx )
if err != nil || di . ID == "" {
atomicParityDrives . Inc ( )
}
} ( disk )
}
wg . Wait ( )
parityDrives = int ( atomicParityDrives . Load ( ) )
if parityDrives >= len ( storageDisks ) / 2 {
parityDrives = len ( storageDisks ) / 2
2021-05-27 20:38:09 +02:00
}
2021-05-27 13:38:04 -07:00
if parityOrig != parityDrives {
2022-05-05 04:14:41 -07:00
userDefined [ minIOErasureUpgraded ] = strconv . Itoa ( parityOrig ) + "->" + strconv . Itoa ( parityDrives )
2021-05-27 20:38:09 +02:00
}
2019-10-06 22:50:24 -07:00
}
dataDrives := len ( storageDisks ) - parityDrives
2018-01-29 18:43:13 -08:00
// we now know the number of blocks this object needs for data and parity.
// writeQuorum is dataBlocks + 1
2020-06-09 19:19:03 -07:00
writeQuorum := dataDrives
if dataDrives == parityDrives {
2021-01-16 12:08:02 -08:00
writeQuorum ++
2020-06-09 19:19:03 -07:00
}
2018-01-29 18:43:13 -08:00
2017-10-06 09:38:01 -07:00
// Validate input data size and it can never be less than zero.
2018-09-28 09:06:17 +05:30
if data . Size ( ) < - 1 {
2019-10-11 18:50:54 -07:00
logger . LogIf ( ctx , errInvalidArgument , logger . Application )
2018-04-05 15:04:40 -07:00
return ObjectInfo { } , toObjectErr ( errInvalidArgument )
2017-10-06 09:38:01 -07:00
}
2017-01-31 00:44:42 +01:00
// Initialize parts metadata
2020-12-07 10:04:07 -08:00
partsMetadata := make ( [ ] FileInfo , len ( storageDisks ) )
2020-06-12 20:04:01 -07:00
2021-02-25 10:11:31 -08:00
fi := newFileInfo ( pathJoin ( bucket , object ) , dataDrives , parityDrives )
2021-04-19 10:30:42 -07:00
fi . VersionID = opts . VersionID
if opts . Versioned && fi . VersionID == "" {
fi . VersionID = mustGetUUID ( )
2020-06-12 20:04:01 -07:00
}
2021-05-15 19:54:07 -07:00
2020-06-12 20:04:01 -07:00
fi . DataDir = mustGetUUID ( )
2022-08-31 17:13:23 +02:00
fi . Checksum = opts . WantChecksum . AppendTo ( nil )
if opts . EncryptFn != nil {
fi . Checksum = opts . EncryptFn ( "object-checksum" , fi . Checksum )
}
2021-05-15 19:54:07 -07:00
uniqueID := mustGetUUID ( )
tempObj := uniqueID
2016-07-13 11:56:25 -07:00
2020-06-12 20:04:01 -07:00
// Initialize erasure metadata.
2017-01-31 00:44:42 +01:00
for index := range partsMetadata {
2020-06-12 20:04:01 -07:00
partsMetadata [ index ] = fi
2017-01-31 00:44:42 +01:00
}
// Order disks according to erasure distribution
2020-10-28 00:09:15 -07:00
var onlineDisks [ ] StorageAPI
2021-04-21 19:06:08 -07:00
onlineDisks , partsMetadata = shuffleDisksAndPartsMetadata ( storageDisks , partsMetadata , fi )
2017-01-31 00:44:42 +01:00
2020-06-12 20:04:01 -07:00
erasure , err := NewErasure ( ctx , fi . Erasure . DataBlocks , fi . Erasure . ParityBlocks , fi . Erasure . BlockSize )
2017-08-14 18:08:42 -07:00
if err != nil {
return ObjectInfo { } , toObjectErr ( err , bucket , object )
}
2017-10-06 09:38:01 -07:00
2018-02-15 17:45:57 -08:00
// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
2018-06-13 11:55:12 -07:00
var buffer [ ] byte
switch size := data . Size ( ) ; {
case size == 0 :
buffer = make ( [ ] byte , 1 ) // Allocate atleast a byte to reach EOF
2022-07-13 07:52:15 -07:00
case size >= fi . Erasure . BlockSize || size == - 1 :
2020-06-12 20:04:01 -07:00
buffer = er . bp . Get ( )
defer er . bp . Put ( buffer )
case size < fi . Erasure . BlockSize :
2018-06-13 11:55:12 -07:00
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
2020-06-12 20:04:01 -07:00
buffer = make ( [ ] byte , size , 2 * size + int64 ( fi . Erasure . ParityBlocks + fi . Erasure . DataBlocks - 1 ) )
2018-06-13 11:55:12 -07:00
}
2017-10-06 09:38:01 -07:00
2020-06-12 20:04:01 -07:00
if len ( buffer ) > int ( fi . Erasure . BlockSize ) {
buffer = buffer [ : fi . Erasure . BlockSize ]
2018-08-06 15:14:08 -07:00
}
2019-05-14 12:33:18 -07:00
partName := "part.1"
2020-06-12 20:04:01 -07:00
tempErasureObj := pathJoin ( uniqueID , fi . DataDir , partName )
2017-01-31 00:44:42 +01:00
2021-03-24 14:19:52 -07:00
// Delete temporary object in the event of failure.
// If PutObject succeeded there would be no temporary
// object to delete.
var online int
defer func ( ) {
if online != len ( onlineDisks ) {
2022-11-01 08:00:02 -07:00
er . deleteAll ( context . Background ( ) , minioMetaTmpBucket , tempObj )
2021-03-24 14:19:52 -07:00
}
} ( )
2021-03-31 18:19:14 +02:00
shardFileSize := erasure . ShardFileSize ( data . Size ( ) )
2019-05-14 12:33:18 -07:00
writers := make ( [ ] io . Writer , len ( onlineDisks ) )
2021-03-30 02:00:55 +02:00
var inlineBuffers [ ] * bytes . Buffer
2021-03-31 18:19:14 +02:00
if shardFileSize >= 0 {
if ! opts . Versioned && shardFileSize < smallFileThreshold {
inlineBuffers = make ( [ ] * bytes . Buffer , len ( onlineDisks ) )
} else if shardFileSize < smallFileThreshold / 8 {
inlineBuffers = make ( [ ] * bytes . Buffer , len ( onlineDisks ) )
}
2021-08-12 19:05:24 +02:00
} else {
// If compressed, use actual size to determine.
if sz := erasure . ShardFileSize ( data . ActualSize ( ) ) ; sz > 0 {
if ! opts . Versioned && sz < smallFileThreshold {
inlineBuffers = make ( [ ] * bytes . Buffer , len ( onlineDisks ) )
} else if sz < smallFileThreshold / 8 {
inlineBuffers = make ( [ ] * bytes . Buffer , len ( onlineDisks ) )
}
}
2021-03-30 02:00:55 +02:00
}
2019-05-14 12:33:18 -07:00
for i , disk := range onlineDisks {
if disk == nil {
continue
2018-09-28 09:06:17 +05:30
}
2021-03-30 02:00:55 +02:00
2022-01-12 18:49:01 -08:00
if ! disk . IsOnline ( ) {
continue
}
2021-03-30 02:00:55 +02:00
if len ( inlineBuffers ) > 0 {
2021-08-12 19:05:24 +02:00
sz := shardFileSize
if sz < 0 {
sz = data . ActualSize ( )
}
inlineBuffers [ i ] = bytes . NewBuffer ( make ( [ ] byte , 0 , sz ) )
2021-03-30 02:00:55 +02:00
writers [ i ] = newStreamingBitrotWriterBuffer ( inlineBuffers [ i ] , DefaultBitrotAlgorithm , erasure . ShardSize ( ) )
continue
}
2022-01-12 18:49:01 -08:00
2021-05-17 17:32:28 +02:00
writers [ i ] = newBitrotWriter ( disk , minioMetaTmpBucket , tempErasureObj , shardFileSize , DefaultBitrotAlgorithm , erasure . ShardSize ( ) )
2019-05-14 12:33:18 -07:00
}
2018-09-28 09:06:17 +05:30
2022-01-14 10:01:25 -08:00
toEncode := io . Reader ( data )
if data . Size ( ) > bigFileThreshold {
// We use 2 buffers, so we always have a full buffer of input.
bufA := er . bp . Get ( )
bufB := er . bp . Get ( )
defer er . bp . Put ( bufA )
defer er . bp . Put ( bufB )
ra , err := readahead . NewReaderBuffer ( data , [ ] [ ] byte { bufA [ : fi . Erasure . BlockSize ] , bufB [ : fi . Erasure . BlockSize ] } )
if err == nil {
toEncode = ra
defer ra . Close ( )
}
logger . LogIf ( ctx , err )
}
n , erasureErr := erasure . Encode ( ctx , toEncode , writers , buffer , writeQuorum )
2019-05-14 12:33:18 -07:00
closeBitrotWriters ( writers )
if erasureErr != nil {
return ObjectInfo { } , toObjectErr ( erasureErr , minioMetaTmpBucket , tempErasureObj )
}
2017-08-14 18:08:42 -07:00
2019-05-14 12:33:18 -07:00
// Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header.
if n < data . Size ( ) {
2020-09-08 14:22:04 -07:00
return ObjectInfo { } , IncompleteBody { Bucket : bucket , Object : object }
2019-05-14 12:33:18 -07:00
}
2017-01-31 15:34:49 -08:00
2022-07-11 17:30:56 -07:00
var compIndex [ ] byte
if opts . IndexCB != nil {
compIndex = opts . IndexCB ( )
}
2020-12-24 15:02:02 -08:00
if ! opts . NoLock {
lk := er . NewNSLock ( bucket , object )
2021-04-29 20:55:21 -07:00
lkctx , err := lk . GetLock ( ctx , globalOperationTimeout )
2021-03-04 03:36:43 +01:00
if err != nil {
2020-12-24 15:02:02 -08:00
return ObjectInfo { } , err
}
2021-04-29 20:55:21 -07:00
ctx = lkctx . Context ( )
defer lk . Unlock ( lkctx . Cancel )
2020-09-15 20:44:48 -07:00
}
2022-07-19 18:56:24 -07:00
modTime := opts . MTime
if opts . MTime . IsZero ( ) {
modTime = UTCNow ( )
}
2019-05-14 12:33:18 -07:00
for i , w := range writers {
if w == nil {
onlineDisks [ i ] = nil
continue
2017-01-31 00:44:42 +01:00
}
2021-03-30 02:00:55 +02:00
if len ( inlineBuffers ) > 0 && inlineBuffers [ i ] != nil {
partsMetadata [ i ] . Data = inlineBuffers [ i ] . Bytes ( )
2021-04-21 19:06:08 -07:00
} else {
partsMetadata [ i ] . Data = nil
2021-03-30 02:00:55 +02:00
}
2022-08-30 01:57:16 +02:00
// No need to add checksum to part. We already have it on the object.
partsMetadata [ i ] . AddObjectPart ( 1 , "" , n , data . ActualSize ( ) , modTime , compIndex , nil )
2020-03-03 03:29:30 +03:00
partsMetadata [ i ] . Erasure . AddChecksumInfo ( ChecksumInfo {
PartNumber : 1 ,
Algorithm : DefaultBitrotAlgorithm ,
Hash : bitrotWriterSum ( w ) ,
} )
2016-07-18 19:06:48 -07:00
}
2022-07-16 19:35:24 -07:00
2022-09-14 18:44:04 -07:00
userDefined [ "etag" ] = r . MD5CurrentHexString ( )
if opts . PreserveETag != "" {
userDefined [ "etag" ] = opts . PreserveETag
2020-08-12 17:32:24 -07:00
}
2016-06-01 16:43:31 -07:00
2016-06-16 21:42:02 -07:00
// Guess content-type from the extension if possible.
2022-05-05 04:14:41 -07:00
if userDefined [ "content-type" ] == "" {
userDefined [ "content-type" ] = mimedb . TypeByExtension ( path . Ext ( object ) )
2016-05-20 20:48:47 -07:00
}
2016-05-26 19:55:48 -07:00
// Fill all the necessary metadata.
2020-06-12 20:04:01 -07:00
// Update `xl.meta` content on each disks.
2016-05-31 20:23:31 -07:00
for index := range partsMetadata {
2022-05-05 04:14:41 -07:00
partsMetadata [ index ] . Metadata = userDefined
2020-06-12 20:04:01 -07:00
partsMetadata [ index ] . Size = n
partsMetadata [ index ] . ModTime = modTime
2016-05-31 20:23:31 -07:00
}
2021-05-25 16:33:06 -07:00
if len ( inlineBuffers ) > 0 {
// Set an additional header when data is inlined.
for index := range partsMetadata {
2021-07-16 09:38:27 -07:00
partsMetadata [ index ] . SetInlineData ( )
2021-05-25 16:33:06 -07:00
}
}
2021-04-01 22:12:03 -07:00
// Rename the successfully written temporary object to final location.
2022-09-07 07:25:39 -07:00
onlineDisks , err = renameData ( ctx , onlineDisks , minioMetaTmpBucket , tempObj , partsMetadata , bucket , object , writeQuorum )
2020-01-16 03:30:32 +01:00
for i := 0 ; i < len ( onlineDisks ) ; i ++ {
2020-12-22 09:16:43 -08:00
if onlineDisks [ i ] != nil && onlineDisks [ i ] . IsOnline ( ) {
2021-07-16 06:32:06 +01:00
// Object info is the same in all disks, so we can pick
// the first meta from online disk
fi = partsMetadata [ i ]
break
2020-01-16 03:30:32 +01:00
}
}
2022-02-23 11:59:13 -08:00
// For speedtest objects do not attempt to heal them.
if ! opts . Speedtest {
// Whether a disk was initially or becomes offline
// during this upload, send it to the MRF list.
for i := 0 ; i < len ( onlineDisks ) ; i ++ {
if onlineDisks [ i ] != nil && onlineDisks [ i ] . IsOnline ( ) {
continue
}
er . addPartial ( bucket , object , fi . VersionID , fi . Size )
break
2020-06-12 20:04:01 -07:00
}
}
2021-07-16 06:32:06 +01:00
2022-09-07 07:25:39 -07:00
healEntry := func ( entry metaCacheEntry ) error {
if entry . isDir ( ) {
return nil
}
// We might land at .metacache, .trash, .multipart
// no need to heal them skip, only when bucket
// is '.minio.sys'
if bucket == minioMetaBucket {
if wildcard . Match ( "buckets/*/.metacache/*" , entry . name ) {
return nil
}
if wildcard . Match ( "tmp/*" , entry . name ) {
return nil
}
if wildcard . Match ( "multipart/*" , entry . name ) {
return nil
}
if wildcard . Match ( "tmp-old/*" , entry . name ) {
return nil
}
}
2022-11-14 18:35:26 -08:00
2022-09-07 07:25:39 -07:00
fivs , err := entry . fileInfoVersions ( bucket )
if err != nil {
2022-11-14 18:35:26 -08:00
healObject ( bucket , entry . name , "" , madmin . HealNormalScan )
2022-09-07 07:25:39 -07:00
return err
}
2022-11-14 18:35:26 -08:00
if len ( fivs . Versions ) > 2 {
return fmt . Errorf ( "bucket(%s)/object(%s) object with %d versions needs healing, allowing lazy healing via scanner instead here for versions greater than 2" ,
bucket , entry . name ,
len ( fivs . Versions ) )
}
2022-09-07 07:25:39 -07:00
for _ , version := range fivs . Versions {
2022-11-14 18:35:26 -08:00
healObject ( bucket , entry . name , version . VersionID , madmin . HealNormalScan )
2022-09-07 07:25:39 -07:00
}
return nil
}
if err != nil {
if errors . Is ( err , errFileNotFound ) {
return ObjectInfo { } , toObjectErr ( errErasureWriteQuorum , bucket , object )
}
if errors . Is ( err , errFileNeedsHealing ) && ! opts . Speedtest {
listAndHeal ( ctx , bucket , object , & er , healEntry )
} else {
logger . LogIf ( ctx , err )
return ObjectInfo { } , toObjectErr ( err , bucket , object )
}
}
defer NSUpdated ( bucket , object )
2021-09-18 16:31:35 -04:00
fi . ReplicationState = opts . PutReplicationState ( )
2021-03-24 14:19:52 -07:00
online = countOnlineDisks ( onlineDisks )
2020-06-12 20:04:01 -07:00
2021-09-22 19:17:09 -07:00
// we are adding a new version to this object under the namespace lock, so this is the latest version.
fi . IsLatest = true
2022-05-31 02:57:57 -07:00
return fi . ToObjectInfo ( bucket , object , opts . Versioned || opts . VersionSuspended ) , nil
2020-06-12 20:04:01 -07:00
}
2022-08-18 16:41:59 -07:00
func ( er erasureObjects ) deleteObjectVersion ( ctx context . Context , bucket , object string , fi FileInfo , forceDelMarker bool ) error {
2020-06-12 20:04:01 -07:00
disks := er . getDisks ( )
2022-08-18 16:41:59 -07:00
// Assume (N/2 + 1) quorum for Delete()
// this is a theoretical assumption such that
// for delete's we do not need to honor storage
// class for objects that have reduced quorum
// due to storage class - this only needs to be honored
// for Read() requests alone that we already do.
writeQuorum := len ( disks ) / 2 + 1
2020-06-12 20:04:01 -07:00
g := errgroup . WithNErrs ( len ( disks ) )
for index := range disks {
index := index
g . Go ( func ( ) error {
if disks [ index ] == nil {
return errDiskNotFound
}
2021-02-03 19:33:43 +01:00
return disks [ index ] . DeleteVersion ( ctx , bucket , object , fi , forceDelMarker )
2020-06-12 20:04:01 -07:00
} , index )
2016-09-03 00:48:35 +05:30
}
2020-06-12 20:04:01 -07:00
// return errors if any during deletion
return reduceWriteQuorumErrs ( ctx , g . Wait ( ) , objectOpIgnoredErrs , writeQuorum )
2016-05-20 20:48:47 -07:00
}
2020-06-12 20:04:01 -07:00
// DeleteObjects deletes objects/versions in bulk, this function will still automatically split objects list
// into smaller bulks if some object names are found to be duplicated in the delete list, splitting
// into smaller bulks will avoid holding twice the write lock of the duplicated object names.
func ( er erasureObjects ) DeleteObjects ( ctx context . Context , bucket string , objects [ ] ObjectToDelete , opts ObjectOptions ) ( [ ] DeletedObject , [ ] error ) {
2022-05-04 08:45:27 +01:00
for _ , obj := range objects {
auditObjectErasureSet ( ctx , obj . ObjectV . ObjectName , & er )
}
2020-06-12 20:04:01 -07:00
errs := make ( [ ] error , len ( objects ) )
dobjects := make ( [ ] DeletedObject , len ( objects ) )
writeQuorums := make ( [ ] int , len ( objects ) )
2020-04-27 19:06:21 +02:00
2020-06-12 20:04:01 -07:00
storageDisks := er . getDisks ( )
for i := range objects {
// Assume (N/2 + 1) quorums for all objects
// this is a theoretical assumption such that
// for delete's we do not need to honor storage
// class for objects which have reduced quorum
// storage class only needs to be honored for
// Read() requests alone which we already do.
2022-01-27 17:00:15 -08:00
writeQuorums [ i ] = len ( storageDisks ) / 2 + 1
2019-05-13 20:25:49 +01:00
}
2021-11-01 10:50:07 -07:00
versionsMap := make ( map [ string ] FileInfoVersions , len ( objects ) )
2020-06-12 20:04:01 -07:00
for i := range objects {
2021-11-01 10:50:07 -07:00
// Construct the FileInfo data that needs to be preserved on the disk.
vr := FileInfo {
Name : objects [ i ] . ObjectName ,
VersionID : objects [ i ] . VersionID ,
ReplicationState : objects [ i ] . ReplicationState ( ) ,
// save the index to set correct error at this index.
Idx : i ,
}
vr . SetTierFreeVersionID ( mustGetUUID ( ) )
// VersionID is not set means delete is not specific about
// any version, look for if the bucket is versioned or not.
2020-06-12 20:04:01 -07:00
if objects [ i ] . VersionID == "" {
2022-05-06 19:05:28 -07:00
// MinIO extension to bucket version configuration
suspended := opts . VersionSuspended
versioned := opts . Versioned
if opts . PrefixEnabledFn != nil {
versioned = opts . PrefixEnabledFn ( objects [ i ] . ObjectName )
}
if versioned || suspended {
2021-11-01 10:50:07 -07:00
// Bucket is versioned and no version was explicitly
// mentioned for deletes, create a delete marker instead.
vr . ModTime = UTCNow ( )
vr . Deleted = true
// Versioning suspended means that we add a `null` version
// delete marker, if not add a new version for this delete
// marker.
2022-05-07 22:06:44 -07:00
if versioned {
2021-11-01 10:50:07 -07:00
vr . VersionID = mustGetUUID ( )
2020-06-12 20:04:01 -07:00
}
}
}
2021-11-01 10:50:07 -07:00
// De-dup same object name to collect multiple versions for same object.
v , ok := versionsMap [ objects [ i ] . ObjectName ]
if ok {
v . Versions = append ( v . Versions , vr )
} else {
v = FileInfoVersions {
Name : vr . Name ,
Versions : [ ] FileInfo { vr } ,
}
2020-06-12 20:04:01 -07:00
}
2021-11-01 10:50:07 -07:00
if vr . Deleted {
dobjects [ i ] = DeletedObject {
DeleteMarker : vr . Deleted ,
DeleteMarkerVersionID : vr . VersionID ,
DeleteMarkerMTime : DeleteMarkerMTime { vr . ModTime } ,
ObjectName : vr . Name ,
ReplicationState : vr . ReplicationState ,
}
} else {
dobjects [ i ] = DeletedObject {
ObjectName : vr . Name ,
VersionID : vr . VersionID ,
ReplicationState : vr . ReplicationState ,
}
}
versionsMap [ objects [ i ] . ObjectName ] = v
}
dedupVersions := make ( [ ] FileInfoVersions , 0 , len ( versionsMap ) )
for _ , version := range versionsMap {
dedupVersions = append ( dedupVersions , version )
2020-06-12 20:04:01 -07:00
}
2020-03-06 13:44:24 -08:00
2019-05-13 20:25:49 +01:00
// Initialize list of errors.
2022-01-02 09:15:06 -08:00
delObjErrs := make ( [ ] [ ] error , len ( storageDisks ) )
2020-06-12 20:04:01 -07:00
var wg sync . WaitGroup
// Remove versions in bulk for each disk
for index , disk := range storageDisks {
2020-03-11 08:56:36 -07:00
wg . Add ( 1 )
go func ( index int , disk StorageAPI ) {
defer wg . Done ( )
2021-11-01 10:50:07 -07:00
delObjErrs [ index ] = make ( [ ] error , len ( objects ) )
2020-11-28 21:15:45 -08:00
if disk == nil {
2021-11-01 10:50:07 -07:00
for i := range objects {
2020-11-28 21:15:45 -08:00
delObjErrs [ index ] [ i ] = errDiskNotFound
}
return
}
2021-11-01 10:50:07 -07:00
errs := disk . DeleteVersions ( ctx , bucket , dedupVersions )
for i , err := range errs {
if err == nil {
continue
}
for _ , v := range dedupVersions [ i ] . Versions {
2022-01-06 10:47:49 -08:00
if err == errFileNotFound || err == errFileVersionNotFound {
if ! dobjects [ v . Idx ] . DeleteMarker {
// Not delete marker, if not found, ok.
continue
}
}
2021-11-01 10:50:07 -07:00
delObjErrs [ index ] [ v . Idx ] = err
}
}
2020-06-12 20:04:01 -07:00
} ( index , disk )
2019-05-13 20:25:49 +01:00
}
2020-03-11 08:56:36 -07:00
wg . Wait ( )
2019-05-13 20:25:49 +01:00
// Reduce errors for each object
for objIndex := range objects {
2020-06-12 20:04:01 -07:00
diskErrs := make ( [ ] error , len ( storageDisks ) )
2020-03-11 08:56:36 -07:00
// Iterate over disks to fetch the error
// of deleting of the current object
2019-05-13 20:25:49 +01:00
for i := range delObjErrs {
2020-03-11 08:56:36 -07:00
// delObjErrs[i] is not nil when disks[i] is also not nil
2019-05-13 20:25:49 +01:00
if delObjErrs [ i ] != nil {
2020-09-02 00:19:03 -07:00
diskErrs [ i ] = delObjErrs [ i ] [ objIndex ]
2019-05-13 20:25:49 +01:00
}
}
2020-11-28 21:15:45 -08:00
err := reduceWriteQuorumErrs ( ctx , diskErrs , objectOpIgnoredErrs , writeQuorums [ objIndex ] )
if objects [ objIndex ] . VersionID != "" {
errs [ objIndex ] = toObjectErr ( err , bucket , objects [ objIndex ] . ObjectName , objects [ objIndex ] . VersionID )
} else {
errs [ objIndex ] = toObjectErr ( err , bucket , objects [ objIndex ] . ObjectName )
}
2022-01-27 17:00:15 -08:00
defer NSUpdated ( bucket , objects [ objIndex ] . ObjectName )
2019-05-13 20:25:49 +01:00
}
2020-06-29 13:07:26 -07:00
// Check failed deletes across multiple objects
2021-11-01 10:50:07 -07:00
for i , dobj := range dobjects {
// This object errored, no need to attempt a heal.
if errs [ i ] != nil {
continue
}
2020-06-29 13:07:26 -07:00
// Check if there is any offline disk and add it to the MRF list
for _ , disk := range storageDisks {
2020-12-22 09:16:43 -08:00
if disk != nil && disk . IsOnline ( ) {
// Skip attempted heal on online disks.
continue
2020-06-29 13:07:26 -07:00
}
2020-12-22 09:16:43 -08:00
// all other direct versionId references we should
// ensure no dangling file is left over.
2021-11-01 10:50:07 -07:00
er . addPartial ( bucket , dobj . ObjectName , dobj . VersionID , - 1 )
2020-12-22 09:16:43 -08:00
break
2020-06-29 13:07:26 -07:00
}
}
2020-06-12 20:04:01 -07:00
return dobjects , errs
2019-05-13 20:25:49 +01:00
}
2021-06-16 02:43:14 +01:00
func ( er erasureObjects ) deletePrefix ( ctx context . Context , bucket , prefix string ) error {
disks := er . getDisks ( )
g := errgroup . WithNErrs ( len ( disks ) )
2022-05-30 10:58:37 -07:00
dirPrefix := encodeDirObject ( prefix )
2021-06-16 02:43:14 +01:00
for index := range disks {
index := index
g . Go ( func ( ) error {
if disks [ index ] == nil {
return nil
}
2022-05-30 10:58:37 -07:00
// Deletes
// - The prefix and its children
// - The prefix__XLDIR__
2022-07-11 21:45:54 +05:30
defer disks [ index ] . Delete ( ctx , bucket , dirPrefix , DeleteOptions {
Recursive : true ,
Force : true ,
} )
return disks [ index ] . Delete ( ctx , bucket , prefix , DeleteOptions {
Recursive : true ,
Force : true ,
} )
2021-06-16 02:43:14 +01:00
} , index )
}
for _ , err := range g . Wait ( ) {
if err != nil {
return err
}
}
return nil
}
2016-06-01 16:43:31 -07:00
// DeleteObject - deletes an object, this call doesn't necessary reply
// any error as it is not necessary for the handler to reply back a
// response to the client request.
2020-06-12 20:04:01 -07:00
func ( er erasureObjects ) DeleteObject ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2022-05-04 08:45:27 +01:00
auditObjectErasureSet ( ctx , object , & er )
2021-06-16 02:43:14 +01:00
if opts . DeletePrefix {
return ObjectInfo { } , toObjectErr ( er . deletePrefix ( ctx , bucket , object ) , bucket , object )
}
2021-08-27 17:06:47 -07:00
var lc * lifecycle . Lifecycle
2022-04-11 13:25:32 -07:00
var rcfg lock . Retention
2021-08-27 17:06:47 -07:00
if opts . Expiration . Expire {
// Check if the current bucket has a configured lifecycle policy
lc , _ = globalLifecycleSys . Get ( bucket )
2022-04-11 13:25:32 -07:00
rcfg , _ = globalBucketObjectLockSys . Get ( bucket )
2021-08-27 17:06:47 -07:00
}
// expiration attempted on a bucket with no lifecycle
// rules shall be rejected.
if lc == nil && opts . Expiration . Expire {
if opts . VersionID != "" {
return objInfo , VersionNotFound {
Bucket : bucket ,
Object : object ,
VersionID : opts . VersionID ,
}
}
return objInfo , ObjectNotFound {
Bucket : bucket ,
Object : object ,
}
}
2020-12-11 16:58:15 -08:00
objInfo = ObjectInfo { VersionID : opts . VersionID } // version id needed in Delete API response.
2022-08-18 16:41:59 -07:00
defer NSUpdated ( bucket , object )
storageDisks := er . getDisks ( )
2021-05-17 08:25:48 -07:00
2022-10-26 16:09:27 -07:00
// Determine whether to mark object deleted for replication
var markDelete bool
2021-08-27 17:06:47 -07:00
if opts . Expiration . Expire {
2022-08-18 16:41:59 -07:00
goi , _ , err := er . getObjectInfoAndQuorum ( ctx , bucket , object , opts )
if err == nil {
2022-10-21 18:46:53 +01:00
evt := evalActionFromLifecycle ( ctx , * lc , rcfg , goi )
2022-08-18 16:41:59 -07:00
var isErr bool
2022-10-21 18:46:53 +01:00
switch evt . Action {
2022-08-18 16:41:59 -07:00
case lifecycle . NoneAction :
isErr = true
case lifecycle . TransitionAction , lifecycle . TransitionVersionAction :
isErr = true
2021-08-27 17:06:47 -07:00
}
2022-08-18 16:41:59 -07:00
if isErr {
if goi . VersionID != "" {
return goi , VersionNotFound {
Bucket : bucket ,
Object : object ,
VersionID : goi . VersionID ,
}
}
return goi , ObjectNotFound {
Bucket : bucket ,
Object : object ,
}
2021-08-27 17:06:47 -07:00
}
2022-10-26 16:09:27 -07:00
markDelete = goi . VersionID != ""
2021-08-27 17:06:47 -07:00
}
2020-09-14 15:57:13 -07:00
}
2021-08-27 17:06:47 -07:00
2022-08-18 16:41:59 -07:00
versionFound := ! ( opts . DeleteMarker && opts . VersionID != "" )
2021-09-01 08:57:42 -07:00
2022-10-26 16:09:27 -07:00
if ! markDelete {
markDelete = ! opts . DeleteMarker && opts . VersionID != ""
}
2020-11-19 18:43:58 -08:00
// Default deleteMarker to true if object is under versioning
2022-08-18 16:41:59 -07:00
// versioning suspended means we add `null` version as
// delete marker, if its not decided already.
deleteMarker := ( opts . Versioned || opts . VersionSuspended ) && opts . VersionID == "" || ( opts . DeleteMarker && opts . VersionID != "" )
2021-02-09 15:11:43 -08:00
2022-10-26 16:09:27 -07:00
if markDelete && opts . VersionID != "" {
2020-11-19 18:43:58 -08:00
// case where replica version needs to be deleted on target cluster
2021-09-18 16:31:35 -04:00
if versionFound && opts . DeleteMarkerReplicationStatus ( ) == replication . Replica {
2020-11-19 18:43:58 -08:00
markDelete = false
}
2021-09-18 16:31:35 -04:00
if opts . VersionPurgeStatus ( ) . Empty ( ) && opts . DeleteMarkerReplicationStatus ( ) . Empty ( ) {
2020-11-19 18:43:58 -08:00
markDelete = false
}
2021-09-18 16:31:35 -04:00
if opts . VersionPurgeStatus ( ) == Complete {
2020-11-19 18:43:58 -08:00
markDelete = false
}
}
2018-08-29 13:36:19 -07:00
2020-11-19 18:43:58 -08:00
modTime := opts . MTime
if opts . MTime . IsZero ( ) {
modTime = UTCNow ( )
}
2021-06-30 19:32:07 -07:00
fvID := mustGetUUID ( )
2022-08-22 15:59:06 -07:00
2022-08-18 16:41:59 -07:00
if markDelete && ( opts . Versioned || opts . VersionSuspended ) {
fi := FileInfo {
Name : object ,
2022-08-22 15:59:06 -07:00
Deleted : true ,
2022-08-18 16:41:59 -07:00
MarkDeleted : markDelete ,
ModTime : modTime ,
ReplicationState : opts . DeleteReplication ,
TransitionStatus : opts . Transition . Status ,
ExpireRestored : opts . Transition . ExpireRestored ,
}
fi . SetTierFreeVersionID ( fvID )
if opts . Versioned {
fi . VersionID = mustGetUUID ( )
if opts . VersionID != "" {
fi . VersionID = opts . VersionID
2020-06-12 20:04:01 -07:00
}
2018-08-31 22:16:35 +02:00
}
2022-08-18 16:41:59 -07:00
// versioning suspended means we add `null` version as
// delete marker. Add delete marker, since we don't have
// any version specified explicitly. Or if a particular
// version id needs to be replicated.
if err = er . deleteObjectVersion ( ctx , bucket , object , fi , opts . DeleteMarker ) ; err != nil {
return objInfo , toObjectErr ( err , bucket , object )
}
return fi . ToObjectInfo ( bucket , object , opts . Versioned || opts . VersionSuspended ) , nil
2018-08-29 13:36:19 -07:00
}
2016-06-17 10:48:43 +05:30
2020-06-12 20:04:01 -07:00
// Delete the object version on all disks.
2021-06-30 19:32:07 -07:00
dfi := FileInfo {
2021-09-18 16:31:35 -04:00
Name : object ,
VersionID : opts . VersionID ,
MarkDeleted : markDelete ,
Deleted : deleteMarker ,
ModTime : modTime ,
ReplicationState : opts . DeleteReplication ,
TransitionStatus : opts . Transition . Status ,
ExpireRestored : opts . Transition . ExpireRestored ,
2021-06-30 19:32:07 -07:00
}
dfi . SetTierFreeVersionID ( fvID )
2022-08-18 16:41:59 -07:00
if err = er . deleteObjectVersion ( ctx , bucket , object , dfi , opts . DeleteMarker ) ; err != nil {
2020-06-12 20:04:01 -07:00
return objInfo , toObjectErr ( err , bucket , object )
2016-06-08 00:05:03 +05:30
}
2020-06-29 13:07:26 -07:00
for _ , disk := range storageDisks {
2020-12-22 09:16:43 -08:00
if disk != nil && disk . IsOnline ( ) {
continue
2020-06-29 13:07:26 -07:00
}
2021-07-16 06:32:06 +01:00
er . addPartial ( bucket , object , opts . VersionID , - 1 )
2020-12-22 09:16:43 -08:00
break
2020-06-29 13:07:26 -07:00
}
2022-08-18 16:41:59 -07:00
return dfi . ToObjectInfo ( bucket , object , opts . Versioned || opts . VersionSuspended ) , nil
2018-02-09 15:19:30 -08:00
}
2020-01-16 03:30:32 +01:00
2020-06-29 13:07:26 -07:00
// Send the successful but partial upload/delete, however ignore
2020-01-16 03:30:32 +01:00
// if the channel is blocked by other items.
2021-07-16 06:32:06 +01:00
func ( er erasureObjects ) addPartial ( bucket , object , versionID string , size int64 ) {
globalMRFState . addPartialOp ( partialOperation {
bucket : bucket ,
object : object ,
versionID : versionID ,
size : size ,
setIndex : er . setIndex ,
poolIndex : er . poolIndex ,
} )
2020-01-16 03:30:32 +01:00
}
2020-01-20 22:15:59 +05:30
2021-04-04 13:32:31 -07:00
func ( er erasureObjects ) PutObjectMetadata ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( ObjectInfo , error ) {
2021-12-21 10:08:26 -08:00
if ! opts . NoLock {
// Lock the object before updating metadata.
lk := er . NewNSLock ( bucket , object )
lkctx , err := lk . GetLock ( ctx , globalOperationTimeout )
if err != nil {
return ObjectInfo { } , err
}
ctx = lkctx . Context ( )
defer lk . Unlock ( lkctx . Cancel )
2021-02-01 22:52:51 +01:00
}
2020-06-12 20:04:01 -07:00
disks := er . getDisks ( )
2020-01-20 22:15:59 +05:30
2022-04-20 12:49:05 -07:00
var metaArr [ ] FileInfo
var errs [ ] error
2020-01-20 22:15:59 +05:30
// Read metadata associated with the object from all disks.
2022-04-20 12:49:05 -07:00
if opts . VersionID != "" {
metaArr , errs = readAllFileInfo ( ctx , disks , bucket , object , opts . VersionID , false )
} else {
metaArr , errs = readAllXL ( ctx , disks , bucket , object , false )
}
2020-06-12 20:04:01 -07:00
2021-04-04 13:32:31 -07:00
readQuorum , _ , err := objectQuorumFromMeta ( ctx , metaArr , errs , er . defaultParityCount )
2020-06-12 20:04:01 -07:00
if err != nil {
2022-04-20 12:49:05 -07:00
if errors . Is ( err , errErasureReadQuorum ) && ! strings . HasPrefix ( bucket , minioMetaBucket ) {
_ , derr := er . deleteIfDangling ( ctx , bucket , object , metaArr , errs , nil , opts )
if derr != nil {
err = derr
}
}
2021-02-01 22:52:51 +01:00
return ObjectInfo { } , toObjectErr ( err , bucket , object )
2020-06-12 20:04:01 -07:00
}
2020-01-20 22:15:59 +05:30
2020-06-12 20:04:01 -07:00
// List all online disks.
2021-11-21 10:41:30 -08:00
onlineDisks , modTime := listOnlineDisks ( disks , metaArr , errs )
2020-06-12 20:04:01 -07:00
// Pick latest valid metadata.
2021-11-21 10:41:30 -08:00
fi , err := pickValidFileInfo ( ctx , metaArr , modTime , readQuorum )
2020-01-20 22:15:59 +05:30
if err != nil {
2021-02-01 22:52:51 +01:00
return ObjectInfo { } , toObjectErr ( err , bucket , object )
2020-01-20 22:15:59 +05:30
}
2021-11-21 10:41:30 -08:00
2020-06-12 20:04:01 -07:00
if fi . Deleted {
2021-02-01 22:52:51 +01:00
return ObjectInfo { } , toObjectErr ( errMethodNotAllowed , bucket , object )
2020-06-12 20:04:01 -07:00
}
2021-11-21 10:41:30 -08:00
filterOnlineDisksInplace ( fi , metaArr , onlineDisks )
2021-10-30 08:22:04 -07:00
// if version-id is not specified retention is supposed to be set on the latest object.
if opts . VersionID == "" {
opts . VersionID = fi . VersionID
}
2022-05-31 02:57:57 -07:00
objInfo := fi . ToObjectInfo ( bucket , object , opts . Versioned || opts . VersionSuspended )
2021-10-30 08:22:04 -07:00
if opts . EvalMetadataFn != nil {
2022-12-02 17:35:04 +01:00
if err := opts . EvalMetadataFn ( & objInfo ) ; err != nil {
2021-10-30 08:22:04 -07:00
return ObjectInfo { } , err
}
}
for k , v := range objInfo . UserDefined {
2021-04-04 13:32:31 -07:00
fi . Metadata [ k ] = v
2021-04-01 22:12:03 -07:00
}
2021-04-04 13:32:31 -07:00
fi . ModTime = opts . MTime
fi . VersionID = opts . VersionID
2021-04-01 22:12:03 -07:00
2021-11-21 10:41:30 -08:00
if err = er . updateObjectMeta ( ctx , bucket , object , fi , onlineDisks ) ; err != nil {
2021-02-01 22:52:51 +01:00
return ObjectInfo { } , toObjectErr ( err , bucket , object )
2020-01-20 22:15:59 +05:30
}
2022-05-31 02:57:57 -07:00
return fi . ToObjectInfo ( bucket , object , opts . Versioned || opts . VersionSuspended ) , nil
2020-01-20 22:15:59 +05:30
}
2021-04-04 13:32:31 -07:00
// PutObjectTags - replace or add tags to an existing object
func ( er erasureObjects ) PutObjectTags ( ctx context . Context , bucket , object string , tags string , opts ObjectOptions ) ( ObjectInfo , error ) {
// Lock the object before updating tags.
lk := er . NewNSLock ( bucket , object )
2021-04-29 20:55:21 -07:00
lkctx , err := lk . GetLock ( ctx , globalOperationTimeout )
2021-04-04 13:32:31 -07:00
if err != nil {
return ObjectInfo { } , err
2020-10-28 09:18:35 -07:00
}
2021-04-29 20:55:21 -07:00
ctx = lkctx . Context ( )
defer lk . Unlock ( lkctx . Cancel )
2021-04-04 13:32:31 -07:00
2020-10-28 09:18:35 -07:00
disks := er . getDisks ( )
2022-04-20 12:49:05 -07:00
var metaArr [ ] FileInfo
var errs [ ] error
2020-10-28 09:18:35 -07:00
// Read metadata associated with the object from all disks.
2022-04-20 12:49:05 -07:00
if opts . VersionID != "" {
metaArr , errs = readAllFileInfo ( ctx , disks , bucket , object , opts . VersionID , false )
} else {
metaArr , errs = readAllXL ( ctx , disks , bucket , object , false )
}
2020-10-28 09:18:35 -07:00
2021-04-04 13:32:31 -07:00
readQuorum , _ , err := objectQuorumFromMeta ( ctx , metaArr , errs , er . defaultParityCount )
2020-10-28 09:18:35 -07:00
if err != nil {
2022-04-20 12:49:05 -07:00
if errors . Is ( err , errErasureReadQuorum ) && ! strings . HasPrefix ( bucket , minioMetaBucket ) {
_ , derr := er . deleteIfDangling ( ctx , bucket , object , metaArr , errs , nil , opts )
if derr != nil {
err = derr
}
}
2021-04-04 13:32:31 -07:00
return ObjectInfo { } , toObjectErr ( err , bucket , object )
2020-10-28 09:18:35 -07:00
}
// List all online disks.
2021-11-21 10:41:30 -08:00
onlineDisks , modTime := listOnlineDisks ( disks , metaArr , errs )
2020-10-28 09:18:35 -07:00
// Pick latest valid metadata.
2021-11-21 10:41:30 -08:00
fi , err := pickValidFileInfo ( ctx , metaArr , modTime , readQuorum )
2020-10-28 09:18:35 -07:00
if err != nil {
2021-04-04 13:32:31 -07:00
return ObjectInfo { } , toObjectErr ( err , bucket , object )
}
if fi . Deleted {
if opts . VersionID == "" {
return ObjectInfo { } , toObjectErr ( errFileNotFound , bucket , object )
}
return ObjectInfo { } , toObjectErr ( errMethodNotAllowed , bucket , object )
2020-10-28 09:18:35 -07:00
}
2021-11-21 10:41:30 -08:00
filterOnlineDisksInplace ( fi , metaArr , onlineDisks )
2021-04-04 13:32:31 -07:00
fi . Metadata [ xhttp . AmzObjectTagging ] = tags
2022-01-10 19:06:10 -08:00
fi . ReplicationState = opts . PutReplicationState ( )
2021-04-04 13:32:31 -07:00
for k , v := range opts . UserDefined {
2020-10-28 09:18:35 -07:00
fi . Metadata [ k ] = v
}
2021-11-21 10:41:30 -08:00
if err = er . updateObjectMeta ( ctx , bucket , object , fi , onlineDisks ) ; err != nil {
2021-04-04 13:32:31 -07:00
return ObjectInfo { } , toObjectErr ( err , bucket , object )
2020-10-28 09:18:35 -07:00
}
2022-05-31 02:57:57 -07:00
return fi . ToObjectInfo ( bucket , object , opts . Versioned || opts . VersionSuspended ) , nil
2021-04-04 13:32:31 -07:00
}
2020-10-28 09:18:35 -07:00
2021-04-04 13:32:31 -07:00
// updateObjectMeta will update the metadata of a file.
2021-11-21 10:41:30 -08:00
func ( er erasureObjects ) updateObjectMeta ( ctx context . Context , bucket , object string , fi FileInfo , onlineDisks [ ] StorageAPI ) error {
2021-04-04 13:32:31 -07:00
if len ( fi . Metadata ) == 0 {
return nil
2020-10-28 09:18:35 -07:00
}
2021-11-21 10:41:30 -08:00
g := errgroup . WithNErrs ( len ( onlineDisks ) )
2021-04-01 22:12:03 -07:00
2021-04-04 13:32:31 -07:00
// Start writing `xl.meta` to all disks in parallel.
2021-11-21 10:41:30 -08:00
for index := range onlineDisks {
2021-04-04 13:32:31 -07:00
index := index
g . Go ( func ( ) error {
2021-11-21 10:41:30 -08:00
if onlineDisks [ index ] == nil {
2021-04-04 13:32:31 -07:00
return errDiskNotFound
}
2021-11-21 10:41:30 -08:00
return onlineDisks [ index ] . UpdateMetadata ( ctx , bucket , object , fi )
2021-04-04 13:32:31 -07:00
} , index )
2021-04-01 22:12:03 -07:00
}
2021-04-04 13:32:31 -07:00
// Wait for all the routines.
mErrs := g . Wait ( )
2020-10-28 09:18:35 -07:00
2021-12-28 12:41:52 -08:00
return reduceWriteQuorumErrs ( ctx , mErrs , objectOpIgnoredErrs , er . defaultWQuorum ( ) )
2020-10-28 09:18:35 -07:00
}
2020-05-23 11:09:35 -07:00
// DeleteObjectTags - delete object tags from an existing object
2021-02-01 22:52:51 +01:00
func ( er erasureObjects ) DeleteObjectTags ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( ObjectInfo , error ) {
2020-06-12 20:04:01 -07:00
return er . PutObjectTags ( ctx , bucket , object , "" , opts )
2020-01-20 22:15:59 +05:30
}
2020-05-23 11:09:35 -07:00
// GetObjectTags - get object tags from an existing object
2020-06-12 20:04:01 -07:00
func ( er erasureObjects ) GetObjectTags ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( * tags . Tags , error ) {
2020-01-20 22:15:59 +05:30
// GetObjectInfo will return tag value as well
2020-06-12 20:04:01 -07:00
oi , err := er . GetObjectInfo ( ctx , bucket , object , opts )
2020-01-20 22:15:59 +05:30
if err != nil {
2020-05-05 21:18:13 +00:00
return nil , err
2020-01-20 22:15:59 +05:30
}
2020-05-05 21:18:13 +00:00
return tags . ParseObjectTags ( oi . UserTags )
2020-01-20 22:15:59 +05:30
}
2021-04-19 10:30:42 -07:00
// TransitionObject - transition object content to target tier.
func ( er erasureObjects ) TransitionObject ( ctx context . Context , bucket , object string , opts ObjectOptions ) error {
tgtClient , err := globalTierConfigMgr . getDriver ( opts . Transition . Tier )
if err != nil {
return err
}
2021-05-17 08:25:48 -07:00
2021-04-19 10:30:42 -07:00
// Acquire write lock before starting to transition the object.
lk := er . NewNSLock ( bucket , object )
2021-04-29 20:55:21 -07:00
lkctx , err := lk . GetLock ( ctx , globalDeleteOperationTimeout )
2021-04-19 10:30:42 -07:00
if err != nil {
return err
}
2021-04-29 20:55:21 -07:00
ctx = lkctx . Context ( )
defer lk . Unlock ( lkctx . Cancel )
2021-04-19 10:30:42 -07:00
fi , metaArr , onlineDisks , err := er . getObjectFileInfo ( ctx , bucket , object , opts , true )
if err != nil {
return toObjectErr ( err , bucket , object )
}
if fi . Deleted {
if opts . VersionID == "" {
return toObjectErr ( errFileNotFound , bucket , object )
}
// Make sure to return object info to provide extra information.
return toObjectErr ( errMethodNotAllowed , bucket , object )
}
// verify that the object queued for transition is identical to that on disk.
if ! opts . MTime . Equal ( fi . ModTime ) || ! strings . EqualFold ( opts . Transition . ETag , extractETag ( fi . Metadata ) ) {
return toObjectErr ( errFileNotFound , bucket , object )
}
// if object already transitioned, return
if fi . TransitionStatus == lifecycle . TransitionComplete {
return nil
}
2021-08-24 12:24:00 -07:00
defer NSUpdated ( bucket , object )
2021-04-19 10:30:42 -07:00
if fi . XLV1 {
if _ , err = er . HealObject ( ctx , bucket , object , "" , madmin . HealOpts { NoLock : true } ) ; err != nil {
return err
}
// Fetch FileInfo again. HealObject migrates object the latest
// format. Among other things this changes fi.DataDir and
// possibly fi.Data (if data is inlined).
fi , metaArr , onlineDisks , err = er . getObjectFileInfo ( ctx , bucket , object , opts , true )
if err != nil {
return toObjectErr ( err , bucket , object )
}
}
2021-07-16 09:38:27 -07:00
2021-07-20 10:49:52 -07:00
destObj , err := genTransitionObjName ( bucket )
2021-04-19 10:30:42 -07:00
if err != nil {
return err
}
2021-05-11 09:18:37 -07:00
pr , pw := xioutil . WaitPipe ( )
2021-04-19 10:30:42 -07:00
go func ( ) {
err := er . getObjectWithFileInfo ( ctx , bucket , object , 0 , fi . Size , pw , fi , metaArr , onlineDisks )
pw . CloseWithError ( err )
} ( )
2021-05-11 09:18:37 -07:00
2021-06-03 14:26:51 -07:00
var rv remoteVersionID
rv , err = tgtClient . Put ( ctx , destObj , pr , fi . Size )
2021-05-11 09:18:37 -07:00
pr . CloseWithError ( err )
if err != nil {
2021-04-19 10:30:42 -07:00
logger . LogIf ( ctx , fmt . Errorf ( "Unable to transition %s/%s(%s) to %s tier: %w" , bucket , object , opts . VersionID , opts . Transition . Tier , err ) )
return err
}
fi . TransitionStatus = lifecycle . TransitionComplete
fi . TransitionedObjName = destObj
fi . TransitionTier = opts . Transition . Tier
2021-06-03 14:26:51 -07:00
fi . TransitionVersionID = string ( rv )
2021-04-19 10:30:42 -07:00
eventName := event . ObjectTransitionComplete
storageDisks := er . getDisks ( )
2021-09-01 08:57:42 -07:00
2022-08-18 16:41:59 -07:00
if err = er . deleteObjectVersion ( ctx , bucket , object , fi , false ) ; err != nil {
2021-04-19 10:30:42 -07:00
eventName = event . ObjectTransitionFailed
}
2021-08-24 12:24:00 -07:00
2021-04-19 10:30:42 -07:00
for _ , disk := range storageDisks {
if disk != nil && disk . IsOnline ( ) {
continue
}
2021-07-16 06:32:06 +01:00
er . addPartial ( bucket , object , opts . VersionID , - 1 )
2021-04-19 10:30:42 -07:00
break
}
2021-07-30 12:45:25 -07:00
2022-05-31 02:57:57 -07:00
objInfo := fi . ToObjectInfo ( bucket , object , opts . Versioned || opts . VersionSuspended )
2021-04-19 10:30:42 -07:00
sendEvent ( eventArgs {
EventName : eventName ,
BucketName : bucket ,
2021-07-30 12:45:25 -07:00
Object : objInfo ,
Host : "Internal: [ILM-Transition]" ,
2021-04-19 10:30:42 -07:00
} )
2021-07-30 12:45:25 -07:00
auditLogLifecycle ( ctx , objInfo , ILMTransition )
2021-04-19 10:30:42 -07:00
return err
}
// RestoreTransitionedObject - restore transitioned object content locally on this cluster.
// This is similar to PostObjectRestore from AWS GLACIER
// storage class. When PostObjectRestore API is called, a temporary copy of the object
// is restored locally to the bucket on source cluster until the restore expiry date.
// The copy that was transitioned continues to reside in the transitioned tier.
func ( er erasureObjects ) RestoreTransitionedObject ( ctx context . Context , bucket , object string , opts ObjectOptions ) error {
return er . restoreTransitionedObject ( ctx , bucket , object , opts )
}
// update restore status header in the metadata
2021-04-23 10:52:26 -07:00
func ( er erasureObjects ) updateRestoreMetadata ( ctx context . Context , bucket , object string , objInfo ObjectInfo , opts ObjectOptions , rerr error ) error {
2021-04-19 10:30:42 -07:00
oi := objInfo . Clone ( )
oi . metadataOnly = true // Perform only metadata updates.
if rerr == nil {
oi . UserDefined [ xhttp . AmzRestore ] = completedRestoreObj ( opts . Transition . RestoreExpiry ) . String ( )
} else { // allow retry in the case of failure to restore
delete ( oi . UserDefined , xhttp . AmzRestore )
}
if _ , err := er . CopyObject ( ctx , bucket , object , bucket , object , oi , ObjectOptions {
VersionID : oi . VersionID ,
} , ObjectOptions {
VersionID : oi . VersionID ,
} ) ; err != nil {
logger . LogIf ( ctx , fmt . Errorf ( "Unable to update transition restore metadata for %s/%s(%s): %s" , bucket , object , oi . VersionID , err ) )
return err
}
return nil
}
// restoreTransitionedObject for multipart object chunks the file stream from remote tier into the same number of parts
// as in the xl.meta for this version and rehydrates the part.n into the fi.DataDir for this version as in the xl.meta
func ( er erasureObjects ) restoreTransitionedObject ( ctx context . Context , bucket string , object string , opts ObjectOptions ) error {
2021-04-23 10:52:26 -07:00
setRestoreHeaderFn := func ( oi ObjectInfo , rerr error ) error {
er . updateRestoreMetadata ( ctx , bucket , object , oi , opts , rerr )
2021-04-19 10:30:42 -07:00
return rerr
}
var oi ObjectInfo
// get the file info on disk for transitioned object
actualfi , _ , _ , err := er . getObjectFileInfo ( ctx , bucket , object , opts , false )
if err != nil {
2021-04-23 10:52:26 -07:00
return setRestoreHeaderFn ( oi , toObjectErr ( err , bucket , object ) )
2021-04-19 10:30:42 -07:00
}
2022-05-31 02:57:57 -07:00
oi = actualfi . ToObjectInfo ( bucket , object , opts . Versioned || opts . VersionSuspended )
2021-04-24 19:07:27 -07:00
ropts := putRestoreOpts ( bucket , object , opts . Transition . RestoreRequest , oi )
2021-04-19 10:30:42 -07:00
if len ( oi . Parts ) == 1 {
var rs * HTTPRangeSpec
gr , err := getTransitionedObjectReader ( ctx , bucket , object , rs , http . Header { } , oi , opts )
if err != nil {
2021-04-23 10:52:26 -07:00
return setRestoreHeaderFn ( oi , toObjectErr ( err , bucket , object ) )
2021-04-19 10:30:42 -07:00
}
defer gr . Close ( )
hashReader , err := hash . NewReader ( gr , gr . ObjInfo . Size , "" , "" , gr . ObjInfo . Size )
if err != nil {
2021-04-23 10:52:26 -07:00
return setRestoreHeaderFn ( oi , toObjectErr ( err , bucket , object ) )
2021-04-19 10:30:42 -07:00
}
pReader := NewPutObjReader ( hashReader )
ropts . UserDefined [ xhttp . AmzRestore ] = completedRestoreObj ( opts . Transition . RestoreExpiry ) . String ( )
_ , err = er . PutObject ( ctx , bucket , object , pReader , ropts )
2021-04-23 10:52:26 -07:00
return setRestoreHeaderFn ( oi , toObjectErr ( err , bucket , object ) )
2021-04-19 10:30:42 -07:00
}
2021-04-24 19:07:27 -07:00
2022-08-30 01:57:16 +02:00
res , err := er . NewMultipartUpload ( ctx , bucket , object , ropts )
2021-04-19 10:30:42 -07:00
if err != nil {
2021-04-23 10:52:26 -07:00
return setRestoreHeaderFn ( oi , err )
2021-04-19 10:30:42 -07:00
}
2021-04-24 19:07:27 -07:00
2021-04-19 10:30:42 -07:00
var uploadedParts [ ] CompletePart
var rs * HTTPRangeSpec
// get reader from the warm backend - note that even in the case of encrypted objects, this stream is still encrypted.
gr , err := getTransitionedObjectReader ( ctx , bucket , object , rs , http . Header { } , oi , opts )
if err != nil {
2021-04-23 10:52:26 -07:00
return setRestoreHeaderFn ( oi , err )
2021-04-19 10:30:42 -07:00
}
defer gr . Close ( )
2021-04-24 19:07:27 -07:00
2021-04-19 10:30:42 -07:00
// rehydrate the parts back on disk as per the original xl.meta prior to transition
for _ , partInfo := range oi . Parts {
hr , err := hash . NewReader ( gr , partInfo . Size , "" , "" , partInfo . Size )
if err != nil {
2021-04-23 10:52:26 -07:00
return setRestoreHeaderFn ( oi , err )
2021-04-19 10:30:42 -07:00
}
2022-08-30 01:57:16 +02:00
pInfo , err := er . PutObjectPart ( ctx , bucket , object , res . UploadID , partInfo . Number , NewPutObjReader ( hr ) , ObjectOptions { } )
2021-04-19 10:30:42 -07:00
if err != nil {
2021-04-23 10:52:26 -07:00
return setRestoreHeaderFn ( oi , err )
2021-04-19 10:30:42 -07:00
}
2021-04-26 18:24:06 -07:00
if pInfo . Size != partInfo . Size {
return setRestoreHeaderFn ( oi , InvalidObjectState { Bucket : bucket , Object : object } )
}
2021-04-19 10:30:42 -07:00
uploadedParts = append ( uploadedParts , CompletePart {
PartNumber : pInfo . PartNumber ,
ETag : pInfo . ETag ,
} )
}
2022-08-30 01:57:16 +02:00
_ , err = er . CompleteMultipartUpload ( ctx , bucket , object , res . UploadID , uploadedParts , ObjectOptions {
2022-01-02 09:15:06 -08:00
MTime : oi . ModTime ,
} )
2021-04-26 18:24:06 -07:00
return setRestoreHeaderFn ( oi , err )
2021-04-19 10:30:42 -07:00
}