move to GET for internal stream READs instead of POST (#20160)

the main reason is to let Go net/http perform necessary
book keeping properly, and in essential from consistency
point of view its GETs all the way.

Deprecate sendFile() as its buggy inside Go runtime.
This commit is contained in:
Harshavardhana 2024-07-26 05:55:01 -07:00 committed by GitHub
parent 15b609ecea
commit 064f36ca5a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 39 additions and 44 deletions

View File

@ -251,7 +251,7 @@ func guessIsRPCReq(req *http.Request) bool {
return true
}
return req.Method == http.MethodPost &&
return (req.Method == http.MethodPost || req.Method == http.MethodGet) &&
strings.HasPrefix(req.URL.Path, minioReservedBucketPath+SlashSeparator)
}

View File

@ -51,9 +51,10 @@ func TestGuessIsRPC(t *testing.T) {
r = &http.Request{
Proto: "HTTP/1.1",
Method: http.MethodGet,
URL: u,
}
if guessIsRPCReq(r) {
t.Fatal("Test shouldn't report as net/rpc for a non net/rpc request.")
if !guessIsRPCReq(r) {
t.Fatal("Test shouldn't fail for a possible net/rpc request.")
}
r = &http.Request{
Proto: "HTTP/1.1",

View File

@ -617,9 +617,11 @@ func (client *storageRESTClient) ReadFileStream(ctx context.Context, volume, pat
values.Set(storageRESTFilePath, path)
values.Set(storageRESTOffset, strconv.Itoa(int(offset)))
values.Set(storageRESTLength, strconv.Itoa(int(length)))
respBody, err := client.call(ctx, storageRESTMethodReadFileStream, values, nil, -1)
values.Set(storageRESTDiskID, *client.diskID.Load())
respBody, err := client.restClient.CallWithHTTPMethod(ctx, http.MethodGet, storageRESTMethodReadFileStream, values, nil, -1)
if err != nil {
return nil, err
return nil, toStorageErr(err)
}
return respBody, nil
}

View File

@ -20,7 +20,7 @@ package cmd
//go:generate msgp -file $GOFILE -unexported
const (
storageRESTVersion = "v59" // Change ReadOptions inclFreeVersions
storageRESTVersion = "v60" // ReadFileStream now uses http.MethodGet
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
storageRESTPrefix = minioReservedBucketPath + "/storage"
)

View File

@ -29,14 +29,12 @@ import (
"net/http"
"os/user"
"path"
"runtime"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
"github.com/dustin/go-humanize"
"github.com/minio/minio/internal/grid"
"github.com/tinylib/msgp/msgp"
@ -606,8 +604,6 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
return
}
w.Header().Set(xhttp.ContentLength, strconv.FormatInt(length, 10))
rc, err := s.getStorage().ReadFileStream(r.Context(), volume, filePath, offset, length)
if err != nil {
s.writeErrorResponse(w, err)
@ -615,28 +611,6 @@ func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http
}
defer rc.Close()
noReadFrom := runtime.GOOS == "windows" || length < 4*humanize.MiByte
if !noReadFrom {
rf, ok := w.(io.ReaderFrom)
if ok {
// Attempt to use splice/sendfile() optimization, A very specific behavior mentioned below is necessary.
// See https://github.com/golang/go/blob/f7c5cbb82087c55aa82081e931e0142783700ce8/src/net/sendfile_linux.go#L20
// Windows can lock up with this optimization, so we fall back to regular copy.
sr, ok := rc.(*sendFileReader)
if ok {
// Sendfile sends in 4MiB chunks per sendfile syscall which is more than enough
// for most setups.
_, err = rf.ReadFrom(sr.Reader)
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
storageLogIf(r.Context(), err)
}
if err == nil || !errors.Is(err, xhttp.ErrNotImplemented) {
return
}
}
}
} // noReadFrom means use io.Copy()
_, err = xioutil.Copy(w, rc)
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
storageLogIf(r.Context(), err)
@ -1367,12 +1341,12 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCreateFile).HandlerFunc(h(server.CreateFileHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFile).HandlerFunc(h(server.ReadFileHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(h(server.DeleteVersionsHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(h(server.VerifyFileHandler))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(h(server.StatInfoFile))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(h(server.ReadMultiple))
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(h(server.CleanAbandonedDataHandler))
subrouter.Methods(http.MethodGet).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler))
logger.FatalIf(storageListDirRPC.RegisterNoInput(gm, server.ListDirHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageReadAllRPC.Register(gm, server.ReadAllHandler, endpoint.Path), "unable to register handler")
logger.FatalIf(storageWriteAllRPC.Register(gm, server.WriteAllHandler, endpoint.Path), "unable to register handler")

View File

@ -34,8 +34,9 @@ import (
// Block sizes constant.
const (
SmallBlock = 32 * humanize.KiByte // Default r/w block size for smaller objects.
LargeBlock = 1 * humanize.MiByte // Default r/w block size for normal objects.
SmallBlock = 32 * humanize.KiByte // Default r/w block size for smaller objects.
MediumBlock = 128 * humanize.KiByte // Default r/w block size for medium sized objects.
LargeBlock = 1 * humanize.MiByte // Default r/w block size for normal objects.
)
// aligned sync.Pool's
@ -46,6 +47,12 @@ var (
return &b
},
}
ODirectPoolMedium = sync.Pool{
New: func() interface{} {
b := disk.AlignedBlock(MediumBlock)
return &b
},
}
ODirectPoolSmall = sync.Pool{
New: func() interface{} {
b := disk.AlignedBlock(SmallBlock)
@ -294,13 +301,19 @@ func NewSkipReader(r io.Reader, n int64) io.Reader {
return &SkipReader{r, n}
}
// writerOnly hides an io.Writer value's optional ReadFrom method
// from io.Copy.
type writerOnly struct {
io.Writer
}
// Copy is exactly like io.Copy but with reusable buffers.
func Copy(dst io.Writer, src io.Reader) (written int64, err error) {
bufp := ODirectPoolLarge.Get().(*[]byte)
bufp := ODirectPoolMedium.Get().(*[]byte)
defer ODirectPoolMedium.Put(bufp)
buf := *bufp
defer ODirectPoolLarge.Put(bufp)
return io.CopyBuffer(dst, src, buf)
return io.CopyBuffer(writerOnly{dst}, src, buf)
}
// SameFile returns if the files are same.

View File

@ -129,13 +129,13 @@ func removeEmptyPort(host string) string {
}
// Copied from http.NewRequest but implemented to ensure we reuse `url.URL` instance.
func (c *Client) newRequest(ctx context.Context, u url.URL, body io.Reader) (*http.Request, error) {
func (c *Client) newRequest(ctx context.Context, method string, u url.URL, body io.Reader) (*http.Request, error) {
rc, ok := body.(io.ReadCloser)
if !ok && body != nil {
rc = io.NopCloser(body)
}
req := &http.Request{
Method: http.MethodPost,
Method: method,
URL: &u,
Proto: "HTTP/1.1",
ProtoMajor: 1,
@ -290,8 +290,8 @@ func (c *Client) dumpHTTP(req *http.Request, resp *http.Response) {
// ErrClientClosed returned when *Client is closed.
var ErrClientClosed = errors.New("rest client is closed")
// Call - make a REST call with context.
func (c *Client) Call(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) {
// CallWithHTTPMethod - make a REST call with context, using a custom HTTP method.
func (c *Client) CallWithHTTPMethod(ctx context.Context, httpMethod, rpcMethod string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) {
switch atomic.LoadInt32(&c.connected) {
case closed:
// client closed, this is usually a manual process
@ -307,10 +307,10 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
// Shallow copy. We don't modify the *UserInfo, if set.
// All other fields are copied.
u := *c.url
u.Path = path.Join(u.Path, method)
u.Path = path.Join(u.Path, rpcMethod)
u.RawQuery = values.Encode()
req, err := c.newRequest(ctx, u, body)
req, err := c.newRequest(ctx, httpMethod, u, body)
if err != nil {
return nil, &NetworkError{Err: err}
}
@ -382,6 +382,11 @@ func (c *Client) Call(ctx context.Context, method string, values url.Values, bod
return resp.Body, nil
}
// Call - make a REST call with context.
func (c *Client) Call(ctx context.Context, rpcMethod string, values url.Values, body io.Reader, length int64) (reply io.ReadCloser, err error) {
return c.CallWithHTTPMethod(ctx, http.MethodPost, rpcMethod, values, body, length)
}
// Close closes all idle connections of the underlying http client
func (c *Client) Close() {
atomic.StoreInt32(&c.connected, closed)