Optimize FileInfo(Version) transfer (#10775)

File Info decoding, in particular, is showing up as a major 
allocator and time consumer for internode data transfers

Switch to message pack for cross-server transfers:

```
MSGP:

Size: 945 bytes

BenchmarkEncodeFileInfoMsgp-32    	 1558444	       866 ns/op	   1.16 MB/s	       0 B/op	       0 allocs/op
BenchmarkDecodeFileInfoMsgp-32    	  479968	      2487 ns/op	   0.40 MB/s	     848 B/op	      18 allocs/op

GOB:

Size: 1409 bytes

BenchmarkEncodeFileInfoGOB-32    	  333339	      3237 ns/op	   0.31 MB/s	     576 B/op	      19 allocs/op
BenchmarkDecodeFileInfoGOB-32    	   20869	     57837 ns/op	   0.02 MB/s	   16439 B/op	     428 allocs/op
```
This commit is contained in:
Klaus Post
2020-11-02 17:07:52 -08:00
committed by GitHub
parent 86e0d272f3
commit 37749f4623
8 changed files with 1777 additions and 41 deletions

View File

@@ -33,6 +33,8 @@ import (
"strings"
"time"
"github.com/tinylib/msgp/msgp"
jwtreq "github.com/dgrijalva/jwt-go/request"
"github.com/gorilla/mux"
"github.com/minio/minio/cmd/config"
@@ -308,7 +310,7 @@ func (s *storageRESTServer) DeleteVersionHandler(w http.ResponseWriter, r *http.
}
var fi FileInfo
if err := gob.NewDecoder(r.Body).Decode(&fi); err != nil {
if err := msgp.Decode(r.Body, &fi); err != nil {
s.writeErrorResponse(w, err)
return
}
@@ -339,9 +341,7 @@ func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Re
s.writeErrorResponse(w, err)
return
}
gob.NewEncoder(w).Encode(fi)
w.(http.Flusher).Flush()
logger.LogIf(r.Context(), msgp.Encode(w, &fi))
}
// WriteMetadata write new updated metadata.
@@ -359,13 +359,12 @@ func (s *storageRESTServer) WriteMetadataHandler(w http.ResponseWriter, r *http.
}
var fi FileInfo
err := gob.NewDecoder(r.Body).Decode(&fi)
if err != nil {
if err := msgp.Decode(r.Body, &fi); err != nil {
s.writeErrorResponse(w, err)
return
}
err = s.storage.WriteMetadata(r.Context(), volume, filePath, fi)
err := s.storage.WriteMetadata(r.Context(), volume, filePath, fi)
if err != nil {
s.writeErrorResponse(w, err)
}
@@ -411,7 +410,7 @@ func (s *storageRESTServer) CheckPartsHandler(w http.ResponseWriter, r *http.Req
}
var fi FileInfo
if err := gob.NewDecoder(r.Body).Decode(&fi); err != nil {
if err := msgp.Decode(r.Body, &fi); err != nil {
s.writeErrorResponse(w, err)
return
}
@@ -541,16 +540,17 @@ func (s *storageRESTServer) WalkSplunkHandler(w http.ResponseWriter, r *http.Req
markerPath := vars[storageRESTMarkerPath]
setEventStreamHeaders(w)
encoder := gob.NewEncoder(w)
fch, err := s.storage.WalkSplunk(r.Context(), volume, dirPath, markerPath, r.Context().Done())
if err != nil {
s.writeErrorResponse(w, err)
return
}
encoder := msgp.NewWriter(w)
for fi := range fch {
encoder.Encode(&fi)
logger.LogIf(r.Context(), fi.EncodeMsg(encoder))
}
logger.LogIf(r.Context(), encoder.Flush())
}
// WalkVersionsHandler - remote caller to start walking at a requested directory path.
@@ -569,16 +569,17 @@ func (s *storageRESTServer) WalkVersionsHandler(w http.ResponseWriter, r *http.R
}
setEventStreamHeaders(w)
encoder := gob.NewEncoder(w)
fch, err := s.storage.WalkVersions(r.Context(), volume, dirPath, markerPath, recursive, r.Context().Done())
if err != nil {
s.writeErrorResponse(w, err)
return
}
encoder := msgp.NewWriter(w)
for fi := range fch {
logger.LogIf(r.Context(), encoder.Encode(&fi))
logger.LogIf(r.Context(), fi.EncodeMsg(encoder))
}
logger.LogIf(r.Context(), encoder.Flush())
}
// WalkHandler - remote caller to start walking at a requested directory path.
@@ -597,16 +598,17 @@ func (s *storageRESTServer) WalkHandler(w http.ResponseWriter, r *http.Request)
}
setEventStreamHeaders(w)
encoder := gob.NewEncoder(w)
fch, err := s.storage.Walk(r.Context(), volume, dirPath, markerPath, recursive, r.Context().Done())
if err != nil {
s.writeErrorResponse(w, err)
return
}
encoder := msgp.NewWriter(w)
for fi := range fch {
logger.LogIf(r.Context(), encoder.Encode(&fi))
logger.LogIf(r.Context(), fi.EncodeMsg(encoder))
}
logger.LogIf(r.Context(), encoder.Flush())
}
// ListDirHandler - list a directory.
@@ -674,9 +676,10 @@ func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http
}
versions := make([]FileInfo, totalVersions)
decoder := gob.NewDecoder(r.Body)
decoder := msgp.NewReader(r.Body)
for i := 0; i < totalVersions; i++ {
if err := decoder.Decode(&versions[i]); err != nil {
dst := &versions[i]
if err := dst.DecodeMsg(decoder); err != nil {
s.writeErrorResponse(w, err)
return
}
@@ -979,8 +982,7 @@ func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Req
}
var fi FileInfo
err := gob.NewDecoder(r.Body).Decode(&fi)
if err != nil {
if err := msgp.Decode(r.Body, &fi); err != nil {
s.writeErrorResponse(w, err)
return
}
@@ -988,7 +990,7 @@ func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Req
setEventStreamHeaders(w)
encoder := gob.NewEncoder(w)
done := keepHTTPResponseAlive(w)
err = s.storage.VerifyFile(r.Context(), volume, filePath, fi)
err := s.storage.VerifyFile(r.Context(), volume, filePath, fi)
done(nil)
vresp := &VerifyFileResp{}
if err != nil {