config/main: Re-write config files - add to new config v3

- New config format.

```
{
	"version": "3",
	"address": ":9000",
    "backend": {
          "type": "fs",
          "disk": "/path"
    },
	"credential": {
		"accessKey": "WLGDGYAQYIGI833EV05A",
		"secretKey": "BYvgJM101sHngl2uzjXS/OBF/aMxAN06JrJ3qJlF"
	},
	"region": "us-east-1",
	"logger": {
		"file": {
			"enable": false,
			"fileName": "",
			"level": "error"
		},
		"syslog": {
			"enable": false,
			"address": "",
			"level": "debug"
		},
		"console": {
			"enable": true,
			"level": "fatal"
		}
	}
}
```

New command lines in lieu of supporting XL.

Minio initialize filesystem backend.
~~~
$ minio init fs <path>
~~~

Minio initialize XL backend.
~~~
$ minio init xl <url1>...<url16>
~~~

For 'fs' backend it starts the server.
~~~
$ minio server
~~~

For 'xl' backend it waits for servers to join.
~~~
$ minio server
... [PROGRESS BAR] of servers connecting
~~~

Now on other servers execute 'join' and they connect.
~~~
....
minio join <url1> -- from <url2> && minio server
minio join <url1> -- from <url3> && minio server
...
...
minio join <url1> -- from <url16> && minio server
~~~
This commit is contained in:
Harshavardhana
2016-02-12 15:27:10 -08:00
parent 85e50f2bb9
commit aaf97ea02c
97 changed files with 2716 additions and 23396 deletions

499
vendor/github.com/minio/minio-go/API.md generated vendored Normal file
View File

