mirror of
https://github.com/minio/minio.git
synced 2024-12-24 06:05:55 -05:00
rename server sets to server pools
This commit is contained in:
parent
e6ea5c2703
commit
4ec45753e6
@ -105,8 +105,8 @@ func initTestErasureObjLayer(ctx context.Context) (ObjectLayer, []string, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
globalPolicySys = NewPolicySys()
|
globalPolicySys = NewPolicySys()
|
||||||
objLayer := &erasureServerSets{serverSets: make([]*erasureSets, 1)}
|
objLayer := &erasureServerPools{serverPools: make([]*erasureSets, 1)}
|
||||||
objLayer.serverSets[0], err = newErasureSets(ctx, endpoints, storageDisks, format)
|
objLayer.serverPools[0], err = newErasureSets(ctx, endpoints, storageDisks, format)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -24,13 +24,13 @@ import (
|
|||||||
|
|
||||||
// getLocalServerProperty - returns madmin.ServerProperties for only the
|
// getLocalServerProperty - returns madmin.ServerProperties for only the
|
||||||
// local endpoints from given list of endpoints
|
// local endpoints from given list of endpoints
|
||||||
func getLocalServerProperty(endpointServerSets EndpointServerSets, r *http.Request) madmin.ServerProperties {
|
func getLocalServerProperty(endpointServerPools EndpointServerPools, r *http.Request) madmin.ServerProperties {
|
||||||
addr := r.Host
|
addr := r.Host
|
||||||
if globalIsDistErasure {
|
if globalIsDistErasure {
|
||||||
addr = GetLocalPeer(endpointServerSets)
|
addr = GetLocalPeer(endpointServerPools)
|
||||||
}
|
}
|
||||||
network := make(map[string]string)
|
network := make(map[string]string)
|
||||||
for _, ep := range endpointServerSets {
|
for _, ep := range endpointServerPools {
|
||||||
for _, endpoint := range ep.Endpoints {
|
for _, endpoint := range ep.Endpoints {
|
||||||
nodeName := endpoint.Host
|
nodeName := endpoint.Host
|
||||||
if nodeName == "" {
|
if nodeName == "" {
|
||||||
|
@ -39,7 +39,7 @@ type healingTracker struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
|
func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||||
z, ok := objAPI.(*erasureServerSets)
|
z, ok := objAPI.(*erasureServerPools)
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -116,7 +116,7 @@ func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
|
|||||||
// monitorLocalDisksAndHeal - ensures that detected new disks are healed
|
// monitorLocalDisksAndHeal - ensures that detected new disks are healed
|
||||||
// 1. Only the concerned erasure set will be listed and healed
|
// 1. Only the concerned erasure set will be listed and healed
|
||||||
// 2. Only the node hosting the disk is responsible to perform the heal
|
// 2. Only the node hosting the disk is responsible to perform the heal
|
||||||
func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerSets, bgSeq *healSequence) {
|
func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerPools, bgSeq *healSequence) {
|
||||||
// Perform automatic disk healing when a disk is replaced locally.
|
// Perform automatic disk healing when a disk is replaced locally.
|
||||||
wait:
|
wait:
|
||||||
for {
|
for {
|
||||||
@ -139,8 +139,8 @@ wait:
|
|||||||
logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...",
|
logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...",
|
||||||
len(healDisks)))
|
len(healDisks)))
|
||||||
|
|
||||||
erasureSetInZoneDisksToHeal = make([]map[int][]StorageAPI, len(z.serverSets))
|
erasureSetInZoneDisksToHeal = make([]map[int][]StorageAPI, len(z.serverPools))
|
||||||
for i := range z.serverSets {
|
for i := range z.serverPools {
|
||||||
erasureSetInZoneDisksToHeal[i] = map[int][]StorageAPI{}
|
erasureSetInZoneDisksToHeal[i] = map[int][]StorageAPI{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -159,10 +159,10 @@ wait:
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Calculate the set index where the current endpoint belongs
|
// Calculate the set index where the current endpoint belongs
|
||||||
z.serverSets[zoneIdx].erasureDisksMu.RLock()
|
z.serverPools[zoneIdx].erasureDisksMu.RLock()
|
||||||
// Protect reading reference format.
|
// Protect reading reference format.
|
||||||
setIndex, _, err := findDiskIndex(z.serverSets[zoneIdx].format, format)
|
setIndex, _, err := findDiskIndex(z.serverPools[zoneIdx].format, format)
|
||||||
z.serverSets[zoneIdx].erasureDisksMu.RUnlock()
|
z.serverPools[zoneIdx].erasureDisksMu.RUnlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
printEndpointError(endpoint, err, false)
|
printEndpointError(endpoint, err, false)
|
||||||
continue
|
continue
|
||||||
@ -197,7 +197,7 @@ wait:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lbDisks := z.serverSets[i].sets[setIndex].getOnlineDisks()
|
lbDisks := z.serverPools[i].sets[setIndex].getOnlineDisks()
|
||||||
if err := healErasureSet(ctx, setIndex, buckets, lbDisks); err != nil {
|
if err := healErasureSet(ctx, setIndex, buckets, lbDisks); err != nil {
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
continue
|
continue
|
||||||
|
@ -52,7 +52,7 @@ type bootstrapRESTServer struct{}
|
|||||||
type ServerSystemConfig struct {
|
type ServerSystemConfig struct {
|
||||||
MinioPlatform string
|
MinioPlatform string
|
||||||
MinioRuntime string
|
MinioRuntime string
|
||||||
MinioEndpoints EndpointServerSets
|
MinioEndpoints EndpointServerPools
|
||||||
}
|
}
|
||||||
|
|
||||||
// Diff - returns error on first difference found in two configs.
|
// Diff - returns error on first difference found in two configs.
|
||||||
@ -159,9 +159,9 @@ func (client *bootstrapRESTClient) Verify(ctx context.Context, srcCfg ServerSyst
|
|||||||
return srcCfg.Diff(recvCfg)
|
return srcCfg.Diff(recvCfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyServerSystemConfig(ctx context.Context, endpointServerSets EndpointServerSets) error {
|
func verifyServerSystemConfig(ctx context.Context, endpointServerPools EndpointServerPools) error {
|
||||||
srcCfg := getServerSystemCfg()
|
srcCfg := getServerSystemCfg()
|
||||||
clnts := newBootstrapRESTClients(endpointServerSets)
|
clnts := newBootstrapRESTClients(endpointServerPools)
|
||||||
var onlineServers int
|
var onlineServers int
|
||||||
var offlineEndpoints []string
|
var offlineEndpoints []string
|
||||||
var retries int
|
var retries int
|
||||||
@ -196,10 +196,10 @@ func verifyServerSystemConfig(ctx context.Context, endpointServerSets EndpointSe
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBootstrapRESTClients(endpointServerSets EndpointServerSets) []*bootstrapRESTClient {
|
func newBootstrapRESTClients(endpointServerPools EndpointServerPools) []*bootstrapRESTClient {
|
||||||
seenHosts := set.NewStringSet()
|
seenHosts := set.NewStringSet()
|
||||||
var clnts []*bootstrapRESTClient
|
var clnts []*bootstrapRESTClient
|
||||||
for _, ep := range endpointServerSets {
|
for _, ep := range endpointServerPools {
|
||||||
for _, endpoint := range ep.Endpoints {
|
for _, endpoint := range ep.Endpoints {
|
||||||
if seenHosts.Contains(endpoint.Host) {
|
if seenHosts.Contains(endpoint.Host) {
|
||||||
continue
|
continue
|
||||||
|
@ -40,8 +40,8 @@ type HTTPConsoleLoggerSys struct {
|
|||||||
logBuf *ring.Ring
|
logBuf *ring.Ring
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustGetNodeName(endpointServerSets EndpointServerSets) (nodeName string) {
|
func mustGetNodeName(endpointServerPools EndpointServerPools) (nodeName string) {
|
||||||
host, err := xnet.ParseHost(GetLocalPeer(endpointServerSets))
|
host, err := xnet.ParseHost(GetLocalPeer(endpointServerPools))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.FatalIf(err, "Unable to start console logging subsystem")
|
logger.FatalIf(err, "Unable to start console logging subsystem")
|
||||||
}
|
}
|
||||||
@ -63,8 +63,8 @@ func NewConsoleLogger(ctx context.Context) *HTTPConsoleLoggerSys {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetNodeName - sets the node name if any after distributed setup has initialized
|
// SetNodeName - sets the node name if any after distributed setup has initialized
|
||||||
func (sys *HTTPConsoleLoggerSys) SetNodeName(endpointServerSets EndpointServerSets) {
|
func (sys *HTTPConsoleLoggerSys) SetNodeName(endpointServerPools EndpointServerPools) {
|
||||||
sys.nodeName = mustGetNodeName(endpointServerSets)
|
sys.nodeName = mustGetNodeName(endpointServerPools)
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasLogListeners returns true if console log listeners are registered
|
// HasLogListeners returns true if console log listeners are registered
|
||||||
|
@ -192,7 +192,7 @@ func crawlDataFolder(ctx context.Context, basePath string, cache dataUsageCache,
|
|||||||
|
|
||||||
// Add disks for set healing.
|
// Add disks for set healing.
|
||||||
if len(cache.Disks) > 0 {
|
if len(cache.Disks) > 0 {
|
||||||
objAPI, ok := newObjectLayerFn().(*erasureServerSets)
|
objAPI, ok := newObjectLayerFn().(*erasureServerPools)
|
||||||
if ok {
|
if ok {
|
||||||
s.disks = objAPI.GetDisksID(cache.Disks...)
|
s.disks = objAPI.GetDisksID(cache.Disks...)
|
||||||
if len(s.disks) != len(cache.Disks) {
|
if len(s.disks) != len(cache.Disks) {
|
||||||
@ -471,7 +471,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
objAPI, ok := newObjectLayerFn().(*erasureServerSets)
|
objAPI, ok := newObjectLayerFn().(*erasureServerPools)
|
||||||
if !ok || len(f.disks) == 0 {
|
if !ok || len(f.disks) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -329,7 +329,7 @@ var (
|
|||||||
// CreateServerEndpoints - validates and creates new endpoints from input args, supports
|
// CreateServerEndpoints - validates and creates new endpoints from input args, supports
|
||||||
// both ellipses and without ellipses transparently.
|
// both ellipses and without ellipses transparently.
|
||||||
func createServerEndpoints(serverAddr string, args ...string) (
|
func createServerEndpoints(serverAddr string, args ...string) (
|
||||||
endpointServerSets EndpointServerSets, setupType SetupType, err error) {
|
endpointServerPools EndpointServerPools, setupType SetupType, err error) {
|
||||||
|
|
||||||
if len(args) == 0 {
|
if len(args) == 0 {
|
||||||
return nil, -1, errInvalidArgument
|
return nil, -1, errInvalidArgument
|
||||||
@ -352,13 +352,13 @@ func createServerEndpoints(serverAddr string, args ...string) (
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, -1, err
|
return nil, -1, err
|
||||||
}
|
}
|
||||||
endpointServerSets = append(endpointServerSets, ZoneEndpoints{
|
endpointServerPools = append(endpointServerPools, ZoneEndpoints{
|
||||||
SetCount: len(setArgs),
|
SetCount: len(setArgs),
|
||||||
DrivesPerSet: len(setArgs[0]),
|
DrivesPerSet: len(setArgs[0]),
|
||||||
Endpoints: endpointList,
|
Endpoints: endpointList,
|
||||||
})
|
})
|
||||||
setupType = newSetupType
|
setupType = newSetupType
|
||||||
return endpointServerSets, setupType, nil
|
return endpointServerPools, setupType, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var foundPrevLocal bool
|
var foundPrevLocal bool
|
||||||
@ -372,9 +372,9 @@ func createServerEndpoints(serverAddr string, args ...string) (
|
|||||||
return nil, -1, err
|
return nil, -1, err
|
||||||
}
|
}
|
||||||
if setDriveCount != 0 && setDriveCount != len(setArgs[0]) {
|
if setDriveCount != 0 && setDriveCount != len(setArgs[0]) {
|
||||||
return nil, -1, fmt.Errorf("All serverSets should have same drive per set ratio - expected %d, got %d", setDriveCount, len(setArgs[0]))
|
return nil, -1, fmt.Errorf("All serverPools should have same drive per set ratio - expected %d, got %d", setDriveCount, len(setArgs[0]))
|
||||||
}
|
}
|
||||||
if err = endpointServerSets.Add(ZoneEndpoints{
|
if err = endpointServerPools.Add(ZoneEndpoints{
|
||||||
SetCount: len(setArgs),
|
SetCount: len(setArgs),
|
||||||
DrivesPerSet: len(setArgs[0]),
|
DrivesPerSet: len(setArgs[0]),
|
||||||
Endpoints: endpointList,
|
Endpoints: endpointList,
|
||||||
@ -393,5 +393,5 @@ func createServerEndpoints(serverAddr string, args ...string) (
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return endpointServerSets, setupType, nil
|
return endpointServerPools, setupType, nil
|
||||||
}
|
}
|
||||||
|
@ -206,12 +206,12 @@ type ZoneEndpoints struct {
|
|||||||
Endpoints Endpoints
|
Endpoints Endpoints
|
||||||
}
|
}
|
||||||
|
|
||||||
// EndpointServerSets - list of list of endpoints
|
// EndpointServerPools - list of list of endpoints
|
||||||
type EndpointServerSets []ZoneEndpoints
|
type EndpointServerPools []ZoneEndpoints
|
||||||
|
|
||||||
// GetLocalZoneIdx returns the zone which endpoint belongs to locally.
|
// GetLocalZoneIdx returns the zone which endpoint belongs to locally.
|
||||||
// if ep is remote this code will return -1 zoneIndex
|
// if ep is remote this code will return -1 zoneIndex
|
||||||
func (l EndpointServerSets) GetLocalZoneIdx(ep Endpoint) int {
|
func (l EndpointServerPools) GetLocalZoneIdx(ep Endpoint) int {
|
||||||
for i, zep := range l {
|
for i, zep := range l {
|
||||||
for _, cep := range zep.Endpoints {
|
for _, cep := range zep.Endpoints {
|
||||||
if cep.IsLocal && ep.IsLocal {
|
if cep.IsLocal && ep.IsLocal {
|
||||||
@ -225,14 +225,14 @@ func (l EndpointServerSets) GetLocalZoneIdx(ep Endpoint) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Add add zone endpoints
|
// Add add zone endpoints
|
||||||
func (l *EndpointServerSets) Add(zeps ZoneEndpoints) error {
|
func (l *EndpointServerPools) Add(zeps ZoneEndpoints) error {
|
||||||
existSet := set.NewStringSet()
|
existSet := set.NewStringSet()
|
||||||
for _, zep := range *l {
|
for _, zep := range *l {
|
||||||
for _, ep := range zep.Endpoints {
|
for _, ep := range zep.Endpoints {
|
||||||
existSet.Add(ep.String())
|
existSet.Add(ep.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Validate if there are duplicate endpoints across serverSets
|
// Validate if there are duplicate endpoints across serverPools
|
||||||
for _, ep := range zeps.Endpoints {
|
for _, ep := range zeps.Endpoints {
|
||||||
if existSet.Contains(ep.String()) {
|
if existSet.Contains(ep.String()) {
|
||||||
return fmt.Errorf("duplicate endpoints found")
|
return fmt.Errorf("duplicate endpoints found")
|
||||||
@ -243,17 +243,17 @@ func (l *EndpointServerSets) Add(zeps ZoneEndpoints) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FirstLocal returns true if the first endpoint is local.
|
// FirstLocal returns true if the first endpoint is local.
|
||||||
func (l EndpointServerSets) FirstLocal() bool {
|
func (l EndpointServerPools) FirstLocal() bool {
|
||||||
return l[0].Endpoints[0].IsLocal
|
return l[0].Endpoints[0].IsLocal
|
||||||
}
|
}
|
||||||
|
|
||||||
// HTTPS - returns true if secure for URLEndpointType.
|
// HTTPS - returns true if secure for URLEndpointType.
|
||||||
func (l EndpointServerSets) HTTPS() bool {
|
func (l EndpointServerPools) HTTPS() bool {
|
||||||
return l[0].Endpoints.HTTPS()
|
return l[0].Endpoints.HTTPS()
|
||||||
}
|
}
|
||||||
|
|
||||||
// NEndpoints - returns all nodes count
|
// NEndpoints - returns all nodes count
|
||||||
func (l EndpointServerSets) NEndpoints() (count int) {
|
func (l EndpointServerPools) NEndpoints() (count int) {
|
||||||
for _, ep := range l {
|
for _, ep := range l {
|
||||||
count += len(ep.Endpoints)
|
count += len(ep.Endpoints)
|
||||||
}
|
}
|
||||||
@ -261,7 +261,7 @@ func (l EndpointServerSets) NEndpoints() (count int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Hostnames - returns list of unique hostnames
|
// Hostnames - returns list of unique hostnames
|
||||||
func (l EndpointServerSets) Hostnames() []string {
|
func (l EndpointServerPools) Hostnames() []string {
|
||||||
foundSet := set.NewStringSet()
|
foundSet := set.NewStringSet()
|
||||||
for _, ep := range l {
|
for _, ep := range l {
|
||||||
for _, endpoint := range ep.Endpoints {
|
for _, endpoint := range ep.Endpoints {
|
||||||
@ -277,7 +277,7 @@ func (l EndpointServerSets) Hostnames() []string {
|
|||||||
// hostsSorted will return all hosts found.
|
// hostsSorted will return all hosts found.
|
||||||
// The LOCAL host will be nil, but the indexes of all hosts should
|
// The LOCAL host will be nil, but the indexes of all hosts should
|
||||||
// remain consistent across the cluster.
|
// remain consistent across the cluster.
|
||||||
func (l EndpointServerSets) hostsSorted() []*xnet.Host {
|
func (l EndpointServerPools) hostsSorted() []*xnet.Host {
|
||||||
peers, localPeer := l.peers()
|
peers, localPeer := l.peers()
|
||||||
sort.Strings(peers)
|
sort.Strings(peers)
|
||||||
hosts := make([]*xnet.Host, len(peers))
|
hosts := make([]*xnet.Host, len(peers))
|
||||||
@ -298,7 +298,7 @@ func (l EndpointServerSets) hostsSorted() []*xnet.Host {
|
|||||||
|
|
||||||
// peers will return all peers, including local.
|
// peers will return all peers, including local.
|
||||||
// The local peer is returned as a separate string.
|
// The local peer is returned as a separate string.
|
||||||
func (l EndpointServerSets) peers() (peers []string, local string) {
|
func (l EndpointServerPools) peers() (peers []string, local string) {
|
||||||
allSet := set.NewStringSet()
|
allSet := set.NewStringSet()
|
||||||
for _, ep := range l {
|
for _, ep := range l {
|
||||||
for _, endpoint := range ep.Endpoints {
|
for _, endpoint := range ep.Endpoints {
|
||||||
@ -739,9 +739,9 @@ func CreateEndpoints(serverAddr string, foundLocal bool, args ...[]string) (Endp
|
|||||||
// the first element from the set of peers which indicate that
|
// the first element from the set of peers which indicate that
|
||||||
// they are local. There is always one entry that is local
|
// they are local. There is always one entry that is local
|
||||||
// even with repeated server endpoints.
|
// even with repeated server endpoints.
|
||||||
func GetLocalPeer(endpointServerSets EndpointServerSets) (localPeer string) {
|
func GetLocalPeer(endpointServerPools EndpointServerPools) (localPeer string) {
|
||||||
peerSet := set.NewStringSet()
|
peerSet := set.NewStringSet()
|
||||||
for _, ep := range endpointServerSets {
|
for _, ep := range endpointServerPools {
|
||||||
for _, endpoint := range ep.Endpoints {
|
for _, endpoint := range ep.Endpoints {
|
||||||
if endpoint.Type() != URLEndpointType {
|
if endpoint.Type() != URLEndpointType {
|
||||||
continue
|
continue
|
||||||
@ -840,12 +840,12 @@ func getOnlineProxyEndpointIdx() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetProxyEndpoints - get all endpoints that can be used to proxy list request.
|
// GetProxyEndpoints - get all endpoints that can be used to proxy list request.
|
||||||
func GetProxyEndpoints(endpointServerSets EndpointServerSets) []ProxyEndpoint {
|
func GetProxyEndpoints(endpointServerPools EndpointServerPools) []ProxyEndpoint {
|
||||||
var proxyEps []ProxyEndpoint
|
var proxyEps []ProxyEndpoint
|
||||||
|
|
||||||
proxyEpSet := set.NewStringSet()
|
proxyEpSet := set.NewStringSet()
|
||||||
|
|
||||||
for _, ep := range endpointServerSets {
|
for _, ep := range endpointServerPools {
|
||||||
for _, endpoint := range ep.Endpoints {
|
for _, endpoint := range ep.Endpoints {
|
||||||
if endpoint.Type() != URLEndpointType {
|
if endpoint.Type() != URLEndpointType {
|
||||||
continue
|
continue
|
||||||
|
@ -55,8 +55,8 @@ func TestErasureParentDirIsObject(t *testing.T) {
|
|||||||
t.Fatalf("Unexpected object name returned got %s, expected %s", objInfo.Name, objectName)
|
t.Fatalf("Unexpected object name returned got %s, expected %s", objInfo.Name, objectName)
|
||||||
}
|
}
|
||||||
|
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
xl := z.serverSets[0].sets[0]
|
xl := z.serverPools[0].sets[0]
|
||||||
testCases := []struct {
|
testCases := []struct {
|
||||||
parentIsObject bool
|
parentIsObject bool
|
||||||
objectName string
|
objectName string
|
||||||
|
@ -178,8 +178,8 @@ func TestListOnlineDisks(t *testing.T) {
|
|||||||
|
|
||||||
object := "object"
|
object := "object"
|
||||||
data := bytes.Repeat([]byte("a"), 1024)
|
data := bytes.Repeat([]byte("a"), 1024)
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
erasureDisks := z.serverSets[0].sets[0].getDisks()
|
erasureDisks := z.serverPools[0].sets[0].getDisks()
|
||||||
for i, test := range testCases {
|
for i, test := range testCases {
|
||||||
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{})
|
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -274,8 +274,8 @@ func TestDisksWithAllParts(t *testing.T) {
|
|||||||
// make data with more than one part
|
// make data with more than one part
|
||||||
partCount := 3
|
partCount := 3
|
||||||
data := bytes.Repeat([]byte("a"), 6*1024*1024*partCount)
|
data := bytes.Repeat([]byte("a"), 6*1024*1024*partCount)
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
s := z.serverSets[0].sets[0]
|
s := z.serverPools[0].sets[0]
|
||||||
erasureDisks := s.getDisks()
|
erasureDisks := s.getDisks()
|
||||||
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -42,8 +42,8 @@ func TestHealing(t *testing.T) {
|
|||||||
defer obj.Shutdown(context.Background())
|
defer obj.Shutdown(context.Background())
|
||||||
defer removeRoots(fsDirs)
|
defer removeRoots(fsDirs)
|
||||||
|
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
er := z.serverSets[0].sets[0]
|
er := z.serverPools[0].sets[0]
|
||||||
|
|
||||||
// Create "bucket"
|
// Create "bucket"
|
||||||
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
||||||
@ -197,8 +197,8 @@ func TestHealObjectCorrupted(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Test 1: Remove the object backend files from the first disk.
|
// Test 1: Remove the object backend files from the first disk.
|
||||||
z := objLayer.(*erasureServerSets)
|
z := objLayer.(*erasureServerPools)
|
||||||
er := z.serverSets[0].sets[0]
|
er := z.serverPools[0].sets[0]
|
||||||
erasureDisks := er.getDisks()
|
erasureDisks := er.getDisks()
|
||||||
firstDisk := erasureDisks[0]
|
firstDisk := erasureDisks[0]
|
||||||
err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
|
err = firstDisk.Delete(context.Background(), bucket, pathJoin(object, xlStorageFormatFile), false)
|
||||||
@ -342,8 +342,8 @@ func TestHealObjectErasure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remove the object backend files from the first disk.
|
// Remove the object backend files from the first disk.
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
er := z.serverSets[0].sets[0]
|
er := z.serverPools[0].sets[0]
|
||||||
firstDisk := er.getDisks()[0]
|
firstDisk := er.getDisks()[0]
|
||||||
|
|
||||||
_, err = obj.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, ObjectOptions{})
|
_, err = obj.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, ObjectOptions{})
|
||||||
@ -366,7 +366,7 @@ func TestHealObjectErasure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
erasureDisks := er.getDisks()
|
erasureDisks := er.getDisks()
|
||||||
z.serverSets[0].erasureDisksMu.Lock()
|
z.serverPools[0].erasureDisksMu.Lock()
|
||||||
er.getDisks = func() []StorageAPI {
|
er.getDisks = func() []StorageAPI {
|
||||||
// Nil more than half the disks, to remove write quorum.
|
// Nil more than half the disks, to remove write quorum.
|
||||||
for i := 0; i <= len(erasureDisks)/2; i++ {
|
for i := 0; i <= len(erasureDisks)/2; i++ {
|
||||||
@ -374,7 +374,7 @@ func TestHealObjectErasure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
return erasureDisks
|
return erasureDisks
|
||||||
}
|
}
|
||||||
z.serverSets[0].erasureDisksMu.Unlock()
|
z.serverPools[0].erasureDisksMu.Unlock()
|
||||||
|
|
||||||
// Try healing now, expect to receive errDiskNotFound.
|
// Try healing now, expect to receive errDiskNotFound.
|
||||||
_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealDeepScan})
|
_, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealDeepScan})
|
||||||
@ -419,8 +419,8 @@ func TestHealEmptyDirectoryErasure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remove the object backend files from the first disk.
|
// Remove the object backend files from the first disk.
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
er := z.serverSets[0].sets[0]
|
er := z.serverPools[0].sets[0]
|
||||||
firstDisk := er.getDisks()[0]
|
firstDisk := er.getDisks()[0]
|
||||||
err = firstDisk.DeleteVol(context.Background(), pathJoin(bucket, encodeDirObject(object)), true)
|
err = firstDisk.DeleteVol(context.Background(), pathJoin(bucket, encodeDirObject(object)), true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -148,13 +148,13 @@ func TestShuffleDisks(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer removeRoots(disks)
|
defer removeRoots(disks)
|
||||||
z := objLayer.(*erasureServerSets)
|
z := objLayer.(*erasureServerPools)
|
||||||
testShuffleDisks(t, z)
|
testShuffleDisks(t, z)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test shuffleDisks which returns shuffled slice of disks for their actual distribution.
|
// Test shuffleDisks which returns shuffled slice of disks for their actual distribution.
|
||||||
func testShuffleDisks(t *testing.T, z *erasureServerSets) {
|
func testShuffleDisks(t *testing.T, z *erasureServerPools) {
|
||||||
disks := z.serverSets[0].GetDisks(0)()
|
disks := z.serverPools[0].GetDisks(0)()
|
||||||
distribution := []int{16, 14, 12, 10, 8, 6, 4, 2, 1, 3, 5, 7, 9, 11, 13, 15}
|
distribution := []int{16, 14, 12, 10, 8, 6, 4, 2, 1, 3, 5, 7, 9, 11, 13, 15}
|
||||||
shuffledDisks := shuffleDisks(disks, distribution)
|
shuffledDisks := shuffleDisks(disks, distribution)
|
||||||
// From the "distribution" above you can notice that:
|
// From the "distribution" above you can notice that:
|
||||||
@ -196,6 +196,6 @@ func TestEvalDisks(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer removeRoots(disks)
|
defer removeRoots(disks)
|
||||||
z := objLayer.(*erasureServerSets)
|
z := objLayer.(*erasureServerPools)
|
||||||
testShuffleDisks(t, z)
|
testShuffleDisks(t, z)
|
||||||
}
|
}
|
||||||
|
@ -133,8 +133,8 @@ func TestErasureDeleteObjectsErasureSet(t *testing.T) {
|
|||||||
for _, dir := range fsDirs {
|
for _, dir := range fsDirs {
|
||||||
defer os.RemoveAll(dir)
|
defer os.RemoveAll(dir)
|
||||||
}
|
}
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
xl := z.serverSets[0].sets[0]
|
xl := z.serverPools[0].sets[0]
|
||||||
objs = append(objs, xl)
|
objs = append(objs, xl)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -206,8 +206,8 @@ func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
|
|||||||
defer obj.Shutdown(context.Background())
|
defer obj.Shutdown(context.Background())
|
||||||
defer removeRoots(fsDirs)
|
defer removeRoots(fsDirs)
|
||||||
|
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
xl := z.serverSets[0].sets[0]
|
xl := z.serverPools[0].sets[0]
|
||||||
|
|
||||||
// Create "bucket"
|
// Create "bucket"
|
||||||
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
||||||
@ -226,7 +226,7 @@ func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
|
|||||||
// for a 16 disk setup, quorum is 9. To simulate disks not found yet
|
// for a 16 disk setup, quorum is 9. To simulate disks not found yet
|
||||||
// quorum is available, we remove disks leaving quorum disks behind.
|
// quorum is available, we remove disks leaving quorum disks behind.
|
||||||
erasureDisks := xl.getDisks()
|
erasureDisks := xl.getDisks()
|
||||||
z.serverSets[0].erasureDisksMu.Lock()
|
z.serverPools[0].erasureDisksMu.Lock()
|
||||||
xl.getDisks = func() []StorageAPI {
|
xl.getDisks = func() []StorageAPI {
|
||||||
for i := range erasureDisks[:7] {
|
for i := range erasureDisks[:7] {
|
||||||
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], nil, errFaultyDisk)
|
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], nil, errFaultyDisk)
|
||||||
@ -234,7 +234,7 @@ func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
|
|||||||
return erasureDisks
|
return erasureDisks
|
||||||
}
|
}
|
||||||
|
|
||||||
z.serverSets[0].erasureDisksMu.Unlock()
|
z.serverPools[0].erasureDisksMu.Unlock()
|
||||||
_, err = obj.DeleteObject(ctx, bucket, object, ObjectOptions{})
|
_, err = obj.DeleteObject(ctx, bucket, object, ObjectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -248,14 +248,14 @@ func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
|
|||||||
|
|
||||||
// Remove one more disk to 'lose' quorum, by setting it to nil.
|
// Remove one more disk to 'lose' quorum, by setting it to nil.
|
||||||
erasureDisks = xl.getDisks()
|
erasureDisks = xl.getDisks()
|
||||||
z.serverSets[0].erasureDisksMu.Lock()
|
z.serverPools[0].erasureDisksMu.Lock()
|
||||||
xl.getDisks = func() []StorageAPI {
|
xl.getDisks = func() []StorageAPI {
|
||||||
erasureDisks[7] = nil
|
erasureDisks[7] = nil
|
||||||
erasureDisks[8] = nil
|
erasureDisks[8] = nil
|
||||||
return erasureDisks
|
return erasureDisks
|
||||||
}
|
}
|
||||||
|
|
||||||
z.serverSets[0].erasureDisksMu.Unlock()
|
z.serverPools[0].erasureDisksMu.Unlock()
|
||||||
_, err = obj.DeleteObject(ctx, bucket, object, ObjectOptions{})
|
_, err = obj.DeleteObject(ctx, bucket, object, ObjectOptions{})
|
||||||
// since majority of disks are not available, metaquorum is not achieved and hence errErasureWriteQuorum error
|
// since majority of disks are not available, metaquorum is not achieved and hence errErasureWriteQuorum error
|
||||||
if !errors.Is(err, errErasureWriteQuorum) {
|
if !errors.Is(err, errErasureWriteQuorum) {
|
||||||
@ -276,8 +276,8 @@ func TestGetObjectNoQuorum(t *testing.T) {
|
|||||||
defer obj.Shutdown(context.Background())
|
defer obj.Shutdown(context.Background())
|
||||||
defer removeRoots(fsDirs)
|
defer removeRoots(fsDirs)
|
||||||
|
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
xl := z.serverSets[0].sets[0]
|
xl := z.serverPools[0].sets[0]
|
||||||
|
|
||||||
// Create "bucket"
|
// Create "bucket"
|
||||||
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
||||||
@ -312,11 +312,11 @@ func TestGetObjectNoQuorum(t *testing.T) {
|
|||||||
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], diskErrors, errFaultyDisk)
|
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], diskErrors, errFaultyDisk)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
z.serverSets[0].erasureDisksMu.Lock()
|
z.serverPools[0].erasureDisksMu.Lock()
|
||||||
xl.getDisks = func() []StorageAPI {
|
xl.getDisks = func() []StorageAPI {
|
||||||
return erasureDisks
|
return erasureDisks
|
||||||
}
|
}
|
||||||
z.serverSets[0].erasureDisksMu.Unlock()
|
z.serverPools[0].erasureDisksMu.Unlock()
|
||||||
// Fetch object from store.
|
// Fetch object from store.
|
||||||
err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts)
|
err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts)
|
||||||
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
|
if err != toObjectErr(errErasureReadQuorum, bucket, object) {
|
||||||
@ -339,8 +339,8 @@ func TestPutObjectNoQuorum(t *testing.T) {
|
|||||||
defer obj.Shutdown(context.Background())
|
defer obj.Shutdown(context.Background())
|
||||||
defer removeRoots(fsDirs)
|
defer removeRoots(fsDirs)
|
||||||
|
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
xl := z.serverSets[0].sets[0]
|
xl := z.serverPools[0].sets[0]
|
||||||
|
|
||||||
// Create "bucket"
|
// Create "bucket"
|
||||||
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
|
||||||
@ -375,11 +375,11 @@ func TestPutObjectNoQuorum(t *testing.T) {
|
|||||||
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], diskErrors, errFaultyDisk)
|
erasureDisks[i] = newNaughtyDisk(erasureDisks[i], diskErrors, errFaultyDisk)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
z.serverSets[0].erasureDisksMu.Lock()
|
z.serverPools[0].erasureDisksMu.Lock()
|
||||||
xl.getDisks = func() []StorageAPI {
|
xl.getDisks = func() []StorageAPI {
|
||||||
return erasureDisks
|
return erasureDisks
|
||||||
}
|
}
|
||||||
z.serverSets[0].erasureDisksMu.Unlock()
|
z.serverPools[0].erasureDisksMu.Unlock()
|
||||||
// Upload new content to same object "object"
|
// Upload new content to same object "object"
|
||||||
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
|
_, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
|
||||||
if !errors.Is(err, errErasureWriteQuorum) {
|
if !errors.Is(err, errErasureWriteQuorum) {
|
||||||
@ -405,8 +405,8 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
|
|||||||
partCount := 3
|
partCount := 3
|
||||||
data := bytes.Repeat([]byte("a"), 6*1024*1024*partCount)
|
data := bytes.Repeat([]byte("a"), 6*1024*1024*partCount)
|
||||||
|
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
xl := z.serverSets[0].sets[0]
|
xl := z.serverPools[0].sets[0]
|
||||||
erasureDisks := xl.getDisks()
|
erasureDisks := xl.getDisks()
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(GlobalContext)
|
ctx, cancel := context.WithCancel(GlobalContext)
|
||||||
|
@ -37,34 +37,34 @@ import (
|
|||||||
"github.com/minio/minio/pkg/sync/errgroup"
|
"github.com/minio/minio/pkg/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
type erasureServerSets struct {
|
type erasureServerPools struct {
|
||||||
GatewayUnsupported
|
GatewayUnsupported
|
||||||
|
|
||||||
serverSets []*erasureSets
|
serverPools []*erasureSets
|
||||||
|
|
||||||
// Shut down async operations
|
// Shut down async operations
|
||||||
shutdown context.CancelFunc
|
shutdown context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) SingleZone() bool {
|
func (z *erasureServerPools) SingleZone() bool {
|
||||||
return len(z.serverSets) == 1
|
return len(z.serverPools) == 1
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize new zone of erasure sets.
|
// Initialize new zone of erasure sets.
|
||||||
func newErasureServerSets(ctx context.Context, endpointServerSets EndpointServerSets) (ObjectLayer, error) {
|
func newErasureServerPools(ctx context.Context, endpointServerPools EndpointServerPools) (ObjectLayer, error) {
|
||||||
var (
|
var (
|
||||||
deploymentID string
|
deploymentID string
|
||||||
err error
|
err error
|
||||||
|
|
||||||
formats = make([]*formatErasureV3, len(endpointServerSets))
|
formats = make([]*formatErasureV3, len(endpointServerPools))
|
||||||
storageDisks = make([][]StorageAPI, len(endpointServerSets))
|
storageDisks = make([][]StorageAPI, len(endpointServerPools))
|
||||||
z = &erasureServerSets{serverSets: make([]*erasureSets, len(endpointServerSets))}
|
z = &erasureServerPools{serverPools: make([]*erasureSets, len(endpointServerPools))}
|
||||||
)
|
)
|
||||||
|
|
||||||
var localDrives []string
|
var localDrives []string
|
||||||
|
|
||||||
local := endpointServerSets.FirstLocal()
|
local := endpointServerPools.FirstLocal()
|
||||||
for i, ep := range endpointServerSets {
|
for i, ep := range endpointServerPools {
|
||||||
for _, endpoint := range ep.Endpoints {
|
for _, endpoint := range ep.Endpoints {
|
||||||
if endpoint.IsLocal {
|
if endpoint.IsLocal {
|
||||||
localDrives = append(localDrives, endpoint.Path)
|
localDrives = append(localDrives, endpoint.Path)
|
||||||
@ -78,7 +78,7 @@ func newErasureServerSets(ctx context.Context, endpointServerSets EndpointServer
|
|||||||
if deploymentID == "" {
|
if deploymentID == "" {
|
||||||
deploymentID = formats[i].ID
|
deploymentID = formats[i].ID
|
||||||
}
|
}
|
||||||
z.serverSets[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i])
|
z.serverPools[i], err = newErasureSets(ctx, ep.Endpoints, storageDisks[i], formats[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -88,18 +88,18 @@ func newErasureServerSets(ctx context.Context, endpointServerSets EndpointServer
|
|||||||
return z, nil
|
return z, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) NewNSLock(bucket string, objects ...string) RWLocker {
|
func (z *erasureServerPools) NewNSLock(bucket string, objects ...string) RWLocker {
|
||||||
return z.serverSets[0].NewNSLock(bucket, objects...)
|
return z.serverPools[0].NewNSLock(bucket, objects...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetDisksID will return disks by their ID.
|
// GetDisksID will return disks by their ID.
|
||||||
func (z *erasureServerSets) GetDisksID(ids ...string) []StorageAPI {
|
func (z *erasureServerPools) GetDisksID(ids ...string) []StorageAPI {
|
||||||
idMap := make(map[string]struct{})
|
idMap := make(map[string]struct{})
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
idMap[id] = struct{}{}
|
idMap[id] = struct{}{}
|
||||||
}
|
}
|
||||||
res := make([]StorageAPI, 0, len(idMap))
|
res := make([]StorageAPI, 0, len(idMap))
|
||||||
for _, ss := range z.serverSets {
|
for _, ss := range z.serverPools {
|
||||||
for _, disks := range ss.erasureDisks {
|
for _, disks := range ss.erasureDisks {
|
||||||
for _, disk := range disks {
|
for _, disk := range disks {
|
||||||
id, _ := disk.GetDiskID()
|
id, _ := disk.GetDiskID()
|
||||||
@ -112,15 +112,15 @@ func (z *erasureServerSets) GetDisksID(ids ...string) []StorageAPI {
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) GetAllLockers() []dsync.NetLocker {
|
func (z *erasureServerPools) GetAllLockers() []dsync.NetLocker {
|
||||||
return z.serverSets[0].GetAllLockers()
|
return z.serverPools[0].GetAllLockers()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) SetDriveCount() int {
|
func (z *erasureServerPools) SetDriveCount() int {
|
||||||
return z.serverSets[0].SetDriveCount()
|
return z.serverPools[0].SetDriveCount()
|
||||||
}
|
}
|
||||||
|
|
||||||
type serverSetsAvailableSpace []zoneAvailableSpace
|
type serverPoolsAvailableSpace []zoneAvailableSpace
|
||||||
|
|
||||||
type zoneAvailableSpace struct {
|
type zoneAvailableSpace struct {
|
||||||
Index int
|
Index int
|
||||||
@ -128,7 +128,7 @@ type zoneAvailableSpace struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TotalAvailable - total available space
|
// TotalAvailable - total available space
|
||||||
func (p serverSetsAvailableSpace) TotalAvailable() uint64 {
|
func (p serverPoolsAvailableSpace) TotalAvailable() uint64 {
|
||||||
total := uint64(0)
|
total := uint64(0)
|
||||||
for _, z := range p {
|
for _, z := range p {
|
||||||
total += z.Available
|
total += z.Available
|
||||||
@ -137,42 +137,42 @@ func (p serverSetsAvailableSpace) TotalAvailable() uint64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getAvailableZoneIdx will return an index that can hold size bytes.
|
// getAvailableZoneIdx will return an index that can hold size bytes.
|
||||||
// -1 is returned if no serverSets have available space for the size given.
|
// -1 is returned if no serverPools have available space for the size given.
|
||||||
func (z *erasureServerSets) getAvailableZoneIdx(ctx context.Context, size int64) int {
|
func (z *erasureServerPools) getAvailableZoneIdx(ctx context.Context, size int64) int {
|
||||||
serverSets := z.getServerSetsAvailableSpace(ctx, size)
|
serverPools := z.getServerPoolsAvailableSpace(ctx, size)
|
||||||
total := serverSets.TotalAvailable()
|
total := serverPools.TotalAvailable()
|
||||||
if total == 0 {
|
if total == 0 {
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
// choose when we reach this many
|
// choose when we reach this many
|
||||||
choose := rand.Uint64() % total
|
choose := rand.Uint64() % total
|
||||||
atTotal := uint64(0)
|
atTotal := uint64(0)
|
||||||
for _, zone := range serverSets {
|
for _, zone := range serverPools {
|
||||||
atTotal += zone.Available
|
atTotal += zone.Available
|
||||||
if atTotal > choose && zone.Available > 0 {
|
if atTotal > choose && zone.Available > 0 {
|
||||||
return zone.Index
|
return zone.Index
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Should not happen, but print values just in case.
|
// Should not happen, but print values just in case.
|
||||||
logger.LogIf(ctx, fmt.Errorf("reached end of serverSets (total: %v, atTotal: %v, choose: %v)", total, atTotal, choose))
|
logger.LogIf(ctx, fmt.Errorf("reached end of serverPools (total: %v, atTotal: %v, choose: %v)", total, atTotal, choose))
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
|
||||||
// getServerSetsAvailableSpace will return the available space of each zone after storing the content.
|
// getServerPoolsAvailableSpace will return the available space of each zone after storing the content.
|
||||||
// If there is not enough space the zone will return 0 bytes available.
|
// If there is not enough space the zone will return 0 bytes available.
|
||||||
// Negative sizes are seen as 0 bytes.
|
// Negative sizes are seen as 0 bytes.
|
||||||
func (z *erasureServerSets) getServerSetsAvailableSpace(ctx context.Context, size int64) serverSetsAvailableSpace {
|
func (z *erasureServerPools) getServerPoolsAvailableSpace(ctx context.Context, size int64) serverPoolsAvailableSpace {
|
||||||
if size < 0 {
|
if size < 0 {
|
||||||
size = 0
|
size = 0
|
||||||
}
|
}
|
||||||
var serverSets = make(serverSetsAvailableSpace, len(z.serverSets))
|
var serverPools = make(serverPoolsAvailableSpace, len(z.serverPools))
|
||||||
|
|
||||||
storageInfos := make([]StorageInfo, len(z.serverSets))
|
storageInfos := make([]StorageInfo, len(z.serverPools))
|
||||||
g := errgroup.WithNErrs(len(z.serverSets))
|
g := errgroup.WithNErrs(len(z.serverPools))
|
||||||
for index := range z.serverSets {
|
for index := range z.serverPools {
|
||||||
index := index
|
index := index
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
storageInfos[index] = z.serverSets[index].StorageUsageInfo(ctx)
|
storageInfos[index] = z.serverPools[index].StorageUsageInfo(ctx)
|
||||||
return nil
|
return nil
|
||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
@ -201,21 +201,21 @@ func (z *erasureServerSets) getServerSetsAvailableSpace(ctx context.Context, siz
|
|||||||
available = 0
|
available = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
serverSets[i] = zoneAvailableSpace{
|
serverPools[i] = zoneAvailableSpace{
|
||||||
Index: i,
|
Index: i,
|
||||||
Available: available,
|
Available: available,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return serverSets
|
return serverPools
|
||||||
}
|
}
|
||||||
|
|
||||||
// getZoneIdx returns the found previous object and its corresponding zone idx,
|
// getZoneIdx returns the found previous object and its corresponding zone idx,
|
||||||
// if none are found falls back to most available space zone.
|
// if none are found falls back to most available space zone.
|
||||||
func (z *erasureServerSets) getZoneIdx(ctx context.Context, bucket, object string, opts ObjectOptions, size int64) (idx int, err error) {
|
func (z *erasureServerPools) getZoneIdx(ctx context.Context, bucket, object string, opts ObjectOptions, size int64) (idx int, err error) {
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
for i, zone := range z.serverSets {
|
for i, zone := range z.serverPools {
|
||||||
objInfo, err := zone.GetObjectInfo(ctx, bucket, object, opts)
|
objInfo, err := zone.GetObjectInfo(ctx, bucket, object, opts)
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
case ObjectNotFound:
|
case ObjectNotFound:
|
||||||
@ -245,15 +245,15 @@ func (z *erasureServerSets) getZoneIdx(ctx context.Context, bucket, object strin
|
|||||||
return idx, nil
|
return idx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) Shutdown(ctx context.Context) error {
|
func (z *erasureServerPools) Shutdown(ctx context.Context) error {
|
||||||
defer z.shutdown()
|
defer z.shutdown()
|
||||||
|
|
||||||
g := errgroup.WithNErrs(len(z.serverSets))
|
g := errgroup.WithNErrs(len(z.serverPools))
|
||||||
|
|
||||||
for index := range z.serverSets {
|
for index := range z.serverPools {
|
||||||
index := index
|
index := index
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
return z.serverSets[index].Shutdown(ctx)
|
return z.serverPools[index].Shutdown(ctx)
|
||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -266,17 +266,17 @@ func (z *erasureServerSets) Shutdown(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) StorageInfo(ctx context.Context, local bool) (StorageInfo, []error) {
|
func (z *erasureServerPools) StorageInfo(ctx context.Context, local bool) (StorageInfo, []error) {
|
||||||
var storageInfo StorageInfo
|
var storageInfo StorageInfo
|
||||||
storageInfo.Backend.Type = BackendErasure
|
storageInfo.Backend.Type = BackendErasure
|
||||||
|
|
||||||
storageInfos := make([]StorageInfo, len(z.serverSets))
|
storageInfos := make([]StorageInfo, len(z.serverPools))
|
||||||
storageInfosErrs := make([][]error, len(z.serverSets))
|
storageInfosErrs := make([][]error, len(z.serverPools))
|
||||||
g := errgroup.WithNErrs(len(z.serverSets))
|
g := errgroup.WithNErrs(len(z.serverPools))
|
||||||
for index := range z.serverSets {
|
for index := range z.serverPools {
|
||||||
index := index
|
index := index
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
storageInfos[index], storageInfosErrs[index] = z.serverSets[index].StorageInfo(ctx, local)
|
storageInfos[index], storageInfosErrs[index] = z.serverPools[index].StorageInfo(ctx, local)
|
||||||
return nil
|
return nil
|
||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
@ -302,13 +302,13 @@ func (z *erasureServerSets) StorageInfo(ctx context.Context, local bool) (Storag
|
|||||||
storageInfo.Backend.RRSCParity = rrSCParity
|
storageInfo.Backend.RRSCParity = rrSCParity
|
||||||
|
|
||||||
var errs []error
|
var errs []error
|
||||||
for i := range z.serverSets {
|
for i := range z.serverPools {
|
||||||
errs = append(errs, storageInfosErrs[i]...)
|
errs = append(errs, storageInfosErrs[i]...)
|
||||||
}
|
}
|
||||||
return storageInfo, errs
|
return storageInfo, errs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
|
func (z *erasureServerPools) CrawlAndGetDataUsage(ctx context.Context, bf *bloomFilter, updates chan<- DataUsageInfo) error {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@ -319,8 +319,8 @@ func (z *erasureServerSets) CrawlAndGetDataUsage(ctx context.Context, bf *bloomF
|
|||||||
var knownBuckets = make(map[string]struct{}) // used to deduplicate buckets.
|
var knownBuckets = make(map[string]struct{}) // used to deduplicate buckets.
|
||||||
var allBuckets []BucketInfo
|
var allBuckets []BucketInfo
|
||||||
|
|
||||||
// Collect for each set in serverSets.
|
// Collect for each set in serverPools.
|
||||||
for _, z := range z.serverSets {
|
for _, z := range z.serverPools {
|
||||||
buckets, err := z.ListBuckets(ctx)
|
buckets, err := z.ListBuckets(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -424,17 +424,17 @@ func (z *erasureServerSets) CrawlAndGetDataUsage(ctx context.Context, bf *bloomF
|
|||||||
return firstErr
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// MakeBucketWithLocation - creates a new bucket across all serverSets simultaneously
|
// MakeBucketWithLocation - creates a new bucket across all serverPools simultaneously
|
||||||
// even if one of the sets fail to create buckets, we proceed all the successful
|
// even if one of the sets fail to create buckets, we proceed all the successful
|
||||||
// operations.
|
// operations.
|
||||||
func (z *erasureServerSets) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error {
|
func (z *erasureServerPools) MakeBucketWithLocation(ctx context.Context, bucket string, opts BucketOptions) error {
|
||||||
g := errgroup.WithNErrs(len(z.serverSets))
|
g := errgroup.WithNErrs(len(z.serverPools))
|
||||||
|
|
||||||
// Create buckets in parallel across all sets.
|
// Create buckets in parallel across all sets.
|
||||||
for index := range z.serverSets {
|
for index := range z.serverPools {
|
||||||
index := index
|
index := index
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
return z.serverSets[index].MakeBucketWithLocation(ctx, bucket, opts)
|
return z.serverPools[index].MakeBucketWithLocation(ctx, bucket, opts)
|
||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -464,14 +464,14 @@ func (z *erasureServerSets) MakeBucketWithLocation(ctx context.Context, bucket s
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
func (z *erasureServerPools) GetObjectNInfo(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error) {
|
||||||
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
|
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
object = encodeDirObject(object)
|
object = encodeDirObject(object)
|
||||||
|
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
gr, err = zone.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
gr, err = zone.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||||
@ -487,14 +487,14 @@ func (z *erasureServerSets) GetObjectNInfo(ctx context.Context, bucket, object s
|
|||||||
return gr, ObjectNotFound{Bucket: bucket, Object: object}
|
return gr, ObjectNotFound{Bucket: bucket, Object: object}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
|
func (z *erasureServerPools) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts ObjectOptions) error {
|
||||||
if err := checkGetObjArgs(ctx, bucket, object); err != nil {
|
if err := checkGetObjArgs(ctx, bucket, object); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
object = encodeDirObject(object)
|
object = encodeDirObject(object)
|
||||||
|
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
if err := zone.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts); err != nil {
|
if err := zone.GetObject(ctx, bucket, object, startOffset, length, writer, etag, opts); err != nil {
|
||||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||||
continue
|
continue
|
||||||
@ -509,13 +509,13 @@ func (z *erasureServerSets) GetObject(ctx context.Context, bucket, object string
|
|||||||
return ObjectNotFound{Bucket: bucket, Object: object}
|
return ObjectNotFound{Bucket: bucket, Object: object}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (z *erasureServerPools) GetObjectInfo(ctx context.Context, bucket, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
|
if err = checkGetObjArgs(ctx, bucket, object); err != nil {
|
||||||
return objInfo, err
|
return objInfo, err
|
||||||
}
|
}
|
||||||
|
|
||||||
object = encodeDirObject(object)
|
object = encodeDirObject(object)
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
objInfo, err = zone.GetObjectInfo(ctx, bucket, object, opts)
|
objInfo, err = zone.GetObjectInfo(ctx, bucket, object, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||||
@ -533,7 +533,7 @@ func (z *erasureServerSets) GetObjectInfo(ctx context.Context, bucket, object st
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PutObject - writes an object to least used erasure zone.
|
// PutObject - writes an object to least used erasure zone.
|
||||||
func (z *erasureServerSets) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (ObjectInfo, error) {
|
func (z *erasureServerPools) PutObject(ctx context.Context, bucket string, object string, data *PutObjReader, opts ObjectOptions) (ObjectInfo, error) {
|
||||||
// Validate put object input args.
|
// Validate put object input args.
|
||||||
if err := checkPutObjectArgs(ctx, bucket, object, z); err != nil {
|
if err := checkPutObjectArgs(ctx, bucket, object, z); err != nil {
|
||||||
return ObjectInfo{}, err
|
return ObjectInfo{}, err
|
||||||
@ -542,7 +542,7 @@ func (z *erasureServerSets) PutObject(ctx context.Context, bucket string, object
|
|||||||
object = encodeDirObject(object)
|
object = encodeDirObject(object)
|
||||||
|
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].PutObject(ctx, bucket, object, data, opts)
|
return z.serverPools[0].PutObject(ctx, bucket, object, data, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
idx, err := z.getZoneIdx(ctx, bucket, object, opts, data.Size())
|
idx, err := z.getZoneIdx(ctx, bucket, object, opts, data.Size())
|
||||||
@ -551,10 +551,10 @@ func (z *erasureServerSets) PutObject(ctx context.Context, bucket string, object
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Overwrite the object at the right zone
|
// Overwrite the object at the right zone
|
||||||
return z.serverSets[idx].PutObject(ctx, bucket, object, data, opts)
|
return z.serverPools[idx].PutObject(ctx, bucket, object, data, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (z *erasureServerPools) DeleteObject(ctx context.Context, bucket string, object string, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
if err = checkDelObjArgs(ctx, bucket, object); err != nil {
|
if err = checkDelObjArgs(ctx, bucket, object); err != nil {
|
||||||
return objInfo, err
|
return objInfo, err
|
||||||
}
|
}
|
||||||
@ -562,9 +562,9 @@ func (z *erasureServerSets) DeleteObject(ctx context.Context, bucket string, obj
|
|||||||
object = encodeDirObject(object)
|
object = encodeDirObject(object)
|
||||||
|
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].DeleteObject(ctx, bucket, object, opts)
|
return z.serverPools[0].DeleteObject(ctx, bucket, object, opts)
|
||||||
}
|
}
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
objInfo, err = zone.DeleteObject(ctx, bucket, object, opts)
|
objInfo, err = zone.DeleteObject(ctx, bucket, object, opts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return objInfo, nil
|
return objInfo, nil
|
||||||
@ -577,7 +577,7 @@ func (z *erasureServerSets) DeleteObject(ctx context.Context, bucket string, obj
|
|||||||
return objInfo, err
|
return objInfo, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
|
func (z *erasureServerPools) DeleteObjects(ctx context.Context, bucket string, objects []ObjectToDelete, opts ObjectOptions) ([]DeletedObject, []error) {
|
||||||
derrs := make([]error, len(objects))
|
derrs := make([]error, len(objects))
|
||||||
dobjects := make([]DeletedObject, len(objects))
|
dobjects := make([]DeletedObject, len(objects))
|
||||||
objSets := set.NewStringSet()
|
objSets := set.NewStringSet()
|
||||||
@ -599,10 +599,10 @@ func (z *erasureServerSets) DeleteObjects(ctx context.Context, bucket string, ob
|
|||||||
defer multiDeleteLock.Unlock()
|
defer multiDeleteLock.Unlock()
|
||||||
|
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].DeleteObjects(ctx, bucket, objects, opts)
|
return z.serverPools[0].DeleteObjects(ctx, bucket, objects, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
deletedObjects, errs := zone.DeleteObjects(ctx, bucket, objects, opts)
|
deletedObjects, errs := zone.DeleteObjects(ctx, bucket, objects, opts)
|
||||||
for i, derr := range errs {
|
for i, derr := range errs {
|
||||||
if derr != nil {
|
if derr != nil {
|
||||||
@ -614,7 +614,7 @@ func (z *erasureServerSets) DeleteObjects(ctx context.Context, bucket string, ob
|
|||||||
return dobjects, derrs
|
return dobjects, derrs
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (z *erasureServerPools) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
srcObject = encodeDirObject(srcObject)
|
srcObject = encodeDirObject(srcObject)
|
||||||
dstObject = encodeDirObject(dstObject)
|
dstObject = encodeDirObject(dstObject)
|
||||||
|
|
||||||
@ -628,12 +628,12 @@ func (z *erasureServerSets) CopyObject(ctx context.Context, srcBucket, srcObject
|
|||||||
if cpSrcDstSame && srcInfo.metadataOnly {
|
if cpSrcDstSame && srcInfo.metadataOnly {
|
||||||
// Version ID is set for the destination and source == destination version ID.
|
// Version ID is set for the destination and source == destination version ID.
|
||||||
if dstOpts.VersionID != "" && srcOpts.VersionID == dstOpts.VersionID {
|
if dstOpts.VersionID != "" && srcOpts.VersionID == dstOpts.VersionID {
|
||||||
return z.serverSets[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
|
return z.serverPools[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
|
||||||
}
|
}
|
||||||
// Destination is not versioned and source version ID is empty
|
// Destination is not versioned and source version ID is empty
|
||||||
// perform an in-place update.
|
// perform an in-place update.
|
||||||
if !dstOpts.Versioned && srcOpts.VersionID == "" {
|
if !dstOpts.Versioned && srcOpts.VersionID == "" {
|
||||||
return z.serverSets[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
|
return z.serverPools[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
|
||||||
}
|
}
|
||||||
// Destination is versioned, source is not destination version,
|
// Destination is versioned, source is not destination version,
|
||||||
// as a special case look for if the source object is not legacy
|
// as a special case look for if the source object is not legacy
|
||||||
@ -643,7 +643,7 @@ func (z *erasureServerSets) CopyObject(ctx context.Context, srcBucket, srcObject
|
|||||||
// CopyObject optimization where we don't create an entire copy
|
// CopyObject optimization where we don't create an entire copy
|
||||||
// of the content, instead we add a reference.
|
// of the content, instead we add a reference.
|
||||||
srcInfo.versionOnly = true
|
srcInfo.versionOnly = true
|
||||||
return z.serverSets[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
|
return z.serverPools[zoneIdx].CopyObject(ctx, srcBucket, srcObject, dstBucket, dstObject, srcInfo, srcOpts, dstOpts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -655,10 +655,10 @@ func (z *erasureServerSets) CopyObject(ctx context.Context, srcBucket, srcObject
|
|||||||
MTime: dstOpts.MTime,
|
MTime: dstOpts.MTime,
|
||||||
}
|
}
|
||||||
|
|
||||||
return z.serverSets[zoneIdx].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
|
return z.serverPools[zoneIdx].PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, putOpts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) {
|
func (z *erasureServerPools) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) {
|
||||||
marker := continuationToken
|
marker := continuationToken
|
||||||
if marker == "" {
|
if marker == "" {
|
||||||
marker = startAfter
|
marker = startAfter
|
||||||
@ -679,7 +679,7 @@ func (z *erasureServerSets) ListObjectsV2(ctx context.Context, bucket, prefix, c
|
|||||||
return listObjectsV2Info, err
|
return listObjectsV2Info, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (ListObjectVersionsInfo, error) {
|
func (z *erasureServerPools) ListObjectVersions(ctx context.Context, bucket, prefix, marker, versionMarker, delimiter string, maxKeys int) (ListObjectVersionsInfo, error) {
|
||||||
loi := ListObjectVersionsInfo{}
|
loi := ListObjectVersionsInfo{}
|
||||||
if marker == "" && versionMarker != "" {
|
if marker == "" && versionMarker != "" {
|
||||||
return loi, NotImplemented{}
|
return loi, NotImplemented{}
|
||||||
@ -723,7 +723,7 @@ func (z *erasureServerSets) ListObjectVersions(ctx context.Context, bucket, pref
|
|||||||
return loi, nil
|
return loi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
|
func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsInfo, error) {
|
||||||
var loi ListObjectsInfo
|
var loi ListObjectsInfo
|
||||||
merged, err := z.listPath(ctx, listPathOptions{
|
merged, err := z.listPath(ctx, listPathOptions{
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
@ -747,13 +747,13 @@ func (z *erasureServerSets) ListObjects(ctx context.Context, bucket, prefix, mar
|
|||||||
return loi, nil
|
return loi, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
|
func (z *erasureServerPools) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
|
||||||
if err := checkListMultipartArgs(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, z); err != nil {
|
if err := checkListMultipartArgs(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, z); err != nil {
|
||||||
return ListMultipartsInfo{}, err
|
return ListMultipartsInfo{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
|
return z.serverPools[0].ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
|
||||||
}
|
}
|
||||||
|
|
||||||
var zoneResult = ListMultipartsInfo{}
|
var zoneResult = ListMultipartsInfo{}
|
||||||
@ -761,7 +761,7 @@ func (z *erasureServerSets) ListMultipartUploads(ctx context.Context, bucket, pr
|
|||||||
zoneResult.KeyMarker = keyMarker
|
zoneResult.KeyMarker = keyMarker
|
||||||
zoneResult.Prefix = prefix
|
zoneResult.Prefix = prefix
|
||||||
zoneResult.Delimiter = delimiter
|
zoneResult.Delimiter = delimiter
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
result, err := zone.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker,
|
result, err := zone.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker,
|
||||||
delimiter, maxUploads)
|
delimiter, maxUploads)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -773,13 +773,13 @@ func (z *erasureServerSets) ListMultipartUploads(ctx context.Context, bucket, pr
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initiate a new multipart upload on a hashedSet based on object name.
|
// Initiate a new multipart upload on a hashedSet based on object name.
|
||||||
func (z *erasureServerSets) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) {
|
func (z *erasureServerPools) NewMultipartUpload(ctx context.Context, bucket, object string, opts ObjectOptions) (string, error) {
|
||||||
if err := checkNewMultipartArgs(ctx, bucket, object, z); err != nil {
|
if err := checkNewMultipartArgs(ctx, bucket, object, z); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].NewMultipartUpload(ctx, bucket, object, opts)
|
return z.serverPools[0].NewMultipartUpload(ctx, bucket, object, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We don't know the exact size, so we ask for at least 1GiB file.
|
// We don't know the exact size, so we ask for at least 1GiB file.
|
||||||
@ -788,11 +788,11 @@ func (z *erasureServerSets) NewMultipartUpload(ctx context.Context, bucket, obje
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return z.serverSets[idx].NewMultipartUpload(ctx, bucket, object, opts)
|
return z.serverPools[idx].NewMultipartUpload(ctx, bucket, object, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copies a part of an object from source hashedSet to destination hashedSet.
|
// Copies a part of an object from source hashedSet to destination hashedSet.
|
||||||
func (z *erasureServerSets) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (PartInfo, error) {
|
func (z *erasureServerPools) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64, srcInfo ObjectInfo, srcOpts, dstOpts ObjectOptions) (PartInfo, error) {
|
||||||
if err := checkNewMultipartArgs(ctx, srcBucket, srcObject, z); err != nil {
|
if err := checkNewMultipartArgs(ctx, srcBucket, srcObject, z); err != nil {
|
||||||
return PartInfo{}, err
|
return PartInfo{}, err
|
||||||
}
|
}
|
||||||
@ -802,16 +802,16 @@ func (z *erasureServerSets) CopyObjectPart(ctx context.Context, srcBucket, srcOb
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PutObjectPart - writes part of an object to hashedSet based on the object name.
|
// PutObjectPart - writes part of an object to hashedSet based on the object name.
|
||||||
func (z *erasureServerSets) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (PartInfo, error) {
|
func (z *erasureServerPools) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *PutObjReader, opts ObjectOptions) (PartInfo, error) {
|
||||||
if err := checkPutObjectPartArgs(ctx, bucket, object, z); err != nil {
|
if err := checkPutObjectPartArgs(ctx, bucket, object, z); err != nil {
|
||||||
return PartInfo{}, err
|
return PartInfo{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
|
return z.serverPools[0].PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return zone.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
|
return zone.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
|
||||||
@ -832,15 +832,15 @@ func (z *erasureServerSets) PutObjectPart(ctx context.Context, bucket, object, u
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
|
func (z *erasureServerPools) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) (MultipartInfo, error) {
|
||||||
if err := checkListPartsArgs(ctx, bucket, object, z); err != nil {
|
if err := checkListPartsArgs(ctx, bucket, object, z); err != nil {
|
||||||
return MultipartInfo{}, err
|
return MultipartInfo{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
return z.serverPools[0].GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||||
}
|
}
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
mi, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
mi, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return mi, nil
|
return mi, nil
|
||||||
@ -862,15 +862,15 @@ func (z *erasureServerSets) GetMultipartInfo(ctx context.Context, bucket, object
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ListObjectParts - lists all uploaded parts to an object in hashedSet.
|
// ListObjectParts - lists all uploaded parts to an object in hashedSet.
|
||||||
func (z *erasureServerSets) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (ListPartsInfo, error) {
|
func (z *erasureServerPools) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts ObjectOptions) (ListPartsInfo, error) {
|
||||||
if err := checkListPartsArgs(ctx, bucket, object, z); err != nil {
|
if err := checkListPartsArgs(ctx, bucket, object, z); err != nil {
|
||||||
return ListPartsInfo{}, err
|
return ListPartsInfo{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
return z.serverPools[0].ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
||||||
}
|
}
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return zone.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
return zone.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
||||||
@ -889,16 +889,16 @@ func (z *erasureServerSets) ListObjectParts(ctx context.Context, bucket, object,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Aborts an in-progress multipart operation on hashedSet based on the object name.
|
// Aborts an in-progress multipart operation on hashedSet based on the object name.
|
||||||
func (z *erasureServerSets) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
|
func (z *erasureServerPools) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts ObjectOptions) error {
|
||||||
if err := checkAbortMultipartArgs(ctx, bucket, object, z); err != nil {
|
if err := checkAbortMultipartArgs(ctx, bucket, object, z); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
|
return z.serverPools[0].AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
_, err := zone.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return zone.AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
|
return zone.AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
|
||||||
@ -918,21 +918,21 @@ func (z *erasureServerSets) AbortMultipartUpload(ctx context.Context, bucket, ob
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.
|
// CompleteMultipartUpload - completes a pending multipart transaction, on hashedSet based on object name.
|
||||||
func (z *erasureServerSets) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
func (z *erasureServerPools) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart, opts ObjectOptions) (objInfo ObjectInfo, err error) {
|
||||||
if err = checkCompleteMultipartArgs(ctx, bucket, object, z); err != nil {
|
if err = checkCompleteMultipartArgs(ctx, bucket, object, z); err != nil {
|
||||||
return objInfo, err
|
return objInfo, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
|
return z.serverPools[0].CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Purge any existing object.
|
// Purge any existing object.
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
zone.DeleteObject(ctx, bucket, object, opts)
|
zone.DeleteObject(ctx, bucket, object, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
|
result, err := zone.ListMultipartUploads(ctx, bucket, object, "", "", "", maxUploadsList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return objInfo, err
|
return objInfo, err
|
||||||
@ -948,10 +948,10 @@ func (z *erasureServerSets) CompleteMultipartUpload(ctx context.Context, bucket,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetBucketInfo - returns bucket info from one of the erasure coded serverSets.
|
// GetBucketInfo - returns bucket info from one of the erasure coded serverPools.
|
||||||
func (z *erasureServerSets) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) {
|
func (z *erasureServerPools) GetBucketInfo(ctx context.Context, bucket string) (bucketInfo BucketInfo, err error) {
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
bucketInfo, err = z.serverSets[0].GetBucketInfo(ctx, bucket)
|
bucketInfo, err = z.serverPools[0].GetBucketInfo(ctx, bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bucketInfo, err
|
return bucketInfo, err
|
||||||
}
|
}
|
||||||
@ -961,7 +961,7 @@ func (z *erasureServerSets) GetBucketInfo(ctx context.Context, bucket string) (b
|
|||||||
}
|
}
|
||||||
return bucketInfo, nil
|
return bucketInfo, nil
|
||||||
}
|
}
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
bucketInfo, err = zone.GetBucketInfo(ctx, bucket)
|
bucketInfo, err = zone.GetBucketInfo(ctx, bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isErrBucketNotFound(err) {
|
if isErrBucketNotFound(err) {
|
||||||
@ -981,43 +981,43 @@ func (z *erasureServerSets) GetBucketInfo(ctx context.Context, bucket string) (b
|
|||||||
}
|
}
|
||||||
|
|
||||||
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
|
// IsNotificationSupported returns whether bucket notification is applicable for this layer.
|
||||||
func (z *erasureServerSets) IsNotificationSupported() bool {
|
func (z *erasureServerPools) IsNotificationSupported() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsListenSupported returns whether listen bucket notification is applicable for this layer.
|
// IsListenSupported returns whether listen bucket notification is applicable for this layer.
|
||||||
func (z *erasureServerSets) IsListenSupported() bool {
|
func (z *erasureServerPools) IsListenSupported() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsEncryptionSupported returns whether server side encryption is implemented for this layer.
|
// IsEncryptionSupported returns whether server side encryption is implemented for this layer.
|
||||||
func (z *erasureServerSets) IsEncryptionSupported() bool {
|
func (z *erasureServerPools) IsEncryptionSupported() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsCompressionSupported returns whether compression is applicable for this layer.
|
// IsCompressionSupported returns whether compression is applicable for this layer.
|
||||||
func (z *erasureServerSets) IsCompressionSupported() bool {
|
func (z *erasureServerPools) IsCompressionSupported() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) IsTaggingSupported() bool {
|
func (z *erasureServerPools) IsTaggingSupported() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteBucket - deletes a bucket on all serverSets simultaneously,
|
// DeleteBucket - deletes a bucket on all serverPools simultaneously,
|
||||||
// even if one of the serverSets fail to delete buckets, we proceed to
|
// even if one of the serverPools fail to delete buckets, we proceed to
|
||||||
// undo a successful operation.
|
// undo a successful operation.
|
||||||
func (z *erasureServerSets) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
|
func (z *erasureServerPools) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].DeleteBucket(ctx, bucket, forceDelete)
|
return z.serverPools[0].DeleteBucket(ctx, bucket, forceDelete)
|
||||||
}
|
}
|
||||||
g := errgroup.WithNErrs(len(z.serverSets))
|
g := errgroup.WithNErrs(len(z.serverPools))
|
||||||
|
|
||||||
// Delete buckets in parallel across all serverSets.
|
// Delete buckets in parallel across all serverPools.
|
||||||
for index := range z.serverSets {
|
for index := range z.serverPools {
|
||||||
index := index
|
index := index
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
return z.serverSets[index].DeleteBucket(ctx, bucket, forceDelete)
|
return z.serverPools[index].DeleteBucket(ctx, bucket, forceDelete)
|
||||||
}, index)
|
}, index)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1028,7 +1028,7 @@ func (z *erasureServerSets) DeleteBucket(ctx context.Context, bucket string, for
|
|||||||
for _, err := range errs {
|
for _, err := range errs {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if _, ok := err.(InsufficientWriteQuorum); ok {
|
if _, ok := err.(InsufficientWriteQuorum); ok {
|
||||||
undoDeleteBucketServerSets(ctx, bucket, z.serverSets, errs)
|
undoDeleteBucketServerPools(ctx, bucket, z.serverPools, errs)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@ -1043,9 +1043,9 @@ func (z *erasureServerSets) DeleteBucket(ctx context.Context, bucket string, for
|
|||||||
// Note that set distribution is ignored so it should only be used in cases where
|
// Note that set distribution is ignored so it should only be used in cases where
|
||||||
// data is not distributed across sets.
|
// data is not distributed across sets.
|
||||||
// Errors are logged but individual disk failures are not returned.
|
// Errors are logged but individual disk failures are not returned.
|
||||||
func (z *erasureServerSets) deleteAll(ctx context.Context, bucket, prefix string) {
|
func (z *erasureServerPools) deleteAll(ctx context.Context, bucket, prefix string) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, servers := range z.serverSets {
|
for _, servers := range z.serverPools {
|
||||||
for _, set := range servers.sets {
|
for _, set := range servers.sets {
|
||||||
for _, disk := range set.getDisks() {
|
for _, disk := range set.getDisks() {
|
||||||
if disk == nil {
|
if disk == nil {
|
||||||
@ -1063,15 +1063,15 @@ func (z *erasureServerSets) deleteAll(ctx context.Context, bucket, prefix string
|
|||||||
}
|
}
|
||||||
|
|
||||||
// This function is used to undo a successful DeleteBucket operation.
|
// This function is used to undo a successful DeleteBucket operation.
|
||||||
func undoDeleteBucketServerSets(ctx context.Context, bucket string, serverSets []*erasureSets, errs []error) {
|
func undoDeleteBucketServerPools(ctx context.Context, bucket string, serverPools []*erasureSets, errs []error) {
|
||||||
g := errgroup.WithNErrs(len(serverSets))
|
g := errgroup.WithNErrs(len(serverPools))
|
||||||
|
|
||||||
// Undo previous delete bucket on all underlying serverSets.
|
// Undo previous delete bucket on all underlying serverPools.
|
||||||
for index := range serverSets {
|
for index := range serverPools {
|
||||||
index := index
|
index := index
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
if errs[index] == nil {
|
if errs[index] == nil {
|
||||||
return serverSets[index].MakeBucketWithLocation(ctx, bucket, BucketOptions{})
|
return serverPools[index].MakeBucketWithLocation(ctx, bucket, BucketOptions{})
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}, index)
|
}, index)
|
||||||
@ -1080,14 +1080,14 @@ func undoDeleteBucketServerSets(ctx context.Context, bucket string, serverSets [
|
|||||||
g.Wait()
|
g.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// List all buckets from one of the serverSets, we are not doing merge
|
// List all buckets from one of the serverPools, we are not doing merge
|
||||||
// sort here just for simplification. As per design it is assumed
|
// sort here just for simplification. As per design it is assumed
|
||||||
// that all buckets are present on all serverSets.
|
// that all buckets are present on all serverPools.
|
||||||
func (z *erasureServerSets) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) {
|
func (z *erasureServerPools) ListBuckets(ctx context.Context) (buckets []BucketInfo, err error) {
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
buckets, err = z.serverSets[0].ListBuckets(ctx)
|
buckets, err = z.serverPools[0].ListBuckets(ctx)
|
||||||
} else {
|
} else {
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
buckets, err = zone.ListBuckets(ctx)
|
buckets, err = zone.ListBuckets(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
@ -1108,7 +1108,7 @@ func (z *erasureServerSets) ListBuckets(ctx context.Context) (buckets []BucketIn
|
|||||||
return buckets, nil
|
return buckets, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
|
func (z *erasureServerPools) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) {
|
||||||
// Acquire lock on format.json
|
// Acquire lock on format.json
|
||||||
formatLock := z.NewNSLock(minioMetaBucket, formatConfigFile)
|
formatLock := z.NewNSLock(minioMetaBucket, formatConfigFile)
|
||||||
if err := formatLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
if err := formatLock.GetLock(ctx, globalOperationTimeout); err != nil {
|
||||||
@ -1122,13 +1122,13 @@ func (z *erasureServerSets) HealFormat(ctx context.Context, dryRun bool) (madmin
|
|||||||
}
|
}
|
||||||
|
|
||||||
var countNoHeal int
|
var countNoHeal int
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
result, err := zone.HealFormat(ctx, dryRun)
|
result, err := zone.HealFormat(ctx, dryRun)
|
||||||
if err != nil && !errors.Is(err, errNoHealRequired) {
|
if err != nil && !errors.Is(err, errNoHealRequired) {
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Count errNoHealRequired across all serverSets,
|
// Count errNoHealRequired across all serverPools,
|
||||||
// to return appropriate error to the caller
|
// to return appropriate error to the caller
|
||||||
if errors.Is(err, errNoHealRequired) {
|
if errors.Is(err, errNoHealRequired) {
|
||||||
countNoHeal++
|
countNoHeal++
|
||||||
@ -1139,21 +1139,21 @@ func (z *erasureServerSets) HealFormat(ctx context.Context, dryRun bool) (madmin
|
|||||||
r.After.Drives = append(r.After.Drives, result.After.Drives...)
|
r.After.Drives = append(r.After.Drives, result.After.Drives...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// No heal returned by all serverSets, return errNoHealRequired
|
// No heal returned by all serverPools, return errNoHealRequired
|
||||||
if countNoHeal == len(z.serverSets) {
|
if countNoHeal == len(z.serverPools) {
|
||||||
return r, errNoHealRequired
|
return r, errNoHealRequired
|
||||||
}
|
}
|
||||||
|
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) {
|
func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, dryRun, remove bool) (madmin.HealResultItem, error) {
|
||||||
var r = madmin.HealResultItem{
|
var r = madmin.HealResultItem{
|
||||||
Type: madmin.HealItemBucket,
|
Type: madmin.HealItemBucket,
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
result, err := zone.HealBucket(ctx, bucket, dryRun, remove)
|
result, err := zone.HealBucket(ctx, bucket, dryRun, remove)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
@ -1175,7 +1175,7 @@ func (z *erasureServerSets) HealBucket(ctx context.Context, bucket string, dryRu
|
|||||||
// to allocate a receive channel for ObjectInfo, upon any unhandled
|
// to allocate a receive channel for ObjectInfo, upon any unhandled
|
||||||
// error walker returns error. Optionally if context.Done() is received
|
// error walker returns error. Optionally if context.Done() is received
|
||||||
// then Walk() stops the walker.
|
// then Walk() stops the walker.
|
||||||
func (z *erasureServerSets) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
|
func (z *erasureServerPools) Walk(ctx context.Context, bucket, prefix string, results chan<- ObjectInfo, opts ObjectOptions) error {
|
||||||
if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil {
|
if err := checkListObjsArgs(ctx, bucket, prefix, "", z); err != nil {
|
||||||
// Upon error close the channel.
|
// Upon error close the channel.
|
||||||
close(results)
|
close(results)
|
||||||
@ -1236,12 +1236,12 @@ func (z *erasureServerSets) Walk(ctx context.Context, bucket, prefix string, res
|
|||||||
// HealObjectFn closure function heals the object.
|
// HealObjectFn closure function heals the object.
|
||||||
type HealObjectFn func(bucket, object, versionID string) error
|
type HealObjectFn func(bucket, object, versionID string) error
|
||||||
|
|
||||||
func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error {
|
func (z *erasureServerPools) HealObjects(ctx context.Context, bucket, prefix string, opts madmin.HealOpts, healObject HealObjectFn) error {
|
||||||
// If listing did not return any entries upon first attempt, we
|
// If listing did not return any entries upon first attempt, we
|
||||||
// return `ObjectNotFound`, to indicate the caller for any
|
// return `ObjectNotFound`, to indicate the caller for any
|
||||||
// actions they may want to take as if `prefix` is missing.
|
// actions they may want to take as if `prefix` is missing.
|
||||||
err := toObjectErr(errFileNotFound, bucket, prefix)
|
err := toObjectErr(errFileNotFound, bucket, prefix)
|
||||||
for _, erasureSet := range z.serverSets {
|
for _, erasureSet := range z.serverPools {
|
||||||
for _, set := range erasureSet.sets {
|
for _, set := range erasureSet.sets {
|
||||||
var entryChs []FileInfoVersionsCh
|
var entryChs []FileInfoVersionsCh
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
@ -1294,7 +1294,7 @@ func (z *erasureServerSets) HealObjects(ctx context.Context, bucket, prefix stri
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
|
func (z *erasureServerPools) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
|
||||||
object = encodeDirObject(object)
|
object = encodeDirObject(object)
|
||||||
|
|
||||||
lk := z.NewNSLock(bucket, object)
|
lk := z.NewNSLock(bucket, object)
|
||||||
@ -1313,7 +1313,7 @@ func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, vers
|
|||||||
defer lk.RUnlock()
|
defer lk.RUnlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
result, err := zone.HealObject(ctx, bucket, object, versionID, opts)
|
result, err := zone.HealObject(ctx, bucket, object, versionID, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||||
@ -1336,9 +1336,9 @@ func (z *erasureServerSets) HealObject(ctx context.Context, bucket, object, vers
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
|
func (z *erasureServerPools) ListBucketsHeal(ctx context.Context) ([]BucketInfo, error) {
|
||||||
var healBuckets []BucketInfo
|
var healBuckets []BucketInfo
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
bucketsInfo, err := zone.ListBucketsHeal(ctx)
|
bucketsInfo, err := zone.ListBucketsHeal(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
@ -1357,14 +1357,14 @@ func (z *erasureServerSets) ListBucketsHeal(ctx context.Context) ([]BucketInfo,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetMetrics - no op
|
// GetMetrics - no op
|
||||||
func (z *erasureServerSets) GetMetrics(ctx context.Context) (*Metrics, error) {
|
func (z *erasureServerPools) GetMetrics(ctx context.Context) (*Metrics, error) {
|
||||||
logger.LogIf(ctx, NotImplemented{})
|
logger.LogIf(ctx, NotImplemented{})
|
||||||
return &Metrics{}, NotImplemented{}
|
return &Metrics{}, NotImplemented{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *erasureServerSets) getZoneAndSet(id string) (int, int, error) {
|
func (z *erasureServerPools) getZoneAndSet(id string) (int, int, error) {
|
||||||
for zoneIdx := range z.serverSets {
|
for zoneIdx := range z.serverPools {
|
||||||
format := z.serverSets[zoneIdx].format
|
format := z.serverPools[zoneIdx].format
|
||||||
for setIdx, set := range format.Erasure.Sets {
|
for setIdx, set := range format.Erasure.Sets {
|
||||||
for _, diskID := range set {
|
for _, diskID := range set {
|
||||||
if diskID == id {
|
if diskID == id {
|
||||||
@ -1395,10 +1395,10 @@ type HealthResult struct {
|
|||||||
// provides if write access exists across sets, additionally
|
// provides if write access exists across sets, additionally
|
||||||
// can be used to query scenarios if health may be lost
|
// can be used to query scenarios if health may be lost
|
||||||
// if this node is taken down by an external orchestrator.
|
// if this node is taken down by an external orchestrator.
|
||||||
func (z *erasureServerSets) Health(ctx context.Context, opts HealthOptions) HealthResult {
|
func (z *erasureServerPools) Health(ctx context.Context, opts HealthOptions) HealthResult {
|
||||||
erasureSetUpCount := make([][]int, len(z.serverSets))
|
erasureSetUpCount := make([][]int, len(z.serverPools))
|
||||||
for i := range z.serverSets {
|
for i := range z.serverPools {
|
||||||
erasureSetUpCount[i] = make([]int, len(z.serverSets[i].sets))
|
erasureSetUpCount[i] = make([]int, len(z.serverPools[i].sets))
|
||||||
}
|
}
|
||||||
|
|
||||||
diskIDs := globalNotificationSys.GetLocalDiskIDs(ctx)
|
diskIDs := globalNotificationSys.GetLocalDiskIDs(ctx)
|
||||||
@ -1484,13 +1484,13 @@ func (z *erasureServerSets) Health(ctx context.Context, opts HealthOptions) Heal
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PutObjectTags - replace or add tags to an existing object
|
// PutObjectTags - replace or add tags to an existing object
|
||||||
func (z *erasureServerSets) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) error {
|
func (z *erasureServerPools) PutObjectTags(ctx context.Context, bucket, object string, tags string, opts ObjectOptions) error {
|
||||||
object = encodeDirObject(object)
|
object = encodeDirObject(object)
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].PutObjectTags(ctx, bucket, object, tags, opts)
|
return z.serverPools[0].PutObjectTags(ctx, bucket, object, tags, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
err := zone.PutObjectTags(ctx, bucket, object, tags, opts)
|
err := zone.PutObjectTags(ctx, bucket, object, tags, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||||
@ -1514,12 +1514,12 @@ func (z *erasureServerSets) PutObjectTags(ctx context.Context, bucket, object st
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObjectTags - delete object tags from an existing object
|
// DeleteObjectTags - delete object tags from an existing object
|
||||||
func (z *erasureServerSets) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) error {
|
func (z *erasureServerPools) DeleteObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) error {
|
||||||
object = encodeDirObject(object)
|
object = encodeDirObject(object)
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].DeleteObjectTags(ctx, bucket, object, opts)
|
return z.serverPools[0].DeleteObjectTags(ctx, bucket, object, opts)
|
||||||
}
|
}
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
err := zone.DeleteObjectTags(ctx, bucket, object, opts)
|
err := zone.DeleteObjectTags(ctx, bucket, object, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||||
@ -1543,12 +1543,12 @@ func (z *erasureServerSets) DeleteObjectTags(ctx context.Context, bucket, object
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetObjectTags - get object tags from an existing object
|
// GetObjectTags - get object tags from an existing object
|
||||||
func (z *erasureServerSets) GetObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (*tags.Tags, error) {
|
func (z *erasureServerPools) GetObjectTags(ctx context.Context, bucket, object string, opts ObjectOptions) (*tags.Tags, error) {
|
||||||
object = encodeDirObject(object)
|
object = encodeDirObject(object)
|
||||||
if z.SingleZone() {
|
if z.SingleZone() {
|
||||||
return z.serverSets[0].GetObjectTags(ctx, bucket, object, opts)
|
return z.serverPools[0].GetObjectTags(ctx, bucket, object, opts)
|
||||||
}
|
}
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
tags, err := zone.GetObjectTags(ctx, bucket, object, opts)
|
tags, err := zone.GetObjectTags(ctx, bucket, object, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
if isErrObjectNotFound(err) || isErrVersionNotFound(err) {
|
||||||
|
@ -422,7 +422,7 @@ func (s *erasureSets) SetDriveCount() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// StorageUsageInfo - combines output of StorageInfo across all erasure coded object sets.
|
// StorageUsageInfo - combines output of StorageInfo across all erasure coded object sets.
|
||||||
// This only returns disk usage info for ServerSets to perform placement decision, this call
|
// This only returns disk usage info for ServerPools to perform placement decision, this call
|
||||||
// is not implemented in Object interface and is not meant to be used by other object
|
// is not implemented in Object interface and is not meant to be used by other object
|
||||||
// layer implementations.
|
// layer implementations.
|
||||||
func (s *erasureSets) StorageUsageInfo(ctx context.Context) StorageInfo {
|
func (s *erasureSets) StorageUsageInfo(ctx context.Context) StorageInfo {
|
||||||
|
@ -189,7 +189,7 @@ var (
|
|||||||
// registered listeners
|
// registered listeners
|
||||||
globalConsoleSys *HTTPConsoleLoggerSys
|
globalConsoleSys *HTTPConsoleLoggerSys
|
||||||
|
|
||||||
globalEndpoints EndpointServerSets
|
globalEndpoints EndpointServerPools
|
||||||
|
|
||||||
// Global server's network statistics
|
// Global server's network statistics
|
||||||
globalConnStats = newConnStats()
|
globalConnStats = newConnStats()
|
||||||
|
@ -61,10 +61,10 @@ func getLocalCPUInfo(ctx context.Context, r *http.Request) madmin.ServerCPUInfo
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getLocalDrives(ctx context.Context, parallel bool, endpointServerSets EndpointServerSets, r *http.Request) madmin.ServerDrivesInfo {
|
func getLocalDrives(ctx context.Context, parallel bool, endpointServerPools EndpointServerPools, r *http.Request) madmin.ServerDrivesInfo {
|
||||||
var drivesPerfInfo []madmin.DrivePerfInfo
|
var drivesPerfInfo []madmin.DrivePerfInfo
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, ep := range endpointServerSets {
|
for _, ep := range endpointServerPools {
|
||||||
for _, endpoint := range ep.Endpoints {
|
for _, endpoint := range ep.Endpoints {
|
||||||
// Only proceed for local endpoints
|
// Only proceed for local endpoints
|
||||||
if endpoint.IsLocal {
|
if endpoint.IsLocal {
|
||||||
@ -105,7 +105,7 @@ func getLocalDrives(ctx context.Context, parallel bool, endpointServerSets Endpo
|
|||||||
|
|
||||||
addr := r.Host
|
addr := r.Host
|
||||||
if globalIsDistErasure {
|
if globalIsDistErasure {
|
||||||
addr = GetLocalPeer(endpointServerSets)
|
addr = GetLocalPeer(endpointServerPools)
|
||||||
}
|
}
|
||||||
if parallel {
|
if parallel {
|
||||||
return madmin.ServerDrivesInfo{
|
return madmin.ServerDrivesInfo{
|
||||||
|
@ -254,7 +254,7 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
z, ok := objAPI.(*erasureServerSets)
|
z, ok := objAPI.(*erasureServerPools)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -361,8 +361,8 @@ func startLockMaintenance(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// registerLockRESTHandlers - register lock rest router.
|
// registerLockRESTHandlers - register lock rest router.
|
||||||
func registerLockRESTHandlers(router *mux.Router, endpointServerSets EndpointServerSets) {
|
func registerLockRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools) {
|
||||||
for _, ep := range endpointServerSets {
|
for _, ep := range endpointServerPools {
|
||||||
for _, endpoint := range ep.Endpoints {
|
for _, endpoint := range ep.Endpoints {
|
||||||
if !endpoint.IsLocal {
|
if !endpoint.IsLocal {
|
||||||
continue
|
continue
|
||||||
|
@ -56,7 +56,7 @@ func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
|
|||||||
if cleanup {
|
if cleanup {
|
||||||
// Recursively delete all caches.
|
// Recursively delete all caches.
|
||||||
objAPI := newObjectLayerFn()
|
objAPI := newObjectLayerFn()
|
||||||
ez, ok := objAPI.(*erasureServerSets)
|
ez, ok := objAPI.(*erasureServerPools)
|
||||||
if ok {
|
if ok {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
|
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
|
||||||
@ -410,7 +410,7 @@ func (b *bucketMetacache) deleteAll() {
|
|||||||
defer b.mu.Unlock()
|
defer b.mu.Unlock()
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
ez, ok := newObjectLayerFn().(*erasureServerSets)
|
ez, ok := newObjectLayerFn().(*erasureServerPools)
|
||||||
if !ok {
|
if !ok {
|
||||||
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasureZones"))
|
logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasureZones"))
|
||||||
return
|
return
|
||||||
|
@ -34,7 +34,7 @@ import (
|
|||||||
// Required important fields are Bucket, Prefix, Separator.
|
// Required important fields are Bucket, Prefix, Separator.
|
||||||
// Other important fields are Limit, Marker.
|
// Other important fields are Limit, Marker.
|
||||||
// List ID always derived from the Marker.
|
// List ID always derived from the Marker.
|
||||||
func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
|
func (z *erasureServerPools) listPath(ctx context.Context, o listPathOptions) (entries metaCacheEntriesSorted, err error) {
|
||||||
if err := checkListObjsArgs(ctx, o.Bucket, o.Prefix, o.Marker, z); err != nil {
|
if err := checkListObjsArgs(ctx, o.Bucket, o.Prefix, o.Marker, z); err != nil {
|
||||||
return entries, err
|
return entries, err
|
||||||
}
|
}
|
||||||
@ -140,7 +140,7 @@ func (z *erasureServerSets) listPath(ctx context.Context, o listPathOptions) (en
|
|||||||
asked := 0
|
asked := 0
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
// Ask all sets and merge entries.
|
// Ask all sets and merge entries.
|
||||||
for _, zone := range z.serverSets {
|
for _, zone := range z.serverPools {
|
||||||
for _, set := range zone.sets {
|
for _, set := range zone.sets {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
asked++
|
asked++
|
||||||
|
@ -183,9 +183,9 @@ func (m *metacache) delete(ctx context.Context) {
|
|||||||
logger.LogIf(ctx, errors.New("metacache.delete: no object layer"))
|
logger.LogIf(ctx, errors.New("metacache.delete: no object layer"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ez, ok := objAPI.(*erasureServerSets)
|
ez, ok := objAPI.(*erasureServerPools)
|
||||||
if !ok {
|
if !ok {
|
||||||
logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be *erasureServerSets"))
|
logger.LogIf(ctx, errors.New("metacache.delete: expected objAPI to be *erasureServerPools"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id))
|
ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(m.bucket, m.id))
|
||||||
|
@ -1275,7 +1275,7 @@ func (sys *NotificationSys) restClientFromHash(s string) (client *peerRESTClient
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewNotificationSys - creates new notification system object.
|
// NewNotificationSys - creates new notification system object.
|
||||||
func NewNotificationSys(endpoints EndpointServerSets) *NotificationSys {
|
func NewNotificationSys(endpoints EndpointServerPools) *NotificationSys {
|
||||||
// targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init()
|
// targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init()
|
||||||
remote, all := newPeerRestClients(endpoints)
|
remote, all := newPeerRestClients(endpoints)
|
||||||
return &NotificationSys{
|
return &NotificationSys{
|
||||||
|
@ -837,7 +837,7 @@ func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh <-chan s
|
|||||||
// The two slices will point to the same clients,
|
// The two slices will point to the same clients,
|
||||||
// but 'all' will contain nil entry for local client.
|
// but 'all' will contain nil entry for local client.
|
||||||
// The 'all' slice will be in the same order across the cluster.
|
// The 'all' slice will be in the same order across the cluster.
|
||||||
func newPeerRestClients(endpoints EndpointServerSets) (remote, all []*peerRESTClient) {
|
func newPeerRestClients(endpoints EndpointServerPools) (remote, all []*peerRESTClient) {
|
||||||
if !globalIsDistErasure {
|
if !globalIsDistErasure {
|
||||||
// Only useful in distributed setups
|
// Only useful in distributed setups
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -674,11 +674,11 @@ func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Return disk IDs of all the local disks.
|
// Return disk IDs of all the local disks.
|
||||||
func getLocalDiskIDs(z *erasureServerSets) []string {
|
func getLocalDiskIDs(z *erasureServerPools) []string {
|
||||||
var ids []string
|
var ids []string
|
||||||
|
|
||||||
for zoneIdx := range z.serverSets {
|
for zoneIdx := range z.serverPools {
|
||||||
for _, set := range z.serverSets[zoneIdx].sets {
|
for _, set := range z.serverPools[zoneIdx].sets {
|
||||||
disks := set.getDisks()
|
disks := set.getDisks()
|
||||||
for _, disk := range disks {
|
for _, disk := range disks {
|
||||||
if disk == nil {
|
if disk == nil {
|
||||||
@ -723,7 +723,7 @@ func (s *peerRESTServer) GetLocalDiskIDs(w http.ResponseWriter, r *http.Request)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
z, ok := objLayer.(*erasureServerSets)
|
z, ok := objLayer.(*erasureServerPools)
|
||||||
if !ok {
|
if !ok {
|
||||||
s.writeErrorResponse(w, errServerNotInitialized)
|
s.writeErrorResponse(w, errServerNotInitialized)
|
||||||
return
|
return
|
||||||
|
@ -23,9 +23,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Composed function registering routers for only distributed Erasure setup.
|
// Composed function registering routers for only distributed Erasure setup.
|
||||||
func registerDistErasureRouters(router *mux.Router, endpointServerSets EndpointServerSets) {
|
func registerDistErasureRouters(router *mux.Router, endpointServerPools EndpointServerPools) {
|
||||||
// Register storage REST router only if its a distributed setup.
|
// Register storage REST router only if its a distributed setup.
|
||||||
registerStorageRESTHandlers(router, endpointServerSets)
|
registerStorageRESTHandlers(router, endpointServerPools)
|
||||||
|
|
||||||
// Register peer REST router only if its a distributed setup.
|
// Register peer REST router only if its a distributed setup.
|
||||||
registerPeerRESTHandlers(router)
|
registerPeerRESTHandlers(router)
|
||||||
@ -34,7 +34,7 @@ func registerDistErasureRouters(router *mux.Router, endpointServerSets EndpointS
|
|||||||
registerBootstrapRESTHandlers(router)
|
registerBootstrapRESTHandlers(router)
|
||||||
|
|
||||||
// Register distributed namespace lock routers.
|
// Register distributed namespace lock routers.
|
||||||
registerLockRESTHandlers(router, endpointServerSets)
|
registerLockRESTHandlers(router, endpointServerPools)
|
||||||
}
|
}
|
||||||
|
|
||||||
// List of some generic handlers which are applied for all incoming requests.
|
// List of some generic handlers which are applied for all incoming requests.
|
||||||
@ -83,14 +83,14 @@ var globalHandlers = []MiddlewareFunc{
|
|||||||
}
|
}
|
||||||
|
|
||||||
// configureServer handler returns final handler for the http server.
|
// configureServer handler returns final handler for the http server.
|
||||||
func configureServerHandler(endpointServerSets EndpointServerSets) (http.Handler, error) {
|
func configureServerHandler(endpointServerPools EndpointServerPools) (http.Handler, error) {
|
||||||
// Initialize router. `SkipClean(true)` stops gorilla/mux from
|
// Initialize router. `SkipClean(true)` stops gorilla/mux from
|
||||||
// normalizing URL path minio/minio#3256
|
// normalizing URL path minio/minio#3256
|
||||||
router := mux.NewRouter().SkipClean(true).UseEncodedPath()
|
router := mux.NewRouter().SkipClean(true).UseEncodedPath()
|
||||||
|
|
||||||
// Initialize distributed NS lock.
|
// Initialize distributed NS lock.
|
||||||
if globalIsDistErasure {
|
if globalIsDistErasure {
|
||||||
registerDistErasureRouters(router, endpointServerSets)
|
registerDistErasureRouters(router, endpointServerPools)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add STS router always.
|
// Add STS router always.
|
||||||
|
@ -524,12 +524,12 @@ func serverMain(ctx *cli.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize object layer with the supplied disks, objectLayer is nil upon any error.
|
// Initialize object layer with the supplied disks, objectLayer is nil upon any error.
|
||||||
func newObjectLayer(ctx context.Context, endpointServerSets EndpointServerSets) (newObject ObjectLayer, err error) {
|
func newObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools) (newObject ObjectLayer, err error) {
|
||||||
// For FS only, directly use the disk.
|
// For FS only, directly use the disk.
|
||||||
if endpointServerSets.NEndpoints() == 1 {
|
if endpointServerPools.NEndpoints() == 1 {
|
||||||
// Initialize new FS object layer.
|
// Initialize new FS object layer.
|
||||||
return NewFSObjectLayer(endpointServerSets[0].Endpoints[0].Path)
|
return NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)
|
||||||
}
|
}
|
||||||
|
|
||||||
return newErasureServerSets(ctx, endpointServerSets)
|
return newErasureServerPools(ctx, endpointServerPools)
|
||||||
}
|
}
|
||||||
|
@ -58,7 +58,7 @@ func TestNewObjectLayer(t *testing.T) {
|
|||||||
t.Fatal("Unexpected object layer initialization error", err)
|
t.Fatal("Unexpected object layer initialization error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, ok = obj.(*erasureServerSets)
|
_, ok = obj.(*erasureServerPools)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatal("Unexpected object layer detected", reflect.TypeOf(obj))
|
t.Fatal("Unexpected object layer detected", reflect.TypeOf(obj))
|
||||||
}
|
}
|
||||||
|
@ -1008,8 +1008,8 @@ func logFatalErrs(err error, endpoint Endpoint, exit bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// registerStorageRPCRouter - register storage rpc router.
|
// registerStorageRPCRouter - register storage rpc router.
|
||||||
func registerStorageRESTHandlers(router *mux.Router, endpointServerSets EndpointServerSets) {
|
func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools) {
|
||||||
for _, ep := range endpointServerSets {
|
for _, ep := range endpointServerPools {
|
||||||
for _, endpoint := range ep.Endpoints {
|
for _, endpoint := range ep.Endpoints {
|
||||||
if !endpoint.IsLocal {
|
if !endpoint.IsLocal {
|
||||||
continue
|
continue
|
||||||
|
@ -298,7 +298,7 @@ func isSameType(obj1, obj2 interface{}) bool {
|
|||||||
// defer s.Stop()
|
// defer s.Stop()
|
||||||
type TestServer struct {
|
type TestServer struct {
|
||||||
Root string
|
Root string
|
||||||
Disks EndpointServerSets
|
Disks EndpointServerPools
|
||||||
AccessKey string
|
AccessKey string
|
||||||
SecretKey string
|
SecretKey string
|
||||||
Server *httptest.Server
|
Server *httptest.Server
|
||||||
@ -415,7 +415,7 @@ func resetGlobalConfig() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func resetGlobalEndpoints() {
|
func resetGlobalEndpoints() {
|
||||||
globalEndpoints = EndpointServerSets{}
|
globalEndpoints = EndpointServerPools{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func resetGlobalIsErasure() {
|
func resetGlobalIsErasure() {
|
||||||
@ -1558,14 +1558,14 @@ func getRandomDisks(N int) ([]string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize object layer with the supplied disks, objectLayer is nil upon any error.
|
// Initialize object layer with the supplied disks, objectLayer is nil upon any error.
|
||||||
func newTestObjectLayer(ctx context.Context, endpointServerSets EndpointServerSets) (newObject ObjectLayer, err error) {
|
func newTestObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools) (newObject ObjectLayer, err error) {
|
||||||
// For FS only, directly use the disk.
|
// For FS only, directly use the disk.
|
||||||
if endpointServerSets.NEndpoints() == 1 {
|
if endpointServerPools.NEndpoints() == 1 {
|
||||||
// Initialize new FS object layer.
|
// Initialize new FS object layer.
|
||||||
return NewFSObjectLayer(endpointServerSets[0].Endpoints[0].Path)
|
return NewFSObjectLayer(endpointServerPools[0].Endpoints[0].Path)
|
||||||
}
|
}
|
||||||
|
|
||||||
z, err := newErasureServerSets(ctx, endpointServerSets)
|
z, err := newErasureServerPools(ctx, endpointServerPools)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -1578,16 +1578,16 @@ func newTestObjectLayer(ctx context.Context, endpointServerSets EndpointServerSe
|
|||||||
}
|
}
|
||||||
|
|
||||||
// initObjectLayer - Instantiates object layer and returns it.
|
// initObjectLayer - Instantiates object layer and returns it.
|
||||||
func initObjectLayer(ctx context.Context, endpointServerSets EndpointServerSets) (ObjectLayer, []StorageAPI, error) {
|
func initObjectLayer(ctx context.Context, endpointServerPools EndpointServerPools) (ObjectLayer, []StorageAPI, error) {
|
||||||
objLayer, err := newTestObjectLayer(ctx, endpointServerSets)
|
objLayer, err := newTestObjectLayer(ctx, endpointServerPools)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var formattedDisks []StorageAPI
|
var formattedDisks []StorageAPI
|
||||||
// Should use the object layer tests for validating cache.
|
// Should use the object layer tests for validating cache.
|
||||||
if z, ok := objLayer.(*erasureServerSets); ok {
|
if z, ok := objLayer.(*erasureServerPools); ok {
|
||||||
formattedDisks = z.serverSets[0].GetDisks(0)()
|
formattedDisks = z.serverPools[0].GetDisks(0)()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Success.
|
// Success.
|
||||||
@ -2240,7 +2240,7 @@ func generateTLSCertKey(host string) ([]byte, []byte, error) {
|
|||||||
return certOut.Bytes(), keyOut.Bytes(), nil
|
return certOut.Bytes(), keyOut.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustGetZoneEndpoints(args ...string) EndpointServerSets {
|
func mustGetZoneEndpoints(args ...string) EndpointServerPools {
|
||||||
endpoints := mustGetNewEndpoints(args...)
|
endpoints := mustGetNewEndpoints(args...)
|
||||||
drivesPerSet := len(args)
|
drivesPerSet := len(args)
|
||||||
setCount := 1
|
setCount := 1
|
||||||
@ -2261,8 +2261,8 @@ func mustGetNewEndpoints(args ...string) (endpoints Endpoints) {
|
|||||||
return endpoints
|
return endpoints
|
||||||
}
|
}
|
||||||
|
|
||||||
func getEndpointsLocalAddr(endpointServerSets EndpointServerSets) string {
|
func getEndpointsLocalAddr(endpointServerPools EndpointServerPools) string {
|
||||||
for _, endpoints := range endpointServerSets {
|
for _, endpoints := range endpointServerPools {
|
||||||
for _, endpoint := range endpoints.Endpoints {
|
for _, endpoint := range endpoints.Endpoints {
|
||||||
if endpoint.IsLocal && endpoint.Type() == URLEndpointType {
|
if endpoint.IsLocal && endpoint.Type() == URLEndpointType {
|
||||||
return endpoint.Host
|
return endpoint.Host
|
||||||
|
@ -1224,17 +1224,17 @@ func TestWebObjectLayerFaultyDisks(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Set faulty disks to Erasure backend
|
// Set faulty disks to Erasure backend
|
||||||
z := obj.(*erasureServerSets)
|
z := obj.(*erasureServerPools)
|
||||||
xl := z.serverSets[0].sets[0]
|
xl := z.serverPools[0].sets[0]
|
||||||
erasureDisks := xl.getDisks()
|
erasureDisks := xl.getDisks()
|
||||||
z.serverSets[0].erasureDisksMu.Lock()
|
z.serverPools[0].erasureDisksMu.Lock()
|
||||||
xl.getDisks = func() []StorageAPI {
|
xl.getDisks = func() []StorageAPI {
|
||||||
for i, d := range erasureDisks {
|
for i, d := range erasureDisks {
|
||||||
erasureDisks[i] = newNaughtyDisk(d, nil, errFaultyDisk)
|
erasureDisks[i] = newNaughtyDisk(d, nil, errFaultyDisk)
|
||||||
}
|
}
|
||||||
return erasureDisks
|
return erasureDisks
|
||||||
}
|
}
|
||||||
z.serverSets[0].erasureDisksMu.Unlock()
|
z.serverPools[0].erasureDisksMu.Unlock()
|
||||||
|
|
||||||
// Initialize web rpc endpoint.
|
// Initialize web rpc endpoint.
|
||||||
apiRouter := initTestWebRPCEndPoint(obj)
|
apiRouter := initTestWebRPCEndPoint(obj)
|
||||||
|
@ -112,19 +112,19 @@ In above example there are two server sets
|
|||||||
MinIO places new objects in server sets based on proportionate free space, per zone. Following pseudo code demonstrates this behavior.
|
MinIO places new objects in server sets based on proportionate free space, per zone. Following pseudo code demonstrates this behavior.
|
||||||
```go
|
```go
|
||||||
func getAvailableZoneIdx(ctx context.Context) int {
|
func getAvailableZoneIdx(ctx context.Context) int {
|
||||||
serverSets := z.getServerSetsAvailableSpace(ctx)
|
serverPools := z.getServerPoolsAvailableSpace(ctx)
|
||||||
total := serverSets.TotalAvailable()
|
total := serverPools.TotalAvailable()
|
||||||
// choose when we reach this many
|
// choose when we reach this many
|
||||||
choose := rand.Uint64() % total
|
choose := rand.Uint64() % total
|
||||||
atTotal := uint64(0)
|
atTotal := uint64(0)
|
||||||
for _, zone := range serverSets {
|
for _, zone := range serverPools {
|
||||||
atTotal += zone.Available
|
atTotal += zone.Available
|
||||||
if atTotal > choose && zone.Available > 0 {
|
if atTotal > choose && zone.Available > 0 {
|
||||||
return zone.Index
|
return zone.Index
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Should not happen, but print values just in case.
|
// Should not happen, but print values just in case.
|
||||||
panic(fmt.Errorf("reached end of serverSets (total: %v, atTotal: %v, choose: %v)", total, atTotal, choose))
|
panic(fmt.Errorf("reached end of serverPools (total: %v, atTotal: %v, choose: %v)", total, atTotal, choose))
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -113,19 +113,19 @@ minio server http://host{1...32}/export{1...32} http://host{5...6}/export{1...8}
|
|||||||
MinIO根据每个区域的可用空间比例将新对象放置在区域中。以下伪代码演示了此行为。
|
MinIO根据每个区域的可用空间比例将新对象放置在区域中。以下伪代码演示了此行为。
|
||||||
```go
|
```go
|
||||||
func getAvailableZoneIdx(ctx context.Context) int {
|
func getAvailableZoneIdx(ctx context.Context) int {
|
||||||
serverSets := z.getServerSetsAvailableSpace(ctx)
|
serverPools := z.getServerPoolsAvailableSpace(ctx)
|
||||||
total := serverSets.TotalAvailable()
|
total := serverPools.TotalAvailable()
|
||||||
// choose when we reach this many
|
// choose when we reach this many
|
||||||
choose := rand.Uint64() % total
|
choose := rand.Uint64() % total
|
||||||
atTotal := uint64(0)
|
atTotal := uint64(0)
|
||||||
for _, zone := range serverSets {
|
for _, zone := range serverPools {
|
||||||
atTotal += zone.Available
|
atTotal += zone.Available
|
||||||
if atTotal > choose && zone.Available > 0 {
|
if atTotal > choose && zone.Available > 0 {
|
||||||
return zone.Index
|
return zone.Index
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Should not happen, but print values just in case.
|
// Should not happen, but print values just in case.
|
||||||
panic(fmt.Errorf("reached end of serverSets (total: %v, atTotal: %v, choose: %v)", total, atTotal, choose))
|
panic(fmt.Errorf("reached end of serverPools (total: %v, atTotal: %v, choose: %v)", total, atTotal, choose))
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user