diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 9181ffab3..8e759ece0 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -47,6 +47,10 @@ func prepareAdminXLTestBed() (*adminXLTestBed, error) { // reset global variables to start afresh. resetTestGlobals() + // Set globalIsXL to indicate that the setup uses an erasure + // code backend. + globalIsXL = true + // Initializing objectLayer for HealFormatHandler. objLayer, xlDirs, xlErr := initTestXLObjLayer() if xlErr != nil { @@ -63,15 +67,6 @@ func prepareAdminXLTestBed() (*adminXLTestBed, error) { globalEndpoints = mustGetZoneEndpoints(xlDirs...) - // Set globalIsXL to indicate that the setup uses an erasure - // code backend. - globalIsXL = true - - // Init global heal state - if globalIsXL { - globalAllHealState = initHealState() - } - globalConfigSys = NewConfigSys() globalIAMSys = NewIAMSys() diff --git a/cmd/test-utils_test.go b/cmd/test-utils_test.go index 418a3c8cd..89161c339 100644 --- a/cmd/test-utils_test.go +++ b/cmd/test-utils_test.go @@ -459,15 +459,30 @@ func resetGlobalIsXL() { // reset global heal state func resetGlobalHealState() { + // Init global heal state if globalAllHealState == nil { - return - } - globalAllHealState.Lock() - defer globalAllHealState.Unlock() - for _, v := range globalAllHealState.healSeqMap { - if !v.hasEnded() { - v.stop() + globalAllHealState = initHealState() + } else { + globalAllHealState.Lock() + for _, v := range globalAllHealState.healSeqMap { + if !v.hasEnded() { + v.stop() + } } + globalAllHealState.Unlock() + } + + // Init background heal state + if globalBackgroundHealState == nil { + globalBackgroundHealState = initHealState() + } else { + globalBackgroundHealState.Lock() + for _, v := range globalBackgroundHealState.healSeqMap { + if !v.hasEnded() { + v.stop() + } + } + globalBackgroundHealState.Unlock() } } diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 45376801f..7d70a43ea 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -56,6 +56,11 @@ func (s setsStorageAPI) Close() error { return nil } +// Information of a new disk connection +type diskConnectInfo struct { + setIndex int +} + // xlSets implements ObjectLayer combining a static list of erasure coded // object sets. NOTE: There is no dynamic scaling allowed or intended in // current design. @@ -80,6 +85,8 @@ type xlSets struct { // Total number of sets and the number of disks per set. setCount, drivesPerSet int + disksConnectEvent chan diskConnectInfo + // Done channel to control monitoring loop. disksConnectDoneCh chan struct{} @@ -88,6 +95,9 @@ type xlSets struct { // Merge tree walk pool *MergeWalkPool + + mrfMU sync.Mutex + mrfUploads map[string]int } // isConnected - checks if the endpoint is connected or not. @@ -135,6 +145,8 @@ func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatXLV3, error) { // findDiskIndex - returns the i,j'th position of the input `format` against the reference // format, after successful validation. +// - i'th position is the set index +// - j'th position is the disk index in the current set func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) { if err := formatXLV3Check(refFormat, format); err != nil { return 0, 0, err @@ -198,7 +210,7 @@ func (s *xlSets) connectDisks() { printEndpointError(endpoint, err) continue } - i, j, err := findDiskIndex(s.format, format) + setIndex, diskIndex, err := findDiskIndex(s.format, format) if err != nil { // Close the internal connection to avoid connection leaks. disk.Close() @@ -207,8 +219,14 @@ func (s *xlSets) connectDisks() { } disk.SetDiskID(format.XL.This) s.xlDisksMu.Lock() - s.xlDisks[i][j] = disk + s.xlDisks[setIndex][diskIndex] = disk s.xlDisksMu.Unlock() + + // Send a new disk connect event with a timeout + select { + case s.disksConnectEvent <- diskConnectInfo{setIndex: setIndex}: + case <-time.After(100 * time.Millisecond): + } } } @@ -216,6 +234,7 @@ func (s *xlSets) connectDisks() { // endpoints by reconnecting them and making sure to place them into right position in // the set topology, this monitoring happens at a given monitoring interval. func (s *xlSets) monitorAndConnectEndpoints(monitorInterval time.Duration) { + ticker := time.NewTicker(monitorInterval) // Stop the timer. defer ticker.Stop() @@ -264,9 +283,11 @@ func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerS setCount: setCount, drivesPerSet: drivesPerSet, format: format, + disksConnectEvent: make(chan diskConnectInfo), disksConnectDoneCh: make(chan struct{}), distributionAlgo: format.XL.DistributionAlgo, pool: NewMergeWalkPool(globalMergeLookupTimeout), + mrfUploads: make(map[string]int), } mutex := newNSLock(globalIsDistXL) @@ -281,10 +302,11 @@ func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerS // Initialize xl objects for a given set. s.sets[i] = &xlObjects{ - getDisks: s.GetDisks(i), - getLockers: s.GetLockers(i), - nsMutex: mutex, - bp: bp, + getDisks: s.GetDisks(i), + getLockers: s.GetLockers(i), + nsMutex: mutex, + bp: bp, + mrfUploadCh: make(chan partialUpload, 10000), } go s.sets[i].cleanupStaleMultipartUploads(context.Background(), @@ -304,6 +326,9 @@ func newXLSets(endpoints Endpoints, format *formatXLV3, setCount int, drivesPerS // Start the disk monitoring and connect routine. go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval) + go s.maintainMRFList() + go s.healMRFRoutine() + return s, nil } @@ -1665,3 +1690,72 @@ func (s *xlSets) IsReady(_ context.Context) bool { // Disks are not ready return false } + +// maintainMRFList gathers the list of successful partial uploads +// from all underlying xl sets and puts them in a global map which +// should not have more than 10000 entries. +func (s *xlSets) maintainMRFList() { + var agg = make(chan partialUpload, 10000) + for i, xl := range s.sets { + go func(c <-chan partialUpload, setIndex int) { + for msg := range c { + msg.failedSet = setIndex + select { + case agg <- msg: + default: + } + } + }(xl.mrfUploadCh, i) + } + + for fUpload := range agg { + s.mrfMU.Lock() + if len(s.mrfUploads) > 10000 { + s.mrfMU.Unlock() + continue + } + s.mrfUploads[pathJoin(fUpload.bucket, fUpload.object)] = fUpload.failedSet + s.mrfMU.Unlock() + } +} + +// healMRFRoutine monitors new disks connection, sweep the MRF list +// to find objects related to the new disk that needs to be healed. +func (s *xlSets) healMRFRoutine() { + // Wait until background heal state is initialized + var bgSeq *healSequence + for { + var ok bool + bgSeq, ok = globalBackgroundHealState.getHealSequenceByToken(bgHealingUUID) + if ok { + break + } + time.Sleep(time.Second) + } + + for e := range s.disksConnectEvent { + // Get the list of objects related the xl set + // to which the connected disk belongs. + var mrfUploads []string + s.mrfMU.Lock() + for k, v := range s.mrfUploads { + if v == e.setIndex { + mrfUploads = append(mrfUploads, k) + } + } + s.mrfMU.Unlock() + + // Heal objects + for _, u := range mrfUploads { + // Send an object to be healed with a timeout + select { + case bgSeq.sourceCh <- u: + case <-time.After(100 * time.Millisecond): + } + + s.mrfMU.Lock() + delete(s.mrfUploads, u) + s.mrfMU.Unlock() + } + } +} diff --git a/cmd/xl-v1-multipart.go b/cmd/xl-v1-multipart.go index bdc502984..7de8b8d56 100644 --- a/cmd/xl-v1-multipart.go +++ b/cmd/xl-v1-multipart.go @@ -584,8 +584,10 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, uploadIDPath := xl.getUploadIDDir(bucket, object, uploadID) + storageDisks := xl.getDisks() + // Read metadata associated with the object from all disks. - partsMetadata, errs := readAllXLMetadata(ctx, xl.getDisks(), minioMetaMultipartBucket, uploadIDPath) + partsMetadata, errs := readAllXLMetadata(ctx, storageDisks, minioMetaMultipartBucket, uploadIDPath) // get Quorum for this object _, writeQuorum, err := objectQuorumFromMeta(ctx, xl, partsMetadata, errs) @@ -598,7 +600,7 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, return oi, toObjectErr(reducedErr, bucket, object) } - onlineDisks, modTime := listOnlineDisks(xl.getDisks(), partsMetadata, errs) + onlineDisks, modTime := listOnlineDisks(storageDisks, partsMetadata, errs) // Calculate full object size. var objectSize int64 @@ -743,10 +745,17 @@ func (xl xlObjects) CompleteMultipartUpload(ctx context.Context, bucket string, } // Rename the multipart object to final location. - if _, err = rename(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, true, writeQuorum, nil); err != nil { + if onlineDisks, err = rename(ctx, onlineDisks, minioMetaMultipartBucket, uploadIDPath, bucket, object, true, writeQuorum, nil); err != nil { return oi, toObjectErr(err, bucket, object) } + // Check if there is any offline disk and add it to the MRF list + for i := 0; i < len(onlineDisks); i++ { + if onlineDisks[i] == nil || storageDisks[i] == nil { + xl.addPartialUpload(bucket, object) + } + } + // Success, return object info. return xlMeta.ToObjectInfo(bucket, object), nil } diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index 1744aa637..13dd50a2c 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -626,7 +626,7 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, // NOTE: Do not use online disks slice here: the reason is that existing object should be purged // regardless of `xl.json` status and rolled back in case of errors. Also allow renaming the // existing object if it is not present in quorum disks so users can overwrite stale objects. - _, err = rename(ctx, xl.getDisks(), bucket, object, minioMetaTmpBucket, newUniqueID, true, writeQuorum, []error{errFileNotFound}) + _, err = rename(ctx, storageDisks, bucket, object, minioMetaTmpBucket, newUniqueID, true, writeQuorum, []error{errFileNotFound}) if err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } @@ -646,11 +646,19 @@ func (xl xlObjects) putObject(ctx context.Context, bucket string, object string, } // Rename the successfully written temporary object to final location. - _, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, nil) - if err != nil { + if onlineDisks, err = rename(ctx, onlineDisks, minioMetaTmpBucket, tempObj, bucket, object, true, writeQuorum, nil); err != nil { return ObjectInfo{}, toObjectErr(err, bucket, object) } + // Whether a disk was initially or becomes offline + // during this upload, send it to the MRF list. + for i := 0; i < len(onlineDisks); i++ { + if onlineDisks[i] == nil || storageDisks[i] == nil { + xl.addPartialUpload(bucket, object) + break + } + } + // Object info is the same in all disks, so we can pick the first meta // of the first disk xlMeta = partsMetadata[0] @@ -960,3 +968,12 @@ func (xl xlObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuat } return listObjectsV2Info, err } + +// Send the successul but partial upload, however ignore +// if the channel is blocked by other items. +func (xl xlObjects) addPartialUpload(bucket, key string) { + select { + case xl.mrfUploadCh <- partialUpload{bucket: bucket, object: key}: + default: + } +} diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index fd97ff86c..754e80acb 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -39,6 +39,14 @@ const ( // OfflineDisk represents an unavailable disk. var OfflineDisk StorageAPI // zero value is nil +// partialUpload is a successful upload of an object +// but not written in all disks (having quorum) +type partialUpload struct { + bucket string + object string + failedSet int +} + // xlObjects - Implements XL object layer. type xlObjects struct { // getDisks returns list of storageAPIs. @@ -55,6 +63,8 @@ type xlObjects struct { // TODO: ListObjects pool management, should be removed in future. listPool *TreeWalkPool + + mrfUploadCh chan partialUpload } // NewNSLock - initialize a new namespace RWLocker instance.