Forward RPC errors from crawler (#9569)

The `keepHTTPResponseAlive` would cause errors to be 
returned with status OK.

- Add '32' as a filler byte until a response is ready
- '0' to indicate the response is ready to be consumed
- '1' to indicate response has an error which needs
to be returned to the caller

Clear out 'file not found' errors from dir walker, since it may be 
in a folder that has been deleted since it was scanned.
This commit is contained in:
Klaus Post 2020-05-12 05:41:38 +02:00 committed by GitHub
parent a8e5a86fa0
commit e25ace2151
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 19 deletions

View File

@ -277,7 +277,7 @@ func (f *folderScanner) scanQueuedLevels(ctx context.Context, folders []cachedFo
size, err := f.getSize(Item{Path: path.Join(f.root, entName), Typ: typ}) size, err := f.getSize(Item{Path: path.Join(f.root, entName), Typ: typ})
sleepDuration(time.Since(t), f.dataUsageCrawlMult) sleepDuration(time.Since(t), f.dataUsageCrawlMult)
if err == errSkipFile { if err == errSkipFile || err == errFileNotFound {
return nil return nil
} }
logger.LogIf(ctx, err) logger.LogIf(ctx, err)

View File

@ -19,6 +19,9 @@ var errSkipFile = errors.New("fastwalk: skip this file")
func readDirFn(dirName string, fn func(entName string, typ os.FileMode) error) error { func readDirFn(dirName string, fn func(entName string, typ os.FileMode) error) error {
fis, err := readDir(dirName) fis, err := readDir(dirName)
if err != nil { if err != nil {
if os.IsNotExist(err) || err == errFileNotFound {
return nil
}
return err return err
} }
for _, fi := range fis { for _, fi := range fis {

View File

@ -473,7 +473,7 @@ func (s *peerRESTServer) DispatchNetOBDInfoHandler(w http.ResponseWriter, r *htt
ctx := newContext(r, w, "DispatchNetOBDInfo") ctx := newContext(r, w, "DispatchNetOBDInfo")
info := globalNotificationSys.NetOBDInfo(ctx) info := globalNotificationSys.NetOBDInfo(ctx)
done() done(nil)
logger.LogIf(ctx, gob.NewEncoder(w).Encode(info)) logger.LogIf(ctx, gob.NewEncoder(w).Encode(info))
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }

View File

@ -153,10 +153,9 @@ func (s *storageRESTServer) CrawlAndGetDataUsageHandler(w http.ResponseWriter, r
done := keepHTTPResponseAlive(w) done := keepHTTPResponseAlive(w)
usageInfo, err := s.storage.CrawlAndGetDataUsage(r.Context(), cache) usageInfo, err := s.storage.CrawlAndGetDataUsage(r.Context(), cache)
done()
done(err)
if err != nil { if err != nil {
s.writeErrorResponse(w, err)
return return
} }
w.Write(usageInfo.serialize()) w.Write(usageInfo.serialize())
@ -552,7 +551,7 @@ func (s *storageRESTServer) DeleteFileBulkHandler(w http.ResponseWriter, r *http
encoder := gob.NewEncoder(w) encoder := gob.NewEncoder(w)
done := keepHTTPResponseAlive(w) done := keepHTTPResponseAlive(w)
errs, err := s.storage.DeleteFileBulk(volume, filePaths) errs, err := s.storage.DeleteFileBulk(volume, filePaths)
done() done(nil)
for idx := range filePaths { for idx := range filePaths {
if err != nil { if err != nil {
@ -599,7 +598,7 @@ func (s *storageRESTServer) DeletePrefixesHandler(w http.ResponseWriter, r *http
encoder := gob.NewEncoder(w) encoder := gob.NewEncoder(w)
done := keepHTTPResponseAlive(w) done := keepHTTPResponseAlive(w)
errs, err := s.storage.DeletePrefixes(volume, prefixes) errs, err := s.storage.DeletePrefixes(volume, prefixes)
done() done(nil)
for idx := range prefixes { for idx := range prefixes {
if err != nil { if err != nil {
dErrsResp.Errs[idx] = StorageErr(err.Error()) dErrsResp.Errs[idx] = StorageErr(err.Error())
@ -633,29 +632,44 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req
// operations, such as bitrot verification or data usage crawling. // operations, such as bitrot verification or data usage crawling.
// Every 10 seconds a space character is sent. // Every 10 seconds a space character is sent.
// The returned function should always be called to release resources. // The returned function should always be called to release resources.
// An optional error can be sent which will be picked as text only error,
// without its original type by the receiver.
// waitForHTTPResponse should be used to the receiving side. // waitForHTTPResponse should be used to the receiving side.
func keepHTTPResponseAlive(w http.ResponseWriter) func() { func keepHTTPResponseAlive(w http.ResponseWriter) func(error) {
doneCh := make(chan struct{}) doneCh := make(chan error)
go func() { go func() {
defer close(doneCh) defer close(doneCh)
ticker := time.NewTicker(time.Second * 10) ticker := time.NewTicker(time.Second * 10)
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
w.Write([]byte(" ")) // Response not ready, write a filler byte.
w.Write([]byte{32})
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
case doneCh <- struct{}{}: case err := <-doneCh:
w.Write([]byte{0}) if err != nil {
w.Write([]byte{1})
w.Write([]byte(err.Error()))
} else {
w.Write([]byte{0})
}
ticker.Stop() ticker.Stop()
return return
} }
} }
}() }()
return func() { return func(err error) {
if doneCh == nil {
return
}
// Indicate we are ready to write. // Indicate we are ready to write.
<-doneCh doneCh <- err
// Wait for channel to be closed so we don't race on writes. // Wait for channel to be closed so we don't race on writes.
<-doneCh <-doneCh
// Clear so we can be called multiple times without crashing.
doneCh = nil
} }
} }
@ -669,14 +683,22 @@ func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if b != ' ' { // Check if we have a response ready or a filler byte.
if b != 0 { switch b {
reader.UnreadByte() case 0:
return reader, nil
case 1:
errorText, err := ioutil.ReadAll(reader)
if err != nil {
return nil, err
} }
break return nil, errors.New(string(errorText))
case 32:
continue
default:
return nil, fmt.Errorf("unexpected filler byte: %d", b)
} }
} }
return reader, nil
} }
// VerifyFileResp - VerifyFile()'s response. // VerifyFileResp - VerifyFile()'s response.
@ -720,7 +742,7 @@ func (s *storageRESTServer) VerifyFile(w http.ResponseWriter, r *http.Request) {
encoder := gob.NewEncoder(w) encoder := gob.NewEncoder(w)
done := keepHTTPResponseAlive(w) done := keepHTTPResponseAlive(w)
err = s.storage.VerifyFile(volume, filePath, size, BitrotAlgorithmFromString(algoStr), hash, int64(shardSize)) err = s.storage.VerifyFile(volume, filePath, size, BitrotAlgorithmFromString(algoStr), hash, int64(shardSize))
done() done(nil)
vresp := &VerifyFileResp{} vresp := &VerifyFileResp{}
if err != nil { if err != nil {
vresp.Err = StorageErr(err.Error()) vresp.Err = StorageErr(err.Error())