Vendorize latest minio-go (#4989)

Since minio-go's behavior has been fixed to treat empty byte arrays and
nil byte arrays in the same manner, these changes are needed in Minio to
address the PutObject failure for the S3 Gateway.

Fixes: https://github.com/minio/minio/issues/4974,
https://github.com/minio/minio-java/issues/615
This commit is contained in:
Nitish Tiwari 2017-09-28 20:40:38 +05:30 committed by Dee Koder
parent a5fbe1e16c
commit 789270af3c
31 changed files with 6339 additions and 444 deletions

View File

@ -45,7 +45,7 @@ func (l *s3Objects) AnonPutObject(bucket string, object string, size int64, data
delete(metadata, "etag") delete(metadata, "etag")
} }
oi, err := l.anonClient.PutObject(bucket, object, size, data, md5sumBytes, sha256sumBytes, toMinioClientMetadata(metadata)) oi, err := l.anonClient.PutObject(bucket, object, data, size, md5sumBytes, sha256sumBytes, toMinioClientMetadata(metadata))
if err != nil { if err != nil {
return objInfo, s3ToObjectError(traceError(err), bucket, object) return objInfo, s3ToObjectError(traceError(err), bucket, object)
} }

View File

@ -341,7 +341,7 @@ func (l *s3Objects) PutObject(bucket string, object string, data *HashReader, me
return objInfo, s3ToObjectError(traceError(err), bucket, object) return objInfo, s3ToObjectError(traceError(err), bucket, object)
} }
delete(metadata, "etag") delete(metadata, "etag")
oi, err := l.Client.PutObject(bucket, object, data.Size(), data, md5sumBytes, sha256sumBytes, toMinioClientMetadata(metadata)) oi, err := l.Client.PutObject(bucket, object, data, data.Size(), md5sumBytes, sha256sumBytes, toMinioClientMetadata(metadata))
if err != nil { if err != nil {
return objInfo, s3ToObjectError(traceError(err), bucket, object) return objInfo, s3ToObjectError(traceError(err), bucket, object)
} }
@ -448,17 +448,19 @@ func fromMinioClientMetadata(metadata map[string][]string) map[string]string {
} }
// toMinioClientMetadata converts metadata to map[string][]string // toMinioClientMetadata converts metadata to map[string][]string
func toMinioClientMetadata(metadata map[string]string) map[string][]string { func toMinioClientMetadata(metadata map[string]string) map[string]string {
mm := map[string][]string{} mm := map[string]string{}
for k, v := range metadata { for k, v := range metadata {
mm[http.CanonicalHeaderKey(k)] = []string{v} mm[http.CanonicalHeaderKey(k)] = v
} }
return mm return mm
} }
// NewMultipartUpload upload object in multiple parts // NewMultipartUpload upload object in multiple parts
func (l *s3Objects) NewMultipartUpload(bucket string, object string, metadata map[string]string) (uploadID string, err error) { func (l *s3Objects) NewMultipartUpload(bucket string, object string, metadata map[string]string) (uploadID string, err error) {
return l.Client.NewMultipartUpload(bucket, object, toMinioClientMetadata(metadata)) // Create PutObject options
opts := minio.PutObjectOptions{UserMetadata: metadata}
return l.Client.NewMultipartUpload(bucket, object, opts)
} }
// CopyObjectPart copy part of object to other bucket and object // CopyObjectPart copy part of object to other bucket and object
@ -489,7 +491,7 @@ func (l *s3Objects) PutObjectPart(bucket string, object string, uploadID string,
return pi, err return pi, err
} }
info, err := l.Client.PutObjectPart(bucket, object, uploadID, partID, data.Size(), data, md5HexBytes, sha256sumBytes) info, err := l.Client.PutObjectPart(bucket, object, uploadID, partID, data, data.Size(), md5HexBytes, sha256sumBytes)
if err != nil { if err != nil {
return pi, err return pi, err
} }

View File

