diff --git a/cmd/admin-heal-ops.go b/cmd/admin-heal-ops.go
index cca1728aa..5a8c94f2b 100644
--- a/cmd/admin-heal-ops.go
+++ b/cmd/admin-heal-ops.go
@@ -539,11 +539,17 @@ func (h *healSequence) healDiskFormat() error {
 	}
 
 	res, err := objectAPI.HealFormat(h.ctx, h.settings.DryRun)
-	if err != nil {
+	// Return any error except errNoHealRequired, which is not treated
+	// as a failure here (the disks were already healed).
+	if err != nil && err != errNoHealRequired {
 		return errFnHealFromAPIErr(err)
 	}
 
-	peersReInitFormat(globalAdminPeers, h.settings.DryRun)
+	// Healing succeeded, notify the peers to reload format and re-initialize disks.
+	// Peers are notified only when healing succeeded, not on errNoHealRequired.
+	if err == nil {
+		peersReInitFormat(globalAdminPeers, h.settings.DryRun)
+	}
 
 	// Push format heal result
 	return h.pushHealResultItem(res)
diff --git a/cmd/admin-rpc-client.go b/cmd/admin-rpc-client.go
index ba53cfc29..98029f908 100644
--- a/cmd/admin-rpc-client.go
+++ b/cmd/admin-rpc-client.go
@@ -84,8 +84,7 @@ func (lc localAdminClient) ReInitFormat(dryRun bool) error {
 	if objectAPI == nil {
 		return errServerNotInitialized
 	}
-	_, err := objectAPI.HealFormat(context.Background(), dryRun)
-	return err
+	return objectAPI.ReloadFormat(context.Background(), dryRun)
 }
 
 // ListLocks - Fetches lock information from local lock instrumentation.
diff --git a/cmd/admin-rpc-server.go b/cmd/admin-rpc-server.go
index 1344bf0ee..836700734 100644
--- a/cmd/admin-rpc-server.go
+++ b/cmd/admin-rpc-server.go
@@ -94,8 +94,7 @@ func (s *adminCmd) ReInitFormat(args *ReInitFormatArgs, reply *AuthRPCReply) err
 	if objectAPI == nil {
 		return errServerNotInitialized
 	}
-	_, err := objectAPI.HealFormat(context.Background(), args.DryRun)
-	return err
+	return objectAPI.ReloadFormat(context.Background(), args.DryRun)
 }
 
 // ListLocks - lists locks held by requests handled by this server instance.
