Parallelize initialization of storageDisks (#8288)

This commit is contained in:
Harshavardhana 2019-09-27 16:47:12 -07:00 committed by GitHub
parent c1a17c2561
commit 127641731a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 74 additions and 43 deletions

View File

@ -677,34 +677,22 @@ func closeStorageDisks(storageDisks []StorageAPI) {
}
}
// Initialize storage disks based on input arguments.
func initStorageDisks(endpoints EndpointList) ([]StorageAPI, error) {
// Initialize storage disks for each endpoint.
// Errors are returned for each endpoint with matching index.
func initStorageDisksWithErrors(endpoints EndpointList) ([]StorageAPI, []error) {
// Bootstrap disks.
storageDisks := make([]StorageAPI, len(endpoints))
errs := make([]error, len(endpoints))
var wg sync.WaitGroup
for index, endpoint := range endpoints {
storage, err := newStorageAPI(endpoint)
if err != nil && err != errDiskNotFound {
return nil, err
}
storageDisks[index] = storage
wg.Add(1)
go func(index int, endpoint Endpoint) {
defer wg.Done()
storageDisks[index], errs[index] = newStorageAPI(endpoint)
}(index, endpoint)
}
return storageDisks, nil
}
// Runs through the faulty disks and record their errors.
func initDisksWithErrors(endpoints EndpointList) ([]StorageAPI, []error) {
storageDisks := make([]StorageAPI, len(endpoints))
var dErrs = make([]error, len(storageDisks))
for index, endpoint := range endpoints {
storage, err := newStorageAPI(endpoint)
if err != nil {
logger.LogIf(context.Background(), err)
dErrs[index] = err
continue
}
storageDisks[index] = storage
}
return storageDisks, dErrs
wg.Wait()
return storageDisks, errs
}
// formatXLV3ThisEmpty - find out if '.This' field is empty

View File

@ -85,9 +85,11 @@ func TestFixFormatV3(t *testing.T) {
}
endpoints := mustGetNewEndpointList(xlDirs...)
storageDisks, err := initStorageDisks(endpoints)
if err != nil {
t.Fatal(err)
storageDisks, errs := initStorageDisksWithErrors(endpoints)
for _, err := range errs {
if err != nil && err != errDiskNotFound {
t.Fatal(err)
}
}
format := newFormatXLV3(1, 8)
@ -566,3 +568,36 @@ func TestNewFormatSets(t *testing.T) {
}
}
}
func BenchmarkInitStorageDisks256(b *testing.B) {
benchmarkInitStorageDisksN(b, 256)
}
func BenchmarkInitStorageDisks1024(b *testing.B) {
benchmarkInitStorageDisksN(b, 1024)
}
func BenchmarkInitStorageDisks2048(b *testing.B) {
benchmarkInitStorageDisksN(b, 2048)
}
func BenchmarkInitStorageDisksMax(b *testing.B) {
benchmarkInitStorageDisksN(b, 32*204)
}
func benchmarkInitStorageDisksN(b *testing.B, nDisks int) {
b.ResetTimer()
b.ReportAllocs()
fsDirs, err := getRandomDisks(nDisks)
if err != nil {
b.Fatal(err)
}
endpoints := mustGetNewEndpointList(fsDirs...)
b.RunParallel(func(pb *testing.PB) {
endpoints := endpoints
for pb.Next() {
initStorageDisksWithErrors(endpoints)
}
})
}

View File

@ -172,11 +172,13 @@ var errXLV3ThisEmpty = fmt.Errorf("XL format version 3 has This field empty")
// time. additionally make sure to close all the disks used in this attempt.
func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints EndpointList, setCount, drivesPerSet int) (*formatXLV3, error) {
// Initialize all storage disks
storageDisks, err := initStorageDisks(endpoints)
if err != nil {
return nil, err
}
storageDisks, errs := initStorageDisksWithErrors(endpoints)
defer closeStorageDisks(storageDisks)
for i, err := range errs {
if err != nil && err != errDiskNotFound {
return nil, fmt.Errorf("Disk %s: %w", endpoints[i], err)
}
}
// Attempt to load all `format.json` from all disks.
formatConfigs, sErrs := loadFormatXLAll(storageDisks)
@ -203,7 +205,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints EndpointLi
// most part unless one of the formats is not consistent
// with expected XL format. For example if a user is
// trying to pool FS backend into an XL set.
if err = checkFormatXLValues(formatConfigs); err != nil {
if err := checkFormatXLValues(formatConfigs); err != nil {
return nil, err
}
@ -236,7 +238,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints EndpointLi
// This migration failed to capture '.This' field properly which indicates
// the disk UUID association. Below function is called to handle and fix
// this regression, for more info refer https://github.com/minio/minio/issues/5667
if err = fixFormatXLV3(storageDisks, endpoints, formatConfigs); err != nil {
if err := fixFormatXLV3(storageDisks, endpoints, formatConfigs); err != nil {
return nil, err
}

View File

@ -1615,9 +1615,11 @@ func newTestObjectLayer(endpoints EndpointList) (newObject ObjectLayer, err erro
return nil, err
}
storageDisks, err := initStorageDisks(endpoints)
if err != nil {
return nil, err
storageDisks, errs := initStorageDisksWithErrors(endpoints)
for _, err = range errs {
if err != nil && err != errDiskNotFound {
return nil, err
}
}
// Initialize list pool.

View File

@ -339,7 +339,7 @@ func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo {
storageInfo.Backend.Sets[i] = make([]madmin.DriveInfo, s.drivesPerSet)
}
storageDisks, dErrs := initDisksWithErrors(s.endpoints)
storageDisks, dErrs := initStorageDisksWithErrors(s.endpoints)
defer closeStorageDisks(storageDisks)
formats, sErrs := loadFormatXLAll(storageDisks)
@ -1324,9 +1324,11 @@ func (s *xlSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
}
defer formatLock.RUnlock()
storageDisks, err := initStorageDisks(s.endpoints)
if err != nil {
return err
storageDisks, errs := initStorageDisksWithErrors(s.endpoints)
for i, err := range errs {
if err != nil && err != errDiskNotFound {
return fmt.Errorf("Disk %s: %w", s.endpoints[i], err)
}
}
defer func(storageDisks []StorageAPI) {
if err != nil {
@ -1445,9 +1447,11 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
}
defer formatLock.Unlock()
storageDisks, err := initStorageDisks(s.endpoints)
if err != nil {
return madmin.HealResultItem{}, err
storageDisks, errs := initStorageDisksWithErrors(s.endpoints)
for i, derr := range errs {
if derr != nil && derr != errDiskNotFound {
return madmin.HealResultItem{}, fmt.Errorf("Disk %s: %w", s.endpoints[i], derr)
}
}
defer func(storageDisks []StorageAPI) {