From e25ace2151b42f96f38ff30a8219a0c5333e068c Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Tue, 12 May 2020 05:41:38 +0200 Subject: [PATCH] Forward RPC errors from crawler (#9569) The `keepHTTPResponseAlive` would cause errors to be returned with status OK. - Add '32' as a filler byte until a response is ready - '0' to indicate the response is ready to be consumed - '1' to indicate response has an error which needs to be returned to the caller Clear out 'file not found' errors from dir walker, since it may be in a folder that has been deleted since it was scanned. --- cmd/data-usage.go | 2 +- cmd/fastwalk.go | 3 ++ cmd/peer-rest-server.go | 2 +- cmd/storage-rest-server.go | 56 ++++++++++++++++++++++++++------------ 4 files changed, 44 insertions(+), 19 deletions(-) diff --git a/cmd/data-usage.go b/cmd/data-usage.go index 067b6833d..dcbf85dad 100644 --- a/cmd/data-usage.go +++ b/cmd/data-usage.go @@ -277,7 +277,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo size, err := f.getSize(Item{Path: path.Join(f.root, entName), Typ: typ}) sleepDuration(time.Since(t), f.dataUsageCrawlMult) - if err == errSkipFile { + if err == errSkipFile || err == errFileNotFound { return nil } logger.LogIf(ctx, err) diff --git a/cmd/fastwalk.go b/cmd/fastwalk.go index 560c5c382..2096982e6 100644 --- a/cmd/fastwalk.go +++ b/cmd/fastwalk.go @@ -19,6 +19,9 @@ var errSkipFile = errors.New("fastwalk: skip this file") func readDirFn(dirName string, fn func(entName string, typ os.FileMode) error) error { fis, err := readDir(dirName) if err != nil { + if os.IsNotExist(err) || err == errFileNotFound { + return nil + } return err } for _, fi := range fis { diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index fe7305954..2ea3ad666 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -473,7 +473,7 @@ func (s *peerRESTServer) DispatchNetOBDInfoHandler(w http.ResponseWriter, r *htt ctx := newContext(r, w, "DispatchNetOBDInfo") info := globalNotificationSys.NetOBDInfo(ctx) - done() + done(nil) logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) w.(http.Flusher).Flush() } diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 2d3e69ae7..4b431deca 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -153,10 +153,9 @@ func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r done := keepHTTPResponseAlive(w) usageInfo, err := s.storage.CrawlAndGetDataUsage(r.Context(), cache) - done() + done(err) if err != nil { - s.writeErrorResponse(w, err) return } w.Write(usageInfo.serialize()) @@ -552,7 +551,7 @@ func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http encoder := gob.NewEncoder(w) done := keepHTTPResponseAlive(w) errs, err := s.storage.DeleteFileBulk(volume, filePaths) - done() + done(nil) for idx := range filePaths { if err != nil { @@ -599,7 +598,7 @@ func (s *storageRESTServer) DeletePrefixesHandler(w http.ResponseWriter, r *http encoder := gob.NewEncoder(w) done := keepHTTPResponseAlive(w) errs, err := s.storage.DeletePrefixes(volume, prefixes) - done() + done(nil) for idx := range prefixes { if err != nil { dErrsResp.Errs[idx] = StorageErr(err.Error()) @@ -633,29 +632,44 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req // operations, such as bitrot verification or data usage crawling. // Every 10 seconds a space character is sent. // The returned function should always be called to release resources. +// An optional error can be sent which will be picked as text only error, +// without its original type by the receiver. // waitForHTTPResponse should be used to the receiving side. -func keepHTTPResponseAlive(w http.ResponseWriter) func() { - doneCh := make(chan struct{}) +func keepHTTPResponseAlive(w http.ResponseWriter) func(error) { + doneCh := make(chan error) go func() { defer close(doneCh) ticker := time.NewTicker(time.Second * 10) for { select { case <-ticker.C: - w.Write([]byte(" ")) + // Response not ready, write a filler byte. + w.Write([]byte{32}) w.(http.Flusher).Flush() - case doneCh <- struct{}{}: - w.Write([]byte{0}) + case err := <-doneCh: + if err != nil { + w.Write([]byte{1}) + w.Write([]byte(err.Error())) + } else { + w.Write([]byte{0}) + } ticker.Stop() return } } }() - return func() { + return func(err error) { + if doneCh == nil { + return + } // Indicate we are ready to write. - <-doneCh + doneCh <- err + // Wait for channel to be closed so we don't race on writes. <-doneCh + + // Clear so we can be called multiple times without crashing. + doneCh = nil } } @@ -669,14 +683,22 @@ func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) { if err != nil { return nil, err } - if b != ' ' { - if b != 0 { - reader.UnreadByte() + // Check if we have a response ready or a filler byte. + switch b { + case 0: + return reader, nil + case 1: + errorText, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err } - break + return nil, errors.New(string(errorText)) + case 32: + continue + default: + return nil, fmt.Errorf("unexpected filler byte: %d", b) } } - return reader, nil } // VerifyFileResp - VerifyFile()'s response. @@ -720,7 +742,7 @@ func (s *storageRESTServer) VerifyFile(w http.ResponseWriter, r *http.Request) { encoder := gob.NewEncoder(w) done := keepHTTPResponseAlive(w) err = s.storage.VerifyFile(volume, filePath, size, BitrotAlgorithmFromString(algoStr), hash, int64(shardSize)) - done() + done(nil) vresp := &VerifyFileResp{} if err != nil { vresp.Err = StorageErr(err.Error())