2021-04-18 12:41:13 -07:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2019-11-19 17:42:27 -08:00
package cmd
import (
"context"
2020-09-16 21:14:35 -07:00
"errors"
2019-11-19 17:42:27 -08:00
"fmt"
"io"
"math/rand"
"net/http"
2020-12-15 17:34:54 -08:00
"sort"
2020-08-13 15:21:20 -07:00
"strconv"
2019-12-12 15:02:37 +01:00
"sync"
2020-03-19 00:19:29 +01:00
"time"
2019-11-19 17:42:27 -08:00
2021-05-06 08:52:02 -07:00
"github.com/minio/madmin-go"
2020-07-14 17:38:05 +01:00
"github.com/minio/minio-go/v7/pkg/set"
"github.com/minio/minio-go/v7/pkg/tags"
2021-06-01 14:59:40 -07:00
"github.com/minio/minio/internal/config/storageclass"
"github.com/minio/minio/internal/logger"
"github.com/minio/minio/internal/sync/errgroup"
2021-05-28 15:17:01 -07:00
"github.com/minio/pkg/wildcard"
2019-11-19 17:42:27 -08:00
)
2020-12-01 13:50:33 -08:00
type erasureServerPools struct {
2020-05-19 13:53:54 -07:00
GatewayUnsupported
2020-12-01 13:50:33 -08:00
serverPools [ ] * erasureSets
2020-09-10 09:18:19 -07:00
// Shut down async operations
shutdown context . CancelFunc
2019-11-19 17:42:27 -08:00
}
2021-01-26 20:47:42 -08:00
func ( z * erasureServerPools ) SinglePool ( ) bool {
2020-12-01 13:50:33 -08:00
return len ( z . serverPools ) == 1
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
// Initialize new pool of erasure sets.
2020-12-01 13:50:33 -08:00
func newErasureServerPools ( ctx context . Context , endpointServerPools EndpointServerPools ) ( ObjectLayer , error ) {
2019-11-21 04:24:51 -08:00
var (
2021-01-19 10:01:31 -08:00
deploymentID string
distributionAlgo string
commonParityDrives int
err error
2019-11-21 04:24:51 -08:00
2020-12-01 13:50:33 -08:00
formats = make ( [ ] * formatErasureV3 , len ( endpointServerPools ) )
storageDisks = make ( [ ] [ ] StorageAPI , len ( endpointServerPools ) )
z = & erasureServerPools { serverPools : make ( [ ] * erasureSets , len ( endpointServerPools ) ) }
2019-11-21 04:24:51 -08:00
)
2020-04-27 19:06:21 +02:00
var localDrives [ ] string
2020-12-01 13:50:33 -08:00
local := endpointServerPools . FirstLocal ( )
for i , ep := range endpointServerPools {
2020-04-27 19:06:21 +02:00
for _ , endpoint := range ep . Endpoints {
if endpoint . IsLocal {
localDrives = append ( localDrives , endpoint . Path )
}
}
2021-01-19 10:01:31 -08:00
2021-01-29 11:40:55 -08:00
// If storage class is not set during startup, default values are used
// -- Default for Reduced Redundancy Storage class is, parity = 2
// -- Default for Standard Storage class is, parity = 2 - disks 4, 5
// -- Default for Standard Storage class is, parity = 3 - disks 6, 7
// -- Default for Standard Storage class is, parity = 4 - disks 8 to 16
2021-01-19 10:01:31 -08:00
if commonParityDrives == 0 {
commonParityDrives = ecDrivesNoConfig ( ep . DrivesPerSet )
}
2021-01-29 11:40:55 -08:00
if err = storageclass . ValidateParity ( commonParityDrives , ep . DrivesPerSet ) ; err != nil {
return nil , fmt . Errorf ( "All current serverPools should have same parity ratio - expected %d, got %d" , commonParityDrives , ecDrivesNoConfig ( ep . DrivesPerSet ) )
2021-01-19 10:01:31 -08:00
}
2020-06-12 20:04:01 -07:00
storageDisks [ i ] , formats [ i ] , err = waitForFormatErasure ( local , ep . Endpoints , i + 1 ,
2021-01-19 10:01:31 -08:00
ep . SetCount , ep . DrivesPerSet , deploymentID , distributionAlgo )
2019-11-21 04:24:51 -08:00
if err != nil {
return nil , err
}
2021-01-19 10:01:31 -08:00
2019-11-21 04:24:51 -08:00
if deploymentID == "" {
2021-01-16 12:08:02 -08:00
// all zones should have same deployment ID
2019-11-21 04:24:51 -08:00
deploymentID = formats [ i ] . ID
}
2021-01-19 10:01:31 -08:00
if distributionAlgo == "" {
distributionAlgo = formats [ i ] . Erasure . DistributionAlgo
}
// Validate if users brought different DeploymentID pools.
if deploymentID != formats [ i ] . ID {
return nil , fmt . Errorf ( "All serverPools should have same deployment ID expected %s, got %s" , deploymentID , formats [ i ] . ID )
}
2021-03-04 14:36:23 -08:00
z . serverPools [ i ] , err = newErasureSets ( ctx , ep . Endpoints , storageDisks [ i ] , formats [ i ] , commonParityDrives , i )
2019-11-19 17:42:27 -08:00
if err != nil {
return nil , err
}
}
2020-09-10 09:18:19 -07:00
ctx , z . shutdown = context . WithCancel ( ctx )
2020-08-20 13:17:42 -07:00
go intDataUpdateTracker . start ( ctx , localDrives ... )
2019-11-19 17:42:27 -08:00
return z , nil
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) NewNSLock ( bucket string , objects ... string ) RWLocker {
return z . serverPools [ 0 ] . NewNSLock ( bucket , objects ... )
2019-11-19 17:42:27 -08:00
}
2020-12-01 12:07:39 -08:00
// GetDisksID will return disks by their ID.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) GetDisksID ( ids ... string ) [ ] StorageAPI {
2020-12-01 12:07:39 -08:00
idMap := make ( map [ string ] struct { } )
for _ , id := range ids {
idMap [ id ] = struct { } { }
}
res := make ( [ ] StorageAPI , 0 , len ( idMap ) )
2020-12-11 16:58:36 -08:00
for _ , s := range z . serverPools {
s . erasureDisksMu . RLock ( )
defer s . erasureDisksMu . RUnlock ( )
for _ , disks := range s . erasureDisks {
2020-12-01 12:07:39 -08:00
for _ , disk := range disks {
2020-12-11 16:58:36 -08:00
if disk == OfflineDisk {
continue
}
if id , _ := disk . GetDiskID ( ) ; id != "" {
if _ , ok := idMap [ id ] ; ok {
res = append ( res , disk )
}
2020-12-01 12:07:39 -08:00
}
}
}
}
return res
}
2021-07-09 11:29:16 -07:00
// GetRawData will return all files with a given raw path to the callback.
// Errors are ignored, only errors from the callback are returned.
// For now only direct file paths are supported.
//
// Every online disk of every set in every pool is probed for volume/file.
// For each disk where the file can be stat'ed and opened, fn is invoked
// with a reader over the full file, the disk's hostname, the disk ID (or
// "disk-<index>" when the ID cannot be read), the joined path, and the
// file's size and modification time. The reader is only valid for the
// duration of the callback; it is closed as soon as fn returns.
func (z *erasureServerPools) GetRawData(ctx context.Context, volume, file string, fn func(r io.Reader, host string, disk string, filename string, size int64, modtime time.Time) error) error {
	for _, s := range z.serverPools {
		for _, disks := range s.erasureDisks {
			for i, disk := range disks {
				if disk == OfflineDisk {
					continue
				}
				si, err := disk.StatInfoFile(ctx, volume, file)
				if err != nil {
					// Best effort: the file is simply not on this disk.
					continue
				}
				r, err := disk.ReadFileStream(ctx, volume, file, 0, si.Size)
				if err != nil {
					continue
				}
				did, err := disk.GetDiskID()
				if err != nil {
					did = fmt.Sprintf("disk-%d", i)
				}
				err = fn(r, disk.Hostname(), did, pathJoin(volume, file), si.Size, si.ModTime)
				// Close right away instead of deferring: a defer inside
				// these loops would keep one open stream per disk alive
				// until the whole scan has finished.
				r.Close()
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
2021-01-22 12:09:24 -08:00
func ( z * erasureServerPools ) SetDriveCounts ( ) [ ] int {
setDriveCounts := make ( [ ] int , len ( z . serverPools ) )
for i := range z . serverPools {
setDriveCounts [ i ] = z . serverPools [ i ] . SetDriveCount ( )
}
return setDriveCounts
2020-08-05 13:31:12 -07:00
}
2021-01-06 09:35:47 -08:00
// serverPoolsAvailableSpace is a list of per-pool available space entries.
type serverPoolsAvailableSpace []poolAvailableSpace

// poolAvailableSpace pairs a pool index with the space available in it.
type poolAvailableSpace struct {
	Index     int    // index of the pool
	Available uint64 // space available in the pool
}

// TotalAvailable - total available space, i.e. the sum of Available
// over all entries. An empty or nil list yields 0.
func (p serverPoolsAvailableSpace) TotalAvailable() uint64 {
	var sum uint64
	for i := range p {
		sum += p[i].Available
	}
	return sum
}
2021-01-26 20:47:42 -08:00
// getAvailablePoolIdx will return an index that can hold size bytes.
2020-12-01 13:50:33 -08:00
// -1 is returned if no serverPools have available space for the size given.
2021-06-07 17:13:15 +02:00
func ( z * erasureServerPools ) getAvailablePoolIdx ( ctx context . Context , bucket , object string , size int64 ) int {
serverPools := z . getServerPoolsAvailableSpace ( ctx , bucket , object , size )
2020-12-01 13:50:33 -08:00
total := serverPools . TotalAvailable ( )
2019-11-19 17:42:27 -08:00
if total == 0 {
2020-06-20 06:36:44 -07:00
return - 1
2019-11-19 17:42:27 -08:00
}
// choose when we reach this many
choose := rand . Uint64 ( ) % total
atTotal := uint64 ( 0 )
2021-01-06 09:35:47 -08:00
for _ , pool := range serverPools {
atTotal += pool . Available
if atTotal > choose && pool . Available > 0 {
return pool . Index
2019-11-19 17:42:27 -08:00
}
}
// Should not happen, but print values just in case.
2020-12-01 13:50:33 -08:00
logger . LogIf ( ctx , fmt . Errorf ( "reached end of serverPools (total: %v, atTotal: %v, choose: %v)" , total , atTotal , choose ) )
2020-06-20 06:36:44 -07:00
return - 1
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
// getServerPoolsAvailableSpace will return the available space of each pool after storing the content.
// If there is not enough space the pool will return 0 bytes available.
2020-06-20 06:36:44 -07:00
// Negative sizes are seen as 0 bytes.
2021-06-07 17:13:15 +02:00
func ( z * erasureServerPools ) getServerPoolsAvailableSpace ( ctx context . Context , bucket , object string , size int64 ) serverPoolsAvailableSpace {
2020-12-01 13:50:33 -08:00
var serverPools = make ( serverPoolsAvailableSpace , len ( z . serverPools ) )
2019-11-19 17:42:27 -08:00
2021-06-07 17:13:15 +02:00
storageInfos := make ( [ ] [ ] * DiskInfo , len ( z . serverPools ) )
2020-12-01 13:50:33 -08:00
g := errgroup . WithNErrs ( len ( z . serverPools ) )
for index := range z . serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
2021-06-07 17:13:15 +02:00
// Get the set where it would be placed.
storageInfos [ index ] = getDiskInfos ( ctx , z . serverPools [ index ] . getHashedSet ( object ) . getDisks ( ) )
2019-11-19 17:42:27 -08:00
return nil
} , index )
}
// Wait for the go routines.
g . Wait ( )
for i , zinfo := range storageInfos {
var available uint64
2021-06-07 17:13:15 +02:00
if ! isMinioMetaBucketName ( bucket ) && ! hasSpaceFor ( zinfo , size ) {
serverPools [ i ] = poolAvailableSpace { Index : i }
continue
2020-06-20 06:36:44 -07:00
}
2021-06-07 17:13:15 +02:00
for _ , disk := range zinfo {
2021-06-09 11:14:47 -07:00
if disk == nil {
continue
}
2021-06-07 17:13:15 +02:00
available += disk . Total - disk . Used
2020-06-20 06:36:44 -07:00
}
2021-01-06 09:35:47 -08:00
serverPools [ i ] = poolAvailableSpace {
2019-11-19 17:42:27 -08:00
Index : i ,
Available : available ,
}
}
2020-12-01 13:50:33 -08:00
return serverPools
2019-11-19 17:42:27 -08:00
}
2021-06-10 23:07:16 -07:00
// poolObjInfo represents the state of an object per pool
type poolObjInfo struct {
	// PoolIndex is the index of the pool this entry was read from.
	PoolIndex int
	// ObjInfo is the object info returned by the pool's lookup.
	ObjInfo ObjectInfo
	// Err is the error returned by the pool's lookup, if any.
	Err error
}
2021-08-17 07:50:00 -07:00
func ( z * erasureServerPools ) getPoolIdxExistingWithOpts ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( idx int , err error ) {
2021-03-16 19:02:20 +01:00
if z . SinglePool ( ) {
return 0 , nil
}
2021-06-10 23:07:16 -07:00
poolObjInfos := make ( [ ] poolObjInfo , len ( z . serverPools ) )
2021-03-16 19:02:20 +01:00
var wg sync . WaitGroup
for i , pool := range z . serverPools {
wg . Add ( 1 )
go func ( i int , pool * erasureSets ) {
defer wg . Done ( )
2021-06-10 23:07:16 -07:00
// remember the pool index, we may sort the slice original index might be lost.
pinfo := poolObjInfo {
PoolIndex : i ,
}
2021-08-17 07:50:00 -07:00
pinfo . ObjInfo , pinfo . Err = pool . GetObjectInfo ( ctx , bucket , object , opts )
2021-06-10 23:07:16 -07:00
poolObjInfos [ i ] = pinfo
2021-03-16 19:02:20 +01:00
} ( i , pool )
}
wg . Wait ( )
2021-06-10 23:07:16 -07:00
// Sort the objInfos such that we always serve latest
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
sort . Slice ( poolObjInfos , func ( i , j int ) bool {
mtime1 := poolObjInfos [ i ] . ObjInfo . ModTime
mtime2 := poolObjInfos [ j ] . ObjInfo . ModTime
return mtime1 . After ( mtime2 )
} )
for _ , pinfo := range poolObjInfos {
if pinfo . Err != nil && ! isErrObjectNotFound ( pinfo . Err ) {
return - 1 , pinfo . Err
2021-03-16 19:02:20 +01:00
}
2021-06-10 23:07:16 -07:00
if isErrObjectNotFound ( pinfo . Err ) {
2021-03-16 19:02:20 +01:00
// No object exists or its a delete marker,
// check objInfo to confirm.
2021-06-10 23:07:16 -07:00
if pinfo . ObjInfo . DeleteMarker && pinfo . ObjInfo . Name != "" {
return pinfo . PoolIndex , nil
2021-03-16 19:02:20 +01:00
}
// objInfo is not valid, truly the object doesn't
// exist proceed to next pool.
continue
}
2021-06-10 23:07:16 -07:00
return pinfo . PoolIndex , nil
2021-03-16 19:02:20 +01:00
}
return - 1 , toObjectErr ( errFileNotFound , bucket , object )
}
2021-08-17 07:50:00 -07:00
// getPoolIdxExistingNoLock is getPoolIdxExistingWithOpts with
// ObjectOptions.NoLock set.
func (z *erasureServerPools) getPoolIdxExistingNoLock(ctx context.Context, bucket, object string) (idx int, err error) {
	noLockOpts := ObjectOptions{NoLock: true}
	return z.getPoolIdxExistingWithOpts(ctx, bucket, object, noLockOpts)
}
// getPoolIdxExisting returns the (first) found object pool index containing an object.
// If the object exists, but the latest version is a delete marker, the index with it is still returned.
// If the object does not exist ObjectNotFound error is returned.
// If any other error is found, it is returned.
// The check is skipped if there is only one pool, and 0, nil is always returned in that case.
func (z *erasureServerPools) getPoolIdxExisting(ctx context.Context, bucket, object string) (idx int, err error) {
	var defaultOpts ObjectOptions
	return z.getPoolIdxExistingWithOpts(ctx, bucket, object, defaultOpts)
}
2021-01-26 20:47:42 -08:00
// getPoolIdx returns the found previous object and its corresponding pool idx,
2021-01-06 09:35:47 -08:00
// if none are found falls back to most available space pool.
2021-02-10 11:45:02 -08:00
func ( z * erasureServerPools ) getPoolIdx ( ctx context . Context , bucket , object string , size int64 ) ( idx int , err error ) {
2021-05-06 10:45:33 -07:00
idx , err = z . getPoolIdxExisting ( ctx , bucket , object )
if err != nil && ! isErrObjectNotFound ( err ) {
return idx , err
2020-06-17 08:33:14 -07:00
}
2021-02-16 19:36:15 -08:00
2021-05-06 10:45:33 -07:00
if isErrObjectNotFound ( err ) {
2021-06-07 17:13:15 +02:00
idx = z . getAvailablePoolIdx ( ctx , bucket , object , size )
2021-05-06 10:45:33 -07:00
if idx < 0 {
return - 1 , toObjectErr ( errDiskFull )
2020-06-17 08:33:14 -07:00
}
}
2020-06-20 06:36:44 -07:00
return idx , nil
2020-06-17 08:33:14 -07:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) Shutdown ( ctx context . Context ) error {
2020-09-10 09:18:19 -07:00
defer z . shutdown ( )
2019-11-19 17:42:27 -08:00
2020-12-01 13:50:33 -08:00
g := errgroup . WithNErrs ( len ( z . serverPools ) )
2019-11-19 17:42:27 -08:00
2020-12-01 13:50:33 -08:00
for index := range z . serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
2020-12-01 13:50:33 -08:00
return z . serverPools [ index ] . Shutdown ( ctx )
2019-11-19 17:42:27 -08:00
} , index )
}
for _ , err := range g . Wait ( ) {
if err != nil {
logger . LogIf ( ctx , err )
}
// let's the rest shutdown
}
return nil
}
2021-03-04 14:36:23 -08:00
func ( z * erasureServerPools ) BackendInfo ( ) ( b madmin . BackendInfo ) {
b . Type = madmin . Erasure
2020-12-21 18:35:19 +01:00
scParity := globalStorageClass . GetParityForSC ( storageclass . STANDARD )
2021-01-16 12:08:02 -08:00
if scParity <= 0 {
scParity = z . serverPools [ 0 ] . defaultParityCount
2020-12-21 18:35:19 +01:00
}
rrSCParity := globalStorageClass . GetParityForSC ( storageclass . RRS )
2021-01-22 12:09:24 -08:00
// Data blocks can vary per pool, but parity is same.
for _ , setDriveCount := range z . SetDriveCounts ( ) {
b . StandardSCData = append ( b . StandardSCData , setDriveCount - scParity )
b . RRSCData = append ( b . RRSCData , setDriveCount - rrSCParity )
}
b . StandardSCParity = scParity
2020-12-21 18:35:19 +01:00
b . RRSCParity = rrSCParity
return
}
2021-03-02 17:28:04 -08:00
// LocalStorageInfo calls LocalStorageInfo on every pool concurrently and
// merges the results: disks and errors are concatenated in pool order and
// Backend is filled from BackendInfo.
func (z *erasureServerPools) LocalStorageInfo(ctx context.Context) (StorageInfo, []error) {
	poolCount := len(z.serverPools)
	perPoolInfo := make([]StorageInfo, poolCount)
	perPoolErrs := make([][]error, poolCount)

	g := errgroup.WithNErrs(poolCount)
	for index := range z.serverPools {
		index := index
		g.Go(func() error {
			perPoolInfo[index], perPoolErrs[index] = z.serverPools[index].LocalStorageInfo(ctx)
			return nil
		}, index)
	}

	// Wait for the go routines.
	g.Wait()

	var (
		merged StorageInfo
		errs   []error
	)
	merged.Backend = z.BackendInfo()
	for i := range perPoolInfo {
		merged.Disks = append(merged.Disks, perPoolInfo[i].Disks...)
		errs = append(errs, perPoolErrs[i]...)
	}
	return merged, errs
}
2021-01-04 09:42:09 -08:00
func ( z * erasureServerPools ) StorageInfo ( ctx context . Context ) ( StorageInfo , [ ] error ) {
2019-11-19 17:42:27 -08:00
var storageInfo StorageInfo
2020-12-01 13:50:33 -08:00
storageInfos := make ( [ ] StorageInfo , len ( z . serverPools ) )
storageInfosErrs := make ( [ ] [ ] error , len ( z . serverPools ) )
g := errgroup . WithNErrs ( len ( z . serverPools ) )
for index := range z . serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
2021-01-04 09:42:09 -08:00
storageInfos [ index ] , storageInfosErrs [ index ] = z . serverPools [ index ] . StorageInfo ( ctx )
2019-11-19 17:42:27 -08:00
return nil
} , index )
}
// Wait for the go routines.
g . Wait ( )
2020-12-21 18:35:19 +01:00
storageInfo . Backend = z . BackendInfo ( )
2019-11-19 17:42:27 -08:00
for _ , lstorageInfo := range storageInfos {
2020-07-13 09:51:07 -07:00
storageInfo . Disks = append ( storageInfo . Disks , lstorageInfo . Disks ... )
2020-10-22 13:36:24 -07:00
}
2020-05-28 13:03:04 -07:00
var errs [ ] error
2020-12-01 13:50:33 -08:00
for i := range z . serverPools {
2020-05-28 13:03:04 -07:00
errs = append ( errs , storageInfosErrs [ i ] ... )
}
return storageInfo , errs
2019-11-19 17:42:27 -08:00
}
2021-04-03 09:03:42 -07:00
func ( z * erasureServerPools ) NSScanner ( ctx context . Context , bf * bloomFilter , updates chan <- madmin . DataUsageInfo ) error {
2021-05-19 19:25:44 -07:00
// Updates must be closed before we return.
defer close ( updates )
2020-03-19 00:19:29 +01:00
ctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
2020-06-12 20:04:01 -07:00
2019-12-12 15:02:37 +01:00
var wg sync . WaitGroup
2020-03-19 00:19:29 +01:00
var mu sync . Mutex
var results [ ] dataUsageCache
var firstErr error
2020-12-15 17:34:54 -08:00
allBuckets , err := z . ListBuckets ( ctx )
if err != nil {
return err
}
2021-01-07 09:52:53 -08:00
if len ( allBuckets ) == 0 {
2021-04-03 09:03:42 -07:00
updates <- madmin . DataUsageInfo { } // no buckets found update data usage to reflect latest state
2021-01-07 09:52:53 -08:00
return nil
}
2021-02-26 15:11:42 -08:00
// Scanner latest allBuckets first.
2020-12-15 17:34:54 -08:00
sort . Slice ( allBuckets , func ( i , j int ) bool {
return allBuckets [ i ] . Created . After ( allBuckets [ j ] . Created )
} )
2020-03-19 00:19:29 +01:00
2020-12-01 13:50:33 -08:00
// Collect for each set in serverPools.
for _ , z := range z . serverPools {
2020-09-24 09:53:38 -07:00
for _ , erObj := range z . sets {
2020-03-19 00:19:29 +01:00
wg . Add ( 1 )
results = append ( results , dataUsageCache { } )
2020-06-12 20:04:01 -07:00
go func ( i int , erObj * erasureObjects ) {
2020-03-19 00:19:29 +01:00
updates := make ( chan dataUsageCache , 1 )
defer close ( updates )
// Start update collector.
go func ( ) {
defer wg . Done ( )
for info := range updates {
mu . Lock ( )
results [ i ] = info
mu . Unlock ( )
}
} ( )
2021-02-17 12:04:11 -08:00
// Start scanner. Blocks until done.
2021-02-26 15:11:42 -08:00
err := erObj . nsScanner ( ctx , allBuckets , bf , updates )
2020-03-19 00:19:29 +01:00
if err != nil {
2020-06-12 10:28:21 -07:00
logger . LogIf ( ctx , err )
2020-03-19 00:19:29 +01:00
mu . Lock ( )
if firstErr == nil {
firstErr = err
}
// Cancel remaining...
cancel ( )
mu . Unlock ( )
return
2019-12-12 15:02:37 +01:00
}
2020-06-12 20:04:01 -07:00
} ( len ( results ) - 1 , erObj )
2019-12-12 15:02:37 +01:00
}
}
2020-03-19 00:19:29 +01:00
updateCloser := make ( chan chan struct { } )
go func ( ) {
updateTicker := time . NewTicker ( 30 * time . Second )
defer updateTicker . Stop ( )
var lastUpdate time . Time
2020-07-14 18:59:05 -07:00
2021-01-06 09:35:47 -08:00
// We need to merge since we will get the same buckets from each pool.
2020-07-14 18:59:05 -07:00
// Therefore to get the exact bucket sizes we must merge before we can convert.
2020-07-24 11:02:10 -07:00
var allMerged dataUsageCache
2020-07-14 18:59:05 -07:00
2020-03-19 00:19:29 +01:00
update := func ( ) {
mu . Lock ( )
defer mu . Unlock ( )
2020-07-24 11:02:10 -07:00
allMerged = dataUsageCache { Info : dataUsageCacheInfo { Name : dataUsageRoot } }
2020-03-19 00:19:29 +01:00
for _ , info := range results {
if info . Info . LastUpdate . IsZero ( ) {
// Not filled yet.
return
}
allMerged . merge ( info )
}
if allMerged . root ( ) != nil && allMerged . Info . LastUpdate . After ( lastUpdate ) {
updates <- allMerged . dui ( allMerged . Info . Name , allBuckets )
lastUpdate = allMerged . Info . LastUpdate
}
}
for {
select {
case <- ctx . Done ( ) :
return
case v := <- updateCloser :
update ( )
2020-07-14 18:59:05 -07:00
// Enforce quotas when all is done.
2020-08-24 10:15:46 -07:00
if firstErr == nil {
for _ , b := range allBuckets {
enforceFIFOQuotaBucket ( ctx , z , b . Name , allMerged . bucketUsageInfo ( b . Name ) )
}
2020-07-14 18:59:05 -07:00
}
2020-03-19 00:19:29 +01:00
close ( v )
return
case <- updateTicker . C :
update ( )
}
}
} ( )
2019-12-12 15:02:37 +01:00
wg . Wait ( )
2020-03-19 00:19:29 +01:00
ch := make ( chan struct { } )
2020-06-12 10:28:21 -07:00
select {
case updateCloser <- ch :
<- ch
case <- ctx . Done ( ) :
2020-08-24 10:15:46 -07:00
if firstErr == nil {
firstErr = ctx . Err ( )
}
2020-06-12 10:28:21 -07:00
}
2020-03-19 00:19:29 +01:00
return firstErr
2019-12-12 15:02:37 +01:00
}
2020-12-01 13:50:33 -08:00
// MakeBucketWithLocation - creates a new bucket across all serverPools simultaneously
2019-11-19 17:42:27 -08:00
// even if one of the sets fail to create buckets, we proceed all the successful
// operations.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) MakeBucketWithLocation ( ctx context . Context , bucket string , opts BucketOptions ) error {
g := errgroup . WithNErrs ( len ( z . serverPools ) )
2019-11-19 17:42:27 -08:00
2021-08-19 22:21:02 +02:00
// Lock the bucket name before creating.
lk := z . NewNSLock ( minioMetaTmpBucket , bucket + ".lck" )
lkctx , err := lk . GetLock ( ctx , globalOperationTimeout )
if err != nil {
return err
}
ctx = lkctx . Context ( )
defer lk . Unlock ( lkctx . Cancel )
2019-11-19 17:42:27 -08:00
// Create buckets in parallel across all sets.
2020-12-01 13:50:33 -08:00
for index := range z . serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
2020-12-01 13:50:33 -08:00
return z . serverPools [ index ] . MakeBucketWithLocation ( ctx , bucket , opts )
2019-11-19 17:42:27 -08:00
} , index )
}
errs := g . Wait ( )
2020-05-12 23:20:42 +01:00
// Return the first encountered error
2019-11-19 17:42:27 -08:00
for _ , err := range errs {
if err != nil {
return err
}
}
2020-05-19 13:53:54 -07:00
// If it doesn't exist we get a new, so ignore errors
2020-05-20 10:18:15 -07:00
meta := newBucketMetadata ( bucket )
2020-06-12 20:04:01 -07:00
if opts . LockEnabled {
meta . VersioningConfigXML = enabledBucketVersioningConfig
2020-05-21 11:03:59 -07:00
meta . ObjectLockConfigXML = enabledBucketObjectLockConfig
2020-05-20 10:18:15 -07:00
}
2020-06-12 20:04:01 -07:00
2020-05-19 13:53:54 -07:00
if err := meta . Save ( ctx , z ) ; err != nil {
return toObjectErr ( err , bucket )
2020-05-08 13:44:44 -07:00
}
2020-06-12 20:04:01 -07:00
2020-05-19 13:53:54 -07:00
globalBucketMetadataSys . Set ( bucket , meta )
2020-05-08 13:44:44 -07:00
2019-11-19 17:42:27 -08:00
// Success.
return nil
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) GetObjectNInfo ( ctx context . Context , bucket , object string , rs * HTTPRangeSpec , h http . Header , lockType LockType , opts ObjectOptions ) ( gr * GetObjectReader , err error ) {
2020-10-06 12:03:57 -07:00
if err = checkGetObjArgs ( ctx , bucket , object ) ; err != nil {
return nil , err
}
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-02-16 02:43:47 -08:00
if z . SinglePool ( ) {
return z . serverPools [ 0 ] . GetObjectNInfo ( ctx , bucket , object , rs , h , lockType , opts )
}
var unlockOnDefer bool
var nsUnlocker = func ( ) { }
defer func ( ) {
if unlockOnDefer {
nsUnlocker ( )
}
} ( )
// Acquire lock
if lockType != noLock {
lock := z . NewNSLock ( bucket , object )
switch lockType {
case writeLock :
2021-04-29 20:55:21 -07:00
lkctx , err := lock . GetLock ( ctx , globalOperationTimeout )
2021-03-04 03:36:43 +01:00
if err != nil {
2021-02-16 02:43:47 -08:00
return nil , err
}
2021-04-29 20:55:21 -07:00
ctx = lkctx . Context ( )
nsUnlocker = func ( ) { lock . Unlock ( lkctx . Cancel ) }
2021-02-16 02:43:47 -08:00
case readLock :
2021-04-29 20:55:21 -07:00
lkctx , err := lock . GetRLock ( ctx , globalOperationTimeout )
2021-03-04 03:36:43 +01:00
if err != nil {
2021-02-16 02:43:47 -08:00
return nil , err
}
2021-04-29 20:55:21 -07:00
ctx = lkctx . Context ( )
nsUnlocker = func ( ) { lock . RUnlock ( lkctx . Cancel ) }
2021-02-16 02:43:47 -08:00
}
unlockOnDefer = true
}
2021-06-10 23:07:16 -07:00
checkPrecondFn := opts . CheckPrecondFn
2021-06-24 09:44:00 -07:00
opts . CheckPrecondFn = nil // do not need to apply pre-conditions at lower layer.
opts . NoLock = true // no locks needed at lower levels for getObjectInfo()
objInfo , zIdx , err := z . getLatestObjectInfoWithIdx ( ctx , bucket , object , opts )
if err != nil {
if objInfo . DeleteMarker {
if opts . VersionID == "" {
return & GetObjectReader {
ObjInfo : objInfo ,
} , toObjectErr ( errFileNotFound , bucket , object )
2019-11-19 17:42:27 -08:00
}
2021-06-24 09:44:00 -07:00
// Make sure to return object info to provide extra information.
return & GetObjectReader {
ObjInfo : objInfo ,
} , toObjectErr ( errMethodNotAllowed , bucket , object )
2021-06-10 23:07:16 -07:00
}
2021-06-24 09:44:00 -07:00
return nil , err
2021-06-14 20:00:13 +02:00
}
2021-06-24 09:44:00 -07:00
// check preconditions before reading the stream.
if checkPrecondFn != nil && checkPrecondFn ( objInfo ) {
2021-06-14 20:00:13 +02:00
return nil , PreConditionFailed { }
2021-02-16 02:43:47 -08:00
}
2021-06-24 09:44:00 -07:00
lockType = noLock // do not take locks at lower levels for GetObjectNInfo()
return z . serverPools [ zIdx ] . GetObjectNInfo ( ctx , bucket , object , rs , h , lockType , opts )
2019-11-19 17:42:27 -08:00
}
2021-06-24 09:44:00 -07:00
// getLatestObjectInfoWithIdx returns the objectInfo of the latest object from multiple pools (this function
// is present in-case there were duplicate writes to both pools, this function also returns the
// additional index where the latest object exists, that is used to start the GetObject stream.
func ( z * erasureServerPools ) getLatestObjectInfoWithIdx ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( ObjectInfo , int , error ) {
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-06-14 20:00:13 +02:00
results := make ( [ ] struct {
zIdx int
oi ObjectInfo
err error
} , len ( z . serverPools ) )
2021-02-16 02:43:47 -08:00
var wg sync . WaitGroup
for i , pool := range z . serverPools {
wg . Add ( 1 )
go func ( i int , pool * erasureSets ) {
defer wg . Done ( )
2021-06-14 20:00:13 +02:00
results [ i ] . zIdx = i
results [ i ] . oi , results [ i ] . err = pool . GetObjectInfo ( ctx , bucket , object , opts )
2021-02-16 02:43:47 -08:00
} ( i , pool )
}
wg . Wait ( )
2021-06-10 23:07:16 -07:00
// Sort the objInfos such that we always serve latest
// this is a defensive change to handle any duplicate
// content that may have been created, we always serve
// the latest object.
2021-06-14 20:00:13 +02:00
sort . Slice ( results , func ( i , j int ) bool {
a , b := results [ i ] , results [ j ]
if a . oi . ModTime . Equal ( b . oi . ModTime ) {
// On tiebreak, select the lowest zone index.
return a . zIdx < b . zIdx
}
return a . oi . ModTime . After ( b . oi . ModTime )
} )
2021-06-24 09:44:00 -07:00
2021-06-14 20:00:13 +02:00
for _ , res := range results {
err := res . err
2021-02-16 02:43:47 -08:00
if err == nil {
2021-06-24 09:44:00 -07:00
return res . oi , res . zIdx , nil
2021-02-16 02:43:47 -08:00
}
if ! isErrObjectNotFound ( err ) && ! isErrVersionNotFound ( err ) {
// some errors such as MethodNotAllowed for delete marker
// should be returned upwards.
2021-06-24 09:44:00 -07:00
return res . oi , res . zIdx , err
2019-11-19 17:42:27 -08:00
}
}
2021-02-16 02:43:47 -08:00
2020-09-19 08:39:41 -07:00
object = decodeDirObject ( object )
2020-07-02 16:17:27 -07:00
if opts . VersionID != "" {
2021-06-24 09:44:00 -07:00
return ObjectInfo { } , - 1 , VersionNotFound { Bucket : bucket , Object : object , VersionID : opts . VersionID }
2020-07-02 16:17:27 -07:00
}
2021-06-24 09:44:00 -07:00
return ObjectInfo { } , - 1 , ObjectNotFound { Bucket : bucket , Object : object }
}
func ( z * erasureServerPools ) GetObjectInfo ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
if err = checkGetObjArgs ( ctx , bucket , object ) ; err != nil {
return objInfo , err
}
object = encodeDirObject ( object )
if z . SinglePool ( ) {
return z . serverPools [ 0 ] . GetObjectInfo ( ctx , bucket , object , opts )
}
if ! opts . NoLock {
opts . NoLock = true // avoid taking locks at lower levels for multi-pool setups.
// Lock the object before reading.
lk := z . NewNSLock ( bucket , object )
lkctx , err := lk . GetRLock ( ctx , globalOperationTimeout )
if err != nil {
return ObjectInfo { } , err
}
ctx = lkctx . Context ( )
defer lk . RUnlock ( lkctx . Cancel )
}
objInfo , _ , err = z . getLatestObjectInfoWithIdx ( ctx , bucket , object , opts )
return objInfo , err
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
// PutObject - writes an object to least used erasure pool.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) PutObject ( ctx context . Context , bucket string , object string , data * PutObjReader , opts ObjectOptions ) ( ObjectInfo , error ) {
2020-10-06 12:03:57 -07:00
// Validate put object input args.
if err := checkPutObjectArgs ( ctx , bucket , object , z ) ; err != nil {
return ObjectInfo { } , err
}
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2021-06-07 17:13:15 +02:00
if ! isMinioMetaBucketName ( bucket ) && ! hasSpaceFor ( getDiskInfos ( ctx , z . serverPools [ 0 ] . getHashedSet ( object ) . getDisks ( ) ) , data . Size ( ) ) {
return ObjectInfo { } , toObjectErr ( errDiskFull )
}
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . PutObject ( ctx , bucket , object , data , opts )
2019-11-19 17:42:27 -08:00
}
2021-06-21 09:25:10 -07:00
if ! opts . NoLock {
ns := z . NewNSLock ( minioMetaMultipartBucket , pathJoin ( bucket , object , "newMultipartObject.lck" ) )
lkctx , err := ns . GetLock ( ctx , globalOperationTimeout )
if err != nil {
return ObjectInfo { } , err
}
ctx = lkctx . Context ( )
defer ns . Unlock ( lkctx . Cancel )
opts . NoLock = true
}
2019-11-19 17:42:27 -08:00
2021-02-10 11:45:02 -08:00
idx , err := z . getPoolIdx ( ctx , bucket , object , data . Size ( ) )
2020-06-17 08:33:14 -07:00
if err != nil {
return ObjectInfo { } , err
2019-11-19 17:42:27 -08:00
}
2020-06-17 08:33:14 -07:00
2021-01-06 09:35:47 -08:00
// Overwrite the object at the right pool
2020-12-01 13:50:33 -08:00
return z . serverPools [ idx ] . PutObject ( ctx , bucket , object , data , opts )
2019-11-19 17:42:27 -08:00
}
2021-06-16 02:43:14 +01:00
// deletePrefix issues a DeleteObject call with DeletePrefix set against every
// server pool, in order, and stops at the first pool that reports an error.
func (z *erasureServerPools) deletePrefix(ctx context.Context, bucket string, prefix string) error {
	prefixOpts := ObjectOptions{DeletePrefix: true}
	for i := range z.serverPools {
		if _, err := z.serverPools[i].DeleteObject(ctx, bucket, prefix, prefixOpts); err != nil {
			return err
		}
	}
	return nil
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) DeleteObject ( ctx context . Context , bucket string , object string , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2020-10-06 12:03:57 -07:00
if err = checkDelObjArgs ( ctx , bucket , object ) ; err != nil {
return objInfo , err
}
2021-06-16 02:43:14 +01:00
if opts . DeletePrefix {
err := z . deletePrefix ( ctx , bucket , object )
return ObjectInfo { } , err
}
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . DeleteObject ( ctx , bucket , object , opts )
2019-11-19 17:42:27 -08:00
}
2021-02-08 18:12:28 -08:00
2021-03-16 19:02:20 +01:00
idx , err := z . getPoolIdxExisting ( ctx , bucket , object )
2021-02-08 18:12:28 -08:00
if err != nil {
return objInfo , err
2019-11-19 17:42:27 -08:00
}
2020-10-08 12:32:32 -07:00
2021-02-08 18:12:28 -08:00
return z . serverPools [ idx ] . DeleteObject ( ctx , bucket , object , opts )
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) DeleteObjects ( ctx context . Context , bucket string , objects [ ] ObjectToDelete , opts ObjectOptions ) ( [ ] DeletedObject , [ ] error ) {
2019-11-19 17:42:27 -08:00
derrs := make ( [ ] error , len ( objects ) )
2020-06-12 20:04:01 -07:00
dobjects := make ( [ ] DeletedObject , len ( objects ) )
2020-06-18 10:25:07 -07:00
objSets := set . NewStringSet ( )
2019-11-19 17:42:27 -08:00
for i := range derrs {
2020-09-19 08:39:41 -07:00
objects [ i ] . ObjectName = encodeDirObject ( objects [ i ] . ObjectName )
2020-06-12 20:04:01 -07:00
derrs [ i ] = checkDelObjArgs ( ctx , bucket , objects [ i ] . ObjectName )
2020-06-18 10:25:07 -07:00
objSets . Add ( objects [ i ] . ObjectName )
2019-11-19 17:42:27 -08:00
}
2021-08-17 07:50:00 -07:00
// Acquire a bulk write lock across 'objects'
multiDeleteLock := z . NewNSLock ( bucket , objSets . ToSlice ( ) ... )
lkctx , err := multiDeleteLock . GetLock ( ctx , globalOperationTimeout )
if err != nil {
for i := range derrs {
derrs [ i ] = err
}
return dobjects , derrs
}
ctx = lkctx . Context ( )
defer multiDeleteLock . Unlock ( lkctx . Cancel )
if z . SinglePool ( ) {
return z . serverPools [ 0 ] . DeleteObjects ( ctx , bucket , objects , opts )
}
// Fetch location of up to 10 objects concurrently.
2021-02-10 14:25:43 -08:00
poolObjIdxMap := map [ int ] [ ] ObjectToDelete { }
origIndexMap := map [ int ] [ ] int { }
2021-08-17 07:50:00 -07:00
var mu sync . Mutex
eg := errgroup . WithNErrs ( len ( objects ) ) . WithConcurrency ( 10 )
cctx , cancel := eg . WithCancelOnError ( ctx )
defer cancel ( )
for j , obj := range objects {
j := j
obj := obj
eg . Go ( func ( ) error {
idx , err := z . getPoolIdxExistingNoLock ( cctx , bucket , obj . ObjectName )
2021-03-16 19:02:20 +01:00
if isErrObjectNotFound ( err ) {
derrs [ j ] = err
2021-08-17 07:50:00 -07:00
return nil
2021-03-16 19:02:20 +01:00
}
2021-02-10 14:25:43 -08:00
if err != nil {
2021-08-17 07:50:00 -07:00
// unhandled errors return right here.
return err
2021-02-10 14:25:43 -08:00
}
2021-08-17 07:50:00 -07:00
mu . Lock ( )
2021-02-10 14:25:43 -08:00
poolObjIdxMap [ idx ] = append ( poolObjIdxMap [ idx ] , obj )
origIndexMap [ idx ] = append ( origIndexMap [ idx ] , j )
2021-08-17 07:50:00 -07:00
mu . Unlock ( )
return nil
} , j )
2021-02-10 14:25:43 -08:00
}
2021-08-17 07:50:00 -07:00
if err := eg . WaitErr ( ) ; err != nil {
2020-06-12 20:04:01 -07:00
for i := range derrs {
derrs [ i ] = err
}
DeletedObjects: Return objects on lock failure (#10874)
Return objects when locking fails.
<details>
<summary>Panic</summary>
```
: 2020/11/10 04:15:55 http: panic serving 10.10.62.153:44858: runtime error: index out of range [0] with length 0
: goroutine 363537270 [running]:
: net/http.(*conn).serve.func1(0xc019232780)
: net/http/server.go:1801 +0x147
: panic(0x1cadd60, 0xc001719260)
: runtime/panic.go:975 +0x47a
: github.com/minio/minio/cmd.criticalErrorHandler.ServeHTTP.func1(0xc0121d1200, 0x210cda0, 0xc0141940e0)
: github.com/minio/minio/cmd/generic-handlers.go:781 +0x1a8
: panic(0x1cadd60, 0xc001719260)
: runtime/panic.go:969 +0x1b9
: github.com/minio/minio/cmd.objectAPIHandlers.DeleteMultipleObjectsHandler(0x1e71ce8, 0x1e71cc8, 0x2108420, 0xc0192328c0, 0xc0121d1400)
: github.com/minio/minio/cmd/bucket-handlers.go:465 +0x2490
: net/http.HandlerFunc.ServeHTTP(...)
: net/http/server.go:2042
: github.com/minio/minio/cmd.httpTraceAll.func1(0x2108420, 0xc0192328c0, 0xc0121d1400)
: github.com/minio/minio/cmd/handler-utils.go:353 +0x158
: net/http.HandlerFunc.ServeHTTP(...)
: net/http/server.go:2042
: github.com/minio/minio/cmd.collectAPIStats.func1(0x2108420, 0xc019232820, 0xc0121d1400)
: github.com/minio/minio/cmd/handler-utils.go:380 +0xed
: net/http.HandlerFunc.ServeHTTP(...)
: net/http/server.go:2042
: github.com/minio/minio/cmd.maxClients.func1(0x2108420, 0xc019232820, 0xc0121d1400)
: github.com/minio/minio/cmd/handler-api.go:132 +0x33b
: net/http.HandlerFunc.ServeHTTP(0xc00271d590, 0x2108420, 0xc019232820, 0xc0121d1400)
: net/http/server.go:2042 +0x44
: github.com/minio/minio/cmd.redirectHandler.ServeHTTP(0x20e2180, 0xc00271d590, 0x2108420, 0xc019232820, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:192 +0x156
: github.com/minio/minio/cmd.customHeaderHandler.ServeHTTP(0x20e1060, 0xc0141a22b0, 0x21083e0, 0xc01814d2e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:751 +0x162
: github.com/minio/minio/cmd.securityHeaderHandler.ServeHTTP(0x20e0fc0, 0xc0141a22c0, 0x21083e0, 0xc01814d2e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:766 +0x1d6
: github.com/minio/minio/cmd.bucketForwardingHandler.ServeHTTP(0xc0121c7a40, 0x20e1120, 0xc0141a22d0, 0x21083e0, 0xc01814d2e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:624 +0xbf
: github.com/minio/minio/cmd.requestValidityHandler.ServeHTTP(0x20e0f20, 0xc01814d280, 0x21083e0, 0xc01814d2e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:608 +0x42a
: github.com/minio/minio/cmd.httpStatsHandler.ServeHTTP(0x20e10c0, 0xc0141a2300, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:536 +0xe4
: github.com/minio/minio/cmd.requestSizeLimitHandler.ServeHTTP(0x20e0fe0, 0xc0141a2310, 0x50004000000, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:68 +0xd4
: github.com/minio/minio/cmd.requestHeaderSizeLimitHandler.ServeHTTP(0x20e10a0, 0xc01814d2a0, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:93 +0x1b7
: github.com/minio/minio/cmd.crossDomainPolicy.ServeHTTP(0x20e1080, 0xc0141a2320, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/crossdomain-xml-handler.go:51 +0x82
: github.com/minio/minio/cmd.browserRedirectHandler.ServeHTTP(0x20e0fa0, 0xc0141a2330, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:276 +0x68
: github.com/minio/minio/cmd.minioReservedBucketHandler.ServeHTTP(0x20e0f00, 0xc0141a2340, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:344 +0xb8
: github.com/minio/minio/cmd.cacheControlHandler.ServeHTTP(0x20e1020, 0xc0141a2350, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:303 +0x1ce
: github.com/minio/minio/cmd.timeValidityHandler.ServeHTTP(0x20e0f40, 0xc0141a2360, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:414 +0x3ca
: github.com/minio/minio/cmd.resourceHandler.ServeHTTP(0x20e1160, 0xc0141a2370, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:516 +0xab
: github.com/minio/minio/cmd.authHandler.ServeHTTP(0x20e1100, 0xc0141a2380, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/auth-handler.go:502 +0x2e7
: github.com/minio/minio/cmd.sseTLSHandler.ServeHTTP(0x20e0ee0, 0xc0141a2390, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:802 +0x79
: github.com/minio/minio/cmd.reservedMetadataHandler.ServeHTTP(0x20e1140, 0xc0141a23a0, 0x210cda0, 0xc0141940e0, 0xc0121d1400)
: github.com/minio/minio/cmd/generic-handlers.go:139 +0x1b7
: github.com/gorilla/mux.(*Router).ServeHTTP(0xc00073fb00, 0x210cda0, 0xc0141940e0, 0xc0121d1200)
: github.com/gorilla/mux@v1.8.0/mux.go:210 +0xd3
: github.com/rs/cors.(*Cors).Handler.func1(0x210cda0, 0xc0141940e0, 0xc0121d1200)
: github.com/rs/cors@v1.7.0/cors.go:219 +0x1b9
: net/http.HandlerFunc.ServeHTTP(0xc0009aece0, 0x210cda0, 0xc0141940e0, 0xc0121d1200)
: net/http/server.go:2042 +0x44
: github.com/minio/minio/cmd.criticalErrorHandler.ServeHTTP(0x20e2180, 0xc0009aece0, 0x210cda0, 0xc0141940e0, 0xc0121d1200)
: github.com/minio/minio/cmd/generic-handlers.go:784 +0x85
: github.com/minio/minio/cmd/http.(*Server).Start.func1(0x210cda0, 0xc0141940e0, 0xc0121d1200)
: github.com/minio/minio/cmd/http/server.go:101 +0x258
: net/http.HandlerFunc.ServeHTTP(0xc000dc4080, 0x210cda0, 0xc0141940e0, 0xc0121d1200)
: net/http/server.go:2042 +0x44
: net/http.serverHandler.ServeHTTP(0xc000764c60, 0x210cda0, 0xc0141940e0, 0xc0121d1200)
: net/http/server.go:2843 +0xa3
: net/http.(*conn).serve(0xc019232780, 0x2114720, 0xc03381f6c0)
: net/http/server.go:1925 +0x8ad
: created by net/http.(*Server).Serve
: net/http/server.go:2969 +0x36c
```
</details>
2020-11-11 09:14:32 -08:00
return dobjects , derrs
2019-11-19 17:42:27 -08:00
}
2020-11-28 21:15:45 -08:00
2021-08-17 07:50:00 -07:00
// Delete concurrently in all server pools.
var wg sync . WaitGroup
wg . Add ( len ( z . serverPools ) )
2021-02-10 14:25:43 -08:00
for idx , pool := range z . serverPools {
2021-08-17 07:50:00 -07:00
go func ( idx int , pool * erasureSets ) {
defer wg . Done ( )
objs := poolObjIdxMap [ idx ]
if len ( objs ) > 0 {
orgIndexes := origIndexMap [ idx ]
deletedObjects , errs := pool . DeleteObjects ( ctx , bucket , objs , opts )
mu . Lock ( )
for i , derr := range errs {
if derr != nil {
derrs [ orgIndexes [ i ] ] = derr
}
dobjects [ orgIndexes [ i ] ] = deletedObjects [ i ]
}
mu . Unlock ( )
2020-06-12 20:04:01 -07:00
}
2021-08-17 07:50:00 -07:00
} ( idx , pool )
2019-11-19 17:42:27 -08:00
}
2021-08-17 07:50:00 -07:00
wg . Wait ( )
2020-06-12 20:04:01 -07:00
return dobjects , derrs
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) CopyObject ( ctx context . Context , srcBucket , srcObject , dstBucket , dstObject string , srcInfo ObjectInfo , srcOpts , dstOpts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2020-09-19 08:39:41 -07:00
srcObject = encodeDirObject ( srcObject )
dstObject = encodeDirObject ( dstObject )
2020-05-28 14:36:38 -07:00
cpSrcDstSame := isStringEqual ( pathJoin ( srcBucket , srcObject ) , pathJoin ( dstBucket , dstObject ) )
2019-11-19 17:42:27 -08:00
2021-06-21 09:25:10 -07:00
if ! dstOpts . NoLock {
ns := z . NewNSLock ( minioMetaMultipartBucket , pathJoin ( dstBucket , dstObject , "newMultipartObject.lck" ) )
lkctx , err := ns . GetLock ( ctx , globalOperationTimeout )
if err != nil {
return ObjectInfo { } , err
}
ctx = lkctx . Context ( )
defer ns . Unlock ( lkctx . Cancel )
dstOpts . NoLock = true
}
2021-02-10 11:45:02 -08:00
poolIdx , err := z . getPoolIdx ( ctx , dstBucket , dstObject , srcInfo . Size )
2020-06-17 08:33:14 -07:00
if err != nil {
return objInfo , err
2020-05-28 14:36:38 -07:00
}
2020-08-03 16:21:10 -07:00
if cpSrcDstSame && srcInfo . metadataOnly {
2020-09-14 15:57:13 -07:00
// Version ID is set for the destination and source == destination version ID.
2020-06-19 08:44:51 -07:00
if dstOpts . VersionID != "" && srcOpts . VersionID == dstOpts . VersionID {
2021-01-06 09:35:47 -08:00
return z . serverPools [ poolIdx ] . CopyObject ( ctx , srcBucket , srcObject , dstBucket , dstObject , srcInfo , srcOpts , dstOpts )
2020-06-19 08:44:51 -07:00
}
2020-09-14 15:57:13 -07:00
// Destination is not versioned and source version ID is empty
// perform an in-place update.
2020-06-19 08:44:51 -07:00
if ! dstOpts . Versioned && srcOpts . VersionID == "" {
2021-01-06 09:35:47 -08:00
return z . serverPools [ poolIdx ] . CopyObject ( ctx , srcBucket , srcObject , dstBucket , dstObject , srcInfo , srcOpts , dstOpts )
2020-06-19 08:44:51 -07:00
}
2020-09-14 15:57:13 -07:00
// Destination is versioned, source is not destination version,
// as a special case look for if the source object is not legacy
// from older format, for older format we will rewrite them as
// newer using PutObject() - this is an optimization to save space
2020-08-03 16:21:10 -07:00
if dstOpts . Versioned && srcOpts . VersionID != dstOpts . VersionID && ! srcInfo . Legacy {
// CopyObject optimization where we don't create an entire copy
// of the content, instead we add a reference.
srcInfo . versionOnly = true
2021-01-06 09:35:47 -08:00
return z . serverPools [ poolIdx ] . CopyObject ( ctx , srcBucket , srcObject , dstBucket , dstObject , srcInfo , srcOpts , dstOpts )
2020-08-03 16:21:10 -07:00
}
2020-06-19 08:44:51 -07:00
}
2020-06-17 11:13:41 -07:00
putOpts := ObjectOptions {
ServerSideEncryption : dstOpts . ServerSideEncryption ,
UserDefined : srcInfo . UserDefined ,
Versioned : dstOpts . Versioned ,
VersionID : dstOpts . VersionID ,
2020-11-19 11:50:22 -08:00
MTime : dstOpts . MTime ,
2020-06-17 11:13:41 -07:00
}
2021-01-06 09:35:47 -08:00
return z . serverPools [ poolIdx ] . PutObject ( ctx , dstBucket , dstObject , srcInfo . PutObjReader , putOpts )
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) ListObjectsV2 ( ctx context . Context , bucket , prefix , continuationToken , delimiter string , maxKeys int , fetchOwner bool , startAfter string ) ( ListObjectsV2Info , error ) {
2019-11-19 17:42:27 -08:00
marker := continuationToken
if marker == "" {
marker = startAfter
}
loi , err := z . ListObjects ( ctx , bucket , prefix , marker , delimiter , maxKeys )
if err != nil {
return ListObjectsV2Info { } , err
}
listObjectsV2Info := ListObjectsV2Info {
IsTruncated : loi . IsTruncated ,
ContinuationToken : continuationToken ,
NextContinuationToken : loi . NextMarker ,
Objects : loi . Objects ,
Prefixes : loi . Prefixes ,
}
return listObjectsV2Info , err
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) ListObjectVersions ( ctx context . Context , bucket , prefix , marker , versionMarker , delimiter string , maxKeys int ) ( ListObjectVersionsInfo , error ) {
2020-06-12 20:04:01 -07:00
loi := ListObjectVersionsInfo { }
if marker == "" && versionMarker != "" {
return loi , NotImplemented { }
}
2020-11-13 16:58:20 -08:00
opts := listPathOptions {
2020-10-28 09:18:35 -07:00
Bucket : bucket ,
Prefix : prefix ,
Separator : delimiter ,
2021-03-01 08:12:02 -08:00
Limit : maxKeysPlusOne ( maxKeys , marker != "" ) ,
2020-10-28 09:18:35 -07:00
Marker : marker ,
InclDeleted : true ,
2020-11-03 08:53:48 -08:00
AskDisks : globalAPIConfig . getListQuorum ( ) ,
2020-11-13 16:58:20 -08:00
}
2021-07-05 15:34:41 -07:00
merged , err := z . listPath ( ctx , & opts )
2020-10-28 09:18:35 -07:00
if err != nil && err != io . EOF {
return loi , err
2020-06-12 20:04:01 -07:00
}
2021-08-13 20:39:27 +02:00
defer merged . truncate ( 0 ) // Release when returning
2021-03-01 08:12:02 -08:00
if versionMarker == "" {
2021-07-05 15:34:41 -07:00
o := listPathOptions { Marker : marker }
2021-03-01 08:12:02 -08:00
// If we are not looking for a specific version skip it.
2021-07-05 15:34:41 -07:00
o . parseMarker ( )
merged . forwardPast ( o . Marker )
2021-03-01 08:12:02 -08:00
}
2020-12-19 09:36:04 -08:00
objects := merged . fileInfoVersions ( bucket , prefix , delimiter , versionMarker )
loi . IsTruncated = err == nil && len ( objects ) > 0
if maxKeys > 0 && len ( objects ) > maxKeys {
objects = objects [ : maxKeys ]
2020-10-28 09:18:35 -07:00
loi . IsTruncated = true
2020-06-12 20:04:01 -07:00
}
2020-12-19 09:36:04 -08:00
for _ , obj := range objects {
2021-02-05 16:24:40 -08:00
if obj . IsDir && obj . ModTime . IsZero ( ) && delimiter != "" {
2020-12-19 09:36:04 -08:00
loi . Prefixes = append ( loi . Prefixes , obj . Name )
} else {
loi . Objects = append ( loi . Objects , obj )
}
}
2020-06-12 20:04:01 -07:00
if loi . IsTruncated {
2020-12-19 09:36:04 -08:00
last := objects [ len ( objects ) - 1 ]
2021-07-05 15:34:41 -07:00
loi . NextMarker = opts . encodeMarker ( last . Name )
2020-10-28 09:18:35 -07:00
loi . NextVersionIDMarker = last . VersionID
2019-11-19 17:42:27 -08:00
}
2020-10-28 09:18:35 -07:00
return loi , nil
}
2019-11-19 17:42:27 -08:00
2021-03-01 08:12:02 -08:00
// maxKeysPlusOne clamps maxKeys to maxObjectList when it is negative or
// larger than maxObjectList, and returns the clamped value incremented by
// one when addOne is set.
func maxKeysPlusOne(maxKeys int, addOne bool) int {
	limit := maxKeys
	if limit < 0 || limit > maxObjectList {
		limit = maxObjectList
	}
	if !addOne {
		return limit
	}
	return limit + 1
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) ListObjects ( ctx context . Context , bucket , prefix , marker , delimiter string , maxKeys int ) ( ListObjectsInfo , error ) {
2020-10-28 09:18:35 -07:00
var loi ListObjectsInfo
2021-08-18 18:05:05 -07:00
if len ( prefix ) > 0 && maxKeys == 1 && delimiter == "" && marker == "" {
// Optimization for certain applications like
// - Cohesity
// - Actifio, Splunk etc.
// which send ListObjects requests where the actual object
// itself is the prefix and max-keys=1 in such scenarios
// we can simply verify locally if such an object exists
// to avoid the need for ListObjects().
objInfo , err := z . GetObjectInfo ( ctx , bucket , prefix , ObjectOptions { NoLock : true } )
if err == nil {
loi . Objects = append ( loi . Objects , objInfo )
return loi , nil
}
}
2021-07-05 15:34:41 -07:00
opts := listPathOptions {
2020-10-28 09:18:35 -07:00
Bucket : bucket ,
Prefix : prefix ,
Separator : delimiter ,
2021-03-01 08:12:02 -08:00
Limit : maxKeysPlusOne ( maxKeys , marker != "" ) ,
2020-10-28 09:18:35 -07:00
Marker : marker ,
InclDeleted : false ,
2020-11-03 08:53:48 -08:00
AskDisks : globalAPIConfig . getListQuorum ( ) ,
2021-07-05 15:34:41 -07:00
}
merged , err := z . listPath ( ctx , & opts )
2020-10-28 09:18:35 -07:00
if err != nil && err != io . EOF {
logger . LogIf ( ctx , err )
return loi , err
2020-06-12 20:04:01 -07:00
}
2021-07-05 15:34:41 -07:00
merged . forwardPast ( opts . Marker )
2021-08-13 20:39:27 +02:00
defer merged . truncate ( 0 ) // Release when returning
2020-12-19 09:36:04 -08:00
2020-10-28 09:18:35 -07:00
// Default is recursive, if delimiter is set then list non recursive.
2020-12-19 09:36:04 -08:00
objects := merged . fileInfos ( bucket , prefix , delimiter )
loi . IsTruncated = err == nil && len ( objects ) > 0
if maxKeys > 0 && len ( objects ) > maxKeys {
objects = objects [ : maxKeys ]
loi . IsTruncated = true
}
for _ , obj := range objects {
2021-02-05 16:24:40 -08:00
if obj . IsDir && obj . ModTime . IsZero ( ) && delimiter != "" {
2020-12-19 09:36:04 -08:00
loi . Prefixes = append ( loi . Prefixes , obj . Name )
} else {
loi . Objects = append ( loi . Objects , obj )
}
}
2020-06-12 20:04:01 -07:00
if loi . IsTruncated {
2020-12-19 09:36:04 -08:00
last := objects [ len ( objects ) - 1 ]
2021-07-05 15:34:41 -07:00
loi . NextMarker = opts . encodeMarker ( last . Name )
2020-06-12 20:04:01 -07:00
}
return loi , nil
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) ListMultipartUploads ( ctx context . Context , bucket , prefix , keyMarker , uploadIDMarker , delimiter string , maxUploads int ) ( ListMultipartsInfo , error ) {
2020-05-19 13:53:54 -07:00
if err := checkListMultipartArgs ( ctx , bucket , prefix , keyMarker , uploadIDMarker , delimiter , z ) ; err != nil {
return ListMultipartsInfo { } , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . ListMultipartUploads ( ctx , bucket , prefix , keyMarker , uploadIDMarker , delimiter , maxUploads )
2019-11-19 17:42:27 -08:00
}
2020-05-19 13:53:54 -07:00
2021-01-06 09:35:47 -08:00
var poolResult = ListMultipartsInfo { }
poolResult . MaxUploads = maxUploads
poolResult . KeyMarker = keyMarker
poolResult . Prefix = prefix
poolResult . Delimiter = delimiter
for _ , pool := range z . serverPools {
result , err := pool . ListMultipartUploads ( ctx , bucket , prefix , keyMarker , uploadIDMarker ,
2019-11-19 17:42:27 -08:00
delimiter , maxUploads )
if err != nil {
return result , err
}
2021-01-06 09:35:47 -08:00
poolResult . Uploads = append ( poolResult . Uploads , result . Uploads ... )
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
return poolResult , nil
2019-11-19 17:42:27 -08:00
}
// Initiate a new multipart upload on a hashedSet based on object name.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) NewMultipartUpload ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( string , error ) {
2020-05-19 13:53:54 -07:00
if err := checkNewMultipartArgs ( ctx , bucket , object , z ) ; err != nil {
return "" , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2021-06-07 17:13:15 +02:00
if ! isMinioMetaBucketName ( bucket ) && ! hasSpaceFor ( getDiskInfos ( ctx , z . serverPools [ 0 ] . getHashedSet ( object ) . getDisks ( ) ) , - 1 ) {
return "" , toObjectErr ( errDiskFull )
}
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . NewMultipartUpload ( ctx , bucket , object , opts )
2019-11-19 17:42:27 -08:00
}
2020-06-17 08:33:14 -07:00
2021-05-06 10:45:33 -07:00
ns := z . NewNSLock ( minioMetaMultipartBucket , pathJoin ( bucket , object , "newMultipartObject.lck" ) )
lkctx , err := ns . GetLock ( ctx , globalOperationTimeout )
if err != nil {
return "" , err
}
ctx = lkctx . Context ( )
defer ns . Unlock ( lkctx . Cancel )
2021-04-21 10:57:36 -07:00
for idx , pool := range z . serverPools {
result , err := pool . ListMultipartUploads ( ctx , bucket , object , "" , "" , "" , maxUploadsList )
if err != nil {
return "" , err
}
// If there is a multipart upload with the same bucket/object name,
// create the new multipart in the same pool, this will avoid
// creating two multiparts uploads in two different pools
if len ( result . Uploads ) != 0 {
return z . serverPools [ idx ] . NewMultipartUpload ( ctx , bucket , object , opts )
}
}
2021-06-07 17:13:15 +02:00
idx , err := z . getPoolIdx ( ctx , bucket , object , - 1 )
2021-05-06 10:45:33 -07:00
if err != nil {
return "" , err
2020-06-17 08:33:14 -07:00
}
2020-12-01 13:50:33 -08:00
return z . serverPools [ idx ] . NewMultipartUpload ( ctx , bucket , object , opts )
2019-11-19 17:42:27 -08:00
}
// Copies a part of an object from source hashedSet to destination hashedSet.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) CopyObjectPart ( ctx context . Context , srcBucket , srcObject , destBucket , destObject string , uploadID string , partID int , startOffset int64 , length int64 , srcInfo ObjectInfo , srcOpts , dstOpts ObjectOptions ) ( PartInfo , error ) {
2020-05-19 13:53:54 -07:00
if err := checkNewMultipartArgs ( ctx , srcBucket , srcObject , z ) ; err != nil {
return PartInfo { } , err
}
2019-11-19 17:42:27 -08:00
return z . PutObjectPart ( ctx , destBucket , destObject , uploadID , partID ,
2021-02-10 08:52:50 -08:00
NewPutObjReader ( srcInfo . Reader ) , dstOpts )
2019-11-19 17:42:27 -08:00
}
// PutObjectPart - writes part of an object to hashedSet based on the object name.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) PutObjectPart ( ctx context . Context , bucket , object , uploadID string , partID int , data * PutObjReader , opts ObjectOptions ) ( PartInfo , error ) {
2020-05-19 13:53:54 -07:00
if err := checkPutObjectPartArgs ( ctx , bucket , object , z ) ; err != nil {
return PartInfo { } , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . PutObjectPart ( ctx , bucket , object , uploadID , partID , data , opts )
2019-11-19 17:42:27 -08:00
}
2020-05-28 14:36:38 -07:00
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
_ , err := pool . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
2020-05-28 12:36:20 -07:00
if err == nil {
2021-01-06 09:35:47 -08:00
return pool . PutObjectPart ( ctx , bucket , object , uploadID , partID , data , opts )
2019-11-19 17:42:27 -08:00
}
2020-05-28 12:36:20 -07:00
switch err . ( type ) {
case InvalidUploadID :
2021-01-06 09:35:47 -08:00
// Look for information on the next pool
2020-05-28 12:36:20 -07:00
continue
}
// Any other unhandled errors such as quorum return.
return PartInfo { } , err
2019-11-19 17:42:27 -08:00
}
return PartInfo { } , InvalidUploadID {
Bucket : bucket ,
Object : object ,
UploadID : uploadID ,
}
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) GetMultipartInfo ( ctx context . Context , bucket , object , uploadID string , opts ObjectOptions ) ( MultipartInfo , error ) {
2020-05-28 12:36:20 -07:00
if err := checkListPartsArgs ( ctx , bucket , object , z ) ; err != nil {
return MultipartInfo { } , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
2020-05-28 12:36:20 -07:00
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
mi , err := pool . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
2020-05-28 12:36:20 -07:00
if err == nil {
return mi , nil
}
switch err . ( type ) {
case InvalidUploadID :
2021-01-06 09:35:47 -08:00
// upload id not found, continue to the next pool.
2020-05-28 12:36:20 -07:00
continue
}
// any other unhandled error return right here.
return MultipartInfo { } , err
}
return MultipartInfo { } , InvalidUploadID {
Bucket : bucket ,
Object : object ,
UploadID : uploadID ,
}
}
2019-11-19 17:42:27 -08:00
// ListObjectParts - lists all uploaded parts to an object in hashedSet.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) ListObjectParts ( ctx context . Context , bucket , object , uploadID string , partNumberMarker int , maxParts int , opts ObjectOptions ) ( ListPartsInfo , error ) {
2020-05-19 13:53:54 -07:00
if err := checkListPartsArgs ( ctx , bucket , object , z ) ; err != nil {
return ListPartsInfo { } , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . ListObjectParts ( ctx , bucket , object , uploadID , partNumberMarker , maxParts , opts )
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
_ , err := pool . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
2020-05-28 12:36:20 -07:00
if err == nil {
2021-01-06 09:35:47 -08:00
return pool . ListObjectParts ( ctx , bucket , object , uploadID , partNumberMarker , maxParts , opts )
2019-11-19 17:42:27 -08:00
}
2020-05-28 12:36:20 -07:00
switch err . ( type ) {
case InvalidUploadID :
continue
}
return ListPartsInfo { } , err
2019-11-19 17:42:27 -08:00
}
return ListPartsInfo { } , InvalidUploadID {
Bucket : bucket ,
Object : object ,
UploadID : uploadID ,
}
}
// Aborts an in-progress multipart operation on hashedSet based on the object name.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) AbortMultipartUpload ( ctx context . Context , bucket , object , uploadID string , opts ObjectOptions ) error {
2020-05-19 13:53:54 -07:00
if err := checkAbortMultipartArgs ( ctx , bucket , object , z ) ; err != nil {
return err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . AbortMultipartUpload ( ctx , bucket , object , uploadID , opts )
2019-11-19 17:42:27 -08:00
}
2020-05-19 13:53:54 -07:00
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
_ , err := pool . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
2020-05-28 12:36:20 -07:00
if err == nil {
2021-01-06 09:35:47 -08:00
return pool . AbortMultipartUpload ( ctx , bucket , object , uploadID , opts )
2019-11-19 17:42:27 -08:00
}
2020-05-28 12:36:20 -07:00
switch err . ( type ) {
case InvalidUploadID :
2021-01-06 09:35:47 -08:00
// upload id not found move to next pool
2020-05-28 12:36:20 -07:00
continue
}
return err
2019-11-19 17:42:27 -08:00
}
return InvalidUploadID {
Bucket : bucket ,
Object : object ,
UploadID : uploadID ,
}
}
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) CompleteMultipartUpload ( ctx context . Context , bucket , object , uploadID string , uploadedParts [ ] CompletePart , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2020-05-19 13:53:54 -07:00
if err = checkCompleteMultipartArgs ( ctx , bucket , object , z ) ; err != nil {
return objInfo , err
}
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . CompleteMultipartUpload ( ctx , bucket , object , uploadID , uploadedParts , opts )
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
2021-04-21 10:57:36 -07:00
_ , err := pool . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
if err == nil {
2021-01-06 09:35:47 -08:00
return pool . CompleteMultipartUpload ( ctx , bucket , object , uploadID , uploadedParts , opts )
2019-11-19 17:42:27 -08:00
}
}
2021-04-21 10:57:36 -07:00
2019-11-19 17:42:27 -08:00
return objInfo , InvalidUploadID {
Bucket : bucket ,
Object : object ,
UploadID : uploadID ,
}
}
2020-12-01 13:50:33 -08:00
// GetBucketInfo - returns bucket info from one of the erasure coded serverPools.
func ( z * erasureServerPools ) GetBucketInfo ( ctx context . Context , bucket string ) ( bucketInfo BucketInfo , err error ) {
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
bucketInfo , err = z . serverPools [ 0 ] . GetBucketInfo ( ctx , bucket )
2020-05-19 13:53:54 -07:00
if err != nil {
return bucketInfo , err
}
meta , err := globalBucketMetadataSys . Get ( bucket )
if err == nil {
bucketInfo . Created = meta . Created
}
return bucketInfo , nil
2019-11-19 17:42:27 -08:00
}
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
bucketInfo , err = pool . GetBucketInfo ( ctx , bucket )
2019-11-19 17:42:27 -08:00
if err != nil {
if isErrBucketNotFound ( err ) {
continue
}
return bucketInfo , err
}
2020-05-19 13:53:54 -07:00
meta , err := globalBucketMetadataSys . Get ( bucket )
if err == nil {
bucketInfo . Created = meta . Created
}
2019-11-19 17:42:27 -08:00
return bucketInfo , nil
}
return bucketInfo , BucketNotFound {
Bucket : bucket ,
}
}
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) IsNotificationSupported ( ) bool {
2019-11-19 17:42:27 -08:00
return true
}
2020-07-20 12:52:49 -07:00
// IsListenSupported returns whether listen bucket notification is applicable for this layer.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) IsListenSupported ( ) bool {
2019-11-19 17:42:27 -08:00
return true
}
// IsEncryptionSupported returns whether server side encryption is implemented for this layer.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) IsEncryptionSupported ( ) bool {
2019-11-19 17:42:27 -08:00
return true
}
// IsCompressionSupported returns whether compression is applicable for this layer.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) IsCompressionSupported ( ) bool {
2019-11-19 17:42:27 -08:00
return true
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) IsTaggingSupported ( ) bool {
2020-05-23 11:09:35 -07:00
return true
}
2020-12-01 13:50:33 -08:00
// DeleteBucket - deletes a bucket on all serverPools simultaneously,
// even if one of the serverPools fail to delete buckets, we proceed to
2019-11-19 17:42:27 -08:00
// undo a successful operation.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) DeleteBucket ( ctx context . Context , bucket string , forceDelete bool ) error {
g := errgroup . WithNErrs ( len ( z . serverPools ) )
2019-11-19 17:42:27 -08:00
2020-12-01 13:50:33 -08:00
// Delete buckets in parallel across all serverPools.
for index := range z . serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
2020-12-01 13:50:33 -08:00
return z . serverPools [ index ] . DeleteBucket ( ctx , bucket , forceDelete )
2019-11-19 17:42:27 -08:00
} , index )
}
errs := g . Wait ( )
2020-03-28 04:52:59 +00:00
2020-05-08 13:44:44 -07:00
// For any write quorum failure, we undo all the delete
// buckets operation by creating all the buckets again.
2019-11-19 17:42:27 -08:00
for _ , err := range errs {
if err != nil {
2021-06-09 17:13:00 -07:00
if ! z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
undoDeleteBucketServerPools ( ctx , bucket , z . serverPools , errs )
2019-11-19 17:42:27 -08:00
}
return err
}
}
2021-06-09 17:13:00 -07:00
deleteBucketMetadata ( ctx , z , bucket )
2019-11-19 17:42:27 -08:00
// Success.
return nil
}
2021-02-24 22:24:38 -08:00
// renameAll will rename bucket+prefix unconditionally across all disks to
2021-08-19 09:16:14 -07:00
// minioMetaTmpDeletedBucket + unique uuid,
2021-02-24 22:24:38 -08:00
// Note that set distribution is ignored so it should only be used in cases where
// data is not distributed across sets. Errors are logged but individual
// disk failures are not returned.
func ( z * erasureServerPools ) renameAll ( ctx context . Context , bucket , prefix string ) {
for _ , servers := range z . serverPools {
for _ , set := range servers . sets {
set . renameAll ( ctx , bucket , prefix )
}
}
}
2019-11-19 17:42:27 -08:00
// This function is used to undo a successful DeleteBucket operation.
2020-12-01 13:50:33 -08:00
func undoDeleteBucketServerPools ( ctx context . Context , bucket string , serverPools [ ] * erasureSets , errs [ ] error ) {
g := errgroup . WithNErrs ( len ( serverPools ) )
2019-11-19 17:42:27 -08:00
2020-12-01 13:50:33 -08:00
// Undo previous delete bucket on all underlying serverPools.
for index := range serverPools {
2019-11-19 17:42:27 -08:00
index := index
g . Go ( func ( ) error {
if errs [ index ] == nil {
2020-12-01 13:50:33 -08:00
return serverPools [ index ] . MakeBucketWithLocation ( ctx , bucket , BucketOptions { } )
2019-11-19 17:42:27 -08:00
}
return nil
} , index )
}
g . Wait ( )
}
2020-12-01 13:50:33 -08:00
// List all buckets from one of the serverPools, we are not doing merge
2019-11-19 17:42:27 -08:00
// sort here just for simplification. As per design it is assumed
2020-12-01 13:50:33 -08:00
// that all buckets are present on all serverPools.
func ( z * erasureServerPools ) ListBuckets ( ctx context . Context ) ( buckets [ ] BucketInfo , err error ) {
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
buckets , err = z . serverPools [ 0 ] . ListBuckets ( ctx )
2020-05-08 13:44:44 -07:00
} else {
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
buckets , err = pool . ListBuckets ( ctx )
2020-05-08 13:44:44 -07:00
if err != nil {
logger . LogIf ( ctx , err )
continue
}
break
}
2019-11-19 17:42:27 -08:00
}
2020-05-08 13:44:44 -07:00
if err != nil {
return nil , err
}
for i := range buckets {
2020-05-19 13:53:54 -07:00
meta , err := globalBucketMetadataSys . Get ( buckets [ i ] . Name )
2020-05-08 13:44:44 -07:00
if err == nil {
buckets [ i ] . Created = meta . Created
}
2019-11-19 17:42:27 -08:00
}
2020-05-08 13:44:44 -07:00
return buckets , nil
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) HealFormat ( ctx context . Context , dryRun bool ) ( madmin . HealResultItem , error ) {
2019-11-19 17:42:27 -08:00
// Acquire lock on format.json
2020-11-04 08:25:42 -08:00
formatLock := z . NewNSLock ( minioMetaBucket , formatConfigFile )
2021-04-29 20:55:21 -07:00
lkctx , err := formatLock . GetLock ( ctx , globalOperationTimeout )
2021-03-04 03:36:43 +01:00
if err != nil {
2019-11-19 17:42:27 -08:00
return madmin . HealResultItem { } , err
}
2021-04-29 20:55:21 -07:00
ctx = lkctx . Context ( )
defer formatLock . Unlock ( lkctx . Cancel )
2019-11-19 17:42:27 -08:00
var r = madmin . HealResultItem {
Type : madmin . HealItemMetadata ,
Detail : "disk-format" ,
}
2020-01-15 17:19:13 -08:00
var countNoHeal int
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
result , err := pool . HealFormat ( ctx , dryRun )
2020-09-16 21:14:35 -07:00
if err != nil && ! errors . Is ( err , errNoHealRequired ) {
2019-11-19 17:42:27 -08:00
logger . LogIf ( ctx , err )
continue
}
2020-12-01 13:50:33 -08:00
// Count errNoHealRequired across all serverPools,
2020-01-15 17:19:13 -08:00
// to return appropriate error to the caller
2020-09-16 21:14:35 -07:00
if errors . Is ( err , errNoHealRequired ) {
2020-01-15 17:19:13 -08:00
countNoHeal ++
}
2019-11-20 23:40:26 +05:30
r . DiskCount += result . DiskCount
r . SetCount += result . SetCount
r . Before . Drives = append ( r . Before . Drives , result . Before . Drives ... )
r . After . Drives = append ( r . After . Drives , result . After . Drives ... )
2019-11-19 17:42:27 -08:00
}
2020-09-16 21:14:35 -07:00
2020-12-01 13:50:33 -08:00
// No heal returned by all serverPools, return errNoHealRequired
if countNoHeal == len ( z . serverPools ) {
2020-01-15 17:19:13 -08:00
return r , errNoHealRequired
}
2020-09-16 21:14:35 -07:00
2019-11-19 17:42:27 -08:00
return r , nil
}
2020-12-13 11:57:08 -08:00
func ( z * erasureServerPools ) HealBucket ( ctx context . Context , bucket string , opts madmin . HealOpts ) ( madmin . HealResultItem , error ) {
2019-11-19 17:42:27 -08:00
var r = madmin . HealResultItem {
Type : madmin . HealItemBucket ,
Bucket : bucket ,
}
2020-12-14 12:07:07 -08:00
// Attempt heal on the bucket metadata, ignore any failures
_ , _ = z . HealObject ( ctx , minioMetaBucket , pathJoin ( bucketConfigPrefix , bucket , bucketMetadataFile ) , "" , opts )
2020-12-13 11:57:08 -08:00
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
result , err := pool . HealBucket ( ctx , bucket , opts )
2019-11-19 17:42:27 -08:00
if err != nil {
switch err . ( type ) {
case BucketNotFound :
continue
}
return result , err
}
2019-11-20 23:40:26 +05:30
r . DiskCount += result . DiskCount
r . SetCount += result . SetCount
r . Before . Drives = append ( r . Before . Drives , result . Before . Drives ... )
r . After . Drives = append ( r . After . Drives , result . After . Drives ... )
2019-11-19 17:42:27 -08:00
}
2020-12-14 12:07:07 -08:00
2019-11-19 17:42:27 -08:00
return r , nil
}
2020-02-25 21:22:28 +05:30
// Walk a bucket, optionally prefix recursively, until we have returned
// all the content to objectInfo channel, it is callers responsibility
// to allocate a receive channel for ObjectInfo, upon any unhandled
// error walker returns error. Optionally if context.Done() is received
// then Walk() stops the walker.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) Walk ( ctx context . Context , bucket , prefix string , results chan <- ObjectInfo , opts ObjectOptions ) error {
2020-02-25 21:22:28 +05:30
if err := checkListObjsArgs ( ctx , bucket , prefix , "" , z ) ; err != nil {
2020-02-26 09:28:58 +05:30
// Upon error close the channel.
close ( results )
2020-02-25 21:22:28 +05:30
return err
}
2020-07-10 22:21:04 -07:00
if opts . WalkVersions {
go func ( ) {
defer close ( results )
2020-11-03 08:53:48 -08:00
var marker , versionIDMarker string
2020-07-10 22:21:04 -07:00
for {
2020-11-03 08:53:48 -08:00
loi , err := z . ListObjectVersions ( ctx , bucket , prefix , marker , versionIDMarker , "" , 1000 )
if err != nil {
break
2020-07-10 22:21:04 -07:00
}
2020-11-03 08:53:48 -08:00
for _ , obj := range loi . Objects {
results <- obj
2020-07-10 22:21:04 -07:00
}
2020-11-03 08:53:48 -08:00
if ! loi . IsTruncated {
break
}
marker = loi . NextMarker
versionIDMarker = loi . NextVersionIDMarker
2020-07-10 22:21:04 -07:00
}
} ( )
return nil
}
2020-02-25 21:22:28 +05:30
go func ( ) {
defer close ( results )
2020-11-03 08:53:48 -08:00
var marker string
2020-02-25 21:22:28 +05:30
for {
2020-11-03 08:53:48 -08:00
loi , err := z . ListObjects ( ctx , bucket , prefix , marker , "" , 1000 )
if err != nil {
break
2020-02-25 21:22:28 +05:30
}
2020-11-03 08:53:48 -08:00
for _ , obj := range loi . Objects {
results <- obj
2020-02-25 21:22:28 +05:30
}
2020-11-03 08:53:48 -08:00
if ! loi . IsTruncated {
break
}
marker = loi . NextMarker
2020-02-25 21:22:28 +05:30
}
} ( )
return nil
}
2020-06-12 20:04:01 -07:00
// HealObjectFn is the callback used to heal a single object version,
// identified by bucket, object name and version ID. A non-nil error
// reports that the heal failed.
type HealObjectFn func(bucket, object, versionID string) error
2020-01-29 12:05:44 +05:30
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) HealObjects ( ctx context . Context , bucket , prefix string , opts madmin . HealOpts , healObject HealObjectFn ) error {
2021-03-06 09:25:48 -08:00
errCh := make ( chan error )
ctx , cancel := context . WithCancel ( ctx )
go func ( ) {
defer close ( errCh )
defer cancel ( )
for _ , erasureSet := range z . serverPools {
2020-11-11 10:58:16 -08:00
var wg sync . WaitGroup
2021-03-06 09:25:48 -08:00
for _ , set := range erasureSet . sets {
set := set
2020-11-11 10:58:16 -08:00
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
2021-03-06 09:25:48 -08:00
disks , _ := set . getOnlineDisksWithHealing ( )
if len ( disks ) == 0 {
errCh <- errors . New ( "HealObjects: No non-healing disks found" )
cancel ( )
2020-11-11 10:58:16 -08:00
return
}
2020-10-01 20:24:34 -07:00
2021-03-06 09:25:48 -08:00
healEntry := func ( entry metaCacheEntry ) {
if entry . isDir ( ) {
return
}
2021-03-07 09:38:31 -08:00
// We might land at .metacache, .trash, .multipart
// no need to heal them skip, only when bucket
// is '.minio.sys'
if bucket == minioMetaBucket {
if wildcard . Match ( "buckets/*/.metacache/*" , entry . name ) {
return
}
2021-03-10 09:38:35 -08:00
if wildcard . Match ( "tmp/*" , entry . name ) {
2021-03-07 09:38:31 -08:00
return
}
if wildcard . Match ( "multipart/*" , entry . name ) {
return
}
2021-03-10 09:38:35 -08:00
if wildcard . Match ( "tmp-old/*" , entry . name ) {
return
}
2021-03-07 09:38:31 -08:00
}
2021-03-06 09:25:48 -08:00
fivs , err := entry . fileInfoVersions ( bucket )
if err != nil {
errCh <- err
cancel ( )
return
}
waitForLowHTTPReq ( globalHealConfig . IOCount , globalHealConfig . Sleep )
for _ , version := range fivs . Versions {
if err := healObject ( bucket , version . Name , version . VersionID ) ; err != nil {
errCh <- err
cancel ( )
return
}
}
2021-01-27 11:19:28 +01:00
}
2021-03-06 09:25:48 -08:00
// How to resolve partial results.
resolver := metadataResolutionParams {
dirQuorum : 1 ,
objQuorum : 1 ,
bucket : bucket ,
}
2020-01-29 12:05:44 +05:30
2021-03-23 07:57:07 -07:00
path := baseDirFromPrefix ( prefix )
if path == "" {
path = prefix
}
2021-08-05 15:01:19 -07:00
lopts := listPathRawOptions {
2021-03-06 09:25:48 -08:00
disks : disks ,
bucket : bucket ,
2021-03-23 07:57:07 -07:00
path : path ,
2021-03-06 09:25:48 -08:00
recursive : true ,
forwardTo : "" ,
minDisks : 1 ,
reportNotFound : false ,
agreed : healEntry ,
partial : func ( entries metaCacheEntries , nAgreed int , errs [ ] error ) {
entry , ok := entries . resolve ( & resolver )
if ok {
healEntry ( * entry )
}
} ,
finished : nil ,
2021-08-05 15:01:19 -07:00
}
if err := listPathRaw ( ctx , lopts ) ; err != nil {
errCh <- fmt . Errorf ( "listPathRaw returned %w: opts(%#v)" , err , lopts )
2021-03-06 09:25:48 -08:00
cancel ( )
return
2020-11-11 10:58:16 -08:00
}
2021-03-06 09:25:48 -08:00
} ( )
2020-06-12 20:04:01 -07:00
}
2021-03-06 09:25:48 -08:00
wg . Wait ( )
2019-11-19 17:42:27 -08:00
}
2021-03-06 09:25:48 -08:00
} ( )
return <- errCh
2019-11-19 17:42:27 -08:00
}
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) HealObject ( ctx context . Context , bucket , object , versionID string , opts madmin . HealOpts ) ( madmin . HealResultItem , error ) {
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-06 09:35:47 -08:00
for _ , pool := range z . serverPools {
result , err := pool . HealObject ( ctx , bucket , object , versionID , opts )
2021-01-25 18:53:37 +01:00
result . Object = decodeDirObject ( result . Object )
2019-11-19 17:42:27 -08:00
if err != nil {
2020-07-02 16:17:27 -07:00
if isErrObjectNotFound ( err ) || isErrVersionNotFound ( err ) {
2019-11-19 17:42:27 -08:00
continue
}
return result , err
}
return result , nil
}
2020-10-22 13:36:24 -07:00
if versionID != "" {
return madmin . HealResultItem { } , VersionNotFound {
Bucket : bucket ,
Object : object ,
VersionID : versionID ,
}
}
2019-11-19 17:42:27 -08:00
return madmin . HealResultItem { } , ObjectNotFound {
Bucket : bucket ,
Object : object ,
}
}
2021-03-17 04:06:57 +01:00
// GetMetrics - returns metrics of local disks
2021-01-18 20:35:38 -08:00
func ( z * erasureServerPools ) GetMetrics ( ctx context . Context ) ( * BackendMetrics , error ) {
2019-12-06 12:46:06 +05:30
logger . LogIf ( ctx , NotImplemented { } )
2021-01-18 20:35:38 -08:00
return & BackendMetrics { } , NotImplemented { }
2019-12-06 12:46:06 +05:30
}
2019-12-28 22:24:43 +05:30
2021-03-04 14:36:23 -08:00
func ( z * erasureServerPools ) getPoolAndSet ( id string ) ( poolIdx , setIdx , diskIdx int , err error ) {
2021-01-06 09:35:47 -08:00
for poolIdx := range z . serverPools {
format := z . serverPools [ poolIdx ] . format
2020-06-12 20:04:01 -07:00
for setIdx , set := range format . Erasure . Sets {
2021-03-04 14:36:23 -08:00
for i , diskID := range set {
2020-05-23 17:38:39 -07:00
if diskID == id {
2021-03-04 14:36:23 -08:00
return poolIdx , setIdx , i , nil
2020-05-23 17:38:39 -07:00
}
}
}
}
2021-03-04 14:36:23 -08:00
return - 1 , - 1 , - 1 , fmt . Errorf ( "DiskID(%s) %w" , id , errDiskNotFound )
2020-05-23 17:38:39 -07:00
}
2020-07-20 18:31:22 -07:00
// HealthOptions takes input options to return specific information
// from a health query.
type HealthOptions struct {
	// Maintenance asks Health to leave this node's local disks out of the
	// online count and to take drives being healed into account.
	Maintenance bool
}
// HealthResult returns the current state of the system, also
// additionally with any specific heuristic information which
// was queried
type HealthResult struct {
	// Healthy is the overall verdict.
	Healthy bool
	// HealingDrives is the number of drives reported as being healed.
	HealingDrives int
	// PoolID and SetID locate the erasure set found below write quorum.
	PoolID int
	SetID  int
	// WriteQuorum is the write quorum the sets were checked against.
	WriteQuorum int
}
2021-02-09 01:00:44 -08:00
// ReadHealth returns if the cluster can serve read requests
func ( z * erasureServerPools ) ReadHealth ( ctx context . Context ) bool {
erasureSetUpCount := make ( [ ] [ ] int , len ( z . serverPools ) )
for i := range z . serverPools {
erasureSetUpCount [ i ] = make ( [ ] int , len ( z . serverPools [ i ] . sets ) )
}
diskIDs := globalNotificationSys . GetLocalDiskIDs ( ctx )
diskIDs = append ( diskIDs , getLocalDiskIDs ( z ) )
for _ , localDiskIDs := range diskIDs {
for _ , id := range localDiskIDs {
2021-03-04 14:36:23 -08:00
poolIdx , setIdx , _ , err := z . getPoolAndSet ( id )
2021-02-09 01:00:44 -08:00
if err != nil {
logger . LogIf ( ctx , err )
continue
}
erasureSetUpCount [ poolIdx ] [ setIdx ] ++
}
}
b := z . BackendInfo ( )
readQuorum := b . StandardSCData [ 0 ]
for poolIdx := range erasureSetUpCount {
for setIdx := range erasureSetUpCount [ poolIdx ] {
if erasureSetUpCount [ poolIdx ] [ setIdx ] < readQuorum {
return false
}
}
}
return true
}
2020-07-20 18:31:22 -07:00
// Health - returns current status of the object layer health,
// provides if write access exists across sets, additionally
// can be used to query scenarios if health may be lost
// if this node is taken down by an external orchestrator.
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) Health ( ctx context . Context , opts HealthOptions ) HealthResult {
erasureSetUpCount := make ( [ ] [ ] int , len ( z . serverPools ) )
for i := range z . serverPools {
erasureSetUpCount [ i ] = make ( [ ] int , len ( z . serverPools [ i ] . sets ) )
2020-05-23 17:38:39 -07:00
}
2020-06-04 14:58:34 -07:00
diskIDs := globalNotificationSys . GetLocalDiskIDs ( ctx )
2020-07-20 18:31:22 -07:00
if ! opts . Maintenance {
diskIDs = append ( diskIDs , getLocalDiskIDs ( z ) )
}
2020-05-23 17:38:39 -07:00
2020-07-20 18:31:22 -07:00
for _ , localDiskIDs := range diskIDs {
for _ , id := range localDiskIDs {
2021-03-04 14:36:23 -08:00
poolIdx , setIdx , _ , err := z . getPoolAndSet ( id )
2020-07-20 18:31:22 -07:00
if err != nil {
logger . LogIf ( ctx , err )
continue
}
2021-01-06 09:35:47 -08:00
erasureSetUpCount [ poolIdx ] [ setIdx ] ++
2020-05-23 17:38:39 -07:00
}
}
2020-08-13 15:21:20 -07:00
reqInfo := ( & logger . ReqInfo { } ) . AppendTags ( "maintenance" , strconv . FormatBool ( opts . Maintenance ) )
2021-01-16 12:08:02 -08:00
b := z . BackendInfo ( )
2021-01-22 12:09:24 -08:00
writeQuorum := b . StandardSCData [ 0 ]
if writeQuorum == b . StandardSCParity {
2020-09-02 22:54:56 -07:00
writeQuorum ++
}
2020-09-04 17:09:02 -07:00
var aggHealStateResult madmin . BgHealState
if opts . Maintenance {
// check if local disks are being healed, if they are being healed
// we need to tell healthy status as 'false' so that this server
// is not taken down for maintenance
var err error
2021-03-04 14:36:23 -08:00
aggHealStateResult , err = getAggregatedBackgroundHealState ( ctx , nil )
2020-09-04 17:09:02 -07:00
if err != nil {
logger . LogIf ( logger . SetReqInfo ( ctx , reqInfo ) , fmt . Errorf ( "Unable to verify global heal status: %w" , err ) )
return HealthResult {
Healthy : false ,
}
}
if len ( aggHealStateResult . HealDisks ) > 0 {
logger . LogIf ( logger . SetReqInfo ( ctx , reqInfo ) , fmt . Errorf ( "Total drives to be healed %d" , len ( aggHealStateResult . HealDisks ) ) )
}
}
2021-01-06 09:35:47 -08:00
for poolIdx := range erasureSetUpCount {
for setIdx := range erasureSetUpCount [ poolIdx ] {
if erasureSetUpCount [ poolIdx ] [ setIdx ] < writeQuorum {
2020-08-13 15:21:20 -07:00
logger . LogIf ( logger . SetReqInfo ( ctx , reqInfo ) ,
2021-01-06 09:35:47 -08:00
fmt . Errorf ( "Write quorum may be lost on pool: %d, set: %d, expected write quorum: %d" ,
poolIdx , setIdx , writeQuorum ) )
2020-07-20 18:31:22 -07:00
return HealthResult {
2020-09-04 17:09:02 -07:00
Healthy : false ,
HealingDrives : len ( aggHealStateResult . HealDisks ) ,
2021-01-26 20:47:42 -08:00
PoolID : poolIdx ,
2020-09-04 17:09:02 -07:00
SetID : setIdx ,
WriteQuorum : writeQuorum ,
2020-07-20 18:31:22 -07:00
}
2020-05-23 17:38:39 -07:00
}
}
}
2020-08-07 13:22:53 -07:00
2020-08-12 16:53:15 -07:00
// when maintenance is not specified we don't have
// to look at the healing side of the code.
if ! opts . Maintenance {
return HealthResult {
2020-09-02 22:54:56 -07:00
Healthy : true ,
WriteQuorum : writeQuorum ,
2020-08-12 16:53:15 -07:00
}
}
2020-07-20 18:31:22 -07:00
return HealthResult {
2020-09-02 22:54:56 -07:00
Healthy : len ( aggHealStateResult . HealDisks ) == 0 ,
2020-08-07 13:22:53 -07:00
HealingDrives : len ( aggHealStateResult . HealDisks ) ,
2020-09-02 22:54:56 -07:00
WriteQuorum : writeQuorum ,
2020-07-20 18:31:22 -07:00
}
2019-12-28 22:24:43 +05:30
}
2020-01-20 22:15:59 +05:30
2021-04-04 13:32:31 -07:00
// PutObjectMetadata forwards a metadata update for an existing object to the
// pool holding it: the only pool when there is just one, otherwise the pool
// selected by getPoolIdxExisting, whose error is returned if it fails.
func (z *erasureServerPools) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
	object = encodeDirObject(object)

	poolIdx := 0
	if !z.SinglePool() {
		idx, err := z.getPoolIdxExisting(ctx, bucket, object)
		if err != nil {
			return ObjectInfo{}, err
		}
		poolIdx = idx
	}

	return z.serverPools[poolIdx].PutObjectMetadata(ctx, bucket, object, opts)
}
2020-05-23 11:09:35 -07:00
// PutObjectTags - replace or add tags to an existing object
2021-02-01 22:52:51 +01:00
func ( z * erasureServerPools ) PutObjectTags ( ctx context . Context , bucket , object string , tags string , opts ObjectOptions ) ( ObjectInfo , error ) {
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . PutObjectTags ( ctx , bucket , object , tags , opts )
2020-01-20 22:15:59 +05:30
}
2020-06-29 13:07:26 -07:00
2021-02-16 02:43:47 -08:00
// We don't know the size here set 1GiB atleast.
2021-03-16 19:02:20 +01:00
idx , err := z . getPoolIdxExisting ( ctx , bucket , object )
2021-02-16 02:43:47 -08:00
if err != nil {
return ObjectInfo { } , err
2020-01-20 22:15:59 +05:30
}
2021-02-16 02:43:47 -08:00
return z . serverPools [ idx ] . PutObjectTags ( ctx , bucket , object , tags , opts )
2020-01-20 22:15:59 +05:30
}
2020-05-23 11:09:35 -07:00
// DeleteObjectTags - delete object tags from an existing object
2021-02-01 22:52:51 +01:00
func ( z * erasureServerPools ) DeleteObjectTags ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( ObjectInfo , error ) {
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . DeleteObjectTags ( ctx , bucket , object , opts )
2020-01-20 22:15:59 +05:30
}
2021-02-16 02:43:47 -08:00
2021-03-16 19:02:20 +01:00
idx , err := z . getPoolIdxExisting ( ctx , bucket , object )
2021-02-16 02:43:47 -08:00
if err != nil {
return ObjectInfo { } , err
2020-01-20 22:15:59 +05:30
}
2021-02-16 02:43:47 -08:00
return z . serverPools [ idx ] . DeleteObjectTags ( ctx , bucket , object , opts )
2020-01-20 22:15:59 +05:30
}
2020-05-23 11:09:35 -07:00
// GetObjectTags - get object tags from an existing object
2020-12-01 13:50:33 -08:00
func ( z * erasureServerPools ) GetObjectTags ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( * tags . Tags , error ) {
2020-09-19 08:39:41 -07:00
object = encodeDirObject ( object )
2021-01-26 20:47:42 -08:00
if z . SinglePool ( ) {
2020-12-01 13:50:33 -08:00
return z . serverPools [ 0 ] . GetObjectTags ( ctx , bucket , object , opts )
2020-01-20 22:15:59 +05:30
}
2021-02-16 02:43:47 -08:00
2021-03-16 19:02:20 +01:00
idx , err := z . getPoolIdxExisting ( ctx , bucket , object )
2021-02-16 02:43:47 -08:00
if err != nil {
return nil , err
2020-01-20 22:15:59 +05:30
}
2021-02-16 02:43:47 -08:00
return z . serverPools [ idx ] . GetObjectTags ( ctx , bucket , object , opts )
2020-01-20 22:15:59 +05:30
}
2021-04-19 10:30:42 -07:00
// TransitionObject - transition object content to target tier. The request
// is forwarded to the only pool when there is just one, otherwise to the pool
// selected by getPoolIdxExisting, whose error is returned if it fails.
func (z *erasureServerPools) TransitionObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
	object = encodeDirObject(object)

	poolIdx := 0
	if !z.SinglePool() {
		idx, err := z.getPoolIdxExisting(ctx, bucket, object)
		if err != nil {
			return err
		}
		poolIdx = idx
	}

	return z.serverPools[poolIdx].TransitionObject(ctx, bucket, object, opts)
}
// RestoreTransitionedObject - restore transitioned object content locally on
// this cluster. The request is forwarded to the only pool when there is just
// one, otherwise to the pool selected by getPoolIdxExisting, whose error is
// returned if it fails.
func (z *erasureServerPools) RestoreTransitionedObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
	object = encodeDirObject(object)

	poolIdx := 0
	if !z.SinglePool() {
		idx, err := z.getPoolIdxExisting(ctx, bucket, object)
		if err != nil {
			return err
		}
		poolIdx = idx
	}

	return z.serverPools[poolIdx].RestoreTransitionedObject(ctx, bucket, object, opts)
}