Add storage layer contexts (#10321)

Add context to all (non-trivial) calls to the storage layer. 

Contexts are propagated through the REST client.

- `context.TODO()` is left in place for the places where it needs to be added to the caller.
- `endWalkCh` could probably be removed from the walkers, but no changes so far.

The "dangerous" part is that now a caller disconnecting *will* propagate down,  so a 
"delete" operation will now be interrupted. In some cases we might want to disconnect 
this functionality so the operation completes if it has started, leaving the system in a cleaner state.
This commit is contained in:
Klaus Post
2020-09-04 09:45:06 -07:00
committed by GitHub
parent 0037951b6e
commit 2d58a8d861
36 changed files with 466 additions and 467 deletions

View File

@@ -138,7 +138,7 @@ func (s *storageRESTServer) DiskInfoHandler(w http.ResponseWriter, r *http.Reque
if !s.IsValid(w, r) {
return
}
info, err := s.storage.DiskInfo()
info, err := s.storage.DiskInfo(r.Context())
if err != nil {
info.Error = err.Error()
}
@@ -179,7 +179,7 @@ func (s *storageRESTServer) MakeVolHandler(w http.ResponseWriter, r *http.Reques
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
err := s.storage.MakeVol(volume)
err := s.storage.MakeVol(r.Context(), volume)
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -192,7 +192,7 @@ func (s *storageRESTServer) MakeVolBulkHandler(w http.ResponseWriter, r *http.Re
}
vars := mux.Vars(r)
volumes := strings.Split(vars[storageRESTVolumes], ",")
err := s.storage.MakeVolBulk(volumes...)
err := s.storage.MakeVolBulk(r.Context(), volumes...)
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -203,7 +203,7 @@ func (s *storageRESTServer) ListVolsHandler(w http.ResponseWriter, r *http.Reque
if !s.IsValid(w, r) {
return
}
infos, err := s.storage.ListVols()
infos, err := s.storage.ListVols(r.Context())
if err != nil {
s.writeErrorResponse(w, err)
return
@@ -219,7 +219,7 @@ func (s *storageRESTServer) StatVolHandler(w http.ResponseWriter, r *http.Reques
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
info, err := s.storage.StatVol(volume)
info, err := s.storage.StatVol(r.Context(), volume)
if err != nil {
s.writeErrorResponse(w, err)
return
@@ -236,7 +236,7 @@ func (s *storageRESTServer) DeleteVolHandler(w http.ResponseWriter, r *http.Requ
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
forceDelete := vars[storageRESTForceDelete] == "true"
err := s.storage.DeleteVol(volume, forceDelete)
err := s.storage.DeleteVol(r.Context(), volume, forceDelete)
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -257,7 +257,7 @@ func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Req
s.writeErrorResponse(w, err)
return
}
err = s.storage.AppendFile(volume, filePath, buf)
err = s.storage.AppendFile(r.Context(), volume, filePath, buf)
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -278,7 +278,7 @@ func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Req
s.writeErrorResponse(w, err)
return
}
err = s.storage.CreateFile(volume, filePath, int64(fileSize), r.Body)
err = s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), r.Body)
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -304,7 +304,7 @@ func (s *storageRESTServer) DeleteVersionHandler(w http.ResponseWriter, r *http.
return
}
err := s.storage.DeleteVersion(volume, filePath, fi)
err := s.storage.DeleteVersion(r.Context(), volume, filePath, fi)
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -320,7 +320,7 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
filePath := vars[storageRESTFilePath]
versionID := vars[storageRESTVersionID]
fi, err := s.storage.ReadVersion(volume, filePath, versionID)
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID)
if err != nil {
s.writeErrorResponse(w, err)
return
@@ -351,7 +351,7 @@ func (s *storageRESTServer) WriteMetadataHandler(w http.ResponseWriter, r *http.
return
}
err = s.storage.WriteMetadata(volume, filePath, fi)
err = s.storage.WriteMetadata(r.Context(), volume, filePath, fi)
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -371,7 +371,7 @@ func (s *storageRESTServer) WriteAllHandler(w http.ResponseWriter, r *http.Reque
return
}
err := s.storage.WriteAll(volume, filePath, io.LimitReader(r.Body, r.ContentLength))
err := s.storage.WriteAll(r.Context(), volume, filePath, io.LimitReader(r.Body, r.ContentLength))
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -397,7 +397,7 @@ func (s *storageRESTServer) CheckPartsHandler(w http.ResponseWriter, r *http.Req
return
}
if err := s.storage.CheckParts(volume, filePath, fi); err != nil {
if err := s.storage.CheckParts(r.Context(), volume, filePath, fi); err != nil {
s.writeErrorResponse(w, err)
}
}
@@ -411,7 +411,7 @@ func (s *storageRESTServer) CheckFileHandler(w http.ResponseWriter, r *http.Requ
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
if err := s.storage.CheckFile(volume, filePath); err != nil {
if err := s.storage.CheckFile(r.Context(), volume, filePath); err != nil {
s.writeErrorResponse(w, err)
}
}
@@ -425,7 +425,7 @@ func (s *storageRESTServer) ReadAllHandler(w http.ResponseWriter, r *http.Reques
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
buf, err := s.storage.ReadAll(volume, filePath)
buf, err := s.storage.ReadAll(r.Context(), volume, filePath)
if err != nil {
s.writeErrorResponse(w, err)
return
@@ -469,7 +469,7 @@ func (s *storageRESTServer) ReadFileHandler(w http.ResponseWriter, r *http.Reque
verifier = NewBitrotVerifier(BitrotAlgorithmFromString(vars[storageRESTBitrotAlgo]), hash)
}
buf := make([]byte, length)
_, err = s.storage.ReadFile(volume, filePath, int64(offset), buf, verifier)
_, err = s.storage.ReadFile(r.Context(), volume, filePath, int64(offset), buf, verifier)
if err != nil {
s.writeErrorResponse(w, err)
return
@@ -498,7 +498,7 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
return
}
rc, err := s.storage.ReadFileStream(volume, filePath, int64(offset), int64(length))
rc, err := s.storage.ReadFileStream(r.Context(), volume, filePath, int64(offset), int64(length))
if err != nil {
s.writeErrorResponse(w, err)
return
@@ -524,7 +524,7 @@ func (s *storageRESTServer) WalkSplunkHandler(w http.ResponseWriter, r *http.Req
setEventStreamHeaders(w)
encoder := gob.NewEncoder(w)
fch, err := s.storage.WalkSplunk(volume, dirPath, markerPath, r.Context().Done())
fch, err := s.storage.WalkSplunk(r.Context(), volume, dirPath, markerPath, r.Context().Done())
if err != nil {
s.writeErrorResponse(w, err)
return
@@ -552,7 +552,7 @@ func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.R
setEventStreamHeaders(w)
encoder := gob.NewEncoder(w)
fch, err := s.storage.WalkVersions(volume, dirPath, markerPath, recursive, r.Context().Done())
fch, err := s.storage.WalkVersions(r.Context(), volume, dirPath, markerPath, recursive, r.Context().Done())
if err != nil {
s.writeErrorResponse(w, err)
return
@@ -580,7 +580,7 @@ func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request)
setEventStreamHeaders(w)
encoder := gob.NewEncoder(w)
fch, err := s.storage.Walk(volume, dirPath, markerPath, recursive, r.Context().Done())
fch, err := s.storage.Walk(r.Context(), volume, dirPath, markerPath, recursive, r.Context().Done())
if err != nil {
s.writeErrorResponse(w, err)
return
@@ -604,7 +604,7 @@ func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Reques
return
}
entries, err := s.storage.ListDir(volume, dirPath, count)
entries, err := s.storage.ListDir(r.Context(), volume, dirPath, count)
if err != nil {
s.writeErrorResponse(w, err)
return
@@ -622,7 +622,7 @@ func (s *storageRESTServer) DeleteFileHandler(w http.ResponseWriter, r *http.Req
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
err := s.storage.DeleteFile(volume, filePath)
err := s.storage.DeleteFile(r.Context(), volume, filePath)
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -663,7 +663,7 @@ func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http
setEventStreamHeaders(w)
encoder := gob.NewEncoder(w)
done := keepHTTPResponseAlive(w)
errs := s.storage.DeleteVersions(volume, versions)
errs := s.storage.DeleteVersions(r.Context(), volume, versions)
done(nil)
for idx := range versions {
if errs[idx] != nil {
@@ -685,7 +685,7 @@ func (s *storageRESTServer) RenameDataHandler(w http.ResponseWriter, r *http.Req
dataDir := vars[storageRESTDataDir]
dstVolume := vars[storageRESTDstVolume]
dstFilePath := vars[storageRESTDstPath]
err := s.storage.RenameData(srcVolume, srcFilePath, dataDir, dstVolume, dstFilePath)
err := s.storage.RenameData(r.Context(), srcVolume, srcFilePath, dataDir, dstVolume, dstFilePath)
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -701,7 +701,7 @@ func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Req
srcFilePath := vars[storageRESTSrcPath]
dstVolume := vars[storageRESTDstVolume]
dstFilePath := vars[storageRESTDstPath]
err := s.storage.RenameFile(srcVolume, srcFilePath, dstVolume, dstFilePath)
err := s.storage.RenameFile(r.Context(), srcVolume, srcFilePath, dstVolume, dstFilePath)
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -809,7 +809,7 @@ func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Req
setEventStreamHeaders(w)
encoder := gob.NewEncoder(w)
done := keepHTTPResponseAlive(w)
err = s.storage.VerifyFile(volume, filePath, fi)
err = s.storage.VerifyFile(r.Context(), volume, filePath, fi)
done(nil)
vresp := &VerifyFileResp{}
if err != nil {