2021-04-18 12:41:13 -07:00
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2018-02-15 17:45:57 -08:00
package cmd
import (
2018-03-14 12:01:47 -07:00
"context"
2021-02-26 16:53:06 -08:00
"encoding/binary"
2020-08-18 14:37:26 -07:00
"errors"
2018-02-15 17:45:57 -08:00
"fmt"
"hash/crc32"
2020-10-13 18:28:42 -07:00
"math/rand"
2018-09-20 19:22:09 -07:00
"net/http"
2021-11-15 09:46:55 -08:00
"reflect"
2022-07-25 17:51:32 -07:00
"strings"
2018-02-15 17:45:57 -08:00
"sync"
"time"
2020-06-12 20:04:01 -07:00
"github.com/dchest/siphash"
[feat]: change erasure coding default block size from 10MiB to 1MiB (#11721)
major performance improvements in range GETs to avoid large
read amplification when ranges are tiny and random
```
-------------------
Operation: GET
Operations: 142014 -> 339421
Duration: 4m50s -> 4m56s
* Average: +139.41% (+1177.3 MiB/s) throughput, +139.11% (+658.4) obj/s
* Fastest: +125.24% (+1207.4 MiB/s) throughput, +132.32% (+612.9) obj/s
* 50% Median: +139.06% (+1175.7 MiB/s) throughput, +133.46% (+660.9) obj/s
* Slowest: +203.40% (+1267.9 MiB/s) throughput, +198.59% (+753.5) obj/s
```
TTFB from 10MiB BlockSize
```
* First Access TTFB: Avg: 81ms, Median: 61ms, Best: 20ms, Worst: 2.056s
```
TTFB from 1MiB BlockSize
```
* First Access TTFB: Avg: 22ms, Median: 21ms, Best: 8ms, Worst: 91ms
```
Full object reads however do see a slight change which won't be
noticeable in real world, so not doing any comparisons
TTFB still had improvements with full object reads with 1MiB
```
* First Access TTFB: Avg: 68ms, Median: 35ms, Best: 11ms, Worst: 1.16s
```
v/s
TTFB with 10MiB
```
* First Access TTFB: Avg: 388ms, Median: 98ms, Best: 20ms, Worst: 4.156s
```
This change should affect all new uploads, previous uploads should
continue to work with business as usual. But dramatic improvements can
be seen with these changes.
2021-03-06 14:09:34 -08:00
"github.com/dustin/go-humanize"
2020-06-12 20:04:01 -07:00
"github.com/google/uuid"
2023-06-19 17:53:08 -07:00
"github.com/minio/madmin-go/v3"
2020-12-10 07:28:37 -08:00
"github.com/minio/minio-go/v7/pkg/set"
2020-07-14 17:38:05 +01:00
"github.com/minio/minio-go/v7/pkg/tags"
2021-06-01 14:59:40 -07:00
"github.com/minio/minio/internal/dsync"
2024-01-28 10:04:17 -08:00
xioutil "github.com/minio/minio/internal/ioutil"
2021-06-01 14:59:40 -07:00
"github.com/minio/minio/internal/logger"
2023-09-04 12:57:37 -07:00
"github.com/minio/pkg/v2/console"
"github.com/minio/pkg/v2/sync/errgroup"
2018-02-15 17:45:57 -08:00
)
2019-11-13 12:17:45 -08:00
// setsDsyncLockers is the encapsulated type for Close(): a list of
// distributed lockers per erasure set (outer index is the set index).
type setsDsyncLockers [][]dsync.NetLocker
2020-06-12 20:04:01 -07:00
// erasureSets implements ObjectLayer combining a static list of erasure coded
2018-02-15 17:45:57 -08:00
// object sets. NOTE: There is no dynamic scaling allowed or intended in
// current design.
2020-06-12 20:04:01 -07:00
type erasureSets struct {
sets [ ] * erasureObjects
2018-02-15 17:45:57 -08:00
// Reference format.
2020-06-12 20:04:01 -07:00
format * formatErasureV3
2018-02-15 17:45:57 -08:00
2020-06-12 20:04:01 -07:00
// erasureDisks mutex to lock erasureDisks.
erasureDisksMu sync . RWMutex
2018-02-15 17:45:57 -08:00
// Re-ordered list of disks per set.
2020-06-12 20:04:01 -07:00
erasureDisks [ ] [ ] StorageAPI
2018-02-15 17:45:57 -08:00
2019-11-13 12:17:45 -08:00
// Distributed locker clients.
2020-06-12 20:04:01 -07:00
erasureLockers setsDsyncLockers
2019-11-13 12:17:45 -08:00
2020-09-25 19:21:52 -07:00
// Distributed lock owner (constant per running instance).
erasureLockOwner string
2018-02-15 17:45:57 -08:00
// List of endpoints provided on the command line.
2022-01-10 09:07:49 -08:00
endpoints PoolEndpoints
2018-02-15 17:45:57 -08:00
2020-03-24 18:53:24 -07:00
// String version of all the endpoints, an optimization
// to avoid url.String() conversion taking CPU on
// large disk setups.
endpointStrings [ ] string
2018-02-15 17:45:57 -08:00
// Total number of sets and the number of disks per set.
2020-08-26 19:29:35 -07:00
setCount , setDriveCount int
2021-01-16 12:08:02 -08:00
defaultParityCount int
2018-02-15 17:45:57 -08:00
2021-03-04 14:36:23 -08:00
poolIndex int
2021-01-26 22:21:51 +01:00
2021-03-18 19:19:02 +01:00
// A channel to send the set index to the MRF when
// any disk belonging to that set is connected
setReconnectEvent chan int
2020-01-16 03:30:32 +01:00
2018-02-15 17:45:57 -08:00
// Distribution algorithm of choice.
distributionAlgo string
2020-06-12 20:04:01 -07:00
deploymentID [ 16 ] byte
2018-02-15 17:45:57 -08:00
2021-05-11 17:19:15 +01:00
lastConnectDisksOpTime time . Time
2018-02-15 17:45:57 -08:00
}
2021-06-16 14:26:26 -07:00
func ( s * erasureSets ) getDiskMap ( ) map [ Endpoint ] StorageAPI {
diskMap := make ( map [ Endpoint ] StorageAPI )
2020-03-24 23:26:13 -07:00
2020-06-12 20:04:01 -07:00
s . erasureDisksMu . RLock ( )
defer s . erasureDisksMu . RUnlock ( )
2018-02-15 17:45:57 -08:00
for i := 0 ; i < s . setCount ; i ++ {
2020-08-26 19:29:35 -07:00
for j := 0 ; j < s . setDriveCount ; j ++ {
2020-06-12 20:04:01 -07:00
disk := s . erasureDisks [ i ] [ j ]
2020-09-16 21:14:35 -07:00
if disk == OfflineDisk {
2018-02-15 17:45:57 -08:00
continue
}
2020-03-24 23:26:13 -07:00
if ! disk . IsOnline ( ) {
2018-02-15 17:45:57 -08:00
continue
}
2021-06-16 14:26:26 -07:00
diskMap [ disk . Endpoint ( ) ] = disk
2018-02-15 17:45:57 -08:00
}
}
2020-03-24 23:26:13 -07:00
return diskMap
2018-02-15 17:45:57 -08:00
}
// Initializes a new StorageAPI from the endpoint argument, returns
// StorageAPI and also `format` which exists on the disk.
2024-01-23 14:11:46 -08:00
func connectEndpoint ( endpoint Endpoint ) ( StorageAPI , * formatErasureV3 , [ ] byte , error ) {
2023-08-01 10:54:26 -07:00
disk , err := newStorageAPI ( endpoint , storageOpts {
cleanUp : false ,
healthCheck : false ,
} )
2018-02-15 17:45:57 -08:00
if err != nil {
2024-01-23 14:11:46 -08:00
return nil , nil , nil , err
2018-02-15 17:45:57 -08:00
}
2024-01-25 12:45:46 -08:00
format , formatData , err := loadFormatErasureWithData ( disk , false )
2018-02-15 17:45:57 -08:00
if err != nil {
2020-08-18 14:37:26 -07:00
if errors . Is ( err , errUnformattedDisk ) {
2024-01-25 12:45:46 -08:00
info , derr := disk . DiskInfo ( context . TODO ( ) , DiskInfoOptions { } )
2020-08-18 14:37:26 -07:00
if derr != nil && info . RootDisk {
2024-01-12 01:48:36 -08:00
disk . Close ( )
2024-01-23 14:11:46 -08:00
return nil , nil , nil , fmt . Errorf ( "Drive: %s is a root drive" , disk )
2020-08-18 14:37:26 -07:00
}
}
2024-01-12 01:48:36 -08:00
disk . Close ( )
2024-01-23 14:11:46 -08:00
return nil , nil , nil , fmt . Errorf ( "Drive: %s returned %w" , disk , err ) // make sure to '%w' to wrap the error
2018-02-15 17:45:57 -08:00
}
2023-08-01 10:54:26 -07:00
disk . Close ( )
disk , err = newStorageAPI ( endpoint , storageOpts {
cleanUp : true ,
healthCheck : true ,
} )
if err != nil {
2024-01-23 14:11:46 -08:00
return nil , nil , nil , err
2023-08-01 10:54:26 -07:00
}
2024-01-23 14:11:46 -08:00
return disk , format , formatData , nil
2018-02-15 17:45:57 -08:00
}
2020-03-27 14:48:30 -07:00
// findDiskIndex - returns the i,j'th position of the input `diskID` against the reference
// format, after successful validation.
// - i'th position is the set index
// - j'th position is the disk index in the current set
2020-06-12 20:04:01 -07:00
func findDiskIndexByDiskID ( refFormat * formatErasureV3 , diskID string ) ( int , int , error ) {
2022-01-24 19:40:02 -08:00
if diskID == "" {
return - 1 , - 1 , errDiskNotFound
}
2020-03-27 14:48:30 -07:00
if diskID == offlineDiskUUID {
2022-08-04 16:10:08 -07:00
return - 1 , - 1 , fmt . Errorf ( "DriveID: %s is offline" , diskID )
2020-03-27 14:48:30 -07:00
}
2020-06-12 20:04:01 -07:00
for i := 0 ; i < len ( refFormat . Erasure . Sets ) ; i ++ {
for j := 0 ; j < len ( refFormat . Erasure . Sets [ 0 ] ) ; j ++ {
if refFormat . Erasure . Sets [ i ] [ j ] == diskID {
2020-03-27 14:48:30 -07:00
return i , j , nil
}
}
}
2022-08-04 16:10:08 -07:00
return - 1 , - 1 , fmt . Errorf ( "DriveID: %s not found" , diskID )
2020-03-27 14:48:30 -07:00
}
2018-02-15 17:45:57 -08:00
// findDiskIndex - returns the i,j'th position of the input `format` against the reference
// format, after successful validation.
2020-01-16 03:30:32 +01:00
// - i'th position is the set index
// - j'th position is the disk index in the current set
2020-06-12 20:04:01 -07:00
func findDiskIndex ( refFormat , format * formatErasureV3 ) ( int , int , error ) {
if err := formatErasureV3Check ( refFormat , format ) ; err != nil {
2018-02-15 17:45:57 -08:00
return 0 , 0 , err
}
2020-06-12 20:04:01 -07:00
if format . Erasure . This == offlineDiskUUID {
2022-08-04 16:10:08 -07:00
return - 1 , - 1 , fmt . Errorf ( "DriveID: %s is offline" , format . Erasure . This )
2018-02-15 17:45:57 -08:00
}
2020-06-12 20:04:01 -07:00
for i := 0 ; i < len ( refFormat . Erasure . Sets ) ; i ++ {
for j := 0 ; j < len ( refFormat . Erasure . Sets [ 0 ] ) ; j ++ {
if refFormat . Erasure . Sets [ i ] [ j ] == format . Erasure . This {
2018-02-15 17:45:57 -08:00
return i , j , nil
}
}
}
2022-08-04 16:10:08 -07:00
return - 1 , - 1 , fmt . Errorf ( "DriveID: %s not found" , format . Erasure . This )
2018-02-15 17:45:57 -08:00
}
2020-01-10 02:35:06 -08:00
// connectDisks - attempt to connect all the endpoints, loads format
2018-03-27 18:11:39 -07:00
// and re-arranges the disks in proper position.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) connectDisks ( ) {
2021-05-11 17:19:15 +01:00
defer func ( ) {
s . lastConnectDisksOpTime = time . Now ( )
} ( )
2020-03-24 23:26:13 -07:00
var wg sync . WaitGroup
diskMap := s . getDiskMap ( )
2022-01-10 09:07:49 -08:00
for _ , endpoint := range s . endpoints . Endpoints {
2022-02-21 15:51:54 -08:00
cdisk := diskMap [ endpoint ]
if cdisk != nil && cdisk . IsOnline ( ) {
if s . lastConnectDisksOpTime . IsZero ( ) {
continue
}
// An online-disk means its a valid disk but it may be a re-connected disk
// we verify that here based on LastConn(), however we make sure to avoid
// putting it back into the s.erasureDisks by re-placing the disk again.
_ , setIndex , _ := cdisk . GetDiskLoc ( )
if setIndex != - 1 {
continue
}
2018-03-27 18:11:39 -07:00
}
2023-11-09 09:33:32 -08:00
if cdisk != nil {
// Close previous offline disk.
cdisk . Close ( )
}
2022-02-21 15:51:54 -08:00
2020-03-24 23:26:13 -07:00
wg . Add ( 1 )
go func ( endpoint Endpoint ) {
defer wg . Done ( )
2024-01-23 14:11:46 -08:00
disk , format , formatData , err := connectEndpoint ( endpoint )
2020-03-24 23:26:13 -07:00
if err != nil {
2020-09-04 17:09:02 -07:00
if endpoint . IsLocal && errors . Is ( err , errUnformattedDisk ) {
globalBackgroundHealState . pushHealLocalDisks ( endpoint )
} else {
printEndpointError ( endpoint , err , true )
}
2020-03-24 23:26:13 -07:00
return
}
2021-03-04 14:36:23 -08:00
if disk . IsLocal ( ) && disk . Healing ( ) != nil {
2020-10-24 13:23:08 -07:00
globalBackgroundHealState . pushHealLocalDisks ( disk . Endpoint ( ) )
}
2023-05-18 03:09:41 +08:00
s . erasureDisksMu . Lock ( )
2020-03-24 23:26:13 -07:00
setIndex , diskIndex , err := findDiskIndex ( s . format , format )
if err != nil {
2020-10-24 13:23:08 -07:00
printEndpointError ( endpoint , err , false )
2021-11-15 09:46:55 -08:00
disk . Close ( )
2023-05-18 03:09:41 +08:00
s . erasureDisksMu . Unlock ( )
2020-03-24 23:26:13 -07:00
return
}
2020-09-28 19:39:32 -07:00
2021-11-15 09:46:55 -08:00
if currentDisk := s . erasureDisks [ setIndex ] [ diskIndex ] ; currentDisk != nil {
if ! reflect . DeepEqual ( currentDisk . Endpoint ( ) , disk . Endpoint ( ) ) {
2022-08-04 16:10:08 -07:00
err = fmt . Errorf ( "Detected unexpected drive ordering refusing to use the drive: expecting %s, found %s, refusing to use the drive" ,
2021-11-15 09:46:55 -08:00
currentDisk . Endpoint ( ) , disk . Endpoint ( ) )
printEndpointError ( endpoint , err , false )
disk . Close ( )
s . erasureDisksMu . Unlock ( )
return
}
2020-06-12 20:04:01 -07:00
s . erasureDisks [ setIndex ] [ diskIndex ] . Close ( )
2020-04-03 18:06:31 -07:00
}
2023-08-01 10:54:26 -07:00
disk . SetDiskID ( format . Erasure . This )
2021-03-04 14:36:23 -08:00
disk . SetDiskLoc ( s . poolIndex , setIndex , diskIndex )
2024-01-23 14:11:46 -08:00
disk . SetFormatData ( formatData )
2023-08-01 10:54:26 -07:00
s . erasureDisks [ setIndex ] [ diskIndex ] = disk
2023-12-13 19:27:55 -08:00
2024-02-14 10:37:34 -08:00
if disk . IsLocal ( ) {
2023-12-29 09:30:10 -08:00
globalLocalDrivesMu . Lock ( )
2024-02-14 10:37:34 -08:00
if globalIsDistErasure {
globalLocalSetDrives [ s . poolIndex ] [ setIndex ] [ diskIndex ] = disk
}
for i , ldisk := range globalLocalDrives {
_ , k , l := ldisk . GetDiskLoc ( )
if k == setIndex && l == diskIndex {
globalLocalDrives [ i ] = disk
break
}
}
2023-12-29 09:30:10 -08:00
globalLocalDrivesMu . Unlock ( )
}
2024-02-14 10:37:34 -08:00
s . erasureDisksMu . Unlock ( )
2020-03-24 23:26:13 -07:00
} ( endpoint )
2018-03-27 18:11:39 -07:00
}
2021-03-18 19:19:02 +01:00
2020-03-24 23:26:13 -07:00
wg . Wait ( )
2018-03-27 18:11:39 -07:00
}
2018-02-15 17:45:57 -08:00
// monitorAndConnectEndpoints this is a monitoring loop to keep track of disconnected
// endpoints by reconnecting them and making sure to place them into right position in
// the set topology, this monitoring happens at a given monitoring interval.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) monitorAndConnectEndpoints ( ctx context . Context , monitorInterval time . Duration ) {
2020-10-13 18:28:42 -07:00
r := rand . New ( rand . NewSource ( time . Now ( ) . UnixNano ( ) ) )
time . Sleep ( time . Duration ( r . Float64 ( ) * float64 ( time . Second ) ) )
// Pre-emptively connect the disks if possible.
s . connectDisks ( )
2020-12-16 14:33:05 -08:00
monitor := time . NewTimer ( monitorInterval )
defer monitor . Stop ( )
2018-02-15 17:45:57 -08:00
for {
select {
2020-03-19 00:19:29 +01:00
case <- ctx . Done ( ) :
2018-04-09 10:25:41 -07:00
return
2020-12-16 14:33:05 -08:00
case <- monitor . C :
2020-12-17 16:52:47 -08:00
if serverDebugLog {
2022-08-04 16:10:08 -07:00
console . Debugln ( "running drive monitoring" )
2020-12-17 16:52:47 -08:00
}
2020-12-17 12:35:02 -08:00
s . connectDisks ( )
2022-05-17 22:42:59 -07:00
// Reset the timer for next interval
monitor . Reset ( monitorInterval )
2020-12-16 14:33:05 -08:00
}
2019-11-13 12:17:45 -08:00
}
}
2020-09-25 19:21:52 -07:00
func ( s * erasureSets ) GetLockers ( setIndex int ) func ( ) ( [ ] dsync . NetLocker , string ) {
return func ( ) ( [ ] dsync . NetLocker , string ) {
2020-12-10 07:28:37 -08:00
lockers := make ( [ ] dsync . NetLocker , len ( s . erasureLockers [ setIndex ] ) )
2020-06-12 20:04:01 -07:00
copy ( lockers , s . erasureLockers [ setIndex ] )
2020-09-25 19:21:52 -07:00
return lockers , s . erasureLockOwner
2018-02-15 17:45:57 -08:00
}
}
2024-02-23 16:19:13 -08:00
// GetEndpointStrings returns a closure for the given set which, when called,
// provides a copy of the endpoint strings belonging to that set.
func (s *erasureSets) GetEndpointStrings(setIndex int) func() []string {
	return func() []string {
		first := setIndex * s.setDriveCount
		out := make([]string, s.setDriveCount)
		copy(out, s.endpointStrings[first:first+s.setDriveCount])
		return out
	}
}
2021-09-29 19:36:19 +01:00
func ( s * erasureSets ) GetEndpoints ( setIndex int ) func ( ) [ ] Endpoint {
return func ( ) [ ] Endpoint {
eps := make ( [ ] Endpoint , s . setDriveCount )
2024-02-23 16:19:13 -08:00
copy ( eps , s . endpoints . Endpoints [ setIndex * s . setDriveCount : setIndex * s . setDriveCount + s . setDriveCount ] )
2020-06-10 17:10:31 -07:00
return eps
}
}
2018-02-15 17:45:57 -08:00
// GetDisks returns a closure for a given set, which provides list of disks per set.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) GetDisks ( setIndex int ) func ( ) [ ] StorageAPI {
2018-02-15 17:45:57 -08:00
return func ( ) [ ] StorageAPI {
2020-06-12 20:04:01 -07:00
s . erasureDisksMu . RLock ( )
defer s . erasureDisksMu . RUnlock ( )
2020-08-26 19:29:35 -07:00
disks := make ( [ ] StorageAPI , s . setDriveCount )
2020-06-12 20:04:01 -07:00
copy ( disks , s . erasureDisks [ setIndex ] )
2018-02-15 17:45:57 -08:00
return disks
}
}
2020-09-04 17:09:02 -07:00
// defaultMonitorConnectEndpointInterval is the interval at which endpoint
// connections are monitored. It must be bigger than
// defaultMonitorNewDiskInterval, hence the five extra seconds.
const defaultMonitorConnectEndpointInterval = defaultMonitorNewDiskInterval + 5*time.Second
2018-02-15 17:45:57 -08:00
// Initialize new set of erasure coded sets.
2022-01-10 09:07:49 -08:00
func newErasureSets ( ctx context . Context , endpoints PoolEndpoints , storageDisks [ ] StorageAPI , format * formatErasureV3 , defaultParityCount , poolIdx int ) ( * erasureSets , error ) {
2020-06-12 20:04:01 -07:00
setCount := len ( format . Erasure . Sets )
2020-08-26 19:29:35 -07:00
setDriveCount := len ( format . Erasure . Sets [ 0 ] )
2020-04-27 14:39:57 -07:00
2022-01-10 09:07:49 -08:00
endpointStrings := make ( [ ] string , len ( endpoints . Endpoints ) )
for i , endpoint := range endpoints . Endpoints {
2021-09-25 18:51:03 +01:00
endpointStrings [ i ] = endpoint . String ( )
}
2020-06-12 20:04:01 -07:00
// Initialize the erasure sets instance.
s := & erasureSets {
2021-01-16 12:08:02 -08:00
sets : make ( [ ] * erasureObjects , setCount ) ,
erasureDisks : make ( [ ] [ ] StorageAPI , setCount ) ,
erasureLockers : make ( [ ] [ ] dsync . NetLocker , setCount ) ,
2021-03-26 19:37:58 +01:00
erasureLockOwner : globalLocalNodeName ,
2021-01-16 12:08:02 -08:00
endpoints : endpoints ,
endpointStrings : endpointStrings ,
setCount : setCount ,
setDriveCount : setDriveCount ,
defaultParityCount : defaultParityCount ,
format : format ,
2021-03-18 19:19:02 +01:00
setReconnectEvent : make ( chan int ) ,
2021-01-16 12:08:02 -08:00
distributionAlgo : format . Erasure . DistributionAlgo ,
deploymentID : uuid . MustParse ( format . ID ) ,
2021-03-04 14:36:23 -08:00
poolIndex : poolIdx ,
2018-02-15 17:45:57 -08:00
}
2020-06-12 20:04:01 -07:00
mutex := newNSLock ( globalIsDistErasure )
2018-06-01 16:41:23 -07:00
2020-03-27 14:48:30 -07:00
for i := 0 ; i < setCount ; i ++ {
2020-08-26 19:29:35 -07:00
s . erasureDisks [ i ] = make ( [ ] StorageAPI , setDriveCount )
2020-12-10 07:28:37 -08:00
}
2022-01-02 09:15:06 -08:00
erasureLockers := map [ string ] dsync . NetLocker { }
2022-01-10 09:07:49 -08:00
for _ , endpoint := range endpoints . Endpoints {
2020-12-10 07:28:37 -08:00
if _ , ok := erasureLockers [ endpoint . Host ] ; ! ok {
erasureLockers [ endpoint . Host ] = newLockAPI ( endpoint )
}
2020-04-29 13:42:37 -07:00
}
2018-02-15 17:45:57 -08:00
2024-01-24 13:36:44 -08:00
var wg sync . WaitGroup
var lk sync . Mutex
2020-04-29 13:42:37 -07:00
for i := 0 ; i < setCount ; i ++ {
2022-01-02 09:15:06 -08:00
lockerEpSet := set . NewStringSet ( )
2020-08-26 19:29:35 -07:00
for j := 0 ; j < setDriveCount ; j ++ {
2024-01-24 13:36:44 -08:00
wg . Add ( 1 )
go func ( i int , endpoint Endpoint ) {
defer wg . Done ( )
lk . Lock ( )
// Only add lockers only one per endpoint and per erasure set.
if locker , ok := erasureLockers [ endpoint . Host ] ; ok && ! lockerEpSet . Contains ( endpoint . Host ) {
lockerEpSet . Add ( endpoint . Host )
s . erasureLockers [ i ] = append ( s . erasureLockers [ i ] , locker )
}
lk . Unlock ( )
} ( i , endpoints . Endpoints [ i * setDriveCount + j ] )
2020-03-04 16:18:32 -08:00
}
2022-01-24 11:28:45 -08:00
}
2024-01-24 13:36:44 -08:00
wg . Wait ( )
2020-03-04 16:18:32 -08:00
2022-01-24 11:28:45 -08:00
for i := 0 ; i < setCount ; i ++ {
wg . Add ( 1 )
go func ( i int ) {
defer wg . Done ( )
var innerWg sync . WaitGroup
for j := 0 ; j < setDriveCount ; j ++ {
disk := storageDisks [ i * setDriveCount + j ]
if disk == nil {
continue
}
2023-12-29 09:30:10 -08:00
if disk . IsLocal ( ) && globalIsDistErasure {
globalLocalDrivesMu . RLock ( )
ldisk := globalLocalSetDrives [ poolIdx ] [ i ] [ j ]
if ldisk == nil {
globalLocalDrivesMu . RUnlock ( )
continue
}
2024-01-12 01:48:36 -08:00
disk . Close ( )
2023-12-29 09:30:10 -08:00
disk = ldisk
globalLocalDrivesMu . RUnlock ( )
}
2022-01-24 11:28:45 -08:00
innerWg . Add ( 1 )
go func ( disk StorageAPI , i , j int ) {
defer innerWg . Done ( )
diskID , err := disk . GetDiskID ( )
if err != nil {
if ! errors . Is ( err , errUnformattedDisk ) {
2024-04-04 13:04:40 +01:00
bootLogIf ( ctx , err )
2022-01-24 11:28:45 -08:00
}
return
}
2022-01-24 19:40:02 -08:00
if diskID == "" {
return
}
2022-01-24 11:28:45 -08:00
m , n , err := findDiskIndexByDiskID ( format , diskID )
if err != nil {
2024-04-04 13:04:40 +01:00
bootLogIf ( ctx , err )
2022-01-24 11:28:45 -08:00
return
}
if m != i || n != j {
2024-04-04 13:04:40 +01:00
bootLogIf ( ctx , fmt . Errorf ( "Detected unexpected drive ordering refusing to use the drive - poolID: %s, found drive mounted at (set=%s, drive=%s) expected mount at (set=%s, drive=%s): %s(%s)" , humanize . Ordinal ( poolIdx + 1 ) , humanize . Ordinal ( m + 1 ) , humanize . Ordinal ( n + 1 ) , humanize . Ordinal ( i + 1 ) , humanize . Ordinal ( j + 1 ) , disk , diskID ) )
2022-01-24 11:28:45 -08:00
s . erasureDisks [ i ] [ j ] = & unrecognizedDisk { storage : disk }
return
}
disk . SetDiskLoc ( s . poolIndex , m , n )
s . erasureDisks [ m ] [ n ] = disk
} ( disk , i , j )
}
innerWg . Wait ( )
// Initialize erasure objects for a given set.
s . sets [ i ] = & erasureObjects {
2022-11-22 16:23:36 +01:00
setIndex : i ,
poolIndex : poolIdx ,
setDriveCount : setDriveCount ,
defaultParityCount : defaultParityCount ,
getDisks : s . GetDisks ( i ) ,
getLockers : s . GetLockers ( i ) ,
getEndpoints : s . GetEndpoints ( i ) ,
2024-02-23 16:19:13 -08:00
getEndpointStrings : s . GetEndpointStrings ( i ) ,
2022-11-22 16:23:36 +01:00
nsMutex : mutex ,
2022-01-24 11:28:45 -08:00
}
} ( i )
2018-02-15 17:45:57 -08:00
}
2022-01-24 11:28:45 -08:00
wg . Wait ( )
2020-12-10 07:28:37 -08:00
// start cleanup stale uploads go-routine.
2021-10-04 10:52:28 -07:00
go s . cleanupStaleUploads ( ctx )
2020-12-10 07:28:37 -08:00
2021-02-26 09:52:27 -08:00
// start cleanup of deleted objects.
2021-10-04 10:52:28 -07:00
go s . cleanupDeletedObjects ( ctx )
2021-02-26 09:52:27 -08:00
2018-02-15 17:45:57 -08:00
// Start the disk monitoring and connect routine.
2022-05-16 05:36:00 -07:00
if ! globalIsTesting {
go s . monitorAndConnectEndpoints ( ctx , defaultMonitorConnectEndpointInterval )
}
2020-01-16 03:30:32 +01:00
2018-02-15 17:45:57 -08:00
return s , nil
}
2021-10-04 10:52:28 -07:00
// cleanup ".trash/" folder every 5m minutes with sufficient sleep cycles, between each
// deletes a dynamic sleeper is used with a factor of 10 ratio with max delay between
// deletes to be 2 seconds.
func ( s * erasureSets ) cleanupDeletedObjects ( ctx context . Context ) {
timer := time . NewTimer ( globalAPIConfig . getDeleteCleanupInterval ( ) )
2021-02-26 09:52:27 -08:00
defer timer . Stop ( )
for {
select {
case <- ctx . Done ( ) :
return
case <- timer . C :
2022-02-11 14:22:48 -08:00
var wg sync . WaitGroup
2021-02-26 09:52:27 -08:00
for _ , set := range s . sets {
2022-02-11 14:22:48 -08:00
wg . Add ( 1 )
go func ( set * erasureObjects ) {
defer wg . Done ( )
if set == nil {
return
}
set . cleanupDeletedObjects ( ctx )
} ( set )
2021-02-26 09:52:27 -08:00
}
2022-02-11 14:22:48 -08:00
wg . Wait ( )
2022-05-17 22:42:59 -07:00
// Reset for the next interval
timer . Reset ( globalAPIConfig . getDeleteCleanupInterval ( ) )
2021-02-26 09:52:27 -08:00
}
}
}
2021-10-04 10:52:28 -07:00
func ( s * erasureSets ) cleanupStaleUploads ( ctx context . Context ) {
timer := time . NewTimer ( globalAPIConfig . getStaleUploadsCleanupInterval ( ) )
2021-02-05 19:23:48 -08:00
defer timer . Stop ( )
2020-12-10 07:28:37 -08:00
for {
select {
case <- ctx . Done ( ) :
return
2021-02-05 19:23:48 -08:00
case <- timer . C :
2022-02-11 14:22:48 -08:00
var wg sync . WaitGroup
2020-12-10 07:28:37 -08:00
for _ , set := range s . sets {
2022-02-11 14:22:48 -08:00
wg . Add ( 1 )
go func ( set * erasureObjects ) {
defer wg . Done ( )
if set == nil {
return
}
set . cleanupStaleUploads ( ctx , globalAPIConfig . getStaleUploadsExpiry ( ) )
} ( set )
2020-12-10 07:28:37 -08:00
}
2022-02-11 14:22:48 -08:00
wg . Wait ( )
2022-05-17 22:42:59 -07:00
// Reset for the next interval
timer . Reset ( globalAPIConfig . getStaleUploadsCleanupInterval ( ) )
2020-12-10 07:28:37 -08:00
}
}
}
2021-01-26 22:21:51 +01:00
// auditObjectOp describes where an object lives; it is serialized as JSON
// with the keys "name", "poolId", "setId" and "disks".
type auditObjectOp struct {
	// Object name.
	Name string `json:"name"`
	// Pool and set identifiers.
	Pool int `json:"poolId"`
	Set  int `json:"setId"`
	// Drives of the set.
	Disks []string `json:"disks"`
}
2022-05-04 08:45:27 +01:00
// Add erasure set information to the current context
2021-03-04 14:36:23 -08:00
func auditObjectErasureSet ( ctx context . Context , object string , set * erasureObjects ) {
2021-10-28 07:35:28 -07:00
if len ( logger . AuditTargets ( ) ) == 0 {
2021-01-26 22:21:51 +01:00
return
}
op := auditObjectOp {
2024-02-23 16:19:13 -08:00
Name : decodeDirObject ( object ) ,
2021-03-04 14:36:23 -08:00
Pool : set . poolIndex + 1 ,
Set : set . setIndex + 1 ,
2024-02-23 16:19:13 -08:00
Disks : set . getEndpointStrings ( ) ,
2021-01-26 22:21:51 +01:00
}
2022-07-07 17:04:25 -07:00
logger . GetReqInfo ( ctx ) . AppendTags ( "objectLocation" , op )
2021-01-26 22:21:51 +01:00
}
2019-11-13 12:17:45 -08:00
// NewNSLock - initialize a new namespace RWLocker instance.
2020-11-04 08:25:42 -08:00
func ( s * erasureSets ) NewNSLock ( bucket string , objects ... string ) RWLocker {
2020-02-21 11:29:57 +05:30
if len ( objects ) == 1 {
2020-11-04 08:25:42 -08:00
return s . getHashedSet ( objects [ 0 ] ) . NewNSLock ( bucket , objects ... )
2020-02-21 11:29:57 +05:30
}
2020-11-04 08:25:42 -08:00
return s . getHashedSet ( "" ) . NewNSLock ( bucket , objects ... )
2019-11-13 12:17:45 -08:00
}
2020-08-05 13:31:12 -07:00
// SetDriveCount returns the current drives per set.
func ( s * erasureSets ) SetDriveCount ( ) int {
2020-08-26 19:29:35 -07:00
return s . setDriveCount
2020-08-05 13:31:12 -07:00
}
2021-01-16 12:08:02 -08:00
// ParityCount reports the default parity count that is used
// while erasure coding objects.
func (s *erasureSets) ParityCount() int {
	return s.defaultParityCount
}
2020-05-28 13:03:04 -07:00
// StorageInfo - combines output of StorageInfo across all erasure coded object sets.
2022-12-01 14:31:35 -08:00
func ( s * erasureSets ) StorageInfo ( ctx context . Context ) StorageInfo {
2021-03-04 14:36:23 -08:00
var storageInfo madmin . StorageInfo
2019-08-23 08:32:40 +05:30
2021-03-04 14:36:23 -08:00
storageInfos := make ( [ ] madmin . StorageInfo , len ( s . sets ) )
2019-10-14 09:44:51 -07:00
g := errgroup . WithNErrs ( len ( s . sets ) )
for index := range s . sets {
index := index
g . Go ( func ( ) error {
2022-12-01 14:31:35 -08:00
storageInfos [ index ] = s . sets [ index ] . StorageInfo ( ctx )
2019-10-14 09:44:51 -07:00
return nil
} , index )
2019-08-23 08:32:40 +05:30
}
2019-10-14 09:44:51 -07:00
2019-08-23 08:32:40 +05:30
// Wait for the go routines.
2019-10-14 09:44:51 -07:00
g . Wait ( )
2019-08-23 08:32:40 +05:30
for _ , lstorageInfo := range storageInfos {
2020-07-13 09:51:07 -07:00
storageInfo . Disks = append ( storageInfo . Disks , lstorageInfo . Disks ... )
2018-02-15 17:45:57 -08:00
}
2022-12-01 14:31:35 -08:00
return storageInfo
2018-02-15 17:45:57 -08:00
}
2021-03-02 17:28:04 -08:00
// StorageInfo - combines output of StorageInfo across all erasure coded object sets.
2023-12-21 16:56:43 -08:00
func ( s * erasureSets ) LocalStorageInfo ( ctx context . Context , metrics bool ) StorageInfo {
2021-03-02 17:28:04 -08:00
var storageInfo StorageInfo
storageInfos := make ( [ ] StorageInfo , len ( s . sets ) )
g := errgroup . WithNErrs ( len ( s . sets ) )
for index := range s . sets {
index := index
g . Go ( func ( ) error {
2023-12-21 16:56:43 -08:00
storageInfos [ index ] = s . sets [ index ] . LocalStorageInfo ( ctx , metrics )
2021-03-02 17:28:04 -08:00
return nil
} , index )
}
// Wait for the go routines.
g . Wait ( )
for _ , lstorageInfo := range storageInfos {
storageInfo . Disks = append ( storageInfo . Disks , lstorageInfo . Disks ... )
}
2022-12-01 14:31:35 -08:00
return storageInfo
2021-03-02 17:28:04 -08:00
}
2018-02-15 17:45:57 -08:00
// Shutdown shutsdown all erasure coded sets in parallel
// returns error upon first error.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) Shutdown ( ctx context . Context ) error {
2018-02-15 17:45:57 -08:00
g := errgroup . WithNErrs ( len ( s . sets ) )
for index := range s . sets {
index := index
g . Go ( func ( ) error {
2018-03-14 12:01:47 -07:00
return s . sets [ index ] . Shutdown ( ctx )
2018-02-15 17:45:57 -08:00
} , index )
}
for _ , err := range g . Wait ( ) {
if err != nil {
return err
}
}
2020-09-10 09:18:19 -07:00
select {
2021-03-18 19:19:02 +01:00
case _ , ok := <- s . setReconnectEvent :
2020-09-10 09:18:19 -07:00
if ok {
2024-01-28 10:04:17 -08:00
xioutil . SafeClose ( s . setReconnectEvent )
2020-09-10 09:18:19 -07:00
}
default :
2024-01-28 10:04:17 -08:00
xioutil . SafeClose ( s . setReconnectEvent )
2020-09-10 09:18:19 -07:00
}
2018-02-15 17:45:57 -08:00
return nil
}
// hashes the key returning an integer based on the input algorithm.
// This function currently supports
// - CRCMOD
2020-06-12 20:04:01 -07:00
// - SIPMOD
2018-02-15 17:45:57 -08:00
// - all new algos.
2020-06-12 20:04:01 -07:00
func sipHashMod ( key string , cardinality int , id [ 16 ] byte ) int {
if cardinality <= 0 {
return - 1
}
2021-02-26 16:53:06 -08:00
// use the faster version as per siphash docs
// https://github.com/dchest/siphash#usage
k0 , k1 := binary . LittleEndian . Uint64 ( id [ 0 : 8 ] ) , binary . LittleEndian . Uint64 ( id [ 8 : 16 ] )
sum64 := siphash . Hash ( k0 , k1 , [ ] byte ( key ) )
return int ( sum64 % uint64 ( cardinality ) )
2020-06-12 20:04:01 -07:00
}
2018-02-15 17:45:57 -08:00
// crcHashMod returns the CRC-32 (IEEE) checksum of the key modulo
// cardinality. It serves the CRCMOD distribution algorithm of hashKey.
// Returns -1 when cardinality is not positive.
func crcHashMod(key string, cardinality int) int {
	if cardinality <= 0 {
		return -1
	}
	return int(crc32.ChecksumIEEE([]byte(key)) % uint32(cardinality))
}
2020-06-12 20:04:01 -07:00
func hashKey ( algo string , key string , cardinality int , id [ 16 ] byte ) int {
2018-02-15 17:45:57 -08:00
switch algo {
2021-01-16 12:08:02 -08:00
case formatErasureVersionV2DistributionAlgoV1 :
2018-02-15 17:45:57 -08:00
return crcHashMod ( key , cardinality )
2021-01-16 12:08:02 -08:00
case formatErasureVersionV3DistributionAlgoV2 , formatErasureVersionV3DistributionAlgoV3 :
2020-06-12 20:04:01 -07:00
return sipHashMod ( key , cardinality , id )
2018-08-06 19:26:40 +02:00
default :
// Unknown algorithm returns -1, also if cardinality is lesser than 0.
return - 1
2018-02-15 17:45:57 -08:00
}
}
2019-05-13 20:25:49 +01:00
// Returns always a same erasure coded set for a given input.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) getHashedSetIndex ( input string ) int {
return hashKey ( s . distributionAlgo , input , len ( s . sets ) , s . deploymentID )
2019-05-13 20:25:49 +01:00
}
2018-02-15 17:45:57 -08:00
// Returns always a same erasure coded set for a given input.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) getHashedSet ( input string ) ( set * erasureObjects ) {
2019-05-13 20:25:49 +01:00
return s . sets [ s . getHashedSetIndex ( input ) ]
2018-02-15 17:45:57 -08:00
}
2022-07-25 17:51:32 -07:00
// listDeletedBuckets lists deleted buckets from all disks.
func listDeletedBuckets ( ctx context . Context , storageDisks [ ] StorageAPI , delBuckets map [ string ] VolInfo , readQuorum int ) error {
g := errgroup . WithNErrs ( len ( storageDisks ) )
var mu sync . Mutex
for index := range storageDisks {
index := index
g . Go ( func ( ) error {
if storageDisks [ index ] == nil {
// we ignore disk not found errors
return nil
}
2024-01-30 12:43:25 -08:00
volsInfo , err := storageDisks [ index ] . ListDir ( ctx , "" , minioMetaBucket , pathJoin ( bucketMetaPrefix , deletedBucketsPrefix ) , - 1 )
2022-07-25 17:51:32 -07:00
if err != nil {
2022-12-29 00:08:31 -08:00
if errors . Is ( err , errFileNotFound ) {
2022-07-25 17:51:32 -07:00
return nil
}
return err
}
for _ , volName := range volsInfo {
2022-12-29 00:08:31 -08:00
vi , err := storageDisks [ index ] . StatVol ( ctx , pathJoin ( minioMetaBucket , bucketMetaPrefix , deletedBucketsPrefix , volName ) )
if err == nil {
vi . Name = strings . TrimSuffix ( volName , SlashSeparator )
mu . Lock ( )
if _ , ok := delBuckets [ volName ] ; ! ok {
delBuckets [ volName ] = vi
2022-07-25 17:51:32 -07:00
}
2022-12-29 00:08:31 -08:00
mu . Unlock ( )
2022-07-25 17:51:32 -07:00
}
}
return nil
} , index )
}
return reduceReadQuorumErrs ( ctx , g . Wait ( ) , bucketMetadataOpIgnoredErrs , readQuorum )
}
2018-02-15 17:45:57 -08:00
// --- Object Operations ---
2018-09-20 19:22:09 -07:00
// GetObjectNInfo - returns object info and locked object ReadCloser
2023-04-17 12:16:37 -07:00
func ( s * erasureSets ) GetObjectNInfo ( ctx context . Context , bucket , object string , rs * HTTPRangeSpec , h http . Header , opts ObjectOptions ) ( gr * GetObjectReader , err error ) {
2021-01-26 22:21:51 +01:00
set := s . getHashedSet ( object )
2023-04-17 12:16:37 -07:00
return set . GetObjectNInfo ( ctx , bucket , object , rs , h , opts )
2018-09-20 19:22:09 -07:00
}
2018-02-15 17:45:57 -08:00
// PutObject - writes an object to hashedSet based on the object name.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) PutObject ( ctx context . Context , bucket string , object string , data * PutObjReader , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2021-01-26 22:21:51 +01:00
set := s . getHashedSet ( object )
return set . PutObject ( ctx , bucket , object , data , opts )
2018-02-15 17:45:57 -08:00
}
// GetObjectInfo - reads object metadata from the hashedSet based on the object name.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) GetObjectInfo ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2021-01-26 22:21:51 +01:00
set := s . getHashedSet ( object )
return set . GetObjectInfo ( ctx , bucket , object , opts )
2018-02-15 17:45:57 -08:00
}
2021-06-16 02:43:14 +01:00
func ( s * erasureSets ) deletePrefix ( ctx context . Context , bucket string , prefix string ) error {
2021-09-17 19:34:48 -07:00
var wg sync . WaitGroup
wg . Add ( len ( s . sets ) )
2021-06-16 02:43:14 +01:00
for _ , s := range s . sets {
2021-09-17 19:34:48 -07:00
go func ( s * erasureObjects ) {
defer wg . Done ( )
// This is a force delete, no reason to throw errors.
s . DeleteObject ( ctx , bucket , prefix , ObjectOptions { DeletePrefix : true } )
} ( s )
2021-06-16 02:43:14 +01:00
}
2021-09-17 19:34:48 -07:00
wg . Wait ( )
2021-06-16 02:43:14 +01:00
return nil
}
2018-02-15 17:45:57 -08:00
// DeleteObject - deletes an object from the hashedSet based on the object name.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) DeleteObject ( ctx context . Context , bucket string , object string , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2023-08-09 16:30:22 -07:00
if opts . DeletePrefix && ! opts . DeletePrefixObject {
2021-06-16 02:43:14 +01:00
err := s . deletePrefix ( ctx , bucket , object )
return ObjectInfo { } , err
}
2022-05-04 08:45:27 +01:00
set := s . getHashedSet ( object )
2021-01-26 22:21:51 +01:00
return set . DeleteObject ( ctx , bucket , object , opts )
2018-02-15 17:45:57 -08:00
}
2019-05-13 20:25:49 +01:00
// DeleteObjects - bulk delete of objects
// Bulk delete is only possible within one set. For that purpose
// objects are group by set first, and then bulk delete is invoked
// for each set, the error response of each delete will be returned
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) DeleteObjects ( ctx context . Context , bucket string , objects [ ] ObjectToDelete , opts ObjectOptions ) ( [ ] DeletedObject , [ ] error ) {
2019-05-13 20:25:49 +01:00
type delObj struct {
// Set index associated to this object
setIndex int
// Original index from the list of arguments
// where this object is passed
origIndex int
2020-06-12 20:04:01 -07:00
// object to delete
object ObjectToDelete
2019-05-13 20:25:49 +01:00
}
// Transform []delObj to the list of object names
2020-06-12 20:04:01 -07:00
toNames := func ( delObjs [ ] delObj ) [ ] ObjectToDelete {
objs := make ( [ ] ObjectToDelete , len ( delObjs ) )
2019-05-13 20:25:49 +01:00
for i , obj := range delObjs {
2020-06-12 20:04:01 -07:00
objs [ i ] = obj . object
2019-05-13 20:25:49 +01:00
}
2020-06-12 20:04:01 -07:00
return objs
2019-05-13 20:25:49 +01:00
}
// The result of delete operation on all passed objects
2022-01-02 09:15:06 -08:00
delErrs := make ( [ ] error , len ( objects ) )
2019-05-13 20:25:49 +01:00
2020-06-12 20:04:01 -07:00
// The result of delete objects
2022-01-02 09:15:06 -08:00
delObjects := make ( [ ] DeletedObject , len ( objects ) )
2020-06-12 20:04:01 -07:00
2019-05-13 20:25:49 +01:00
// A map between a set and its associated objects
2022-01-02 09:15:06 -08:00
objSetMap := make ( map [ int ] [ ] delObj )
2019-05-13 20:25:49 +01:00
// Group objects by set index
for i , object := range objects {
2020-06-12 20:04:01 -07:00
index := s . getHashedSetIndex ( object . ObjectName )
objSetMap [ index ] = append ( objSetMap [ index ] , delObj { setIndex : index , origIndex : i , object : object } )
2019-05-13 20:25:49 +01:00
}
// Invoke bulk delete on objects per set and save
// the result of the delete operation
2022-01-06 10:47:49 -08:00
var wg sync . WaitGroup
var mu sync . Mutex
wg . Add ( len ( objSetMap ) )
for setIdx , objsGroup := range objSetMap {
go func ( set * erasureObjects , group [ ] delObj ) {
defer wg . Done ( )
dobjects , errs := set . DeleteObjects ( ctx , bucket , toNames ( group ) , opts )
mu . Lock ( )
defer mu . Unlock ( )
for i , obj := range group {
delErrs [ obj . origIndex ] = errs [ i ]
delObjects [ obj . origIndex ] = dobjects [ i ]
2021-01-26 22:21:51 +01:00
}
2022-01-06 10:47:49 -08:00
} ( s . sets [ setIdx ] , objsGroup )
2019-05-13 20:25:49 +01:00
}
2022-01-06 10:47:49 -08:00
wg . Wait ( )
2019-05-13 20:25:49 +01:00
2020-06-12 20:04:01 -07:00
return delObjects , delErrs
2019-05-13 20:25:49 +01:00
}
2018-02-15 17:45:57 -08:00
// CopyObject - copies objects from one hashedSet to another hashedSet, on server side.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) CopyObject ( ctx context . Context , srcBucket , srcObject , dstBucket , dstObject string , srcInfo ObjectInfo , srcOpts , dstOpts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2018-02-15 17:45:57 -08:00
srcSet := s . getHashedSet ( srcObject )
2020-05-28 14:36:38 -07:00
dstSet := s . getHashedSet ( dstObject )
2018-02-15 17:45:57 -08:00
2020-08-03 16:21:10 -07:00
cpSrcDstSame := srcSet == dstSet
2018-02-15 17:45:57 -08:00
// Check if this request is only metadata update.
2020-08-03 16:21:10 -07:00
if cpSrcDstSame && srcInfo . metadataOnly {
2020-09-14 15:57:13 -07:00
// Version ID is set for the destination and source == destination version ID.
// perform an in-place update.
2020-06-19 08:44:51 -07:00
if dstOpts . VersionID != "" && srcOpts . VersionID == dstOpts . VersionID {
2022-06-21 19:20:11 -07:00
srcInfo . Reader . Close ( ) // We are not interested in the reader stream at this point close it.
2020-06-19 08:44:51 -07:00
return srcSet . CopyObject ( ctx , srcBucket , srcObject , dstBucket , dstObject , srcInfo , srcOpts , dstOpts )
}
2020-09-14 15:57:13 -07:00
// Destination is not versioned and source version ID is empty
// perform an in-place update.
2020-06-19 08:44:51 -07:00
if ! dstOpts . Versioned && srcOpts . VersionID == "" {
2022-06-21 19:20:11 -07:00
srcInfo . Reader . Close ( ) // We are not interested in the reader stream at this point close it.
2020-06-19 08:44:51 -07:00
return srcSet . CopyObject ( ctx , srcBucket , srcObject , dstBucket , dstObject , srcInfo , srcOpts , dstOpts )
}
2020-08-03 16:21:10 -07:00
// CopyObject optimization where we don't create an entire copy
// of the content, instead we add a reference, we disallow legacy
// objects to be self referenced in this manner so make sure
// that we actually create a new dataDir for legacy objects.
if dstOpts . Versioned && srcOpts . VersionID != dstOpts . VersionID && ! srcInfo . Legacy {
srcInfo . versionOnly = true
2022-06-21 19:20:11 -07:00
srcInfo . Reader . Close ( ) // We are not interested in the reader stream at this point close it.
2020-08-03 16:21:10 -07:00
return srcSet . CopyObject ( ctx , srcBucket , srcObject , dstBucket , dstObject , srcInfo , srcOpts , dstOpts )
}
2018-02-15 17:45:57 -08:00
}
2020-06-17 11:13:41 -07:00
putOpts := ObjectOptions {
ServerSideEncryption : dstOpts . ServerSideEncryption ,
UserDefined : srcInfo . UserDefined ,
Versioned : dstOpts . Versioned ,
VersionID : dstOpts . VersionID ,
2020-11-19 11:50:22 -08:00
MTime : dstOpts . MTime ,
2020-06-17 11:13:41 -07:00
}
2020-06-19 08:44:51 -07:00
2020-05-28 14:36:38 -07:00
return dstSet . putObject ( ctx , dstBucket , dstObject , srcInfo . PutObjReader , putOpts )
2018-02-15 17:45:57 -08:00
}
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) ListMultipartUploads ( ctx context . Context , bucket , prefix , keyMarker , uploadIDMarker , delimiter string , maxUploads int ) ( result ListMultipartsInfo , err error ) {
2018-02-15 17:45:57 -08:00
// In list multipart uploads we are going to treat input prefix as the object,
// this means that we are not supporting directory navigation.
2021-01-26 22:21:51 +01:00
set := s . getHashedSet ( prefix )
return set . ListMultipartUploads ( ctx , bucket , prefix , keyMarker , uploadIDMarker , delimiter , maxUploads )
2018-02-15 17:45:57 -08:00
}
// Initiate a new multipart upload on a hashedSet based on object name.
2022-08-30 01:57:16 +02:00
func ( s * erasureSets ) NewMultipartUpload ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( res * NewMultipartUploadResult , err error ) {
2021-01-26 22:21:51 +01:00
set := s . getHashedSet ( object )
return set . NewMultipartUpload ( ctx , bucket , object , opts )
2018-02-15 17:45:57 -08:00
}
// PutObjectPart - writes part of an object to hashedSet based on the object name.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) PutObjectPart ( ctx context . Context , bucket , object , uploadID string , partID int , data * PutObjReader , opts ObjectOptions ) ( info PartInfo , err error ) {
2021-01-26 22:21:51 +01:00
set := s . getHashedSet ( object )
return set . PutObjectPart ( ctx , bucket , object , uploadID , partID , data , opts )
2018-02-15 17:45:57 -08:00
}
2020-05-28 12:36:20 -07:00
// GetMultipartInfo - return multipart metadata info uploaded at hashedSet.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) GetMultipartInfo ( ctx context . Context , bucket , object , uploadID string , opts ObjectOptions ) ( result MultipartInfo , err error ) {
2021-01-26 22:21:51 +01:00
set := s . getHashedSet ( object )
return set . GetMultipartInfo ( ctx , bucket , object , uploadID , opts )
2020-05-28 12:36:20 -07:00
}
2018-02-15 17:45:57 -08:00
// ListObjectParts - lists all uploaded parts to an object in hashedSet.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) ListObjectParts ( ctx context . Context , bucket , object , uploadID string , partNumberMarker int , maxParts int , opts ObjectOptions ) ( result ListPartsInfo , err error ) {
2021-01-26 22:21:51 +01:00
set := s . getHashedSet ( object )
return set . ListObjectParts ( ctx , bucket , object , uploadID , partNumberMarker , maxParts , opts )
2018-02-15 17:45:57 -08:00
}
// Aborts an in-progress multipart operation on hashedSet based on the object name.
2020-09-14 15:57:13 -07:00
func ( s * erasureSets ) AbortMultipartUpload ( ctx context . Context , bucket , object , uploadID string , opts ObjectOptions ) error {
2021-01-26 22:21:51 +01:00
set := s . getHashedSet ( object )
return set . AbortMultipartUpload ( ctx , bucket , object , uploadID , opts )
2018-02-15 17:45:57 -08:00
}
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) CompleteMultipartUpload ( ctx context . Context , bucket , object , uploadID string , uploadedParts [ ] CompletePart , opts ObjectOptions ) ( objInfo ObjectInfo , err error ) {
2021-01-26 22:21:51 +01:00
set := s . getHashedSet ( object )
return set . CompleteMultipartUpload ( ctx , bucket , object , uploadID , uploadedParts , opts )
2018-02-15 17:45:57 -08:00
}
/*

All disks online
-----------------
- All Unformatted - format all and return success.
- Some Unformatted - format all and return success.
- Any JBOD inconsistent - return failure
- Some are corrupt (missing format.json) - return failure
- Any unrecognized disks - return failure

Some disks are offline and we have quorum.
-----------------
- Some unformatted - format all and return success,
  treat disks offline as corrupted.
- Any JBOD inconsistent - return failure
- Some are corrupt (missing format.json) - return failure
- Any unrecognized disks - return failure

No read quorum
-----------------
failure for all cases.

// Pseudo code for managing `format.json`.

// Generic checks.
if (no quorum) return error
if (any disk is corrupt) return error // Always error
if (jbod inconsistent) return error // Always error.
if (disks not recognized) return error // Always error.

// Specific checks.
if (all disks online)
  if (all disks return format.json)
     if (jbod consistent)
        if (all disks recognized)
          return
  else
     if (all disks return format.json not found)
        return error
     else (some disks return format.json not found)
        (heal format)
        return
     fi
   fi
else
   if (some disks return format.json not found)
        // Offline disks are marked as dead.
        (heal format) // Offline disks should be marked as dead.
        return success
   fi
fi
*/
2020-07-13 09:51:07 -07:00
func formatsToDrivesInfo ( endpoints Endpoints , formats [ ] * formatErasureV3 , sErrs [ ] error ) ( beforeDrives [ ] madmin . HealDriveInfo ) {
beforeDrives = make ( [ ] madmin . HealDriveInfo , len ( endpoints ) )
2018-02-15 17:45:57 -08:00
// Existing formats are available (i.e. ok), so save it in
// result, also populate disks to be healed.
for i , format := range formats {
drive := endpoints . GetString ( i )
2022-01-02 09:15:06 -08:00
state := madmin . DriveStateCorrupt
2018-02-15 17:45:57 -08:00
switch {
case format != nil :
2019-08-30 14:11:18 -07:00
state = madmin . DriveStateOk
2018-02-15 17:45:57 -08:00
case sErrs [ i ] == errUnformattedDisk :
2019-08-30 14:11:18 -07:00
state = madmin . DriveStateMissing
2019-08-03 00:47:26 +05:30
case sErrs [ i ] == errDiskNotFound :
2019-08-30 14:11:18 -07:00
state = madmin . DriveStateOffline
}
2020-07-13 09:51:07 -07:00
beforeDrives [ i ] = madmin . HealDriveInfo {
2019-08-30 14:11:18 -07:00
UUID : func ( ) string {
if format != nil {
2020-06-12 20:04:01 -07:00
return format . Erasure . This
2019-08-30 14:11:18 -07:00
}
return ""
} ( ) ,
Endpoint : drive ,
State : state ,
2018-02-15 17:45:57 -08:00
}
}
return beforeDrives
}
2018-04-30 20:37:39 -07:00
// HealFormat - heals missing `format.json` on fresh unformatted disks.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) HealFormat ( ctx context . Context , dryRun bool ) ( res madmin . HealResultItem , err error ) {
2023-08-01 10:54:26 -07:00
storageDisks , _ := initStorageDisksWithErrors ( s . endpoints . Endpoints , storageOpts {
2024-01-23 14:11:46 -08:00
cleanUp : false ,
2024-01-12 01:48:36 -08:00
healthCheck : false ,
2023-08-01 10:54:26 -07:00
} )
2018-04-09 10:25:41 -07:00
defer func ( storageDisks [ ] StorageAPI ) {
if err != nil {
2022-05-30 10:58:37 -07:00
closeStorageDisks ( storageDisks ... )
2018-04-09 10:25:41 -07:00
}
} ( storageDisks )
2018-04-03 23:58:48 -05:00
2020-06-12 20:04:01 -07:00
formats , sErrs := loadFormatErasureAll ( storageDisks , true )
2021-01-29 11:40:55 -08:00
if err = checkFormatErasureValues ( formats , storageDisks , s . setDriveCount ) ; err != nil {
2018-02-15 17:45:57 -08:00
return madmin . HealResultItem { } , err
}
2020-09-04 17:09:02 -07:00
refFormat , err := getFormatErasureInQuorum ( formats )
if err != nil {
return res , err
}
2018-02-15 17:45:57 -08:00
// Prepare heal-result
2018-04-09 10:25:41 -07:00
res = madmin . HealResultItem {
2018-02-15 17:45:57 -08:00
Type : madmin . HealItemMetadata ,
Detail : "disk-format" ,
2020-08-26 19:29:35 -07:00
DiskCount : s . setCount * s . setDriveCount ,
2018-02-15 17:45:57 -08:00
SetCount : s . setCount ,
}
// Fetch all the drive info status.
2022-01-10 09:07:49 -08:00
beforeDrives := formatsToDrivesInfo ( s . endpoints . Endpoints , formats , sErrs )
2018-02-15 17:45:57 -08:00
res . After . Drives = make ( [ ] madmin . HealDriveInfo , len ( beforeDrives ) )
res . Before . Drives = make ( [ ] madmin . HealDriveInfo , len ( beforeDrives ) )
// Copy "after" drive state too from before.
for k , v := range beforeDrives {
2021-03-04 14:36:23 -08:00
res . Before . Drives [ k ] = v
res . After . Drives [ k ] = v
2018-02-15 17:45:57 -08:00
}
2019-09-24 18:47:26 -07:00
if countErrs ( sErrs , errUnformattedDisk ) == 0 {
2018-04-30 20:37:39 -07:00
return res , errNoHealRequired
}
2024-01-16 15:13:14 -08:00
if ! reflect . DeepEqual ( s . format , refFormat ) {
// Format is corrupted and unrecognized by the running instance.
2024-04-04 13:04:40 +01:00
healingLogIf ( ctx , fmt . Errorf ( "Unable to heal the newly replaced drives due to format.json inconsistencies, please engage MinIO support for further assistance: %w" ,
2024-01-16 15:13:14 -08:00
errCorruptedFormat ) )
return res , errCorruptedFormat
}
2023-01-06 05:41:19 +01:00
formatOpID := mustGetUUID ( )
2018-02-15 17:45:57 -08:00
// Initialize a new set of set formats which will be written to disk.
2023-12-29 15:52:41 -08:00
newFormatSets , currentDisksInfo := newHealFormatSets ( refFormat , s . setCount , s . setDriveCount , formats , sErrs )
2018-02-15 17:45:57 -08:00
if ! dryRun {
2022-01-02 09:15:06 -08:00
tmpNewFormats := make ( [ ] * formatErasureV3 , s . setCount * s . setDriveCount )
2018-02-15 17:45:57 -08:00
for i := range newFormatSets {
for j := range newFormatSets [ i ] {
if newFormatSets [ i ] [ j ] == nil {
continue
}
2020-10-26 10:29:29 -07:00
res . After . Drives [ i * s . setDriveCount + j ] . UUID = newFormatSets [ i ] [ j ] . Erasure . This
res . After . Drives [ i * s . setDriveCount + j ] . State = madmin . DriveStateOk
2020-08-26 19:29:35 -07:00
tmpNewFormats [ i * s . setDriveCount + j ] = newFormatSets [ i ] [ j ]
2018-02-15 17:45:57 -08:00
}
}
2020-10-31 01:34:48 -07:00
// Save new formats `format.json` on unformatted disks.
2021-11-04 16:42:49 -07:00
for index , format := range tmpNewFormats {
if storageDisks [ index ] == nil || format == nil {
continue
}
2023-01-06 05:41:19 +01:00
if err := saveFormatErasure ( storageDisks [ index ] , format , formatOpID ) ; err != nil {
2024-04-04 13:04:40 +01:00
healingLogIf ( ctx , fmt . Errorf ( "Drive %s failed to write updated 'format.json': %v" , storageDisks [ index ] , err ) )
2024-01-12 01:48:36 -08:00
storageDisks [ index ] . Close ( )
2021-11-04 16:42:49 -07:00
tmpNewFormats [ index ] = nil // this disk failed to write new format
}
2020-08-07 13:22:53 -07:00
}
2020-09-16 21:14:35 -07:00
s . erasureDisksMu . Lock ( )
2018-04-09 10:25:41 -07:00
2020-10-31 01:34:48 -07:00
for index , format := range tmpNewFormats {
if format == nil {
2020-03-27 14:48:30 -07:00
continue
}
2020-10-31 01:34:48 -07:00
m , n , err := findDiskIndexByDiskID ( refFormat , format . Erasure . This )
2020-03-27 14:48:30 -07:00
if err != nil {
2024-04-04 13:04:40 +01:00
healingLogIf ( ctx , err )
2020-03-27 14:48:30 -07:00
continue
}
2020-06-12 20:04:01 -07:00
if s . erasureDisks [ m ] [ n ] != nil {
s . erasureDisks [ m ] [ n ] . Close ( )
2020-03-27 14:48:30 -07:00
}
2021-09-14 15:10:00 -07:00
2023-12-29 15:52:41 -08:00
if disk := storageDisks [ index ] ; disk != nil {
2024-01-12 01:48:36 -08:00
if disk . IsLocal ( ) {
disk . SetDiskLoc ( s . poolIndex , m , n )
2023-12-29 15:52:41 -08:00
xldisk , ok := disk . ( * xlStorageDiskIDCheck )
if ok {
2024-01-29 23:03:58 -08:00
_ , commonDeletes := calcCommonWritesDeletes ( currentDisksInfo [ m ] , ( s . setDriveCount + 1 ) / 2 )
xldisk . totalDeletes . Store ( commonDeletes )
xldisk . storage . setDeleteAttribute ( commonDeletes )
if globalDriveMonitoring {
go xldisk . monitorDiskWritable ( xldisk . diskCtx )
2024-01-12 01:48:36 -08:00
}
2023-12-29 15:52:41 -08:00
}
2024-01-12 01:48:36 -08:00
} else {
disk . Close ( ) // Close the remote storage client, re-initialize with healthchecks.
disk , err = newStorageRESTClient ( disk . Endpoint ( ) , true , globalGrid . Load ( ) )
if err != nil {
continue
}
disk . SetDiskLoc ( s . poolIndex , m , n )
2023-12-29 15:52:41 -08:00
}
s . erasureDisks [ m ] [ n ] = disk
2024-02-12 13:00:20 -08:00
if disk . IsLocal ( ) {
2023-12-29 15:52:41 -08:00
globalLocalDrivesMu . Lock ( )
2024-02-12 13:00:20 -08:00
if globalIsDistErasure {
globalLocalSetDrives [ s . poolIndex ] [ m ] [ n ] = disk
}
for i , ldisk := range globalLocalDrives {
_ , k , l := ldisk . GetDiskLoc ( )
if k == m && l == n {
globalLocalDrives [ i ] = disk
break
}
}
2023-12-29 15:52:41 -08:00
globalLocalDrivesMu . Unlock ( )
}
2021-09-14 15:10:00 -07:00
}
2020-03-27 14:48:30 -07:00
}
2018-04-09 10:25:41 -07:00
2020-09-16 21:14:35 -07:00
s . erasureDisksMu . Unlock ( )
2018-02-15 17:45:57 -08:00
}
return res , nil
}
// HealObject - heals inconsistent object on a hashedSet based on object name.
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) HealObject ( ctx context . Context , bucket , object , versionID string , opts madmin . HealOpts ) ( madmin . HealResultItem , error ) {
return s . getHashedSet ( object ) . HealObject ( ctx , bucket , object , versionID , opts )
2018-02-15 17:45:57 -08:00
}
2021-04-04 13:32:31 -07:00
// PutObjectMetadata - replaces or adds metadata on an existing
// object/version via the erasure set that the object name hashes to.
func (s *erasureSets) PutObjectMetadata(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
	return s.getHashedSet(object).PutObjectMetadata(ctx, bucket, object, opts)
}
2023-03-16 07:48:05 -07:00
// DecomTieredObject - moves a tiered object to another pool during
// decommissioning, via the erasure set that the object name hashes to.
func (s *erasureSets) DecomTieredObject(ctx context.Context, bucket, object string, fi FileInfo, opts ObjectOptions) error {
	return s.getHashedSet(object).DecomTieredObject(ctx, bucket, object, fi, opts)
}
2020-05-23 11:09:35 -07:00
// PutObjectTags - replace or add tags to an existing object
2021-02-01 22:52:51 +01:00
func ( s * erasureSets ) PutObjectTags ( ctx context . Context , bucket , object string , tags string , opts ObjectOptions ) ( ObjectInfo , error ) {
2021-01-26 22:21:51 +01:00
er := s . getHashedSet ( object )
return er . PutObjectTags ( ctx , bucket , object , tags , opts )
2020-01-20 22:15:59 +05:30
}
2020-05-23 11:09:35 -07:00
// DeleteObjectTags - delete object tags from an existing object
2021-02-01 22:52:51 +01:00
func ( s * erasureSets ) DeleteObjectTags ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( ObjectInfo , error ) {
2021-01-26 22:21:51 +01:00
er := s . getHashedSet ( object )
return er . DeleteObjectTags ( ctx , bucket , object , opts )
2020-01-20 22:15:59 +05:30
}
2020-05-23 11:09:35 -07:00
// GetObjectTags - get object tags from an existing object
2020-06-12 20:04:01 -07:00
func ( s * erasureSets ) GetObjectTags ( ctx context . Context , bucket , object string , opts ObjectOptions ) ( * tags . Tags , error ) {
2021-01-26 22:21:51 +01:00
er := s . getHashedSet ( object )
return er . GetObjectTags ( ctx , bucket , object , opts )
2020-01-20 22:15:59 +05:30
}
2021-04-19 10:30:42 -07:00
// TransitionObject - transitions object content to the target tier via the
// erasure set that the object name hashes to.
func (s *erasureSets) TransitionObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
	set := s.getHashedSet(object)
	return set.TransitionObject(ctx, bucket, object, opts)
}
// RestoreTransitionedObject - restores transitioned object content locally
// on this cluster via the erasure set that the object name hashes to.
func (s *erasureSets) RestoreTransitionedObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
	set := s.getHashedSet(object)
	return set.RestoreTransitionedObject(ctx, bucket, object, opts)
}
2022-11-28 19:20:55 +01:00
// CheckAbandonedParts - checks an object for abandoned parts via the erasure
// set that the object name hashes to.
func (s *erasureSets) CheckAbandonedParts(ctx context.Context, bucket, object string, opts madmin.HealOpts) error {
	set := s.getHashedSet(object)
	return set.checkAbandonedParts(ctx, bucket, object, opts)
}