@@ -0,0 +1,499 @@
## API Documentation
### Minio client object creation
Minio client object is created using minio-go:
```go
package main
import (
"fmt"
"github.com/minio/minio-go"
)
func main() {
s3Client, err := minio.New("s3.amazonaws.com", "YOUR-ACCESSKEYID", "YOUR-SECRETACCESSKEY", false)
if err !!= nil {
fmt.Println(err)
return
}
}
```
s3Client can be used to perform operations on S3 storage. APIs are described below.
### Bucket operations
* [`MakeBucket`](#MakeBucket)
* [`ListBuckets`](#ListBuckets)
* [`BucketExists`](#BucketExists)
* [`RemoveBucket`](#RemoveBucket)
* [`ListObjects`](#ListObjects)
* [`ListIncompleteUploads`](#ListIncompleteUploads)
### Object operations
* [`GetObject`](#GetObject)
* [`PutObject`](#PutObject)
* [`StatObject`](#StatObject)
* [`RemoveObject`](#RemoveObject)
* [`RemoveIncompleteUpload`](#RemoveIncompleteUpload)
### File operations.
* [`FPutObject`](#FPutObject)
* [`FGetObject`](#FPutObject)
### Bucket policy operations.
* [`SetBucketPolicy`](#SetBucketPolicy)
* [`GetBucketPolicy`](#GetBucketPolicy)
* [`RemoveBucketPolicy`](#RemoveBucketPolicy)
### Presigned operations
* [`PresignedGetObject`](#PresignedGetObject)
* [`PresignedPutObject`](#PresignedPutObject)
* [`PresignedPostPolicy`](#PresignedPostPolicy)
### Bucket operations
---------------------------------------
<a name="MakeBucket">
#### MakeBucket(bucketName, location)
Create a new bucket.
__Arguments__
* `bucketName` _string_ - Name of the bucket.
* `location` _string_ - region valid values are _us-west-1_, _us-west-2_, _eu-west-1_, _eu-central-1_, _ap-southeast-1_, _ap-northeast-1_, _ap-southeast-2_, _sa-east-1_
__Example__
```go
err := s3Client.MakeBucket("mybucket", "us-west-1")
if err != nil {
fmt.Println(err)
return
}
fmt.Println("Successfully created mybucket.")
```
---------------------------------------
<a name="ListBuckets">
#### ListBuckets()
List all buckets.
`bucketList` emits bucket with the format:
* `bucket.Name` _string_: bucket name
* `bucket.CreationDate` time.Time : date when bucket was created
__Example__
```go
buckets, err := s3Client.ListBuckets()
if err != nil {
fmt.Println(err)
return
}
for _, bucket := range buckets {
fmt.Println(bucket)
}
```
---------------------------------------
<a name="BucketExists">
#### BucketExists(bucketName)
Check if bucket exists.
__Arguments__
* `bucketName` _string_ : name of the bucket
__Example__
```go
err := s3Client.BucketExists("mybucket")
if err != nil {
fmt.Println(err)
return
}
```
---------------------------------------
<a name="RemoveBucket">
#### RemoveBucket(bucketName)
Remove a bucket.
__Arguments__
* `bucketName` _string_ : name of the bucket
__Example__
```go
err := s3Client.RemoveBucket("mybucket")
if err != nil {
fmt.Println(err)
return
}
```
---------------------------------------
<a name="GetBucketPolicy">
#### GetBucketPolicy(bucketName, objectPrefix)
Get access permissions on a bucket or a prefix.
__Arguments__
* `bucketName` _string_ : name of the bucket
* `objectPrefix` _string_ : name of the object prefix
__Example__
```go
bucketPolicy, err := s3Client.GetBucketPolicy("mybucket")
if err != nil {
fmt.Println(err)
return
}
fmt.Println("Access permissions for mybucket is", bucketPolicy)
```
---------------------------------------
<a name="SetBucketPolicy">
#### SetBucketPolicy(bucketname, objectPrefix, policy)
Set access permissions on bucket or an object prefix.
__Arguments__
* `bucketName` _string_: name of the bucket
* `objectPrefix` _string_ : name of the object prefix
* `policy` _BucketPolicy_: policy can be _non_, _readonly_, _readwrite_, _writeonly_
__Example__
```go
err := s3Client.SetBucketPolicy("mybucket", "myprefix", "readwrite")
if err != nil {
fmt.Println(err)
return
}
```
---------------------------------------
<a name="RemoveBucketPolicy">
#### RemoveBucketPolicy(bucketname, objectPrefix)
Remove existing permissions on bucket or an object prefix.
__Arguments__
* `bucketName` _string_: name of the bucket
* `objectPrefix` _string_ : name of the object prefix
__Example__
```go
err := s3Client.RemoveBucketPolicy("mybucket", "myprefix")
if err != nil {
fmt.Println(err)
return
}
```
---------------------------------------
<a name="ListObjects">
#### ListObjects(bucketName, prefix, recursive, doneCh)
List objects in a bucket.
__Arguments__
* `bucketName` _string_: name of the bucket
* `objectPrefix` _string_: the prefix of the objects that should be listed
* `recursive` _bool_: `true` indicates recursive style listing and `false` indicates directory style listing delimited by '/'
* `doneCh` chan struct{} : channel for pro-actively closing the internal go routine
__Return Value__
* `<-chan ObjectInfo` _chan ObjectInfo_: Read channel for all the objects in the bucket, the object is of the format:
* `objectInfo.Key` _string_: name of the object
* `objectInfo.Size` _int64_: size of the object
* `objectInfo.ETag` _string_: etag of the object
* `objectInfo.LastModified` _time.Time_: modified time stamp
__Example__
```go
// Create a done channel to control 'ListObjects' go routine.
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)
isRecursive := true
objectCh := s3Client.ListObjects("mybucket", "myprefix", isRecursive, doneCh)
for object := range objectCh {
if object.Err != nil {
fmt.Println(object.Err)
return
}
fmt.Println(object)
}
```
---------------------------------------
<a name="ListIncompleteUploads">
#### ListIncompleteUploads(bucketName, prefix, recursive)
List partially uploaded objects in a bucket.
__Arguments__
* `bucketname` _string_: name of the bucket
* `prefix` _string_: prefix of the object names that are partially uploaded
* `recursive` bool: directory style listing when false, recursive listing when true
* `doneCh` chan struct{} : channel for pro-actively closing the internal go routine
__Return Value__
* `<-chan ObjectMultipartInfo` _chan ObjectMultipartInfo_ : emits multipart objects of the format:
* `multiPartObjInfo.Key` _string_: name of the incomplete object
* `multiPartObjInfo.UploadID` _string_: upload ID of the incomplete object
* `multiPartObjInfo.Size` _int64_: size of the incompletely uploaded object
__Example__
```go
// Create a done channel to control 'ListObjects' go routine.
doneCh := make(chan struct{})
// Indicate to our routine to exit cleanly upon return.
defer close(doneCh)
isRecursive := true
multiPartObjectCh := s3Client.ListIncompleteUploads("mybucket", "myprefix", isRecursive, doneCh)
for multiPartObject := range multiPartObjectCh {
if multiPartObject.Err != nil {
fmt.Println(multiPartObject.Err)
return
}
fmt.Println(multiPartObject)
}
```
---------------------------------------
### Object operations
<a name="GetObject">
#### GetObject(bucketName, objectName)
Download an object.
__Arguments__
* `bucketName` _string_: name of the bucket
* `objectName` _string_: name of the object
__Return Value__
* `object` _*minio.Object_ : _minio.Object_ represents object reader.
__Example__
```go
object, err := s3Client.GetObject("mybucket", "photo.jpg")
if err != nil {
fmt.Println(err)
return
}
localFile _ := os.Open("/tmp/local-file")
if _, err := io.Copy(localFile, object); err != nil {
fmt.Println(err)
return
}
```
---------------------------------------
---------------------------------------
<a name="FGetObject">
#### FGetObject(bucketName, objectName, filePath)
Callback is called with `error` in case of error or `null` in case of success
__Arguments__
* `bucketName` _string_: name of the bucket
* `objectName` _string_: name of the object
* `filePath` _string_: path to which the object data will be written to
__Example__
```go
err := s3Client.FGetObject("mybucket", "photo.jpg", "/tmp/photo.jpg")
if err != nil {
fmt.Println(err)
return
}
```
---------------------------------------
<a name="PutObject">
#### PutObject(bucketName, objectName, reader, contentType)
Upload an object.
Uploading a stream
__Arguments__
* `bucketName` _string_: name of the bucket
* `objectName` _string_: name of the object
* `reader` _io.Reader_: Any golang object implementing io.Reader
* `contentType` _string_: content type of the object.
__Example__
```go
file, err := os.Open("my-testfile")
if err != nil {
fmt.Println(err)
return
}
defer file.Close()
n, err := s3Client.PutObject("my-bucketname", "my-objectname", object, "application/octet-stream")
if err != nil {
fmt.Println(err)
return
}
```
---------------------------------------
<a name="FPutObject">
#### FPutObject(bucketName, objectName, filePath, contentType)
Uploads the object using contents from a file
__Arguments__
* `bucketName` _string_: name of the bucket
* `objectName` _string_: name of the object
* `filePath` _string_: file path of the file to be uploaded
* `contentType` _string_: content type of the object
__Example__
```go
n, err := s3Client.FPutObject("my-bucketname", "my-objectname", "/tmp/my-filename.csv", "application/csv")
if err != nil {
fmt.Println(err)
return
}
```
---------------------------------------
<a name="StatObject">
#### StatObject(bucketName, objectName)
Get metadata of an object.
__Arguments__
* `bucketName` _string_: name of the bucket
* `objectName` _string_: name of the object
__Return Value__
`objInfo` _ObjectInfo_ : object stat info for following format:
* `objInfo.Size` _int64_: size of the object
* `objInfo.ETag` _string_: etag of the object
* `objInfo.ContentType` _string_: Content-Type of the object
* `objInfo.LastModified` _string_: modified time stamp
__Example__
```go
objInfo, err := s3Client.StatObject("mybucket", "photo.jpg")
if err != nil {
fmt.Println(err)
return
}
fmt.Println(objInfo)
```
---------------------------------------
<a name="RemoveObject">
#### RemoveObject(bucketName, objectName)
Remove an object.
__Arguments__
* `bucketName` _string_: name of the bucket
* `objectName` _string_: name of the object
__Example__
```go
err := s3Client.RemoveObject("mybucket", "photo.jpg")
if err != nil {
fmt.Println(err)
return
}
```
---------------------------------------
<a name="RemoveIncompleteUpload">
#### RemoveIncompleteUpload(bucketName, objectName)
Remove an partially uploaded object.
__Arguments__
* `bucketName` _string_: name of the bucket
* `objectName` _string_: name of the object
__Example__
```go
err := s3Client.RemoveIncompleteUpload("mybucket", "photo.jpg")
if err != nil {
fmt.Println(err)
return
}
```
### Presigned operations
---------------------------------------
<a name="PresignedGetObject">
#### PresignedGetObject(bucketName, objectName, expiry)
Generate a presigned URL for GET.
__Arguments__
* `bucketName` _string_: name of the bucket.
* `objectName` _string_: name of the object.
* `expiry` _time.Duration_: expiry in seconds.
`reqParams` _url.Values_ : additional response header overrides supports _response-expires_, _response-content-type_, _response-cache-control_, _response-content-disposition_
__Example__
```go
// Set request parameters for content-disposition.
reqParams := make(url.Values)
reqParams.Set("response-content-disposition", "attachment; filename=\"your-filename.txt\"")
// Generates a presigned url which expires in a day.
presignedURL, err := s3Client.PresignedGetObject("mybucket", "photo.jpg", time.Second * 24 * 60 * 60, reqParams)
if err != nil {
fmt.Println(err)
return
}
```
---------------------------------------
<a name="PresignedPutObject">
#### PresignedPutObject(bucketName, objectName, expiry)
Generate a presigned URL for PUT.
<blockquote>
NOTE: you can upload to S3 only with specified object name.
</blockquote>
__Arguments__
* `bucketName` _string_: name of the bucket
* `objectName` _string_: name of the object
* `expiry` _time.Duration_: expiry in seconds
__Example__
```go
// Generates a url which expires in a day.
presignedURL, err := s3Client.PresignedPutObject("mybucket", "photo.jpg", time.Second * 24 * 60 * 60)
if err != nil {
fmt.Println(err)
return
}
```
---------------------------------------
<a name="PresignedPostPolicy">
#### PresignedPostPolicy
PresignedPostPolicy we can provide policies specifying conditions restricting
what you want to allow in a POST request, such as bucket name where objects can be
uploaded, key name prefixes that you want to allow for the object being created and more.
We need to create our policy first:
```go
policy := minio.NewPostPolicy()
```
Apply upload policy restrictions:
```go
policy.SetBucket("my-bucketname")
policy.SetKey("my-objectname")
policy.SetExpires(time.Now().UTC().AddDate(0, 0, 10)) // expires in 10 days
// Only allow 'png' images.
policy.SetContentType("image/png")
// Only allow content size in range 1KB to 1MB.
policy.SetContentLengthRange(1024, 1024*1024)
```
Get the POST form key/value object:
```go
formData, err := s3Client.PresignedPostPolicy(policy)
if err != nil {
fmt.Println(err)
return
}
```
POST your content from the command line using `curl`:
```go
fmt.Printf("curl ")
for k, v := range m {
fmt.Printf("-F %s=%s ", k, v)
}
fmt.Printf("-F file=@/etc/bash.bashrc ")
fmt.Printf("https://my-bucketname.s3.amazonaws.com\n")
```

View File

@@ -71,7 +71,7 @@ export GOROOT=$(brew --prefix)/Cellar/go/${GOVERSION}/libexec
export PATH=$PATH:${GOPATH}/bin
```
##### Source the new enviornment
##### Source the new environment
```sh
$ source ~/.bash_profile

View File

@@ -61,12 +61,14 @@ func main() {
## Documentation
[API documentation](./API.md)
## Examples
### Bucket Operations.
* [MakeBucket(bucketName, BucketACL, location) error](examples/s3/makebucket.go)
* [MakeBucket(bucketName, location) error](examples/s3/makebucket.go)
* [BucketExists(bucketName) error](examples/s3/bucketexists.go)
* [RemoveBucket(bucketName) error](examples/s3/removebucket.go)
* [GetBucketACL(bucketName) (BucketACL, error)](examples/s3/getbucketacl.go)
* [SetBucketACL(bucketName, BucketACL) error)](examples/s3/setbucketacl.go)
* [ListBuckets() []BucketInfo](examples/s3/listbuckets.go)
* [ListObjects(bucketName, objectPrefix, recursive, chan<- struct{}) <-chan ObjectInfo](examples/s3/listobjects.go)
* [ListIncompleteUploads(bucketName, prefix, recursive, chan<- struct{}) <-chan ObjectMultipartInfo](examples/s3/listincompleteuploads.go)
@@ -87,6 +89,11 @@ func main() {
* [PresignedPutObject(bucketName, objectName, time.Duration) (string, error)](examples/s3/presignedputobject.go)
* [PresignedPostPolicy(NewPostPolicy()) (map[string]string, error)](examples/s3/presignedpostpolicy.go)
### Bucket Policy Operations.
* [SetBucketPolicy(bucketName, objectPrefix, BucketPolicy) error](examples/s3/setbucketpolicy.go)
* [GetBucketPolicy(bucketName, objectPrefix) (BucketPolicy, error)](examples/s3/getbucketpolicy.go)
* [RemoveBucketPolicy(bucketName, objectPrefix) error](examples/s3/removebucketpolicy.go)
### API Reference
[![GoDoc](http://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](http://godoc.org/github.com/minio/minio-go)

View File

@@ -20,115 +20,89 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"sort"
"strings"
"sync"
"time"
)
// GetBucketACL - Get the permissions on an existing bucket.
//
// Returned values are:
//
// private - Owner gets full access.
// public-read - Owner gets full access, others get read access.
// public-read-write - Owner gets full access, others get full access
// too.
// authenticated-read - Owner gets full access, authenticated users
// get read access.
func (c Client) GetBucketACL(bucketName string) (BucketACL, error) {
// GetBucketPolicy - get bucket policy at a given path.
func (c Client) GetBucketPolicy(bucketName, objectPrefix string) (bucketPolicy BucketPolicy, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return "", err
return BucketPolicyNone, err
}
if err := isValidObjectPrefix(objectPrefix); err != nil {
return BucketPolicyNone, err
}
policy, err := c.getBucketPolicy(bucketName, objectPrefix)
if err != nil {
return BucketPolicyNone, err
}
if policy.Statements == nil {
return BucketPolicyNone, nil
}
if isBucketPolicyReadWrite(policy.Statements, bucketName, objectPrefix) {
return BucketPolicyReadWrite, nil
} else if isBucketPolicyWriteOnly(policy.Statements, bucketName, objectPrefix) {
return BucketPolicyWriteOnly, nil
} else if isBucketPolicyReadOnly(policy.Statements, bucketName, objectPrefix) {
return BucketPolicyReadOnly, nil
}
return BucketPolicyNone, nil
}
// Set acl query.
func (c Client) getBucketPolicy(bucketName string, objectPrefix string) (BucketAccessPolicy, error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return BucketAccessPolicy{}, err
}
if err := isValidObjectPrefix(objectPrefix); err != nil {
return BucketAccessPolicy{}, err
}
// Get resources properly escaped and lined up before
// using them in http request.
urlValues := make(url.Values)
urlValues.Set("acl", "")
urlValues.Set("policy", "")
// Execute GET acl on bucketName.
// Execute GET on bucket to list objects.
resp, err := c.executeMethod("GET", requestMetadata{
bucketName: bucketName,
queryValues: urlValues,
})
defer closeResponse(resp)
if err != nil {
return "", err
return BucketAccessPolicy{}, err
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return "", httpRespToErrorResponse(resp, bucketName, "")
}
}
// Decode access control policy.
policy := accessControlPolicy{}
err = xmlDecoder(resp.Body, &policy)
if err != nil {
return "", err
}
// We need to avoid following de-serialization check for Google
// Cloud Storage. On Google Cloud Storage "private" canned ACL's
// policy do not have grant list. Treat it as a valid case, check
// for all other vendors.
if !isGoogleEndpoint(c.endpointURL) {
if policy.AccessControlList.Grant == nil {
errorResponse := ErrorResponse{
Code: "InternalError",
Message: "Access control Grant list is empty. " + reportIssue,
BucketName: bucketName,
RequestID: resp.Header.Get("x-amz-request-id"),
HostID: resp.Header.Get("x-amz-id-2"),
Region: resp.Header.Get("x-amz-bucket-region"),
errResponse := httpRespToErrorResponse(resp, bucketName, "")
if ToErrorResponse(errResponse).Code == "NoSuchBucketPolicy" {
return BucketAccessPolicy{Version: "2012-10-17"}, nil
}
return "", errorResponse
return BucketAccessPolicy{}, errResponse
}
}
// Boolean cues to indentify right canned acls.
var publicRead, publicWrite, authenticatedRead bool
// Handle grants.
grants := policy.AccessControlList.Grant
for _, g := range grants {
if g.Grantee.URI == "" && g.Permission == "FULL_CONTROL" {
continue
}
if g.Grantee.URI == "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" && g.Permission == "READ" {
authenticatedRead = true
break
} else if g.Grantee.URI == "http://acs.amazonaws.com/groups/global/AllUsers" && g.Permission == "WRITE" {
publicWrite = true
} else if g.Grantee.URI == "http://acs.amazonaws.com/groups/global/AllUsers" && g.Permission == "READ" {
publicRead = true
}
// Read access policy up to maxAccessPolicySize.
// http://docs.aws.amazon.com/AmazonS3/latest/dev/access-policy-language-overview.html
// bucket policies are limited to 20KB in size, using a limit reader.
bucketPolicyBuf, err := ioutil.ReadAll(io.LimitReader(resp.Body, maxAccessPolicySize))
if err != nil {
return BucketAccessPolicy{}, err
}
// Verify if acl is authenticated read.
if authenticatedRead {
return BucketACL("authenticated-read"), nil
policy, err := unMarshalBucketPolicy(bucketPolicyBuf)
if err != nil {
return BucketAccessPolicy{}, err
}
// Verify if acl is private.
if !publicWrite && !publicRead {
return BucketACL("private"), nil
}
// Verify if acl is public-read.
if !publicWrite && publicRead {
return BucketACL("public-read"), nil
}
// Verify if acl is public-read-write.
if publicRead && publicWrite {
return BucketACL("public-read-write"), nil
}
return "", ErrorResponse{
Code: "NoSuchBucketPolicy",
Message: "The specified bucket does not have a bucket policy.",
BucketName: bucketName,
RequestID: "minio",
// Sort the policy actions and resources for convenience.
for _, statement := range policy.Statements {
sort.Strings(statement.Actions)
sort.Strings(statement.Resources)
}
return policy, nil
}
// GetObject - returns an seekable, readable object.
@@ -140,8 +114,9 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
if err := isValidObjectName(objectName); err != nil {
return nil, err
}
// Send an explicit info to get the actual object size.
objectInfo, err := c.StatObject(bucketName, objectName)
// Start the request as soon Get is initiated.
httpReader, objectInfo, err := c.getObject(bucketName, objectName, 0, 0)
if err != nil {
return nil, err
}
@@ -153,8 +128,7 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
// Create done channel.
doneCh := make(chan struct{})
// This routine feeds partial object data as and when the caller
// reads.
// This routine feeds partial object data as and when the caller reads.
go func() {
defer close(reqCh)
defer close(resCh)
@@ -167,27 +141,27 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
return
// Request message.
case req := <-reqCh:
// Get shortest length.
// NOTE: Last remaining bytes are usually smaller than
// req.Buffer size. Use that as the final length.
// Don't use Math.min() here to avoid converting int64 to float64
length := int64(len(req.Buffer))
if objectInfo.Size-req.Offset < length {
length = objectInfo.Size - req.Offset
}
httpReader, _, err := c.getObject(bucketName, objectName, req.Offset, int64(length))
if err != nil {
resCh <- readResponse{
Error: err,
// Offset changes fetch the new object at an Offset.
if req.DidOffsetChange {
// Read from offset.
httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, 0)
if err != nil {
resCh <- readResponse{
Error: err,
}
return
}
return
}
// Read at least req.Buffer bytes, if not we have
// reached our EOF.
size, err := io.ReadFull(httpReader, req.Buffer)
if err == io.ErrUnexpectedEOF {
// If an EOF happens after reading some but not
// all the bytes ReadFull returns ErrUnexpectedEOF
err = io.EOF
}
// Reply back how much was read.
resCh <- readResponse{
Size: int(size),
Error: err,
@@ -208,8 +182,9 @@ type readResponse struct {
// Read request message container to communicate with internal
// go-routine.
type readRequest struct {
Buffer []byte
Offset int64 // readAt offset.
Buffer []byte
Offset int64 // readAt offset.
DidOffsetChange bool
}
// Object represents an open object. It implements Read, ReadAt,
@@ -222,6 +197,7 @@ type Object struct {
reqCh chan<- readRequest
resCh <-chan readResponse
doneCh chan<- struct{}
prevOffset int64
currOffset int64
objectInfo ObjectInfo
@@ -244,7 +220,7 @@ func (o *Object) Read(b []byte) (n int, err error) {
o.mutex.Lock()
defer o.mutex.Unlock()
// Previous prevErr is which was saved in previous operation.
// prevErr is previous error saved from previous operation.
if o.prevErr != nil || o.isClosed {
return 0, o.prevErr
}
@@ -254,13 +230,27 @@ func (o *Object) Read(b []byte) (n int, err error) {
return 0, io.EOF
}
// Send current information over control channel to indicate we
// are ready.
// Send current information over control channel to indicate we are ready.
reqMsg := readRequest{}
// Send the offset and pointer to the buffer over the channel.
// Send the pointer to the buffer over the channel.
reqMsg.Buffer = b
reqMsg.Offset = o.currOffset
// Verify if offset has changed and currOffset is greater than
// previous offset. Perhaps due to Seek().
offsetChange := o.prevOffset - o.currOffset
if offsetChange < 0 {
offsetChange = -offsetChange
}
if offsetChange > 0 {
// Fetch the new reader at the current offset again.
reqMsg.Offset = o.currOffset
reqMsg.DidOffsetChange = true
} else {
// No offset changes no need to fetch new reader, continue
// reading.
reqMsg.DidOffsetChange = false
reqMsg.Offset = 0
}
// Send read request over the control channel.
o.reqCh <- reqMsg
@@ -274,6 +264,9 @@ func (o *Object) Read(b []byte) (n int, err error) {
// Update current offset.
o.currOffset += bytesRead
// Save the current offset as previous offset.
o.prevOffset = o.currOffset
if dataMsg.Error == nil {
// If currOffset read is equal to objectSize
// We have reached end of file, we return io.EOF.
@@ -317,7 +310,7 @@ func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) {
o.mutex.Lock()
defer o.mutex.Unlock()
// prevErr is which was saved in previous operation.
// prevErr is error which was saved in previous operation.
if o.prevErr != nil || o.isClosed {
return 0, o.prevErr
}
@@ -334,7 +327,16 @@ func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) {
// Send the offset and pointer to the buffer over the channel.
reqMsg.Buffer = b
reqMsg.Offset = offset
// For ReadAt offset always changes, minor optimization where
// offset same as currOffset we don't change the offset.
reqMsg.DidOffsetChange = offset != o.currOffset
if reqMsg.DidOffsetChange {
// Set new offset.
reqMsg.Offset = offset
// Save new offset as current offset.
o.currOffset = offset
}
// Send read request over the control channel.
o.reqCh <- reqMsg
@@ -345,10 +347,16 @@ func (o *Object) ReadAt(b []byte, offset int64) (n int, err error) {
// Bytes read.
bytesRead := int64(dataMsg.Size)
// Update current offset.
o.currOffset += bytesRead
// Save current offset as previous offset before returning.
o.prevOffset = o.currOffset
if dataMsg.Error == nil {
// If offset+bytes read is equal to objectSize
// If currentOffset is equal to objectSize
// we have reached end of file, we return io.EOF.
if offset+bytesRead == o.objectInfo.Size {
if o.currOffset >= o.objectInfo.Size {
return dataMsg.Size, io.EOF
}
return dataMsg.Size, nil
@@ -378,7 +386,7 @@ func (o *Object) Seek(offset int64, whence int) (n int64, err error) {
defer o.mutex.Unlock()
if o.prevErr != nil {
// At EOF seeking is legal, for any other errors we return.
// At EOF seeking is legal allow only io.EOF, for any other errors we return.
if o.prevErr != io.EOF {
return 0, o.prevErr
}
@@ -388,6 +396,11 @@ func (o *Object) Seek(offset int64, whence int) (n int64, err error) {
if offset < 0 && whence != 2 {
return 0, ErrInvalidArgument(fmt.Sprintf("Negative position not allowed for %d.", whence))
}
// Save current offset as previous offset.
o.prevOffset = o.currOffset
// Switch through whence.
switch whence {
default:
return 0, ErrInvalidArgument(fmt.Sprintf("Invalid whence %d", whence))

View File

@@ -18,8 +18,11 @@ package minio
import (
"bytes"
"encoding/base64"
"encoding/hex"
"encoding/json"
"encoding/xml"
"fmt"
"io/ioutil"
"net/http"
"net/url"
@@ -27,28 +30,18 @@ import (
/// Bucket operations
// MakeBucket makes a new bucket.
// MakeBucket creates a new bucket with bucketName.
//
// Optional arguments are acl and location - by default all buckets are created
// with ``private`` acl and in US Standard region.
//
// ACL valid values - http://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html
//
// private - owner gets full access [default].
// public-read - owner gets full access, all others get read access.
// public-read-write - owner gets full access, all others get full access too.
// authenticated-read - owner gets full access, authenticated users get read access.
// Location is an optional argument, by default all buckets are
// created in US Standard Region.
//
// For Amazon S3 for more supported regions - http://docs.aws.amazon.com/general/latest/gr/rande.html
// For Google Cloud Storage for more supported regions - https://cloud.google.com/storage/docs/bucket-locations
func (c Client) MakeBucket(bucketName string, acl BucketACL, location string) error {
func (c Client) MakeBucket(bucketName string, location string) error {
// Validate the input arguments.
if err := isValidBucketName(bucketName); err != nil {
return err
}
if !acl.isValidBucketACL() {
return ErrInvalidArgument("Unrecognized ACL " + acl.String())
}
// If location is empty, treat is a default region 'us-east-1'.
if location == "" {
@@ -56,7 +49,7 @@ func (c Client) MakeBucket(bucketName string, acl BucketACL, location string) er
}
// Instantiate the request.
req, err := c.makeBucketRequest(bucketName, acl, location)
req, err := c.makeBucketRequest(bucketName, location)
if err != nil {
return err
}
@@ -82,14 +75,11 @@ func (c Client) MakeBucket(bucketName string, acl BucketACL, location string) er
}
// makeBucketRequest constructs request for makeBucket.
func (c Client) makeBucketRequest(bucketName string, acl BucketACL, location string) (*http.Request, error) {
func (c Client) makeBucketRequest(bucketName string, location string) (*http.Request, error) {
// Validate input arguments.
if err := isValidBucketName(bucketName); err != nil {
return nil, err
}
if !acl.isValidBucketACL() {
return nil, ErrInvalidArgument("Unrecognized ACL " + acl.String())
}
// In case of Amazon S3. The make bucket issued on already
// existing bucket would fail with 'AuthorizationMalformed' error
@@ -106,12 +96,6 @@ func (c Client) makeBucketRequest(bucketName string, acl BucketACL, location str
return nil, err
}
// by default bucket acl is set to private.
req.Header.Set("x-amz-acl", "private")
if acl != "" {
req.Header.Set("x-amz-acl", string(acl))
}
// set UserAgent for the request.
c.setUserAgent(req)
@@ -131,9 +115,12 @@ func (c Client) makeBucketRequest(bucketName string, acl BucketACL, location str
}
createBucketConfigBuffer := bytes.NewBuffer(createBucketConfigBytes)
req.Body = ioutil.NopCloser(createBucketConfigBuffer)
req.ContentLength = int64(createBucketConfigBuffer.Len())
req.ContentLength = int64(len(createBucketConfigBytes))
// Set content-md5.
req.Header.Set("Content-Md5", base64.StdEncoding.EncodeToString(sumMD5(createBucketConfigBytes)))
if c.signature.isV4() {
req.Header.Set("X-Amz-Content-Sha256", hex.EncodeToString(sum256(createBucketConfigBuffer.Bytes())))
// Set sha256.
req.Header.Set("X-Amz-Content-Sha256", hex.EncodeToString(sum256(createBucketConfigBytes)))
}
}
@@ -150,57 +137,122 @@ func (c Client) makeBucketRequest(bucketName string, acl BucketACL, location str
return req, nil
}
// SetBucketACL set the permissions on an existing bucket using access control lists (ACL).
// SetBucketPolicy set the access permissions on an existing bucket.
//
// For example
//
// private - owner gets full access [default].
// public-read - owner gets full access, all others get read access.
// public-read-write - owner gets full access, all others get full access too.
// authenticated-read - owner gets full access, authenticated users get read access.
func (c Client) SetBucketACL(bucketName string, acl BucketACL) error {
// none - owner gets full access [default].
// readonly - anonymous get access for everyone at a given object prefix.
// readwrite - anonymous list/put/delete access to a given object prefix.
// writeonly - anonymous put/delete access to a given object prefix.
func (c Client) SetBucketPolicy(bucketName string, objectPrefix string, bucketPolicy BucketPolicy) error {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return err
}
if !acl.isValidBucketACL() {
return ErrInvalidArgument("Unrecognized ACL " + acl.String())
if err := isValidObjectPrefix(objectPrefix); err != nil {
return err
}
if !bucketPolicy.isValidBucketPolicy() {
return ErrInvalidArgument(fmt.Sprintf("Invalid bucket policy provided. %s", bucketPolicy))
}
policy, err := c.getBucketPolicy(bucketName, objectPrefix)
if err != nil {
return err
}
// For bucket policy set to 'none' we need to remove the policy.
if bucketPolicy == BucketPolicyNone && policy.Statements == nil {
// No policies to set, return success.
return nil
}
// Remove any previous policies at this path.
policy.Statements = removeBucketPolicyStatement(policy.Statements, bucketName, objectPrefix)
bucketResourceStatement := &Statement{}
objectResourceStatement := &Statement{}
if bucketPolicy == BucketPolicyReadWrite {
// Read write policy.
bucketResourceStatement.Effect = "Allow"
bucketResourceStatement.Principal.AWS = []string{"*"}
bucketResourceStatement.Resources = []string{fmt.Sprintf("%s%s", awsResourcePrefix, bucketName)}
bucketResourceStatement.Actions = readWriteBucketActions
objectResourceStatement.Effect = "Allow"
objectResourceStatement.Principal.AWS = []string{"*"}
objectResourceStatement.Resources = []string{fmt.Sprintf("%s%s", awsResourcePrefix, bucketName+"/"+objectPrefix+"*")}
objectResourceStatement.Actions = readWriteObjectActions
// Save the read write policy.
policy.Statements = append(policy.Statements, *bucketResourceStatement, *objectResourceStatement)
} else if bucketPolicy == BucketPolicyReadOnly {
// Read only policy.
bucketResourceStatement.Effect = "Allow"
bucketResourceStatement.Principal.AWS = []string{"*"}
bucketResourceStatement.Resources = []string{fmt.Sprintf("%s%s", awsResourcePrefix, bucketName)}
bucketResourceStatement.Actions = readOnlyBucketActions
objectResourceStatement.Effect = "Allow"
objectResourceStatement.Principal.AWS = []string{"*"}
objectResourceStatement.Resources = []string{fmt.Sprintf("%s%s", awsResourcePrefix, bucketName+"/"+objectPrefix+"*")}
objectResourceStatement.Actions = readOnlyObjectActions
// Save the read only policy.
policy.Statements = append(policy.Statements, *bucketResourceStatement, *objectResourceStatement)
} else if bucketPolicy == BucketPolicyWriteOnly {
// Write only policy.
bucketResourceStatement.Effect = "Allow"
bucketResourceStatement.Principal.AWS = []string{"*"}
bucketResourceStatement.Resources = []string{fmt.Sprintf("%s%s", awsResourcePrefix, bucketName)}
bucketResourceStatement.Actions = writeOnlyBucketActions
objectResourceStatement.Effect = "Allow"
objectResourceStatement.Principal.AWS = []string{"*"}
objectResourceStatement.Resources = []string{fmt.Sprintf("%s%s", awsResourcePrefix, bucketName+"/"+objectPrefix+"*")}
objectResourceStatement.Actions = writeOnlyObjectActions
// Save the write only policy.
policy.Statements = append(policy.Statements, *bucketResourceStatement, *objectResourceStatement)
}
// Save the updated policies.
return c.putBucketPolicy(bucketName, policy)
}
// Saves a new bucket policy.
func (c Client) putBucketPolicy(bucketName string, policy BucketAccessPolicy) error {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return err
}
// Set acl query.
// If there are no policy statements, we should remove entire policy.
if len(policy.Statements) == 0 {
return c.removeBucketPolicy(bucketName)
}
// Get resources properly escaped and lined up before
// using them in http request.
urlValues := make(url.Values)
urlValues.Set("acl", "")
urlValues.Set("policy", "")
// Add misc headers.
customHeader := make(http.Header)
if acl != "" {
customHeader.Set("x-amz-acl", acl.String())
} else {
customHeader.Set("x-amz-acl", "private")
policyBytes, err := json.Marshal(&policy)
if err != nil {
return err
}
// Execute PUT bucket.
resp, err := c.executeMethod("PUT", requestMetadata{
bucketName: bucketName,
queryValues: urlValues,
customHeader: customHeader,
})
policyBuffer := bytes.NewReader(policyBytes)
reqMetadata := requestMetadata{
bucketName: bucketName,
queryValues: urlValues,
contentBody: policyBuffer,
contentLength: int64(len(policyBytes)),
contentMD5Bytes: sumMD5(policyBytes),
contentSHA256Bytes: sum256(policyBytes),
}
// Execute PUT to upload a new bucket policy.
resp, err := c.executeMethod("PUT", reqMetadata)
defer closeResponse(resp)
if err != nil {
return err
}
if err != nil {
return err
}
if resp != nil {
// if error return.
if resp.StatusCode != http.StatusOK {
if resp.StatusCode != http.StatusNoContent {
return httpRespToErrorResponse(resp, bucketName, "")
}
}
// return
return nil
}

View File

@@ -19,6 +19,7 @@ package minio
import (
"crypto/md5"
"crypto/sha256"
"fmt"
"hash"
"io"
"math"
@@ -94,62 +95,13 @@ func optimalPartInfo(objectSize int64) (totalPartsCount int, partSize int64, las
return totalPartsCount, partSize, lastPartSize, nil
}
// Compatibility code for Golang < 1.5.x.
// copyBuffer is identical to io.CopyBuffer, since such a function is
// not available/implemented in Golang version < 1.5.x, we use a
// custom call exactly implementng io.CopyBuffer from Golang > 1.5.x
// version does.
// hashCopyBuffer is identical to hashCopyN except that it doesn't take
// any size argument but takes a buffer argument and reader should be
// of io.ReaderAt interface.
//
// copyBuffer stages through the provided buffer (if one is required)
// rather than allocating a temporary one. If buf is nil, one is
// allocated; otherwise if it has zero length, copyBuffer panics.
//
// FIXME: Remove this code when distributions move to newer Golang versions.
func copyBuffer(writer io.Writer, reader io.Reader, buf []byte) (written int64, err error) {
// If the reader has a WriteTo method, use it to do the copy.
// Avoids an allocation and a copy.
if wt, ok := reader.(io.WriterTo); ok {
return wt.WriteTo(writer)
}
// Similarly, if the writer has a ReadFrom method, use it to do
// the copy.
if rt, ok := writer.(io.ReaderFrom); ok {
return rt.ReadFrom(reader)
}
if buf == nil {
buf = make([]byte, 32*1024)
}
for {
nr, er := reader.Read(buf)
if nr > 0 {
nw, ew := writer.Write(buf[0:nr])
if nw > 0 {
written += int64(nw)
}
if ew != nil {
err = ew
break
}
if nr != nw {
err = io.ErrShortWrite
break
}
}
if er == io.EOF {
break
}
if er != nil {
err = er
break
}
}
return written, err
}
// hashCopyBuffer is identical to hashCopyN except that it stages
// through the provided buffer (if one is required) rather than
// allocating a temporary one. If buf is nil, one is allocated for 5MiB.
func (c Client) hashCopyBuffer(writer io.Writer, reader io.Reader, buf []byte) (md5Sum, sha256Sum []byte, size int64, err error) {
// Stages reads from offsets into the buffer, if buffer is nil it is
// initialized to optimalBufferSize.
func (c Client) hashCopyBuffer(writer io.Writer, reader io.ReaderAt, buf []byte) (md5Sum, sha256Sum []byte, size int64, err error) {
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// MD5 and SHA256 hasher.
@@ -160,14 +112,61 @@ func (c Client) hashCopyBuffer(writer io.Writer, reader io.Reader, buf []byte) (
hashWriter = io.MultiWriter(writer, hashMD5, hashSHA256)
}
// Allocate buf if not initialized.
// Buffer is nil, initialize.
if buf == nil {
buf = make([]byte, optimalReadBufferSize)
}
// Offset to start reading from.
var readAtOffset int64
// Following block reads data at an offset from the input
// reader and copies data to into local temporary file.
for {
readAtSize, rerr := reader.ReadAt(buf, readAtOffset)
if rerr != nil {
if rerr != io.EOF {
return nil, nil, 0, rerr
}
}
writeSize, werr := hashWriter.Write(buf[:readAtSize])
if werr != nil {
return nil, nil, 0, werr
}
if readAtSize != writeSize {
return nil, nil, 0, fmt.Errorf("Read size was not completely written to writer. wanted %d, got %d - %s", readAtSize, writeSize, reportIssue)
}
readAtOffset += int64(writeSize)
size += int64(writeSize)
if rerr == io.EOF {
break
}
}
// Finalize md5 sum and sha256 sum.
md5Sum = hashMD5.Sum(nil)
if c.signature.isV4() {
sha256Sum = hashSHA256.Sum(nil)
}
return md5Sum, sha256Sum, size, err
}
// hashCopy is identical to hashCopyN except that it doesn't take
// any size argument.
func (c Client) hashCopy(writer io.Writer, reader io.Reader) (md5Sum, sha256Sum []byte, size int64, err error) {
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// MD5 and SHA256 hasher.
hashMD5 = md5.New()
hashWriter := io.MultiWriter(writer, hashMD5)
if c.signature.isV4() {
hashSHA256 = sha256.New()
hashWriter = io.MultiWriter(writer, hashMD5, hashSHA256)
}
// Using copyBuffer to copy in large buffers, default buffer
// for io.Copy of 32KiB is too small.
size, err = copyBuffer(hashWriter, reader, buf)
size, err = io.Copy(hashWriter, reader)
if err != nil {
return nil, nil, 0, err
}
@@ -244,12 +243,8 @@ func (c Client) getUploadID(bucketName, objectName, contentType string) (uploadI
return uploadID, isNew, nil
}
// computeHashBuffer - Calculates MD5 and SHA256 for an input read
// Seeker is identical to computeHash except that it stages
// through the provided buffer (if one is required) rather than
// allocating a temporary one. If buf is nil, it uses a temporary
// buffer.
func (c Client) computeHashBuffer(reader io.ReadSeeker, buf []byte) (md5Sum, sha256Sum []byte, size int64, err error) {
// computeHash - Calculates MD5 and SHA256 for an input read Seeker.
func (c Client) computeHash(reader io.ReadSeeker) (md5Sum, sha256Sum []byte, size int64, err error) {
// MD5 and SHA256 hasher.
var hashMD5, hashSHA256 hash.Hash
// MD5 and SHA256 hasher.
@@ -261,16 +256,9 @@ func (c Client) computeHashBuffer(reader io.ReadSeeker, buf []byte) (md5Sum, sha
}
// If no buffer is provided, no need to allocate just use io.Copy.
if buf == nil {
size, err = io.Copy(hashWriter, reader)
if err != nil {
return nil, nil, 0, err
}
} else {
size, err = copyBuffer(hashWriter, reader, buf)
if err != nil {
return nil, nil, 0, err
}
size, err = io.Copy(hashWriter, reader)
if err != nil {
return nil, nil, 0, err
}
// Seek back reader to the beginning location.
@@ -285,8 +273,3 @@ func (c Client) computeHashBuffer(reader io.ReadSeeker, buf []byte) (md5Sum, sha
}
return md5Sum, sha256Sum, size, nil
}
// computeHash - Calculates MD5 and SHA256 for an input read Seeker.
func (c Client) computeHash(reader io.ReadSeeker) (md5Sum, sha256Sum []byte, size int64, err error) {
return c.computeHashBuffer(reader, nil)
}

View File

@@ -97,7 +97,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
tmpBuffer := new(bytes.Buffer)
// Read defaults to reading at 5MiB buffer.
readBuffer := make([]byte, optimalReadBufferSize)
readAtBuffer := make([]byte, optimalReadBufferSize)
// Upload all the missing parts.
for partNumber <= lastPartNumber {
@@ -147,7 +147,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
// Calculates MD5 and SHA256 sum for a section reader.
var md5Sum, sha256Sum []byte
var prtSize int64
md5Sum, sha256Sum, prtSize, err = c.hashCopyBuffer(tmpBuffer, sectionReader, readBuffer)
md5Sum, sha256Sum, prtSize, err = c.hashCopyBuffer(tmpBuffer, sectionReader, readAtBuffer)
if err != nil {
return 0, err
}

View File

@@ -50,6 +50,54 @@ func (c Client) RemoveBucket(bucketName string) error {
return nil
}
// RemoveBucketPolicy remove a bucket policy on given path.
func (c Client) RemoveBucketPolicy(bucketName, objectPrefix string) error {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return err
}
if err := isValidObjectPrefix(objectPrefix); err != nil {
return err
}
policy, err := c.getBucketPolicy(bucketName, objectPrefix)
if err != nil {
return err
}
// No bucket policy found, nothing to remove return success.
if policy.Statements == nil {
return nil
}
// Save new statements after removing requested bucket policy.
policy.Statements = removeBucketPolicyStatement(policy.Statements, bucketName, objectPrefix)
// Commit the update policy.
return c.putBucketPolicy(bucketName, policy)
}
// Removes all policies on a bucket.
func (c Client) removeBucketPolicy(bucketName string) error {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return err
}
// Get resources properly escaped and lined up before
// using them in http request.
urlValues := make(url.Values)
urlValues.Set("policy", "")
// Execute DELETE on objectName.
resp, err := c.executeMethod("DELETE", requestMetadata{
bucketName: bucketName,
queryValues: urlValues,
})
defer closeResponse(resp)
if err != nil {
return err
}
return nil
}
// RemoveObject remove an object from a bucket.
func (c Client) RemoveObject(bucketName, objectName string) error {
// Input validation.

View File

@@ -171,27 +171,3 @@ type createBucketConfiguration struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CreateBucketConfiguration" json:"-"`
Location string `xml:"LocationConstraint"`
}
// grant container for the grantee and his or her permissions.
type grant struct {
// grantee container for DisplayName and ID of the person being
// granted permissions.
Grantee struct {
ID string
DisplayName string
EmailAddress string
Type string
URI string
}
Permission string
}
// accessControlPolicy contains the elements providing ACL permissions
// for a bucket.
type accessControlPolicy struct {
// accessControlList container for ACL information.
AccessControlList struct {
Grant []grant
}
Owner owner
}

View File

@@ -23,6 +23,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httputil"
"net/url"
@@ -30,6 +31,7 @@ import (
"regexp"
"runtime"
"strings"
"sync"
"time"
)
@@ -57,9 +59,12 @@ type Client struct {
httpClient *http.Client
bucketLocCache *bucketLocationCache
// Advanced functionality
// Advanced functionality.
isTraceEnabled bool
traceOutput io.Writer
// Random seed.
random *rand.Rand
}
// Global constants.
@@ -120,6 +125,29 @@ func New(endpoint string, accessKeyID, secretAccessKey string, insecure bool) (*
return clnt, nil
}
// lockedRandSource provides protected rand source, implements rand.Source interface.
type lockedRandSource struct {
lk sync.Mutex
src rand.Source
}
// Int63 returns a non-negative pseudo-random 63-bit integer as an
// int64.
func (r *lockedRandSource) Int63() (n int64) {
r.lk.Lock()
n = r.src.Int63()
r.lk.Unlock()
return
}
// Seed uses the provided seed value to initialize the generator to a
// deterministic state.
func (r *lockedRandSource) Seed(seed int64) {
r.lk.Lock()
r.src.Seed(seed)
r.lk.Unlock()
}
func privateNew(endpoint, accessKeyID, secretAccessKey string, insecure bool) (*Client, error) {
// construct endpoint.
endpointURL, err := getEndpointURL(endpoint, insecure)
@@ -147,8 +175,12 @@ func privateNew(endpoint, accessKeyID, secretAccessKey string, insecure bool) (*
Transport: http.DefaultTransport,
}
// Instantiae bucket location cache.
clnt.bucketLocCache = newBucketLocationCache()
// Introduce a new locked random seed.
clnt.random = rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())})
// Return.
return clnt, nil
}
@@ -384,7 +416,7 @@ func (c Client) executeMethod(method string, metadata requestMetadata) (res *htt
// error until maxRetries have been exhausted, retry attempts are
// performed after waiting for a given period of time in a
// binomial fashion.
for range newRetryTimer(MaxRetry, time.Second, time.Second*30, MaxJitter) {
for range c.newRetryTimer(MaxRetry, time.Second, time.Second*30, MaxJitter) {
if isRetryable {
// Seek back to beginning for each attempt.
if _, err = bodySeeker.Seek(0, 0); err != nil {
@@ -422,6 +454,15 @@ func (c Client) executeMethod(method string, metadata requestMetadata) (res *htt
}
}
// Read the body to be saved later.
errBodyBytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}
// Save the body.
errBodySeeker := bytes.NewReader(errBodyBytes)
res.Body = ioutil.NopCloser(errBodySeeker)
// For errors verify if its retryable otherwise fail quickly.
errResponse := ToErrorResponse(httpRespToErrorResponse(res, metadata.bucketName, metadata.objectName))
// Bucket region if set in error response, we can retry the
@@ -435,6 +476,16 @@ func (c Client) executeMethod(method string, metadata requestMetadata) (res *htt
if isS3CodeRetryable(errResponse.Code) {
continue // Retry.
}
// Verify if http status code is retryable.
if isHTTPStatusRetryable(res.StatusCode) {
continue // Retry.
}
// Save the body back again.
errBodySeeker.Seek(0, 0) // Seek back to starting point.
res.Body = ioutil.NopCloser(errBodySeeker)
// For all other cases break out of the retry loop.
break
}
@@ -522,7 +573,7 @@ func (c Client) newRequest(method string, metadata requestMetadata) (req *http.R
// set md5Sum for content protection.
if metadata.contentMD5Bytes != nil {
req.Header.Set("Content-MD5", base64.StdEncoding.EncodeToString(metadata.contentMD5Bytes))
req.Header.Set("Content-Md5", base64.StdEncoding.EncodeToString(metadata.contentMD5Bytes))
}
// Sign the request for all authenticated requests.

View File

@@ -1,75 +0,0 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
// BucketACL - Bucket level access control.
type BucketACL string
// Different types of ACL's currently supported for buckets.
const (
bucketPrivate = BucketACL("private")
bucketReadOnly = BucketACL("public-read")
bucketPublic = BucketACL("public-read-write")
bucketAuthenticated = BucketACL("authenticated-read")
)
// Stringify acl.
func (b BucketACL) String() string {
if string(b) == "" {
return "private"
}
return string(b)
}
// isValidBucketACL - Is provided acl string supported.
func (b BucketACL) isValidBucketACL() bool {
switch true {
case b.isPrivate():
fallthrough
case b.isReadOnly():
fallthrough
case b.isPublic():
fallthrough
case b.isAuthenticated():
return true
case b.String() == "private":
// By default its "private"
return true
default:
return false
}
}
// isPrivate - Is acl Private.
func (b BucketACL) isPrivate() bool {
return b == bucketPrivate
}
// isPublicRead - Is acl PublicRead.
func (b BucketACL) isReadOnly() bool {
return b == bucketReadOnly
}
// isPublicReadWrite - Is acl PublicReadWrite.
func (b BucketACL) isPublic() bool {
return b == bucketPublic
}
// isAuthenticated - Is acl AuthenticatedRead.
func (b BucketACL) isAuthenticated() bool {
return b == bucketAuthenticated
}

View File

@@ -21,6 +21,7 @@ import (
"net/http"
"net/url"
"path"
"strings"
"sync"
)
@@ -67,11 +68,6 @@ func (r *bucketLocationCache) Delete(bucketName string) {
// getBucketLocation - Get location for the bucketName from location map cache.
func (c Client) getBucketLocation(bucketName string) (string, error) {
// For anonymous requests, default to "us-east-1" and let other calls
// move forward.
if c.anonymous {
return "us-east-1", nil
}
if location, ok := c.bucketLocCache.Get(bucketName); ok {
return location, nil
}
@@ -92,15 +88,10 @@ func (c Client) getBucketLocation(bucketName string) (string, error) {
if resp.StatusCode != http.StatusOK {
err = httpRespToErrorResponse(resp, bucketName, "")
errResp := ToErrorResponse(err)
// AccessDenied without a signature mismatch code,
// usually means that the bucket policy has certain
// restrictions where some API operations are not
// allowed. Handle this case so that top level callers can
// interpret this easily and fall back if needed to a
// lower functionality call. Read each individual API
// specific code for such fallbacks.
if errResp.Code == "AccessDenied" && errResp.Message == "Access Denied" {
// In this case return as "us-east-1" and let the call fail.
// For access denied error, it could be an anonymous
// request. Move forward and let the top level callers
// succeed if possible based on their policy.
if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
return "us-east-1", nil
}
return "", err

392
vendor/github.com/minio/minio-go/bucket-policy.go generated vendored Normal file
View File

@@ -0,0 +1,392 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import (
"encoding/json"
"sort"
)
// maximum supported access policy size.
const maxAccessPolicySize = 20 * 1024 * 1024 // 20KiB.
// Resource prefix for all aws resources.
const awsResourcePrefix = "arn:aws:s3:::"
// BucketPolicy - Bucket level policy.
type BucketPolicy string
// Different types of Policies currently supported for buckets.
const (
BucketPolicyNone BucketPolicy = "none"
BucketPolicyReadOnly = "readonly"
BucketPolicyReadWrite = "readwrite"
BucketPolicyWriteOnly = "writeonly"
)
// isValidBucketPolicy - Is provided policy value supported.
func (p BucketPolicy) isValidBucketPolicy() bool {
switch p {
case BucketPolicyNone, BucketPolicyReadOnly, BucketPolicyReadWrite, BucketPolicyWriteOnly:
return true
}
return false
}
// User - canonical users list.
type User struct {
AWS []string
}
// Statement - minio policy statement
type Statement struct {
Sid string
Effect string
Principal User `json:"Principal"`
Actions []string `json:"Action"`
Resources []string `json:"Resource"`
Conditions map[string]map[string]string `json:"Condition,omitempty"`
}
// BucketAccessPolicy - minio policy collection
type BucketAccessPolicy struct {
Version string // date in 0000-00-00 format
Statements []Statement `json:"Statement"`
}
// Read write actions.
var (
readWriteBucketActions = []string{
"s3:GetBucketLocation",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
// Add more bucket level read-write actions here.
}
readWriteObjectActions = []string{
"s3:AbortMultipartUpload",
"s3:DeleteObject",
"s3:GetObject",
"s3:ListMultipartUploadParts",
"s3:PutObject",
// Add more object level read-write actions here.
}
)
// Write only actions.
var (
writeOnlyBucketActions = []string{
"s3:GetBucketLocation",
"s3:ListBucketMultipartUploads",
// Add more bucket level write actions here.
}
writeOnlyObjectActions = []string{
"s3:AbortMultipartUpload",
"s3:DeleteObject",
"s3:ListMultipartUploadParts",
"s3:PutObject",
// Add more object level write actions here.
}
)
// Read only actions.
var (
readOnlyBucketActions = []string{
"s3:GetBucketLocation",
"s3:ListBucket",
// Add more bucket level read actions here.
}
readOnlyObjectActions = []string{
"s3:GetObject",
// Add more object level read actions here.
}
)
// subsetActions returns true if the first array is completely
// contained in the second array. There must be at least
// the same number of duplicate values in second as there
// are in first.
func subsetActions(first, second []string) bool {
set := make(map[string]int)
for _, value := range second {
set[value]++
}
for _, value := range first {
if count, found := set[value]; !found {
return false
} else if count < 1 {
return false
} else {
set[value] = count - 1
}
}
return true
}
// Verifies if we have read/write policy set at bucketName, objectPrefix.
func isBucketPolicyReadWrite(statements []Statement, bucketName string, objectPrefix string) bool {
var commonActions, readWrite bool
sort.Strings(readWriteBucketActions)
sort.Strings(readWriteObjectActions)
for _, statement := range statements {
for _, resource := range statement.Resources {
if resource == awsResourcePrefix+bucketName {
if subsetActions(readWriteBucketActions, statement.Actions) {
commonActions = true
continue
}
} else if resource == awsResourcePrefix+bucketName+"/"+objectPrefix+"*" {
if subsetActions(readWriteObjectActions, statement.Actions) {
readWrite = true
}
}
}
}
return commonActions && readWrite
}
// Verifies if we have write only policy set at bucketName, objectPrefix.
func isBucketPolicyWriteOnly(statements []Statement, bucketName string, objectPrefix string) bool {
var commonActions, writeOnly bool
sort.Strings(writeOnlyBucketActions)
sort.Strings(writeOnlyObjectActions)
for _, statement := range statements {
for _, resource := range statement.Resources {
if resource == awsResourcePrefix+bucketName {
if subsetActions(writeOnlyBucketActions, statement.Actions) {
commonActions = true
continue
}
} else if resource == awsResourcePrefix+bucketName+"/"+objectPrefix+"*" {
if subsetActions(writeOnlyObjectActions, statement.Actions) {
writeOnly = true
}
}
}
}
return commonActions && writeOnly
}
// Verifies if we have read only policy set at bucketName, objectPrefix.
func isBucketPolicyReadOnly(statements []Statement, bucketName string, objectPrefix string) bool {
var commonActions, readOnly bool
sort.Strings(readOnlyBucketActions)
sort.Strings(readOnlyObjectActions)
for _, statement := range statements {
for _, resource := range statement.Resources {
if resource == awsResourcePrefix+bucketName {
if subsetActions(readOnlyBucketActions, statement.Actions) {
commonActions = true
continue
}
} else if resource == awsResourcePrefix+bucketName+"/"+objectPrefix+"*" {
if subsetActions(readOnlyObjectActions, statement.Actions) {
readOnly = true
break
}
}
}
}
return commonActions && readOnly
}
// Removes read write bucket policy if found.
func removeBucketPolicyStatementReadWrite(statements []Statement, bucketName string, objectPrefix string) []Statement {
var newStatements []Statement
for _, statement := range statements {
for _, resource := range statement.Resources {
if resource == awsResourcePrefix+bucketName {
var newActions []string
for _, action := range statement.Actions {
switch action {
case "s3:GetBucketLocation", "s3:ListBucket", "s3:ListBucketMultipartUploads":
continue
}
newActions = append(newActions, action)
}
statement.Actions = newActions
} else if resource == awsResourcePrefix+bucketName+"/"+objectPrefix+"*" {
var newActions []string
for _, action := range statement.Actions {
switch action {
case "s3:PutObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts", "s3:DeleteObject", "s3:GetObject":
continue
}
newActions = append(newActions, action)
}
statement.Actions = newActions
}
}
if len(statement.Actions) != 0 {
newStatements = append(newStatements, statement)
}
}
return newStatements
}
// Removes write only bucket policy if found.
func removeBucketPolicyStatementWriteOnly(statements []Statement, bucketName string, objectPrefix string) []Statement {
var newStatements []Statement
for _, statement := range statements {
for _, resource := range statement.Resources {
if resource == awsResourcePrefix+bucketName {
var newActions []string
for _, action := range statement.Actions {
switch action {
case "s3:GetBucketLocation", "s3:ListBucketMultipartUploads":
continue
}
newActions = append(newActions, action)
}
statement.Actions = newActions
} else if resource == awsResourcePrefix+bucketName+"/"+objectPrefix+"*" {
var newActions []string
for _, action := range statement.Actions {
switch action {
case "s3:PutObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts", "s3:DeleteObject":
continue
}
newActions = append(newActions, action)
}
statement.Actions = newActions
}
}
if len(statement.Actions) != 0 {
newStatements = append(newStatements, statement)
}
}
return newStatements
}
// Removes read only bucket policy if found.
func removeBucketPolicyStatementReadOnly(statements []Statement, bucketName string, objectPrefix string) []Statement {
var newStatements []Statement
for _, statement := range statements {
for _, resource := range statement.Resources {
if resource == awsResourcePrefix+bucketName {
var newActions []string
for _, action := range statement.Actions {
switch action {
case "s3:GetBucketLocation", "s3:ListBucket":
continue
}
newActions = append(newActions, action)
}
statement.Actions = newActions
} else if resource == awsResourcePrefix+bucketName+"/"+objectPrefix+"*" {
var newActions []string
for _, action := range statement.Actions {
if action == "s3:GetObject" {
continue
}
newActions = append(newActions, action)
}
statement.Actions = newActions
}
}
if len(statement.Actions) != 0 {
newStatements = append(newStatements, statement)
}
}
return newStatements
}
// Remove bucket policies based on the type.
func removeBucketPolicyStatement(statements []Statement, bucketName string, objectPrefix string) []Statement {
// Verify type of policy to be removed.
if isBucketPolicyReadWrite(statements, bucketName, objectPrefix) {
statements = removeBucketPolicyStatementReadWrite(statements, bucketName, objectPrefix)
} else if isBucketPolicyWriteOnly(statements, bucketName, objectPrefix) {
statements = removeBucketPolicyStatementWriteOnly(statements, bucketName, objectPrefix)
} else if isBucketPolicyReadOnly(statements, bucketName, objectPrefix) {
statements = removeBucketPolicyStatementReadOnly(statements, bucketName, objectPrefix)
}
return statements
}
// Unmarshals bucket policy byte array into a structured bucket access policy.
func unMarshalBucketPolicy(bucketPolicyBuf []byte) (BucketAccessPolicy, error) {
// Untyped lazy JSON struct.
type bucketAccessPolicyUntyped struct {
Version string
Statement []struct {
Sid string
Effect string
Principal struct {
AWS json.RawMessage
}
Action json.RawMessage
Resource json.RawMessage
Condition map[string]map[string]string
}
}
var policyUntyped = bucketAccessPolicyUntyped{}
// Unmarshal incoming policy into an untyped structure, to be
// evaluated lazily later.
err := json.Unmarshal(bucketPolicyBuf, &policyUntyped)
if err != nil {
return BucketAccessPolicy{}, err
}
var policy = BucketAccessPolicy{}
policy.Version = policyUntyped.Version
for _, stmtUntyped := range policyUntyped.Statement {
statement := Statement{}
// These are properly typed messages.
statement.Sid = stmtUntyped.Sid
statement.Effect = stmtUntyped.Effect
statement.Conditions = stmtUntyped.Condition
// AWS user can have two different types, either as []string
// and either as regular 'string'. We fall back to doing this
// since there is no other easier way to fix this.
err = json.Unmarshal(stmtUntyped.Principal.AWS, &statement.Principal.AWS)
if err != nil {
var awsUser string
err = json.Unmarshal(stmtUntyped.Principal.AWS, &awsUser)
if err != nil {
return BucketAccessPolicy{}, err
}
statement.Principal.AWS = []string{awsUser}
}
// Actions can have two different types, either as []string
// and either as regular 'string'. We fall back to doing this
// since there is no other easier way to fix this.
err = json.Unmarshal(stmtUntyped.Action, &statement.Actions)
if err != nil {
var action string
err = json.Unmarshal(stmtUntyped.Action, &action)
if err != nil {
return BucketAccessPolicy{}, err
}
statement.Actions = []string{action}
}
// Resources can have two different types, either as []string
// and either as regular 'string'. We fall back to doing this
// since there is no other easier way to fix this.
err = json.Unmarshal(stmtUntyped.Resource, &statement.Resources)
if err != nil {
var resource string
err = json.Unmarshal(stmtUntyped.Resource, &resource)
if err != nil {
return BucketAccessPolicy{}, err
}
statement.Resources = []string{resource}
}
// Append the typed policy.
policy.Statements = append(policy.Statements, statement)
}
return policy, nil
}

View File

@@ -124,7 +124,7 @@ func postPresignSignatureV2(policyBase64, secretAccessKey string) string {
// Signature = Base64( HMAC-SHA1( YourSecretAccessKeyID, UTF-8-Encoding-Of( StringToSign ) ) );
//
// StringToSign = HTTP-Verb + "\n" +
// Content-MD5 + "\n" +
// Content-Md5 + "\n" +
// Content-Type + "\n" +
// Date + "\n" +
// CanonicalizedProtocolHeaders +
@@ -172,7 +172,7 @@ func signV2(req http.Request, accessKeyID, secretAccessKey string) *http.Request
// From the Amazon docs:
//
// StringToSign = HTTP-Verb + "\n" +
// Content-MD5 + "\n" +
// Content-Md5 + "\n" +
// Content-Type + "\n" +
// Date + "\n" +
// CanonicalizedProtocolHeaders +
@@ -192,7 +192,7 @@ func getStringToSignV2(req http.Request) string {
func writeDefaultHeaders(buf *bytes.Buffer, req http.Request) {
buf.WriteString(req.Method)
buf.WriteByte('\n')
buf.WriteString(req.Header.Get("Content-MD5"))
buf.WriteString(req.Header.Get("Content-Md5"))
buf.WriteByte('\n')
buf.WriteString(req.Header.Get("Content-Type"))
buf.WriteByte('\n')

View File

@@ -17,8 +17,10 @@
package minio
import (
"math/rand"
"net"
"net/http"
"net/url"
"strings"
"time"
)
@@ -33,19 +35,36 @@ const NoJitter = 0.0
// newRetryTimer creates a timer with exponentially increasing delays
// until the maximum retry attempts are reached.
func newRetryTimer(maxRetry int, unit time.Duration, cap time.Duration, jitter float64) <-chan int {
func (c Client) newRetryTimer(maxRetry int, unit time.Duration, cap time.Duration, jitter float64) <-chan int {
attemptCh := make(chan int)
// Seed random function with current unix nano time.
rand.Seed(time.Now().UTC().UnixNano())
// computes the exponential backoff duration according to
// https://www.awsarchitectureblog.com/2015/03/backoff.html
exponentialBackoffWait := func(attempt int) time.Duration {
// normalize jitter to the range [0, 1.0]
if jitter < NoJitter {
jitter = NoJitter
}
if jitter > MaxJitter {
jitter = MaxJitter
}
//sleep = random_between(0, min(cap, base * 2 ** attempt))
sleep := unit * time.Duration(1<<uint(attempt))
if sleep > cap {
sleep = cap
}
if jitter != NoJitter {
sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter)
}
return sleep
}
go func() {
defer close(attemptCh)
for i := 0; i < maxRetry; i++ {
attemptCh <- i + 1 // Attempts start from 1.
// Grow the interval at an exponential rate,
// starting at unit and capping at cap
time.Sleep(exponentialBackoffWait(unit, i, cap, jitter))
time.Sleep(exponentialBackoffWait(i))
}
}()
return attemptCh
@@ -56,40 +75,47 @@ func isNetErrorRetryable(err error) bool {
switch err.(type) {
case *net.DNSError, *net.OpError, net.UnknownNetworkError:
return true
case *url.Error:
// For a URL error, where it replies back "connection closed"
// retry again.
if strings.Contains(err.Error(), "Connection closed by foreign host") {
return true
}
}
return false
}
// computes the exponential backoff duration according to
// https://www.awsarchitectureblog.com/2015/03/backoff.html
func exponentialBackoffWait(base time.Duration, attempt int, cap time.Duration, jitter float64) time.Duration {
// normalize jitter to the range [0, 1.0]
if jitter < NoJitter {
jitter = NoJitter
}
if jitter > MaxJitter {
jitter = MaxJitter
}
//sleep = random_between(0, min(cap, base * 2 ** attempt))
sleep := base * time.Duration(1<<uint(attempt))
if sleep > cap {
sleep = cap
}
if jitter != NoJitter {
sleep -= time.Duration(rand.Float64() * float64(sleep) * jitter)
}
return sleep
// List of AWS S3 error codes which are retryable.
var retryableS3Codes = map[string]struct{}{
"RequestError": {},
"RequestTimeout": {},
"Throttling": {},
"ThrottlingException": {},
"RequestLimitExceeded": {},
"RequestThrottled": {},
"InternalError": {},
"ExpiredToken": {},
"ExpiredTokenException": {},
// Add more AWS S3 codes here.
}
// isS3CodeRetryable - is s3 error code retryable.
func isS3CodeRetryable(s3Code string) bool {
switch s3Code {
case "RequestError", "RequestTimeout", "Throttling", "ThrottlingException":
fallthrough
case "RequestLimitExceeded", "RequestThrottled", "InternalError":
fallthrough
case "ExpiredToken", "ExpiredTokenException":
return true
}
return false
func isS3CodeRetryable(s3Code string) (ok bool) {
_, ok = retryableS3Codes[s3Code]
return ok
}
// List of HTTP status codes which are retryable.
var retryableHTTPStatusCodes = map[int]struct{}{
429: {}, // http.StatusTooManyRequests is not part of the Go 1.5 library, yet
http.StatusInternalServerError: {},
http.StatusBadGateway: {},
http.StatusServiceUnavailable: {},
// Add more HTTP status codes here.
}
// isHTTPStatusRetryable - is HTTP error code retryable.
func isHTTPStatusRetryable(httpStatusCode int) (ok bool) {
_, ok = retryableHTTPStatusCodes[httpStatusCode]
return ok
}

View File

@@ -19,6 +19,7 @@ package minio
import (
"bytes"
"crypto/hmac"
"crypto/md5"
"crypto/sha256"
"encoding/hex"
"encoding/xml"
@@ -47,6 +48,13 @@ func sum256(data []byte) []byte {
return hash.Sum(nil)
}
// sumMD5 calculate sumMD5 sum for an input byte array.
func sumMD5(data []byte) []byte {
hash := md5.New()
hash.Write(data)
return hash.Sum(nil)
}
// sumHMAC calculate hmac between two input byte array.
func sumHMAC(key []byte, data []byte) []byte {
hash := hmac.New(sha256.New, key)