mirror of https://github.com/minio/minio.git
handler: Add a waitgroup to avoid expect100Continue crash. (#1623)
This waitgroup allows for safe blocking operation where we can cleanly control the flow of the writes and the underlying pipe altogether. Fixes #1553
This commit is contained in:
parent
5b29cefd40
commit
498ce1e9bb
|
@ -28,6 +28,7 @@ import (
|
|||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
mux "github.com/gorilla/mux"
|
||||
|
@ -570,14 +571,20 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
|||
case authTypePresigned, authTypeSigned:
|
||||
// Initialize a pipe for data pipe line.
|
||||
reader, writer := io.Pipe()
|
||||
|
||||
var wg = &sync.WaitGroup{}
|
||||
// Start writing in a routine.
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
shaWriter := sha256.New()
|
||||
multiWriter := io.MultiWriter(shaWriter, writer)
|
||||
if _, cerr := io.CopyN(multiWriter, r.Body, size); cerr != nil {
|
||||
errorIf(cerr, "Unable to read HTTP body.", nil)
|
||||
writer.CloseWithError(err)
|
||||
if _, wErr := io.CopyN(multiWriter, r.Body, size); wErr != nil {
|
||||
// Pipe closed.
|
||||
if wErr == io.ErrClosedPipe {
|
||||
return
|
||||
}
|
||||
errorIf(wErr, "Unable to read HTTP body.", nil)
|
||||
writer.CloseWithError(wErr)
|
||||
return
|
||||
}
|
||||
shaPayload := shaWriter.Sum(nil)
|
||||
|
@ -588,15 +595,16 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
|||
} else if isRequestPresignedSignatureV4(r) {
|
||||
s3Error = doesPresignedSignatureMatch(hex.EncodeToString(shaPayload), r, validateRegion)
|
||||
}
|
||||
var sErr error
|
||||
if s3Error != ErrNone {
|
||||
if s3Error == ErrSignatureDoesNotMatch {
|
||||
writer.CloseWithError(errSignatureMismatch)
|
||||
return
|
||||
sErr = errSignatureMismatch
|
||||
} else {
|
||||
sErr = fmt.Errorf("%v", getAPIError(s3Error))
|
||||
}
|
||||
writer.CloseWithError(fmt.Errorf("%v", getAPIError(s3Error)))
|
||||
writer.CloseWithError(sErr)
|
||||
return
|
||||
}
|
||||
// Close the writer.
|
||||
writer.Close()
|
||||
}()
|
||||
|
||||
|
@ -606,6 +614,10 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
|||
metadata["md5Sum"] = hex.EncodeToString(md5Bytes)
|
||||
// Create object.
|
||||
md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata)
|
||||
// Close the pipe.
|
||||
reader.Close()
|
||||
// Wait for all the routines to finish.
|
||||
wg.Wait()
|
||||
}
|
||||
if err != nil {
|
||||
errorIf(err, "PutObject failed.", nil)
|
||||
|
@ -710,22 +722,29 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
|||
}
|
||||
// No need to verify signature, anonymous request access is
|
||||
// already allowed.
|
||||
partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, hex.EncodeToString(md5Bytes))
|
||||
hexMD5 := hex.EncodeToString(md5Bytes)
|
||||
partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, r.Body, hexMD5)
|
||||
case authTypePresigned, authTypeSigned:
|
||||
validateRegion := true // Validate region.
|
||||
// Initialize a pipe for data pipe line.
|
||||
reader, writer := io.Pipe()
|
||||
|
||||
var wg = &sync.WaitGroup{}
|
||||
// Start writing in a routine.
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
shaWriter := sha256.New()
|
||||
multiWriter := io.MultiWriter(shaWriter, writer)
|
||||
if _, err = io.CopyN(multiWriter, r.Body, size); err != nil {
|
||||
errorIf(err, "Unable to read HTTP body.", nil)
|
||||
writer.CloseWithError(err)
|
||||
if _, wErr := io.CopyN(multiWriter, r.Body, size); wErr != nil {
|
||||
// Pipe closed, just ignore it.
|
||||
if wErr == io.ErrClosedPipe {
|
||||
return
|
||||
}
|
||||
errorIf(wErr, "Unable to read HTTP body.", nil)
|
||||
writer.CloseWithError(wErr)
|
||||
return
|
||||
}
|
||||
shaPayload := shaWriter.Sum(nil)
|
||||
validateRegion := true // Validate region.
|
||||
var s3Error APIErrorCode
|
||||
if isRequestSignatureV4(r) {
|
||||
s3Error = doesSignatureMatch(hex.EncodeToString(shaPayload), r, validateRegion)
|
||||
|
@ -734,10 +753,11 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
|||
}
|
||||
if s3Error != ErrNone {
|
||||
if s3Error == ErrSignatureDoesNotMatch {
|
||||
writer.CloseWithError(errSignatureMismatch)
|
||||
return
|
||||
err = errSignatureMismatch
|
||||
} else {
|
||||
err = fmt.Errorf("%v", getAPIError(s3Error))
|
||||
}
|
||||
writer.CloseWithError(fmt.Errorf("%v", getAPIError(s3Error)))
|
||||
writer.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
// Close the writer.
|
||||
|
@ -745,6 +765,10 @@ func (api objectAPIHandlers) PutObjectPartHandler(w http.ResponseWriter, r *http
|
|||
}()
|
||||
md5SumHex := hex.EncodeToString(md5Bytes)
|
||||
partMD5, err = api.ObjectAPI.PutObjectPart(bucket, object, uploadID, partID, size, reader, md5SumHex)
|
||||
// Close the pipe.
|
||||
reader.Close()
|
||||
// Wait for all the routines to finish.
|
||||
wg.Wait()
|
||||
}
|
||||
if err != nil {
|
||||
errorIf(err, "PutObjectPart failed.", nil)
|
||||
|
|
|
@ -376,7 +376,7 @@ func (s *MyAPISuite) TestDeleteObject(c *C) {
|
|||
c.Assert(err, IsNil)
|
||||
c.Assert(response.StatusCode, Equals, http.StatusNoContent)
|
||||
|
||||
// Delete non existant object should return http.StatusNoContent.
|
||||
// Delete non existent object should return http.StatusNoContent.
|
||||
request, err = s.newRequest("DELETE", testAPIFSCacheServer.URL+"/deletebucketobject/myobject1", 0, nil)
|
||||
c.Assert(err, IsNil)
|
||||
client = http.Client{}
|
||||
|
@ -385,8 +385,8 @@ func (s *MyAPISuite) TestDeleteObject(c *C) {
|
|||
c.Assert(response.StatusCode, Equals, http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (s *MyAPISuite) TestNonExistantBucket(c *C) {
|
||||
request, err := s.newRequest("HEAD", testAPIFSCacheServer.URL+"/nonexistantbucket", 0, nil)
|
||||
func (s *MyAPISuite) TestNonExistentBucket(c *C) {
|
||||
request, err := s.newRequest("HEAD", testAPIFSCacheServer.URL+"/nonexistentbucket", 0, nil)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
client := http.Client{}
|
||||
|
@ -686,9 +686,9 @@ func (s *MyAPISuite) TestListBuckets(c *C) {
|
|||
c.Assert(err, IsNil)
|
||||
}
|
||||
|
||||
func (s *MyAPISuite) TestNotBeAbleToCreateObjectInNonexistantBucket(c *C) {
|
||||
func (s *MyAPISuite) TestNotBeAbleToCreateObjectInNonexistentBucket(c *C) {
|
||||
buffer1 := bytes.NewReader([]byte("hello world"))
|
||||
request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/innonexistantbucket/object", int64(buffer1.Len()), buffer1)
|
||||
request, err := s.newRequest("PUT", testAPIFSCacheServer.URL+"/innonexistentbucket/object", int64(buffer1.Len()), buffer1)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
client := http.Client{}
|
||||
|
|
|
@ -393,8 +393,8 @@ func (s *MyAPIXLSuite) TestDeleteObject(c *C) {
|
|||
c.Assert(response.StatusCode, Equals, http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (s *MyAPIXLSuite) TestNonExistantBucket(c *C) {
|
||||
request, err := s.newRequest("HEAD", testAPIXLServer.URL+"/nonexistantbucket", 0, nil)
|
||||
func (s *MyAPIXLSuite) TestNonExistentBucket(c *C) {
|
||||
request, err := s.newRequest("HEAD", testAPIXLServer.URL+"/nonexistentbucket", 0, nil)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
client := http.Client{}
|
||||
|
@ -713,9 +713,9 @@ func (s *MyAPIXLSuite) TestListBuckets(c *C) {
|
|||
c.Assert(err, IsNil)
|
||||
}
|
||||
|
||||
func (s *MyAPIXLSuite) TestNotBeAbleToCreateObjectInNonexistantBucket(c *C) {
|
||||
func (s *MyAPIXLSuite) TestNotBeAbleToCreateObjectInNonexistentBucket(c *C) {
|
||||
buffer1 := bytes.NewReader([]byte("hello world"))
|
||||
request, err := s.newRequest("PUT", testAPIXLServer.URL+"/innonexistantbucket/object", int64(buffer1.Len()), buffer1)
|
||||
request, err := s.newRequest("PUT", testAPIXLServer.URL+"/innonexistentbucket/object", int64(buffer1.Len()), buffer1)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
client := http.Client{}
|
||||
|
|
Loading…
Reference in New Issue