2016-06-01 16:43:31 -07:00
/ *
2019-04-09 11:39:42 -07:00
* MinIO Cloud Storage , ( C ) 2016 , 2017 , 2018 MinIO , Inc .
2016-06-01 16:43:31 -07:00
*
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
* /
2016-08-18 16:23:42 -07:00
package cmd
2016-05-20 20:48:47 -07:00
import (
2018-09-20 19:22:09 -07:00
"bytes"
2018-03-14 12:01:47 -07:00
"context"
2020-03-03 03:29:30 +03:00
"fmt"
2016-05-20 20:48:47 -07:00
"io"
2018-09-20 19:22:09 -07:00
"net/http"
2016-05-20 20:48:47 -07:00
"path"
2019-10-06 22:50:24 -07:00
xhttp "github.com/minio/minio/cmd/http"
2018-04-05 15:04:40 -07:00
"github.com/minio/minio/cmd/logger"
2020-01-28 03:42:34 +05:30
"github.com/minio/minio/pkg/bucket/object/tagging"
2016-05-20 20:48:47 -07:00
"github.com/minio/minio/pkg/mimedb"
2019-10-14 09:44:51 -07:00
"github.com/minio/minio/pkg/sync/errgroup"
2016-05-20 20:48:47 -07:00
)
2016-11-20 16:57:12 -08:00
// list all errors which can be ignored in object operations.
2017-02-01 11:16:17 -08:00
var objectOpIgnoredErrs = append ( baseIgnoredErrs , errDiskAccessDenied )
2016-11-20 16:57:12 -08:00
2018-01-29 18:43:13 -08:00
// putObjectDir hints the bottom layer to create a new directory.
2018-04-05 15:04:40 -07:00
func ( xl xlObjects ) putObjectDir ( ctx context . Context , bucket , object string , writeQuorum int ) error {
2019-10-14 09:44:51 -07:00
storageDisks := xl . getDisks ( )
g := errgroup . WithNErrs ( len ( storageDisks ) )
2018-01-29 18:43:13 -08:00
// Prepare object creation in all disks
2019-10-14 09:44:51 -07:00
for index := range storageDisks {
if storageDisks [ index ] == nil {
2018-01-29 18:43:13 -08:00
continue
}
2019-10-14 09:44:51 -07:00
index := index
g . Go ( func ( ) error {
err := storageDisks [ index ] . MakeVol ( pathJoin ( bucket , object ) )
if err != nil && err != errVolumeExists {
return err
2018-01-29 18:43:13 -08:00
}
2019-10-14 09:44:51 -07:00
return nil
} , index )
2018-01-29 18:43:13 -08:00
}
2019-10-14 09:44:51 -07:00
return reduceWriteQuorumErrs ( ctx , g . Wait ( ) , objectOpIgnoredErrs , writeQuorum )
2018-01-29 18:43:13 -08:00
}
2016-05-20 20:48:47 -07:00
/// Object Operations
2016-12-26 16:29:26 -08:00
// CopyObject - copy object source object to destination object.
// if source object and destination object are same we only
// update metadata.
2018-09-10 09:42:43 -07:00
func ( xl xlObjects ) CopyObject ( ctx context . Context , srcBucket , srcObject , dstBucket , dstObject string , srcInfo ObjectInfo , srcOpts , dstOpts ObjectOptions ) ( oi ObjectInfo , e error ) {
2018-02-23 15:07:21 -08:00
cpSrcDstSame := isStringEqual ( pathJoin ( srcBucket , srcObject ) , pathJoin ( dstBucket , dstObject ) )
2018-01-12 20:34:52 -08:00
2019-05-08 18:35:40 -07:00
// Check if this request is only metadata update.
if cpSrcDstSame {
// Read metadata associated with the object from all disks.
storageDisks := xl . getDisks ( )
2017-12-22 16:58:13 +05:30
2019-05-08 18:35:40 -07:00
metaArr , errs := readAllXLMetadata ( ctx , storageDisks , srcBucket , srcObject )
2017-12-22 16:58:13 +05:30
2019-05-08 18:35:40 -07:00
// get Quorum for this object
readQuorum , writeQuorum , err := objectQuorumFromMeta ( ctx , xl , metaArr , errs )
if err != nil {
return oi , toObjectErr ( err , srcBucket , srcObject )
}
2016-12-26 16:29:26 -08:00
2019-05-08 18:35:40 -07:00
if reducedErr := reduceReadQuorumErrs ( ctx , errs , objectOpIgnoredErrs , readQuorum ) ; reducedErr != nil {
return oi , toObjectErr ( reducedErr , srcBucket , srcObject )
}
2016-12-26 16:29:26 -08:00
2019-05-08 18:35:40 -07:00
// List all online disks.
_ , modTime := listOnlineDisks ( storageDisks , metaArr , errs )
2016-12-26 16:29:26 -08:00
2019-05-08 18:35:40 -07:00
// Pick latest valid metadata.
xlMeta , err := pickValidXLMeta ( ctx , metaArr , modTime , readQuorum )
if err != nil {
return oi , toObjectErr ( err , srcBucket , srcObject )
}
2016-12-26 16:29:26 -08:00
// Update `xl.json` content on each disks.
2018-03-02 17:24:02 -08:00
for index := range metaArr {
metaArr [ index ] . Meta = srcInfo . UserDefined
2018-03-09 10:50:39 -08:00
metaArr [ index ] . Meta [ "etag" ] = srcInfo . ETag
2016-12-26 16:29:26 -08:00
}
2018-03-02 17:24:02 -08:00
var onlineDisks [ ] StorageAPI
2016-12-26 16:29:26 -08:00
tempObj := mustGetUUID ( )
2019-04-25 15:33:26 +01:00
// Cleanup in case of xl.json writing failure
defer xl . deleteObject ( ctx , minioMetaTmpBucket , tempObj , writeQuorum , false )
2016-12-26 16:29:26 -08:00
// Write unique `xl.json` for each disk.
2018-04-05 15:04:40 -07:00
if onlineDisks , err = writeUniqueXLMetadata ( ctx , storageDisks , minioMetaTmpBucket , tempObj , metaArr , writeQuorum ) ; err != nil {
2017-06-21 19:53:09 -07:00
return oi , toObjectErr ( err , srcBucket , srcObject )
2016-12-26 16:29:26 -08:00
}
2019-05-08 18:35:40 -07:00
2016-12-26 16:29:26 -08:00
// Rename atomically `xl.json` from tmp location to destination for each disk.
2018-04-05 15:04:40 -07:00
if _ , err = renameXLMetadata ( ctx , onlineDisks , minioMetaTmpBucket , tempObj , srcBucket , srcObject , writeQuorum ) ; err != nil {
2017-06-21 19:53:09 -07:00
return oi , toObjectErr ( err , srcBucket , srcObject )
2016-12-26 16:29:26 -08:00
}
2019-05-08 18:35:40 -07:00
return xlMeta . ToObjectInfo ( srcBucket , srcObject ) , nil
2016-12-26 16:29:26 -08:00
}
2019-05-08 18:35:40 -07:00
putOpts := ObjectOptions { ServerSideEncryption : dstOpts . ServerSideEncryption , UserDefined : srcInfo . UserDefined }
2019-05-13 22:42:06 +05:30
return xl . PutObject ( ctx , dstBucket , dstObject , srcInfo . PutObjReader , putOpts )
2016-12-26 16:29:26 -08:00
}
2018-09-20 19:22:09 -07:00
// GetObjectNInfo - returns object info and an object
// Read(Closer). When err != nil, the returned reader is always nil.
2018-09-27 03:06:45 -07:00
func ( xl xlObjects ) GetObjectNInfo ( ctx context . Context , bucket , object string , rs * HTTPRangeSpec , h http . Header , lockType LockType , opts ObjectOptions ) ( gr * GetObjectReader , err error ) {
2018-09-20 19:22:09 -07:00
if err = checkGetObjArgs ( ctx , bucket , object ) ; err != nil {
return nil , err
}
// Handler directory request by returning a reader that
// returns no bytes.
2019-12-06 12:46:06 +05:30
if HasSuffix ( object , SlashSeparator ) {
2018-09-20 19:22:09 -07:00
var objInfo ObjectInfo
if objInfo , err = xl . getObjectInfoDir ( ctx , bucket , object ) ; err != nil {
return nil , toObjectErr ( err , bucket , object )
}
2019-11-19 17:42:27 -08:00
return NewGetObjectReaderFromReader ( bytes . NewBuffer ( nil ) , objInfo , opts . CheckCopyPrecondFn )
2018-09-20 19:22:09 -07:00
}
var objInfo ObjectInfo
objInfo , err = xl . getObjectInfo ( ctx , bucket , object )
if err != nil {
return nil , toObjectErr ( err , bucket , object )
}
2019-11-19 17:42:27 -08:00
fn , off , length , nErr := NewGetObjectReader ( rs , objInfo , opts . CheckCopyPrecondFn )
2018-09-20 19:22:09 -07:00
if nErr != nil {
return nil , nErr
}
pr , pw := io . Pipe ( )
go func ( ) {
2018-09-27 03:06:45 -07:00
err := xl . getObject ( ctx , bucket , object , off , length , pw , "" , opts )
2018-09-20 19:22:09 -07:00
pw . CloseWithError ( err )
} ( )
2018-09-21 11:42:06 -07:00
// Cleanup function to cause the go routine above to exit, in
// case of incomplete read.
pipeCloser := func ( ) { pr . Close ( ) }
2018-09-20 19:22:09 -07:00
2019-03-06 21:38:41 +01:00
return fn ( pr , h , opts . CheckCopyPrecondFn , pipeCloser )
2018-09-20 19:22:09 -07:00
}
2016-06-01 16:43:31 -07:00
// GetObject - reads an object erasured coded across multiple
// disks. Supports additional parameters like offset and length
2016-12-26 16:29:26 -08:00
// which are synonymous with HTTP Range requests.
2016-06-01 16:43:31 -07:00
//
2016-12-26 16:29:26 -08:00
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
2018-09-10 09:42:43 -07:00
func ( xl xlObjects ) GetObject ( ctx context . Context , bucket , object string , startOffset int64 , length int64 , writer io . Writer , etag string , opts ObjectOptions ) error {
return xl . getObject ( ctx , bucket , object , startOffset , length , writer , etag , opts )
2018-01-12 20:34:52 -08:00
}
// getObject wrapper for xl GetObject
2018-09-10 09:42:43 -07:00
func ( xl xlObjects ) getObject ( ctx context . Context , bucket , object string , startOffset int64 , length int64 , writer io . Writer , etag string , opts ObjectOptions ) error {
2018-01-12 20:34:52 -08:00
2018-04-05 15:04:40 -07:00
if err := checkGetObjArgs ( ctx , bucket , object ) ; err != nil {
2016-12-01 23:15:17 -08:00
return err
2016-05-20 20:48:47 -07:00
}
2016-12-21 11:29:32 -08:00
// Start offset cannot be negative.
if startOffset < 0 {
2019-10-11 18:50:54 -07:00
logger . LogIf ( ctx , errUnexpected , logger . Application )
2018-04-05 15:04:40 -07:00
return errUnexpected
2016-07-07 01:30:34 -07:00
}
2016-12-21 11:29:32 -08:00
2016-07-08 07:46:49 -07:00
// Writer cannot be nil.
if writer == nil {
2018-04-05 15:04:40 -07:00
logger . LogIf ( ctx , errUnexpected )
return errUnexpected
2016-07-08 07:46:49 -07:00
}
2016-09-01 00:09:08 +05:30
2018-01-29 18:43:13 -08:00
// If its a directory request, we return an empty body.
2019-12-06 12:46:06 +05:30
if HasSuffix ( object , SlashSeparator ) {
2018-01-29 18:43:13 -08:00
_ , err := writer . Write ( [ ] byte ( "" ) )
2018-04-05 15:04:40 -07:00
logger . LogIf ( ctx , err )
return toObjectErr ( err , bucket , object )
2018-01-29 18:43:13 -08:00
}
2016-05-31 20:23:31 -07:00
// Read metadata associated with the object from all disks.
2018-04-05 15:04:40 -07:00
metaArr , errs := readAllXLMetadata ( ctx , xl . getDisks ( ) , bucket , object )
2017-12-22 16:58:13 +05:30
// get Quorum for this object
2018-07-31 00:23:29 -07:00
readQuorum , _ , err := objectQuorumFromMeta ( ctx , xl , metaArr , errs )
2017-12-22 16:58:13 +05:30
if err != nil {
return toObjectErr ( err , bucket , object )
}
2018-04-05 15:04:40 -07:00
if reducedErr := reduceReadQuorumErrs ( ctx , errs , objectOpIgnoredErrs , readQuorum ) ; reducedErr != nil {
2016-07-19 19:24:32 -07:00
return toObjectErr ( reducedErr , bucket , object )
2016-07-10 01:31:32 +05:30
}
2016-05-25 16:42:31 -07:00
// List all online disks.
2018-02-15 17:45:57 -08:00
onlineDisks , modTime := listOnlineDisks ( xl . getDisks ( ) , metaArr , errs )
2016-05-26 19:55:48 -07:00
2016-06-17 10:56:18 +05:30
// Pick latest valid metadata.
2018-08-17 14:42:04 -07:00
xlMeta , err := pickValidXLMeta ( ctx , metaArr , modTime , readQuorum )
2016-11-21 10:26:44 +05:30
if err != nil {
return err
}
2016-07-24 22:49:27 -07:00
// Reorder online disks based on erasure distribution order.
2017-02-24 09:20:40 -08:00
onlineDisks = shuffleDisks ( onlineDisks , xlMeta . Erasure . Distribution )
2016-07-24 22:49:27 -07:00
// Reorder parts metadata based on erasure distribution order.
2017-02-24 09:20:40 -08:00
metaArr = shufflePartsMetadata ( metaArr , xlMeta . Erasure . Distribution )
2016-05-25 16:42:31 -07:00
2016-12-21 11:29:32 -08:00
// For negative length read everything.
if length < 0 {
length = xlMeta . Stat . Size - startOffset
2016-07-08 07:46:49 -07:00
}
2016-12-21 11:29:32 -08:00
// Reply back invalid range if the input offset and length fall out of range.
if startOffset > xlMeta . Stat . Size || startOffset + length > xlMeta . Stat . Size {
2019-10-11 18:50:54 -07:00
logger . LogIf ( ctx , InvalidRange { startOffset , length , xlMeta . Stat . Size } , logger . Application )
2018-04-05 15:04:40 -07:00
return InvalidRange { startOffset , length , xlMeta . Stat . Size }
2016-07-08 07:46:49 -07:00
}
2016-06-20 02:05:26 +05:30
// Get start part index and offset.
2018-04-05 15:04:40 -07:00
partIndex , partOffset , err := xlMeta . ObjectToPartOffset ( ctx , startOffset )
2016-05-20 20:48:47 -07:00
if err != nil {
2018-04-05 15:04:40 -07:00
return InvalidRange { startOffset , length , xlMeta . Stat . Size }
2016-05-20 20:48:47 -07:00
}
2016-05-31 20:23:31 -07:00
2017-01-27 19:51:02 +01:00
// Calculate endOffset according to length
endOffset := startOffset
if length > 0 {
endOffset += length - 1
}
2016-06-20 02:05:26 +05:30
// Get last part index to read given length.
2018-04-05 15:04:40 -07:00
lastPartIndex , _ , err := xlMeta . ObjectToPartOffset ( ctx , endOffset )
2016-06-20 02:05:26 +05:30
if err != nil {
2018-04-05 15:04:40 -07:00
return InvalidRange { startOffset , length , xlMeta . Stat . Size }
2016-06-20 02:05:26 +05:30
}
2017-02-24 09:20:40 -08:00
var totalBytesRead int64
2018-08-23 23:35:37 -07:00
erasure , err := NewErasure ( ctx , xlMeta . Erasure . DataBlocks , xlMeta . Erasure . ParityBlocks , xlMeta . Erasure . BlockSize )
2017-08-14 18:08:42 -07:00
if err != nil {
return toObjectErr ( err , bucket , object )
}
2018-08-06 15:14:08 -07:00
2016-06-20 02:05:26 +05:30
for ; partIndex <= lastPartIndex ; partIndex ++ {
2016-06-22 03:04:11 +05:30
if length == totalBytesRead {
break
}
2020-03-03 03:29:30 +03:00
partNumber := xlMeta . Parts [ partIndex ] . Number
2016-05-31 20:23:31 -07:00
// Save the current part name and size.
partSize := xlMeta . Parts [ partIndex ] . Size
2016-06-22 21:35:03 +05:30
2018-08-06 15:14:08 -07:00
partLength := partSize - partOffset
// partLength should be adjusted so that we don't write more data than what was requested.
if partLength > ( length - totalBytesRead ) {
partLength = length - totalBytesRead
2016-06-20 02:05:26 +05:30
}
2016-05-31 20:23:31 -07:00
2019-01-17 04:58:18 -08:00
tillOffset := erasure . ShardFileTillOffset ( partOffset , partLength , partSize )
2016-07-16 21:05:30 +05:30
// Get the checksums of the current part.
2019-01-17 04:58:18 -08:00
readers := make ( [ ] io . ReaderAt , len ( onlineDisks ) )
2018-08-06 15:14:08 -07:00
for index , disk := range onlineDisks {
2017-08-14 18:08:42 -07:00
if disk == OfflineDisk {
2016-07-24 22:49:27 -07:00
continue
}
2020-03-03 03:29:30 +03:00
checksumInfo := metaArr [ index ] . Erasure . GetChecksumInfo ( partNumber )
partPath := pathJoin ( object , fmt . Sprintf ( "part.%d" , partNumber ) )
readers [ index ] = newBitrotReader ( disk , bucket , partPath , tillOffset ,
checksumInfo . Algorithm , checksumInfo . Hash , erasure . ShardSize ( ) )
2016-07-16 21:05:30 +05:30
}
2019-01-17 04:58:18 -08:00
err := erasure . Decode ( ctx , writer , readers , partOffset , partLength , partSize )
// Note: we should not be defer'ing the following closeBitrotReaders() call as we are inside a for loop i.e if we use defer, we would accumulate a lot of open files by the time
// we return from this function.
closeBitrotReaders ( readers )
2016-05-31 20:23:31 -07:00
if err != nil {
2016-07-08 20:34:27 -07:00
return toObjectErr ( err , bucket , object )
2016-05-31 20:23:31 -07:00
}
2019-01-17 04:58:18 -08:00
for i , r := range readers {
2018-08-06 15:14:08 -07:00
if r == nil {
onlineDisks [ i ] = OfflineDisk
}
}
2016-07-28 02:20:34 -07:00
// Track total bytes read from disk and written to the client.
2018-08-06 15:14:08 -07:00
totalBytesRead += partLength
2016-05-31 20:23:31 -07:00
2016-06-22 21:35:03 +05:30
// partOffset will be valid only for the first part, hence reset it to 0 for
// the remaining parts.
2016-05-31 20:23:31 -07:00
partOffset = 0
2016-06-01 16:43:31 -07:00
} // End of read all parts loop.
2016-05-31 20:23:31 -07:00
// Return success.
2016-05-28 15:13:15 -07:00
return nil
2016-05-20 20:48:47 -07:00
}
2018-01-29 18:43:13 -08:00
// getObjectInfoDir - This getObjectInfo is specific to object directory lookup.
2019-10-14 09:44:51 -07:00
func ( xl xlObjects ) getObjectInfoDir ( ctx context . Context , bucket , object string ) ( ObjectInfo , error ) {
storageDisks := xl . getDisks ( )
g := errgroup . WithNErrs ( len ( storageDisks ) )
2018-01-29 18:43:13 -08:00
// Prepare object creation in a all disks
2019-10-14 09:44:51 -07:00
for index , disk := range storageDisks {
2018-01-29 18:43:13 -08:00
if disk == nil {
continue
}
2019-10-14 09:44:51 -07:00
index := index
g . Go ( func ( ) error {
2019-04-23 14:54:28 -07:00
// Check if 'prefix' is an object on this 'disk'.
2019-10-14 09:44:51 -07:00
entries , err := storageDisks [ index ] . ListDir ( bucket , object , 1 , "" )
2019-04-23 14:54:28 -07:00
if err != nil {
2019-10-14 09:44:51 -07:00
return err
2019-04-23 14:54:28 -07:00
}
if len ( entries ) > 0 {
// Not a directory if not empty.
2019-10-14 09:44:51 -07:00
return errFileNotFound
2018-01-29 18:43:13 -08:00
}
2019-10-14 09:44:51 -07:00
return nil
} , index )
2018-01-29 18:43:13 -08:00
}
2019-10-14 09:44:51 -07:00
readQuorum := len ( storageDisks ) / 2
err := reduceReadQuorumErrs ( ctx , g . Wait ( ) , objectOpIgnoredErrs , readQuorum )
return dirObjectInfo ( bucket , object , 0 , map [ string ] string { } ) , err
2018-01-29 18:43:13 -08:00
}
2016-06-01 16:43:31 -07:00
// GetObjectInfo - reads object metadata and replies back ObjectInfo.
2018-09-10 09:42:43 -07:00
func ( xl xlObjects ) GetObjectInfo ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( oi ObjectInfo , e error ) {
2018-04-05 15:04:40 -07:00
if err := checkGetObjArgs ( ctx , bucket , object ) ; err != nil {
2017-06-21 19:53:09 -07:00
return oi , err
2016-05-20 20:48:47 -07:00
}
2016-09-01 00:09:08 +05:30
2019-12-06 12:46:06 +05:30
if HasSuffix ( object , SlashSeparator ) {
2019-04-23 14:54:28 -07:00
info , err := xl . getObjectInfoDir ( ctx , bucket , object )
if err != nil {
return oi , toObjectErr ( err , bucket , object )
2018-02-02 00:34:15 -08:00
}
2019-04-23 14:54:28 -07:00
return info , nil
2018-01-29 18:43:13 -08:00
}
2018-04-05 15:04:40 -07:00
info , err := xl . getObjectInfo ( ctx , bucket , object )
2016-05-20 20:48:47 -07:00
if err != nil {
2017-06-21 19:53:09 -07:00
return oi , toObjectErr ( err , bucket , object )
2016-05-20 20:48:47 -07:00
}
2018-01-29 18:43:13 -08:00
2016-05-20 20:48:47 -07:00
return info , nil
}
2018-10-19 11:00:09 -07:00
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
func ( xl xlObjects ) getObjectInfo ( ctx context . Context , bucket , object string ) ( objInfo ObjectInfo , err error ) {
disks := xl . getDisks ( )
// Read metadata associated with the object from all disks.
metaArr , errs := readAllXLMetadata ( ctx , disks , bucket , object )
2019-02-05 17:58:48 -08:00
readQuorum , _ , err := objectQuorumFromMeta ( ctx , xl , metaArr , errs )
if err != nil {
return objInfo , err
2018-02-01 10:47:49 -08:00
}
// List all the file commit ids from parts metadata.
modTimes := listObjectModtimes ( metaArr , errs )
// Reduce list of UUIDs to a single common value.
modTime , _ := commonTime ( modTimes )
// Pick latest valid metadata.
2018-08-17 14:42:04 -07:00
xlMeta , err := pickValidXLMeta ( ctx , metaArr , modTime , readQuorum )
2016-05-25 01:33:39 -07:00
if err != nil {
2018-02-01 10:47:49 -08:00
return objInfo , err
2016-05-20 20:48:47 -07:00
}
2016-09-09 11:08:18 +05:30
2018-03-01 20:37:57 +01:00
return xlMeta . ToObjectInfo ( bucket , object ) , nil
2016-05-20 20:48:47 -07:00
}
2016-12-26 16:29:26 -08:00
func undoRename ( disks [ ] StorageAPI , srcBucket , srcEntry , dstBucket , dstEntry string , isDir bool , errs [ ] error ) {
2016-06-18 00:27:51 +05:30
// Undo rename object on disks where RenameFile succeeded.
2016-06-20 19:11:55 -07:00
// If srcEntry/dstEntry are objects then add a trailing slash to copy
// over all the parts inside the object directory
2016-12-26 16:29:26 -08:00
if isDir {
2016-06-20 19:11:55 -07:00
srcEntry = retainSlash ( srcEntry )
dstEntry = retainSlash ( dstEntry )
}
2019-10-14 09:44:51 -07:00
g := errgroup . WithNErrs ( len ( disks ) )
2016-07-11 22:53:54 -07:00
for index , disk := range disks {
2016-06-18 00:27:51 +05:30
if disk == nil {
continue
}
2019-10-14 09:44:51 -07:00
index := index
g . Go ( func ( ) error {
if errs [ index ] == nil {
_ = disks [ index ] . RenameFile ( dstBucket , dstEntry , srcBucket , srcEntry )
2016-06-18 00:27:51 +05:30
}
2019-10-14 09:44:51 -07:00
return nil
} , index )
2016-06-18 00:27:51 +05:30
}
2019-10-14 09:44:51 -07:00
g . Wait ( )
2016-06-18 00:27:51 +05:30
}
2016-06-20 19:11:55 -07:00
// rename - common function that renamePart and renameObject use to rename
// the respective underlying storage layer representations.
2018-04-11 17:15:42 -07:00
func rename ( ctx context . Context , disks [ ] StorageAPI , srcBucket , srcEntry , dstBucket , dstEntry string , isDir bool , writeQuorum int , ignoredErr [ ] error ) ( [ ] StorageAPI , error ) {
2016-05-20 20:48:47 -07:00
2016-12-26 16:29:26 -08:00
if isDir {
2016-06-20 19:11:55 -07:00
dstEntry = retainSlash ( dstEntry )
srcEntry = retainSlash ( srcEntry )
}
2019-10-14 09:44:51 -07:00
g := errgroup . WithNErrs ( len ( disks ) )
2016-05-20 20:48:47 -07:00
// Rename file on all underlying storage disks.
2019-10-14 09:44:51 -07:00
for index := range disks {
index := index
g . Go ( func ( ) error {
if disks [ index ] == nil {
return errDiskNotFound
}
if err := disks [ index ] . RenameFile ( srcBucket , srcEntry , dstBucket , dstEntry ) ; err != nil {
2018-04-11 17:15:42 -07:00
if ! IsErrIgnored ( err , ignoredErr ... ) {
2019-10-14 09:44:51 -07:00
return err
2018-04-11 17:15:42 -07:00
}
2016-05-20 20:48:47 -07:00
}
2019-10-14 09:44:51 -07:00
return nil
} , index )
2016-05-20 20:48:47 -07:00
}
2016-06-01 16:43:31 -07:00
// Wait for all renames to finish.
2019-10-14 09:44:51 -07:00
errs := g . Wait ( )
2016-05-20 20:48:47 -07:00
2018-02-15 17:45:57 -08:00
// We can safely allow RenameFile errors up to len(xl.getDisks()) - writeQuorum
2016-05-20 20:48:47 -07:00
// otherwise return failure. Cleanup successful renames.
2018-04-05 15:04:40 -07:00
err := reduceWriteQuorumErrs ( ctx , errs , objectOpIgnoredErrs , writeQuorum )
2018-04-10 09:36:37 -07:00
if err == errXLWriteQuorum {
2016-06-18 00:27:51 +05:30
// Undo all the partial rename operations.
2016-12-26 16:29:26 -08:00
undoRename ( disks , srcBucket , srcEntry , dstBucket , dstEntry , isDir , errs )
2016-05-20 20:48:47 -07:00
}
2017-06-15 01:14:27 +01:00
return evalDisks ( disks , errs ) , err
2016-05-20 20:48:47 -07:00
}
2016-06-01 16:43:31 -07:00
// PutObject - creates an object upon reading from the input stream
// until EOF, erasure codes the data across all disk and additionally
// writes `xl.json` which carries the necessary metadata for future
// object operations.
2019-02-08 21:31:06 -08:00
func ( xl xlObjects ) PutObject ( ctx context . Context , bucket string , object string , data * PutObjReader , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2018-02-09 15:19:30 -08:00
// Validate put object input args.
2018-04-05 15:04:40 -07:00
if err = checkPutObjectArgs ( ctx , bucket , object , xl , data . Size ( ) ) ; err != nil {
2018-02-09 15:19:30 -08:00
return ObjectInfo { } , err
}
2018-09-20 19:22:09 -07:00
2019-02-08 21:31:06 -08:00
return xl . putObject ( ctx , bucket , object , data , opts )
2018-01-12 20:34:52 -08:00
}
// putObject wrapper for xl PutObject
2019-02-08 21:31:06 -08:00
func ( xl xlObjects ) putObject ( ctx context . Context , bucket string , object string , r * PutObjReader , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2018-11-14 17:36:41 -08:00
data := r . Reader
2018-01-29 18:43:13 -08:00
uniqueID := mustGetUUID ( )
tempObj := uniqueID
// No metadata is set, allocate a new one.
2019-02-08 21:31:06 -08:00
if opts . UserDefined == nil {
opts . UserDefined = make ( map [ string ] string )
2018-01-29 18:43:13 -08:00
}
2019-10-06 22:50:24 -07:00
storageDisks := xl . getDisks ( )
2018-01-29 18:43:13 -08:00
// Get parity and data drive count based on storage class metadata
2019-10-22 22:59:13 -07:00
parityDrives := globalStorageClass . GetParityForSC ( opts . UserDefined [ xhttp . AmzStorageClass ] )
2019-10-06 22:50:24 -07:00
if parityDrives == 0 {
parityDrives = len ( storageDisks ) / 2
}
dataDrives := len ( storageDisks ) - parityDrives
2018-01-29 18:43:13 -08:00
// we now know the number of blocks this object needs for data and parity.
// writeQuorum is dataBlocks + 1
writeQuorum := dataDrives + 1
// Delete temporary object in the event of failure.
// If PutObject succeeded there would be no temporary
// object to delete.
2018-08-29 13:36:19 -07:00
defer xl . deleteObject ( ctx , minioMetaTmpBucket , tempObj , writeQuorum , false )
2018-01-29 18:43:13 -08:00
2017-01-20 16:33:01 -08:00
// This is a special case with size as '0' and object ends with
// a slash separator, we treat it like a valid operation and
// return success.
2017-09-19 12:40:27 -07:00
if isObjectDir ( object , data . Size ( ) ) {
2017-02-21 19:43:44 -08:00
// Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock).
2017-05-09 14:32:24 -07:00
// -- FIXME (this also causes performance issue when disks are down).
2018-04-05 15:04:40 -07:00
if xl . parentDirIsObject ( ctx , bucket , path . Dir ( object ) ) {
2019-03-20 13:06:53 -07:00
return ObjectInfo { } , toObjectErr ( errFileParentIsFile , bucket , object )
2017-02-21 19:43:44 -08:00
}
2018-01-29 18:43:13 -08:00
2018-04-05 15:04:40 -07:00
if err = xl . putObjectDir ( ctx , minioMetaTmpBucket , tempObj , writeQuorum ) ; err != nil {
return ObjectInfo { } , toObjectErr ( err , bucket , object )
2018-01-29 18:43:13 -08:00
}
2018-10-18 00:37:02 +01:00
// Rename the successfully written temporary object to final location. Ignore errFileAccessDenied
// error because it means that the target object dir exists and we want to be close to S3 specification.
2019-10-06 22:50:24 -07:00
if _ , err = rename ( ctx , storageDisks , minioMetaTmpBucket , tempObj , bucket , object , true , writeQuorum , [ ] error { errFileAccessDenied } ) ; err != nil {
2018-01-29 18:43:13 -08:00
return ObjectInfo { } , toObjectErr ( err , bucket , object )
}
2019-02-08 21:31:06 -08:00
return dirObjectInfo ( bucket , object , data . Size ( ) , opts . UserDefined ) , nil
2017-01-20 16:33:01 -08:00
}
2017-02-21 19:43:44 -08:00
2017-10-06 09:38:01 -07:00
// Validate input data size and it can never be less than zero.
2018-09-28 09:06:17 +05:30
if data . Size ( ) < - 1 {
2019-10-11 18:50:54 -07:00
logger . LogIf ( ctx , errInvalidArgument , logger . Application )
2018-04-05 15:04:40 -07:00
return ObjectInfo { } , toObjectErr ( errInvalidArgument )
2017-10-06 09:38:01 -07:00
}
2017-02-21 19:43:44 -08:00
// Check if an object is present as one of the parent dir.
// -- FIXME. (needs a new kind of lock).
2017-05-09 14:32:24 -07:00
// -- FIXME (this also causes performance issue when disks are down).
2018-04-05 15:04:40 -07:00
if xl . parentDirIsObject ( ctx , bucket , path . Dir ( object ) ) {
2019-03-20 13:06:53 -07:00
return ObjectInfo { } , toObjectErr ( errFileParentIsFile , bucket , object )
2017-02-21 19:43:44 -08:00
}
2017-01-31 00:44:42 +01:00
// Initialize parts metadata
2018-02-15 17:45:57 -08:00
partsMetadata := make ( [ ] xlMetaV1 , len ( xl . getDisks ( ) ) )
2017-01-31 00:44:42 +01:00
2017-12-22 16:58:13 +05:30
xlMeta := newXLMetaV1 ( object , dataDrives , parityDrives )
2016-07-13 11:56:25 -07:00
2017-01-31 00:44:42 +01:00
// Initialize xl meta.
for index := range partsMetadata {
partsMetadata [ index ] = xlMeta
}
// Order disks according to erasure distribution
2019-10-06 22:50:24 -07:00
onlineDisks := shuffleDisks ( storageDisks , xlMeta . Erasure . Distribution )
2017-01-31 00:44:42 +01:00
2018-08-23 23:35:37 -07:00
erasure , err := NewErasure ( ctx , xlMeta . Erasure . DataBlocks , xlMeta . Erasure . ParityBlocks , xlMeta . Erasure . BlockSize )
2017-08-14 18:08:42 -07:00
if err != nil {
return ObjectInfo { } , toObjectErr ( err , bucket , object )
}
2017-10-06 09:38:01 -07:00
2018-02-15 17:45:57 -08:00
// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
2018-06-13 11:55:12 -07:00
var buffer [ ] byte
switch size := data . Size ( ) ; {
case size == 0 :
buffer = make ( [ ] byte , 1 ) // Allocate atleast a byte to reach EOF
2018-10-04 23:37:15 +01:00
case size == - 1 || size >= blockSizeV1 :
2018-09-28 09:06:17 +05:30
buffer = xl . bp . Get ( )
defer xl . bp . Put ( buffer )
2018-06-13 11:55:12 -07:00
case size < blockSizeV1 :
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
2020-01-18 23:21:58 +01:00
buffer = make ( [ ] byte , size , 2 * size + int64 ( erasure . parityBlocks + erasure . dataBlocks - 1 ) )
2018-06-13 11:55:12 -07:00
}
2017-10-06 09:38:01 -07:00
2018-08-06 15:14:08 -07:00
if len ( buffer ) > int ( xlMeta . Erasure . BlockSize ) {
buffer = buffer [ : xlMeta . Erasure . BlockSize ]
}
2019-05-14 12:33:18 -07:00
partName := "part.1"
tempErasureObj := pathJoin ( uniqueID , partName )
2017-01-31 00:44:42 +01:00
2019-05-14 12:33:18 -07:00
writers := make ( [ ] io . Writer , len ( onlineDisks ) )
for i , disk := range onlineDisks {
if disk == nil {
continue
2018-09-28 09:06:17 +05:30
}
2019-05-14 12:33:18 -07:00
writers [ i ] = newBitrotWriter ( disk , minioMetaTmpBucket , tempErasureObj , erasure . ShardFileSize ( data . Size ( ) ) , DefaultBitrotAlgorithm , erasure . ShardSize ( ) )
}
2018-09-28 09:06:17 +05:30
2019-05-14 12:33:18 -07:00
n , erasureErr := erasure . Encode ( ctx , data , writers , buffer , erasure . dataBlocks + 1 )
closeBitrotWriters ( writers )
if erasureErr != nil {
return ObjectInfo { } , toObjectErr ( erasureErr , minioMetaTmpBucket , tempErasureObj )
}
2017-08-14 18:08:42 -07:00
2019-05-14 12:33:18 -07:00
// Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header.
if n < data . Size ( ) {
2019-10-11 18:50:54 -07:00
logger . LogIf ( ctx , IncompleteBody { } , logger . Application )
2019-05-14 12:33:18 -07:00
return ObjectInfo { } , IncompleteBody { }
}
2017-01-31 15:34:49 -08:00
2019-05-14 12:33:18 -07:00
for i , w := range writers {
if w == nil {
onlineDisks [ i ] = nil
continue
2017-01-31 00:44:42 +01:00
}
2020-03-03 03:29:30 +03:00
partsMetadata [ i ] . AddObjectPart ( 1 , "" , n , data . ActualSize ( ) )
partsMetadata [ i ] . Erasure . AddChecksumInfo ( ChecksumInfo {
PartNumber : 1 ,
Algorithm : DefaultBitrotAlgorithm ,
Hash : bitrotWriterSum ( w ) ,
} )
2016-07-18 19:06:48 -07:00
}
2016-07-01 14:33:28 -07:00
2016-05-20 20:48:47 -07:00
// Save additional erasureMetadata.
2017-03-18 23:58:41 +05:30
modTime := UTCNow ( )
2018-11-14 17:36:41 -08:00
2019-02-08 21:31:06 -08:00
opts . UserDefined [ "etag" ] = r . MD5CurrentHexString ( )
2016-06-01 16:43:31 -07:00
2016-06-16 21:42:02 -07:00
// Guess content-type from the extension if possible.
2019-02-08 21:31:06 -08:00
if opts . UserDefined [ "content-type" ] == "" {
opts . UserDefined [ "content-type" ] = mimedb . TypeByExtension ( path . Ext ( object ) )
2016-05-20 20:48:47 -07:00
}
2016-06-20 18:48:47 +05:30
if xl . isObject ( bucket , object ) {
2018-10-04 04:39:24 +01:00
// Deny if WORM is enabled
2020-01-13 17:29:31 -08:00
if isWORMEnabled ( bucket ) {
2019-11-20 13:18:09 -08:00
if _ , err := xl . getObjectInfo ( ctx , bucket , object ) ; err == nil {
2019-11-13 04:20:18 +05:30
return ObjectInfo { } , ObjectAlreadyExists { Bucket : bucket , Object : object }
}
2018-10-04 04:39:24 +01:00
}
2017-02-21 19:43:44 -08:00
// Rename if an object already exists to temporary location.
newUniqueID := mustGetUUID ( )
// Delete successfully renamed object.
2018-08-29 13:36:19 -07:00
defer xl . deleteObject ( ctx , minioMetaTmpBucket , newUniqueID , writeQuorum , false )
2016-10-20 11:22:03 +05:30
2018-10-05 01:22:49 +01:00
// NOTE: Do not use online disks slice here: the reason is that existing object should be purged
// regardless of `xl.json` status and rolled back in case of errors. Also allow renaming the
// existing object if it is not present in quorum disks so users can overwrite stale objects.
2020-01-16 03:30:32 +01:00
_ , err = rename ( ctx , storageDisks , bucket , object , minioMetaTmpBucket , newUniqueID , true , writeQuorum , [ ] error { errFileNotFound } )
2016-06-20 18:48:47 +05:30
if err != nil {
2016-09-03 00:48:35 +05:30
return ObjectInfo { } , toObjectErr ( err , bucket , object )
2016-06-20 18:48:47 +05:30
}
2016-05-20 20:48:47 -07:00
}
2016-05-26 19:55:48 -07:00
// Fill all the necessary metadata.
2016-06-01 16:43:31 -07:00
// Update `xl.json` content on each disks.
2016-05-31 20:23:31 -07:00
for index := range partsMetadata {
2019-02-08 21:31:06 -08:00
partsMetadata [ index ] . Meta = opts . UserDefined
2019-05-14 12:33:18 -07:00
partsMetadata [ index ] . Stat . Size = n
2017-01-31 00:44:42 +01:00
partsMetadata [ index ] . Stat . ModTime = modTime
2016-05-31 20:23:31 -07:00
}
// Write unique `xl.json` for each disk.
2018-04-05 15:04:40 -07:00
if onlineDisks , err = writeUniqueXLMetadata ( ctx , onlineDisks , minioMetaTmpBucket , tempObj , partsMetadata , writeQuorum ) ; err != nil {
2016-09-03 00:48:35 +05:30
return ObjectInfo { } , toObjectErr ( err , bucket , object )
2016-05-20 20:48:47 -07:00
}
2016-05-29 00:42:09 -07:00
2016-06-01 16:43:31 -07:00
// Rename the successfully written temporary object to final location.
2020-01-16 03:30:32 +01:00
if onlineDisks , err = rename ( ctx , onlineDisks , minioMetaTmpBucket , tempObj , bucket , object , true , writeQuorum , nil ) ; err != nil {
2016-09-03 00:48:35 +05:30
return ObjectInfo { } , toObjectErr ( err , bucket , object )
2016-05-28 15:13:15 -07:00
}
2016-05-29 00:42:09 -07:00
2020-01-16 03:30:32 +01:00
// Whether a disk was initially or becomes offline
// during this upload, send it to the MRF list.
for i := 0 ; i < len ( onlineDisks ) ; i ++ {
if onlineDisks [ i ] == nil || storageDisks [ i ] == nil {
xl . addPartialUpload ( bucket , object )
break
}
}
2017-01-31 00:44:42 +01:00
// Object info is the same in all disks, so we can pick the first meta
// of the first disk
xlMeta = partsMetadata [ 0 ]
2016-09-03 00:48:35 +05:30
objInfo = ObjectInfo {
IsDir : false ,
Bucket : bucket ,
Name : object ,
Size : xlMeta . Stat . Size ,
ModTime : xlMeta . Stat . ModTime ,
2017-05-14 12:05:51 -07:00
ETag : xlMeta . Meta [ "etag" ] ,
2016-09-03 00:48:35 +05:30
ContentType : xlMeta . Meta [ "content-type" ] ,
ContentEncoding : xlMeta . Meta [ "content-encoding" ] ,
UserDefined : xlMeta . Meta ,
}
2017-01-16 19:23:43 -08:00
2016-09-03 00:48:35 +05:30
return objInfo , nil
2016-05-20 20:48:47 -07:00
}
2016-06-01 16:43:31 -07:00
// deleteObject - wrapper for delete object, deletes an object from
// all the disks in parallel, including `xl.json` associated with the
// object.
2018-08-29 13:36:19 -07:00
func ( xl xlObjects ) deleteObject ( ctx context . Context , bucket , object string , writeQuorum int , isDir bool ) error {
var disks [ ] StorageAPI
2018-01-29 18:43:13 -08:00
var err error
2018-02-21 00:33:26 +01:00
2018-08-29 13:36:19 -07:00
tmpObj := mustGetUUID ( )
if bucket == minioMetaTmpBucket {
tmpObj = object
disks = xl . getDisks ( )
} else {
2018-10-05 01:22:49 +01:00
// Rename the current object while requiring write quorum, but also consider
// that a non found object in a given disk as a success since it already
// confirms that the object doesn't have a part in that disk (already removed)
2018-08-29 13:36:19 -07:00
if isDir {
2018-10-05 01:22:49 +01:00
disks , err = rename ( ctx , xl . getDisks ( ) , bucket , object , minioMetaTmpBucket , tmpObj , true , writeQuorum ,
[ ] error { errFileNotFound , errFileAccessDenied } )
2018-08-29 13:36:19 -07:00
} else {
2018-10-05 01:22:49 +01:00
disks , err = rename ( ctx , xl . getDisks ( ) , bucket , object , minioMetaTmpBucket , tmpObj , true , writeQuorum ,
[ ] error { errFileNotFound } )
2018-01-29 18:43:13 -08:00
}
2018-03-27 05:09:28 +05:30
if err != nil {
2018-08-29 13:36:19 -07:00
return toObjectErr ( err , bucket , object )
2018-03-27 05:09:28 +05:30
}
2017-12-22 16:58:13 +05:30
}
2019-10-14 09:44:51 -07:00
g := errgroup . WithNErrs ( len ( disks ) )
2018-08-29 13:36:19 -07:00
2019-10-14 09:44:51 -07:00
for index := range disks {
index := index
g . Go ( func ( ) error {
if disks [ index ] == nil {
return errDiskNotFound
}
var err error
2018-02-21 00:33:26 +01:00
if isDir {
// DeleteFile() simply tries to remove a directory
// and will succeed only if that directory is empty.
2019-10-14 09:44:51 -07:00
err = disks [ index ] . DeleteFile ( minioMetaTmpBucket , tmpObj )
2018-02-21 00:33:26 +01:00
} else {
2019-10-14 09:44:51 -07:00
err = cleanupDir ( ctx , disks [ index ] , minioMetaTmpBucket , tmpObj )
2018-02-21 00:33:26 +01:00
}
2019-10-14 09:44:51 -07:00
if err != nil && err != errVolumeNotFound {
return err
2016-05-25 14:32:49 -07:00
}
2019-10-14 09:44:51 -07:00
return nil
} , index )
2016-05-20 20:48:47 -07:00
}
2018-03-27 05:09:28 +05:30
// return errors if any during deletion
2019-10-14 09:44:51 -07:00
return reduceWriteQuorumErrs ( ctx , g . Wait ( ) , objectOpIgnoredErrs , writeQuorum )
2016-05-20 20:48:47 -07:00
}
2019-05-13 20:25:49 +01:00
// deleteObject - wrapper for delete object, deletes an object from
// all the disks in parallel, including `xl.json` associated with the
// object.
func ( xl xlObjects ) doDeleteObjects ( ctx context . Context , bucket string , objects [ ] string , errs [ ] error , writeQuorums [ ] int , isDirs [ ] bool ) ( [ ] error , error ) {
var tmpObjs = make ( [ ] string , len ( objects ) )
if bucket == minioMetaTmpBucket {
copy ( tmpObjs , objects )
} else {
2020-03-06 13:44:24 -08:00
for idx := range objects {
if errs [ idx ] != nil {
2019-05-13 20:25:49 +01:00
continue
}
2020-03-06 13:44:24 -08:00
tmpObjs [ idx ] = mustGetUUID ( )
2019-05-13 20:25:49 +01:00
var err error
2020-03-06 13:44:24 -08:00
// Rename the current object while requiring
// write quorum, but also consider that a non
// found object in a given disk as a success
// since it already confirms that the object
// doesn't have a part in that disk (already removed)
if isDirs [ idx ] {
_ , err = rename ( ctx , xl . getDisks ( ) , bucket , objects [ idx ] ,
minioMetaTmpBucket , tmpObjs [ idx ] , true , writeQuorums [ idx ] ,
2019-05-13 20:25:49 +01:00
[ ] error { errFileNotFound , errFileAccessDenied } )
} else {
2020-03-06 13:44:24 -08:00
_ , err = rename ( ctx , xl . getDisks ( ) , bucket , objects [ idx ] ,
minioMetaTmpBucket , tmpObjs [ idx ] , true , writeQuorums [ idx ] ,
2019-05-13 20:25:49 +01:00
[ ] error { errFileNotFound } )
}
if err != nil {
2020-03-06 13:44:24 -08:00
errs [ idx ] = err
2019-05-13 20:25:49 +01:00
}
}
}
2020-03-06 13:44:24 -08:00
disks := xl . getDisks ( )
2019-05-13 20:25:49 +01:00
// Initialize list of errors.
var opErrs = make ( [ ] error , len ( disks ) )
var delObjErrs = make ( [ ] [ ] error , len ( disks ) )
2019-05-14 22:43:22 +01:00
// Remove objects in bulk for each disk
2019-05-13 20:25:49 +01:00
for index , disk := range disks {
if disk == nil {
opErrs [ index ] = errDiskNotFound
continue
}
2019-10-15 18:35:41 -07:00
delObjErrs [ index ] , opErrs [ index ] = cleanupObjectsBulk ( disk , minioMetaTmpBucket , tmpObjs , errs )
2020-02-02 07:41:29 +05:30
if opErrs [ index ] == errVolumeNotFound || opErrs [ index ] == errFileNotFound {
2019-05-14 22:43:22 +01:00
opErrs [ index ] = nil
}
2019-05-13 20:25:49 +01:00
}
// Return errors if any during deletion
2020-02-02 07:41:29 +05:30
if err := reduceWriteQuorumErrs ( ctx , opErrs , objectOpIgnoredErrs , len ( disks ) / 2 + 1 ) ; err != nil {
2019-05-13 20:25:49 +01:00
return nil , err
}
// Reduce errors for each object
for objIndex := range objects {
if errs [ objIndex ] != nil {
continue
}
2020-02-02 07:41:29 +05:30
listErrs := make ( [ ] error , len ( disks ) )
2019-05-13 20:25:49 +01:00
for i := range delObjErrs {
if delObjErrs [ i ] != nil {
listErrs [ i ] = delObjErrs [ i ] [ objIndex ]
}
}
errs [ objIndex ] = reduceWriteQuorumErrs ( ctx , listErrs , objectOpIgnoredErrs , writeQuorums [ objIndex ] )
}
return errs , nil
}
func ( xl xlObjects ) deleteObjects ( ctx context . Context , bucket string , objects [ ] string ) ( [ ] error , error ) {
errs := make ( [ ] error , len ( objects ) )
writeQuorums := make ( [ ] int , len ( objects ) )
isObjectDirs := make ( [ ] bool , len ( objects ) )
for i , object := range objects {
errs [ i ] = checkDelObjArgs ( ctx , bucket , object )
}
for i , object := range objects {
2019-12-06 12:46:06 +05:30
isObjectDirs [ i ] = HasSuffix ( object , SlashSeparator )
2019-05-13 20:25:49 +01:00
}
for i , object := range objects {
if isObjectDirs [ i ] {
_ , err := xl . getObjectInfoDir ( ctx , bucket , object )
if err == errXLReadQuorum {
if isObjectDirDangling ( statAllDirs ( ctx , xl . getDisks ( ) , bucket , object ) ) {
// If object is indeed dangling, purge it.
errs [ i ] = nil
}
}
if err != nil {
errs [ i ] = toObjectErr ( err , bucket , object )
continue
}
}
}
2020-03-06 13:44:24 -08:00
for i := range objects {
2019-05-13 20:25:49 +01:00
if errs [ i ] != nil {
continue
}
2020-03-06 13:44:24 -08:00
// Assume (N/2 + 1) quorums for all objects
// this is a theoretical assumption such that
// for delete's we do not need to honor storage
// class for objects which have reduced quorum
// storage class only needs to be honored for
// Read() requests alone which we already do.
writeQuorums [ i ] = len ( xl . getDisks ( ) ) / 2 + 1
2019-05-13 20:25:49 +01:00
}
return xl . doDeleteObjects ( ctx , bucket , objects , errs , writeQuorums , isObjectDirs )
}
// DeleteObjects deletes objects in bulk. The list is cut into consecutive
// batches such that no object name appears twice within a batch: a new
// batch starts at the first name already present in the current one. This
// avoids holding twice the write lock of a duplicated object name.
//
// The returned error slice is the concatenation of the per-batch results,
// in request order. If a batch fails as a whole, processing stops and only
// that error is returned.
func (xl xlObjects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
	var (
		// Deletion result for all objects
		deleteErrs []error
		// First index of the batch currently being collected
		batchStart int
		// Names collected so far in the current batch
		seen = make(map[string]struct{})
	)

	// flush deletes objects[batchStart:end] and collects the results.
	flush := func(end int) error {
		errs, err := xl.deleteObjects(ctx, bucket, objects[batchStart:end])
		if err != nil {
			return err
		}
		deleteErrs = append(deleteErrs, errs...)
		return nil
	}

	for i, object := range objects {
		if _, duplicated := seen[object]; duplicated {
			// Close the batch right before the duplicate and start
			// a new one with the duplicate as its first entry.
			if err := flush(i); err != nil {
				return nil, err
			}
			seen = make(map[string]struct{})
			batchStart = i
		}
		seen[object] = struct{}{}
	}

	// Whatever remains forms the last batch.
	if batchStart < len(objects) {
		if err := flush(len(objects)); err != nil {
			return nil, err
		}
	}

	return deleteErrs, nil
}
2016-06-01 16:43:31 -07:00
// DeleteObject - deletes an object, this call doesn't necessary reply
// any error as it is not necessary for the handler to reply back a
// response to the client request.
2018-03-14 12:01:47 -07:00
func ( xl xlObjects ) DeleteObject ( ctx context . Context , bucket , object string ) ( err error ) {
2018-04-05 15:04:40 -07:00
if err = checkDelObjArgs ( ctx , bucket , object ) ; err != nil {
2016-12-01 23:15:17 -08:00
return err
2016-05-20 20:48:47 -07:00
}
2016-09-01 00:09:08 +05:30
2018-08-31 22:16:35 +02:00
var writeQuorum int
2019-12-06 12:46:06 +05:30
var isObjectDir = HasSuffix ( object , SlashSeparator )
2018-08-29 13:36:19 -07:00
2019-04-23 14:54:28 -07:00
if isObjectDir {
_ , err = xl . getObjectInfoDir ( ctx , bucket , object )
if err == errXLReadQuorum {
if isObjectDirDangling ( statAllDirs ( ctx , xl . getDisks ( ) , bucket , object ) ) {
// If object is indeed dangling, purge it.
err = nil
}
}
if err != nil {
return toObjectErr ( err , bucket , object )
}
2018-10-30 16:07:57 -07:00
}
2018-08-31 22:16:35 +02:00
if isObjectDir {
writeQuorum = len ( xl . getDisks ( ) ) / 2 + 1
} else {
// Read metadata associated with the object from all disks.
partsMetadata , errs := readAllXLMetadata ( ctx , xl . getDisks ( ) , bucket , object )
// get Quorum for this object
_ , writeQuorum , err = objectQuorumFromMeta ( ctx , xl , partsMetadata , errs )
if err != nil {
return toObjectErr ( err , bucket , object )
}
2018-08-29 13:36:19 -07:00
}
2016-06-17 10:48:43 +05:30
2016-06-18 00:27:51 +05:30
// Delete the object on all disks.
2018-08-31 22:16:35 +02:00
if err = xl . deleteObject ( ctx , bucket , object , writeQuorum , isObjectDir ) ; err != nil {
2016-06-18 00:27:51 +05:30
return toObjectErr ( err , bucket , object )
2016-06-08 00:05:03 +05:30
}
2016-06-18 00:27:51 +05:30
// Success.
return nil
2016-05-20 20:48:47 -07:00
}
2018-02-09 15:19:30 -08:00
// ListObjectsV2 lists all blobs in bucket filtered by prefix
2018-03-14 12:01:47 -07:00
func ( xl xlObjects ) ListObjectsV2 ( ctx context . Context , bucket , prefix , continuationToken , delimiter string , maxKeys int , fetchOwner bool , startAfter string ) ( result ListObjectsV2Info , err error ) {
2018-07-01 14:22:45 +10:00
marker := continuationToken
if marker == "" {
marker = startAfter
}
loi , err := xl . ListObjects ( ctx , bucket , prefix , marker , delimiter , maxKeys )
2018-02-09 15:19:30 -08:00
if err != nil {
return result , err
}
listObjectsV2Info := ListObjectsV2Info {
IsTruncated : loi . IsTruncated ,
ContinuationToken : continuationToken ,
NextContinuationToken : loi . NextMarker ,
Objects : loi . Objects ,
Prefixes : loi . Prefixes ,
}
return listObjectsV2Info , err
}
2020-01-16 03:30:32 +01:00
2020-01-20 22:15:59 +05:30
// Send the successful but partial upload, however ignore
2020-01-16 03:30:32 +01:00
// if the channel is blocked by other items.
func ( xl xlObjects ) addPartialUpload ( bucket , key string ) {
select {
case xl . mrfUploadCh <- partialUpload { bucket : bucket , object : key } :
default :
}
}
2020-01-20 22:15:59 +05:30
// PutObjectTag - replace or add tags to an existing object.
//
// The object's metadata is read from all disks, the tagging entry is
// replaced by tags (or just removed when tags is empty) and the updated
// `xl.json` is written to a temporary location before being renamed over
// the existing one, both under the object's write quorum.
//
// Disks whose metadata could not be read are excluded from the update:
// there is no valid metadata to modify for them, and writing a placeholder
// back would replace whatever is on that disk. All failures are returned
// converted through toObjectErr.
func (xl xlObjects) PutObjectTag(ctx context.Context, bucket, object string, tags string) error {
	disks := xl.getDisks()

	// Read metadata associated with the object from all disks.
	metaArr, errs := readAllXLMetadata(ctx, disks, bucket, object)

	_, writeQuorum, err := objectQuorumFromMeta(ctx, xl, metaArr, errs)
	if err != nil {
		return toObjectErr(err, bucket, object)
	}

	// Work on a private copy of the disk list, entries are cleared below
	// and that must not leak into the slice handed out by xl.getDisks().
	onlineDisks := append(disks[:0:0], disks...)

	for i := range metaArr {
		if i < len(errs) && errs[i] != nil {
			// Metadata for this disk was not loaded, leave the disk out.
			if i < len(onlineDisks) {
				onlineDisks[i] = nil
			}
			continue
		}

		// Writing to a nil map panics, make sure there is one to update.
		if metaArr[i].Meta == nil {
			metaArr[i].Meta = make(map[string]string)
		}

		// clean Meta of tag key, before updating the new tags
		delete(metaArr[i].Meta, xhttp.AmzObjectTagging)

		// Don't update for empty tags
		if tags != "" {
			metaArr[i].Meta[xhttp.AmzObjectTagging] = tags
		}
	}

	tempObj := mustGetUUID()

	// Write unique `xl.json` for each disk.
	if onlineDisks, err = writeUniqueXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempObj, metaArr, writeQuorum); err != nil {
		return toObjectErr(err, bucket, object)
	}

	// Atomically rename `xl.json` from tmp location to destination for each disk.
	if _, err = renameXLMetadata(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, writeQuorum); err != nil {
		return toObjectErr(err, bucket, object)
	}

	return nil
}
// DeleteObjectTag - delete object tags from an existing object.
// It delegates to PutObjectTag with an empty tag string: PutObjectTag
// always drops the existing tagging entry from the metadata and only
// stores a new one when the tag string is non-empty.
func ( xl xlObjects ) DeleteObjectTag ( ctx context . Context , bucket , object string ) error {
return xl . PutObjectTag ( ctx , bucket , object , "" )
}
// GetObjectTag - get object tags from an existing object.
//
// The tags are taken from the UserTags field of the object info and parsed
// with tagging.FromString. A lookup or parse failure is returned as is,
// together with an empty tagging.Tagging.
func (xl xlObjects) GetObjectTag(ctx context.Context, bucket, object string) (tagging.Tagging, error) {
	// GetObjectInfo will return tag value as well
	objInfo, err := xl.GetObjectInfo(ctx, bucket, object, ObjectOptions{})
	if err != nil {
		return tagging.Tagging{}, err
	}

	parsed, err := tagging.FromString(objInfo.UserTags)
	if err != nil {
		return tagging.Tagging{}, err
	}

	return parsed, nil
}