Mirror of https://github.com/minio/minio.git (synced 2025-11-07 21:02:58 -05:00)
rename zones to serverSets to avoid terminology conflict (#10679)

Since we are bringing in availability zones, we should avoid using "zones" for the server expansion concept.
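To illustrate the new naming before reading the diff, here is a minimal standalone sketch of the renamed aggregate, using simplified stand-in types rather than the actual MinIO definitions: the former erasureZones type with its zones field becomes erasureServerSets with a serverSets field, and each entry still groups erasure sets.

package main

import "fmt"

// Simplified stand-ins for the real MinIO types, for illustration only.
type erasureSets struct {
    setCount     int
    drivesPerSet int
}

// erasureServerSets replaces the old erasureZones type; the field that was
// called "zones" is now "serverSets".
type erasureServerSets struct {
    serverSets []*erasureSets
}

func main() {
    // Old spelling: objLayer := &erasureZones{zones: make([]*erasureSets, 1)}
    objLayer := &erasureServerSets{serverSets: make([]*erasureSets, 1)}
    objLayer.serverSets[0] = &erasureSets{setCount: 4, drivesPerSet: 16}

    for i, ss := range objLayer.serverSets {
        fmt.Printf("server set %d: %d sets x %d drives\n", i+1, ss.setCount, ss.drivesPerSet)
    }
}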
@@ -105,8 +105,8 @@ func initTestErasureObjLayer(ctx context.Context) (ObjectLayer, []string, error)
    }

    globalPolicySys = NewPolicySys()
-   objLayer := &erasureZones{zones: make([]*erasureSets, 1)}
-   objLayer.zones[0], err = newErasureSets(ctx, endpoints, storageDisks, format)
+   objLayer := &erasureServerSets{serverSets: make([]*erasureSets, 1)}
+   objLayer.serverSets[0], err = newErasureSets(ctx, endpoints, storageDisks, format)
    if err != nil {
        return nil, nil, err
    }
@@ -24,13 +24,13 @@ import (

// getLocalServerProperty - returns madmin.ServerProperties for only the
// local endpoints from given list of endpoints
-func getLocalServerProperty(endpointZones EndpointZones, r *http.Request) madmin.ServerProperties {
+func getLocalServerProperty(endpointServerSets EndpointServerSets, r *http.Request) madmin.ServerProperties {
    addr := r.Host
    if globalIsDistErasure {
-       addr = GetLocalPeer(endpointZones)
+       addr = GetLocalPeer(endpointServerSets)
    }
    network := make(map[string]string)
-   for _, ep := range endpointZones {
+   for _, ep := range endpointServerSets {
        for _, endpoint := range ep.Endpoints {
            nodeName := endpoint.Host
            if nodeName == "" {
@@ -39,7 +39,7 @@ type healingTracker struct {
}

func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
-   z, ok := objAPI.(*erasureZones)
+   z, ok := objAPI.(*erasureServerSets)
    if !ok {
        return
    }
@@ -107,7 +107,7 @@ func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
// monitorLocalDisksAndHeal - ensures that detected new disks are healed
// 1. Only the concerned erasure set will be listed and healed
// 2. Only the node hosting the disk is responsible to perform the heal
-func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healSequence) {
+func monitorLocalDisksAndHeal(ctx context.Context, z *erasureServerSets, bgSeq *healSequence) {
    // Perform automatic disk healing when a disk is replaced locally.
    for {
        select {
@@ -129,8 +129,8 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS
            logger.Info(fmt.Sprintf("Found drives to heal %d, proceeding to heal content...",
                len(healDisks)))

-           erasureSetInZoneDisksToHeal = make([]map[int][]StorageAPI, len(z.zones))
-           for i := range z.zones {
+           erasureSetInZoneDisksToHeal = make([]map[int][]StorageAPI, len(z.serverSets))
+           for i := range z.serverSets {
                erasureSetInZoneDisksToHeal[i] = map[int][]StorageAPI{}
            }
        }
@@ -149,7 +149,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS
            }

            // Calculate the set index where the current endpoint belongs
-           setIndex, _, err := findDiskIndex(z.zones[zoneIdx].format, format)
+           setIndex, _, err := findDiskIndex(z.serverSets[zoneIdx].format, format)
            if err != nil {
                printEndpointError(endpoint, err, false)
                continue
@@ -164,7 +164,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, z *erasureZones, bgSeq *healS
            for _, disk := range disks {
                logger.Info("Healing disk '%s' on %s zone", disk, humanize.Ordinal(i+1))

-               lbDisks := z.zones[i].sets[setIndex].getOnlineDisks()
+               lbDisks := z.serverSets[i].sets[setIndex].getOnlineDisks()
                if err := healErasureSet(ctx, setIndex, buckets, lbDisks); err != nil {
                    logger.LogIf(ctx, err)
                    continue
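The heal monitor above walks every server set and, inside each, every erasure set, collecting the drives to heal keyed by set index. A minimal sketch of that traversal shape, with hypothetical placeholder types rather than the MinIO StorageAPI interfaces:

package main

import "fmt"

// Hypothetical placeholders standing in for StorageAPI and erasure sets.
type disk string

type erasureSet struct{ onlineDisks []disk }

type serverSet struct{ sets []*erasureSet }

func main() {
    serverSets := []*serverSet{
        {sets: []*erasureSet{{onlineDisks: []disk{"d1", "d2"}}, {onlineDisks: []disk{"d3"}}}},
    }

    // Mirrors the diff's shape: one map per server set, keyed by set index.
    disksToHeal := make([]map[int][]disk, len(serverSets))
    for i := range serverSets {
        disksToHeal[i] = map[int][]disk{}
    }

    // Walk every server set and every erasure set inside it.
    for i, ss := range serverSets {
        for setIndex, set := range ss.sets {
            disksToHeal[i][setIndex] = append(disksToHeal[i][setIndex], set.onlineDisks...)
        }
    }
    fmt.Println(disksToHeal)
}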
@@ -54,7 +54,7 @@ type bootstrapRESTServer struct{}
type ServerSystemConfig struct {
    MinioPlatform string
    MinioRuntime string
-   MinioEndpoints EndpointZones
+   MinioEndpoints EndpointServerSets
}

// Diff - returns error on first difference found in two configs.
@@ -161,9 +161,9 @@ func (client *bootstrapRESTClient) Verify(ctx context.Context, srcCfg ServerSyst
    return srcCfg.Diff(recvCfg)
}

-func verifyServerSystemConfig(ctx context.Context, endpointZones EndpointZones) error {
+func verifyServerSystemConfig(ctx context.Context, endpointServerSets EndpointServerSets) error {
    srcCfg := getServerSystemCfg()
-   clnts := newBootstrapRESTClients(endpointZones)
+   clnts := newBootstrapRESTClients(endpointServerSets)
    var onlineServers int
    var offlineEndpoints []string
    var retries int
@@ -198,10 +198,10 @@ func verifyServerSystemConfig(ctx context.Context, endpointZones EndpointZones)
    return nil
}

-func newBootstrapRESTClients(endpointZones EndpointZones) []*bootstrapRESTClient {
+func newBootstrapRESTClients(endpointServerSets EndpointServerSets) []*bootstrapRESTClient {
    seenHosts := set.NewStringSet()
    var clnts []*bootstrapRESTClient
-   for _, ep := range endpointZones {
+   for _, ep := range endpointServerSets {
        for _, endpoint := range ep.Endpoints {
            if seenHosts.Contains(endpoint.Host) {
                continue
@@ -40,8 +40,8 @@ type HTTPConsoleLoggerSys struct {
    logBuf *ring.Ring
}

-func mustGetNodeName(endpointZones EndpointZones) (nodeName string) {
-   host, err := xnet.ParseHost(GetLocalPeer(endpointZones))
+func mustGetNodeName(endpointServerSets EndpointServerSets) (nodeName string) {
+   host, err := xnet.ParseHost(GetLocalPeer(endpointServerSets))
    if err != nil {
        logger.FatalIf(err, "Unable to start console logging subsystem")
    }
@@ -63,8 +63,8 @@ func NewConsoleLogger(ctx context.Context) *HTTPConsoleLoggerSys {
}

// SetNodeName - sets the node name if any after distributed setup has initialized
-func (sys *HTTPConsoleLoggerSys) SetNodeName(endpointZones EndpointZones) {
-   sys.nodeName = mustGetNodeName(endpointZones)
+func (sys *HTTPConsoleLoggerSys) SetNodeName(endpointServerSets EndpointServerSets) {
+   sys.nodeName = mustGetNodeName(endpointServerSets)
}

// HasLogListeners returns true if console log listeners are registered
@@ -329,7 +329,7 @@ var (
// CreateServerEndpoints - validates and creates new endpoints from input args, supports
// both ellipses and without ellipses transparently.
func createServerEndpoints(serverAddr string, args ...string) (
-   endpointZones EndpointZones, setupType SetupType, err error) {
+   endpointServerSets EndpointServerSets, setupType SetupType, err error) {

    if len(args) == 0 {
        return nil, -1, errInvalidArgument
@@ -352,13 +352,13 @@ func createServerEndpoints(serverAddr string, args ...string) (
        if err != nil {
            return nil, -1, err
        }
-       endpointZones = append(endpointZones, ZoneEndpoints{
+       endpointServerSets = append(endpointServerSets, ZoneEndpoints{
            SetCount: len(setArgs),
            DrivesPerSet: len(setArgs[0]),
            Endpoints: endpointList,
        })
        setupType = newSetupType
-       return endpointZones, setupType, nil
+       return endpointServerSets, setupType, nil
    }

    var prevSetupType SetupType
@@ -374,12 +374,12 @@ func createServerEndpoints(serverAddr string, args ...string) (
            return nil, -1, err
        }
        if setDriveCount != 0 && setDriveCount != len(setArgs[0]) {
-           return nil, -1, fmt.Errorf("All zones should have same drive per set ratio - expected %d, got %d", setDriveCount, len(setArgs[0]))
+           return nil, -1, fmt.Errorf("All serverSets should have same drive per set ratio - expected %d, got %d", setDriveCount, len(setArgs[0]))
        }
        if prevSetupType != UnknownSetupType && prevSetupType != setupType {
-           return nil, -1, fmt.Errorf("All zones should be of the same setup-type to maintain the original SLA expectations - expected %s, got %s", prevSetupType, setupType)
+           return nil, -1, fmt.Errorf("All serverSets should be of the same setup-type to maintain the original SLA expectations - expected %s, got %s", prevSetupType, setupType)
        }
-       if err = endpointZones.Add(ZoneEndpoints{
+       if err = endpointServerSets.Add(ZoneEndpoints{
            SetCount: len(setArgs),
            DrivesPerSet: len(setArgs[0]),
            Endpoints: endpointList,
@@ -393,5 +393,5 @@ func createServerEndpoints(serverAddr string, args ...string) (
        prevSetupType = setupType
    }

-   return endpointZones, setupType, nil
+   return endpointServerSets, setupType, nil
}
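The loop above enforces two invariants across all server sets passed on the command line: the same drives-per-set count and the same setup type. A compact sketch of that consistency check, with hypothetical argument shapes standing in for the real setArgs parsing:

package main

import (
    "errors"
    "fmt"
)

type serverSetArgs struct {
    drivesPerSet int
    setupType    string
}

// validateServerSets mirrors the checks in the diff: every server set must
// share the same drives-per-set ratio and the same setup type.
func validateServerSets(serverSets []serverSetArgs) error {
    if len(serverSets) == 0 {
        return errors.New("no server sets provided")
    }
    first := serverSets[0]
    for _, ss := range serverSets[1:] {
        if ss.drivesPerSet != first.drivesPerSet {
            return fmt.Errorf("all serverSets should have same drive per set ratio - expected %d, got %d",
                first.drivesPerSet, ss.drivesPerSet)
        }
        if ss.setupType != first.setupType {
            return fmt.Errorf("all serverSets should be of the same setup-type - expected %s, got %s",
                first.setupType, ss.setupType)
        }
    }
    return nil
}

func main() {
    err := validateServerSets([]serverSetArgs{
        {drivesPerSet: 16, setupType: "distributed"},
        {drivesPerSet: 12, setupType: "distributed"},
    })
    fmt.Println(err) // drive-per-set mismatch is reported
}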
@@ -201,12 +201,12 @@ type ZoneEndpoints struct {
    Endpoints Endpoints
}

-// EndpointZones - list of list of endpoints
-type EndpointZones []ZoneEndpoints
+// EndpointServerSets - list of list of endpoints
+type EndpointServerSets []ZoneEndpoints

// GetLocalZoneIdx returns the zone which endpoint belongs to locally.
// if ep is remote this code will return -1 zoneIndex
-func (l EndpointZones) GetLocalZoneIdx(ep Endpoint) int {
+func (l EndpointServerSets) GetLocalZoneIdx(ep Endpoint) int {
    for i, zep := range l {
        for _, cep := range zep.Endpoints {
            if cep.IsLocal && ep.IsLocal {
@@ -220,14 +220,14 @@ func (l EndpointZones) GetLocalZoneIdx(ep Endpoint) int {
}

// Add add zone endpoints
-func (l *EndpointZones) Add(zeps ZoneEndpoints) error {
+func (l *EndpointServerSets) Add(zeps ZoneEndpoints) error {
    existSet := set.NewStringSet()
    for _, zep := range *l {
        for _, ep := range zep.Endpoints {
            existSet.Add(ep.String())
        }
    }
-   // Validate if there are duplicate endpoints across zones
+   // Validate if there are duplicate endpoints across serverSets
    for _, ep := range zeps.Endpoints {
        if existSet.Contains(ep.String()) {
            return fmt.Errorf("duplicate endpoints found")
@@ -238,17 +238,17 @@ func (l *EndpointZones) Add(zeps ZoneEndpoints) error {
}

// FirstLocal returns true if the first endpoint is local.
-func (l EndpointZones) FirstLocal() bool {
+func (l EndpointServerSets) FirstLocal() bool {
    return l[0].Endpoints[0].IsLocal
}

// HTTPS - returns true if secure for URLEndpointType.
-func (l EndpointZones) HTTPS() bool {
+func (l EndpointServerSets) HTTPS() bool {
    return l[0].Endpoints.HTTPS()
}

// NEndpoints - returns all nodes count
-func (l EndpointZones) NEndpoints() (count int) {
+func (l EndpointServerSets) NEndpoints() (count int) {
    for _, ep := range l {
        count += len(ep.Endpoints)
    }
@@ -256,7 +256,7 @@ func (l EndpointZones) NEndpoints() (count int) {
}

// Hostnames - returns list of unique hostnames
-func (l EndpointZones) Hostnames() []string {
+func (l EndpointServerSets) Hostnames() []string {
    foundSet := set.NewStringSet()
    for _, ep := range l {
        for _, endpoint := range ep.Endpoints {
@@ -688,9 +688,9 @@ func CreateEndpoints(serverAddr string, foundLocal bool, args ...[]string) (Endp
// the first element from the set of peers which indicate that
// they are local. There is always one entry that is local
// even with repeated server endpoints.
-func GetLocalPeer(endpointZones EndpointZones) (localPeer string) {
+func GetLocalPeer(endpointServerSets EndpointServerSets) (localPeer string) {
    peerSet := set.NewStringSet()
-   for _, ep := range endpointZones {
+   for _, ep := range endpointServerSets {
        for _, endpoint := range ep.Endpoints {
            if endpoint.Type() != URLEndpointType {
                continue
@@ -713,9 +713,9 @@ func GetLocalPeer(endpointZones EndpointZones) (localPeer string) {
}

// GetRemotePeers - get hosts information other than this minio service.
-func GetRemotePeers(endpointZones EndpointZones) []string {
+func GetRemotePeers(endpointServerSets EndpointServerSets) []string {
    peerSet := set.NewStringSet()
-   for _, ep := range endpointZones {
+   for _, ep := range endpointServerSets {
        for _, endpoint := range ep.Endpoints {
            if endpoint.Type() != URLEndpointType {
                continue
@@ -745,12 +745,12 @@ func GetProxyEndpointLocalIndex(proxyEps []ProxyEndpoint) int {
}

// GetProxyEndpoints - get all endpoints that can be used to proxy list request.
-func GetProxyEndpoints(endpointZones EndpointZones) ([]ProxyEndpoint, error) {
+func GetProxyEndpoints(endpointServerSets EndpointServerSets) ([]ProxyEndpoint, error) {
    var proxyEps []ProxyEndpoint

    proxyEpSet := set.NewStringSet()

-   for _, ep := range endpointZones {
+   for _, ep := range endpointServerSets {
        for _, endpoint := range ep.Endpoints {
            if endpoint.Type() != URLEndpointType {
                continue
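EndpointServerSets keeps the old list-of-lists layout: one ZoneEndpoints entry per server set, each carrying its own Endpoints slice, and helpers such as NEndpoints simply flatten it. A minimal sketch with simplified stand-in types (the real types carry URLs, set counts, and more):

package main

import "fmt"

// Simplified stand-ins for illustration only.
type Endpoint struct {
    Host    string
    IsLocal bool
}

type ZoneEndpoints struct {
    Endpoints []Endpoint
}

// EndpointServerSets - list of list of endpoints (formerly EndpointZones).
type EndpointServerSets []ZoneEndpoints

// NEndpoints counts endpoints across all server sets, as in the diff.
func (l EndpointServerSets) NEndpoints() (count int) {
    for _, ep := range l {
        count += len(ep.Endpoints)
    }
    return count
}

func main() {
    l := EndpointServerSets{
        {Endpoints: []Endpoint{{Host: "node1:9000", IsLocal: true}, {Host: "node2:9000"}}},
        {Endpoints: []Endpoint{{Host: "node3:9000"}}},
    }
    fmt.Println("total endpoints:", l.NEndpoints())
    fmt.Println("first local:", l[0].Endpoints[0].IsLocal)
}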
@@ -55,8 +55,8 @@ func TestErasureParentDirIsObject(t *testing.T) {
        t.Fatalf("Unexpected object name returned got %s, expected %s", objInfo.Name, objectName)
    }

-   z := obj.(*erasureZones)
-   xl := z.zones[0].sets[0]
+   z := obj.(*erasureServerSets)
+   xl := z.serverSets[0].sets[0]
    testCases := []struct {
        parentIsObject bool
        objectName string
@@ -178,8 +178,8 @@ func TestListOnlineDisks(t *testing.T) {

    object := "object"
    data := bytes.Repeat([]byte("a"), 1024)
-   z := obj.(*erasureZones)
-   erasureDisks := z.zones[0].sets[0].getDisks()
+   z := obj.(*erasureServerSets)
+   erasureDisks := z.serverSets[0].sets[0].getDisks()
    for i, test := range testCases {
        _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader(data), int64(len(data)), "", ""), ObjectOptions{})
        if err != nil {
@@ -274,8 +274,8 @@ func TestDisksWithAllParts(t *testing.T) {
    // make data with more than one part
    partCount := 3
    data := bytes.Repeat([]byte("a"), 6*1024*1024*partCount)
-   z := obj.(*erasureZones)
-   s := z.zones[0].sets[0]
+   z := obj.(*erasureServerSets)
+   s := z.serverSets[0].sets[0]
    erasureDisks := s.getDisks()
    err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
    if err != nil {
@@ -42,8 +42,8 @@ func TestHealing(t *testing.T) {
    defer obj.Shutdown(context.Background())
    defer removeRoots(fsDirs)

-   z := obj.(*erasureZones)
-   er := z.zones[0].sets[0]
+   z := obj.(*erasureServerSets)
+   er := z.serverSets[0].sets[0]

    // Create "bucket"
    err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
@@ -197,8 +197,8 @@ func TestHealObjectCorrupted(t *testing.T) {
    }

    // Test 1: Remove the object backend files from the first disk.
-   z := objLayer.(*erasureZones)
-   er := z.zones[0].sets[0]
+   z := objLayer.(*erasureServerSets)
+   er := z.serverSets[0].sets[0]
    erasureDisks := er.getDisks()
    firstDisk := erasureDisks[0]
    err = firstDisk.DeleteFile(context.Background(), bucket, pathJoin(object, xlStorageFormatFile))
@@ -342,8 +342,8 @@ func TestHealObjectErasure(t *testing.T) {
    }

    // Remove the object backend files from the first disk.
-   z := obj.(*erasureZones)
-   er := z.zones[0].sets[0]
+   z := obj.(*erasureServerSets)
+   er := z.serverSets[0].sets[0]
    firstDisk := er.getDisks()[0]

    _, err = obj.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, ObjectOptions{})
@@ -366,7 +366,7 @@ func TestHealObjectErasure(t *testing.T) {
    }

    erasureDisks := er.getDisks()
-   z.zones[0].erasureDisksMu.Lock()
+   z.serverSets[0].erasureDisksMu.Lock()
    er.getDisks = func() []StorageAPI {
        // Nil more than half the disks, to remove write quorum.
        for i := 0; i <= len(erasureDisks)/2; i++ {
@@ -374,7 +374,7 @@ func TestHealObjectErasure(t *testing.T) {
        }
        return erasureDisks
    }
-   z.zones[0].erasureDisksMu.Unlock()
+   z.serverSets[0].erasureDisksMu.Unlock()

    // Try healing now, expect to receive errDiskNotFound.
    _, err = obj.HealObject(ctx, bucket, object, "", madmin.HealOpts{ScanMode: madmin.HealDeepScan})
@@ -419,8 +419,8 @@ func TestHealEmptyDirectoryErasure(t *testing.T) {
    }

    // Remove the object backend files from the first disk.
-   z := obj.(*erasureZones)
-   er := z.zones[0].sets[0]
+   z := obj.(*erasureServerSets)
+   er := z.serverSets[0].sets[0]
    firstDisk := er.getDisks()[0]
    err = firstDisk.DeleteVol(context.Background(), pathJoin(bucket, encodeDirObject(object)), true)
    if err != nil {
@@ -148,13 +148,13 @@ func TestShuffleDisks(t *testing.T) {
        t.Fatal(err)
    }
    defer removeRoots(disks)
-   z := objLayer.(*erasureZones)
+   z := objLayer.(*erasureServerSets)
    testShuffleDisks(t, z)
}

// Test shuffleDisks which returns shuffled slice of disks for their actual distribution.
-func testShuffleDisks(t *testing.T, z *erasureZones) {
-   disks := z.zones[0].GetDisks(0)()
+func testShuffleDisks(t *testing.T, z *erasureServerSets) {
+   disks := z.serverSets[0].GetDisks(0)()
    distribution := []int{16, 14, 12, 10, 8, 6, 4, 2, 1, 3, 5, 7, 9, 11, 13, 15}
    shuffledDisks := shuffleDisks(disks, distribution)
    // From the "distribution" above you can notice that:
@@ -196,6 +196,6 @@ func TestEvalDisks(t *testing.T) {
        t.Fatal(err)
    }
    defer removeRoots(disks)
-   z := objLayer.(*erasureZones)
+   z := objLayer.(*erasureServerSets)
    testShuffleDisks(t, z)
}
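testShuffleDisks drives shuffleDisks with a fixed distribution of block positions. A generic sketch of that kind of distribution-based reordering, under assumed semantics for illustration only (the real shuffleDisks operates on StorageAPI values):

package main

import "fmt"

// shuffleByDistribution places item i at position distribution[i]-1,
// the assumed 1-based placement the test's distribution slice suggests.
func shuffleByDistribution(items []string, distribution []int) []string {
    shuffled := make([]string, len(items))
    for i, pos := range distribution {
        shuffled[pos-1] = items[i]
    }
    return shuffled
}

func main() {
    disks := []string{"d1", "d2", "d3", "d4"}
    distribution := []int{4, 2, 1, 3}
    fmt.Println(shuffleByDistribution(disks, distribution)) // [d3 d2 d4 d1]
}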
@@ -132,8 +132,8 @@ func TestErasureDeleteObjectsErasureSet(t *testing.T) {
        for _, dir := range fsDirs {
            defer os.RemoveAll(dir)
        }
-       z := obj.(*erasureZones)
-       xl := z.zones[0].sets[0]
+       z := obj.(*erasureServerSets)
+       xl := z.serverSets[0].sets[0]
        objs = append(objs, xl)
    }

@@ -205,8 +205,8 @@ func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
    defer obj.Shutdown(context.Background())
    defer removeRoots(fsDirs)

-   z := obj.(*erasureZones)
-   xl := z.zones[0].sets[0]
+   z := obj.(*erasureServerSets)
+   xl := z.serverSets[0].sets[0]

    // Create "bucket"
    err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
@@ -225,7 +225,7 @@ func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
    // for a 16 disk setup, quorum is 9. To simulate disks not found yet
    // quorum is available, we remove disks leaving quorum disks behind.
    erasureDisks := xl.getDisks()
-   z.zones[0].erasureDisksMu.Lock()
+   z.serverSets[0].erasureDisksMu.Lock()
    xl.getDisks = func() []StorageAPI {
        for i := range erasureDisks[:7] {
            erasureDisks[i] = newNaughtyDisk(erasureDisks[i], nil, errFaultyDisk)
@@ -233,7 +233,7 @@ func TestErasureDeleteObjectDiskNotFound(t *testing.T) {
        return erasureDisks
    }

-   z.zones[0].erasureDisksMu.Unlock()
+   z.serverSets[0].erasureDisksMu.Unlock()
    _, err = obj.DeleteObject(ctx, bucket, object, ObjectOptions{})
    if err != nil {
        t.Fatal(err)
@@ -247,14 +247,14 @@ func TestErasureDeleteObjectDiskNotFound(t *testing.T) {

    // Remove one more disk to 'lose' quorum, by setting it to nil.
    erasureDisks = xl.getDisks()
-   z.zones[0].erasureDisksMu.Lock()
+   z.serverSets[0].erasureDisksMu.Lock()
    xl.getDisks = func() []StorageAPI {
        erasureDisks[7] = nil
        erasureDisks[8] = nil
        return erasureDisks
    }

-   z.zones[0].erasureDisksMu.Unlock()
+   z.serverSets[0].erasureDisksMu.Unlock()
    _, err = obj.DeleteObject(ctx, bucket, object, ObjectOptions{})
    // since majority of disks are not available, metaquorum is not achieved and hence errErasureWriteQuorum error
    if err != toObjectErr(errErasureWriteQuorum, bucket, object) {
@@ -275,8 +275,8 @@ func TestGetObjectNoQuorum(t *testing.T) {
    defer obj.Shutdown(context.Background())
    defer removeRoots(fsDirs)

-   z := obj.(*erasureZones)
-   xl := z.zones[0].sets[0]
+   z := obj.(*erasureServerSets)
+   xl := z.serverSets[0].sets[0]

    // Create "bucket"
    err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
@@ -311,11 +311,11 @@ func TestGetObjectNoQuorum(t *testing.T) {
            erasureDisks[i] = newNaughtyDisk(erasureDisks[i], diskErrors, errFaultyDisk)
        }
    }
-   z.zones[0].erasureDisksMu.Lock()
+   z.serverSets[0].erasureDisksMu.Lock()
    xl.getDisks = func() []StorageAPI {
        return erasureDisks
    }
-   z.zones[0].erasureDisksMu.Unlock()
+   z.serverSets[0].erasureDisksMu.Unlock()
    // Fetch object from store.
    err = xl.GetObject(ctx, bucket, object, 0, int64(len("abcd")), ioutil.Discard, "", opts)
    if err != toObjectErr(errErasureReadQuorum, bucket, object) {
@@ -338,8 +338,8 @@ func TestPutObjectNoQuorum(t *testing.T) {
    defer obj.Shutdown(context.Background())
    defer removeRoots(fsDirs)

-   z := obj.(*erasureZones)
-   xl := z.zones[0].sets[0]
+   z := obj.(*erasureServerSets)
+   xl := z.serverSets[0].sets[0]

    // Create "bucket"
    err = obj.MakeBucketWithLocation(ctx, "bucket", BucketOptions{})
@@ -374,11 +374,11 @@ func TestPutObjectNoQuorum(t *testing.T) {
            erasureDisks[i] = newNaughtyDisk(erasureDisks[i], diskErrors, errFaultyDisk)
        }
    }
-   z.zones[0].erasureDisksMu.Lock()
+   z.serverSets[0].erasureDisksMu.Lock()
    xl.getDisks = func() []StorageAPI {
        return erasureDisks
    }
-   z.zones[0].erasureDisksMu.Unlock()
+   z.serverSets[0].erasureDisksMu.Unlock()
    // Upload new content to same object "object"
    _, err = obj.PutObject(ctx, bucket, object, mustGetPutObjReader(t, bytes.NewReader([]byte("abcd")), int64(len("abcd")), "", ""), opts)
    if err != toObjectErr(errErasureWriteQuorum, bucket, object) {
@@ -404,8 +404,8 @@ func testObjectQuorumFromMeta(obj ObjectLayer, instanceType string, dirs []strin
    partCount := 3
    data := bytes.Repeat([]byte("a"), 6*1024*1024*partCount)

-   z := obj.(*erasureZones)
-   xl := z.zones[0].sets[0]
+   z := obj.(*erasureServerSets)
+   xl := z.serverSets[0].sets[0]
    erasureDisks := xl.getDisks()

    ctx, cancel := context.WithCancel(GlobalContext)
(File diff suppressed because it is too large.)
@@ -351,7 +351,7 @@ func newErasureSets(ctx context.Context, endpoints Endpoints, storageDisks []Sto
        sets: make([]*erasureObjects, setCount),
        erasureDisks: make([][]StorageAPI, setCount),
        erasureLockers: make([][]dsync.NetLocker, setCount),
-       erasureLockOwner: mustGetUUID(),
+       erasureLockOwner: GetLocalPeer(globalEndpoints),
        endpoints: endpoints,
        endpointStrings: endpointStrings,
        setCount: setCount,
@@ -435,7 +435,7 @@ func (s *erasureSets) SetDriveCount() int {
}

// StorageUsageInfo - combines output of StorageInfo across all erasure coded object sets.
-// This only returns disk usage info for Zones to perform placement decision, this call
+// This only returns disk usage info for ServerSets to perform placement decision, this call
// is not implemented in Object interface and is not meant to be used by other object
// layer implementations.
func (s *erasureSets) StorageUsageInfo(ctx context.Context) StorageInfo {
@@ -213,8 +213,8 @@ func TestHashedLayer(t *testing.T) {
        defer os.RemoveAll(dir)
    }

-   z := obj.(*erasureZones)
-   objs = append(objs, z.zones[0].sets[0])
+   z := obj.(*erasureServerSets)
+   objs = append(objs, z.serverSets[0].sets[0])
    }

    sets := &erasureSets{sets: objs, distributionAlgo: "CRCMOD"}
@@ -188,7 +188,7 @@ var (
    // registered listeners
    globalConsoleSys *HTTPConsoleLoggerSys

-   globalEndpoints EndpointZones
+   globalEndpoints EndpointServerSets

    // Global server's network statistics
    globalConnStats = newConnStats()
@@ -247,7 +247,7 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
        return nil
    }

-   z, ok := objAPI.(*erasureZones)
+   z, ok := objAPI.(*erasureServerSets)
    if !ok {
        return nil
    }
@@ -368,8 +368,8 @@ func startLockMaintenance(ctx context.Context) {
}

// registerLockRESTHandlers - register lock rest router.
-func registerLockRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
-   for _, ep := range endpointZones {
+func registerLockRESTHandlers(router *mux.Router, endpointServerSets EndpointServerSets) {
+   for _, ep := range endpointServerSets {
        for _, endpoint := range ep.Endpoints {
            if !endpoint.IsLocal {
                continue
@@ -1187,7 +1187,7 @@ func (sys *NotificationSys) GetLocalDiskIDs(ctx context.Context) (localDiskIDs [
}

// NewNotificationSys - creates new notification system object.
-func NewNotificationSys(endpoints EndpointZones) *NotificationSys {
+func NewNotificationSys(endpoints EndpointServerSets) *NotificationSys {
    // targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.Init()
    return &NotificationSys{
        targetList: event.NewTargetList(),
@@ -61,10 +61,10 @@ func getLocalCPUOBDInfo(ctx context.Context, r *http.Request) madmin.ServerCPUOB

}

-func getLocalDrivesOBD(ctx context.Context, parallel bool, endpointZones EndpointZones, r *http.Request) madmin.ServerDrivesOBDInfo {
+func getLocalDrivesOBD(ctx context.Context, parallel bool, endpointServerSets EndpointServerSets, r *http.Request) madmin.ServerDrivesOBDInfo {
    var drivesOBDInfo []madmin.DriveOBDInfo
    var wg sync.WaitGroup
-   for _, ep := range endpointZones {
+   for _, ep := range endpointServerSets {
        for _, endpoint := range ep.Endpoints {
            // Only proceed for local endpoints
            if endpoint.IsLocal {
@@ -105,7 +105,7 @@ func getLocalDrivesOBD(ctx context.Context, parallel bool, endpointZones Endpoin

    addr := r.Host
    if globalIsDistErasure {
-       addr = GetLocalPeer(endpointZones)
+       addr = GetLocalPeer(endpointServerSets)
    }
    if parallel {
        return madmin.ServerDrivesOBDInfo{
@@ -822,8 +822,8 @@ func (client *peerRESTClient) ConsoleLog(logCh chan interface{}, doneCh <-chan s
    }()
}

-func getRemoteHosts(endpointZones EndpointZones) []*xnet.Host {
-   peers := GetRemotePeers(endpointZones)
+func getRemoteHosts(endpointServerSets EndpointServerSets) []*xnet.Host {
+   peers := GetRemotePeers(endpointServerSets)
    remoteHosts := make([]*xnet.Host, 0, len(peers))
    for _, hostStr := range peers {
        host, err := xnet.ParseHost(hostStr)
@@ -838,7 +838,7 @@ func getRemoteHosts(endpointZones EndpointZones) []*xnet.Host {
}

// newPeerRestClients creates new peer clients.
-func newPeerRestClients(endpoints EndpointZones) []*peerRESTClient {
+func newPeerRestClients(endpoints EndpointServerSets) []*peerRESTClient {
    peerHosts := getRemoteHosts(endpoints)
    restClients := make([]*peerRESTClient, len(peerHosts))
    for i, host := range peerHosts {
@@ -727,11 +727,11 @@ func (s *peerRESTServer) PutBucketNotificationHandler(w http.ResponseWriter, r *
}

// Return disk IDs of all the local disks.
-func getLocalDiskIDs(z *erasureZones) []string {
+func getLocalDiskIDs(z *erasureServerSets) []string {
    var ids []string

-   for zoneIdx := range z.zones {
-       for _, set := range z.zones[zoneIdx].sets {
+   for zoneIdx := range z.serverSets {
+       for _, set := range z.serverSets[zoneIdx].sets {
            disks := set.getDisks()
            for _, disk := range disks {
                if disk == nil {
@@ -776,7 +776,7 @@ func (s *peerRESTServer) GetLocalDiskIDs(w http.ResponseWriter, r *http.Request)
        return
    }

-   z, ok := objLayer.(*erasureZones)
+   z, ok := objLayer.(*erasureServerSets)
    if !ok {
        s.writeErrorResponse(w, errServerNotInitialized)
        return
@@ -23,9 +23,9 @@ import (
)

// Composed function registering routers for only distributed Erasure setup.
-func registerDistErasureRouters(router *mux.Router, endpointZones EndpointZones) {
+func registerDistErasureRouters(router *mux.Router, endpointServerSets EndpointServerSets) {
    // Register storage REST router only if its a distributed setup.
-   registerStorageRESTHandlers(router, endpointZones)
+   registerStorageRESTHandlers(router, endpointServerSets)

    // Register peer REST router only if its a distributed setup.
    registerPeerRESTHandlers(router)
@@ -34,7 +34,7 @@ func registerDistErasureRouters(router *mux.Router, endpointZones EndpointZones)
    registerBootstrapRESTHandlers(router)

    // Register distributed namespace lock routers.
-   registerLockRESTHandlers(router, endpointZones)
+   registerLockRESTHandlers(router, endpointServerSets)
}

// List of some generic handlers which are applied for all incoming requests.
@@ -79,14 +79,14 @@ var globalHandlers = []MiddlewareFunc{
}

// configureServer handler returns final handler for the http server.
-func configureServerHandler(endpointZones EndpointZones) (http.Handler, error) {
+func configureServerHandler(endpointServerSets EndpointServerSets) (http.Handler, error) {
    // Initialize router. `SkipClean(true)` stops gorilla/mux from
    // normalizing URL path minio/minio#3256
    router := mux.NewRouter().SkipClean(true).UseEncodedPath()

    // Initialize distributed NS lock.
    if globalIsDistErasure {
-       registerDistErasureRouters(router, endpointZones)
+       registerDistErasureRouters(router, endpointServerSets)
    }

    // Add STS router always.
@@ -516,12 +516,12 @@ func serverMain(ctx *cli.Context) {
}

// Initialize object layer with the supplied disks, objectLayer is nil upon any error.
-func newObjectLayer(ctx context.Context, endpointZones EndpointZones) (newObject ObjectLayer, err error) {
+func newObjectLayer(ctx context.Context, endpointServerSets EndpointServerSets) (newObject ObjectLayer, err error) {
    // For FS only, directly use the disk.
-   if endpointZones.NEndpoints() == 1 {
+   if endpointServerSets.NEndpoints() == 1 {
        // Initialize new FS object layer.
-       return NewFSObjectLayer(endpointZones[0].Endpoints[0].Path)
+       return NewFSObjectLayer(endpointServerSets[0].Endpoints[0].Path)
    }

-   return newErasureZones(ctx, endpointZones)
+   return newErasureServerSets(ctx, endpointServerSets)
}
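newObjectLayer picks the backend from the endpoint topology: a single endpoint means the plain FS layer, anything else goes through the erasure server sets. A minimal sketch of that dispatch, with stub constructors rather than the real ones:

package main

import "fmt"

// Stub layers and constructors, for illustration only.
type ObjectLayer interface{ Name() string }

type fsLayer struct{}

func (fsLayer) Name() string { return "FS" }

type erasureServerSetsLayer struct{}

func (erasureServerSetsLayer) Name() string { return "erasure server sets" }

// newObjectLayer mirrors the dispatch in the diff: one endpoint -> FS,
// otherwise the erasure server sets layer.
func newObjectLayer(totalEndpoints int) ObjectLayer {
    if totalEndpoints == 1 {
        return fsLayer{}
    }
    return erasureServerSetsLayer{}
}

func main() {
    fmt.Println(newObjectLayer(1).Name())  // FS
    fmt.Println(newObjectLayer(16).Name()) // erasure server sets
}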
@@ -58,7 +58,7 @@ func TestNewObjectLayer(t *testing.T) {
        t.Fatal("Unexpected object layer initialization error", err)
    }

-   _, ok = obj.(*erasureZones)
+   _, ok = obj.(*erasureServerSets)
    if !ok {
        t.Fatal("Unexpected object layer detected", reflect.TypeOf(obj))
    }
@@ -890,8 +890,8 @@ func logFatalErrs(err error, endpoint Endpoint, exit bool) {
}

// registerStorageRPCRouter - register storage rpc router.
-func registerStorageRESTHandlers(router *mux.Router, endpointZones EndpointZones) {
-   for _, ep := range endpointZones {
+func registerStorageRESTHandlers(router *mux.Router, endpointServerSets EndpointServerSets) {
+   for _, ep := range endpointServerSets {
        for _, endpoint := range ep.Endpoints {
            if !endpoint.IsLocal {
                continue
@@ -286,7 +286,7 @@ func isSameType(obj1, obj2 interface{}) bool {
//	defer s.Stop()
type TestServer struct {
    Root string
-   Disks EndpointZones
+   Disks EndpointServerSets
    AccessKey string
    SecretKey string
    Server *httptest.Server
@@ -403,7 +403,7 @@ func resetGlobalConfig() {
}

func resetGlobalEndpoints() {
-   globalEndpoints = EndpointZones{}
+   globalEndpoints = EndpointServerSets{}
}

func resetGlobalIsErasure() {
@@ -1546,14 +1546,14 @@ func getRandomDisks(N int) ([]string, error) {
}

// Initialize object layer with the supplied disks, objectLayer is nil upon any error.
-func newTestObjectLayer(ctx context.Context, endpointZones EndpointZones) (newObject ObjectLayer, err error) {
+func newTestObjectLayer(ctx context.Context, endpointServerSets EndpointServerSets) (newObject ObjectLayer, err error) {
    // For FS only, directly use the disk.
-   if endpointZones.NEndpoints() == 1 {
+   if endpointServerSets.NEndpoints() == 1 {
        // Initialize new FS object layer.
-       return NewFSObjectLayer(endpointZones[0].Endpoints[0].Path)
+       return NewFSObjectLayer(endpointServerSets[0].Endpoints[0].Path)
    }

-   z, err := newErasureZones(ctx, endpointZones)
+   z, err := newErasureServerSets(ctx, endpointServerSets)
    if err != nil {
        return nil, err
    }
@@ -1566,16 +1566,16 @@ func newTestObjectLayer(ctx context.Context, endpointZones EndpointZones) (newOb
}

// initObjectLayer - Instantiates object layer and returns it.
-func initObjectLayer(ctx context.Context, endpointZones EndpointZones) (ObjectLayer, []StorageAPI, error) {
-   objLayer, err := newTestObjectLayer(ctx, endpointZones)
+func initObjectLayer(ctx context.Context, endpointServerSets EndpointServerSets) (ObjectLayer, []StorageAPI, error) {
+   objLayer, err := newTestObjectLayer(ctx, endpointServerSets)
    if err != nil {
        return nil, nil, err
    }

    var formattedDisks []StorageAPI
    // Should use the object layer tests for validating cache.
-   if z, ok := objLayer.(*erasureZones); ok {
-       formattedDisks = z.zones[0].GetDisks(0)()
+   if z, ok := objLayer.(*erasureServerSets); ok {
+       formattedDisks = z.serverSets[0].GetDisks(0)()
    }

    // Success.
@@ -2212,7 +2212,7 @@ func generateTLSCertKey(host string) ([]byte, []byte, error) {
    return certOut.Bytes(), keyOut.Bytes(), nil
}

-func mustGetZoneEndpoints(args ...string) EndpointZones {
+func mustGetZoneEndpoints(args ...string) EndpointServerSets {
    endpoints := mustGetNewEndpoints(args...)
    return []ZoneEndpoints{{
        SetCount: 1,
@@ -2227,8 +2227,8 @@ func mustGetNewEndpoints(args ...string) (endpoints Endpoints) {
    return endpoints
}

-func getEndpointsLocalAddr(endpointZones EndpointZones) string {
-   for _, endpoints := range endpointZones {
+func getEndpointsLocalAddr(endpointServerSets EndpointServerSets) string {
+   for _, endpoints := range endpointServerSets {
        for _, endpoint := range endpoints.Endpoints {
            if endpoint.IsLocal && endpoint.Type() == URLEndpointType {
                return endpoint.Host
@@ -1224,17 +1224,17 @@ func TestWebObjectLayerFaultyDisks(t *testing.T) {
    }

    // Set faulty disks to Erasure backend
-   z := obj.(*erasureZones)
-   xl := z.zones[0].sets[0]
+   z := obj.(*erasureServerSets)
+   xl := z.serverSets[0].sets[0]
    erasureDisks := xl.getDisks()
-   z.zones[0].erasureDisksMu.Lock()
+   z.serverSets[0].erasureDisksMu.Lock()
    xl.getDisks = func() []StorageAPI {
        for i, d := range erasureDisks {
            erasureDisks[i] = newNaughtyDisk(d, nil, errFaultyDisk)
        }
        return erasureDisks
    }
-   z.zones[0].erasureDisksMu.Unlock()
+   z.serverSets[0].erasureDisksMu.Unlock()

    // Initialize web rpc endpoint.
    apiRouter := initTestWebRPCEndPoint(obj)