@ -5,15 +5,31 @@
Please go through this link [Maintainer Responsibility](https://gist.github.com/abperiasamy/f4d9b31d3186bbd26522) Please go through this link [Maintainer Responsibility](https://gist.github.com/abperiasamy/f4d9b31d3186bbd26522)
### Making new releases ### Making new releases
Tag and sign your release commit. Note that this step requires access to Minio's trusted private key.
Edit `libraryVersion` constant in `api.go`. ```sh
$ export GNUPGHOME=/media/${USER}/minio/trusted
``` $ git tag -s 4.0.0
$ grep libraryVersion api.go $ git push
libraryVersion = "0.3.0"
```
```
$ git tag 0.3.0
$ git push --tags $ git push --tags
``` ```
### Update version
Once release has been made update `libraryVersion` constant in `api.go` to next to be released version.
```sh
$ grep libraryVersion api.go
libraryVersion = "4.0.1"
```
Commit your changes
```
$ git commit -a -m "Update version for next release" --author "Minio Trusted <trusted@minio.io>"
```
### Announce
Announce the new release by adding release notes at https://github.com/minio/minio-go/releases from the `trusted@minio.io` account. Release notes require two sections, `highlights` and `changelog`. Highlights is a bulleted list of salient features in this release, and Changelog contains a list of all commits since the last release.
To generate `changelog`
```sh
$ git log --no-color --pretty=format:'-%d %s (%cr) <%an>' <last_release_tag>..<latest_release_tag>
```

View File

@ -55,6 +55,7 @@ func main() {
} }
log.Printf("%#v\n", minioClient) // minioClient is now setup log.Printf("%#v\n", minioClient) // minioClient is now setup
}
``` ```
## Quick Start Example - File Uploader ## Quick Start Example - File Uploader
@ -105,7 +106,7 @@ func main() {
contentType := "application/zip" contentType := "application/zip"
// Upload the zip file with FPutObject // Upload the zip file with FPutObject
n, err := minioClient.FPutObject(bucketName, objectName, filePath, contentType) n, err := minioClient.FPutObject(bucketName, objectName, filePath, minio.PutObjectOptions{ContentType:contentType})
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
@ -152,10 +153,13 @@ The full API Reference is available here.
### API Reference : File Object Operations ### API Reference : File Object Operations
* [`FPutObject`](https://docs.minio.io/docs/golang-client-api-reference#FPutObject) * [`FPutObject`](https://docs.minio.io/docs/golang-client-api-reference#FPutObject)
* [`FGetObject`](https://docs.minio.io/docs/golang-client-api-reference#FPutObject) * [`FGetObject`](https://docs.minio.io/docs/golang-client-api-reference#FPutObject)
* [`FPutObjectWithContext`](https://docs.minio.io/docs/golang-client-api-reference#FPutObjectWithContext)
* [`FGetObjectWithContext`](https://docs.minio.io/docs/golang-client-api-reference#FGetObjectWithContext)
### API Reference : Object Operations ### API Reference : Object Operations
* [`GetObject`](https://docs.minio.io/docs/golang-client-api-reference#GetObject) * [`GetObject`](https://docs.minio.io/docs/golang-client-api-reference#GetObject)
* [`PutObject`](https://docs.minio.io/docs/golang-client-api-reference#PutObject) * [`PutObject`](https://docs.minio.io/docs/golang-client-api-reference#PutObject)
* [`GetObjectWithContext`](https://docs.minio.io/docs/golang-client-api-reference#GetObjectWithContext)
* [`PutObjectWithContext`](https://docs.minio.io/docs/golang-client-api-reference#PutObjectWithContext)
* [`PutObjectStreaming`](https://docs.minio.io/docs/golang-client-api-reference#PutObjectStreaming) * [`PutObjectStreaming`](https://docs.minio.io/docs/golang-client-api-reference#PutObjectStreaming)
* [`StatObject`](https://docs.minio.io/docs/golang-client-api-reference#StatObject) * [`StatObject`](https://docs.minio.io/docs/golang-client-api-reference#StatObject)
* [`CopyObject`](https://docs.minio.io/docs/golang-client-api-reference#CopyObject) * [`CopyObject`](https://docs.minio.io/docs/golang-client-api-reference#CopyObject)
@ -204,10 +208,13 @@ The full API Reference is available here.
### Full Examples : File Object Operations ### Full Examples : File Object Operations
* [fputobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/fputobject.go) * [fputobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/fputobject.go)
* [fgetobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/fgetobject.go) * [fgetobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/fgetobject.go)
* [fputobject-context.go](https://github.com/minio/minio-go/blob/master/examples/s3/fputobject-context.go)
* [fgetobject-context.go](https://github.com/minio/minio-go/blob/master/examples/s3/fgetobject-context.go)
### Full Examples : Object Operations ### Full Examples : Object Operations
* [putobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/putobject.go) * [putobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/putobject.go)
* [getobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/getobject.go) * [getobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/getobject.go)
* [putobject-context.go](https://github.com/minio/minio-go/blob/master/examples/s3/putobject-context.go)
* [getobject-context.go](https://github.com/minio/minio-go/blob/master/examples/s3/getobject-context.go)
* [statobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/statobject.go) * [statobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/statobject.go)
* [copyobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/copyobject.go) * [copyobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/copyobject.go)
* [removeobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/removeobject.go) * [removeobject.go](https://github.com/minio/minio-go/blob/master/examples/s3/removeobject.go)

View File

@ -17,6 +17,7 @@
package minio package minio
import ( import (
"context"
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"net/http" "net/http"
@ -268,7 +269,7 @@ func (s *SourceInfo) getProps(c Client) (size int64, etag string, userMeta map[s
// uploadPartCopy - helper function to create a part in a multipart // uploadPartCopy - helper function to create a part in a multipart
// upload via an upload-part-copy request // upload via an upload-part-copy request
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
func (c Client) uploadPartCopy(bucket, object, uploadID string, partNumber int, func (c Client) uploadPartCopy(ctx context.Context, bucket, object, uploadID string, partNumber int,
headers http.Header) (p CompletePart, err error) { headers http.Header) (p CompletePart, err error) {
// Build query parameters // Build query parameters
@ -277,7 +278,7 @@ func (c Client) uploadPartCopy(bucket, object, uploadID string, partNumber int,
urlValues.Set("uploadId", uploadID) urlValues.Set("uploadId", uploadID)
// Send upload-part-copy request // Send upload-part-copy request
resp, err := c.executeMethod("PUT", requestMetadata{ resp, err := c.executeMethod(ctx, "PUT", requestMetadata{
bucketName: bucket, bucketName: bucket,
objectName: object, objectName: object,
customHeader: headers, customHeader: headers,
@ -311,7 +312,7 @@ func (c Client) ComposeObject(dst DestinationInfo, srcs []SourceInfo) error {
if len(srcs) < 1 || len(srcs) > maxPartsCount { if len(srcs) < 1 || len(srcs) > maxPartsCount {
return ErrInvalidArgument("There must be as least one and up to 10000 source objects.") return ErrInvalidArgument("There must be as least one and up to 10000 source objects.")
} }
ctx := context.Background()
srcSizes := make([]int64, len(srcs)) srcSizes := make([]int64, len(srcs))
var totalSize, size, totalParts int64 var totalSize, size, totalParts int64
var srcUserMeta map[string]string var srcUserMeta map[string]string
@ -396,7 +397,7 @@ func (c Client) ComposeObject(dst DestinationInfo, srcs []SourceInfo) error {
} }
// Send copy request // Send copy request
resp, err := c.executeMethod("PUT", requestMetadata{ resp, err := c.executeMethod(ctx, "PUT", requestMetadata{
bucketName: dst.bucket, bucketName: dst.bucket,
objectName: dst.object, objectName: dst.object,
customHeader: h, customHeader: h,
@ -426,11 +427,11 @@ func (c Client) ComposeObject(dst DestinationInfo, srcs []SourceInfo) error {
if len(userMeta) == 0 && len(srcs) == 1 { if len(userMeta) == 0 && len(srcs) == 1 {
metaMap = srcUserMeta metaMap = srcUserMeta
} }
metaHeaders := make(map[string][]string) metaHeaders := make(map[string]string)
for k, v := range metaMap { for k, v := range metaMap {
metaHeaders[k] = append(metaHeaders[k], v) metaHeaders[k] = v
} }
uploadID, err := c.newUploadID(dst.bucket, dst.object, metaHeaders) uploadID, err := c.newUploadID(ctx, dst.bucket, dst.object, PutObjectOptions{UserMetadata: metaHeaders})
if err != nil { if err != nil {
return fmt.Errorf("Error creating new upload: %v", err) return fmt.Errorf("Error creating new upload: %v", err)
} }
@ -457,7 +458,7 @@ func (c Client) ComposeObject(dst DestinationInfo, srcs []SourceInfo) error {
fmt.Sprintf("bytes=%d-%d", start, end)) fmt.Sprintf("bytes=%d-%d", start, end))
// make upload-part-copy request // make upload-part-copy request
complPart, err := c.uploadPartCopy(dst.bucket, complPart, err := c.uploadPartCopy(ctx, dst.bucket,
dst.object, uploadID, partIndex, h) dst.object, uploadID, partIndex, h)
if err != nil { if err != nil {
return fmt.Errorf("Error in upload-part-copy - %v", err) return fmt.Errorf("Error in upload-part-copy - %v", err)
@ -468,7 +469,7 @@ func (c Client) ComposeObject(dst DestinationInfo, srcs []SourceInfo) error {
} }
// 3. Make final complete-multipart request. // 3. Make final complete-multipart request.
_, err = c.completeMultipartUpload(dst.bucket, dst.object, uploadID, _, err = c.completeMultipartUpload(ctx, dst.bucket, dst.object, uploadID,
completeMultipartUpload{Parts: objParts}) completeMultipartUpload{Parts: objParts})
if err != nil { if err != nil {
err = fmt.Errorf("Error in complete-multipart request - %v", err) err = fmt.Errorf("Error in complete-multipart request - %v", err)

View File

@ -0,0 +1,24 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import "context"
// GetObjectWithContext - returns a seekable, readable object.
// The provided context is forwarded to the underlying requests so the
// caller can cancel or time out the download.
func (c Client) GetObjectWithContext(ctx context.Context, bucketName, objectName string) (*Object, error) {
	return c.getObjectWithContext(ctx, bucketName, objectName)
}

View File

@ -21,11 +21,23 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"context"
"github.com/minio/minio-go/pkg/s3utils" "github.com/minio/minio-go/pkg/s3utils"
) )
// FGetObjectWithContext - download contents of an object to a local file.
// The provided context is forwarded to the underlying requests so the
// caller can cancel or time out the download.
func (c Client) FGetObjectWithContext(ctx context.Context, bucketName, objectName, filePath string) error {
	return c.fGetObjectWithContext(ctx, bucketName, objectName, filePath)
}
// FGetObject - download contents of an object to a local file. // FGetObject - download contents of an object to a local file.
func (c Client) FGetObject(bucketName, objectName, filePath string) error { func (c Client) FGetObject(bucketName, objectName, filePath string) error {
return c.fGetObjectWithContext(context.Background(), bucketName, objectName, filePath)
}
// fGetObjectWithContext - fgetObject wrapper function with context
func (c Client) fGetObjectWithContext(ctx context.Context, bucketName, objectName, filePath string) error {
// Input validation. // Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil { if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return err return err
@ -88,7 +100,7 @@ func (c Client) FGetObject(bucketName, objectName, filePath string) error {
} }
// Seek to current position for incoming reader. // Seek to current position for incoming reader.
objectReader, objectStat, err := c.getObject(bucketName, objectName, reqHeaders) objectReader, objectStat, err := c.getObject(ctx, bucketName, objectName, reqHeaders)
if err != nil { if err != nil {
return err return err
} }

View File

@ -17,6 +17,7 @@
package minio package minio
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -57,6 +58,11 @@ func (c Client) GetEncryptedObject(bucketName, objectName string, encryptMateria
// GetObject - returns an seekable, readable object. // GetObject - returns an seekable, readable object.
func (c Client) GetObject(bucketName, objectName string) (*Object, error) { func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
return c.getObjectWithContext(context.Background(), bucketName, objectName)
}
// GetObject wrapper function that accepts a request context
func (c Client) getObjectWithContext(ctx context.Context, bucketName, objectName string) (*Object, error) {
// Input validation. // Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil { if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return nil, err return nil, err
@ -110,14 +116,14 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
// Do not set objectInfo from the first readAt request because it will not get // Do not set objectInfo from the first readAt request because it will not get
// the whole object. // the whole object.
reqHeaders.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1) reqHeaders.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
httpReader, objectInfo, err = c.getObject(bucketName, objectName, reqHeaders) httpReader, objectInfo, err = c.getObject(ctx, bucketName, objectName, reqHeaders)
} else { } else {
if req.Offset > 0 { if req.Offset > 0 {
reqHeaders.SetRange(req.Offset, 0) reqHeaders.SetRange(req.Offset, 0)
} }
// First request is a Read request. // First request is a Read request.
httpReader, objectInfo, err = c.getObject(bucketName, objectName, reqHeaders) httpReader, objectInfo, err = c.getObject(ctx, bucketName, objectName, reqHeaders)
} }
if err != nil { if err != nil {
resCh <- getResponse{ resCh <- getResponse{
@ -195,14 +201,14 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
if req.isReadAt { if req.isReadAt {
// Range is set with respect to the offset and length of the buffer requested. // Range is set with respect to the offset and length of the buffer requested.
reqHeaders.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1) reqHeaders.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
httpReader, _, err = c.getObject(bucketName, objectName, reqHeaders) httpReader, _, err = c.getObject(ctx, bucketName, objectName, reqHeaders)
} else { } else {
// Range is set with respect to the offset. // Range is set with respect to the offset.
if req.Offset > 0 { if req.Offset > 0 {
reqHeaders.SetRange(req.Offset, 0) reqHeaders.SetRange(req.Offset, 0)
} }
httpReader, objectInfo, err = c.getObject(bucketName, objectName, reqHeaders) httpReader, objectInfo, err = c.getObject(ctx, bucketName, objectName, reqHeaders)
} }
if err != nil { if err != nil {
resCh <- getResponse{ resCh <- getResponse{
@ -626,7 +632,7 @@ func newObject(reqCh chan<- getRequest, resCh <-chan getResponse, doneCh chan<-
// //
// For more information about the HTTP Range header. // For more information about the HTTP Range header.
// go to http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35. // go to http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35.
func (c Client) getObject(bucketName, objectName string, reqHeaders RequestHeaders) (io.ReadCloser, ObjectInfo, error) { func (c Client) getObject(ctx context.Context, bucketName, objectName string, reqHeaders RequestHeaders) (io.ReadCloser, ObjectInfo, error) {
// Validate input arguments. // Validate input arguments.
if err := s3utils.CheckValidBucketName(bucketName); err != nil { if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return nil, ObjectInfo{}, err return nil, ObjectInfo{}, err
@ -642,7 +648,7 @@ func (c Client) getObject(bucketName, objectName string, reqHeaders RequestHeade
} }
// Execute GET on objectName. // Execute GET on objectName.
resp, err := c.executeMethod("GET", requestMetadata{ resp, err := c.executeMethod(ctx, "GET", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
objectName: objectName, objectName: objectName,
customHeader: customHeader, customHeader: customHeader,

View File

@ -17,6 +17,7 @@
package minio package minio
import ( import (
"context"
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -79,7 +80,7 @@ func (c Client) getBucketPolicy(bucketName string) (policy.BucketAccessPolicy, e
urlValues.Set("policy", "") urlValues.Set("policy", "")
// Execute GET on bucket to list objects. // Execute GET on bucket to list objects.
resp, err := c.executeMethod("GET", requestMetadata{ resp, err := c.executeMethod(context.Background(), "GET", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
queryValues: urlValues, queryValues: urlValues,
contentSHA256Bytes: emptySHA256, contentSHA256Bytes: emptySHA256,

View File

@ -17,6 +17,7 @@
package minio package minio
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
@ -38,7 +39,7 @@ import (
// //
func (c Client) ListBuckets() ([]BucketInfo, error) { func (c Client) ListBuckets() ([]BucketInfo, error) {
// Execute GET on service. // Execute GET on service.
resp, err := c.executeMethod("GET", requestMetadata{contentSHA256Bytes: emptySHA256}) resp, err := c.executeMethod(context.Background(), "GET", requestMetadata{contentSHA256Bytes: emptySHA256})
defer closeResponse(resp) defer closeResponse(resp)
if err != nil { if err != nil {
return nil, err return nil, err
@ -215,7 +216,7 @@ func (c Client) listObjectsV2Query(bucketName, objectPrefix, continuationToken s
urlValues.Set("max-keys", fmt.Sprintf("%d", maxkeys)) urlValues.Set("max-keys", fmt.Sprintf("%d", maxkeys))
// Execute GET on bucket to list objects. // Execute GET on bucket to list objects.
resp, err := c.executeMethod("GET", requestMetadata{ resp, err := c.executeMethod(context.Background(), "GET", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
queryValues: urlValues, queryValues: urlValues,
contentSHA256Bytes: emptySHA256, contentSHA256Bytes: emptySHA256,
@ -393,7 +394,7 @@ func (c Client) listObjectsQuery(bucketName, objectPrefix, objectMarker, delimit
urlValues.Set("max-keys", fmt.Sprintf("%d", maxkeys)) urlValues.Set("max-keys", fmt.Sprintf("%d", maxkeys))
// Execute GET on bucket to list objects. // Execute GET on bucket to list objects.
resp, err := c.executeMethod("GET", requestMetadata{ resp, err := c.executeMethod(context.Background(), "GET", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
queryValues: urlValues, queryValues: urlValues,
contentSHA256Bytes: emptySHA256, contentSHA256Bytes: emptySHA256,
@ -572,7 +573,7 @@ func (c Client) listMultipartUploadsQuery(bucketName, keyMarker, uploadIDMarker,
urlValues.Set("max-uploads", fmt.Sprintf("%d", maxUploads)) urlValues.Set("max-uploads", fmt.Sprintf("%d", maxUploads))
// Execute GET on bucketName to list multipart uploads. // Execute GET on bucketName to list multipart uploads.
resp, err := c.executeMethod("GET", requestMetadata{ resp, err := c.executeMethod(context.Background(), "GET", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
queryValues: urlValues, queryValues: urlValues,
contentSHA256Bytes: emptySHA256, contentSHA256Bytes: emptySHA256,
@ -690,7 +691,7 @@ func (c Client) listObjectPartsQuery(bucketName, objectName, uploadID string, pa
urlValues.Set("max-parts", fmt.Sprintf("%d", maxParts)) urlValues.Set("max-parts", fmt.Sprintf("%d", maxParts))
// Execute GET on objectName to get list of parts. // Execute GET on objectName to get list of parts.
resp, err := c.executeMethod("GET", requestMetadata{ resp, err := c.executeMethod(context.Background(), "GET", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
objectName: objectName, objectName: objectName,
queryValues: urlValues, queryValues: urlValues,

View File

@ -18,6 +18,7 @@ package minio
import ( import (
"bufio" "bufio"
"context"
"encoding/json" "encoding/json"
"io" "io"
"net/http" "net/http"
@ -46,7 +47,7 @@ func (c Client) getBucketNotification(bucketName string) (BucketNotification, er
urlValues.Set("notification", "") urlValues.Set("notification", "")
// Execute GET on bucket to list objects. // Execute GET on bucket to list objects.
resp, err := c.executeMethod("GET", requestMetadata{ resp, err := c.executeMethod(context.Background(), "GET", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
queryValues: urlValues, queryValues: urlValues,
contentSHA256Bytes: emptySHA256, contentSHA256Bytes: emptySHA256,
@ -170,7 +171,7 @@ func (c Client) ListenBucketNotification(bucketName, prefix, suffix string, even
urlValues["events"] = events urlValues["events"] = events
// Execute GET on bucket to list objects. // Execute GET on bucket to list objects.
resp, err := c.executeMethod("GET", requestMetadata{ resp, err := c.executeMethod(context.Background(), "GET", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
queryValues: urlValues, queryValues: urlValues,
contentSHA256Bytes: emptySHA256, contentSHA256Bytes: emptySHA256,

View File

@ -19,6 +19,7 @@ package minio
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"encoding/xml" "encoding/xml"
"fmt" "fmt"
@ -82,7 +83,7 @@ func (c Client) MakeBucket(bucketName string, location string) (err error) {
} }
// Execute PUT to create a new bucket. // Execute PUT to create a new bucket.
resp, err := c.executeMethod("PUT", reqMetadata) resp, err := c.executeMethod(context.Background(), "PUT", reqMetadata)
defer closeResponse(resp) defer closeResponse(resp)
if err != nil { if err != nil {
return err return err
@ -170,7 +171,7 @@ func (c Client) putBucketPolicy(bucketName string, policyInfo policy.BucketAcces
} }
// Execute PUT to upload a new bucket policy. // Execute PUT to upload a new bucket policy.
resp, err := c.executeMethod("PUT", reqMetadata) resp, err := c.executeMethod(context.Background(), "PUT", reqMetadata)
defer closeResponse(resp) defer closeResponse(resp)
if err != nil { if err != nil {
return err return err
@ -195,7 +196,7 @@ func (c Client) removeBucketPolicy(bucketName string) error {
urlValues.Set("policy", "") urlValues.Set("policy", "")
// Execute DELETE on objectName. // Execute DELETE on objectName.
resp, err := c.executeMethod("DELETE", requestMetadata{ resp, err := c.executeMethod(context.Background(), "DELETE", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
queryValues: urlValues, queryValues: urlValues,
contentSHA256Bytes: emptySHA256, contentSHA256Bytes: emptySHA256,
@ -235,7 +236,7 @@ func (c Client) SetBucketNotification(bucketName string, bucketNotification Buck
} }
// Execute PUT to upload a new bucket notification. // Execute PUT to upload a new bucket notification.
resp, err := c.executeMethod("PUT", reqMetadata) resp, err := c.executeMethod(context.Background(), "PUT", reqMetadata)
defer closeResponse(resp) defer closeResponse(resp)
if err != nil { if err != nil {
return err return err

View File

@ -17,6 +17,7 @@
package minio package minio
import ( import (
"context"
"io" "io"
"math" "math"
"os" "os"
@ -77,7 +78,7 @@ func optimalPartInfo(objectSize int64) (totalPartsCount int, partSize int64, las
// getUploadID - fetch upload id if already present for an object name // getUploadID - fetch upload id if already present for an object name
// or initiate a new request to fetch a new upload id. // or initiate a new request to fetch a new upload id.
func (c Client) newUploadID(bucketName, objectName string, metaData map[string][]string) (uploadID string, err error) { func (c Client) newUploadID(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (uploadID string, err error) {
// Input validation. // Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil { if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return "", err return "", err
@ -87,7 +88,7 @@ func (c Client) newUploadID(bucketName, objectName string, metaData map[string][
} }
// Initiate multipart upload for an object. // Initiate multipart upload for an object.
initMultipartUploadResult, err := c.initiateMultipartUpload(bucketName, objectName, metaData) initMultipartUploadResult, err := c.initiateMultipartUpload(ctx, bucketName, objectName, opts)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -0,0 +1,38 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import (
"context"
"io"
)
// PutObjectWithContext - Identical to PutObject call, but accepts context to facilitate request cancellation.
func (c Client) PutObjectWithContext(ctx context.Context, bucketName, objectName string, reader io.Reader, objectSize int64,
	opts PutObjectOptions) (n int64, err error) {
	// Validate the supplied put options before doing any work.
	if err = opts.validate(); err != nil {
		return 0, err
	}
	// Client-side encryption requested: arm the encryption materials on the
	// reader and upload via the no-length multipart path, since the size of
	// the encrypted stream is not known up front.
	if opts.EncryptMaterials != nil {
		if err = opts.EncryptMaterials.SetupEncryptMode(reader); err != nil {
			return 0, err
		}
		return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, opts.EncryptMaterials, opts)
	}
	// Plain upload path.
	return c.putObjectCommon(ctx, bucketName, objectName, reader, objectSize, opts)
}

View File

@ -17,13 +17,14 @@
package minio package minio
import ( import (
"context"
"io" "io"
"github.com/minio/minio-go/pkg/encrypt" "github.com/minio/minio-go/pkg/encrypt"
) )
// PutEncryptedObject - Encrypt and store object. // PutEncryptedObject - Encrypt and store object.
func (c Client) PutEncryptedObject(bucketName, objectName string, reader io.Reader, encryptMaterials encrypt.Materials, metadata map[string][]string, progress io.Reader) (n int64, err error) { func (c Client) PutEncryptedObject(bucketName, objectName string, reader io.Reader, encryptMaterials encrypt.Materials) (n int64, err error) {
if encryptMaterials == nil { if encryptMaterials == nil {
return 0, ErrInvalidArgument("Unable to recognize empty encryption properties") return 0, ErrInvalidArgument("Unable to recognize empty encryption properties")
@ -33,14 +34,10 @@ func (c Client) PutEncryptedObject(bucketName, objectName string, reader io.Read
return 0, err return 0, err
} }
if metadata == nil { return c.PutObjectWithContext(context.Background(), bucketName, objectName, reader, -1, PutObjectOptions{EncryptMaterials: encryptMaterials})
metadata = make(map[string][]string) }
}
// FPutEncryptedObject - Encrypt and store an object with contents from file at filePath.
// Set the necessary encryption headers, for future decryption. func (c Client) FPutEncryptedObject(bucketName, objectName, filePath string, encryptMaterials encrypt.Materials) (n int64, err error) {
metadata[amzHeaderIV] = []string{encryptMaterials.GetIV()} return c.FPutObjectWithContext(context.Background(), bucketName, objectName, filePath, PutObjectOptions{EncryptMaterials: encryptMaterials})
metadata[amzHeaderKey] = []string{encryptMaterials.GetKey()}
metadata[amzHeaderMatDesc] = []string{encryptMaterials.GetDesc()}
return c.putObjectMultipartStreamNoLength(bucketName, objectName, encryptMaterials, metadata, progress)
} }

View File

@ -0,0 +1,63 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import (
"context"
"mime"
"os"
"path/filepath"
"github.com/minio/minio-go/pkg/s3utils"
)
// FPutObjectWithContext - Create an object in a bucket, with contents from file at filePath. Allows request cancellation.
func (c Client) FPutObjectWithContext(ctx context.Context, bucketName, objectName, filePath string, opts PutObjectOptions) (n int64, err error) {
	// Validate bucket and object names before touching the filesystem.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		return 0, err
	}
	if err := s3utils.CheckValidObjectName(objectName); err != nil {
		return 0, err
	}
	// Open the source file; bail out immediately on failure.
	file, err := os.Open(filePath)
	if err != nil {
		return 0, err
	}
	defer file.Close()
	// Stat the open file to learn its size.
	stat, err := file.Stat()
	if err != nil {
		return 0, err
	}
	size := stat.Size()
	// Derive a content type from the file extension when the caller
	// supplied none; fall back to the generic binary type if the
	// extension has no registered MIME mapping.
	if opts.ContentType == "" {
		opts.ContentType = mime.TypeByExtension(filepath.Ext(filePath))
		if opts.ContentType == "" {
			opts.ContentType = "application/octet-stream"
		}
	}
	return c.PutObjectWithContext(ctx, bucketName, objectName, file, size, opts)
}

View File

@ -17,50 +17,10 @@
package minio package minio
import ( import (
"mime" "context"
"os"
"path/filepath"
"github.com/minio/minio-go/pkg/s3utils"
) )
// FPutObject - Create an object in a bucket, with contents from file at filePath. // FPutObject - Create an object in a bucket, with contents from file at filePath
func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) (n int64, err error) { func (c Client) FPutObject(bucketName, objectName, filePath string, opts PutObjectOptions) (n int64, err error) {
// Input validation. return c.FPutObjectWithContext(context.Background(), bucketName, objectName, filePath, opts)
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err
}
if err := s3utils.CheckValidObjectName(objectName); err != nil {
return 0, err
}
// Open the referenced file.
fileReader, err := os.Open(filePath)
// If any error fail quickly here.
if err != nil {
return 0, err
}
defer fileReader.Close()
// Save the file stat.
fileStat, err := fileReader.Stat()
if err != nil {
return 0, err
}
// Save the file size.
fileSize := fileStat.Size()
objMetadata := make(map[string][]string)
// Set contentType based on filepath extension if not given or default
// value of "binary/octet-stream" if the extension has no associated type.
if contentType == "" {
if contentType = mime.TypeByExtension(filepath.Ext(filePath)); contentType == "" {
contentType = "application/octet-stream"
}
}
objMetadata["Content-Type"] = []string{contentType}
return c.putObjectCommon(bucketName, objectName, fileReader, fileSize, objMetadata, nil)
} }

View File

@ -18,6 +18,7 @@ package minio
import ( import (
"bytes" "bytes"
"context"
"encoding/xml" "encoding/xml"
"fmt" "fmt"
"io" "io"
@ -32,9 +33,9 @@ import (
"github.com/minio/minio-go/pkg/s3utils" "github.com/minio/minio-go/pkg/s3utils"
) )
func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64, func (c Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
metadata map[string][]string, progress io.Reader) (n int64, err error) { opts PutObjectOptions) (n int64, err error) {
n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, metadata, progress) n, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
if err != nil { if err != nil {
errResp := ToErrorResponse(err) errResp := ToErrorResponse(err)
// Verify if multipart functionality is not available, if not // Verify if multipart functionality is not available, if not
@ -45,13 +46,13 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName) return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
} }
// Fall back to uploading as single PutObject operation. // Fall back to uploading as single PutObject operation.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
} }
} }
return n, err return n, err
} }
func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) { func (c Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (n int64, err error) {
// Input validation. // Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil { if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err return 0, err
@ -74,14 +75,14 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
} }
// Initiate a new multipart upload. // Initiate a new multipart upload.
uploadID, err := c.newUploadID(bucketName, objectName, metadata) uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil { if err != nil {
return 0, err return 0, err
} }
defer func() { defer func() {
if err != nil { if err != nil {
c.abortMultipartUpload(bucketName, objectName, uploadID) c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
} }
}() }()
@ -117,12 +118,12 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Update progress reader appropriately to the latest offset // Update progress reader appropriately to the latest offset
// as we read from the source. // as we read from the source.
rd := newHook(bytes.NewReader(buf[:length]), progress) rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
// Proceed to upload the part. // Proceed to upload the part.
var objPart ObjectPart var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber, objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
hashSums["md5"], hashSums["sha256"], int64(length), metadata) hashSums["md5"], hashSums["sha256"], int64(length), opts.UserMetadata)
if err != nil { if err != nil {
return totalUploadedSize, err return totalUploadedSize, err
} }
@ -158,7 +159,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Sort all completed parts. // Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts)) sort.Sort(completedParts(complMultipartUpload.Parts))
if _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload); err != nil { if _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload); err != nil {
return totalUploadedSize, err return totalUploadedSize, err
} }
@ -167,7 +168,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
} }
// initiateMultipartUpload - Initiates a multipart upload and returns an upload ID. // initiateMultipartUpload - Initiates a multipart upload and returns an upload ID.
func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata map[string][]string) (initiateMultipartUploadResult, error) { func (c Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) {
// Input validation. // Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil { if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return initiateMultipartUploadResult{}, err return initiateMultipartUploadResult{}, err
@ -181,17 +182,7 @@ func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata
urlValues.Set("uploads", "") urlValues.Set("uploads", "")
// Set ContentType header. // Set ContentType header.
customHeader := make(http.Header) customHeader := opts.Header()
for k, v := range metadata {
if len(v) > 0 {
customHeader.Set(k, v[0])
}
}
// Set a default content-type header if the latter is not provided
if v, ok := metadata["Content-Type"]; !ok || len(v) == 0 {
customHeader.Set("Content-Type", "application/octet-stream")
}
reqMetadata := requestMetadata{ reqMetadata := requestMetadata{
bucketName: bucketName, bucketName: bucketName,
@ -201,7 +192,7 @@ func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata
} }
// Execute POST on an objectName to initiate multipart upload. // Execute POST on an objectName to initiate multipart upload.
resp, err := c.executeMethod("POST", reqMetadata) resp, err := c.executeMethod(ctx, "POST", reqMetadata)
defer closeResponse(resp) defer closeResponse(resp)
if err != nil { if err != nil {
return initiateMultipartUploadResult{}, err return initiateMultipartUploadResult{}, err
@ -223,8 +214,8 @@ func (c Client) initiateMultipartUpload(bucketName, objectName string, metadata
const serverEncryptionKeyPrefix = "x-amz-server-side-encryption" const serverEncryptionKeyPrefix = "x-amz-server-side-encryption"
// uploadPart - Uploads a part in a multipart upload. // uploadPart - Uploads a part in a multipart upload.
func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Reader, func (c Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,
partNumber int, md5Sum, sha256Sum []byte, size int64, metadata map[string][]string) (ObjectPart, error) { partNumber int, md5Sum, sha256Sum []byte, size int64, metadata map[string]string) (ObjectPart, error) {
// Input validation. // Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil { if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return ObjectPart{}, err return ObjectPart{}, err
@ -257,7 +248,7 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re
for k, v := range metadata { for k, v := range metadata {
if len(v) > 0 { if len(v) > 0 {
if strings.HasPrefix(strings.ToLower(k), serverEncryptionKeyPrefix) { if strings.HasPrefix(strings.ToLower(k), serverEncryptionKeyPrefix) {
customHeader.Set(k, v[0]) customHeader.Set(k, v)
} }
} }
} }
@ -274,7 +265,7 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re
} }
// Execute PUT on each part. // Execute PUT on each part.
resp, err := c.executeMethod("PUT", reqMetadata) resp, err := c.executeMethod(ctx, "PUT", reqMetadata)
defer closeResponse(resp) defer closeResponse(resp)
if err != nil { if err != nil {
return ObjectPart{}, err return ObjectPart{}, err
@ -295,7 +286,7 @@ func (c Client) uploadPart(bucketName, objectName, uploadID string, reader io.Re
} }
// completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts. // completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts.
func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string, func (c Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
complete completeMultipartUpload) (completeMultipartUploadResult, error) { complete completeMultipartUpload) (completeMultipartUploadResult, error) {
// Input validation. // Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil { if err := s3utils.CheckValidBucketName(bucketName); err != nil {
@ -308,7 +299,6 @@ func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string,
// Initialize url queries. // Initialize url queries.
urlValues := make(url.Values) urlValues := make(url.Values)
urlValues.Set("uploadId", uploadID) urlValues.Set("uploadId", uploadID)
// Marshal complete multipart body. // Marshal complete multipart body.
completeMultipartUploadBytes, err := xml.Marshal(complete) completeMultipartUploadBytes, err := xml.Marshal(complete)
if err != nil { if err != nil {
@ -327,7 +317,7 @@ func (c Client) completeMultipartUpload(bucketName, objectName, uploadID string,
} }
// Execute POST to complete multipart upload for an objectName. // Execute POST to complete multipart upload for an objectName.
resp, err := c.executeMethod("POST", reqMetadata) resp, err := c.executeMethod(ctx, "POST", reqMetadata)
defer closeResponse(resp) defer closeResponse(resp)
if err != nil { if err != nil {
return completeMultipartUploadResult{}, err return completeMultipartUploadResult{}, err

View File

@ -17,6 +17,7 @@
package minio package minio
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -26,11 +27,6 @@ import (
"github.com/minio/minio-go/pkg/s3utils" "github.com/minio/minio-go/pkg/s3utils"
) )
// PutObjectStreaming using AWS streaming signature V4
func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Reader) (n int64, err error) {
return c.PutObjectWithProgress(bucketName, objectName, reader, nil, nil)
}
// putObjectMultipartStream - upload a large object using // putObjectMultipartStream - upload a large object using
// multipart upload and streaming signature for signing payload. // multipart upload and streaming signature for signing payload.
// Comprehensive put object operation involving multipart uploads. // Comprehensive put object operation involving multipart uploads.
@ -41,8 +37,8 @@ func (c Client) PutObjectStreaming(bucketName, objectName string, reader io.Read
// - *minio.Object // - *minio.Object
// - Any reader which has a method 'ReadAt()' // - Any reader which has a method 'ReadAt()'
// //
func (c Client) putObjectMultipartStream(bucketName, objectName string, func (c Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) { reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
// Verify if reader is *minio.Object, *os.File or io.ReaderAt. // Verify if reader is *minio.Object, *os.File or io.ReaderAt.
// NOTE: Verification of object is kept for a specific purpose // NOTE: Verification of object is kept for a specific purpose
@ -50,9 +46,9 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string,
// It is to indicate that *minio.Object implements io.ReaderAt. // It is to indicate that *minio.Object implements io.ReaderAt.
// and such a functionality is used in the subsequent code path. // and such a functionality is used in the subsequent code path.
if isFile(reader) || !isObject(reader) && isReadAt(reader) { if isFile(reader) || !isObject(reader) && isReadAt(reader) {
n, err = c.putObjectMultipartStreamFromReadAt(bucketName, objectName, reader.(io.ReaderAt), size, metadata, progress) n, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
} else { } else {
n, err = c.putObjectMultipartStreamNoChecksum(bucketName, objectName, reader, size, metadata, progress) n, err = c.putObjectMultipartStreamNoChecksum(ctx, bucketName, objectName, reader, size, opts)
} }
if err != nil { if err != nil {
errResp := ToErrorResponse(err) errResp := ToErrorResponse(err)
@ -64,7 +60,7 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string,
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName) return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
} }
// Fall back to uploading as single PutObject operation. // Fall back to uploading as single PutObject operation.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
} }
} }
return n, err return n, err
@ -94,8 +90,8 @@ type uploadPartReq struct {
// temporary files for staging all the data, these temporary files are // temporary files for staging all the data, these temporary files are
// cleaned automatically when the caller i.e http client closes the // cleaned automatically when the caller i.e http client closes the
// stream after uploading all the contents successfully. // stream after uploading all the contents successfully.
func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string, func (c Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
reader io.ReaderAt, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) { reader io.ReaderAt, size int64, opts PutObjectOptions) (n int64, err error) {
// Input validation. // Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil { if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err return 0, err
@ -111,7 +107,7 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
} }
// Initiate a new multipart upload. // Initiate a new multipart upload.
uploadID, err := c.newUploadID(bucketName, objectName, metadata) uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -122,7 +118,7 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
// to relinquish storage space. // to relinquish storage space.
defer func() { defer func() {
if err != nil { if err != nil {
c.abortMultipartUpload(bucketName, objectName, uploadID) c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
} }
}() }()
@ -150,9 +146,8 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil} uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil}
} }
close(uploadPartsCh) close(uploadPartsCh)
// Receive each part number from the channel allowing three parallel uploads. // Receive each part number from the channel allowing three parallel uploads.
for w := 1; w <= totalWorkers; w++ { for w := 1; w <= opts.getNumThreads(); w++ {
go func(partSize int64) { go func(partSize int64) {
// Each worker will draw from the part channel and upload in parallel. // Each worker will draw from the part channel and upload in parallel.
for uploadReq := range uploadPartsCh { for uploadReq := range uploadPartsCh {
@ -170,13 +165,13 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
} }
// Get a section reader on a particular offset. // Get a section reader on a particular offset.
sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), progress) sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
// Proceed to upload the part. // Proceed to upload the part.
var objPart ObjectPart var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID,
sectionReader, uploadReq.PartNum, sectionReader, uploadReq.PartNum,
nil, nil, partSize, metadata) nil, nil, partSize, opts.UserMetadata)
if err != nil { if err != nil {
uploadedPartsCh <- uploadedPartRes{ uploadedPartsCh <- uploadedPartRes{
Size: 0, Size: 0,
@ -229,7 +224,7 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
// Sort all completed parts. // Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts)) sort.Sort(completedParts(complMultipartUpload.Parts))
_, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload) _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload)
if err != nil { if err != nil {
return totalUploadedSize, err return totalUploadedSize, err
} }
@ -238,8 +233,8 @@ func (c Client) putObjectMultipartStreamFromReadAt(bucketName, objectName string
return totalUploadedSize, nil return totalUploadedSize, nil
} }
func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string, func (c Client) putObjectMultipartStreamNoChecksum(ctx context.Context, bucketName, objectName string,
reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) { reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
// Input validation. // Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil { if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err return 0, err
@ -253,9 +248,8 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
if err != nil { if err != nil {
return 0, err return 0, err
} }
// Initiates a new multipart request // Initiates a new multipart request
uploadID, err := c.newUploadID(bucketName, objectName, metadata) uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -266,7 +260,7 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
// storage space. // storage space.
defer func() { defer func() {
if err != nil { if err != nil {
c.abortMultipartUpload(bucketName, objectName, uploadID) c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
} }
}() }()
@ -281,17 +275,16 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
for partNumber = 1; partNumber <= totalPartsCount; partNumber++ { for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
// Update progress reader appropriately to the latest offset // Update progress reader appropriately to the latest offset
// as we read from the source. // as we read from the source.
hookReader := newHook(reader, progress) hookReader := newHook(reader, opts.Progress)
// Proceed to upload the part. // Proceed to upload the part.
if partNumber == totalPartsCount { if partNumber == totalPartsCount {
partSize = lastPartSize partSize = lastPartSize
} }
var objPart ObjectPart var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID,
io.LimitReader(hookReader, partSize), io.LimitReader(hookReader, partSize),
partNumber, nil, nil, partSize, metadata) partNumber, nil, nil, partSize, opts.UserMetadata)
if err != nil { if err != nil {
return totalUploadedSize, err return totalUploadedSize, err
} }
@ -328,7 +321,7 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
// Sort all completed parts. // Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts)) sort.Sort(completedParts(complMultipartUpload.Parts))
_, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload) _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload)
if err != nil { if err != nil {
return totalUploadedSize, err return totalUploadedSize, err
} }
@ -339,7 +332,7 @@ func (c Client) putObjectMultipartStreamNoChecksum(bucketName, objectName string
// putObjectNoChecksum special function used Google Cloud Storage. This special function // putObjectNoChecksum special function used Google Cloud Storage. This special function
// is used for Google Cloud Storage since Google's multipart API is not S3 compatible. // is used for Google Cloud Storage since Google's multipart API is not S3 compatible.
func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Reader, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) { func (c Client) putObjectNoChecksum(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
// Input validation. // Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil { if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err return 0, err
@ -361,11 +354,11 @@ func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Rea
// Update progress reader appropriately to the latest offset as we // Update progress reader appropriately to the latest offset as we
// read from the source. // read from the source.
readSeeker := newHook(reader, progress) readSeeker := newHook(reader, opts.Progress)
// This function does not calculate sha256 and md5sum for payload. // This function does not calculate sha256 and md5sum for payload.
// Execute put object. // Execute put object.
st, err := c.putObjectDo(bucketName, objectName, readSeeker, nil, nil, size, metaData) st, err := c.putObjectDo(ctx, bucketName, objectName, readSeeker, nil, nil, size, opts)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -377,7 +370,7 @@ func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Rea
// putObjectDo - executes the put object http operation. // putObjectDo - executes the put object http operation.
// NOTE: You must have WRITE permissions on a bucket to add an object to it. // NOTE: You must have WRITE permissions on a bucket to add an object to it.
func (c Client) putObjectDo(bucketName, objectName string, reader io.Reader, md5Sum []byte, sha256Sum []byte, size int64, metaData map[string][]string) (ObjectInfo, error) { func (c Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Sum []byte, sha256Sum []byte, size int64, opts PutObjectOptions) (ObjectInfo, error) {
// Input validation. // Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil { if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return ObjectInfo{}, err return ObjectInfo{}, err
@ -385,21 +378,8 @@ func (c Client) putObjectDo(bucketName, objectName string, reader io.Reader, md5
if err := s3utils.CheckValidObjectName(objectName); err != nil { if err := s3utils.CheckValidObjectName(objectName); err != nil {
return ObjectInfo{}, err return ObjectInfo{}, err
} }
// Set headers. // Set headers.
customHeader := make(http.Header) customHeader := opts.Header()
// Set metadata to headers
for k, v := range metaData {
if len(v) > 0 {
customHeader.Set(k, v[0])
}
}
// If Content-Type is not provided, set the default application/octet-stream one
if v, ok := metaData["Content-Type"]; !ok || len(v) == 0 {
customHeader.Set("Content-Type", "application/octet-stream")
}
// Populate request metadata. // Populate request metadata.
reqMetadata := requestMetadata{ reqMetadata := requestMetadata{
@ -413,7 +393,7 @@ func (c Client) putObjectDo(bucketName, objectName string, reader io.Reader, md5
} }
// Execute PUT an objectName. // Execute PUT an objectName.
resp, err := c.executeMethod("PUT", reqMetadata) resp, err := c.executeMethod(ctx, "PUT", reqMetadata)
defer closeResponse(resp) defer closeResponse(resp)
if err != nil { if err != nil {
return ObjectInfo{}, err return ObjectInfo{}, err

View File

@ -18,119 +18,84 @@ package minio
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"os" "net/http"
"reflect"
"runtime"
"runtime/debug" "runtime/debug"
"sort" "sort"
"strings" "strings"
"github.com/minio/minio-go/pkg/encrypt"
"github.com/minio/minio-go/pkg/s3utils" "github.com/minio/minio-go/pkg/s3utils"
) )
// toInt - converts go value to its integer representation based // PutObjectOptions represents options specified by user for PutObject call
// on the value kind if it is an integer. type PutObjectOptions struct {
func toInt(value reflect.Value) (size int64) { UserMetadata map[string]string
size = -1 Progress io.Reader
if value.IsValid() { ContentType string
switch value.Kind() { ContentEncoding string
case reflect.Int: ContentDisposition string
fallthrough CacheControl string
case reflect.Int8: EncryptMaterials encrypt.Materials
fallthrough NumThreads uint
case reflect.Int16:
fallthrough
case reflect.Int32:
fallthrough
case reflect.Int64:
size = value.Int()
}
}
return size
} }
// getReaderSize - Determine the size of Reader if available. // getNumThreads - gets the number of threads to be used in the multipart
func getReaderSize(reader io.Reader) (size int64, err error) { // put object operation
size = -1 func (opts PutObjectOptions) getNumThreads() (numThreads int) {
if reader == nil { if opts.NumThreads > 0 {
return -1, nil numThreads = int(opts.NumThreads)
}
// Verify if there is a method by name 'Size'.
sizeFn := reflect.ValueOf(reader).MethodByName("Size")
// Verify if there is a method by name 'Len'.
lenFn := reflect.ValueOf(reader).MethodByName("Len")
if sizeFn.IsValid() {
if sizeFn.Kind() == reflect.Func {
// Call the 'Size' function and save its return value.
result := sizeFn.Call([]reflect.Value{})
if len(result) == 1 {
size = toInt(result[0])
}
}
} else if lenFn.IsValid() {
if lenFn.Kind() == reflect.Func {
// Call the 'Len' function and save its return value.
result := lenFn.Call([]reflect.Value{})
if len(result) == 1 {
size = toInt(result[0])
}
}
} else { } else {
// Fallback to Stat() method, two possible Stat() structs exist. numThreads = totalWorkers
switch v := reader.(type) { }
case *os.File: return
var st os.FileInfo }
st, err = v.Stat()
if err != nil { // Header - constructs the headers from metadata entered by user in
// Handle this case specially for "windows", // PutObjectOptions struct
// certain files for example 'Stdin', 'Stdout' and func (opts PutObjectOptions) Header() (header http.Header) {
// 'Stderr' it is not allowed to fetch file information. header = make(http.Header)
if runtime.GOOS == "windows" {
if strings.Contains(err.Error(), "GetFileInformationByHandle") { if opts.ContentType != "" {
return -1, nil header["Content-Type"] = []string{opts.ContentType}
} else {
header["Content-Type"] = []string{"application/octet-stream"}
}
if opts.ContentEncoding != "" {
header["Content-Encoding"] = []string{opts.ContentEncoding}
}
if opts.ContentDisposition != "" {
header["Content-Disposition"] = []string{opts.ContentDisposition}
}
if opts.CacheControl != "" {
header["Cache-Control"] = []string{opts.CacheControl}
}
if opts.EncryptMaterials != nil {
header[amzHeaderIV] = []string{opts.EncryptMaterials.GetIV()}
header[amzHeaderKey] = []string{opts.EncryptMaterials.GetKey()}
header[amzHeaderMatDesc] = []string{opts.EncryptMaterials.GetDesc()}
}
for k, v := range opts.UserMetadata {
if !strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") && !isStandardHeader(k) {
header["X-Amz-Meta-"+k] = []string{v}
} else {
header[k] = []string{v}
} }
} }
return return
} }
// Ignore if input is a directory, throw an error.
if st.Mode().IsDir() { // validate() checks if the UserMetadata map has standard headers or client side
return -1, ErrInvalidArgument("Input file cannot be a directory.") // encryption headers and raises an error if so.
} func (opts PutObjectOptions) validate() (err error) {
// Ignore 'Stdin', 'Stdout' and 'Stderr', since they for k := range opts.UserMetadata {
// represent *os.File type but internally do not if isStandardHeader(k) || isCSEHeader(k) {
// implement Seekable calls. Ignore them and treat return ErrInvalidArgument(k + " unsupported request parameter for user defined metadata")
// them like a stream with unknown length.
switch st.Name() {
case "stdin", "stdout", "stderr":
return
// Ignore read/write stream of os.Pipe() which have unknown length too.
case "|0", "|1":
return
}
var pos int64
pos, err = v.Seek(0, 1) // SeekCurrent.
if err != nil {
return -1, err
}
size = st.Size() - pos
case *Object:
var st ObjectInfo
st, err = v.Stat()
if err != nil {
return
}
var pos int64
pos, err = v.Seek(0, 1) // SeekCurrent.
if err != nil {
return -1, err
}
size = st.Size - pos
} }
} }
// Returns the size here. return nil
return size, err
} }
// completedParts is a collection of parts sortable by their part numbers. // completedParts is a collection of parts sortable by their part numbers.
@ -152,40 +117,12 @@ func (a completedParts) Less(i, j int) bool { return a[i].PartNumber < a[j].Part
// - For size input as -1 PutObject does a multipart Put operation // - For size input as -1 PutObject does a multipart Put operation
// until input stream reaches EOF. Maximum object size that can // until input stream reaches EOF. Maximum object size that can
// be uploaded through this operation will be 5TiB. // be uploaded through this operation will be 5TiB.
func (c Client) PutObject(bucketName, objectName string, reader io.Reader, contentType string) (n int64, err error) { func (c Client) PutObject(bucketName, objectName string, reader io.Reader, objectSize int64,
return c.PutObjectWithMetadata(bucketName, objectName, reader, map[string][]string{ opts PutObjectOptions) (n int64, err error) {
"Content-Type": []string{contentType}, return c.PutObjectWithContext(context.Background(), bucketName, objectName, reader, objectSize, opts)
}, nil)
} }
// PutObjectWithSize - is a helper PutObject similar in behavior to PutObject() func (c Client) putObjectCommon(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (n int64, err error) {
// but takes the size argument explicitly, this function avoids doing reflection
// internally to figure out the size of input stream. Also if the input size is
// lesser than 0 this function returns an error.
func (c Client) PutObjectWithSize(bucketName, objectName string, reader io.Reader, readerSize int64, metadata map[string][]string, progress io.Reader) (n int64, err error) {
return c.putObjectCommon(bucketName, objectName, reader, readerSize, metadata, progress)
}
// PutObjectWithMetadata using AWS streaming signature V4
func (c Client) PutObjectWithMetadata(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
return c.PutObjectWithProgress(bucketName, objectName, reader, metadata, progress)
}
// PutObjectWithProgress using AWS streaming signature V4
func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
// Size of the object.
var size int64
// Get reader size.
size, err = getReaderSize(reader)
if err != nil {
return 0, err
}
return c.putObjectCommon(bucketName, objectName, reader, size, metadata, progress)
}
func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) {
// Check for largest object size allowed. // Check for largest object size allowed.
if size > int64(maxMultipartPutObjectSize) { if size > int64(maxMultipartPutObjectSize) {
return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName) return 0, ErrEntityTooLarge(size, maxMultipartPutObjectSize, bucketName, objectName)
@ -194,30 +131,27 @@ func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader,
// NOTE: Streaming signature is not supported by GCS. // NOTE: Streaming signature is not supported by GCS.
if s3utils.IsGoogleEndpoint(c.endpointURL) { if s3utils.IsGoogleEndpoint(c.endpointURL) {
// Do not compute MD5 for Google Cloud Storage. // Do not compute MD5 for Google Cloud Storage.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
} }
if c.overrideSignerType.IsV2() { if c.overrideSignerType.IsV2() {
if size >= 0 && size < minPartSize { if size >= 0 && size < minPartSize {
return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
} }
return c.putObjectMultipart(bucketName, objectName, reader, size, metadata, progress) return c.putObjectMultipart(ctx, bucketName, objectName, reader, size, opts)
} }
if size < 0 { if size < 0 {
return c.putObjectMultipartStreamNoLength(bucketName, objectName, reader, metadata, progress) return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
} }
if size < minPartSize { if size < minPartSize {
return c.putObjectNoChecksum(bucketName, objectName, reader, size, metadata, progress) return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
} }
// For all sizes greater than 64MiB do multipart. // For all sizes greater than 64MiB do multipart.
return c.putObjectMultipartStream(bucketName, objectName, reader, size, metadata, progress) return c.putObjectMultipartStream(ctx, bucketName, objectName, reader, size, opts)
} }
func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, reader io.Reader, metadata map[string][]string, func (c Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (n int64, err error) {
progress io.Reader) (n int64, err error) {
// Input validation. // Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil { if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err return 0, err
@ -238,16 +172,15 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
if err != nil { if err != nil {
return 0, err return 0, err
} }
// Initiate a new multipart upload. // Initiate a new multipart upload.
uploadID, err := c.newUploadID(bucketName, objectName, metadata) uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil { if err != nil {
return 0, err return 0, err
} }
defer func() { defer func() {
if err != nil { if err != nil {
c.abortMultipartUpload(bucketName, objectName, uploadID) c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
} }
}() }()
@ -263,21 +196,20 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
for partNumber <= totalPartsCount { for partNumber <= totalPartsCount {
length, rErr := io.ReadFull(reader, buf) length, rErr := io.ReadFull(reader, buf)
if rErr == io.EOF { if rErr == io.EOF && partNumber > 1 {
break break
} }
if rErr != nil && rErr != io.ErrUnexpectedEOF { if rErr != nil && rErr != io.ErrUnexpectedEOF {
return 0, rErr return 0, rErr
} }
// Update progress reader appropriately to the latest offset // Update progress reader appropriately to the latest offset
// as we read from the source. // as we read from the source.
rd := newHook(bytes.NewReader(buf[:length]), progress) rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
// Proceed to upload the part. // Proceed to upload the part.
var objPart ObjectPart var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber, objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
nil, nil, int64(length), metadata) nil, nil, int64(length), opts.UserMetadata)
if err != nil { if err != nil {
return totalUploadedSize, err return totalUploadedSize, err
} }
@ -313,7 +245,7 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
// Sort all completed parts. // Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts)) sort.Sort(completedParts(complMultipartUpload.Parts))
if _, err = c.completeMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload); err != nil { if _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload); err != nil {
return totalUploadedSize, err return totalUploadedSize, err
} }

View File

@ -18,6 +18,7 @@ package minio
import ( import (
"bytes" "bytes"
"context"
"encoding/xml" "encoding/xml"
"io" "io"
"net/http" "net/http"
@ -36,7 +37,7 @@ func (c Client) RemoveBucket(bucketName string) error {
return err return err
} }
// Execute DELETE on bucket. // Execute DELETE on bucket.
resp, err := c.executeMethod("DELETE", requestMetadata{ resp, err := c.executeMethod(context.Background(), "DELETE", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
contentSHA256Bytes: emptySHA256, contentSHA256Bytes: emptySHA256,
}) })
@ -66,7 +67,7 @@ func (c Client) RemoveObject(bucketName, objectName string) error {
return err return err
} }
// Execute DELETE on objectName. // Execute DELETE on objectName.
resp, err := c.executeMethod("DELETE", requestMetadata{ resp, err := c.executeMethod(context.Background(), "DELETE", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
objectName: objectName, objectName: objectName,
contentSHA256Bytes: emptySHA256, contentSHA256Bytes: emptySHA256,
@ -187,7 +188,7 @@ func (c Client) RemoveObjects(bucketName string, objectsCh <-chan string) <-chan
// Generate remove multi objects XML request // Generate remove multi objects XML request
removeBytes := generateRemoveMultiObjectsRequest(batch) removeBytes := generateRemoveMultiObjectsRequest(batch)
// Execute GET on bucket to list objects. // Execute GET on bucket to list objects.
resp, err := c.executeMethod("POST", requestMetadata{ resp, err := c.executeMethod(context.Background(), "POST", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
queryValues: urlValues, queryValues: urlValues,
contentBody: bytes.NewReader(removeBytes), contentBody: bytes.NewReader(removeBytes),
@ -227,7 +228,7 @@ func (c Client) RemoveIncompleteUpload(bucketName, objectName string) error {
} }
if uploadID != "" { if uploadID != "" {
// Upload id found, abort the incomplete multipart upload. // Upload id found, abort the incomplete multipart upload.
err := c.abortMultipartUpload(bucketName, objectName, uploadID) err := c.abortMultipartUpload(context.Background(), bucketName, objectName, uploadID)
if err != nil { if err != nil {
return err return err
} }
@ -237,7 +238,7 @@ func (c Client) RemoveIncompleteUpload(bucketName, objectName string) error {
// abortMultipartUpload aborts a multipart upload for the given // abortMultipartUpload aborts a multipart upload for the given
// uploadID, all previously uploaded parts are deleted. // uploadID, all previously uploaded parts are deleted.
func (c Client) abortMultipartUpload(bucketName, objectName, uploadID string) error { func (c Client) abortMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string) error {
// Input validation. // Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil { if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return err return err
@ -251,7 +252,7 @@ func (c Client) abortMultipartUpload(bucketName, objectName, uploadID string) er
urlValues.Set("uploadId", uploadID) urlValues.Set("uploadId", uploadID)
// Execute DELETE on multipart upload. // Execute DELETE on multipart upload.
resp, err := c.executeMethod("DELETE", requestMetadata{ resp, err := c.executeMethod(ctx, "DELETE", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
objectName: objectName, objectName: objectName,
queryValues: urlValues, queryValues: urlValues,

View File

@ -17,6 +17,7 @@
package minio package minio
import ( import (
"context"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
@ -33,7 +34,7 @@ func (c Client) BucketExists(bucketName string) (bool, error) {
} }
// Execute HEAD on bucketName. // Execute HEAD on bucketName.
resp, err := c.executeMethod("HEAD", requestMetadata{ resp, err := c.executeMethod(context.Background(), "HEAD", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
contentSHA256Bytes: emptySHA256, contentSHA256Bytes: emptySHA256,
}) })
@ -108,7 +109,7 @@ func (c Client) statObject(bucketName, objectName string, reqHeaders RequestHead
} }
// Execute HEAD on objectName. // Execute HEAD on objectName.
resp, err := c.executeMethod("HEAD", requestMetadata{ resp, err := c.executeMethod(context.Background(), "HEAD", requestMetadata{
bucketName: bucketName, bucketName: bucketName,
objectName: objectName, objectName: objectName,
contentSHA256Bytes: emptySHA256, contentSHA256Bytes: emptySHA256,

View File

@ -19,6 +19,7 @@ package minio
import ( import (
"bytes" "bytes"
"context"
"crypto/md5" "crypto/md5"
"crypto/sha256" "crypto/sha256"
"encoding/base64" "encoding/base64"
@ -87,7 +88,7 @@ type Client struct {
// Global constants. // Global constants.
const ( const (
libraryName = "minio-go" libraryName = "minio-go"
libraryVersion = "3.0.2" libraryVersion = "4.0.0"
) )
// User Agent should always following the below style. // User Agent should always following the below style.
@ -494,9 +495,11 @@ var successStatus = []int{
// executeMethod - instantiates a given method, and retries the // executeMethod - instantiates a given method, and retries the
// request upon any error up to maxRetries attempts in a binomially // request upon any error up to maxRetries attempts in a binomially
// delayed manner using a standard back off algorithm. // delayed manner using a standard back off algorithm.
func (c Client) executeMethod(method string, metadata requestMetadata) (res *http.Response, err error) { func (c Client) executeMethod(ctx context.Context, method string, metadata requestMetadata) (res *http.Response, err error) {
var isRetryable bool // Indicates if request can be retried. var isRetryable bool // Indicates if request can be retried.
var bodySeeker io.Seeker // Extracted seeker from io.Reader. var bodySeeker io.Seeker // Extracted seeker from io.Reader.
var reqRetry = MaxRetry // Indicates how many times we can retry the request
if metadata.contentBody != nil { if metadata.contentBody != nil {
// Check if body is seekable then it is retryable. // Check if body is seekable then it is retryable.
bodySeeker, isRetryable = metadata.contentBody.(io.Seeker) bodySeeker, isRetryable = metadata.contentBody.(io.Seeker)
@ -504,6 +507,11 @@ func (c Client) executeMethod(method string, metadata requestMetadata) (res *htt
case os.Stdin, os.Stdout, os.Stderr: case os.Stdin, os.Stdout, os.Stderr:
isRetryable = false isRetryable = false
} }
// Retry only when reader is seekable
if !isRetryable {
reqRetry = 1
}
// Figure out if the body can be closed - if yes // Figure out if the body can be closed - if yes
// we will definitely close it upon the function // we will definitely close it upon the function
// return. // return.
@ -522,7 +530,7 @@ func (c Client) executeMethod(method string, metadata requestMetadata) (res *htt
// Blank indentifier is kept here on purpose since 'range' without // Blank indentifier is kept here on purpose since 'range' without
// blank identifiers is only supported since go1.4 // blank identifiers is only supported since go1.4
// https://golang.org/doc/go1.4#forrange. // https://golang.org/doc/go1.4#forrange.
for range c.newRetryTimer(MaxRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter, doneCh) { for range c.newRetryTimer(reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter, doneCh) {
// Retry executes the following function body if request has an // Retry executes the following function body if request has an
// error until maxRetries have been exhausted, retry attempts are // error until maxRetries have been exhausted, retry attempts are
// performed after waiting for a given period of time in a // performed after waiting for a given period of time in a
@ -545,6 +553,8 @@ func (c Client) executeMethod(method string, metadata requestMetadata) (res *htt
} }
return nil, err return nil, err
} }
// Add context to request
req = req.WithContext(ctx)
// Initiate the request. // Initiate the request.
res, err = c.do(req) res, err = c.do(req)
@ -720,7 +730,7 @@ func (c Client) newRequest(method string, metadata requestMetadata) (req *http.R
} }
// set md5Sum for content protection. // set md5Sum for content protection.
if metadata.contentMD5Bytes != nil { if len(metadata.contentMD5Bytes) > 0 {
req.Header.Set("Content-Md5", base64.StdEncoding.EncodeToString(metadata.contentMD5Bytes)) req.Header.Set("Content-Md5", base64.StdEncoding.EncodeToString(metadata.contentMD5Bytes))
} }

View File

@ -21,6 +21,7 @@ install:
- go get -u github.com/minio/go-homedir - go get -u github.com/minio/go-homedir
- go get -u github.com/remyoudompheng/go-misc/deadcode - go get -u github.com/remyoudompheng/go-misc/deadcode
- go get -u github.com/gordonklaus/ineffassign - go get -u github.com/gordonklaus/ineffassign
- go get -u github.com/dustin/go-humanize
# to run your custom scripts instead of automatic MSBuild # to run your custom scripts instead of automatic MSBuild
build_script: build_script:

View File

@ -50,7 +50,7 @@ const maxMultipartPutObjectSize = 1024 * 1024 * 1024 * 1024 * 5
const unsignedPayload = "UNSIGNED-PAYLOAD" const unsignedPayload = "UNSIGNED-PAYLOAD"
// Total number of parallel workers used for multipart operation. // Total number of parallel workers used for multipart operation.
var totalWorkers = 3 const totalWorkers = 4
// Signature related constants. // Signature related constants.
const ( const (

View File

@ -17,6 +17,7 @@
package minio package minio
import ( import (
"context"
"io" "io"
"github.com/minio/minio-go/pkg/policy" "github.com/minio/minio-go/pkg/policy"
@ -53,13 +54,13 @@ func (c Core) ListObjectsV2(bucketName, objectPrefix, continuationToken string,
} }
// PutObject - Upload object. Uploads using single PUT call. // PutObject - Upload object. Uploads using single PUT call.
func (c Core) PutObject(bucket, object string, size int64, data io.Reader, md5Sum, sha256Sum []byte, metadata map[string][]string) (ObjectInfo, error) { func (c Core) PutObject(bucket, object string, data io.Reader, size int64, md5Sum, sha256Sum []byte, metadata map[string]string) (ObjectInfo, error) {
return c.putObjectDo(bucket, object, data, md5Sum, sha256Sum, size, metadata) return c.putObjectDo(context.Background(), bucket, object, data, md5Sum, sha256Sum, size, PutObjectOptions{UserMetadata: metadata})
} }
// NewMultipartUpload - Initiates new multipart upload and returns the new uploaID. // NewMultipartUpload - Initiates new multipart upload and returns the new uploadID.
func (c Core) NewMultipartUpload(bucket, object string, metadata map[string][]string) (uploadID string, err error) { func (c Core) NewMultipartUpload(bucket, object string, opts PutObjectOptions) (uploadID string, err error) {
result, err := c.initiateMultipartUpload(bucket, object, metadata) result, err := c.initiateMultipartUpload(context.Background(), bucket, object, opts)
return result.UploadID, err return result.UploadID, err
} }
@ -69,14 +70,14 @@ func (c Core) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarker, de
} }
// PutObjectPart - Upload an object part. // PutObjectPart - Upload an object part.
func (c Core) PutObjectPart(bucket, object, uploadID string, partID int, size int64, data io.Reader, md5Sum, sha256Sum []byte) (ObjectPart, error) { func (c Core) PutObjectPart(bucket, object, uploadID string, partID int, data io.Reader, size int64, md5Sum, sha256Sum []byte) (ObjectPart, error) {
return c.PutObjectPartWithMetadata(bucket, object, uploadID, partID, size, data, md5Sum, sha256Sum, nil) return c.PutObjectPartWithMetadata(bucket, object, uploadID, partID, data, size, md5Sum, sha256Sum, nil)
} }
// PutObjectPartWithMetadata - upload an object part with additional request metadata. // PutObjectPartWithMetadata - upload an object part with additional request metadata.
func (c Core) PutObjectPartWithMetadata(bucket, object, uploadID string, partID int, func (c Core) PutObjectPartWithMetadata(bucket, object, uploadID string, partID int, data io.Reader,
size int64, data io.Reader, md5Sum, sha256Sum []byte, metadata map[string][]string) (ObjectPart, error) { size int64, md5Sum, sha256Sum []byte, metadata map[string]string) (ObjectPart, error) {
return c.uploadPart(bucket, object, uploadID, data, partID, md5Sum, sha256Sum, size, metadata) return c.uploadPart(context.Background(), bucket, object, uploadID, data, partID, md5Sum, sha256Sum, size, metadata)
} }
// ListObjectParts - List uploaded parts of an incomplete upload.x // ListObjectParts - List uploaded parts of an incomplete upload.x
@ -86,7 +87,7 @@ func (c Core) ListObjectParts(bucket, object, uploadID string, partNumberMarker
// CompleteMultipartUpload - Concatenate uploaded parts and commit to an object. // CompleteMultipartUpload - Concatenate uploaded parts and commit to an object.
func (c Core) CompleteMultipartUpload(bucket, object, uploadID string, parts []CompletePart) error { func (c Core) CompleteMultipartUpload(bucket, object, uploadID string, parts []CompletePart) error {
_, err := c.completeMultipartUpload(bucket, object, uploadID, completeMultipartUpload{ _, err := c.completeMultipartUpload(context.Background(), bucket, object, uploadID, completeMultipartUpload{
Parts: parts, Parts: parts,
}) })
return err return err
@ -94,7 +95,7 @@ func (c Core) CompleteMultipartUpload(bucket, object, uploadID string, parts []C
// AbortMultipartUpload - Abort an incomplete upload. // AbortMultipartUpload - Abort an incomplete upload.
func (c Core) AbortMultipartUpload(bucket, object, uploadID string) error { func (c Core) AbortMultipartUpload(bucket, object, uploadID string) error {
return c.abortMultipartUpload(bucket, object, uploadID) return c.abortMultipartUpload(context.Background(), bucket, object, uploadID)
} }
// GetBucketPolicy - fetches bucket access policy for a given bucket. // GetBucketPolicy - fetches bucket access policy for a given bucket.
@ -111,7 +112,7 @@ func (c Core) PutBucketPolicy(bucket string, bucketPolicy policy.BucketAccessPol
// partial objects and also downloading objects with special conditions // partial objects and also downloading objects with special conditions
// matching etag, modtime etc. // matching etag, modtime etc.
func (c Core) GetObject(bucketName, objectName string, reqHeaders RequestHeaders) (io.ReadCloser, ObjectInfo, error) { func (c Core) GetObject(bucketName, objectName string, reqHeaders RequestHeaders) (io.ReadCloser, ObjectInfo, error) {
return c.getObject(bucketName, objectName, reqHeaders) return c.getObject(context.Background(), bucketName, objectName, reqHeaders)
} }
// StatObject is a lower level API implemented to support special // StatObject is a lower level API implemented to support special

5889
vendor/github.com/minio/minio-go/functional_tests.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,39 +0,0 @@
// +build go1.5,!go1.6,!go1.7,!go1.8
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage
* (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import (
"net/http"
"time"
)
// This default transport is similar to http.DefaultTransport
// but with additional DisableCompression:
var defaultMinioTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSHandshakeTimeout: 10 * time.Second,
// Set this value so that the underlying transport round-tripper
// doesn't try to auto decode the body of objects with
// content-encoding set to `gzip`.
//
// Refer:
// https://golang.org/src/net/http/transport.go?h=roundTrip#L1843
DisableCompression: true,
}

View File

@ -1,40 +0,0 @@
// +build go1.6,!go1.7,!go1.8
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage
* (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import (
"net/http"
"time"
)
// This default transport is similar to http.DefaultTransport
// but with additional DisableCompression:
var defaultMinioTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
// Set this value so that the underlying transport round-tripper
// doesn't try to auto decode the body of objects with
// content-encoding set to `gzip`.
//
// Refer:
// https://golang.org/src/net/http/transport.go?h=roundTrip#L1843
DisableCompression: true,
}

View File

@ -212,3 +212,41 @@ func getDefaultLocation(u url.URL, regionOverride string) (location string) {
// Default to location to 'us-east-1'. // Default to location to 'us-east-1'.
return "us-east-1" return "us-east-1"
} }
var supportedHeaders = []string{
"content-type",
"cache-control",
"content-encoding",
"content-disposition",
// Add more supported headers here.
}
// cseHeaders is list of client side encryption headers
var cseHeaders = []string{
"X-Amz-Iv",
"X-Amz-Key",
"X-Amz-Matdesc",
}
// isStandardHeader returns true if header is a supported header and not a custom header
func isStandardHeader(headerKey string) bool {
for _, header := range supportedHeaders {
if strings.Compare(strings.ToLower(headerKey), header) == 0 {
return true
}
}
return false
}
// isCSEHeader returns true if header is a client side encryption header.
func isCSEHeader(headerKey string) bool {
key := strings.ToLower(headerKey)
for _, h := range cseHeaders {
header := strings.ToLower(h)
if (header == key) ||
(("x-amz-meta-" + header) == key) {
return true
}
}
return false
}

6
vendor/vendor.json vendored
View File

@ -312,10 +312,10 @@
"revisionTime": "2016-02-29T08:42:30-08:00" "revisionTime": "2016-02-29T08:42:30-08:00"
}, },
{ {
"checksumSHA1": "RoElkV9hrX7Zd8YivXD+JOJOumA=", "checksumSHA1": "mqxOM3CsubB09O0nDEe4efu0JLQ=",
"path": "github.com/minio/minio-go", "path": "github.com/minio/minio-go",
"revision": "84539d76271caeffb7a1d5f058bd83c6449f8145", "revision": "414c6b6a2e97428776cd831d9745589ebcf873e5",
"revisionTime": "2017-09-01T08:51:27Z" "revisionTime": "2017-09-27T19:03:45Z"
}, },
{ {
"checksumSHA1": "5juljGXPkBWENR2Os7dlnPQER48=", "checksumSHA1": "5juljGXPkBWENR2Os7dlnPQER48=",