Implement backblaze-b2 gateway support (#5002)

Fixes https://github.com/minio/minio/issues/4072
This commit is contained in:
Harshavardhana
2017-10-13 03:56:16 -07:00
committed by Nitish Tiwari
parent 3d0dced23c
commit 0c0d1e4150
35 changed files with 2711 additions and 11416 deletions

View File

@@ -70,6 +70,14 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, contentRange *h
w.Header().Set("ETag", "\""+objInfo.ETag+"\"")
}
if objInfo.ContentType != "" {
w.Header().Set("Content-Type", objInfo.ContentType)
}
if objInfo.ContentEncoding != "" {
w.Header().Set("Content-Encoding", objInfo.ContentEncoding)
}
// Set all other user defined metadata.
for k, v := range objInfo.UserDefined {
w.Header().Set(k, v)

133
cmd/gateway-b2-anonymous.go Normal file
View File

@@ -0,0 +1,133 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
// mkRange converts offset, size into Range header equivalent.
func mkRange(offset, size int64) string {
if offset == 0 && size == 0 {
return ""
}
if size == 0 {
return fmt.Sprintf("%s%d-", byteRangePrefix, offset)
}
return fmt.Sprintf("%s%d-%d", byteRangePrefix, offset, offset+size-1)
}
// AnonGetObject - performs a plain http GET request on a public resource,
// fails if the resource is not public.
func (l *b2Objects) AnonGetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer) error {
uri := fmt.Sprintf("%s/file/%s/%s", l.b2Client.DownloadURI, bucket, object)
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return b2ToObjectError(traceError(err), bucket, object)
}
rng := mkRange(startOffset, length)
if rng != "" {
req.Header.Set("Range", rng)
}
resp, err := l.anonClient.Do(req)
if err != nil {
return b2ToObjectError(traceError(err), bucket, object)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return b2ToObjectError(traceError(errors.New(resp.Status)), bucket, object)
}
_, err = io.Copy(writer, resp.Body)
return b2ToObjectError(traceError(err), bucket, object)
}
// Converts http Header into ObjectInfo. This function looks for all the
// standard Backblaze B2 headers to convert into ObjectInfo.
//
// Content-Length is converted to Size.
// X-Bz-Upload-Timestamp is converted to ModTime.
// X-Bz-Info-<header>:<value> is converted to <header>:<value>
// Content-Type is converted to ContentType.
// X-Bz-Content-Sha1 is converted to ETag.
func headerToObjectInfo(bucket, object string, header http.Header) (objInfo ObjectInfo, err error) {
clen, err := strconv.ParseInt(header.Get("Content-Length"), 10, 64)
if err != nil {
return objInfo, b2ToObjectError(traceError(err), bucket, object)
}
// Converting upload timestamp in milliseconds to a time.Time value for ObjectInfo.ModTime.
timeStamp, err := strconv.ParseInt(header.Get("X-Bz-Upload-Timestamp"), 10, 64)
if err != nil {
return objInfo, b2ToObjectError(traceError(err), bucket, object)
}
// Populate user metadata by looking for all the X-Bz-Info-<name>
// HTTP headers, ignore other headers since they have their own
// designated meaning, for more details refer B2 API documentation.
userMetadata := make(map[string]string)
for key := range header {
if strings.HasPrefix(key, "X-Bz-Info-") {
var name string
name, err = url.QueryUnescape(strings.TrimPrefix(key, "X-Bz-Info-"))
if err != nil {
return objInfo, b2ToObjectError(traceError(err), bucket, object)
}
var val string
val, err = url.QueryUnescape(header.Get(key))
if err != nil {
return objInfo, b2ToObjectError(traceError(err), bucket, object)
}
userMetadata[name] = val
}
}
return ObjectInfo{
Bucket: bucket,
Name: object,
ContentType: header.Get("Content-Type"),
ModTime: time.Unix(0, 0).Add(time.Duration(timeStamp) * time.Millisecond),
Size: clen,
ETag: header.Get("X-Bz-File-Id"),
UserDefined: userMetadata,
}, nil
}
// AnonGetObjectInfo - performs a plain http HEAD request on a public resource,
// fails if the resource is not public.
func (l *b2Objects) AnonGetObjectInfo(bucket string, object string) (objInfo ObjectInfo, err error) {
uri := fmt.Sprintf("%s/file/%s/%s", l.b2Client.DownloadURI, bucket, object)
req, err := http.NewRequest("HEAD", uri, nil)
if err != nil {
return objInfo, b2ToObjectError(traceError(err), bucket, object)
}
resp, err := l.anonClient.Do(req)
if err != nil {
return objInfo, b2ToObjectError(traceError(err), bucket, object)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return objInfo, b2ToObjectError(traceError(errors.New(resp.Status)), bucket, object)
}
return headerToObjectInfo(bucket, object, resp.Header)
}

