Make sure to close the disk connections (#5752)

Since we do not re-use storageDisks after moving
the connections to the object layer, we should close them
appropriately; otherwise we leak a lot of connections,
and these leaks compound as time goes by.

This PR also refactors the initialization code to
re-use storageDisks for a given set of endpoints until
we have confirmed a valid reference format.
This commit is contained in:
Harshavardhana 2018-04-03 23:58:48 -05:00 committed by Nitish Tiwari
parent d67e423a32
commit 85a57d2021
6 changed files with 171 additions and 131 deletions

View File

@ -315,27 +315,18 @@ func shouldInitXLDisks(errs []error) bool {
} }
// loadFormatXLAll - load all format config from all input disks in parallel. // loadFormatXLAll - load all format config from all input disks in parallel.
func loadFormatXLAll(endpoints EndpointList) ([]*formatXLV3, []error) { func loadFormatXLAll(storageDisks []StorageAPI) ([]*formatXLV3, []error) {
// Initialize sync waitgroup. // Initialize sync waitgroup.
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
bootstrapDisks := make([]StorageAPI, len(endpoints))
for i, endpoint := range endpoints {
disk, err := newStorageAPI(endpoint)
if err != nil {
continue
}
bootstrapDisks[i] = disk
}
// Initialize list of errors. // Initialize list of errors.
var sErrs = make([]error, len(bootstrapDisks)) var sErrs = make([]error, len(storageDisks))
// Initialize format configs. // Initialize format configs.
var formats = make([]*formatXLV3, len(bootstrapDisks)) var formats = make([]*formatXLV3, len(storageDisks))
// Load format from each disk in parallel // Load format from each disk in parallel
for index, disk := range bootstrapDisks { for index, disk := range storageDisks {
if disk == nil { if disk == nil {
sErrs[index] = errDiskNotFound sErrs[index] = errDiskNotFound
continue continue
@ -344,10 +335,9 @@ func loadFormatXLAll(endpoints EndpointList) ([]*formatXLV3, []error) {
// Launch go-routine per disk. // Launch go-routine per disk.
go func(index int, disk StorageAPI) { go func(index int, disk StorageAPI) {
defer wg.Done() defer wg.Done()
format, lErr := loadFormatXL(disk) format, lErr := loadFormatXL(disk)
if lErr != nil { if lErr != nil {
// close the internal connection, to avoid fd leaks.
disk.Close()
sErrs[index] = lErr sErrs[index] = lErr
return return
} }
@ -530,12 +520,7 @@ func formatXLV3Check(reference *formatXLV3, format *formatXLV3) error {
} }
// saveFormatXLAll - populates `format.json` on disks in its order. // saveFormatXLAll - populates `format.json` on disks in its order.
func saveFormatXLAll(endpoints EndpointList, formats []*formatXLV3) error { func saveFormatXLAll(storageDisks []StorageAPI, formats []*formatXLV3) error {
storageDisks, err := initStorageDisks(endpoints)
if err != nil {
return err
}
var errs = make([]error, len(storageDisks)) var errs = make([]error, len(storageDisks))
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}
@ -556,17 +541,25 @@ func saveFormatXLAll(endpoints EndpointList, formats []*formatXLV3) error {
// Wait for the routines to finish. // Wait for the routines to finish.
wg.Wait() wg.Wait()
writeQuorum := len(endpoints)/2 + 1 writeQuorum := len(storageDisks)/2 + 1
return reduceWriteQuorumErrs(errs, nil, writeQuorum) return reduceWriteQuorumErrs(errs, nil, writeQuorum)
} }
// closeStorageDisks - closes every non-nil disk in storageDisks,
// relinquishing its underlying connection. Nil entries are skipped.
func closeStorageDisks(storageDisks []StorageAPI) {
	for i := range storageDisks {
		if d := storageDisks[i]; d != nil {
			d.Close()
		}
	}
}
// Initialize storage disks based on input arguments. // Initialize storage disks based on input arguments.
func initStorageDisks(endpoints EndpointList) ([]StorageAPI, error) { func initStorageDisks(endpoints EndpointList) ([]StorageAPI, error) {
// Bootstrap disks. // Bootstrap disks.
storageDisks := make([]StorageAPI, len(endpoints)) storageDisks := make([]StorageAPI, len(endpoints))
for index, endpoint := range endpoints { for index, endpoint := range endpoints {
// Intentionally ignore disk not found errors. XL is designed
// to handle these errors internally.
storage, err := newStorageAPI(endpoint) storage, err := newStorageAPI(endpoint)
if err != nil && err != errDiskNotFound { if err != nil && err != errDiskNotFound {
return nil, err return nil, err
@ -598,12 +591,7 @@ func formatXLV3ThisEmpty(formats []*formatXLV3) bool {
} }
// fixFormatXLV3 - fix format XL configuration on all disks. // fixFormatXLV3 - fix format XL configuration on all disks.
func fixFormatXLV3(endpoints EndpointList, formats []*formatXLV3) error { func fixFormatXLV3(storageDisks []StorageAPI, endpoints EndpointList, formats []*formatXLV3) error {
storageDisks, err := initStorageDisks(endpoints)
if err != nil {
return err
}
for i, format := range formats { for i, format := range formats {
if format == nil || !endpoints[i].IsLocal { if format == nil || !endpoints[i].IsLocal {
continue continue
@ -617,7 +605,7 @@ func fixFormatXLV3(endpoints EndpointList, formats []*formatXLV3) error {
} }
if format.XL.This == "" { if format.XL.This == "" {
formats[i].XL.This = format.XL.Sets[0][i] formats[i].XL.This = format.XL.Sets[0][i]
if err = saveFormatXL(storageDisks[i], formats[i]); err != nil { if err := saveFormatXL(storageDisks[i], formats[i]); err != nil {
return err return err
} }
} }
@ -626,9 +614,9 @@ func fixFormatXLV3(endpoints EndpointList, formats []*formatXLV3) error {
} }
// initFormatXL - save XL format configuration on all disks. // initFormatXL - save XL format configuration on all disks.
func initFormatXL(endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV3, err error) { func initFormatXL(storageDisks []StorageAPI, setCount, disksPerSet int) (format *formatXLV3, err error) {
format = newFormatXLV3(setCount, disksPerSet) format = newFormatXLV3(setCount, disksPerSet)
formats := make([]*formatXLV3, len(endpoints)) formats := make([]*formatXLV3, len(storageDisks))
for i := 0; i < setCount; i++ { for i := 0; i < setCount; i++ {
for j := 0; j < disksPerSet; j++ { for j := 0; j < disksPerSet; j++ {
@ -639,12 +627,12 @@ func initFormatXL(endpoints EndpointList, setCount, disksPerSet int) (format *fo
} }
// Initialize meta volume, if volume already exists ignores it. // Initialize meta volume, if volume already exists ignores it.
if err = initFormatXLMetaVolume(endpoints, formats); err != nil { if err = initFormatXLMetaVolume(storageDisks, formats); err != nil {
return format, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err) return format, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
} }
// Save formats `format.json` across all disks. // Save formats `format.json` across all disks.
if err = saveFormatXLAll(endpoints, formats); err != nil { if err = saveFormatXLAll(storageDisks, formats); err != nil {
return nil, err return nil, err
} }
@ -675,12 +663,7 @@ func makeFormatXLMetaVolumes(disk StorageAPI) error {
var initMetaVolIgnoredErrs = append(baseIgnoredErrs, errVolumeExists) var initMetaVolIgnoredErrs = append(baseIgnoredErrs, errVolumeExists)
// Initializes meta volume on all input storage disks. // Initializes meta volume on all input storage disks.
func initFormatXLMetaVolume(endpoints EndpointList, formats []*formatXLV3) error { func initFormatXLMetaVolume(storageDisks []StorageAPI, formats []*formatXLV3) error {
storageDisks, err := initStorageDisks(endpoints)
if err != nil {
return err
}
// This happens for the first time, but keep this here since this // This happens for the first time, but keep this here since this
// is the only place where it can be made expensive optimizing all // is the only place where it can be made expensive optimizing all
// other calls. Create minio meta volume, if it doesn't exist yet. // other calls. Create minio meta volume, if it doesn't exist yet.

View File

@ -87,6 +87,11 @@ func TestFixFormatV3(t *testing.T) {
} }
endpoints := mustGetNewEndpointList(xlDirs...) endpoints := mustGetNewEndpointList(xlDirs...)
storageDisks, err := initStorageDisks(endpoints)
if err != nil {
t.Fatal(err)
}
format := newFormatXLV3(1, 8) format := newFormatXLV3(1, 8)
formats := make([]*formatXLV3, 8) formats := make([]*formatXLV3, 8)
@ -96,17 +101,18 @@ func TestFixFormatV3(t *testing.T) {
formats[j] = &newFormat formats[j] = &newFormat
} }
if err = initFormatXLMetaVolume(endpoints, formats); err != nil { if err = initFormatXLMetaVolume(storageDisks, formats); err != nil {
t.Fatal(err) t.Fatal(err)
} }
formats[1] = nil formats[1] = nil
expThis := formats[2].XL.This expThis := formats[2].XL.This
formats[2].XL.This = "" formats[2].XL.This = ""
if err := fixFormatXLV3(endpoints, formats); err != nil { if err := fixFormatXLV3(storageDisks, endpoints, formats); err != nil {
t.Fatal(err) t.Fatal(err)
} }
newFormats, errs := loadFormatXLAll(endpoints)
newFormats, errs := loadFormatXLAll(storageDisks)
for _, err := range errs { for _, err := range errs {
if err != nil && errors.Cause(err) != errUnformattedDisk { if err != nil && errors.Cause(err) != errUnformattedDisk {
t.Fatal(err) t.Fatal(err)

View File

@ -88,6 +88,95 @@ func formatXLCleanupTmpLocalEndpoints(endpoints EndpointList) error {
return nil return nil
} }
// validateXLFormats - verifies the reference format against every loaded
// disk format and against the requested topology.
//
// Nil entries in formats are skipped. The first formatXLV3Check failure is
// reported together with the endpoint found at the same index. After that
// the reference format must carry exactly setCount sets, and its first set
// must carry exactly drivesPerSet drives, otherwise an error is returned.
func validateXLFormats(format *formatXLV3, formats []*formatXLV3, endpoints EndpointList, setCount, drivesPerSet int) error {
	for idx, diskFormat := range formats {
		if diskFormat != nil {
			checkErr := formatXLV3Check(format, diskFormat)
			if checkErr != nil {
				return fmt.Errorf("%s format error: %s", endpoints[idx], checkErr)
			}
		}
	}

	// Number of sets in the reference format has to match the input args.
	if gotSets := len(format.XL.Sets); gotSets != setCount {
		return fmt.Errorf("Current backend format is inconsistent with input args (%s), Expected set count %d, got %d", endpoints, gotSets, setCount)
	}

	// Number of drives in the first set has to match the input args.
	if gotDrives := len(format.XL.Sets[0]); gotDrives != drivesPerSet {
		return fmt.Errorf("Current backend format is inconsistent with input args (%s), Expected drive count per set %d, got %d", endpoints, gotDrives, drivesPerSet)
	}

	return nil
}
// errXLV3ThisEmpty - returned when a `format.json` still has an empty
// '.This' field, the field which indicates the disk UUID association.
// This error exists to handle a regression in release
// RELEASE.2018-03-16T22-52-12Z, where migrating v1 to v2 to v3 failed
// to capture the '.This' field properly. For more info refer
// https://github.com/minio/minio/issues/5667
var errXLV3ThisEmpty = fmt.Errorf("XL format version 3 has This field empty")
// connectLoadInitFormats - connects to all the endpoints, loads the XL
// format of every disk and returns the reference format once the loaded
// formats validate against it.
//
// When shouldInitXLDisks reports that the disks need to be initialized,
// they are formatted for the first time if firstDisk is true, otherwise
// errNotFirstDisk is returned. errXLV3ThisEmpty is returned when any
// '.This' field is still empty after the fix-up attempt.
//
// All disks connected for this attempt are closed before returning.
func connectLoadInitFormats(firstDisk bool, endpoints EndpointList, setCount, drivesPerSet int) (*formatXLV3, error) {
	disks, err := initStorageDisks(endpoints)
	if err != nil {
		return nil, err
	}
	// These connections are only used for this attempt, release them on return.
	defer closeStorageDisks(disks)

	// Read `format.json` from every disk.
	formats, loadErrs := loadFormatXLAll(disks)

	// Fail early if one of the formatted disks is invalid. This check
	// succeeds for the most part, unless one of the formats is not
	// consistent with the expected XL format, for example when a user
	// is trying to pool an FS backend into an XL set.
	err = checkFormatXLValues(formats)
	if err != nil {
		return nil, err
	}

	// Any critical load error is fatal for this attempt.
	for idx := range loadErrs {
		_, critical := formatCriticalErrors[errors.Cause(loadErrs[idx])]
		if critical {
			return nil, fmt.Errorf("Disk %s: %s", endpoints[idx], loadErrs[idx])
		}
	}

	// Fresh disks, only the first disk is allowed to initialize them.
	if shouldInitXLDisks(loadErrs) {
		if firstDisk {
			return initFormatXL(disks, setCount, drivesPerSet)
		}
		return nil, errNotFirstDisk
	}

	// Handle the regression introduced in release
	// RELEASE.2018-03-16T22-52-12Z, where migrating v1 to v2 to v3
	// failed to capture the '.This' field (disk UUID association)
	// properly. For more info refer
	// https://github.com/minio/minio/issues/5667
	err = fixFormatXLV3(disks, endpoints, formats)
	if err != nil {
		return nil, err
	}

	// A '.This' field which is still empty at this point is an error.
	if formatXLV3ThisEmpty(formats) {
		return nil, errXLV3ThisEmpty
	}

	refFormat, err := getFormatXLInQuorum(formats)
	if err != nil {
		return nil, err
	}

	// Every loaded format has to agree with the reference format.
	err = validateXLFormats(refFormat, formats, endpoints, setCount, drivesPerSet)
	if err != nil {
		return nil, err
	}

	return refFormat, nil
}
// Format disks before initialization of object layer. // Format disks before initialization of object layer.
func waitForFormatXL(firstDisk bool, endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV3, err error) { func waitForFormatXL(firstDisk bool, endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV3, err error) {
if len(endpoints) == 0 || setCount == 0 || disksPerSet == 0 { if len(endpoints) == 0 || setCount == 0 || disksPerSet == 0 {
@ -121,65 +210,26 @@ func waitForFormatXL(firstDisk bool, endpoints EndpointList, setCount, disksPerS
for { for {
select { select {
case _ = <-retryTimerCh: case _ = <-retryTimerCh:
// Attempt to load all `format.json` from all disks. format, err := connectLoadInitFormats(firstDisk, endpoints, setCount, disksPerSet)
formatConfigs, sErrs := loadFormatXLAll(endpoints) if err != nil {
switch err {
// Pre-emptively check if one of the formatted disks case errNotFirstDisk:
// is invalid. This function returns success for the // Fresh setup, wait for first server to be up.
// most part unless one of the formats is not consistent
// with expected XL format. For example if a user is
// trying to pool FS backend into an XL set.
if err = checkFormatXLValues(formatConfigs); err != nil {
return nil, err
}
for i, sErr := range sErrs {
if _, ok := formatCriticalErrors[errors.Cause(sErr)]; ok {
return nil, fmt.Errorf("Disk %s: %s", endpoints[i], sErr)
}
}
if shouldInitXLDisks(sErrs) {
if !firstDisk {
console.Println("Waiting for the first server to format the disks.") console.Println("Waiting for the first server to format the disks.")
continue continue
case errXLReadQuorum:
// no quorum available continue to wait for minimum number of servers.
console.Printf("Waiting for a minimum of %d disks to come online (elapsed %s)\n", len(endpoints)/2, getElapsedTime())
continue
case errXLV3ThisEmpty:
// need to wait for this error to be healed, so continue.
continue
default:
// For all other unhandled errors we exit and fail.
return nil, err
} }
return initFormatXL(endpoints, setCount, disksPerSet)
} }
return format, nil
// Following function is added to fix a regressions which was introduced
// in release RELEASE.2018-03-16T22-52-12Z after migrating v1 to v2 to v3.
// This migration failed to capture '.This' field properly which indicates
// the disk UUID association. Below function is called to handle and fix
// this regression, for more info refer https://github.com/minio/minio/issues/5667
if err = fixFormatXLV3(endpoints, formatConfigs); err != nil {
return nil, err
}
// If any of the .This field is still empty we wait them to be fixed.
if formatXLV3ThisEmpty(formatConfigs) {
continue
}
format, err = getFormatXLInQuorum(formatConfigs)
if err == nil {
for i := range formatConfigs {
if formatConfigs[i] == nil {
continue
}
if err = formatXLV3Check(format, formatConfigs[i]); err != nil {
return nil, fmt.Errorf("%s format error: %s", endpoints[i], err)
}
}
if len(format.XL.Sets) != globalXLSetCount {
return nil, fmt.Errorf("Current backend format is inconsistent with input args (%s), Expected set count %d, got %d", endpoints, len(format.XL.Sets), globalXLSetCount)
}
if len(format.XL.Sets[0]) != globalXLSetDriveCount {
return nil, fmt.Errorf("Current backend format is inconsistent with input args (%s), Expected drive count per set %d, got %d", endpoints, len(format.XL.Sets[0]), globalXLSetDriveCount)
}
return format, nil
}
console.Printf("Waiting for a minimum of %d disks to come online (elapsed %s)\n", len(endpoints)/2, getElapsedTime())
case <-globalOSSignalCh: case <-globalOSSignalCh:
return nil, fmt.Errorf("Initializing data volumes gracefully stopped") return nil, fmt.Errorf("Initializing data volumes gracefully stopped")
} }

View File

@ -58,3 +58,7 @@ var errInvalidRange = errors.New("Invalid range")
// errInvalidRangeSource - returned when given range value exceeds // errInvalidRangeSource - returned when given range value exceeds
// the source object size. // the source object size.
var errInvalidRangeSource = errors.New("Range specified exceeds source object size") var errInvalidRangeSource = errors.New("Range specified exceeds source object size")
// errNotFirstDisk - returned when the disks are yet to be initialized and
// this server is not the first one in the distributed set; it has to wait
// for the first server to initialize them.
var errNotFirstDisk = errors.New("Not first disk")

View File

@ -101,7 +101,7 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatXLV3, error) {
format, err := loadFormatXL(disk) format, err := loadFormatXL(disk)
if err != nil { if err != nil {
// close the internal connection, to avoid fd leaks. // Close the internal connection to avoid connection leaks.
disk.Close() disk.Close()
return nil, nil, err return nil, nil, err
} }
@ -147,6 +147,8 @@ func (s *xlSets) connectDisks() {
i, j, err := findDiskIndex(s.format, format) i, j, err := findDiskIndex(s.format, format)
s.formatMu.RUnlock() s.formatMu.RUnlock()
if err != nil { if err != nil {
// Close the internal connection to avoid connection leaks.
disk.Close()
printEndpointError(endpoint, err) printEndpointError(endpoint, err)
continue continue
} }
@ -215,19 +217,8 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP
go s.sets[i].cleanupStaleMultipartUploads(globalMultipartCleanupInterval, globalMultipartExpiry, globalServiceDoneCh) go s.sets[i].cleanupStaleMultipartUploads(globalMultipartCleanupInterval, globalMultipartExpiry, globalServiceDoneCh)
} }
for _, endpoint := range endpoints { // Connect disks right away.
disk, nformat, err := connectEndpoint(endpoint) s.connectDisks()
if err != nil {
errorIf(err, "Unable to connect to endpoint %s", endpoint)
continue
}
i, j, err := findDiskIndex(format, nformat)
if err != nil {
errorIf(err, "Unable to find the endpoint %s in reference format", endpoint)
continue
}
s.xlDisks[i][j] = disk
}
// Initialize and load bucket policies. // Initialize and load bucket policies.
var err error var err error
@ -267,7 +258,19 @@ func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo {
storageInfo.Backend.RRSCData = rrSCData storageInfo.Backend.RRSCData = rrSCData
storageInfo.Backend.RRSCParity = rrSCparity storageInfo.Backend.RRSCParity = rrSCparity
formats, sErrs := loadFormatXLAll(s.endpoints) storageInfo.Backend.Sets = make([][]madmin.DriveInfo, s.setCount)
for i := range storageInfo.Backend.Sets {
storageInfo.Backend.Sets[i] = make([]madmin.DriveInfo, s.drivesPerSet)
}
storageDisks, err := initStorageDisks(s.endpoints)
if err != nil {
return storageInfo
}
defer closeStorageDisks(storageDisks)
formats, sErrs := loadFormatXLAll(storageDisks)
drivesInfo := formatsToDrivesInfo(s.endpoints, formats, sErrs) drivesInfo := formatsToDrivesInfo(s.endpoints, formats, sErrs)
refFormat, err := getFormatXLInQuorum(formats) refFormat, err := getFormatXLInQuorum(formats)
if err != nil { if err != nil {
@ -276,11 +279,6 @@ func (s *xlSets) StorageInfo(ctx context.Context) StorageInfo {
return storageInfo return storageInfo
} }
storageInfo.Backend.Sets = make([][]madmin.DriveInfo, s.setCount)
for i := range storageInfo.Backend.Sets {
storageInfo.Backend.Sets[i] = make([]madmin.DriveInfo, s.drivesPerSet)
}
// fill all the available/online endpoints // fill all the available/online endpoints
for _, drive := range drivesInfo { for _, drive := range drivesInfo {
if drive.UUID == "" { if drive.UUID == "" {
@ -921,8 +919,14 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult
} }
defer formatLock.Unlock() defer formatLock.Unlock()
formats, sErrs := loadFormatXLAll(s.endpoints) storageDisks, err := initStorageDisks(s.endpoints)
if err := checkFormatXLValues(formats); err != nil { if err != nil {
return madmin.HealResultItem{}, err
}
defer closeStorageDisks(storageDisks)
formats, sErrs := loadFormatXLAll(storageDisks)
if err = checkFormatXLValues(formats); err != nil {
return madmin.HealResultItem{}, err return madmin.HealResultItem{}, err
} }
@ -1025,12 +1029,12 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult
// Initialize meta volume, if volume already exists ignores it, all disks which // Initialize meta volume, if volume already exists ignores it, all disks which
// are not found are ignored as well. // are not found are ignored as well.
if err = initFormatXLMetaVolume(s.endpoints, tmpNewFormats); err != nil { if err = initFormatXLMetaVolume(storageDisks, tmpNewFormats); err != nil {
return madmin.HealResultItem{}, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err) return madmin.HealResultItem{}, fmt.Errorf("Unable to initialize '.minio.sys' meta volume, %s", err)
} }
// Save formats `format.json` across all disks. // Save formats `format.json` across all disks.
if err = saveFormatXLAll(s.endpoints, tmpNewFormats); err != nil { if err = saveFormatXLAll(storageDisks, tmpNewFormats); err != nil {
return madmin.HealResultItem{}, err return madmin.HealResultItem{}, err
} }

View File

@ -59,14 +59,7 @@ var xlTreeWalkIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied, errVolu
// Shutdown function for object storage interface. // Shutdown function for object storage interface.
func (xl xlObjects) Shutdown(ctx context.Context) error { func (xl xlObjects) Shutdown(ctx context.Context) error {
// Add any object layer shutdown activities here. // Add any object layer shutdown activities here.
for _, disk := range xl.getDisks() { closeStorageDisks(xl.getDisks())
// This closes storage rpc client connections if any.
// Otherwise this is a no-op.
if disk == nil {
continue
}
disk.Close()
}
return nil return nil
} }