diff --git a/cmd/auth-rpc-client.go b/cmd/auth-rpc-client.go index 7ba853807..39aadd911 100644 --- a/cmd/auth-rpc-client.go +++ b/cmd/auth-rpc-client.go @@ -136,6 +136,9 @@ func (authClient *AuthRPCClient) Login() (err error) { } if err = rpcClient.Call(loginMethod, &loginArgs, &LoginRPCReply{}); err != nil { + // Closing the connection here. + rpcClient.Close() + // gob doesn't provide any typed errors for us to reflect // upon, this is the only way to return proper error. if strings.Contains(err.Error(), "gob: wrong type") { @@ -198,6 +201,9 @@ func (authClient *AuthRPCClient) Call(serviceMethod string, args interface { // gob doesn't provide any typed errors for us to reflect // upon, this is the only way to return proper error. if err != nil && strings.Contains(err.Error(), "gob: wrong type") { + // Close the rpc client also when the servers have mismatching rpc versions. + authClient.Close() + err = errRPCAPIVersionUnsupported } break diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index 98a23cb3d..b1b980258 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -1046,6 +1046,11 @@ func (fs *FSObjects) ListObjects(ctx context.Context, bucket, prefix, marker, de return result, nil } +// ReloadFormat - no-op for fs, Valid only for XL. +func (fs *FSObjects) ReloadFormat(ctx context.Context, dryRun bool) error { + return errors.Trace(NotImplemented{}) +} + // HealFormat - no-op for fs, Valid only for XL. func (fs *FSObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) { logger.LogIf(ctx, NotImplemented{}) diff --git a/cmd/gateway-unsupported.go b/cmd/gateway-unsupported.go index 6f1f760f9..5e2d5a14f 100644 --- a/cmd/gateway-unsupported.go +++ b/cmd/gateway-unsupported.go @@ -89,6 +89,11 @@ func (a GatewayUnsupported) DeleteBucketPolicy(ctx context.Context, bucket strin return NotImplemented{} } +// ReloadFormat - Not implemented stub. 
+func (a GatewayUnsupported) ReloadFormat(ctx context.Context, dryRun bool) error { + return NotImplemented{} +} + // HealFormat - Not implemented stub func (a GatewayUnsupported) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) { logger.LogIf(ctx, NotImplemented{}) diff --git a/cmd/object-api-interface.go b/cmd/object-api-interface.go index 8546d0efe..b9912398c 100644 --- a/cmd/object-api-interface.go +++ b/cmd/object-api-interface.go @@ -58,6 +58,7 @@ type ObjectLayer interface { CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []CompletePart) (objInfo ObjectInfo, err error) // Healing operations. + ReloadFormat(ctx context.Context, dryRun bool) error HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) HealBucket(ctx context.Context, bucket string, dryRun bool) ([]madmin.HealResultItem, error) HealObject(ctx context.Context, bucket, object string, dryRun bool) (madmin.HealResultItem, error) diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 5aa59b37d..abca85493 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -36,15 +36,27 @@ import ( "github.com/minio/minio/pkg/sync/errgroup" ) +// setsStorageAPI is encapsulated type for Close() +type setsStorageAPI [][]StorageAPI + +func (s setsStorageAPI) Close() error { + for i := 0; i < len(s); i++ { + for _, disk := range s[i] { + if disk == nil { + continue + } + disk.Close() + } + } + return nil +} + // xlSets implements ObjectLayer combining a static list of erasure coded // object sets. NOTE: There is no dynamic scaling allowed or intended in // current design. type xlSets struct { sets []*xlObjects - // Format mutex to lock format. - formatMu sync.RWMutex - // Reference format. format *formatXLV3 @@ -52,7 +64,7 @@ type xlSets struct { xlDisksMu sync.RWMutex // Re-ordered list of disks per set. - xlDisks [][]StorageAPI + xlDisks setsStorageAPI // List of endpoints provided on the command line. 
endpoints EndpointList @@ -138,6 +150,29 @@ func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) { return -1, -1, fmt.Errorf("diskID: %s not found", format.XL.This) } +// Re initializes all disks based on the reference format, this function is +// only used by HealFormat and ReloadFormat calls. +func (s *xlSets) reInitDisks(refFormat *formatXLV3, storageDisks []StorageAPI, formats []*formatXLV3) [][]StorageAPI { + xlDisks := make([][]StorageAPI, s.setCount) + for i := 0; i < len(refFormat.XL.Sets); i++ { + xlDisks[i] = make([]StorageAPI, s.drivesPerSet) + } + for k := range storageDisks { + if storageDisks[k] == nil || formats[k] == nil { + continue + } + i, j, err := findDiskIndex(refFormat, formats[k]) + if err != nil { + reqInfo := (&logger.ReqInfo{}).AppendTags("storageDisk", storageDisks[i].String()) + ctx := logger.SetReqInfo(context.Background(), reqInfo) + logger.LogIf(ctx, err) + continue + } + xlDisks[i][j] = storageDisks[k] + } + return xlDisks +} + // connectDisks - attempt to connect all the endpoints, loads format // and re-arranges the disks in proper position. func (s *xlSets) connectDisks() { @@ -150,9 +185,7 @@ func (s *xlSets) connectDisks() { printEndpointError(endpoint, err) continue } - s.formatMu.RLock() i, j, err := findDiskIndex(s.format, format) - s.formatMu.RUnlock() if err != nil { // Close the internal connection to avoid connection leaks. disk.Close() @@ -168,11 +201,15 @@ func (s *xlSets) connectDisks() { // monitorAndConnectEndpoints this is a monitoring loop to keep track of disconnected // endpoints by reconnecting them and making sure to place them into right position in // the set topology, this monitoring happens at a given monitoring interval. 
-func (s *xlSets) monitorAndConnectEndpoints(doneCh chan struct{}, monitorInterval time.Duration) { +func (s *xlSets) monitorAndConnectEndpoints(monitorInterval time.Duration) { ticker := time.NewTicker(monitorInterval) for { select { - case <-doneCh: + case <-globalServiceDoneCh: + // Stop the timer. + ticker.Stop() + return + case <-s.disksConnectDoneCh: // Stop the timer. ticker.Stop() return @@ -240,7 +277,7 @@ func newXLSets(endpoints EndpointList, format *formatXLV3, setCount int, drivesP } // Start the disk monitoring and connect routine. - go s.monitorAndConnectEndpoints(globalServiceDoneCh, defaultMonitorConnectEndpointInterval) + go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval) return s, nil } @@ -917,12 +954,76 @@ func formatsToDrivesInfo(endpoints EndpointList, formats []*formatXLV3, sErrs [] return beforeDrives } -// HealFormat - heals missing `format.json` on freshly or corrupted -// disks (missing format.json but does have erasure coded data in it). -func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) { +// Reloads the format from the disk, usually called by a remote peer notifier while +// healing in a distributed setup. 
+func (s *xlSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
 	// Acquire lock on format.json
 	formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(minioMetaBucket, formatConfigFile)
-	if err := formatLock.GetLock(globalHealingTimeout); err != nil {
+	if err = formatLock.GetRLock(globalHealingTimeout); err != nil {
+		return err
+	}
+	defer formatLock.RUnlock()
+
+	storageDisks, err := initStorageDisks(s.endpoints)
+	if err != nil {
+		return err
+	}
+	defer func(storageDisks []StorageAPI) {
+		if err != nil {
+			closeStorageDisks(storageDisks)
+		}
+	}(storageDisks)
+
+	formats, sErrs := loadFormatXLAll(storageDisks)
+	if err = checkFormatXLValues(formats); err != nil {
+		return err
+	}
+
+	for index, sErr := range sErrs {
+		if sErr != nil {
+			// Errors listed in formatHealErrors are tolerated; any
+			// other per-disk error aborts the reload and is returned.
+			if _, ok := formatHealErrors[sErr]; !ok {
+				return fmt.Errorf("Disk %s: %s", s.endpoints[index], sErr)
+			}
+		}
+	}
+
+	refFormat, err := getFormatXLInQuorum(formats)
+	if err != nil {
+		return err
+	}
+
+	// Tell the monitoring loop to exit before the format and disks
+	// are swapped below. NOTE(review): this send waits for a receiver
+	// if the channel is unbuffered - confirm the loop is still running.
+	s.disksConnectDoneCh <- struct{}{}
+
+	// Adopt the format returned by getFormatXLInQuorum as the reference format.
+	s.format = refFormat
+
+	s.xlDisksMu.Lock()
+	{
+		// Close all disks currently held in s.xlDisks.
+		s.xlDisks.Close()
+
+		// Re-initialize disks from the freshly opened storageDisks and their formats.
+		s.xlDisks = s.reInitDisks(refFormat, storageDisks, formats)
+	}
+	s.xlDisksMu.Unlock()
+
+	// Restart the monitoring loop so the re-initialized disks are monitored again.
+	go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
+
+	return nil
+}
+
+// HealFormat - heals missing `format.json` on freshly or corrupted
+// disks (missing format.json but does have erasure coded data in it).
+func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) { + // Acquire lock on format.json + formatLock := s.getHashedSet(formatConfigFile).nsMutex.NewNSLock(minioMetaBucket, formatConfigFile) + if err = formatLock.GetLock(globalHealingTimeout); err != nil { return madmin.HealResultItem{}, err } defer formatLock.Unlock() @@ -931,7 +1032,12 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult if err != nil { return madmin.HealResultItem{}, err } - defer closeStorageDisks(storageDisks) + + defer func(storageDisks []StorageAPI) { + if err != nil { + closeStorageDisks(storageDisks) + } + }(storageDisks) formats, sErrs := loadFormatXLAll(storageDisks) if err = checkFormatXLValues(formats); err != nil { @@ -939,7 +1045,7 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult } // Prepare heal-result - res := madmin.HealResultItem{ + res = madmin.HealResultItem{ Type: madmin.HealItemMetadata, Detail: "disk-format", DiskCount: s.setCount * s.drivesPerSet, @@ -965,8 +1071,9 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult } } + // no errors found, no healing is required. if !hasAnyErrors(sErrs) { - return res, nil + return res, errNoHealRequired } for index, sErr := range sErrs { @@ -1046,12 +1153,26 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResult return madmin.HealResultItem{}, err } - s.formatMu.Lock() - s.format = refFormat - s.formatMu.Unlock() + // kill the monitoring loop such that we stop writing + // to indicate that we will re-initialize everything + // with new format. + s.disksConnectDoneCh <- struct{}{} - // Connect disks, after saving the new reference format. - s.connectDisks() + // Replace with new reference format. + s.format = refFormat + + s.xlDisksMu.Lock() + { + // Disconnect/relinquish all existing disks. 
+ s.xlDisks.Close() + + // Re initialize disks, after saving the new reference format. + s.xlDisks = s.reInitDisks(refFormat, storageDisks, tmpNewFormats) + } + s.xlDisksMu.Unlock() + + // Restart our monitoring loop to start monitoring newly formatted disks. + go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval) } return res, nil diff --git a/cmd/xl-v1-errors.go b/cmd/xl-v1-errors.go index b8014d115..03815f792 100644 --- a/cmd/xl-v1-errors.go +++ b/cmd/xl-v1-errors.go @@ -1,5 +1,5 @@ /* - * Minio Cloud Storage, (C) 2016 Minio, Inc. + * Minio Cloud Storage, (C) 2016, 2018 Minio, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,3 +23,6 @@ var errXLReadQuorum = errors.New("Read failed. Insufficient number of disks onli // errXLWriteQuorum - did not meet write quorum. var errXLWriteQuorum = errors.New("Write failed. Insufficient number of disks online") + +// errNoHealRequired - returned when healing is attempted on a previously healed disks. +var errNoHealRequired = errors.New("No healing is required") diff --git a/cmd/xl-v1-healing.go b/cmd/xl-v1-healing.go index 7614cc5e3..d42eb9ab7 100644 --- a/cmd/xl-v1-healing.go +++ b/cmd/xl-v1-healing.go @@ -27,6 +27,10 @@ import ( "github.com/minio/minio/pkg/madmin" ) +func (xl xlObjects) ReloadFormat(ctx context.Context, dryRun bool) error { + return errors.Trace(NotImplemented{}) +} + func (xl xlObjects) HealFormat(ctx context.Context, dryRun bool) (madmin.HealResultItem, error) { logger.LogIf(ctx, NotImplemented{}) return madmin.HealResultItem{}, NotImplemented{}