703
cmd/gateway-b2.go Normal file
View File

@@ -0,0 +1,703 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"context"
"crypto/sha1"
"fmt"
"hash"
"io"
"io/ioutil"
"net/http"
"strings"
"sync"
"time"
b2 "github.com/minio/blazer/base"
"github.com/minio/minio-go/pkg/policy"
)
// Supported bucket types by B2 backend.
const (
bucketTypePrivate = "allPrivate"
bucketTypeReadOnly = "allPublic"
)
// b2Object implements gateway for Minio and BackBlaze B2 compatible object storage servers.
type b2Objects struct {
gatewayUnsupported
mu sync.Mutex
creds credential
b2Client *b2.B2
anonClient *http.Client
ctx context.Context
}
// newB2Gateway returns b2 gateway layer, implements GatewayLayer interface to
// talk to B2 remote backend.
func newB2Gateway() (GatewayLayer, error) {
ctx := context.Background()
creds := serverConfig.GetCredential()
client, err := b2.AuthorizeAccount(ctx, creds.AccessKey, creds.SecretKey, b2.Transport(newCustomHTTPTransport()))
if err != nil {
return nil, err
}
return &b2Objects{
creds: creds,
b2Client: client,
anonClient: &http.Client{
Transport: newCustomHTTPTransport(),
},
ctx: ctx,
}, nil
}
// Convert B2 errors to minio object layer errors.
func b2ToObjectError(err error, params ...string) error {
if err == nil {
return nil
}
e, ok := err.(*Error)
if !ok {
// Code should be fixed if this function is called without doing traceError()
// Else handling different situations in this function makes this function complicated.
errorIf(err, "Expected type *Error")
return err
}
err = e.e
bucket := ""
object := ""
uploadID := ""
if len(params) >= 1 {
bucket = params[0]
}
if len(params) == 2 {
object = params[1]
}
if len(params) == 3 {
uploadID = params[2]
}
// Following code is a non-exhaustive check to convert
// B2 errors into S3 compatible errors.
//
// For a more complete information - https://www.backblaze.com/b2/docs/
statusCode, code, msg := b2.Code(err)
if statusCode == 0 {
// We don't interpret non B2 errors. B2 errors have statusCode
// to help us convert them to S3 object errors.
return e
}
switch code {
case "duplicate_bucket_name":
err = BucketAlreadyOwnedByYou{Bucket: bucket}
case "bad_request":
if object != "" {
err = ObjectNameInvalid{bucket, object}
} else if bucket != "" {
err = BucketNotFound{Bucket: bucket}
}
case "bad_bucket_id":
err = BucketNotFound{Bucket: bucket}
case "file_not_present", "not_found":
err = ObjectNotFound{bucket, object}
case "cannot_delete_non_empty_bucket":
err = BucketNotEmpty{bucket, ""}
}
// Special interpretation like this is required for Multipart sessions.
if strings.Contains(msg, "No active upload for") && uploadID != "" {
err = InvalidUploadID{uploadID}
}
e.e = err
return e
}
// Shutdown saves any gateway metadata to disk
// if necessary and reload upon next restart.
func (l *b2Objects) Shutdown() error {
// TODO
return nil
}
// StorageInfo is not relevant to B2 backend.
func (l *b2Objects) StorageInfo() (si StorageInfo) {
return si
}
// MakeBucket creates a new container on B2 backend.
func (l *b2Objects) MakeBucketWithLocation(bucket, location string) error {
// location is ignored for B2 backend.
// All buckets are set to private by default.
_, err := l.b2Client.CreateBucket(l.ctx, bucket, bucketTypePrivate, nil, nil)
return b2ToObjectError(traceError(err), bucket)
}
func (l *b2Objects) reAuthorizeAccount() error {
client, err := b2.AuthorizeAccount(l.ctx, l.creds.AccessKey, l.creds.SecretKey, b2.Transport(newCustomHTTPTransport()))
if err != nil {
return err
}
l.mu.Lock()
l.b2Client.Update(client)
l.mu.Unlock()
return nil
}
// listBuckets is a wrapper similar to ListBuckets, which re-authorizes
// the account and updates the B2 client safely. Once successfully
// authorized performs the call again and returns list of buckets.
// For any errors which are not actionable we return an error.
func (l *b2Objects) listBuckets(err error) ([]*b2.Bucket, error) {
if err != nil {
if b2.Action(err) != b2.ReAuthenticate {
return nil, err
}
if rerr := l.reAuthorizeAccount(); rerr != nil {
return nil, rerr
}
}
bktList, lerr := l.b2Client.ListBuckets(l.ctx)
if lerr != nil {
return l.listBuckets(lerr)
}
return bktList, nil
}
// Bucket - is a helper which provides a *Bucket instance
// for performing an API operation. B2 API doesn't
// provide a direct way to access the bucket so we need
// to employ following technique.
func (l *b2Objects) Bucket(bucket string) (*b2.Bucket, error) {
bktList, err := l.listBuckets(nil)
if err != nil {
return nil, b2ToObjectError(traceError(err), bucket)
}
for _, bkt := range bktList {
if bkt.Name == bucket {
return bkt, nil
}
}
return nil, traceError(BucketNotFound{Bucket: bucket})
}
// GetBucketInfo gets bucket metadata..
func (l *b2Objects) GetBucketInfo(bucket string) (bi BucketInfo, err error) {
if _, err = l.Bucket(bucket); err != nil {
return bi, err
}
return BucketInfo{
Name: bucket,
Created: time.Unix(0, 0),
}, nil
}
// ListBuckets lists all B2 buckets
func (l *b2Objects) ListBuckets() ([]BucketInfo, error) {
bktList, err := l.listBuckets(nil)
if err != nil {
return nil, err
}
var bktInfo []BucketInfo
for _, bkt := range bktList {
bktInfo = append(bktInfo, BucketInfo{
Name: bkt.Name,
Created: time.Unix(0, 0),
})
}
return bktInfo, nil
}
// DeleteBucket deletes a bucket on B2
func (l *b2Objects) DeleteBucket(bucket string) error {
bkt, err := l.Bucket(bucket)
if err != nil {
return err
}
err = bkt.DeleteBucket(l.ctx)
return b2ToObjectError(traceError(err), bucket)
}
// ListObjects lists all objects in B2 bucket filtered by prefix, returns upto at max 1000 entries at a time.
func (l *b2Objects) ListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi ListObjectsInfo, err error) {
bkt, err := l.Bucket(bucket)
if err != nil {
return loi, err
}
loi = ListObjectsInfo{}
files, next, lerr := bkt.ListFileNames(l.ctx, maxKeys, marker, prefix, delimiter)
if lerr != nil {
return loi, b2ToObjectError(traceError(lerr), bucket)
}
loi.IsTruncated = next != ""
loi.NextMarker = next
for _, file := range files {
switch file.Status {
case "folder":
loi.Prefixes = append(loi.Prefixes, file.Name)
case "upload":
loi.Objects = append(loi.Objects, ObjectInfo{
Bucket: bucket,
Name: file.Name,
ModTime: file.Timestamp,
Size: file.Size,
ETag: file.Info.ID,
ContentType: file.Info.ContentType,
UserDefined: file.Info.Info,
})
}
}
return loi, nil
}
// ListObjectsV2 lists all objects in B2 bucket filtered by prefix, returns upto max 1000 entries at a time.
func (l *b2Objects) ListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int,
fetchOwner bool, startAfter string) (loi ListObjectsV2Info, err error) {
// fetchOwner, startAfter are not supported and unused.
bkt, err := l.Bucket(bucket)
if err != nil {
return loi, err
}
loi = ListObjectsV2Info{}
files, next, lerr := bkt.ListFileNames(l.ctx, maxKeys, continuationToken, prefix, delimiter)
if lerr != nil {
return loi, b2ToObjectError(traceError(lerr), bucket)
}
loi.IsTruncated = next != ""
loi.ContinuationToken = continuationToken
loi.NextContinuationToken = next
for _, file := range files {
switch file.Status {
case "folder":
loi.Prefixes = append(loi.Prefixes, file.Name)
case "upload":
loi.Objects = append(loi.Objects, ObjectInfo{
Bucket: bucket,
Name: file.Name,
ModTime: file.Timestamp,
Size: file.Size,
ETag: file.Info.ID,
ContentType: file.Info.ContentType,
UserDefined: file.Info.Info,
})
}
}
return loi, nil
}
// GetObject reads an object from B2. Supports additional
// parameters like offset and length which are synonymous with
// HTTP Range requests.
//
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (l *b2Objects) GetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer) error {
bkt, err := l.Bucket(bucket)
if err != nil {
return err
}
reader, err := bkt.DownloadFileByName(l.ctx, object, startOffset, length)
if err != nil {
return b2ToObjectError(traceError(err), bucket, object)
}
defer reader.Close()
_, err = io.Copy(writer, reader)
return b2ToObjectError(traceError(err), bucket, object)
}
// GetObjectInfo reads object info and replies back ObjectInfo
func (l *b2Objects) GetObjectInfo(bucket string, object string) (objInfo ObjectInfo, err error) {
bkt, err := l.Bucket(bucket)
if err != nil {
return objInfo, err
}
f, err := bkt.DownloadFileByName(l.ctx, object, 0, 1)
if err != nil {
return objInfo, b2ToObjectError(traceError(err), bucket, object)
}
f.Close()
fi, err := bkt.File(f.ID, object).GetFileInfo(l.ctx)
if err != nil {
return objInfo, b2ToObjectError(traceError(err), bucket, object)
}
objInfo = ObjectInfo{
Bucket: bucket,
Name: object,
ETag: fi.ID,
Size: fi.Size,
ModTime: fi.Timestamp,
ContentType: fi.ContentType,
UserDefined: fi.Info,
}
return objInfo, nil
}
// In B2 - You must always include the X-Bz-Content-Sha1 header with
// your upload request. The value you provide can be:
// (1) the 40-character hex checksum of the file,
// (2) the string hex_digits_at_end, or
// (3) the string do_not_verify.
// For more reference - https://www.backblaze.com/b2/docs/uploading.html
//
const (
sha1NoVerify = "do_not_verify"
sha1AtEOF = "hex_digits_at_end"
)
// With the second option mentioned above, you append the 40-character hex sha1
// to the end of the request body, immediately after the contents of the file
// being uploaded. Note that the content length is the size of the file plus 40
// of the original size of the reader.
//
// newB2Reader implements a B2 compatible reader by wrapping the HashReader into
// a new io.Reader which will emit out the sha1 hex digits at io.EOF.
// It also means that your overall content size is now original size + 40 bytes.
// Additionally this reader also verifies Hash encapsulated inside HashReader
// at io.EOF if the verification failed we return an error and do not send
// the content to server.
func newB2Reader(r *HashReader, size int64) *B2Reader {
return &B2Reader{
r: r,
size: size,
sha1Hash: sha1.New(),
}
}
// B2Reader - is a Reader wraps the HashReader which will emit out the sha1
// hex digits at io.EOF. It also means that your overall content size is
// now original size + 40 bytes. Additionally this reader also verifies
// Hash encapsulated inside HashReader at io.EOF if the verification
// failed we return an error and do not send the content to server.
type B2Reader struct {
r *HashReader
size int64
sha1Hash hash.Hash
isEOF bool
buf *strings.Reader
}
// Size - Returns the total size of Reader.
func (nb *B2Reader) Size() int64 { return nb.size + 40 }
func (nb *B2Reader) Read(p []byte) (int, error) {
if nb.isEOF {
return nb.buf.Read(p)
}
// Read into hash to update the on going checksum.
n, err := io.TeeReader(nb.r, nb.sha1Hash).Read(p)
if err == io.EOF {
// Verify checksum at io.EOF
if err = nb.r.Verify(); err != nil {
return n, err
}
// Stream is not corrupted on this end
// now fill in the last 40 bytes of sha1 hex
// so that the server can verify the stream on
// their end.
err = nil
nb.isEOF = true
nb.buf = strings.NewReader(fmt.Sprintf("%x", nb.sha1Hash.Sum(nil)))
}
return n, err
}
// PutObject uploads the single upload to B2 backend by using *b2_upload_file* API, uploads upto 5GiB.
func (l *b2Objects) PutObject(bucket string, object string, data *HashReader, metadata map[string]string) (ObjectInfo, error) {
var objInfo ObjectInfo
bkt, err := l.Bucket(bucket)
if err != nil {
return objInfo, err
}
contentType := metadata["content-type"]
delete(metadata, "content-type")
delete(metadata, "etag")
var u *b2.URL
u, err = bkt.GetUploadURL(l.ctx)
if err != nil {
return objInfo, b2ToObjectError(traceError(err), bucket, object)
}
hr := newB2Reader(data, data.Size())
var f *b2.File
f, err = u.UploadFile(l.ctx, hr, int(hr.Size()), object, contentType, sha1AtEOF, metadata)
if err != nil {
return objInfo, b2ToObjectError(traceError(err), bucket, object)
}
var fi *b2.FileInfo
fi, err = f.GetFileInfo(l.ctx)
if err != nil {
return objInfo, b2ToObjectError(traceError(err), bucket, object)
}
return ObjectInfo{
Bucket: bucket,
Name: object,
ETag: fi.ID,
Size: fi.Size,
ModTime: fi.Timestamp,
ContentType: fi.ContentType,
UserDefined: fi.Info,
}, nil
}
// CopyObject copies a blob from source container to destination container.
func (l *b2Objects) CopyObject(srcBucket string, srcObject string, dstBucket string,
dstObject string, metadata map[string]string) (objInfo ObjectInfo, err error) {
return objInfo, traceError(NotImplemented{})
}
// DeleteObject deletes a blob in bucket
func (l *b2Objects) DeleteObject(bucket string, object string) error {
bkt, err := l.Bucket(bucket)
if err != nil {
return err
}
reader, err := bkt.DownloadFileByName(l.ctx, object, 0, 1)
if err != nil {
return b2ToObjectError(traceError(err), bucket, object)
}
io.Copy(ioutil.Discard, reader)
reader.Close()
err = bkt.File(reader.ID, object).DeleteFileVersion(l.ctx)
return b2ToObjectError(traceError(err), bucket, object)
}
// ListMultipartUploads lists all multipart uploads.
func (l *b2Objects) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string,
delimiter string, maxUploads int) (lmi ListMultipartsInfo, err error) {
// keyMarker, prefix, delimiter are all ignored, Backblaze B2 doesn't support any
// of these parameters only equivalent parameter is uploadIDMarker.
bkt, err := l.Bucket(bucket)
if err != nil {
return lmi, err
}
// The maximum number of files to return from this call.
// The default value is 100, and the maximum allowed is 100.
if maxUploads > 100 {
maxUploads = 100
}
largeFiles, nextMarker, err := bkt.ListUnfinishedLargeFiles(l.ctx, uploadIDMarker, maxUploads)
if err != nil {
return lmi, b2ToObjectError(traceError(err), bucket)
}
lmi = ListMultipartsInfo{
MaxUploads: maxUploads,
}
if nextMarker != "" {
lmi.IsTruncated = true
lmi.NextUploadIDMarker = nextMarker
}
for _, largeFile := range largeFiles {
lmi.Uploads = append(lmi.Uploads, uploadMetadata{
Object: largeFile.Name,
UploadID: largeFile.ID,
Initiated: largeFile.Timestamp,
})
}
return lmi, nil
}
// NewMultipartUpload upload object in multiple parts, uses B2's LargeFile upload API.
// Large files can range in size from 5MB to 10TB.
// Each large file must consist of at least 2 parts, and all of the parts except the
// last one must be at least 5MB in size. The last part must contain at least one byte.
// For more information - https://www.backblaze.com/b2/docs/large_files.html
func (l *b2Objects) NewMultipartUpload(bucket string, object string, metadata map[string]string) (string, error) {
var uploadID string
bkt, err := l.Bucket(bucket)
if err != nil {
return uploadID, err
}
contentType := metadata["content-type"]
delete(metadata, "content-type")
lf, err := bkt.StartLargeFile(l.ctx, object, contentType, metadata)
if err != nil {
return uploadID, b2ToObjectError(traceError(err), bucket, object)
}
return lf.ID, nil
}
// CopyObjectPart copy part of object to other bucket and object.
func (l *b2Objects) CopyObjectPart(srcBucket string, srcObject string, destBucket string, destObject string,
uploadID string, partID int, startOffset int64, length int64) (info PartInfo, err error) {
return PartInfo{}, traceError(NotImplemented{})
}
// PutObjectPart puts a part of object in bucket, uses B2's LargeFile upload API.
func (l *b2Objects) PutObjectPart(bucket string, object string, uploadID string, partID int, data *HashReader) (pi PartInfo, err error) {
bkt, err := l.Bucket(bucket)
if err != nil {
return pi, err
}
fc, err := bkt.File(uploadID, object).CompileParts(0, nil).GetUploadPartURL(l.ctx)
if err != nil {
return pi, b2ToObjectError(traceError(err), bucket, object, uploadID)
}
hr := newB2Reader(data, data.Size())
sha1, err := fc.UploadPart(l.ctx, hr, sha1AtEOF, int(hr.Size()), partID)
if err != nil {
return pi, b2ToObjectError(traceError(err), bucket, object, uploadID)
}
return PartInfo{
PartNumber: partID,
LastModified: UTCNow(),
ETag: sha1,
Size: data.Size(),
}, nil
}
// ListObjectParts returns all object parts for specified object in specified bucket, uses B2's LargeFile upload API.
func (l *b2Objects) ListObjectParts(bucket string, object string, uploadID string, partNumberMarker int, maxParts int) (lpi ListPartsInfo, err error) {
bkt, err := l.Bucket(bucket)
if err != nil {
return lpi, err
}
lpi = ListPartsInfo{
Bucket: bucket,
Object: object,
UploadID: uploadID,
MaxParts: maxParts,
PartNumberMarker: partNumberMarker,
}
// startPartNumber must be in the range 1 - 10000 for B2.
partNumberMarker++
partsList, next, err := bkt.File(uploadID, object).ListParts(l.ctx, partNumberMarker, maxParts)
if err != nil {
return lpi, b2ToObjectError(traceError(err), bucket, object, uploadID)
}
if next != 0 {
lpi.IsTruncated = true
lpi.NextPartNumberMarker = next
}
for _, part := range partsList {
lpi.Parts = append(lpi.Parts, PartInfo{
PartNumber: part.Number,
ETag: part.SHA1,
Size: part.Size,
})
}
return lpi, nil
}
// AbortMultipartUpload aborts a on going multipart upload, uses B2's LargeFile upload API.
func (l *b2Objects) AbortMultipartUpload(bucket string, object string, uploadID string) error {
bkt, err := l.Bucket(bucket)
if err != nil {
return err
}
err = bkt.File(uploadID, object).CompileParts(0, nil).CancelLargeFile(l.ctx)
return b2ToObjectError(traceError(err), bucket, object, uploadID)
}
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object, uses B2's LargeFile upload API.
func (l *b2Objects) CompleteMultipartUpload(bucket string, object string, uploadID string, uploadedParts []completePart) (oi ObjectInfo, err error) {
bkt, err := l.Bucket(bucket)
if err != nil {
return oi, err
}
hashes := make(map[int]string)
for i, uploadedPart := range uploadedParts {
// B2 requires contigous part numbers starting with 1, they do not support
// hand picking part numbers, we return an S3 compatible error instead.
if i+1 != uploadedPart.PartNumber {
return oi, b2ToObjectError(traceError(InvalidPart{}), bucket, object, uploadID)
}
hashes[uploadedPart.PartNumber] = uploadedPart.ETag
}
if _, err = bkt.File(uploadID, object).CompileParts(0, hashes).FinishLargeFile(l.ctx); err != nil {
return oi, b2ToObjectError(traceError(err), bucket, object, uploadID)
}
return l.GetObjectInfo(bucket, object)
}
// SetBucketPolicies - B2 supports 2 types of bucket policies:
// bucketType.AllPublic - bucketTypeReadOnly means that anybody can download the files is the bucket;
// bucketType.AllPrivate - bucketTypePrivate means that you need an authorization token to download them.
// Default is AllPrivate for all buckets.
func (l *b2Objects) SetBucketPolicies(bucket string, policyInfo policy.BucketAccessPolicy) error {
var policies []BucketAccessPolicy
for prefix, policy := range policy.GetPolicies(policyInfo.Statements, bucket) {
policies = append(policies, BucketAccessPolicy{
Prefix: prefix,
Policy: policy,
})
}
prefix := bucket + "/*" // For all objects inside the bucket.
if len(policies) != 1 {
return traceError(NotImplemented{})
}
if policies[0].Prefix != prefix {
return traceError(NotImplemented{})
}
if policies[0].Policy != policy.BucketPolicyReadOnly {
return traceError(NotImplemented{})
}
bkt, err := l.Bucket(bucket)
if err != nil {
return err
}
bkt.Type = bucketTypeReadOnly
_, err = bkt.Update(l.ctx)
return b2ToObjectError(traceError(err))
}
// GetBucketPolicies, returns the current bucketType from B2 backend and convert
// it into S3 compatible bucket policy info.
func (l *b2Objects) GetBucketPolicies(bucket string) (policy.BucketAccessPolicy, error) {
policyInfo := policy.BucketAccessPolicy{Version: "2012-10-17"}
bkt, err := l.Bucket(bucket)
if err != nil {
return policyInfo, err
}
if bkt.Type == bucketTypeReadOnly {
policyInfo.Statements = policy.SetPolicy(policyInfo.Statements, policy.BucketPolicyReadOnly, bucket, "")
return policyInfo, nil
}
// bkt.Type can also be snapshot, but it is only allowed through B2 browser console,
// just return back as policy not found for all cases.
// CreateBucket always sets the value to allPrivate by default.
return policy.BucketAccessPolicy{}, traceError(PolicyNotFound{Bucket: bucket})
}
// DeleteBucketPolicies - resets the bucketType of bucket on B2 to 'allPrivate'.
func (l *b2Objects) DeleteBucketPolicies(bucket string) error {
bkt, err := l.Bucket(bucket)
if err != nil {
return err
}
bkt.Type = bucketTypePrivate
_, err = bkt.Update(l.ctx)
return b2ToObjectError(traceError(err))
}

