mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
add missing responseBody drain (#12147)
Signed-off-by: Harshavardhana <harsha@minio.io>
This commit is contained in:
parent
d825d92499
commit
d501c5e38b
@ -28,6 +28,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
xhttp "github.com/minio/minio/cmd/http"
|
||||||
"github.com/minio/minio/cmd/logger"
|
"github.com/minio/minio/cmd/logger"
|
||||||
xioutil "github.com/minio/minio/pkg/ioutil"
|
xioutil "github.com/minio/minio/pkg/ioutil"
|
||||||
)
|
)
|
||||||
@ -294,6 +295,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption
|
|||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer xhttp.DrainBody(respBody)
|
||||||
return waitForHTTPStream(respBody, wr)
|
return waitForHTTPStream(respBody, wr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,7 +32,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/minio/minio/cmd/http"
|
|
||||||
xhttp "github.com/minio/minio/cmd/http"
|
xhttp "github.com/minio/minio/cmd/http"
|
||||||
"github.com/minio/minio/cmd/logger"
|
"github.com/minio/minio/cmd/logger"
|
||||||
"github.com/minio/minio/cmd/rest"
|
"github.com/minio/minio/cmd/rest"
|
||||||
@ -208,7 +207,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC
|
|||||||
pw.CloseWithError(cache.serializeTo(pw))
|
pw.CloseWithError(cache.serializeTo(pw))
|
||||||
}()
|
}()
|
||||||
respBody, err := client.call(ctx, storageRESTMethodNSScanner, url.Values{}, pr, -1)
|
respBody, err := client.call(ctx, storageRESTMethodNSScanner, url.Values{}, pr, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
pr.Close()
|
pr.Close()
|
||||||
return cache, err
|
return cache, err
|
||||||
@ -248,7 +247,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
if err = msgp.Decode(respBody, &info); err != nil {
|
if err = msgp.Decode(respBody, &info); err != nil {
|
||||||
return info, err
|
return info, err
|
||||||
}
|
}
|
||||||
@ -271,7 +270,7 @@ func (client *storageRESTClient) MakeVolBulk(ctx context.Context, volumes ...str
|
|||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
values.Set(storageRESTVolumes, strings.Join(volumes, ","))
|
values.Set(storageRESTVolumes, strings.Join(volumes, ","))
|
||||||
respBody, err := client.call(ctx, storageRESTMethodMakeVolBulk, values, nil, -1)
|
respBody, err := client.call(ctx, storageRESTMethodMakeVolBulk, values, nil, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -280,7 +279,7 @@ func (client *storageRESTClient) MakeVol(ctx context.Context, volume string) (er
|
|||||||
values := make(url.Values)
|
values := make(url.Values)
|
||||||
values.Set(storageRESTVolume, volume)
|
values.Set(storageRESTVolume, volume)
|
||||||
respBody, err := client.call(ctx, storageRESTMethodMakeVol, values, nil, -1)
|
respBody, err := client.call(ctx, storageRESTMethodMakeVol, values, nil, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -290,7 +289,7 @@ func (client *storageRESTClient) ListVols(ctx context.Context) (vols []VolInfo,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
vinfos := VolsInfo(vols)
|
vinfos := VolsInfo(vols)
|
||||||
err = msgp.Decode(respBody, &vinfos)
|
err = msgp.Decode(respBody, &vinfos)
|
||||||
return vinfos, err
|
return vinfos, err
|
||||||
@ -304,7 +303,7 @@ func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vo
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
err = msgp.Decode(respBody, &vol)
|
err = msgp.Decode(respBody, &vol)
|
||||||
return vol, err
|
return vol, err
|
||||||
}
|
}
|
||||||
@ -317,7 +316,7 @@ func (client *storageRESTClient) DeleteVol(ctx context.Context, volume string, f
|
|||||||
values.Set(storageRESTForceDelete, "true")
|
values.Set(storageRESTForceDelete, "true")
|
||||||
}
|
}
|
||||||
respBody, err := client.call(ctx, storageRESTMethodDeleteVol, values, nil, -1)
|
respBody, err := client.call(ctx, storageRESTMethodDeleteVol, values, nil, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -328,7 +327,7 @@ func (client *storageRESTClient) AppendFile(ctx context.Context, volume string,
|
|||||||
values.Set(storageRESTFilePath, path)
|
values.Set(storageRESTFilePath, path)
|
||||||
reader := bytes.NewReader(buf)
|
reader := bytes.NewReader(buf)
|
||||||
respBody, err := client.call(ctx, storageRESTMethodAppendFile, values, reader, -1)
|
respBody, err := client.call(ctx, storageRESTMethodAppendFile, values, reader, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -338,7 +337,7 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st
|
|||||||
values.Set(storageRESTFilePath, path)
|
values.Set(storageRESTFilePath, path)
|
||||||
values.Set(storageRESTLength, strconv.Itoa(int(size)))
|
values.Set(storageRESTLength, strconv.Itoa(int(size)))
|
||||||
respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
|
respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -353,7 +352,7 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path
|
|||||||
}
|
}
|
||||||
|
|
||||||
respBody, err := client.call(ctx, storageRESTMethodWriteMetadata, values, &reader, -1)
|
respBody, err := client.call(ctx, storageRESTMethodWriteMetadata, values, &reader, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -368,7 +367,7 @@ func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, pat
|
|||||||
}
|
}
|
||||||
|
|
||||||
respBody, err := client.call(ctx, storageRESTMethodUpdateMetadata, values, &reader, -1)
|
respBody, err := client.call(ctx, storageRESTMethodUpdateMetadata, values, &reader, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -384,7 +383,7 @@ func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path
|
|||||||
}
|
}
|
||||||
|
|
||||||
respBody, err := client.call(ctx, storageRESTMethodDeleteVersion, values, &buffer, -1)
|
respBody, err := client.call(ctx, storageRESTMethodDeleteVersion, values, &buffer, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -394,7 +393,7 @@ func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, pa
|
|||||||
values.Set(storageRESTVolume, volume)
|
values.Set(storageRESTVolume, volume)
|
||||||
values.Set(storageRESTFilePath, path)
|
values.Set(storageRESTFilePath, path)
|
||||||
respBody, err := client.call(ctx, storageRESTMethodWriteAll, values, bytes.NewBuffer(b), int64(len(b)))
|
respBody, err := client.call(ctx, storageRESTMethodWriteAll, values, bytes.NewBuffer(b), int64(len(b)))
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -404,7 +403,7 @@ func (client *storageRESTClient) CheckFile(ctx context.Context, volume string, p
|
|||||||
values.Set(storageRESTVolume, volume)
|
values.Set(storageRESTVolume, volume)
|
||||||
values.Set(storageRESTFilePath, path)
|
values.Set(storageRESTFilePath, path)
|
||||||
respBody, err := client.call(ctx, storageRESTMethodCheckFile, values, nil, -1)
|
respBody, err := client.call(ctx, storageRESTMethodCheckFile, values, nil, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -421,7 +420,7 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
respBody, err := client.call(ctx, storageRESTMethodCheckParts, values, &reader, -1)
|
respBody, err := client.call(ctx, storageRESTMethodCheckParts, values, &reader, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -439,7 +438,7 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP
|
|||||||
}
|
}
|
||||||
|
|
||||||
respBody, err := client.call(ctx, storageRESTMethodRenameData, values, &reader, -1)
|
respBody, err := client.call(ctx, storageRESTMethodRenameData, values, &reader, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -470,7 +469,7 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fi, err
|
return fi, err
|
||||||
}
|
}
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
|
|
||||||
dec := msgpNewReader(respBody)
|
dec := msgpNewReader(respBody)
|
||||||
defer readMsgpReaderPool.Put(dec)
|
defer readMsgpReaderPool.Put(dec)
|
||||||
@ -488,7 +487,7 @@ func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, pat
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return ioutil.ReadAll(respBody)
|
return ioutil.ReadAll(respBody)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -524,7 +523,7 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
n, err := io.ReadFull(respBody, buf)
|
n, err := io.ReadFull(respBody, buf)
|
||||||
return int64(n), err
|
return int64(n), err
|
||||||
}
|
}
|
||||||
@ -539,7 +538,7 @@ func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath st
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
err = gob.NewDecoder(respBody).Decode(&entries)
|
err = gob.NewDecoder(respBody).Decode(&entries)
|
||||||
return entries, err
|
return entries, err
|
||||||
}
|
}
|
||||||
@ -552,7 +551,7 @@ func (client *storageRESTClient) Delete(ctx context.Context, volume string, path
|
|||||||
values.Set(storageRESTRecursive, strconv.FormatBool(recursive))
|
values.Set(storageRESTRecursive, strconv.FormatBool(recursive))
|
||||||
|
|
||||||
respBody, err := client.call(ctx, storageRESTMethodDeleteFile, values, nil, -1)
|
respBody, err := client.call(ctx, storageRESTMethodDeleteFile, values, nil, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -576,7 +575,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri
|
|||||||
errs = make([]error, len(versions))
|
errs = make([]error, len(versions))
|
||||||
|
|
||||||
respBody, err := client.call(ctx, storageRESTMethodDeleteVersions, values, &buffer, -1)
|
respBody, err := client.call(ctx, storageRESTMethodDeleteVersions, values, &buffer, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
for i := range errs {
|
for i := range errs {
|
||||||
errs[i] = err
|
errs[i] = err
|
||||||
@ -615,7 +614,7 @@ func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcP
|
|||||||
values.Set(storageRESTDstVolume, dstVolume)
|
values.Set(storageRESTDstVolume, dstVolume)
|
||||||
values.Set(storageRESTDstPath, dstPath)
|
values.Set(storageRESTDstPath, dstPath)
|
||||||
respBody, err := client.call(ctx, storageRESTMethodRenameFile, values, nil, -1)
|
respBody, err := client.call(ctx, storageRESTMethodRenameFile, values, nil, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -630,7 +629,7 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
|
|||||||
}
|
}
|
||||||
|
|
||||||
respBody, err := client.call(ctx, storageRESTMethodVerifyFile, values, &reader, -1)
|
respBody, err := client.call(ctx, storageRESTMethodVerifyFile, values, &reader, -1)
|
||||||
defer http.DrainBody(respBody)
|
defer xhttp.DrainBody(respBody)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -771,23 +771,6 @@ func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// drainCloser can be used for wrapping an http response.
|
|
||||||
// It will drain the body before closing.
|
|
||||||
type drainCloser struct {
|
|
||||||
rc io.ReadCloser
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read forwards the read operation.
|
|
||||||
func (f drainCloser) Read(p []byte) (n int, err error) {
|
|
||||||
return f.rc.Read(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close drains the body and closes the upstream.
|
|
||||||
func (f drainCloser) Close() error {
|
|
||||||
xhttp.DrainBody(f.rc)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// httpStreamResponse allows streaming a response, but still send an error.
|
// httpStreamResponse allows streaming a response, but still send an error.
|
||||||
type httpStreamResponse struct {
|
type httpStreamResponse struct {
|
||||||
done chan error
|
done chan error
|
||||||
@ -879,7 +862,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
|
|||||||
case 0:
|
case 0:
|
||||||
// 0 is unbuffered, copy the rest.
|
// 0 is unbuffered, copy the rest.
|
||||||
_, err := io.Copy(w, respBody)
|
_, err := io.Copy(w, respBody)
|
||||||
respBody.Close()
|
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -889,18 +871,7 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
respBody.Close()
|
|
||||||
return errors.New(string(errorText))
|
return errors.New(string(errorText))
|
||||||
case 3:
|
|
||||||
// gob style is already deprecated, we can remove this when
|
|
||||||
// storage API version will be greater or equal to 23.
|
|
||||||
defer respBody.Close()
|
|
||||||
dec := gob.NewDecoder(respBody)
|
|
||||||
var err error
|
|
||||||
if de := dec.Decode(&err); de == nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return errors.New("rpc error")
|
|
||||||
case 2:
|
case 2:
|
||||||
// Block of data
|
// Block of data
|
||||||
var tmp [4]byte
|
var tmp [4]byte
|
||||||
@ -917,7 +888,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
|
|||||||
case 32:
|
case 32:
|
||||||
continue
|
continue
|
||||||
default:
|
default:
|
||||||
go xhttp.DrainBody(respBody)
|
|
||||||
return fmt.Errorf("unexpected filler byte: %d", tmp[0])
|
return fmt.Errorf("unexpected filler byte: %d", tmp[0])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user