diff --git a/cmd/metacache-walk.go b/cmd/metacache-walk.go index bf0be256a..9af4814cf 100644 --- a/cmd/metacache-walk.go +++ b/cmd/metacache-walk.go @@ -28,6 +28,7 @@ import ( "strings" "github.com/gorilla/mux" + xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" xioutil "github.com/minio/minio/pkg/ioutil" ) @@ -294,6 +295,7 @@ func (client *storageRESTClient) WalkDir(ctx context.Context, opts WalkDirOption logger.LogIf(ctx, err) return err } + defer xhttp.DrainBody(respBody) return waitForHTTPStream(respBody, wr) } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 52ad9c643..e6ff6d641 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -32,7 +32,6 @@ import ( "sync" "time" - "github.com/minio/minio/cmd/http" xhttp "github.com/minio/minio/cmd/http" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/cmd/rest" @@ -208,7 +207,7 @@ func (client *storageRESTClient) NSScanner(ctx context.Context, cache dataUsageC pw.CloseWithError(cache.serializeTo(pw)) }() respBody, err := client.call(ctx, storageRESTMethodNSScanner, url.Values{}, pr, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) if err != nil { pr.Close() return cache, err @@ -248,7 +247,7 @@ func (client *storageRESTClient) DiskInfo(ctx context.Context) (info DiskInfo, e if err != nil { return info, err } - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) if err = msgp.Decode(respBody, &info); err != nil { return info, err } @@ -271,7 +270,7 @@ func (client *storageRESTClient) MakeVolBulk(ctx context.Context, volumes ...str values := make(url.Values) values.Set(storageRESTVolumes, strings.Join(volumes, ",")) respBody, err := client.call(ctx, storageRESTMethodMakeVolBulk, values, nil, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -280,7 +279,7 @@ func (client *storageRESTClient) MakeVol(ctx context.Context, volume string) (er values := make(url.Values) values.Set(storageRESTVolume, volume) respBody, err := client.call(ctx, storageRESTMethodMakeVol, values, nil, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -290,7 +289,7 @@ func (client *storageRESTClient) ListVols(ctx context.Context) (vols []VolInfo, if err != nil { return } - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) vinfos := VolsInfo(vols) err = msgp.Decode(respBody, &vinfos) return vinfos, err @@ -304,7 +303,7 @@ func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vo if err != nil { return } - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) err = msgp.Decode(respBody, &vol) return vol, err } @@ -317,7 +316,7 @@ func (client *storageRESTClient) DeleteVol(ctx context.Context, volume string, f values.Set(storageRESTForceDelete, "true") } respBody, err := client.call(ctx, storageRESTMethodDeleteVol, values, nil, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -328,7 +327,7 @@ func (client *storageRESTClient) AppendFile(ctx context.Context, volume string, values.Set(storageRESTFilePath, path) reader := bytes.NewReader(buf) respBody, err := client.call(ctx, storageRESTMethodAppendFile, values, reader, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -338,7 +337,7 @@ func (client *storageRESTClient) CreateFile(ctx context.Context, volume, path st values.Set(storageRESTFilePath, path) values.Set(storageRESTLength, strconv.Itoa(int(size))) respBody, err := client.call(ctx, storageRESTMethodCreateFile, values, ioutil.NopCloser(reader), size) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -353,7 +352,7 @@ func (client *storageRESTClient) WriteMetadata(ctx context.Context, volume, path } respBody, err := client.call(ctx, storageRESTMethodWriteMetadata, values, &reader, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -368,7 +367,7 @@ func (client *storageRESTClient) UpdateMetadata(ctx context.Context, volume, pat } respBody, err := client.call(ctx, storageRESTMethodUpdateMetadata, values, &reader, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -384,7 +383,7 @@ func (client *storageRESTClient) DeleteVersion(ctx context.Context, volume, path } respBody, err := client.call(ctx, storageRESTMethodDeleteVersion, values, &buffer, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -394,7 +393,7 @@ func (client *storageRESTClient) WriteAll(ctx context.Context, volume string, pa values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) respBody, err := client.call(ctx, storageRESTMethodWriteAll, values, bytes.NewBuffer(b), int64(len(b))) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -404,7 +403,7 @@ func (client *storageRESTClient) CheckFile(ctx context.Context, volume string, p values.Set(storageRESTVolume, volume) values.Set(storageRESTFilePath, path) respBody, err := client.call(ctx, storageRESTMethodCheckFile, values, nil, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -421,7 +420,7 @@ func (client *storageRESTClient) CheckParts(ctx context.Context, volume string, } respBody, err := client.call(ctx, storageRESTMethodCheckParts, values, &reader, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -439,7 +438,7 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP } respBody, err := client.call(ctx, storageRESTMethodRenameData, values, &reader, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -470,7 +469,7 @@ func (client *storageRESTClient) ReadVersion(ctx context.Context, volume, path, if err != nil { return fi, err } - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) dec := msgpNewReader(respBody) defer readMsgpReaderPool.Put(dec) @@ -488,7 +487,7 @@ func (client *storageRESTClient) ReadAll(ctx context.Context, volume string, pat if err != nil { return nil, err } - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return ioutil.ReadAll(respBody) } @@ -524,7 +523,7 @@ func (client *storageRESTClient) ReadFile(ctx context.Context, volume string, pa if err != nil { return 0, err } - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) n, err := io.ReadFull(respBody, buf) return int64(n), err } @@ -539,7 +538,7 @@ func (client *storageRESTClient) ListDir(ctx context.Context, volume, dirPath st if err != nil { return nil, err } - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) err = gob.NewDecoder(respBody).Decode(&entries) return entries, err } @@ -552,7 +551,7 @@ func (client *storageRESTClient) Delete(ctx context.Context, volume string, path values.Set(storageRESTRecursive, strconv.FormatBool(recursive)) respBody, err := client.call(ctx, storageRESTMethodDeleteFile, values, nil, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -576,7 +575,7 @@ func (client *storageRESTClient) DeleteVersions(ctx context.Context, volume stri errs = make([]error, len(versions)) respBody, err := client.call(ctx, storageRESTMethodDeleteVersions, values, &buffer, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) if err != nil { for i := range errs { errs[i] = err @@ -615,7 +614,7 @@ func (client *storageRESTClient) RenameFile(ctx context.Context, srcVolume, srcP values.Set(storageRESTDstVolume, dstVolume) values.Set(storageRESTDstPath, dstPath) respBody, err := client.call(ctx, storageRESTMethodRenameFile, values, nil, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) return err } @@ -630,7 +629,7 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st } respBody, err := client.call(ctx, storageRESTMethodVerifyFile, values, &reader, -1) - defer http.DrainBody(respBody) + defer xhttp.DrainBody(respBody) if err != nil { return err } diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 1d96fe4fa..a423d8623 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -771,23 +771,6 @@ func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) { } } -// drainCloser can be used for wrapping an http response. -// It will drain the body before closing. -type drainCloser struct { - rc io.ReadCloser -} - -// Read forwards the read operation. -func (f drainCloser) Read(p []byte) (n int, err error) { - return f.rc.Read(p) -} - -// Close drains the body and closes the upstream. -func (f drainCloser) Close() error { - xhttp.DrainBody(f.rc) - return nil -} - // httpStreamResponse allows streaming a response, but still send an error. type httpStreamResponse struct { done chan error @@ -879,7 +862,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error { case 0: // 0 is unbuffered, copy the rest. _, err := io.Copy(w, respBody) - respBody.Close() if err == io.EOF { return nil } @@ -889,18 +871,7 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error { if err != nil { return err } - respBody.Close() return errors.New(string(errorText)) - case 3: - // gob style is already deprecated, we can remove this when - // storage API version will be greater or equal to 23. - defer respBody.Close() - dec := gob.NewDecoder(respBody) - var err error - if de := dec.Decode(&err); de == nil { - return err - } - return errors.New("rpc error") case 2: // Block of data var tmp [4]byte @@ -917,7 +888,6 @@ func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error { case 32: continue default: - go xhttp.DrainBody(respBody) return fmt.Errorf("unexpected filler byte: %d", tmp[0]) } }