104
cmd/gateway-b2_test.go Normal file
View File

@@ -0,0 +1,104 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"net/http"
"testing"
)
// Tests headerToObjectInfo
func TestHeaderToObjectInfo(t *testing.T) {
testCases := []struct {
bucket, object string
header http.Header
objInfo ObjectInfo
}{
{
bucket: "bucket",
object: "object",
header: http.Header{
"Content-Length": []string{"10"},
"Content-Type": []string{"application/javascript"},
"X-Bz-Upload-Timestamp": []string{"1000"},
"X-Bz-Info-X-Amz-Meta-1": []string{"test1"},
"X-Bz-File-Id": []string{"xxxxx"},
},
objInfo: ObjectInfo{
Bucket: "bucket",
Name: "object",
ContentType: "application/javascript",
Size: 10,
UserDefined: map[string]string{
"X-Amz-Meta-1": "test1",
},
ETag: "xxxxx",
},
},
}
for i, testCase := range testCases {
gotObjInfo, err := headerToObjectInfo(testCase.bucket, testCase.object, testCase.header)
if err != nil {
t.Fatalf("Test %d: %s", i+1, err)
}
if gotObjInfo.Bucket != testCase.objInfo.Bucket {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.objInfo.Bucket, gotObjInfo.Bucket)
}
if gotObjInfo.Name != testCase.objInfo.Name {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.objInfo.Name, gotObjInfo.Name)
}
if gotObjInfo.ContentType != testCase.objInfo.ContentType {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.objInfo.ContentType, gotObjInfo.ContentType)
}
if gotObjInfo.ETag != testCase.objInfo.ETag {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.objInfo.ETag, gotObjInfo.ETag)
}
}
}
// Tests mkRange test.
func TestMkRange(t *testing.T) {
testCases := []struct {
offset, size int64
expectedRng string
}{
// No offset set, size not set.
{
offset: 0,
size: 0,
expectedRng: "",
},
// Offset set, size not set.
{
offset: 10,
size: 0,
expectedRng: "bytes=10-",
},
// Offset set, size set.
{
offset: 10,
size: 11,
expectedRng: "bytes=10-20",
},
}
for i, testCase := range testCases {
gotRng := mkRange(testCase.offset, testCase.size)
if gotRng != testCase.expectedRng {
t.Errorf("Test %d: expected %s, got %s", i+1, testCase.expectedRng, gotRng)
}
}
}

