speedup getFormatErasureInQuorum use driveCount (#14239)

startup speed-up, currently getFormatErasureInQuorum()
would spend up to 2-3secs when there are 3000+ drives
for example in a setup, simplify this implementation
to use drive counts.
This commit is contained in:
Harshavardhana 2022-02-04 12:21:21 -08:00 committed by GitHub
parent 778cccb15d
commit 6123377e66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 165 additions and 94 deletions

View File

@ -1211,6 +1211,9 @@ func formatsToDrivesInfo(endpoints Endpoints, formats []*formatErasureV3, sErrs
// If it is a single node Erasure and all disks are root disks, it is most likely a test setup, else it is a production setup. // If it is a single node Erasure and all disks are root disks, it is most likely a test setup, else it is a production setup.
// On a test setup we allow creation of format.json on root disks to help with dev/testing. // On a test setup we allow creation of format.json on root disks to help with dev/testing.
func isTestSetup(infos []DiskInfo, errs []error) bool { func isTestSetup(infos []DiskInfo, errs []error) bool {
if globalIsCICD {
return true
}
rootDiskCount := 0 rootDiskCount := 0
for i := range errs { for i := range errs {
if errs[i] == nil || errs[i] == errUnformattedDisk { if errs[i] == nil || errs[i] == errUnformattedDisk {
@ -1245,6 +1248,9 @@ func getHealDiskInfos(storageDisks []StorageAPI, errs []error) ([]DiskInfo, []er
// Mark root disks as down so as not to heal them. // Mark root disks as down so as not to heal them.
func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) { func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
if globalIsCICD {
return
}
var infos []DiskInfo var infos []DiskInfo
infos, errs = getHealDiskInfos(storageDisks, errs) infos, errs = getHealDiskInfos(storageDisks, errs)
if !isTestSetup(infos, errs) { if !isTestSetup(infos, errs) {

View File

@ -19,8 +19,6 @@ package cmd
import ( import (
"context" "context"
"crypto/sha256"
"encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -124,6 +122,13 @@ type formatErasureV3 struct {
} `json:"xl"` } `json:"xl"`
} }
func (f *formatErasureV3) Drives() (drives int) {
for _, set := range f.Erasure.Sets {
drives += len(set)
}
return drives
}
func (f *formatErasureV3) Clone() *formatErasureV3 { func (f *formatErasureV3) Clone() *formatErasureV3 {
b, err := json.Marshal(f) b, err := json.Marshal(f)
if err != nil { if err != nil {
@ -545,43 +550,36 @@ func formatErasureFixLocalDeploymentID(endpoints Endpoints, storageDisks []Stora
// Get backend Erasure format in quorum `format.json`. // Get backend Erasure format in quorum `format.json`.
func getFormatErasureInQuorum(formats []*formatErasureV3) (*formatErasureV3, error) { func getFormatErasureInQuorum(formats []*formatErasureV3) (*formatErasureV3, error) {
formatHashes := make([]string, len(formats)) formatCountMap := make(map[int]int, len(formats))
for i, format := range formats { for _, format := range formats {
if format == nil { if format == nil {
continue continue
} }
h := sha256.New() formatCountMap[format.Drives()]++
for _, set := range format.Erasure.Sets {
for _, diskID := range set {
h.Write([]byte(diskID))
}
}
formatHashes[i] = hex.EncodeToString(h.Sum(nil))
} }
formatCountMap := make(map[string]int) maxDrives := 0
for _, hash := range formatHashes {
if hash == "" {
continue
}
formatCountMap[hash]++
}
maxHash := ""
maxCount := 0 maxCount := 0
for hash, count := range formatCountMap { for drives, count := range formatCountMap {
if count > maxCount { if count > maxCount {
maxCount = count maxCount = count
maxHash = hash maxDrives = drives
} }
} }
if maxDrives == 0 {
return nil, errErasureReadQuorum
}
if maxCount < len(formats)/2 { if maxCount < len(formats)/2 {
return nil, errErasureReadQuorum return nil, errErasureReadQuorum
} }
for i, hash := range formatHashes { for i, format := range formats {
if hash == maxHash { if format == nil {
continue
}
if format.Drives() == maxDrives {
format := formats[i].Clone() format := formats[i].Clone()
format.Erasure.This = "" format.Erasure.This = ""
return format, nil return format, nil
@ -624,43 +622,6 @@ func formatErasureV3Check(reference *formatErasureV3, format *formatErasureV3) e
return fmt.Errorf("Disk ID %s not found in any disk sets %s", this, format.Erasure.Sets) return fmt.Errorf("Disk ID %s not found in any disk sets %s", this, format.Erasure.Sets)
} }
// Initializes meta volume only on local storage disks.
func initErasureMetaVolumesInLocalDisks(storageDisks []StorageAPI, formats []*formatErasureV3) error {
// Compute the local disks eligible for meta volumes (re)initialization
disksToInit := make([]StorageAPI, 0, len(storageDisks))
for index := range storageDisks {
if formats[index] == nil || storageDisks[index] == nil || !storageDisks[index].IsLocal() {
// Ignore create meta volume on disks which are not found or not local.
continue
}
disksToInit = append(disksToInit, storageDisks[index])
}
// Initialize errs to collect errors inside go-routine.
g := errgroup.WithNErrs(len(disksToInit))
// Initialize all disks in parallel.
for index := range disksToInit {
// Initialize a new index variable in each loop so each
// goroutine will return its own instance of index variable.
index := index
g.Go(func() error {
return makeFormatErasureMetaVolumes(disksToInit[index])
}, index)
}
// Return upon first error.
for _, err := range g.Wait() {
if err == nil {
continue
}
return toObjectErr(err, minioMetaBucket)
}
// Return success here.
return nil
}
// saveFormatErasureAll - populates `format.json` on disks in its order. // saveFormatErasureAll - populates `format.json` on disks in its order.
func saveFormatErasureAll(ctx context.Context, storageDisks []StorageAPI, formats []*formatErasureV3) error { func saveFormatErasureAll(ctx context.Context, storageDisks []StorageAPI, formats []*formatErasureV3) error {
g := errgroup.WithNErrs(len(storageDisks)) g := errgroup.WithNErrs(len(storageDisks))

View File

@ -18,6 +18,8 @@
package cmd package cmd
import ( import (
"crypto/sha256"
"encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"io/ioutil" "io/ioutil"
@ -54,10 +56,6 @@ func TestFixFormatV3(t *testing.T) {
formats[j] = newFormat formats[j] = newFormat
} }
if err = initErasureMetaVolumesInLocalDisks(storageDisks, formats); err != nil {
t.Fatal(err)
}
formats[1] = nil formats[1] = nil
expThis := formats[2].Erasure.This expThis := formats[2].Erasure.This
formats[2].Erasure.This = "" formats[2].Erasure.This = ""
@ -342,6 +340,102 @@ func TestGetFormatErasureInQuorumCheck(t *testing.T) {
} }
} }
// Get backend Erasure format in quorum `format.json`.
func getFormatErasureInQuorumOld(formats []*formatErasureV3) (*formatErasureV3, error) {
formatHashes := make([]string, len(formats))
for i, format := range formats {
if format == nil {
continue
}
h := sha256.New()
for _, set := range format.Erasure.Sets {
for _, diskID := range set {
h.Write([]byte(diskID))
}
}
formatHashes[i] = hex.EncodeToString(h.Sum(nil))
}
formatCountMap := make(map[string]int)
for _, hash := range formatHashes {
if hash == "" {
continue
}
formatCountMap[hash]++
}
maxHash := ""
maxCount := 0
for hash, count := range formatCountMap {
if count > maxCount {
maxCount = count
maxHash = hash
}
}
if maxCount < len(formats)/2 {
return nil, errErasureReadQuorum
}
for i, hash := range formatHashes {
if hash == maxHash {
format := formats[i].Clone()
format.Erasure.This = ""
return format, nil
}
}
return nil, errErasureReadQuorum
}
func BenchmarkGetFormatErasureInQuorumOld(b *testing.B) {
setCount := 200
setDriveCount := 15
format := newFormatErasureV3(setCount, setDriveCount)
format.Erasure.DistributionAlgo = formatErasureVersionV2DistributionAlgoV1
formats := make([]*formatErasureV3, 15*200)
for i := 0; i < setCount; i++ {
for j := 0; j < setDriveCount; j++ {
newFormat := format.Clone()
newFormat.Erasure.This = format.Erasure.Sets[i][j]
formats[i*setDriveCount+j] = newFormat
}
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, _ = getFormatErasureInQuorumOld(formats)
}
}
func BenchmarkGetFormatErasureInQuorum(b *testing.B) {
setCount := 200
setDriveCount := 15
format := newFormatErasureV3(setCount, setDriveCount)
format.Erasure.DistributionAlgo = formatErasureVersionV2DistributionAlgoV1
formats := make([]*formatErasureV3, 15*200)
for i := 0; i < setCount; i++ {
for j := 0; j < setDriveCount; j++ {
newFormat := format.Clone()
newFormat.Erasure.This = format.Erasure.Sets[i][j]
formats[i*setDriveCount+j] = newFormat
}
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, _ = getFormatErasureInQuorum(formats)
}
}
// Tests formatErasureGetDeploymentID() // Tests formatErasureGetDeploymentID()
func TestGetErasureID(t *testing.T) { func TestGetErasureID(t *testing.T) {
setCount := 2 setCount := 2

View File

@ -185,7 +185,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
for i, err := range errs { for i, err := range errs {
if err != nil { if err != nil {
if err == errDiskNotFound && retryCount >= 5 { if err == errDiskNotFound && retryCount >= 10 {
logger.Error("Unable to connect to %s: %v", endpoints[i], isServerResolvable(endpoints[i], time.Second)) logger.Error("Unable to connect to %s: %v", endpoints[i], isServerResolvable(endpoints[i], time.Second))
} else { } else {
logger.Error("Unable to use the drive %s: %v", endpoints[i], err) logger.Error("Unable to use the drive %s: %v", endpoints[i], err)
@ -202,7 +202,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints Endpoints,
// Check if we have // Check if we have
for i, sErr := range sErrs { for i, sErr := range sErrs {
// print the error, nonetheless, which is perhaps unhandled // print the error, nonetheless, which is perhaps unhandled
if sErr != errUnformattedDisk && sErr != errDiskNotFound && retryCount >= 5 { if sErr != errUnformattedDisk && sErr != errDiskNotFound && retryCount >= 10 {
if sErr != nil { if sErr != nil {
logger.Error("Unable to read 'format.json' from %s: %v\n", endpoints[i], sErr) logger.Error("Unable to read 'format.json' from %s: %v\n", endpoints[i], sErr)
} }
@ -315,23 +315,28 @@ func waitForFormatErasure(firstDisk bool, endpoints Endpoints, poolCount, setCou
tries++ // tried already once tries++ // tried already once
// Wait on each try for an update. // Wait on each try for an update.
ticker := time.NewTicker(250 * time.Millisecond) ticker := time.NewTicker(150 * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
if tries == 10 {
// Reset the tries count such that we log only for every 10 retries.
tries = 1
}
storageDisks, format, err := connectLoadInitFormats(tries, firstDisk, endpoints, poolCount, setCount, setDriveCount, deploymentID, distributionAlgo) storageDisks, format, err := connectLoadInitFormats(tries, firstDisk, endpoints, poolCount, setCount, setDriveCount, deploymentID, distributionAlgo)
if err != nil { if err != nil {
tries++ tries++
switch err { switch err {
case errNotFirstDisk: case errNotFirstDisk:
// Fresh setup, wait for first server to be up. // Fresh setup, wait for first server to be up.
logger.Info("Waiting for the first server to format the disks.") logger.Info("Waiting for the first server to format the disks (elapsed %s)\n", getElapsedTime())
continue continue
case errFirstDiskWait: case errFirstDiskWait:
// Fresh setup, wait for other servers to come up. // Fresh setup, wait for other servers to come up.
logger.Info("Waiting for all other servers to be online to format the disks.") logger.Info("Waiting for all other servers to be online to format the disks (elapses %s)\n", getElapsedTime())
continue continue
case errErasureReadQuorum: case errErasureReadQuorum:
// no quorum available continue to wait for minimum number of servers. // no quorum available continue to wait for minimum number of servers.

View File

@ -531,10 +531,24 @@ func serverMain(ctx *cli.Context) {
} }
} }
if globalBrowserEnabled {
srv, err := initConsoleServer()
if err != nil {
logger.FatalIf(err, "Unable to initialize console service")
}
setConsoleSrv(srv)
go func() {
logger.FatalIf(newConsoleServerFn().Serve(), "Unable to initialize console server")
}()
}
newObject, err := newObjectLayer(GlobalContext, globalEndpoints) newObject, err := newObjectLayer(GlobalContext, globalEndpoints)
if err != nil { if err != nil {
logFatalErrs(err, Endpoint{}, true) logFatalErrs(err, Endpoint{}, true)
} }
logger.SetDeploymentID(globalDeploymentID) logger.SetDeploymentID(globalDeploymentID)
// Enable background operations for erasure coding // Enable background operations for erasure coding
@ -624,19 +638,6 @@ func serverMain(ctx *cli.Context) {
logStartupMessage(color.RedBold("WARNING: Strict AWS S3 compatible incoming PUT, POST content payload validation is turned off, caution is advised do not use in production")) logStartupMessage(color.RedBold("WARNING: Strict AWS S3 compatible incoming PUT, POST content payload validation is turned off, caution is advised do not use in production"))
} }
if globalBrowserEnabled {
srv, err := initConsoleServer()
if err != nil {
logger.FatalIf(err, "Unable to initialize console service")
}
setConsoleSrv(srv)
go func() {
logger.FatalIf(newConsoleServerFn().Serve(), "Unable to initialize console server")
}()
}
if serverDebugLog { if serverDebugLog {
logger.Info("== DEBUG Mode enabled ==") logger.Info("== DEBUG Mode enabled ==")
logger.Info("Currently set environment settings:") logger.Info("Currently set environment settings:")

View File

@ -124,7 +124,7 @@ func GetCurrentReleaseTime() (releaseTime time.Time, err error) {
// "/.dockerenv": "file", // "/.dockerenv": "file",
// //
func IsDocker() bool { func IsDocker() bool {
if env.Get("MINIO_CI_CD", "") == "" { if !globalIsCICD {
_, err := os.Stat("/.dockerenv") _, err := os.Stat("/.dockerenv")
if osIsNotExist(err) { if osIsNotExist(err) {
return false return false
@ -140,7 +140,7 @@ func IsDocker() bool {
// IsDCOS returns true if minio is running in DCOS. // IsDCOS returns true if minio is running in DCOS.
func IsDCOS() bool { func IsDCOS() bool {
if env.Get("MINIO_CI_CD", "") == "" { if !globalIsCICD {
// http://mesos.apache.org/documentation/latest/docker-containerizer/ // http://mesos.apache.org/documentation/latest/docker-containerizer/
// Mesos docker containerizer sets this value // Mesos docker containerizer sets this value
return env.Get("MESOS_CONTAINER_NAME", "") != "" return env.Get("MESOS_CONTAINER_NAME", "") != ""
@ -150,7 +150,7 @@ func IsDCOS() bool {
// IsKubernetes returns true if minio is running in kubernetes. // IsKubernetes returns true if minio is running in kubernetes.
func IsKubernetes() bool { func IsKubernetes() bool {
if env.Get("MINIO_CI_CD", "") == "" { if !globalIsCICD {
// Kubernetes env used to validate if we are // Kubernetes env used to validate if we are
// indeed running inside a kubernetes pod // indeed running inside a kubernetes pod
// is KUBERNETES_SERVICE_HOST // is KUBERNETES_SERVICE_HOST

View File

@ -216,14 +216,10 @@ func newXLStorage(ep Endpoint) (s *xlStorage, err error) {
if globalIsCICD { if globalIsCICD {
rootDisk = true rootDisk = true
} else { } else {
rootDisk, err = disk.IsRootDisk(path, SlashSeparator) if globalRootDiskThreshold > 0 {
if err != nil { // When you do not want rely on automatic verification
return nil, err // of rejecting root disks, we need to add this threshold
} // to ensure that root disks are ignored properly.
if !rootDisk && globalRootDiskThreshold > 0 {
// If for some reason we couldn't detect the root disk
// use - MINIO_ROOTDISK_THRESHOLD_SIZE to figure out if
// this disk is a root disk.
info, err := disk.GetInfo(path) info, err := disk.GetInfo(path)
if err != nil { if err != nil {
return nil, err return nil, err
@ -232,6 +228,14 @@ func newXLStorage(ep Endpoint) (s *xlStorage, err error) {
// treat those disks with size less than or equal to the // treat those disks with size less than or equal to the
// threshold as rootDisks. // threshold as rootDisks.
rootDisk = info.Total <= globalRootDiskThreshold rootDisk = info.Total <= globalRootDiskThreshold
} else {
// When root disk threshold is not set, we rely
// on automatic detection - does not work in
// container environments.
rootDisk, err = disk.IsRootDisk(path, SlashSeparator)
if err != nil {
return nil, err
}
} }
} }