2019-11-19 17:42:27 -08:00
/ *
2020-07-08 17:36:56 -07:00
* MinIO Cloud Storage , ( C ) 2019 , 2020 MinIO , Inc .
2019-11-19 17:42:27 -08:00
*
* Licensed under the Apache License , Version 2.0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
* You may obtain a copy of the License at
*
* http : //www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing , software
* distributed under the License is distributed on an "AS IS" BASIS ,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
* See the License for the specific language governing permissions and
* limitations under the License .
* /
package cmd
import (
"context"
2020-09-16 21:14:35 -07:00
"errors"
2019-11-19 17:42:27 -08:00
"fmt"
"io"
"math/rand"
"net/http"
2020-12-15 17:34:54 -08:00
"sort"
2020-08-13 15:21:20 -07:00
"strconv"
2020-11-13 16:58:20 -08:00
"strings"
2019-12-12 15:02:37 +01:00
"sync"
2020-03-19 00:19:29 +01:00
"time"
2019-11-19 17:42:27 -08:00
2020-07-14 17:38:05 +01:00
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio-go/v7/pkg/tags"
2020-05-23 17:38:39 -07:00
"github.com/minio/minio/cmd/config/storageclass"
2019-11-19 17:42:27 -08:00
"github.com/minio/minio/cmd/logger"
2021-01-07 09:52:53 -08:00
"github.com/minio/minio/pkg/color"
2019-11-19 17:42:27 -08:00
"github.com/minio/minio/pkg/madmin"
"github.com/minio/minio/pkg/sync/errgroup"
)
2020-12-01 13:50:33 -08:00
type erasureServerPools struct {
2020-05-19 13:53:54 -07:00
GatewayUnsupported
2020-12-01 13:50:33 -08:00
serverPools [ ] * erasureSets
2020-09-10 09:18:19 -07:00
// Shut down async operations
shutdown context . CancelFunc
2019-11-19 17:42:27 -08:00
}
2021-01-26 20:47:42 -08:00
func ( z * erasureServerPools ) SinglePool ( ) bool {
2020-12-01 13:50:33 -08:00
return len ( z . serverPools ) == 1
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
// Initialize new pool of erasure sets.
2020-12-01 13:50:33 -08:00
func newErasureServerPools ( ctx context . Context , endpointServerPools EndpointServerPools ) ( ObjectLayer , error ) {
2019-11-21 04:24:51 -08:00
var (
2021-01-19 10:01:31 -08:00
deploymentID string
distributionAlgo string
commonParityDrives int
err error
2019-11-21 04:24:51 -08:00
2020-12-01 13:50:33 -08:00
formats = make ( [ ] * formatErasureV3 , len ( endpointServerPools ) )
storageDisks = make ( [ ] [ ] StorageAPI , len ( endpointServerPools ) )
z = & erasureServerPools { serverPools : make ( [ ] * erasureSets , len ( endpointServerPools ) ) }
2019-11-21 04:24:51 -08:00
)
2020-04-27 19:06:21 +02:00
var localDrives [ ] string
2020-12-01 13:50:33 -08:00
local := endpointServerPools . FirstLocal ( )
for i , ep := range endpointServerPools {
2020-04-27 19:06:21 +02:00
for _ , endpoint := range ep . Endpoints {
if endpoint . IsLocal {
localDrives = append ( localDrives , endpoint . Path )
}
}
2021-01-19 10:01:31 -08:00
2021-01-29 11:40:55 -08:00
// If storage class is not set during startup, default values are used
// -- Default for Reduced Redundancy Storage class is, parity = 2
// -- Default for Standard Storage class is, parity = 2 - disks 4, 5
// -- Default for Standard Storage class is, parity = 3 - disks 6, 7
// -- Default for Standard Storage class is, parity = 4 - disks 8 to 16
2021-01-19 10:01:31 -08:00
if commonParityDrives == 0 {
commonParityDrives = ecDrivesNoConfig ( ep . DrivesPerSet )
}
2021-01-29 11:40:55 -08:00
if err = storageclass . ValidateParity ( commonParityDrives , ep . DrivesPerSet ) ; err != nil {
return nil , fmt . Errorf ( "All current serverPools should have same parity ratio - expected %d, got %d" , commonParityDrives , ecDrivesNoConfig ( ep . DrivesPerSet ) )
2021-01-19 10:01:31 -08:00
}
2020-06-12 20:04:01 -07:00
storageDisks [ i ] , formats [ i ] , err = waitForFormatErasure ( local , ep . Endpoints , i + 1 ,
2021-01-19 10:01:31 -08:00
ep . SetCount , ep . DrivesPerSet , deploymentID , distributionAlgo )
2019-11-21 04:24:51 -08:00
if err != nil {
return nil , err
}
2021-01-19 10:01:31 -08:00
2019-11-21 04:24:51 -08:00
if deploymentID == "" {
2021-01-16 12:08:02 -08:00
// all zones should have same deployment ID
2019-11-21 04:24:51 -08:00
deploymentID = formats [ i ] . ID
}
2021-01-19 10:01:31 -08:00
if distributionAlgo == "" {
distributionAlgo = formats [ i ] . Erasure . DistributionAlgo
}
// Validate if users brought different DeploymentID pools.
if deploymentID != formats [ i ] . ID {
return nil , fmt . Errorf ( "All serverPools should have same deployment ID expected %s, got %s" , deploymentID , formats [ i ] . ID )
}
// Validate if users brought different different distribution algo pools.
if distributionAlgo != formats [ i ] . Erasure . DistributionAlgo {
return nil , fmt . Errorf ( "All serverPools should have same distributionAlgo expected %s, got %s" , distributionAlgo , formats [ i ] . Erasure . DistributionAlgo )
}
2021-01-29 11:40:55 -08:00
z . serverPools [ i ] , err = newErasureSets ( ctx , ep . Endpoints , storageDisks [ i ] , formats [ i ] , commonParityDrives )
2019-11-19 17:42:27 -08:00
if err != nil {
return nil , err
}
2021-01-26 22:21:51 +01:00
z . serverPools [ i ] . poolNumber = i
2019-11-19 17:42:27 -08:00
}
2020-09-10 09:18:19 -07:00
ctx , z . shutdown = context . WithCancel ( ctx )
2020-08-20 13:17:42 -07:00
go intDataUpdateTracker . start ( ctx , localDrives ... )
2019-11-19 17:42:27 -08:00
return z , nil
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) NewNSLock ( bucket string , objects ... string ) RWLocker {
return z . serverPools [ 0 ] . NewNSLock ( bucket , objects ... )
2019-11-19 17:42:27 -08:00
}
2020-12-01 12:07:39 -08:00
// GetDisksID will return disks by their ID.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) GetDisksID ( ids ... string ) [ ] StorageAPI {
2020-12-01 12:07:39 -08:00
idMap := make ( map [ string ] struct { } )
for _ , id := range ids {
idMap [ id ] = struct { } { }
}
res := make ( [ ] StorageAPI , 0 , len ( idMap ) )
2020-12-11 16:58:36 -08:00
for _ , s := range z . serverPools {
s . erasureDisksMu . RLock ( )
defer s . erasureDisksMu . RUnlock ( )
for _ , disks := range s . erasureDisks {
2020-12-01 12:07:39 -08:00
for _ , disk := range disks {
2020-12-11 16:58:36 -08:00
if disk == OfflineDisk {
continue
}
if id , _ := disk . GetDiskID ( ) ; id != "" {
if _ , ok := idMap [ id ] ; ok {
res = append ( res , disk )
}
2020-12-01 12:07:39 -08:00
}
}
}
}
return res
}
2021-01-22 12:09:24 -08:00
func ( z * erasureServerPools ) SetDriveCounts ( ) [ ] int {
setDriveCounts := make ( [ ] int , len ( z . serverPools ) )
for i := range z . serverPools {
setDriveCounts [ i ] = z . serverPools [ i ] . SetDriveCount ( )
}
return setDriveCounts
2020-08-05 13:31:12 -07:00
}
2021-01-06 09:35:47 -08:00
type serverPoolsAvailableSpace [ ] poolAvailableSpace
2019-11-19 17:42:27 -08:00
2021-01-06 09:35:47 -08:00
type poolAvailableSpace struct {
2019-11-19 17:42:27 -08:00
Index int
Available uint64
}
// TotalAvailable - total available space
2020-12-01 13:50:33 -08:00
func ( p serverPoolsAvailableSpace ) TotalAvailable ( ) uint64 {
2019-11-19 17:42:27 -08:00
total := uint64 ( 0 )
for _ , z := range p {
total += z . Available
}
return total
}
2021-01-26 20:47:42 -08:00
// getAvailablePoolIdx will return an index that can hold size bytes.
2020-12-01 13:50:33 -08:00
// -1 is returned if no serverPools have available space for the size given.
2021-01-26 20:47:42 -08:00
func ( z * erasureServerPools ) getAvailablePoolIdx ( ctx context . Context , size int64 ) int {
2020-12-01 13:50:33 -08:00
serverPools := z . getServerPoolsAvailableSpace ( ctx , size )
total := serverPools . TotalAvailable ( )
2019-11-19 17:42:27 -08:00
if total == 0 {
2020-06-20 06:36:44 -07:00
return - 1
2019-11-19 17:42:27 -08:00
}
// choose when we reach this many
choose := rand . Uint64 ( ) % total
atTotal := uint64 ( 0 )
2021-01-06 09:35:47 -08:00
for _ , pool := range serverPools {
atTotal += pool . Available
if atTotal > choose && pool . Available > 0 {
return pool . Index
2019-11-19 17:42:27 -08:00
}
}
// Should not happen, but print values just in case.
2020-12-01 13:50:33 -08:00
logger . LogIf ( ctx , fmt . Errorf ( "reached end of serverPools (total: %v, atTotal: %v, choose: %v)" , total , atTotal , choose ) )
2020-06-20 06:36:44 -07:00
return - 1
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
// getServerPoolsAvailableSpace will return the available space of each pool after storing the content.
// If there is not enough space the pool will return 0 bytes available.
2020-06-20 06:36:44 -07:00
// Negative sizes are seen as 0 bytes.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) getServerPoolsAvailableSpace ( ctx context . Context , size int64 ) serverPoolsAvailableSpace {
2020-06-20 06:36:44 -07:00
if size < 0 {
size = 0
}
2020-12-01 13:50:33 -08:00
var serverPools = make ( serverPoolsAvailableSpace , len ( z . serverPools ) )
2019-11-19 17:42:27 -08:00
2020-12-01 13:50:33 -08:00
storageInfos := make ( [ ] StorageInfo , len ( z . serverPools ) )
g := errgroup . WithNErrs ( len ( z . serverPools ) )
for index := range z . serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
2020-12-01 13:50:33 -08:00
storageInfos [ index ] = z . serverPools [ index ] . StorageUsageInfo ( ctx )
2019-11-19 17:42:27 -08:00
return nil
} , index )
}
// Wait for the go routines.
g . Wait ( )
for i , zinfo := range storageInfos {
var available uint64
2020-06-20 06:36:44 -07:00
var total uint64
2020-07-13 09:51:07 -07:00
for _ , disk := range zinfo . Disks {
total += disk . TotalSpace
available += disk . TotalSpace - disk . UsedSpace
2020-06-20 06:36:44 -07:00
}
// Make sure we can fit "size" on to the disk without getting above the diskFillFraction
if available < uint64 ( size ) {
available = 0
}
if available > 0 {
// How much will be left after adding the file.
available -= - uint64 ( size )
// wantLeft is how much space there at least must be left.
wantLeft := uint64 ( float64 ( total ) * ( 1.0 - diskFillFraction ) )
if available <= wantLeft {
available = 0
}
}
2021-01-06 09:35:47 -08:00
serverPools [ i ] = poolAvailableSpace {
2019-11-19 17:42:27 -08:00
Index : i ,
Available : available ,
}
}
2020-12-01 13:50:33 -08:00
return serverPools
2019-11-19 17:42:27 -08:00
}
2021-01-26 20:47:42 -08:00
// getPoolIdx returns the found previous object and its corresponding pool idx,
2021-01-06 09:35:47 -08:00
// if none are found falls back to most available space pool.
2021-01-26 20:47:42 -08:00
func ( z * erasureServerPools ) getPoolIdx ( ctx context . Context , bucket , object string , opts ObjectOptions , size int64 ) ( idx int , err error ) {
if z . SinglePool ( ) {
2020-06-17 08:33:14 -07:00
return 0 , nil
}
2021-01-06 09:35:47 -08:00
for i , pool := range z . serverPools {
objInfo , err := pool . GetObjectInfo ( ctx , bucket , object , opts )
2020-06-17 08:33:14 -07:00
switch err . ( type ) {
case ObjectNotFound :
// VersionId was not specified but found delete marker or no versions exist.
case MethodNotAllowed :
// VersionId was specified but found delete marker
default :
if err != nil {
// any other un-handled errors return right here.
return - 1 , err
}
}
// delete marker not specified means no versions
2021-01-06 09:35:47 -08:00
// exist continue to next pool.
2020-06-17 08:33:14 -07:00
if ! objInfo . DeleteMarker && err != nil {
continue
}
// Success case and when DeleteMarker is true return.
return i , nil
}
2020-06-20 06:36:44 -07:00
// We multiply the size by 2 to account for erasure coding.
2021-01-26 20:47:42 -08:00
idx = z . getAvailablePoolIdx ( ctx , size * 2 )
2020-06-20 06:36:44 -07:00
if idx < 0 {
return - 1 , toObjectErr ( errDiskFull )
}
return idx , nil
2020-06-17 08:33:14 -07:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) Shutdown ( ctx context . Context ) error {
2020-09-10 09:18:19 -07:00
defer z . shutdown ( )
2019-11-19 17:42:27 -08:00
2020-12-01 13:50:33 -08:00
g := errgroup . WithNErrs ( len ( z . serverPools ) )
2019-11-19 17:42:27 -08:00
2020-12-01 13:50:33 -08:00
for index := range z . serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
2020-12-01 13:50:33 -08:00
return z . serverPools [ index ] . Shutdown ( ctx )
2019-11-19 17:42:27 -08:00
} , index )
}
for _ , err := range g . Wait ( ) {
if err != nil {
logger . LogIf ( ctx , err )
}
// let's the rest shutdown
}
return nil
}
2020-12-21 18:35:19 +01:00
func ( z * erasureServerPools ) BackendInfo ( ) ( b BackendInfo ) {
b . Type = BackendErasure
scParity := globalStorageClass . GetParityForSC ( storageclass . STANDARD )
2021-01-16 12:08:02 -08:00
if scParity <= 0 {
scParity = z . serverPools [ 0 ] . defaultParityCount
2020-12-21 18:35:19 +01:00
}
rrSCParity := globalStorageClass . GetParityForSC ( storageclass . RRS )
2021-01-22 12:09:24 -08:00
// Data blocks can vary per pool, but parity is same.
for _ , setDriveCount := range z . SetDriveCounts ( ) {
b . StandardSCData = append ( b . StandardSCData , setDriveCount - scParity )
b . RRSCData = append ( b . RRSCData , setDriveCount - rrSCParity )
}
b . StandardSCParity = scParity
2020-12-21 18:35:19 +01:00
b . RRSCParity = rrSCParity
return
}
2021-01-04 09:42:09 -08:00
func ( z * erasureServerPools ) StorageInfo ( ctx context . Context ) ( StorageInfo , [ ] error ) {
2019-11-19 17:42:27 -08:00
var storageInfo StorageInfo
2020-12-01 13:50:33 -08:00
storageInfos := make ( [ ] StorageInfo , len ( z . serverPools ) )
storageInfosErrs := make ( [ ] [ ] error , len ( z . serverPools ) )
g := errgroup . WithNErrs ( len ( z . serverPools ) )
for index := range z . serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
2021-01-04 09:42:09 -08:00
storageInfos [ index ] , storageInfosErrs [ index ] = z . serverPools [ index ] . StorageInfo ( ctx )
2019-11-19 17:42:27 -08:00
return nil
} , index )
}
// Wait for the go routines.
g . Wait ( )
2020-12-21 18:35:19 +01:00
storageInfo . Backend = z . BackendInfo ( )
2019-11-19 17:42:27 -08:00
for _ , lstorageInfo := range storageInfos {
2020-07-13 09:51:07 -07:00
storageInfo . Disks = append ( storageInfo . Disks , lstorageInfo . Disks ... )
2020-10-22 13:36:24 -07:00
}
2020-05-28 13:03:04 -07:00
var errs [ ] error
2020-12-01 13:50:33 -08:00
for i := range z . serverPools {
2020-05-28 13:03:04 -07:00
errs = append ( errs , storageInfosErrs [ i ] ... )
}
return storageInfo , errs
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) CrawlAndGetDataUsage ( ctx context . Context , bf * bloomFilter , updates chan <- DataUsageInfo ) error {
2020-03-19 00:19:29 +01:00
ctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
2020-06-12 20:04:01 -07:00
2019-12-12 15:02:37 +01:00
var wg sync . WaitGroup
2020-03-19 00:19:29 +01:00
var mu sync . Mutex
var results [ ] dataUsageCache
var firstErr error
2020-12-15 17:34:54 -08:00
allBuckets , err := z . ListBuckets ( ctx )
if err != nil {
return err
}
2021-01-07 09:52:53 -08:00
if len ( allBuckets ) == 0 {
logger . Info ( color . Green ( "data-crawl:" ) + " No buckets found, skipping crawl" )
updates <- DataUsageInfo { } // no buckets found update data usage to reflect latest state
return nil
}
2020-12-15 17:34:54 -08:00
// Crawl latest allBuckets first.
sort . Slice ( allBuckets , func ( i , j int ) bool {
return allBuckets [ i ] . Created . After ( allBuckets [ j ] . Created )
} )
2020-03-19 00:19:29 +01:00
2020-12-01 13:50:33 -08:00
// Collect for each set in serverPools.
for _ , z := range z . serverPools {
2020-09-24 09:53:38 -07:00
for _ , erObj := range z . sets {
2020-03-19 00:19:29 +01:00
wg . Add ( 1 )
results = append ( results , dataUsageCache { } )
2020-06-12 20:04:01 -07:00
go func ( i int , erObj * erasureObjects ) {
2020-03-19 00:19:29 +01:00
updates := make ( chan dataUsageCache , 1 )
defer close ( updates )
// Start update collector.
go func ( ) {
defer wg . Done ( )
for info := range updates {
mu . Lock ( )
results [ i ] = info
mu . Unlock ( )
}
} ( )
// Start crawler. Blocks until done.
2020-12-15 17:34:54 -08:00
err := erObj . crawlAndGetDataUsage ( ctx , allBuckets , bf , updates )
2020-03-19 00:19:29 +01:00
if err != nil {
2020-06-12 10:28:21 -07:00
logger . LogIf ( ctx , err )
2020-03-19 00:19:29 +01:00
mu . Lock ( )
if firstErr == nil {
firstErr = err
}
// Cancel remaining...
cancel ( )
mu . Unlock ( )
return
2019-12-12 15:02:37 +01:00
}
2020-06-12 20:04:01 -07:00
} ( len ( results ) - 1 , erObj )
2019-12-12 15:02:37 +01:00
}
}
2020-03-19 00:19:29 +01:00
updateCloser := make ( chan chan struct { } )
go func ( ) {
updateTicker := time . NewTicker ( 30 * time . Second )
defer updateTicker . Stop ( )
var lastUpdate time . Time
2020-07-14 18:59:05 -07:00
2021-01-06 09:35:47 -08:00
// We need to merge since we will get the same buckets from each pool.
2020-07-14 18:59:05 -07:00
// Therefore to get the exact bucket sizes we must merge before we can convert.
2020-07-24 11:02:10 -07:00
var allMerged dataUsageCache
2020-07-14 18:59:05 -07:00
2020-03-19 00:19:29 +01:00
update := func ( ) {
mu . Lock ( )
defer mu . Unlock ( )
2020-07-24 11:02:10 -07:00
allMerged = dataUsageCache { Info : dataUsageCacheInfo { Name : dataUsageRoot } }
2020-03-19 00:19:29 +01:00
for _ , info := range results {
if info . Info . LastUpdate . IsZero ( ) {
// Not filled yet.
return
}
allMerged . merge ( info )
}
if allMerged . root ( ) != nil && allMerged . Info . LastUpdate . After ( lastUpdate ) {
updates <- allMerged . dui ( allMerged . Info . Name , allBuckets )
lastUpdate = allMerged . Info . LastUpdate
}
}
for {
select {
case <- ctx . Done ( ) :
return
case v := <- updateCloser :
update ( )
2020-07-14 18:59:05 -07:00
// Enforce quotas when all is done.
2020-08-24 10:15:46 -07:00
if firstErr == nil {
for _ , b := range allBuckets {
enforceFIFOQuotaBucket ( ctx , z , b . Name , allMerged . bucketUsageInfo ( b . Name ) )
}
2020-07-14 18:59:05 -07:00
}
2020-03-19 00:19:29 +01:00
close ( v )
return
case <- updateTicker . C :
update ( )
}
}
} ( )
2019-12-12 15:02:37 +01:00
wg . Wait ( )
2020-03-19 00:19:29 +01:00
ch := make ( chan struct { } )
2020-06-12 10:28:21 -07:00
select {
case updateCloser <- ch :
<- ch
case <- ctx . Done ( ) :
2020-08-24 10:15:46 -07:00
if firstErr == nil {
firstErr = ctx . Err ( )
}
2020-06-12 10:28:21 -07:00
}
2020-03-19 00:19:29 +01:00
return firstErr
2019-12-12 15:02:37 +01:00
}
2020-12-01 13:50:33 -08:00
// MakeBucketWithLocation - creates a new bucket across all serverPools simultaneously
2019-11-19 17:42:27 -08:00
// even if one of the sets fail to create buckets, we proceed all the successful
// operations.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) MakeBucketWithLocation ( ctx context . Context , bucket string , opts BucketOptions ) error {
g := errgroup . WithNErrs ( len ( z . serverPools ) )
2019-11-19 17:42:27 -08:00
// Create buckets in parallel across all sets.
2020-12-01 13:50:33 -08:00
for index := range z . serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
2020-12-01 13:50:33 -08:00
return z . serverPools [ index ] . MakeBucketWithLocation ( ctx , bucket , opts )
2019-11-19 17:42:27 -08:00
} , index )
}
errs := g . Wait ( )
2020-05-12 23:20:42 +01:00
// Return the first encountered error
2019-11-19 17:42:27 -08:00
for _ , err := range errs {
if err != nil {
return err
}
}
2020-05-19 13:53:54 -07:00
// If it doesn't exist we get a new, so ignore errors
2020-05-20 10:18:15 -07:00
meta := newBucketMetadata ( bucket )
2020-06-12 20:04:01 -07:00
if opts . LockEnabled {
meta . VersioningConfigXML = enabledBucketVersioningConfig
2020-05-21 11:03:59 -07:00
meta . ObjectLockConfigXML = enabledBucketObjectLockConfig
2020-05-20 10:18:15 -07:00
}
2020-06-12 20:04:01 -07:00
2020-05-19 13:53:54 -07:00
if err := meta . Save ( ctx , z ) ; err != nil {
return toObjectErr ( err , bucket )
2020-05-08 13:44:44 -07:00
}
2020-06-12 20:04:01 -07:00
2020-05-19 13:53:54 -07:00
globalBucketMetadataSys . Set ( bucket , meta )
2020-05-08 13:44:44 -07:00
2019-11-19 17:42:27 -08:00
// Success.
return nil
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) GetObjectNInfo ( ctx context . Context , bucket , object string , rs * HTTPRangeSpec , h http . Header , lockType LockType , opts ObjectOptions ) ( gr * GetObjectReader , err error ) {
2020-10-06 12:03:57 -07:00
if err = checkGetObjArgs ( ctx , bucket , object ) ; err != nil {
return nil , err
}
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
gr , err = pool . GetObjectNInfo ( ctx , bucket , object , rs , h , lockType , opts )
2019-11-19 17:42:27 -08:00
if err != nil {
2020-07-02 16:17:27 -07:00
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) {
2019-11-19 17:42:27 -08:00
continue
}
2020-07-02 16:17:27 -07:00
return gr , err
2019-11-19 17:42:27 -08:00
}
return gr , nil
}
2020-07-02 16:17:27 -07:00
if opts . VersionID != "" {
return gr , VersionNotFound { Bucket : bucket , Object : object , VersionID : opts . VersionID }
}
return gr , ObjectNotFound { Bucket : bucket , Object : object }
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) GetObject ( ctx context . Context , bucket , object string , startOffset int64 , length int64 , writer io . Writer , etag string , opts ObjectOptions ) error {
2020-10-06 12:03:57 -07:00
if err := checkGetObjArgs ( ctx , bucket , object ) ; err != nil {
return err
}
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
if err := pool . GetObject ( ctx , bucket , object , startOffset , length , writer , etag , opts ) ; err != nil {
2020-07-02 16:17:27 -07:00
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) {
2019-11-19 17:42:27 -08:00
continue
}
return err
}
return nil
}
2020-09-14 15:57:13 -07:00
if opts . VersionID != "" {
return VersionNotFound { Bucket : bucket , Object : object , VersionID : opts . VersionID }
}
2019-11-19 17:42:27 -08:00
return ObjectNotFound { Bucket : bucket , Object : object }
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) GetObjectInfo ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2020-10-06 12:03:57 -07:00
if err = checkGetObjArgs ( ctx , bucket , object ) ; err != nil {
return objInfo , err
}
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
objInfo , err = pool . GetObjectInfo ( ctx , bucket , object , opts )
2019-11-19 17:42:27 -08:00
if err != nil {
2020-07-02 16:17:27 -07:00
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) {
2019-11-19 17:42:27 -08:00
continue
}
return objInfo , err
}
return objInfo , nil
}
2020-09-19 08:39:41 -07:00
object = decodeDirObject ( object )
2020-07-02 16:17:27 -07:00
if opts . VersionID != "" {
return objInfo , VersionNotFound { Bucket : bucket , Object : object , VersionID : opts . VersionID }
}
return objInfo , ObjectNotFound { Bucket : bucket , Object : object }
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
// PutObject - writes an object to least used erasure pool.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) PutObject ( ctx context . Context , bucket string , object string , data * PutObjReader , opts ObjectOptions ) ( ObjectInfo , error ) {
2020-10-06 12:03:57 -07:00
// Validate put object input args.
if err := checkPutObjectArgs ( ctx , bucket , object , z ) ; err != nil {
return ObjectInfo { } , err
}
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . PutObject ( ctx , bucket , object , data , opts )
2019-11-19 17:42:27 -08:00
}
2021-01-26 20:47:42 -08:00
idx , err := z . getPoolIdx ( ctx , bucket , object , opts , data . Size ( ) )
2020-06-17 08:33:14 -07:00
if err != nil {
return ObjectInfo { } , err
2019-11-19 17:42:27 -08:00
}
2020-06-17 08:33:14 -07:00
2021-01-06 09:35:47 -08:00
// Overwrite the object at the right pool
2020-12-01 13:50:33 -08:00
return z . serverPools [ idx ] . PutObject ( ctx , bucket , object , data , opts )
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) DeleteObject ( ctx context . Context , bucket string , object string , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2020-10-06 12:03:57 -07:00
if err = checkDelObjArgs ( ctx , bucket , object ) ; err != nil {
return objInfo , err
}
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . DeleteObject ( ctx , bucket , object , opts )
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
objInfo , err = pool . DeleteObject ( ctx , bucket , object , opts )
2020-06-12 20:04:01 -07:00
if err == nil {
return objInfo , nil
}
2020-07-02 16:17:27 -07:00
if err != nil && ! isErrObjectNotFound ( err ) && ! isErrVersionNotFound ( err ) {
2020-06-12 20:04:01 -07:00
break
2019-11-19 17:42:27 -08:00
}
}
2020-10-08 12:32:32 -07:00
2020-06-12 20:04:01 -07:00
return objInfo , err
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) DeleteObjects ( ctx context . Context , bucket string , objects [ ] ObjectToDelete , opts ObjectOptions ) ( [ ] DeletedObject , [ ] error ) {
2019-11-19 17:42:27 -08:00
derrs := make ( [ ] error , len ( objects ) )
2020-06-12 20:04:01 -07:00
dobjects := make ( [ ] DeletedObject , len ( objects ) )
2020-06-18 10:25:07 -07:00
objSets := set . NewStringSet ( )
2019-11-19 17:42:27 -08:00
for i := range derrs {
2020-09-19 08:39:41 -07:00
objects [ i ] . ObjectName = encodeDirObject ( objects [ i ] . ObjectName )
2020-06-12 20:04:01 -07:00
derrs [ i ] = checkDelObjArgs ( ctx , bucket , objects [ i ] . ObjectName )
2020-06-18 10:25:07 -07:00
objSets . Add ( objects [ i ] . ObjectName )
2019-11-19 17:42:27 -08:00
}
2020-02-21 11:29:57 +05:30
// Acquire a bulk write lock across 'objects'
2020-11-04 08:25:42 -08:00
multiDeleteLock := z . NewNSLock ( bucket , objSets . ToSlice ( ) ... )
if err := multiDeleteLock . GetLock ( ctx , globalOperationTimeout ) ; err != nil {
2020-06-12 20:04:01 -07:00
for i := range derrs {
derrs [ i ] = err
}
DeletedObjects: Return objects on lock failure (#10874)
Return objects when locking fails.
<details>
<summary>Panic</summary>
```
: 2020/11/10 04:15:55 http: panic serving 10.10.62.153:44858: runtime error: index out of range [0] with length 0
: goroutine 363537270 [running]:
: net/http.(*conn).serve.func1(0xc019232780)
: net/http/server.go:1801 +0x147
: panic(0x1cadd60, 0xc001719260)
: runtime/panic.go:975 +0x47a
: github.com/minio/minio/cmd.criticalErrorHandler.ServeHTTP.func1(0xc0121d1200, 0x210cda0, 0xc0141940e0)
: github.com/minio/minio/cmd/generic-handlers.go:781 +0x1a8
: panic(0x1cadd60, 0xc001719260)
: runtime/panic.go:969 +0x1b9
: github.com/minio/minio/cmd.objectAPIHandlers.DeleteMultipleObjectsHandler(0x1e71ce8, 0x1e71cc8, 0x2108420, 0xc0192328c0, 0xc0121d1400)
: github.com/minio/minio/cmd/bucket-handlers.go:465 +0x2490
: net/http.HandlerFunc.ServeHTTP(...)
: net/http/server.go:2042
: github.com/minio/minio/cmd.httpTraceAll.func1(0x2108420, 0xc0192328c0, 0xc0121d1400)
: github.com/minio/minio/cmd/handler-utils.go:353 +0x158
: net/http.HandlerFunc.ServeHTTP(...)
: net/http/server.go:2042
: github.com/minio/minio/cmd.collectAPIStats.func1(0x2108420, 0xc019232820, 0xc0121d1400)
: github.com/minio/minio/cmd/handler-utils.go:380 +0xed
: net/http.HandlerFunc.ServeHTTP(...)
: net/http/server.go:2042
: github.com/minio/minio/cmd.maxClients.func1(0x2108420, 0xc019232820, 0xc0121d1400)
: github.com/minio/minio/cmd/handler-api.go:132 +0x33b
: net/http.HandlerFunc.ServeHTTP(0xc00271d590, 0x2108420, 0xc019232820, 0xc0121d1400)
: net/http/server.go:2042 +0x44
: github.com/minio/minio/cmd.redirectHandler.ServeHTTP(0x20e2180, 0xc00271d590, 0x2108420, 0xc019232820, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:192 +0x156
: github.com/minio/minio/cmd.customHeaderHandler.ServeHTTP(0x20e1060, 0xc0141a22b0, 0x21083e0, 0xc01814d2e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:751 +0x162
: github.com/minio/minio/cmd.securityHeaderHandler.ServeHTTP(0x20e0fc0, 0xc0141a22c0, 0x21083e0, 0xc01814d2e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:766 +0x1d6
: github.com/minio/minio/cmd.bucketForwardingHandler.ServeHTTP(0xc0121c7a40, 0x20e1120, 0xc0141a22d0, 0x21083e0, 0xc01814d2e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:624 +0xbf
: github.com/minio/minio/cmd.requestValidityHandler.ServeHTTP(0x20e0f20, 0xc01814d280, 0x21083e0, 0xc01814d2e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:608 +0x42a
: github.com/minio/minio/cmd.httpStatsHandler.ServeHTTP(0x20e10c0, 0xc0141a2300, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:536 +0xe4
: github.com/minio/minio/cmd.requestSizeLimitHandler.ServeHTTP(0x20e0fe0, 0xc0141a2310, 0x50004000000, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:68 +0xd4
: github.com/minio/minio/cmd.requestHeaderSizeLimitHandler.ServeHTTP(0x20e10a0, 0xc01814d2a0, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:93 +0x1b7
: github.com/minio/minio/cmd.crossDomainPolicy.ServeHTTP(0x20e1080, 0xc0141a2320, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/crossdomain-xml-handler.go:51 +0x82
: github.com/minio/minio/cmd.browserRedirectHandler.ServeHTTP(0x20e0fa0, 0xc0141a2330, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:276 +0x68
: github.com/minio/minio/cmd.minioReservedBucketHandler.ServeHTTP(0x20e0f00, 0xc0141a2340, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:344 +0xb8
: github.com/minio/minio/cmd.cacheControlHandler.ServeHTTP(0x20e1020, 0xc0141a2350, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:303 +0x1ce
: github.com/minio/minio/cmd.timeValidityHandler.ServeHTTP(0x20e0f40, 0xc0141a2360, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:414 +0x3ca
: github.com/minio/minio/cmd.resourceHandler.ServeHTTP(0x20e1160, 0xc0141a2370, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:516 +0xab
: github.com/minio/minio/cmd.authHandler.ServeHTTP(0x20e1100, 0xc0141a2380, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/auth-handler.go:502 +0x2e7
: github.com/minio/minio/cmd.sseTLSHandler.ServeHTTP(0x20e0ee0, 0xc0141a2390, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:802 +0x79
: github.com/minio/minio/cmd.reservedMetadataHandler.ServeHTTP(0x20e1140, 0xc0141a23a0, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:139 +0x1b7
: github.com/gorilla/mux.(*Router).ServeHTTP(0xc00073fb00, 0x210cda0, 0xc0141940e0, 0xc0121d1200)
: github.com/gorilla/mux@v1.8.0/mux.go:210 +0xd3
: github.com/rs/cors.(*Cors).Handler.func1(0x210cda0, 0xc0141940e0, 0xc0121d1200)
: github.com/rs/cors@v1.7.0/cors.go:219 +0x1b9
: net/http.HandlerFunc.ServeHTTP(0xc0009aece0, 0x210cda0, 0xc0141940e0, 0xc0121d1200)
: net/http/server.go:2042 +0x44
: github.com/minio/minio/cmd.criticalErrorHandler.ServeHTTP(0x20e2180, 0xc0009aece0, 0x210cda0, 0xc0141940e0, 0xc0121d1200)
: github.com/minio/minio/cmd/generic-handlers.go:784 +0x85
: github.com/minio/minio/cmd/http.(*Server).Start.func1(0x210cda0, 0xc0141940e0, 0xc0121d1200)
: github.com/minio/minio/cmd/http/server.go:101 +0x258
: net/http.HandlerFunc.ServeHTTP(0xc000dc4080, 0x210cda0, 0xc0141940e0, 0xc0121d1200)
: net/http/server.go:2042 +0x44
: net/http.serverHandler.ServeHTTP(0xc000764c60, 0x210cda0, 0xc0141940e0, 0xc0121d1200)
: net/http/server.go:2843 +0xa3
: net/http.(*conn).serve(0xc019232780, 0x2114720, 0xc03381f6c0)
: net/http/server.go:1925 +0x8ad
: created by net/http.(*Server).Serve
: net/http/server.go:2969 +0x36c
```
</details>
2020-11-11 09:14:32 -08:00
return dobjects , derrs
2019-11-19 17:42:27 -08:00
}
2020-02-21 11:29:57 +05:30
defer multiDeleteLock . Unlock ( )
2019-11-19 17:42:27 -08:00
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . DeleteObjects ( ctx , bucket , objects , opts )
2020-11-28 21:15:45 -08:00
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
deletedObjects , errs := pool . DeleteObjects ( ctx , bucket , objects , opts )
2019-11-19 17:42:27 -08:00
for i , derr := range errs {
2020-11-28 21:15:45 -08:00
if derr != nil {
derrs [ i ] = derr
2020-06-12 20:04:01 -07:00
}
2020-11-28 21:15:45 -08:00
dobjects [ i ] = deletedObjects [ i ]
2019-11-19 17:42:27 -08:00
}
}
2020-06-12 20:04:01 -07:00
return dobjects , derrs
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) CopyObject ( ctx context . Context , srcBucket , srcObject , dstBucket , dstObject string , srcInfo ObjectInfo , srcOpts , dstOpts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2020-09-19 08:39:41 -07:00
srcObject = encodeDirObject ( srcObject )
dstObject = encodeDirObject ( dstObject )
2020-05-28 14:36:38 -07:00
cpSrcDstSame := isStringEqual ( pathJoin ( srcBucket , srcObject ) , pathJoin ( dstBucket , dstObject ) )
2019-11-19 17:42:27 -08:00
2021-01-26 20:47:42 -08:00
poolIdx , err := z . getPoolIdx ( ctx , dstBucket , dstObject , dstOpts , srcInfo . Size )
2020-06-17 08:33:14 -07:00
if err != nil {
return objInfo , err
2020-05-28 14:36:38 -07:00
}
2020-08-03 16:21:10 -07:00
if cpSrcDstSame && srcInfo . metadataOnly {
2020-09-14 15:57:13 -07:00
// Version ID is set for the destination and source == destination version ID.
2020-06-19 08:44:51 -07:00
if dstOpts . VersionID != "" && srcOpts . VersionID == dstOpts . VersionID {
2021-01-06 09:35:47 -08:00
return z . serverPools [ poolIdx ] . CopyObject ( ctx , srcBucket , srcObject , dstBucket , dstObject , srcInfo , srcOpts , dstOpts )
2020-06-19 08:44:51 -07:00
}
2020-09-14 15:57:13 -07:00
// Destination is not versioned and source version ID is empty
// perform an in-place update.
2020-06-19 08:44:51 -07:00
if ! dstOpts . Versioned && srcOpts . VersionID == "" {
2021-01-06 09:35:47 -08:00
return z . serverPools [ poolIdx ] . CopyObject ( ctx , srcBucket , srcObject , dstBucket , dstObject , srcInfo , srcOpts , dstOpts )
2020-06-19 08:44:51 -07:00
}
2020-09-14 15:57:13 -07:00
// Destination is versioned, source is not destination version,
// as a special case look for if the source object is not legacy
// from older format, for older format we will rewrite them as
// newer using PutObject() - this is an optimization to save space
2020-08-03 16:21:10 -07:00
if dstOpts . Versioned && srcOpts . VersionID != dstOpts . VersionID && ! srcInfo . Legacy {
// CopyObject optimization where we don't create an entire copy
// of the content, instead we add a reference.
srcInfo . versionOnly = true
2021-01-06 09:35:47 -08:00
return z . serverPools [ poolIdx ] . CopyObject ( ctx , srcBucket , srcObject , dstBucket , dstObject , srcInfo , srcOpts , dstOpts )
2020-08-03 16:21:10 -07:00
}
2020-06-19 08:44:51 -07:00
}
2020-06-17 11:13:41 -07:00
putOpts := ObjectOptions {
ServerSideEncryption : dstOpts . ServerSideEncryption ,
UserDefined : srcInfo . UserDefined ,
Versioned : dstOpts . Versioned ,
VersionID : dstOpts . VersionID ,
2020-11-19 11:50:22 -08:00
MTime : dstOpts . MTime ,
2020-06-17 11:13:41 -07:00
}
2021-01-06 09:35:47 -08:00
return z . serverPools [ poolIdx ] . PutObject ( ctx , dstBucket , dstObject , srcInfo . PutObjReader , putOpts )
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) ListObjectsV2 ( ctx context . Context , bucket , prefix , continuationToken , delimiter string , maxKeys int , fetchOwner bool , startAfter string ) ( ListObjectsV2Info , error ) {
2019-11-19 17:42:27 -08:00
marker := continuationToken
if marker == "" {
marker = startAfter
}
loi , err := z . ListObjects ( ctx , bucket , prefix , marker , delimiter , maxKeys )
if err != nil {
return ListObjectsV2Info { } , err
}
listObjectsV2Info := ListObjectsV2Info {
IsTruncated : loi . IsTruncated ,
ContinuationToken : continuationToken ,
NextContinuationToken : loi . NextMarker ,
Objects : loi . Objects ,
Prefixes : loi . Prefixes ,
}
return listObjectsV2Info , err
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) ListObjectVersions ( ctx context . Context , bucket , prefix , marker , versionMarker , delimiter string , maxKeys int ) ( ListObjectVersionsInfo , error ) {
2020-06-12 20:04:01 -07:00
loi := ListObjectVersionsInfo { }
if marker == "" && versionMarker != "" {
return loi , NotImplemented { }
}
2020-11-13 16:58:20 -08:00
opts := listPathOptions {
2020-10-28 09:18:35 -07:00
Bucket : bucket ,
Prefix : prefix ,
Separator : delimiter ,
Limit : maxKeys ,
Marker : marker ,
InclDeleted : true ,
2020-11-03 08:53:48 -08:00
AskDisks : globalAPIConfig . getListQuorum ( ) ,
2020-11-13 16:58:20 -08:00
}
// Shortcut for APN/1.0 Veeam/1.0 Backup/10.0
// It requests unique blocks with a specific prefix.
// We skip scanning the parent directory for
// more objects matching the prefix.
ri := logger . GetReqInfo ( ctx )
if ri != nil && strings . Contains ( ri . UserAgent , ` 1.0 Veeam/1.0 Backup ` ) && strings . HasSuffix ( prefix , ".blk" ) {
2020-12-15 11:25:36 -08:00
opts . discardResult = true
2020-11-13 16:58:20 -08:00
opts . Transient = true
}
merged , err := z . listPath ( ctx , opts )
2020-10-28 09:18:35 -07:00
if err != nil && err != io . EOF {
return loi , err
2020-06-12 20:04:01 -07:00
}
2020-12-19 09:36:04 -08:00
objects := merged . fileInfoVersions ( bucket , prefix , delimiter , versionMarker )
loi . IsTruncated = err == nil && len ( objects ) > 0
if maxKeys > 0 && len ( objects ) > maxKeys {
objects = objects [ : maxKeys ]
2020-10-28 09:18:35 -07:00
loi . IsTruncated = true
2020-06-12 20:04:01 -07:00
}
2020-12-19 09:36:04 -08:00
for _ , obj := range objects {
if obj . IsDir && delimiter != "" {
loi . Prefixes = append ( loi . Prefixes , obj . Name )
} else {
loi . Objects = append ( loi . Objects , obj )
}
}
2020-06-12 20:04:01 -07:00
if loi . IsTruncated {
2020-12-19 09:36:04 -08:00
last := objects [ len ( objects ) - 1 ]
2020-10-28 09:18:35 -07:00
loi . NextMarker = encodeMarker ( last . Name , merged . listID )
loi . NextVersionIDMarker = last . VersionID
2019-11-19 17:42:27 -08:00
}
2020-10-28 09:18:35 -07:00
return loi , nil
}
2019-11-19 17:42:27 -08:00
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) ListObjects ( ctx context . Context , bucket , prefix , marker , delimiter string , maxKeys int ) ( ListObjectsInfo , error ) {
2020-10-28 09:18:35 -07:00
var loi ListObjectsInfo
2020-12-19 09:36:04 -08:00
2020-10-28 09:18:35 -07:00
merged , err := z . listPath ( ctx , listPathOptions {
Bucket : bucket ,
Prefix : prefix ,
Separator : delimiter ,
Limit : maxKeys ,
Marker : marker ,
InclDeleted : false ,
2020-11-03 08:53:48 -08:00
AskDisks : globalAPIConfig . getListQuorum ( ) ,
2020-10-28 09:18:35 -07:00
} )
if err != nil && err != io . EOF {
logger . LogIf ( ctx , err )
return loi , err
2020-06-12 20:04:01 -07:00
}
2020-12-19 09:36:04 -08:00
2020-10-28 09:18:35 -07:00
// Default is recursive, if delimiter is set then list non recursive.
2020-12-19 09:36:04 -08:00
objects := merged . fileInfos ( bucket , prefix , delimiter )
loi . IsTruncated = err == nil && len ( objects ) > 0
if maxKeys > 0 && len ( objects ) > maxKeys {
objects = objects [ : maxKeys ]
loi . IsTruncated = true
}
for _ , obj := range objects {
if obj . IsDir && delimiter != "" {
loi . Prefixes = append ( loi . Prefixes , obj . Name )
} else {
loi . Objects = append ( loi . Objects , obj )
}
}
2020-06-12 20:04:01 -07:00
if loi . IsTruncated {
2020-12-19 09:36:04 -08:00
last := objects [ len ( objects ) - 1 ]
loi . NextMarker = encodeMarker ( last . Name , merged . listID )
2020-06-12 20:04:01 -07:00
}
return loi , nil
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) ListMultipartUploads ( ctx context . Context , bucket , prefix , keyMarker , uploadIDMarker , delimiter string , maxUploads int ) ( ListMultipartsInfo , error ) {
2020-05-19 13:53:54 -07:00
if err := checkListMultipartArgs ( ctx , bucket , prefix , keyMarker , uploadIDMarker , delimiter , z ) ; err != nil {
return ListMultipartsInfo { } , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . ListMultipartUploads ( ctx , bucket , prefix , keyMarker , uploadIDMarker , delimiter , maxUploads )
2019-11-19 17:42:27 -08:00
}
2020-05-19 13:53:54 -07:00
2021-01-06 09:35:47 -08:00
var poolResult = ListMultipartsInfo { }
poolResult . MaxUploads = maxUploads
poolResult . KeyMarker = keyMarker
poolResult . Prefix = prefix
poolResult . Delimiter = delimiter
for _ , pool := range z . serverPools {
result , err := pool . ListMultipartUploads ( ctx , bucket , prefix , keyMarker , uploadIDMarker ,
2019-11-19 17:42:27 -08:00
delimiter , maxUploads )
if err != nil {
return result , err
}
2021-01-06 09:35:47 -08:00
poolResult . Uploads = append ( poolResult . Uploads , result . Uploads ... )
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
return poolResult , nil
2019-11-19 17:42:27 -08:00
}
// Initiate a new multipart upload on a hashedSet based on object name.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) NewMultipartUpload ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( string , error ) {
2020-05-19 13:53:54 -07:00
if err := checkNewMultipartArgs ( ctx , bucket , object , z ) ; err != nil {
return "" , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . NewMultipartUpload ( ctx , bucket , object , opts )
2019-11-19 17:42:27 -08:00
}
2020-06-17 08:33:14 -07:00
2020-06-20 06:36:44 -07:00
// We don't know the exact size, so we ask for at least 1GiB file.
2021-01-26 20:47:42 -08:00
idx , err := z . getPoolIdx ( ctx , bucket , object , opts , 1 << 30 )
2020-06-17 08:33:14 -07:00
if err != nil {
return "" , err
}
2020-12-01 13:50:33 -08:00
return z . serverPools [ idx ] . NewMultipartUpload ( ctx , bucket , object , opts )
2019-11-19 17:42:27 -08:00
}
// Copies a part of an object from source hashedSet to destination hashedSet.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) CopyObjectPart ( ctx context . Context , srcBucket , srcObject , destBucket , destObject string , uploadID string , partID int , startOffset int64 , length int64 , srcInfo ObjectInfo , srcOpts , dstOpts ObjectOptions ) ( PartInfo , error ) {
2020-05-19 13:53:54 -07:00
if err := checkNewMultipartArgs ( ctx , srcBucket , srcObject , z ) ; err != nil {
return PartInfo { } , err
}
2019-11-19 17:42:27 -08:00
return z . PutObjectPart ( ctx , destBucket , destObject , uploadID , partID ,
NewPutObjReader ( srcInfo . Reader , nil , nil ) , dstOpts )
}
// PutObjectPart - writes part of an object to hashedSet based on the object name.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) PutObjectPart ( ctx context . Context , bucket , object , uploadID string , partID int , data * PutObjReader , opts ObjectOptions ) ( PartInfo , error ) {
2020-05-19 13:53:54 -07:00
if err := checkPutObjectPartArgs ( ctx , bucket , object , z ) ; err != nil {
return PartInfo { } , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . PutObjectPart ( ctx , bucket , object , uploadID , partID , data , opts )
2019-11-19 17:42:27 -08:00
}
2020-05-28 14:36:38 -07:00
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
_ , err := pool . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
2020-05-28 12:36:20 -07:00
if err == nil {
2021-01-06 09:35:47 -08:00
return pool . PutObjectPart ( ctx , bucket , object , uploadID , partID , data , opts )
2019-11-19 17:42:27 -08:00
}
2020-05-28 12:36:20 -07:00
switch err . ( type ) {
case InvalidUploadID :
2021-01-06 09:35:47 -08:00
// Look for information on the next pool
2020-05-28 12:36:20 -07:00
continue
}
// Any other unhandled errors such as quorum return.
return PartInfo { } , err
2019-11-19 17:42:27 -08:00
}
return PartInfo { } , InvalidUploadID {
Bucket : bucket ,
Object : object ,
UploadID : uploadID ,
}
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) GetMultipartInfo ( ctx context . Context , bucket , object , uploadID string , opts ObjectOptions ) ( MultipartInfo , error ) {
2020-05-28 12:36:20 -07:00
if err := checkListPartsArgs ( ctx , bucket , object , z ) ; err != nil {
return MultipartInfo { } , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
2020-05-28 12:36:20 -07:00
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
mi , err := pool . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
2020-05-28 12:36:20 -07:00
if err == nil {
return mi , nil
}
switch err . ( type ) {
case InvalidUploadID :
2021-01-06 09:35:47 -08:00
// upload id not found, continue to the next pool.
2020-05-28 12:36:20 -07:00
continue
}
// any other unhandled error return right here.
return MultipartInfo { } , err
}
return MultipartInfo { } , InvalidUploadID {
Bucket : bucket ,
Object : object ,
UploadID : uploadID ,
}
}
2019-11-19 17:42:27 -08:00
// ListObjectParts - lists all uploaded parts to an object in hashedSet.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) ListObjectParts ( ctx context . Context , bucket , object , uploadID string , partNumberMarker int , maxParts int , opts ObjectOptions ) ( ListPartsInfo , error ) {
2020-05-19 13:53:54 -07:00
if err := checkListPartsArgs ( ctx , bucket , object , z ) ; err != nil {
return ListPartsInfo { } , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . ListObjectParts ( ctx , bucket , object , uploadID , partNumberMarker , maxParts , opts )
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
_ , err := pool . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
2020-05-28 12:36:20 -07:00
if err == nil {
2021-01-06 09:35:47 -08:00
return pool . ListObjectParts ( ctx , bucket , object , uploadID , partNumberMarker , maxParts , opts )
2019-11-19 17:42:27 -08:00
}
2020-05-28 12:36:20 -07:00
switch err . ( type ) {
case InvalidUploadID :
continue
}
return ListPartsInfo { } , err
2019-11-19 17:42:27 -08:00
}
return ListPartsInfo { } , InvalidUploadID {
Bucket : bucket ,
Object : object ,
UploadID : uploadID ,
}
}
// Aborts an in-progress multipart operation on hashedSet based on the object name.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) AbortMultipartUpload ( ctx context . Context , bucket , object , uploadID string , opts ObjectOptions ) error {
2020-05-19 13:53:54 -07:00
if err := checkAbortMultipartArgs ( ctx , bucket , object , z ) ; err != nil {
return err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . AbortMultipartUpload ( ctx , bucket , object , uploadID , opts )
2019-11-19 17:42:27 -08:00
}
2020-05-19 13:53:54 -07:00
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
_ , err := pool . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
2020-05-28 12:36:20 -07:00
if err == nil {
2021-01-06 09:35:47 -08:00
return pool . AbortMultipartUpload ( ctx , bucket , object , uploadID , opts )
2019-11-19 17:42:27 -08:00
}
2020-05-28 12:36:20 -07:00
switch err . ( type ) {
case InvalidUploadID :
2021-01-06 09:35:47 -08:00
// upload id not found move to next pool
2020-05-28 12:36:20 -07:00
continue
}
return err
2019-11-19 17:42:27 -08:00
}
return InvalidUploadID {
Bucket : bucket ,
Object : object ,
UploadID : uploadID ,
}
}
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) CompleteMultipartUpload ( ctx context . Context , bucket , object , uploadID string , uploadedParts [ ] CompletePart , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2020-05-19 13:53:54 -07:00
if err = checkCompleteMultipartArgs ( ctx , bucket , object , z ) ; err != nil {
return objInfo , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . CompleteMultipartUpload ( ctx , bucket , object , uploadID , uploadedParts , opts )
2019-11-19 17:42:27 -08:00
}
// Purge any existing object.
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
pool . DeleteObject ( ctx , bucket , object , opts )
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
result , err := pool . ListMultipartUploads ( ctx , bucket , object , "" , "" , "" , maxUploadsList )
2019-11-19 17:42:27 -08:00
if err != nil {
return objInfo , err
}
if result . Lookup ( uploadID ) {
2021-01-06 09:35:47 -08:00
return pool . CompleteMultipartUpload ( ctx , bucket , object , uploadID , uploadedParts , opts )
2019-11-19 17:42:27 -08:00
}
}
return objInfo , InvalidUploadID {
Bucket : bucket ,
Object : object ,
UploadID : uploadID ,
}
}
2020-12-01 13:50:33 -08:00
// GetBucketInfo - returns bucket info from one of the erasure coded serverPools.
func ( z * erasureServerPools ) GetBucketInfo ( ctx context . Context , bucket string ) ( bucketInfo BucketInfo , err error ) {
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
bucketInfo , err = z . serverPools [ 0 ] . GetBucketInfo ( ctx , bucket )
2020-05-19 13:53:54 -07:00
if err != nil {
return bucketInfo , err
}
meta , err := globalBucketMetadataSys . Get ( bucket )
if err == nil {
bucketInfo . Created = meta . Created
}
return bucketInfo , nil
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
bucketInfo , err = pool . GetBucketInfo ( ctx , bucket )
2019-11-19 17:42:27 -08:00
if err != nil {
if isErrBucketNotFound ( err ) {
continue
}
return bucketInfo , err
}
2020-05-19 13:53:54 -07:00
meta , err := globalBucketMetadataSys . Get ( bucket )
if err == nil {
bucketInfo . Created = meta . Created
}
2019-11-19 17:42:27 -08:00
return bucketInfo , nil
}
return bucketInfo , BucketNotFound {
Bucket : bucket ,
}
}
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) IsNotificationSupported ( ) bool {
2019-11-19 17:42:27 -08:00
return true
}
2020-07-20 12:52:49 -07:00
// IsListenSupported returns whether listen bucket notification is applicable for this layer.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) IsListenSupported ( ) bool {
2019-11-19 17:42:27 -08:00
return true
}
// IsEncryptionSupported returns whether server side encryption is implemented for this layer.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) IsEncryptionSupported ( ) bool {
2019-11-19 17:42:27 -08:00
return true
}
// IsCompressionSupported returns whether compression is applicable for this layer.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) IsCompressionSupported ( ) bool {
2019-11-19 17:42:27 -08:00
return true
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) IsTaggingSupported ( ) bool {
2020-05-23 11:09:35 -07:00
return true
}
2020-12-01 13:50:33 -08:00
// DeleteBucket - deletes a bucket on all serverPools simultaneously,
// even if one of the serverPools fail to delete buckets, we proceed to
2019-11-19 17:42:27 -08:00
// undo a successful operation.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) DeleteBucket ( ctx context . Context , bucket string , forceDelete bool ) error {
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . DeleteBucket ( ctx , bucket , forceDelete )
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
g := errgroup . WithNErrs ( len ( z . serverPools ) )
2019-11-19 17:42:27 -08:00
2020-12-01 13:50:33 -08:00
// Delete buckets in parallel across all serverPools.
for index := range z . serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
2020-12-01 13:50:33 -08:00
return z . serverPools [ index ] . DeleteBucket ( ctx , bucket , forceDelete )
2019-11-19 17:42:27 -08:00
} , index )
}
errs := g . Wait ( )
2020-03-28 04:52:59 +00:00
2020-05-08 13:44:44 -07:00
// For any write quorum failure, we undo all the delete
// buckets operation by creating all the buckets again.
2019-11-19 17:42:27 -08:00
for _ , err := range errs {
if err != nil {
if _ , ok := err . ( InsufficientWriteQuorum ) ; ok {
2020-12-01 13:50:33 -08:00
undoDeleteBucketServerPools ( ctx , bucket , z . serverPools , errs )
2019-11-19 17:42:27 -08:00
}
2020-05-08 13:44:44 -07:00
2019-11-19 17:42:27 -08:00
return err
}
}
// Success.
return nil
}
2020-10-28 09:18:35 -07:00
// deleteAll will delete a bucket+prefix unconditionally across all disks.
// Note that set distribution is ignored so it should only be used in cases where
// data is not distributed across sets.
// Errors are logged but individual disk failures are not returned.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) deleteAll ( ctx context . Context , bucket , prefix string ) {
2020-10-28 09:18:35 -07:00
var wg sync . WaitGroup
2020-12-01 13:50:33 -08:00
for _ , servers := range z . serverPools {
2020-10-28 09:18:35 -07:00
for _ , set := range servers . sets {
for _ , disk := range set . getDisks ( ) {
if disk == nil {
continue
}
wg . Add ( 1 )
go func ( disk StorageAPI ) {
defer wg . Done ( )
2020-11-02 17:52:13 -08:00
disk . Delete ( ctx , bucket , prefix , true )
2020-10-28 09:18:35 -07:00
} ( disk )
}
}
}
wg . Wait ( )
}
2019-11-19 17:42:27 -08:00
// This function is used to undo a successful DeleteBucket operation.
2020-12-01 13:50:33 -08:00
func undoDeleteBucketServerPools ( ctx context . Context , bucket string , serverPools [ ] * erasureSets , errs [ ] error ) {
g := errgroup . WithNErrs ( len ( serverPools ) )
2019-11-19 17:42:27 -08:00
2020-12-01 13:50:33 -08:00
// Undo previous delete bucket on all underlying serverPools.
for index := range serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
if errs [ index ] == nil {
2020-12-01 13:50:33 -08:00
return serverPools [ index ] . MakeBucketWithLocation ( ctx , bucket , BucketOptions { } )
2019-11-19 17:42:27 -08:00
}
return nil
} , index )
}
g . Wait ( )
}
2020-12-01 13:50:33 -08:00
// List all buckets from one of the serverPools, we are not doing merge
2019-11-19 17:42:27 -08:00
// sort here just for simplification. As per design it is assumed
2020-12-01 13:50:33 -08:00
// that all buckets are present on all serverPools.
func ( z * erasureServerPools ) ListBuckets ( ctx context . Context ) ( buckets [ ] BucketInfo , err error ) {
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
buckets , err = z . serverPools [ 0 ] . ListBuckets ( ctx )
2020-05-08 13:44:44 -07:00
} else {
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
buckets , err = pool . ListBuckets ( ctx )
2020-05-08 13:44:44 -07:00
if err != nil {
logger . LogIf ( ctx , err )
continue
}
break
}
2019-11-19 17:42:27 -08:00
}
2020-05-08 13:44:44 -07:00
if err != nil {
return nil , err
}
for i := range buckets {
2020-05-19 13:53:54 -07:00
meta , err := globalBucketMetadataSys . Get ( buckets [ i ] . Name )
2020-05-08 13:44:44 -07:00
if err == nil {
buckets [ i ] . Created = meta . Created
}
2019-11-19 17:42:27 -08:00
}
2020-05-08 13:44:44 -07:00
return buckets , nil
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) HealFormat ( ctx context . Context , dryRun bool ) ( madmin . HealResultItem , error ) {
2019-11-19 17:42:27 -08:00
// Acquire lock on format.json
2020-11-04 08:25:42 -08:00
formatLock := z . NewNSLock ( minioMetaBucket , formatConfigFile )
if err := formatLock . GetLock ( ctx , globalOperationTimeout ) ; err != nil {
2019-11-19 17:42:27 -08:00
return madmin . HealResultItem { } , err
}
defer formatLock . Unlock ( )
var r = madmin . HealResultItem {
Type : madmin . HealItemMetadata ,
Detail : "disk-format" ,
}
2020-01-15 17:19:13 -08:00
var countNoHeal int
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
result , err := pool . HealFormat ( ctx , dryRun )
2020-09-16 21:14:35 -07:00
if err != nil && ! errors . Is ( err , errNoHealRequired ) {
2019-11-19 17:42:27 -08:00
logger . LogIf ( ctx , err )
continue
}
2020-12-01 13:50:33 -08:00
// Count errNoHealRequired across all serverPools,
2020-01-15 17:19:13 -08:00
// to return appropriate error to the caller
2020-09-16 21:14:35 -07:00
if errors . Is ( err , errNoHealRequired ) {
2020-01-15 17:19:13 -08:00
countNoHeal ++
}
2019-11-20 23:40:26 +05:30
r . DiskCount += result . DiskCount
r . SetCount += result . SetCount
r . Before . Drives = append ( r . Before . Drives , result . Before . Drives ... )
r . After . Drives = append ( r . After . Drives , result . After . Drives ... )
2019-11-19 17:42:27 -08:00
}
2020-09-16 21:14:35 -07:00
2020-12-01 13:50:33 -08:00
// No heal returned by all serverPools, return errNoHealRequired
if countNoHeal == len ( z . serverPools ) {
2020-01-15 17:19:13 -08:00
return r , errNoHealRequired
}
2020-09-16 21:14:35 -07:00
2019-11-19 17:42:27 -08:00
return r , nil
}
2020-12-13 11:57:08 -08:00
func ( z * erasureServerPools ) HealBucket ( ctx context . Context , bucket string , opts madmin . HealOpts ) ( madmin . HealResultItem , error ) {
2019-11-19 17:42:27 -08:00
var r = madmin . HealResultItem {
Type : madmin . HealItemBucket ,
Bucket : bucket ,
}
2020-12-14 12:07:07 -08:00
// Attempt heal on the bucket metadata, ignore any failures
_ , _ = z . HealObject ( ctx , minioMetaBucket , pathJoin ( bucketConfigPrefix , bucket , bucketMetadataFile ) , "" , opts )
2020-12-13 11:57:08 -08:00
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
result , err := pool . HealBucket ( ctx , bucket , opts )
2019-11-19 17:42:27 -08:00
if err != nil {
switch err . ( type ) {
case BucketNotFound :
continue
}
return result , err
}
2019-11-20 23:40:26 +05:30
r . DiskCount += result . DiskCount
r . SetCount += result . SetCount
r . Before . Drives = append ( r . Before . Drives , result . Before . Drives ... )
r . After . Drives = append ( r . After . Drives , result . After . Drives ... )
2019-11-19 17:42:27 -08:00
}
2020-12-14 12:07:07 -08:00
2019-11-19 17:42:27 -08:00
return r , nil
}
2020-02-25 21:22:28 +05:30
// Walk a bucket, optionally prefix recursively, until we have returned
// all the content to objectInfo channel, it is callers responsibility
// to allocate a receive channel for ObjectInfo, upon any unhandled
// error walker returns error. Optionally if context.Done() is received
// then Walk() stops the walker.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) Walk ( ctx context . Context , bucket , prefix string , results chan <- ObjectInfo , opts ObjectOptions ) error {
2020-02-25 21:22:28 +05:30
if err := checkListObjsArgs ( ctx , bucket , prefix , "" , z ) ; err != nil {
2020-02-26 09:28:58 +05:30
// Upon error close the channel.
close ( results )
2020-02-25 21:22:28 +05:30
return err
}
2020-07-10 22:21:04 -07:00
if opts . WalkVersions {
go func ( ) {
defer close ( results )
2020-11-03 08:53:48 -08:00
var marker , versionIDMarker string
2020-07-10 22:21:04 -07:00
for {
2020-11-03 08:53:48 -08:00
loi , err := z . ListObjectVersions ( ctx , bucket , prefix , marker , versionIDMarker , "" , 1000 )
if err != nil {
break
2020-07-10 22:21:04 -07:00
}
2020-11-03 08:53:48 -08:00
for _ , obj := range loi . Objects {
results <- obj
2020-07-10 22:21:04 -07:00
}
2020-11-03 08:53:48 -08:00
if ! loi . IsTruncated {
break
}
marker = loi . NextMarker
versionIDMarker = loi . NextVersionIDMarker
2020-07-10 22:21:04 -07:00
}
} ( )
return nil
}
2020-02-25 21:22:28 +05:30
go func ( ) {
defer close ( results )
2020-11-03 08:53:48 -08:00
var marker string
2020-02-25 21:22:28 +05:30
for {
2020-11-03 08:53:48 -08:00
loi , err := z . ListObjects ( ctx , bucket , prefix , marker , "" , 1000 )
if err != nil {
break
2020-02-25 21:22:28 +05:30
}
2020-11-03 08:53:48 -08:00
for _ , obj := range loi . Objects {
results <- obj
2020-02-25 21:22:28 +05:30
}
2020-11-03 08:53:48 -08:00
if ! loi . IsTruncated {
break
}
marker = loi . NextMarker
2020-02-25 21:22:28 +05:30
}
} ( )
return nil
}
2020-06-12 20:04:01 -07:00
// HealObjectFn closure function heals the object.
2020-08-24 13:47:01 -07:00
type HealObjectFn func ( bucket , object , versionID string ) error
2020-01-29 12:05:44 +05:30
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) HealObjects ( ctx context . Context , bucket , prefix string , opts madmin . HealOpts , healObject HealObjectFn ) error {
2020-08-24 13:47:01 -07:00
// If listing did not return any entries upon first attempt, we
// return `ObjectNotFound`, to indicate the caller for any
// actions they may want to take as if `prefix` is missing.
err := toObjectErr ( errFileNotFound , bucket , prefix )
2020-12-01 13:50:33 -08:00
for _ , erasureSet := range z . serverPools {
2020-11-11 10:58:16 -08:00
for _ , set := range erasureSet . sets {
var entryChs [ ] FileInfoVersionsCh
var mu sync . Mutex
var wg sync . WaitGroup
for _ , disk := range set . getOnlineDisks ( ) {
disk := disk
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
entryCh , err := disk . WalkVersions ( ctx , bucket , prefix , "" , true , ctx . Done ( ) )
if err != nil {
// Disk walk returned error, ignore it.
return
}
mu . Lock ( )
entryChs = append ( entryChs , FileInfoVersionsCh {
Ch : entryCh ,
} )
mu . Unlock ( )
} ( )
}
wg . Wait ( )
2020-10-01 20:24:34 -07:00
2020-11-11 10:58:16 -08:00
entriesValid := make ( [ ] bool , len ( entryChs ) )
entries := make ( [ ] FileInfoVersions , len ( entryChs ) )
2020-01-29 12:05:44 +05:30
2020-11-11 10:58:16 -08:00
for {
entry , quorumCount , ok := lexicallySortedEntryVersions ( entryChs , entries , entriesValid )
if ! ok {
break
}
2020-10-01 20:24:34 -07:00
2021-01-27 11:19:28 +01:00
// Remove empty directories if found - they have no meaning.
// Can be left over from highly concurrent put/remove.
if quorumCount > set . setDriveCount / 2 && entry . IsEmptyDir {
if ! opts . DryRun && opts . Remove {
set . deleteEmptyDir ( ctx , bucket , entry . Name )
}
}
2020-11-11 10:58:16 -08:00
// Indicate that first attempt was a success and subsequent loop
// knows that its not our first attempt at 'prefix'
err = nil
2021-01-22 12:09:24 -08:00
if quorumCount == set . setDriveCount && opts . ScanMode == madmin . HealNormalScan {
2020-11-11 10:58:16 -08:00
continue
}
2020-01-29 12:05:44 +05:30
2020-11-11 10:58:16 -08:00
for _ , version := range entry . Versions {
if err := healObject ( bucket , version . Name , version . VersionID ) ; err != nil {
return toObjectErr ( err , bucket , version . Name )
}
}
2020-06-12 20:04:01 -07:00
}
2019-11-19 17:42:27 -08:00
}
}
2020-01-29 12:05:44 +05:30
2020-08-24 13:47:01 -07:00
return err
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) HealObject ( ctx context . Context , bucket , object , versionID string , opts madmin . HealOpts ) ( madmin . HealResultItem , error ) {
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2020-11-04 08:25:42 -08:00
lk := z . NewNSLock ( bucket , object )
2020-07-21 13:54:06 -07:00
if bucket == minioMetaBucket {
// For .minio.sys bucket heals we should hold write locks.
2020-11-04 08:25:42 -08:00
if err := lk . GetLock ( ctx , globalOperationTimeout ) ; err != nil {
2020-07-21 13:54:06 -07:00
return madmin . HealResultItem { } , err
}
defer lk . Unlock ( )
} else {
// Lock the object before healing. Use read lock since healing
// will only regenerate parts & xl.meta of outdated disks.
2020-11-04 08:25:42 -08:00
if err := lk . GetRLock ( ctx , globalOperationTimeout ) ; err != nil {
2020-07-21 13:54:06 -07:00
return madmin . HealResultItem { } , err
}
defer lk . RUnlock ( )
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
result , err := pool . HealObject ( ctx , bucket , object , versionID , opts )
2021-01-25 18:53:37 +01:00
result . Object = decodeDirObject ( result . Object )
2019-11-19 17:42:27 -08:00
if err != nil {
2020-07-02 16:17:27 -07:00
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) {
2019-11-19 17:42:27 -08:00
continue
}
return result , err
}
return result , nil
}
2020-10-22 13:36:24 -07:00
if versionID != "" {
return madmin . HealResultItem { } , VersionNotFound {
Bucket : bucket ,
Object : object ,
VersionID : versionID ,
}
}
2019-11-19 17:42:27 -08:00
return madmin . HealResultItem { } , ObjectNotFound {
Bucket : bucket ,
Object : object ,
}
}
2019-12-06 12:46:06 +05:30
// GetMetrics - no op
2021-01-18 20:35:38 -08:00
func ( z * erasureServerPools ) GetMetrics ( ctx context . Context ) ( * BackendMetrics , error ) {
2019-12-06 12:46:06 +05:30
logger . LogIf ( ctx , NotImplemented { } )
2021-01-18 20:35:38 -08:00
return & BackendMetrics { } , NotImplemented { }
2019-12-06 12:46:06 +05:30
}
2019-12-28 22:24:43 +05:30
2021-01-26 20:47:42 -08:00
func ( z * erasureServerPools ) getPoolAndSet ( id string ) ( int , int , error ) {
2021-01-06 09:35:47 -08:00
for poolIdx := range z . serverPools {
format := z . serverPools [ poolIdx ] . format
2020-06-12 20:04:01 -07:00
for setIdx , set := range format . Erasure . Sets {
2020-05-23 17:38:39 -07:00
for _ , diskID := range set {
if diskID == id {
2021-01-06 09:35:47 -08:00
return poolIdx , setIdx , nil
2020-05-23 17:38:39 -07:00
}
}
}
}
2020-06-15 22:09:39 -07:00
return 0 , 0 , fmt . Errorf ( "DiskID(%s) %w" , id , errDiskNotFound )
2020-05-23 17:38:39 -07:00
}
2020-07-20 18:31:22 -07:00
// HealthOptions takes input options to return sepcific information
type HealthOptions struct {
Maintenance bool
}
// HealthResult returns the current state of the system, also
// additionally with any specific heuristic information which
// was queried
type HealthResult struct {
Healthy bool
2020-08-07 13:22:53 -07:00
HealingDrives int
2021-01-26 20:47:42 -08:00
PoolID , SetID int
2020-07-20 18:31:22 -07:00
WriteQuorum int
}
// Health - returns current status of the object layer health,
// provides if write access exists across sets, additionally
// can be used to query scenarios if health may be lost
// if this node is taken down by an external orchestrator.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) Health ( ctx context . Context , opts HealthOptions ) HealthResult {
erasureSetUpCount := make ( [ ] [ ] int , len ( z . serverPools ) )
for i := range z . serverPools {
erasureSetUpCount [ i ] = make ( [ ] int , len ( z . serverPools [ i ] . sets ) )
2020-05-23 17:38:39 -07:00
}
2020-06-04 14:58:34 -07:00
diskIDs := globalNotificationSys . GetLocalDiskIDs ( ctx )
2020-07-20 18:31:22 -07:00
if ! opts . Maintenance {
diskIDs = append ( diskIDs , getLocalDiskIDs ( z ) )
}
2020-05-23 17:38:39 -07:00
2020-07-20 18:31:22 -07:00
for _ , localDiskIDs := range diskIDs {
for _ , id := range localDiskIDs {
2021-01-26 20:47:42 -08:00
poolIdx , setIdx , err := z . getPoolAndSet ( id )
2020-07-20 18:31:22 -07:00
if err != nil {
logger . LogIf ( ctx , err )
continue
}
2021-01-06 09:35:47 -08:00
erasureSetUpCount [ poolIdx ] [ setIdx ] ++
2020-05-23 17:38:39 -07:00
}
}
2020-08-13 15:21:20 -07:00
reqInfo := ( & logger . ReqInfo { } ) . AppendTags ( "maintenance" , strconv . FormatBool ( opts . Maintenance ) )
2021-01-16 12:08:02 -08:00
b := z . BackendInfo ( )
2021-01-22 12:09:24 -08:00
writeQuorum := b . StandardSCData [ 0 ]
if writeQuorum == b . StandardSCParity {
2020-09-02 22:54:56 -07:00
writeQuorum ++
}
2020-09-04 17:09:02 -07:00
var aggHealStateResult madmin . BgHealState
if opts . Maintenance {
// check if local disks are being healed, if they are being healed
// we need to tell healthy status as 'false' so that this server
// is not taken down for maintenance
var err error
aggHealStateResult , err = getAggregatedBackgroundHealState ( ctx )
if err != nil {
logger . LogIf ( logger . SetReqInfo ( ctx , reqInfo ) , fmt . Errorf ( "Unable to verify global heal status: %w" , err ) )
return HealthResult {
Healthy : false ,
}
}
if len ( aggHealStateResult . HealDisks ) > 0 {
logger . LogIf ( logger . SetReqInfo ( ctx , reqInfo ) , fmt . Errorf ( "Total drives to be healed %d" , len ( aggHealStateResult . HealDisks ) ) )
}
}
2021-01-06 09:35:47 -08:00
for poolIdx := range erasureSetUpCount {
for setIdx := range erasureSetUpCount [ poolIdx ] {
if erasureSetUpCount [ poolIdx ] [ setIdx ] < writeQuorum {
2020-08-13 15:21:20 -07:00
logger . LogIf ( logger . SetReqInfo ( ctx , reqInfo ) ,
2021-01-06 09:35:47 -08:00
fmt . Errorf ( "Write quorum may be lost on pool: %d, set: %d, expected write quorum: %d" ,
poolIdx , setIdx , writeQuorum ) )
2020-07-20 18:31:22 -07:00
return HealthResult {
2020-09-04 17:09:02 -07:00
Healthy : false ,
HealingDrives : len ( aggHealStateResult . HealDisks ) ,
2021-01-26 20:47:42 -08:00
PoolID : poolIdx ,
2020-09-04 17:09:02 -07:00
SetID : setIdx ,
WriteQuorum : writeQuorum ,
2020-07-20 18:31:22 -07:00
}
2020-05-23 17:38:39 -07:00
}
}
}
2020-08-07 13:22:53 -07:00
2020-08-12 16:53:15 -07:00
// when maintenance is not specified we don't have
// to look at the healing side of the code.
if ! opts . Maintenance {
return HealthResult {
2020-09-02 22:54:56 -07:00
Healthy : true ,
WriteQuorum : writeQuorum ,
2020-08-12 16:53:15 -07:00
}
}
2020-07-20 18:31:22 -07:00
return HealthResult {
2020-09-02 22:54:56 -07:00
Healthy : len ( aggHealStateResult . HealDisks ) == 0 ,
2020-08-07 13:22:53 -07:00
HealingDrives : len ( aggHealStateResult . HealDisks ) ,
2020-09-02 22:54:56 -07:00
WriteQuorum : writeQuorum ,
2020-07-20 18:31:22 -07:00
}
2019-12-28 22:24:43 +05:30
}
2020-01-20 22:15:59 +05:30
2020-05-23 11:09:35 -07:00
// PutObjectTags - replace or add tags to an existing object
2021-02-01 22:52:51 +01:00
func ( z * erasureServerPools ) PutObjectTags ( ctx context . Context , bucket , object string , tags string , opts ObjectOptions ) ( ObjectInfo , error ) {
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . PutObjectTags ( ctx , bucket , object , tags , opts )
2020-01-20 22:15:59 +05:30
}
2020-06-29 13:07:26 -07:00
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
2021-02-01 22:52:51 +01:00
objInfo , err := pool . PutObjectTags ( ctx , bucket , object , tags , opts )
2020-01-20 22:15:59 +05:30
if err != nil {
2020-07-02 16:17:27 -07:00
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) {
2020-01-20 22:15:59 +05:30
continue
}
2021-02-01 22:52:51 +01:00
return ObjectInfo { } , err
2020-01-20 22:15:59 +05:30
}
2021-02-01 22:52:51 +01:00
return objInfo , nil
2020-01-20 22:15:59 +05:30
}
2020-06-29 13:07:26 -07:00
if opts . VersionID != "" {
2021-02-01 22:52:51 +01:00
return ObjectInfo { } , VersionNotFound {
2020-06-29 13:07:26 -07:00
Bucket : bucket ,
Object : object ,
VersionID : opts . VersionID ,
}
}
2021-02-01 22:52:51 +01:00
return ObjectInfo { } , ObjectNotFound {
2020-01-20 22:15:59 +05:30
Bucket : bucket ,
2020-06-29 13:07:26 -07:00
Object : object ,
2020-01-20 22:15:59 +05:30
}
}
2020-05-23 11:09:35 -07:00
// DeleteObjectTags - delete object tags from an existing object
2021-02-01 22:52:51 +01:00
func ( z * erasureServerPools ) DeleteObjectTags ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( ObjectInfo , error ) {
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . DeleteObjectTags ( ctx , bucket , object , opts )
2020-01-20 22:15:59 +05:30
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
2021-02-01 22:52:51 +01:00
objInfo , err := pool . DeleteObjectTags ( ctx , bucket , object , opts )
2020-01-20 22:15:59 +05:30
if err != nil {
2020-07-02 16:17:27 -07:00
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) {
2020-01-20 22:15:59 +05:30
continue
}
2021-02-01 22:52:51 +01:00
return ObjectInfo { } , err
2020-01-20 22:15:59 +05:30
}
2021-02-01 22:52:51 +01:00
return objInfo , nil
2020-01-20 22:15:59 +05:30
}
2020-06-29 13:07:26 -07:00
if opts . VersionID != "" {
2021-02-01 22:52:51 +01:00
return ObjectInfo { } , VersionNotFound {
2020-06-29 13:07:26 -07:00
Bucket : bucket ,
Object : object ,
VersionID : opts . VersionID ,
}
}
2021-02-01 22:52:51 +01:00
return ObjectInfo { } , ObjectNotFound {
2020-01-20 22:15:59 +05:30
Bucket : bucket ,
2020-06-29 13:07:26 -07:00
Object : object ,
2020-01-20 22:15:59 +05:30
}
}
2020-05-23 11:09:35 -07:00
// GetObjectTags - get object tags from an existing object
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) GetObjectTags ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( * tags . Tags , error ) {
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . GetObjectTags ( ctx , bucket , object , opts )
2020-01-20 22:15:59 +05:30
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
tags , err := pool . GetObjectTags ( ctx , bucket , object , opts )
2020-01-20 22:15:59 +05:30
if err != nil {
2020-07-02 16:17:27 -07:00
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) {
2020-01-20 22:15:59 +05:30
continue
}
return tags , err
}
return tags , nil
}
2020-06-29 13:07:26 -07:00
if opts . VersionID != "" {
return nil , VersionNotFound {
Bucket : bucket ,
Object : object ,
VersionID : opts . VersionID ,
}
}
return nil , ObjectNotFound {
2020-01-20 22:15:59 +05:30
Bucket : bucket ,
2020-06-29 13:07:26 -07:00
Object : object ,
2020-01-20 22:15:59 +05:30
}
}