View File

@@ -1,27 +0,0 @@
/*
* Minio Cloud Storage, (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import "errors"
var (
// Project ID format is not valid.
errGCSInvalidProjectID = errors.New("GCS project id is either empty or invalid")
// Project ID not found
errGCSProjectIDNotFound = errors.New("unknown project id")
)

View File

@@ -21,6 +21,7 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"math"
@@ -39,6 +40,14 @@ import (
"github.com/minio/minio-go/pkg/policy"
)
var (
// Project ID format is not valid.
errGCSInvalidProjectID = errors.New("GCS project id is either empty or invalid")
// Project ID not found
errGCSProjectIDNotFound = errors.New("unknown project id")
)
const (
// Path where multipart objects are saved.
// If we change the backend format we will use a different url path like /multipart/v2

View File

@@ -125,6 +125,31 @@ EXAMPLES:
`
const b2GatewayTemplate = `NAME:
{{.HelpName}} - {{.Usage}}
USAGE:
{{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}}
{{if .VisibleFlags}}
FLAGS:
{{range .VisibleFlags}}{{.}}
{{end}}{{end}}
ENVIRONMENT VARIABLES:
ACCESS:
MINIO_ACCESS_KEY: B2 account id.
MINIO_SECRET_KEY: B2 application key.
BROWSER:
MINIO_BROWSER: To disable web browser access, set this value to "off".
EXAMPLES:
1. Start minio gateway server for B2 backend.
$ export MINIO_ACCESS_KEY=accountID
$ export MINIO_SECRET_KEY=applicationKey
$ {{.HelpName}}
`
var (
azureBackendCmd = cli.Command{
Name: "azure",
@@ -143,6 +168,7 @@ var (
Flags: append(serverFlags, globalFlags...),
HideHelpCommand: true,
}
gcsBackendCmd = cli.Command{
Name: "gcs",
Usage: "Google Cloud Storage.",
@@ -152,12 +178,21 @@ var (
HideHelpCommand: true,
}
b2BackendCmd = cli.Command{
Name: "b2",
Usage: "Backblaze B2.",
Action: b2GatewayMain,
CustomHelpTemplate: b2GatewayTemplate,
Flags: append(serverFlags, globalFlags...),
HideHelpCommand: true,
}
gatewayCmd = cli.Command{
Name: "gateway",
Usage: "Start object storage gateway.",
Flags: append(serverFlags, globalFlags...),
HideHelpCommand: true,
Subcommands: []cli.Command{azureBackendCmd, s3BackendCmd, gcsBackendCmd},
Subcommands: []cli.Command{azureBackendCmd, s3BackendCmd, gcsBackendCmd, b2BackendCmd},
}
)
@@ -168,6 +203,7 @@ const (
azureBackend gatewayBackend = "azure"
s3Backend gatewayBackend = "s3"
gcsBackend gatewayBackend = "gcs"
b2Backend gatewayBackend = "b2"
// Add more backends here.
)
@@ -177,6 +213,7 @@ const (
// - Azure Blob Storage.
// - AWS S3.
// - Google Cloud Storage.
// - Backblaze B2.
// - Add your favorite backend here.
func newGatewayLayer(backendType gatewayBackend, arg string) (GatewayLayer, error) {
switch backendType {
@@ -189,6 +226,11 @@ func newGatewayLayer(backendType gatewayBackend, arg string) (GatewayLayer, erro
// will be removed when gcs is ready for production use.
log.Println(colorYellow("\n *** Warning: Not Ready for Production ***"))
return newGCSGateway(arg)
case b2Backend:
// FIXME: The following print command is temporary and
// will be removed when B2 is ready for production use.
log.Println(colorYellow("\n *** Warning: Not Ready for Production ***"))
return newB2Gateway()
}
return nil, fmt.Errorf("Unrecognized backend type %s", backendType)
@@ -285,6 +327,17 @@ func gcsGatewayMain(ctx *cli.Context) {
gatewayMain(ctx, gcsBackend)
}
func b2GatewayMain(ctx *cli.Context) {
if ctx.Args().Present() && ctx.Args().First() == "help" {
cli.ShowCommandHelpAndExit(ctx, "b2", 1)
}
// Validate gateway arguments.
fatalIf(validateGatewayArguments(ctx.GlobalString("address"), ctx.Args().First()), "Invalid argument")
gatewayMain(ctx, b2Backend)
}
// Handler for 'minio gateway'.
func gatewayMain(ctx *cli.Context, backendType gatewayBackend) {
// Get quiet flag from command line argument.
@@ -393,6 +446,8 @@ func gatewayMain(ctx *cli.Context, backendType gatewayBackend) {
mode = globalMinioModeGatewayGCS
case s3Backend:
mode = globalMinioModeGatewayS3
case b2Backend:
mode = globalMinioModeGatewayB2
}
// Check update mode.

View File

@@ -26,12 +26,6 @@ func (a gatewayUnsupported) CopyObjectPart(srcBucket, srcObject, destBucket, des
return info, traceError(NotImplemented{})
}
// AnonPutObject creates a new object anonymously with the incoming data,
func (a gatewayUnsupported) AnonPutObject(bucket, object string, size int64, data io.Reader,
metadata map[string]string, sha256sum string) (ObjectInfo, error) {
return ObjectInfo{}, traceError(NotImplemented{})
}
// HealBucket - Not relevant.
func (a gatewayUnsupported) HealBucket(bucket string) error {
return traceError(NotImplemented{})
@@ -57,3 +51,26 @@ func (a gatewayUnsupported) ListUploadsHeal(bucket, prefix, marker, uploadIDMark
delimiter string, maxUploads int) (lmi ListMultipartsInfo, e error) {
return lmi, traceError(NotImplemented{})
}
// AnonListObjects - List objects anonymously
func (a gatewayUnsupported) AnonListObjects(bucket string, prefix string, marker string, delimiter string,
maxKeys int) (loi ListObjectsInfo, err error) {
return loi, traceError(NotImplemented{})
}
// AnonListObjectsV2 - List objects in V2 mode, anonymously
func (a gatewayUnsupported) AnonListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int,
fetchOwner bool, startAfter string) (loi ListObjectsV2Info, err error) {
return loi, traceError(NotImplemented{})
}
// AnonGetBucketInfo - Get bucket metadata anonymously.
func (a gatewayUnsupported) AnonGetBucketInfo(bucket string) (bi BucketInfo, err error) {
return bi, traceError(NotImplemented{})
}
// AnonPutObject creates a new object anonymously with the incoming data,
func (a gatewayUnsupported) AnonPutObject(bucket, object string, size int64, data io.Reader,
metadata map[string]string, sha256sum string) (ObjectInfo, error) {
return ObjectInfo{}, traceError(NotImplemented{})
}

View File

@@ -53,6 +53,7 @@ const (
globalMinioModeGatewayAzure = "mode-gateway-azure"
globalMinioModeGatewayS3 = "mode-gateway-s3"
globalMinioModeGatewayGCS = "mode-gateway-gcs"
globalMinioModeGatewayB2 = "mode-gateway-b2"
// globalMinioSysTmp prefix is used in Azure/GCS gateway for save metadata sent by Initialize Multipart Upload API.
globalMinioSysTmp = "minio.sys.tmp/"