Added AnonListObjectsV2 support to GCS (#4584)

Nitish Tiwari 2017-06-23 17:35:45 -07:00 committed by Harshavardhana
parent 8b7df7da37
commit 15b65a8342
17 changed files with 246 additions and 254 deletions

View File

@@ -111,9 +111,15 @@ func (l *gcsGateway) AnonListObjects(bucket string, prefix string, marker string
	return fromMinioClientListBucketResult(bucket, result), nil
}

-// AnonListObjectsV2 - List objects anonymously v2
-func (l *gcsGateway) AnonListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (loi ListObjectsV2Info, e error) {
-	return loi, NotImplemented{}
+// AnonListObjectsV2 - List objects in V2 mode, anonymously
+func (l *gcsGateway) AnonListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (ListObjectsV2Info, error) {
+	// Request V1 List Object to the backend
+	result, err := l.anonClient.ListObjects(bucket, prefix, continuationToken, delimiter, maxKeys)
+	if err != nil {
+		return ListObjectsV2Info{}, s3ToObjectError(traceError(err), bucket)
+	}
+	// translate V1 Result to V2Info
+	return fromMinioClientListBucketResultToV2Info(bucket, result), nil
}

// AnonGetBucketInfo - Get bucket metadata anonymously.
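For orientation, here is a minimal sketch of driving the new method from inside the gateway package; the helper name, the `gw` value, and the bucket/prefix strings are assumptions for illustration, not part of the commit:

// listAnonV2 walks one "/"-delimited page of an anonymous V2 listing;
// gw is assumed to be an initialized *gcsGateway from this package.
func listAnonV2(gw *gcsGateway, bucket, prefix string) error {
	result, err := gw.AnonListObjectsV2(bucket, prefix, "", true, "/", 1000)
	if err != nil {
		return err
	}
	for _, obj := range result.Objects {
		fmt.Println(obj.Name, obj.Size)
	}
	for _, p := range result.Prefixes {
		fmt.Println("prefix:", p)
	}
	return nil
}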

View File

@@ -483,6 +483,29 @@ func (l *gcsGateway) GetObject(bucket string, key string, startOffset int64, len
	return nil
}

+// fromMinioClientListBucketResultToV2Info converts minio ListBucketResult to ListObjectsV2Info
+func fromMinioClientListBucketResultToV2Info(bucket string, result minio.ListBucketResult) ListObjectsV2Info {
+	objects := make([]ObjectInfo, len(result.Contents))
+	for i, oi := range result.Contents {
+		objects[i] = fromMinioClientObjectInfo(bucket, oi)
+	}
+
+	prefixes := make([]string, len(result.CommonPrefixes))
+	for i, p := range result.CommonPrefixes {
+		prefixes[i] = p.Prefix
+	}
+
+	return ListObjectsV2Info{
+		IsTruncated:           result.IsTruncated,
+		Prefixes:              prefixes,
+		Objects:               objects,
+		ContinuationToken:     result.Marker,
+		NextContinuationToken: result.NextMarker,
+	}
+}
+
// fromGCSAttrsToObjectInfo converts GCS BucketAttrs to gateway ObjectInfo
func fromGCSAttrsToObjectInfo(attrs *storage.ObjectAttrs) ObjectInfo {
	// All google cloud storage objects have a CRC32c hash, whereas composite objects may not have a MD5 hash

View File

@@ -16,7 +16,12 @@
package cmd

-import "testing"
+import (
+	"reflect"
+	"testing"
+
+	minio "github.com/minio/minio-go"
+)

func TestToGCSPageToken(t *testing.T) {
	testCases := []struct {
@@ -181,3 +186,26 @@ func TestGCSMultipartDataName(t *testing.T) {
		t.Errorf("expected: %s, got: %s", expected, got)
	}
}
+
+func TestFromMinioClientListBucketResultToV2Info(t *testing.T) {
+	listBucketResult := minio.ListBucketResult{
+		IsTruncated:    false,
+		Marker:         "testMarker",
+		NextMarker:     "testMarker2",
+		CommonPrefixes: []minio.CommonPrefix{{Prefix: "one"}, {Prefix: "two"}},
+		Contents:       []minio.ObjectInfo{{Key: "testobj", ContentType: ""}},
+	}
+	listBucketV2Info := ListObjectsV2Info{
+		Prefixes:              []string{"one", "two"},
+		Objects:               []ObjectInfo{{Name: "testobj", Bucket: "testbucket", UserDefined: map[string]string{"Content-Type": ""}}},
+		IsTruncated:           false,
+		ContinuationToken:     "testMarker",
+		NextContinuationToken: "testMarker2",
+	}
+
+	if got := fromMinioClientListBucketResultToV2Info("testbucket", listBucketResult); !reflect.DeepEqual(got, listBucketV2Info) {
+		t.Errorf("fromMinioClientListBucketResultToV2Info() = %v, want %v", got, listBucketV2Info)
+	}
+}

View File

@@ -29,7 +29,7 @@ import (
	"github.com/minio/minio-go/pkg/s3utils"
)

-// GetEncryptedObject deciphers and streams data stored in the server after applying a specifed encryption materials,
+// GetEncryptedObject deciphers and streams data stored in the server after applying a specified encryption materials,
// returned stream should be closed by the caller.
func (c Client) GetEncryptedObject(bucketName, objectName string, encryptMaterials encrypt.Materials) (io.ReadCloser, error) {
	if encryptMaterials == nil {

View File

@@ -17,6 +17,7 @@
package minio

import (
+	"errors"
	"fmt"
	"net/http"
	"net/url"
@@ -86,8 +87,10 @@ func (c Client) ListObjectsV2(bucketName, objectPrefix string, recursive bool, d
		// If recursive we do not delimit.
		delimiter = ""
	}
+
	// Return object owner information by default
	fetchOwner := true
+
	// Validate bucket name.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		defer close(objectStatCh)
@@ -96,6 +99,7 @@ func (c Client) ListObjectsV2(bucketName, objectPrefix string, recursive bool, d
		}
		return objectStatCh
	}
+
	// Validate incoming object prefix.
	if err := s3utils.CheckValidObjectNamePrefix(objectPrefix); err != nil {
		defer close(objectStatCh)
@@ -122,7 +126,6 @@ func (c Client) ListObjectsV2(bucketName, objectPrefix string, recursive bool, d
			// If contents are available loop through and send over channel.
			for _, object := range result.Contents {
-				// Save the marker.
				select {
				// Send object content.
				case objectStatCh <- object:
@@ -135,12 +138,12 @@ func (c Client) ListObjectsV2(bucketName, objectPrefix string, recursive bool, d
			// Send all common prefixes if any.
			// NOTE: prefixes are only present if the request is delimited.
			for _, obj := range result.CommonPrefixes {
-				object := ObjectInfo{}
-				object.Key = obj.Prefix
-				object.Size = 0
				select {
				// Send object prefixes.
-				case objectStatCh <- object:
+				case objectStatCh <- ObjectInfo{
+					Key:  obj.Prefix,
+					Size: 0,
+				}:
				// If receives done from the caller, return here.
				case <-doneCh:
					return
@@ -229,10 +232,17 @@ func (c Client) listObjectsV2Query(bucketName, objectPrefix, continuationToken s
	// Decode listBuckets XML.
	listBucketResult := ListBucketV2Result{}
-	err = xmlDecoder(resp.Body, &listBucketResult)
-	if err != nil {
+	if err = xmlDecoder(resp.Body, &listBucketResult); err != nil {
		return listBucketResult, err
	}
+
+	// This is an additional verification check to make
+	// sure proper responses are received.
+	if listBucketResult.IsTruncated && listBucketResult.NextContinuationToken == "" {
+		return listBucketResult, errors.New("Truncated response should have continuation token set")
+	}
+
+	// Success.
	return listBucketResult, nil
}
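Since several hunks above touch the channel-based listing loop, a short consumer sketch may help; the endpoint and credentials are placeholders, and the calls reflect the minio-go API of this vintage:

package main

import (
	"fmt"
	"log"

	minio "github.com/minio/minio-go"
)

func main() {
	// Placeholder endpoint and credentials.
	client, err := minio.New("play.minio.io:9000", "ACCESS-KEY", "SECRET-KEY", true)
	if err != nil {
		log.Fatalln(err)
	}

	// Closing doneCh tells the listing goroutine to stop early.
	doneCh := make(chan struct{})
	defer close(doneCh)

	// recursive=true disables the delimiter, so only objects (no
	// common prefixes) are sent on the channel.
	for object := range client.ListObjectsV2("mybucket", "prefix/", true, doneCh) {
		if object.Err != nil {
			log.Fatalln(object.Err)
		}
		fmt.Println(object.Key, object.Size)
	}
}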

View File

@@ -155,7 +155,7 @@ func (c Client) ListenBucketNotification(bucketName, prefix, suffix string, even
			return
		}

-		// Continously run and listen on bucket notification.
+		// Continuously run and listen on bucket notification.
		// Create a done channel to control 'ListObjects' go routine.
		retryDoneCh := make(chan struct{}, 1)

View File

@@ -45,23 +45,6 @@ func isReadAt(reader io.Reader) (ok bool) {
	return
}

-// shouldUploadPart - verify if part should be uploaded.
-func shouldUploadPart(objPart ObjectPart, uploadReq uploadPartReq) bool {
-	// If part not found should upload the part.
-	if uploadReq.Part == nil {
-		return true
-	}
-	// if size mismatches should upload the part.
-	if objPart.Size != uploadReq.Part.Size {
-		return true
-	}
-	// if md5sum mismatches should upload the part.
-	if objPart.ETag != uploadReq.Part.ETag {
-		return true
-	}
-	return false
-}
-
// optimalPartInfo - calculate the optimal part info for a given
// object size.
//
@@ -185,49 +168,6 @@ func (c Client) newUploadID(bucketName, objectName string, metaData map[string][
	return initMultipartUploadResult.UploadID, nil
}

-// getMpartUploadSession returns the upload id and the uploaded parts to continue a previous upload session
-// or initiate a new multipart session if no current one found
-func (c Client) getMpartUploadSession(bucketName, objectName string, metaData map[string][]string) (string, map[int]ObjectPart, error) {
-	// A map of all uploaded parts.
-	var partsInfo map[int]ObjectPart
-	var err error
-	uploadID, err := c.findUploadID(bucketName, objectName)
-	if err != nil {
-		return "", nil, err
-	}
-	if uploadID == "" {
-		// Initiates a new multipart request
-		uploadID, err = c.newUploadID(bucketName, objectName, metaData)
-		if err != nil {
-			return "", nil, err
-		}
-	} else {
-		// Fetch previously upload parts and maximum part size.
-		partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID)
-		if err != nil {
-			// When the server returns NoSuchUpload even if its previouls acknowleged the existance of the upload id,
-			// initiate a new multipart upload
-			if respErr, ok := err.(ErrorResponse); ok && respErr.Code == "NoSuchUpload" {
-				uploadID, err = c.newUploadID(bucketName, objectName, metaData)
-				if err != nil {
-					return "", nil, err
-				}
-			} else {
-				return "", nil, err
-			}
-		}
-	}
-
-	// Allocate partsInfo if not done yet
-	if partsInfo == nil {
-		partsInfo = make(map[int]ObjectPart)
-	}
-
-	return uploadID, partsInfo, nil
-}
-
// computeHash - Calculates hashes for an input read Seeker.
func computeHash(hashAlgorithms map[string]hash.Hash, hashSums map[string][]byte, reader io.ReadSeeker) (size int64, err error) {
	hashWriter := ioutil.Discard

View File

@@ -17,11 +17,7 @@
package minio

import (
-	"crypto/md5"
-	"crypto/sha256"
-	"encoding/hex"
	"fmt"
-	"hash"
	"io"
	"io/ioutil"
	"mime"
@@ -109,11 +105,9 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
// putObjectMultipartFromFile - Creates object from contents of *os.File
//
// NOTE: This function is meant to be used for readers with local
-// file as in *os.File. This function resumes by skipping all the
-// necessary parts which were already uploaded by verifying them
-// against MD5SUM of each individual parts. This function also
-// effectively utilizes file system capabilities of reading from
-// specific sections and not having to create temporary files.
+// file as in *os.File. This function effectively utilizes file
+// system capabilities of reading from specific sections and not
+// having to create temporary files.
func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileReader io.ReaderAt, fileSize int64, metaData map[string][]string, progress io.Reader) (int64, error) {
	// Input validation.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
@@ -123,8 +117,8 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
		return 0, err
	}

-	// Get the upload id of a previously partially uploaded object or initiate a new multipart upload
-	uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metaData)
+	// Initiate a new multipart upload.
+	uploadID, err := c.newUploadID(bucketName, objectName, metaData)
	if err != nil {
		return 0, err
	}
@@ -152,6 +146,9 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
	// Just for readability.
	lastPartNumber := totalPartsCount

+	// Initialize parts uploaded map.
+	partsInfo := make(map[int]ObjectPart)
+
	// Send each part through the partUploadCh to be uploaded.
	for p := 1; p <= totalPartsCount; p++ {
		part, ok := partsInfo[p]
@@ -170,12 +167,7 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
			for uploadReq := range uploadPartsCh {
				// Add hash algorithms that need to be calculated by computeHash()
				// In case of a non-v4 signature or https connection, sha256 is not needed.
-				hashAlgos := make(map[string]hash.Hash)
-				hashSums := make(map[string][]byte)
-				hashAlgos["md5"] = md5.New()
-				if c.overrideSignerType.IsV4() && !c.secure {
-					hashAlgos["sha256"] = sha256.New()
-				}
+				hashAlgos, hashSums := c.hashMaterials()

				// If partNumber was not uploaded we calculate the missing
				// part offset and size. For all other part numbers we
@@ -204,36 +196,24 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
					return
				}

-				// Create the part to be uploaded.
-				verifyObjPart := ObjectPart{
-					ETag:       hex.EncodeToString(hashSums["md5"]),
-					PartNumber: uploadReq.PartNum,
-					Size:       partSize,
-				}
-
-				// If this is the last part do not give it the full part size.
-				if uploadReq.PartNum == lastPartNumber {
-					verifyObjPart.Size = lastPartSize
-				}
-
-				// Verify if part should be uploaded.
-				if shouldUploadPart(verifyObjPart, uploadReq) {
-					// Proceed to upload the part.
-					var objPart ObjectPart
-					objPart, err = c.uploadPart(bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum, hashSums["md5"], hashSums["sha256"], prtSize)
-					if err != nil {
-						uploadedPartsCh <- uploadedPartRes{
-							Error: err,
-						}
-						// Exit the goroutine.
-						return
-					}
-					// Save successfully uploaded part metadata.
-					uploadReq.Part = &objPart
-				}
+				// Proceed to upload the part.
+				var objPart ObjectPart
+				objPart, err = c.uploadPart(bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum,
+					hashSums["md5"], hashSums["sha256"], prtSize)
+				if err != nil {
+					uploadedPartsCh <- uploadedPartRes{
+						Error: err,
+					}
+					// Exit the goroutine.
+					return
+				}
+
+				// Save successfully uploaded part metadata.
+				uploadReq.Part = &objPart

				// Return through the channel the part size.
				uploadedPartsCh <- uploadedPartRes{
-					Size:    verifyObjPart.Size,
+					Size:    missingPartSize,
					PartNum: uploadReq.PartNum,
					Part:    uploadReq.Part,
					Error:   nil,

View File

@@ -18,12 +18,8 @@ package minio

import (
	"bytes"
-	"crypto/md5"
-	"crypto/sha256"
-	"encoding/hex"
	"encoding/xml"
	"fmt"
-	"hash"
	"io"
	"io/ioutil"
	"net/http"
@@ -36,7 +32,7 @@ import (
	"github.com/minio/minio-go/pkg/s3utils"
)

-// Comprehensive put object operation involving multipart resumable uploads.
+// Comprehensive put object operation involving multipart uploads.
//
// Following code handles these types of readers.
//
@@ -44,9 +40,6 @@ import (
//  - *minio.Object
//  - Any reader which has a method 'ReadAt()'
//
-// If we exhaust all the known types, code proceeds to use stream as
-// is where each part is re-downloaded, checksummed and verified
-// before upload.
func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) {
	if size > 0 && size > minPartSize {
		// Verify if reader is *os.File, then use file system functionalities.
@@ -70,8 +63,6 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
// putObjectMultipartStreamNoChecksum - upload a large object using
// multipart upload and streaming signature for signing payload.
-// N B We don't resume an incomplete multipart upload, we overwrite
-// existing parts of an incomplete upload.
func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string,
	reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (int64, error) {
@@ -83,18 +74,11 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
		return 0, err
	}

-	// Get the upload id of a previously partially uploaded object or initiate a new multipart upload
-	uploadID, err := c.findUploadID(bucketName, objectName)
+	// Initiates a new multipart request
+	uploadID, err := c.newUploadID(bucketName, objectName, metadata)
	if err != nil {
		return 0, err
	}
-	if uploadID == "" {
-		// Initiates a new multipart request
-		uploadID, err = c.newUploadID(bucketName, objectName, metadata)
-		if err != nil {
-			return 0, err
-		}
-	}

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size)
@@ -191,8 +175,8 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

-	// Get the upload id of a previously partially uploaded object or initiate a new multipart upload
-	uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metaData)
+	// Initiate a new multipart upload.
+	uploadID, err := c.newUploadID(bucketName, objectName, metaData)
	if err != nil {
		return 0, err
	}
@@ -209,15 +193,13 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
	// Initialize a temporary buffer.
	tmpBuffer := new(bytes.Buffer)

+	// Initialize parts uploaded map.
+	partsInfo := make(map[int]ObjectPart)
+
	for partNumber <= totalPartsCount {
		// Choose hash algorithms to be calculated by hashCopyN, avoid sha256
		// with non-v4 signature request or HTTPS connection
-		hashSums := make(map[string][]byte)
-		hashAlgos := make(map[string]hash.Hash)
-		hashAlgos["md5"] = md5.New()
-		if c.overrideSignerType.IsV4() && !c.secure {
-			hashAlgos["sha256"] = sha256.New()
-		}
+		hashAlgos, hashSums := c.hashMaterials()

		// Calculates hash sums while copying partSize bytes into tmpBuffer.
		prtSize, rErr := hashCopyN(hashAlgos, hashSums, tmpBuffer, reader, partSize)
@@ -230,31 +212,23 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
		// as we read from the source.
		reader = newHook(tmpBuffer, progress)

-		part, ok := partsInfo[partNumber]
-
-		// Verify if part should be uploaded.
-		if !ok || shouldUploadPart(ObjectPart{
-			ETag:       hex.EncodeToString(hashSums["md5"]),
-			PartNumber: partNumber,
-			Size:       prtSize,
-		}, uploadPartReq{PartNum: partNumber, Part: &part}) {
-			// Proceed to upload the part.
-			var objPart ObjectPart
-			objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber, hashSums["md5"], hashSums["sha256"], prtSize)
-			if err != nil {
-				// Reset the temporary buffer upon any error.
-				tmpBuffer.Reset()
-				return totalUploadedSize, err
-			}
-			// Save successfully uploaded part metadata.
-			partsInfo[partNumber] = objPart
-		} else {
-			// Update the progress reader for the skipped part.
-			if progress != nil {
-				if _, err = io.CopyN(ioutil.Discard, progress, prtSize); err != nil {
-					return totalUploadedSize, err
-				}
-			}
-		}
+		// Proceed to upload the part.
+		var objPart ObjectPart
+		objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber, hashSums["md5"], hashSums["sha256"], prtSize)
+		if err != nil {
+			// Reset the temporary buffer upon any error.
+			tmpBuffer.Reset()
+			return totalUploadedSize, err
+		}

+		// Save successfully uploaded part metadata.
+		partsInfo[partNumber] = objPart
+
+		// Update the progress reader for the skipped part.
+		if progress != nil {
+			if _, err = io.CopyN(ioutil.Discard, progress, prtSize); err != nil {
+				return totalUploadedSize, err
+			}
+		}

		// Reset the temporary buffer.

View File

@@ -18,10 +18,7 @@ package minio

import (
	"bytes"
-	"crypto/md5"
-	"crypto/sha256"
	"fmt"
-	"hash"
	"io"
	"io/ioutil"
	"sort"
@@ -42,19 +39,6 @@ type uploadPartReq struct {
	Part *ObjectPart // Size of the part uploaded.
}

-// shouldUploadPartReadAt - verify if part should be uploaded.
-func shouldUploadPartReadAt(objPart ObjectPart, uploadReq uploadPartReq) bool {
-	// If part not found part should be uploaded.
-	if uploadReq.Part == nil {
-		return true
-	}
-	// if size mismatches part should be uploaded.
-	if uploadReq.Part.Size != objPart.Size {
-		return true
-	}
-	return false
-}
-
// putObjectMultipartFromReadAt - Uploads files bigger than 5MiB. Supports reader
// of type which implements io.ReaderAt interface (ReadAt method).
//
@@ -74,8 +58,8 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
		return 0, err
	}

-	// Get the upload id of a previously partially uploaded object or initiate a new multipart upload
-	uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metaData)
+	// Initiate a new multipart upload.
+	uploadID, err := c.newUploadID(bucketName, objectName, metaData)
	if err != nil {
		return 0, err
	}
@@ -92,9 +76,6 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
		return 0, err
	}

-	// Used for readability, lastPartNumber is always totalPartsCount.
-	lastPartNumber := totalPartsCount
-
	// Declare a channel that sends the next part number to be uploaded.
	// Buffered to 10000 because thats the maximum number of parts allowed
	// by S3.
@@ -105,6 +86,12 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
	// by S3.
	uploadedPartsCh := make(chan uploadedPartRes, 10000)

+	// Used for readability, lastPartNumber is always totalPartsCount.
+	lastPartNumber := totalPartsCount
+
+	// Initialize parts uploaded map.
+	partsInfo := make(map[int]ObjectPart)
+
	// Send each part number to the channel to be processed.
	for p := 1; p <= totalPartsCount; p++ {
		part, ok := partsInfo[p]
@@ -145,12 +132,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
				// Choose the needed hash algorithms to be calculated by hashCopyBuffer.
				// Sha256 is avoided in non-v4 signature requests or HTTPS connections
-				hashSums := make(map[string][]byte)
-				hashAlgos := make(map[string]hash.Hash)
-				hashAlgos["md5"] = md5.New()
-				if c.overrideSignerType.IsV4() && !c.secure {
-					hashAlgos["sha256"] = sha256.New()
-				}
+				hashAlgos, hashSums := c.hashMaterials()

				var prtSize int64
				var err error
@@ -165,37 +147,25 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
					return
				}

-				// Verify object if its uploaded.
-				verifyObjPart := ObjectPart{
-					PartNumber: uploadReq.PartNum,
-					Size:       partSize,
-				}
-				// Special case if we see a last part number, save last part
-				// size as the proper part size.
-				if uploadReq.PartNum == lastPartNumber {
-					verifyObjPart.Size = lastPartSize
-				}
-
-				// Only upload the necessary parts. Otherwise return size through channel
-				// to update any progress bar.
-				if shouldUploadPartReadAt(verifyObjPart, uploadReq) {
-					// Proceed to upload the part.
-					var objPart ObjectPart
-					objPart, err = c.uploadPart(bucketName, objectName, uploadID, tmpBuffer, uploadReq.PartNum, hashSums["md5"], hashSums["sha256"], prtSize)
-					if err != nil {
-						uploadedPartsCh <- uploadedPartRes{
-							Size:  0,
-							Error: err,
-						}
-						// Exit the goroutine.
-						return
-					}
-					// Save successfully uploaded part metadata.
-					uploadReq.Part = &objPart
-				}
+				// Proceed to upload the part.
+				var objPart ObjectPart
+				objPart, err = c.uploadPart(bucketName, objectName, uploadID, tmpBuffer,
+					uploadReq.PartNum, hashSums["md5"], hashSums["sha256"], prtSize)
+				if err != nil {
+					uploadedPartsCh <- uploadedPartRes{
+						Size:  0,
+						Error: err,
+					}
+					// Exit the goroutine.
+					return
+				}
+
+				// Save successfully uploaded part metadata.
+				uploadReq.Part = &objPart

				// Send successful part info through the channel.
				uploadedPartsCh <- uploadedPartRes{
-					Size:    verifyObjPart.Size,
+					Size:    missingPartSize,
					PartNum: uploadReq.PartNum,
					Part:    uploadReq.Part,
					Error:   nil,

View File

@@ -17,9 +17,6 @@
package minio

import (
-	"crypto/md5"
-	"crypto/sha256"
-	"hash"
	"io"
	"io/ioutil"
	"net/http"
@@ -146,14 +143,13 @@ func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].Part
//
// You must have WRITE permissions on a bucket to create an object.
//
-//  - For size smaller than 5MiB PutObject automatically does a single atomic Put operation.
-//  - For size larger than 5MiB PutObject automatically does a resumable multipart Put operation.
+//  - For size smaller than 64MiB PutObject automatically does a single atomic Put operation.
+//  - For size larger than 64MiB PutObject automatically does a multipart Put operation.
//  - For size input as -1 PutObject does a multipart Put operation until input stream reaches EOF.
//    Maximum object size that can be uploaded through this operation will be 5TiB.
//
// NOTE: Google Cloud Storage does not implement Amazon S3 Compatible multipart PUT.
// So we fall back to single PUT operation with the maximum limit of 5GiB.
-//
func (c Client) PutObject(bucketName, objectName string, reader io.Reader, contentType string) (n int64, err error) {
	return c.PutObjectWithProgress(bucketName, objectName, reader, contentType, nil)
}
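To make the revised thresholds concrete, a minimal sketch of the call the doc comment above describes; client setup is elided and the names are placeholders:

// Below 64MiB this goes out as one atomic PUT; above it the client
// switches to multipart (with no resume, after this commit).
file, err := os.Open("payload.bin") // placeholder path
if err != nil {
	log.Fatalln(err)
}
defer file.Close()

n, err := client.PutObject("mybucket", "payload.bin", file, "application/octet-stream")
if err != nil {
	log.Fatalln(err)
}
log.Printf("uploaded %d bytes", n)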
@@ -211,12 +207,7 @@ func (c Client) putObjectSingle(bucketName, objectName string, reader io.Reader,
	// Add the appropriate hash algorithms that need to be calculated by hashCopyN
	// In case of non-v4 signature request or HTTPS connection, sha256 is not needed.
-	hashAlgos := make(map[string]hash.Hash)
-	hashSums := make(map[string][]byte)
-	hashAlgos["md5"] = md5.New()
-	if c.overrideSignerType.IsV4() && !c.secure {
-		hashAlgos["sha256"] = sha256.New()
-	}
+	hashAlgos, hashSums := c.hashMaterials()

	// Initialize a new temporary file.
	tmpFile, err := newTempFile("single$-putobject-single")

View File

@@ -176,7 +176,7 @@ func (c Client) RemoveObjects(bucketName string, objectsCh <-chan string) <-chan
			}
		}
		if count == 0 {
-			// Multi Objects Delete API doesn't accept empty object list, quit immediatly
+			// Multi Objects Delete API doesn't accept empty object list, quit immediately
			break
		}
		if count < maxEntries {

View File

@@ -36,8 +36,8 @@ type owner struct {
	ID          string
}

-// commonPrefix container for prefix response.
-type commonPrefix struct {
+// CommonPrefix container for prefix response.
+type CommonPrefix struct {
	Prefix string
}
@@ -45,7 +45,7 @@ type commonPrefix struct {
type ListBucketV2Result struct {
	// A response can contain CommonPrefixes only if you have
	// specified a delimiter.
-	CommonPrefixes []commonPrefix
+	CommonPrefixes []CommonPrefix
	// Metadata about each object returned.
	Contents  []ObjectInfo
	Delimiter string
@@ -74,7 +74,7 @@ type ListBucketV2Result struct {
type ListBucketResult struct {
	// A response can contain CommonPrefixes only if you have
	// specified a delimiter.
-	CommonPrefixes []commonPrefix
+	CommonPrefixes []CommonPrefix
	// Metadata about each object returned.
	Contents  []ObjectInfo
	Delimiter string
@@ -116,7 +116,7 @@ type ListMultipartUploadsResult struct {
	Prefix     string
	Delimiter  string
	// A response can contain CommonPrefixes only if you specify a delimiter.
-	CommonPrefixes []commonPrefix
+	CommonPrefixes []CommonPrefix
}

// initiator container for who initiated multipart upload.
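The rename from commonPrefix to CommonPrefix is what lets importers construct and inspect prefix entries directly, as the gateway test earlier in this commit does; a tiny sketch from a dependent package:

// Outside minio-go this only compiles now that the type is exported.
prefixes := []minio.CommonPrefix{{Prefix: "photos/"}, {Prefix: "videos/"}}
for _, p := range prefixes {
	fmt.Println(p.Prefix)
}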

View File

@@ -19,10 +19,13 @@ package minio

import (
	"bytes"
+	"crypto/md5"
+	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"errors"
	"fmt"
+	"hash"
	"io"
	"io/ioutil"
	"math/rand"
@@ -289,6 +292,29 @@ func (c *Client) SetS3TransferAccelerate(accelerateEndpoint string) {
	}
}

+// Hash materials provides relevant initialized hash algo writers
+// based on the expected signature type.
+//
+//  - For signature v4 request if the connection is insecure compute only sha256.
+//  - For signature v4 request if the connection is secure compute only md5.
+//  - For anonymous request compute md5.
+func (c *Client) hashMaterials() (hashAlgos map[string]hash.Hash, hashSums map[string][]byte) {
+	hashSums = make(map[string][]byte)
+	hashAlgos = make(map[string]hash.Hash)
+	if c.overrideSignerType.IsV4() {
+		if c.secure {
+			hashAlgos["md5"] = md5.New()
+		} else {
+			hashAlgos["sha256"] = sha256.New()
+		}
+	} else {
+		if c.overrideSignerType.IsAnonymous() {
+			hashAlgos["md5"] = md5.New()
+		}
+	}
+	return hashAlgos, hashSums
+}
+
// requestMetadata - is container for all the values to make a request.
type requestMetadata struct {
	// If set newRequest presigns the URL.
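A sketch (assumed, not from the commit) of how the upload paths consume this pair: stream the payload through every selected hash writer, then collect the finished sums, which is the same shape as the hashCopyN helper the callers use.

// hashAndCount writes r through every configured hash writer and
// records the finished sums; this mirrors, rather than reproduces,
// how the put-object paths use c.hashMaterials().
func hashAndCount(c *Client, r io.Reader) (int64, map[string][]byte, error) {
	hashAlgos, hashSums := c.hashMaterials()
	writers := make([]io.Writer, 0, len(hashAlgos))
	for _, h := range hashAlgos {
		writers = append(writers, h)
	}
	// MultiWriter with zero writers is a valid sink, so the anonymous
	// and v2-signer cases still count bytes correctly.
	n, err := io.Copy(io.MultiWriter(writers...), r)
	if err != nil {
		return 0, nil, err
	}
	for name, h := range hashAlgos {
		hashSums[name] = h.Sum(nil)
	}
	return n, hashSums, nil
}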
@@ -450,6 +476,13 @@ func (c Client) executeMethod(method string, metadata requestMetadata) (res *htt
		case os.Stdin, os.Stdout, os.Stderr:
			isRetryable = false
		}
+
+		// Figure out if the body can be closed - if yes
+		// we will definitely close it upon the function
+		// return.
+		bodyCloser, ok := metadata.contentBody.(io.Closer)
+		if ok {
+			defer bodyCloser.Close()
+		}
	}

	// Create a done channel to control 'newRetryTimer' go routine.
method = "POST" method = "POST"
} }
var location string location := metadata.bucketLocation
// Gather location only if bucketName is present. if location == "" {
if metadata.bucketName != "" && metadata.bucketLocation == "" { if metadata.bucketName != "" {
location, err = c.getBucketLocation(metadata.bucketName) // Gather location only if bucketName is present.
if err != nil { location, err = c.getBucketLocation(metadata.bucketName)
return nil, err if err != nil {
if ToErrorResponse(err).Code != "AccessDenied" {
return nil, err
}
}
// Upon AccessDenied error on fetching bucket location, default
// to possible locations based on endpoint URL. This can usually
// happen when GetBucketLocation() is disabled using IAM policies.
}
if location == "" {
location = getDefaultLocation(c.endpointURL, c.region)
} }
} else {
location = metadata.bucketLocation
} }
// Construct a new target URL. // Construct a new target URL.
@@ -575,8 +616,17 @@ func (c Client) newRequest(method string, metadata requestMetadata) (req *http.R
		return nil, err
	}

+	// Go net/http notoriously closes the request body.
+	// - The request Body, if non-nil, will be closed by the underlying Transport, even on errors.
+	// This can cause underlying *os.File seekers to fail, avoid that
+	// by making sure to wrap the closer as a nop.
+	var body io.ReadCloser
+	if metadata.contentBody != nil {
+		body = ioutil.NopCloser(metadata.contentBody)
+	}
+
	// Initialize a new HTTP request for the method.
-	req, err = http.NewRequest(method, targetURL.String(), metadata.contentBody)
+	req, err = http.NewRequest(method, targetURL.String(), body)
	if err != nil {
		return nil, err
	}
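A standalone illustration (not from the commit) of the pitfall the comment describes: the Transport closes the request body it is given, and a closed *os.File can no longer be rewound for a retry, so the file is handed over behind a no-op closer. File name and URL are placeholders.

f, err := os.Open("payload.bin") // placeholder file
if err != nil {
	log.Fatalln(err)
}
defer f.Close()

// Without NopCloser the Transport would close f after the attempt,
// making a later f.Seek for a retry fail.
req, err := http.NewRequest("PUT", "https://example.com/upload", ioutil.NopCloser(f))
if err != nil {
	log.Fatalln(err)
}
if _, err = http.DefaultClient.Do(req); err != nil {
	// The file is still open here; rewind and rebuild the request to retry.
	if _, serr := f.Seek(0, io.SeekStart); serr != nil {
		log.Fatalln(serr)
	}
}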

View File

@@ -86,6 +86,11 @@ func (c Client) getBucketLocation(bucketName string) (string, error) {
		return "", err
	}

+	// Region set then no need to fetch bucket location.
+	if c.region != "" {
+		return c.region, nil
+	}
+
	if s3utils.IsAmazonChinaEndpoint(c.endpointURL) {
		// For china specifically we need to set everything to
		// cn-north-1 for now, there is no easier way until AWS S3
@@ -100,11 +105,6 @@ func (c Client) getBucketLocation(bucketName string) (string, error) {
		return "us-gov-west-1", nil
	}

-	// Region set then no need to fetch bucket location.
-	if c.region != "" {
-		return c.region, nil
-	}
-
	if location, ok := c.bucketLocCache.Get(bucketName); ok {
		return location, nil
	}

View File

@@ -192,3 +192,23 @@ func redactSignature(origAuth string) string {
	// Strip out 256-bit signature from: Signature=<256-bit signature>
	return regSign.ReplaceAllString(newAuth, "Signature=**REDACTED**")
}
+
+// Get default location returns the location based on the input
+// URL `u`, if region override is provided then all location
+// defaults to regionOverride.
+//
+// If no other cases match then the location is set to `us-east-1`
+// as a last resort.
+func getDefaultLocation(u url.URL, regionOverride string) (location string) {
+	if regionOverride != "" {
+		return regionOverride
+	}
+	if s3utils.IsAmazonChinaEndpoint(u) {
+		return "cn-north-1"
+	}
+	if s3utils.IsAmazonGovCloudEndpoint(u) {
+		return "us-gov-west-1"
+	}
+	// Default to location to 'us-east-1'.
+	return "us-east-1"
+}
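A hypothetical probe of the precedence just added; the endpoints are illustrative, and which hosts s3utils classifies as China or GovCloud is an assumption here:

u, _ := url.Parse("https://s3.cn-north-1.amazonaws.com.cn")
fmt.Println(getDefaultLocation(*u, ""))          // expected "cn-north-1"
fmt.Println(getDefaultLocation(*u, "eu-west-1")) // override wins: "eu-west-1"

v, _ := url.Parse("https://play.minio.io:9000")
fmt.Println(getDefaultLocation(*v, "")) // falls through to "us-east-1"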

vendor/vendor.json
View File

@@ -306,10 +306,10 @@
			"revisionTime": "2016-02-29T08:42:30-08:00"
		},
		{
-			"checksumSHA1": "o6xvhvD7RCOkBZ5pUPOsPcsz/B8=",
+			"checksumSHA1": "vRZLSG4FsBdanI19VMtKf17jzZA=",
			"path": "github.com/minio/minio-go",
-			"revision": "79aa9c39f7be2cdf802aa550a00850dbc1e37835",
-			"revisionTime": "2017-06-19T22:00:32Z"
+			"revision": "2cca719d0760cc8906b0843a3e1e93fe9dbd8bb4",
+			"revisionTime": "2017-06-23T21:21:08Z"
		},
		{
			"checksumSHA1": "wDNvEYgDy1gOkzJ81WuuYore3dw=",