mirror of
https://github.com/minio/minio.git
synced 2025-11-07 04:42:56 -05:00
Remove gateway implementations for manta, sia and b2 (#7115)
This commit is contained in:
committed by
kannappanr
parent
4fdacb8b14
commit
3265112d04
@@ -1,850 +0,0 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2017, 2018 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package b2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha1"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
b2 "github.com/minio/blazer/base"
|
||||
"github.com/minio/cli"
|
||||
miniogopolicy "github.com/minio/minio-go/pkg/policy"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
h2 "github.com/minio/minio/pkg/hash"
|
||||
"github.com/minio/minio/pkg/policy"
|
||||
"github.com/minio/minio/pkg/policy/condition"
|
||||
|
||||
minio "github.com/minio/minio/cmd"
|
||||
)
|
||||
|
||||
// Supported bucket types by B2 backend.
|
||||
const (
|
||||
bucketTypePrivate = "allPrivate"
|
||||
bucketTypeReadOnly = "allPublic"
|
||||
b2Backend = "b2"
|
||||
)
|
||||
|
||||
func init() {
|
||||
const b2GatewayTemplate = `NAME:
|
||||
{{.HelpName}} - {{.Usage}}
|
||||
|
||||
USAGE:
|
||||
{{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}}
|
||||
{{if .VisibleFlags}}
|
||||
FLAGS:
|
||||
{{range .VisibleFlags}}{{.}}
|
||||
{{end}}{{end}}
|
||||
ENVIRONMENT VARIABLES:
|
||||
ACCESS:
|
||||
MINIO_ACCESS_KEY: B2 account id.
|
||||
MINIO_SECRET_KEY: B2 application key.
|
||||
|
||||
BROWSER:
|
||||
MINIO_BROWSER: To disable web browser access, set this value to "off".
|
||||
|
||||
DOMAIN:
|
||||
MINIO_DOMAIN: To enable virtual-host-style requests, set this value to Minio host domain name.
|
||||
|
||||
CACHE:
|
||||
MINIO_CACHE_DRIVES: List of mounted drives or directories delimited by ";".
|
||||
MINIO_CACHE_EXCLUDE: List of cache exclusion patterns delimited by ";".
|
||||
MINIO_CACHE_EXPIRY: Cache expiry duration in days.
|
||||
MINIO_CACHE_MAXUSE: Maximum permitted usage of the cache in percentage (0-100).
|
||||
|
||||
EXAMPLES:
|
||||
1. Start minio gateway server for B2 backend.
|
||||
$ export MINIO_ACCESS_KEY=accountID
|
||||
$ export MINIO_SECRET_KEY=applicationKey
|
||||
$ {{.HelpName}}
|
||||
|
||||
2. Start minio gateway server for B2 backend with edge caching enabled.
|
||||
$ export MINIO_ACCESS_KEY=accountID
|
||||
$ export MINIO_SECRET_KEY=applicationKey
|
||||
$ export MINIO_CACHE_DRIVES="/mnt/drive1;/mnt/drive2;/mnt/drive3;/mnt/drive4"
|
||||
$ export MINIO_CACHE_EXCLUDE="bucket1/*;*.png"
|
||||
$ export MINIO_CACHE_EXPIRY=40
|
||||
$ export MINIO_CACHE_MAXUSE=80
|
||||
$ {{.HelpName}}
|
||||
`
|
||||
minio.RegisterGatewayCommand(cli.Command{
|
||||
Name: b2Backend,
|
||||
Usage: "Backblaze B2",
|
||||
Action: b2GatewayMain,
|
||||
CustomHelpTemplate: b2GatewayTemplate,
|
||||
HideHelpCommand: true,
|
||||
})
|
||||
}
|
||||
|
||||
// Handler for 'minio gateway b2' command line.
|
||||
func b2GatewayMain(ctx *cli.Context) {
|
||||
minio.StartGateway(ctx, &B2{})
|
||||
}
|
||||
|
||||
// B2 implements Minio Gateway
|
||||
type B2 struct{}
|
||||
|
||||
// Name implements Gateway interface.
|
||||
func (g *B2) Name() string {
|
||||
return b2Backend
|
||||
}
|
||||
|
||||
// NewGatewayLayer returns b2 gateway layer, implements ObjectLayer interface to
|
||||
// talk to B2 remote backend.
|
||||
func (g *B2) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
|
||||
ctx := context.Background()
|
||||
client, err := b2.AuthorizeAccount(ctx, creds.AccessKey, creds.SecretKey, b2.Transport(minio.NewCustomHTTPTransport()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &b2Objects{
|
||||
creds: creds,
|
||||
b2Client: client,
|
||||
ctx: ctx,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Production - Ready for production use.
|
||||
func (g *B2) Production() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// b2Object implements gateway for Minio and BackBlaze B2 compatible object storage servers.
|
||||
type b2Objects struct {
|
||||
minio.GatewayUnsupported
|
||||
mu sync.Mutex
|
||||
creds auth.Credentials
|
||||
b2Client *b2.B2
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// Convert B2 errors to minio object layer errors.
|
||||
func b2ToObjectError(err error, params ...string) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
bucket := ""
|
||||
object := ""
|
||||
uploadID := ""
|
||||
if len(params) >= 1 {
|
||||
bucket = params[0]
|
||||
}
|
||||
if len(params) == 2 {
|
||||
object = params[1]
|
||||
}
|
||||
if len(params) == 3 {
|
||||
uploadID = params[2]
|
||||
}
|
||||
|
||||
// Following code is a non-exhaustive check to convert
|
||||
// B2 errors into S3 compatible errors.
|
||||
//
|
||||
// For a more complete information - https://www.backblaze.com/b2/docs/
|
||||
statusCode, code, msg := b2.Code(err)
|
||||
if statusCode == 0 {
|
||||
// We don't interpret non B2 errors. B2 errors have statusCode
|
||||
// to help us convert them to S3 object errors.
|
||||
return err
|
||||
}
|
||||
|
||||
switch code {
|
||||
case "duplicate_bucket_name":
|
||||
err = minio.BucketAlreadyOwnedByYou{Bucket: bucket}
|
||||
case "bad_request":
|
||||
if object != "" {
|
||||
err = minio.ObjectNameInvalid{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
}
|
||||
} else if bucket != "" {
|
||||
err = minio.BucketNotFound{Bucket: bucket}
|
||||
}
|
||||
case "bad_json":
|
||||
if object != "" {
|
||||
err = minio.ObjectNameInvalid{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
}
|
||||
} else if bucket != "" {
|
||||
err = minio.BucketNameInvalid{Bucket: bucket}
|
||||
}
|
||||
case "bad_bucket_id":
|
||||
err = minio.BucketNotFound{Bucket: bucket}
|
||||
case "file_not_present", "not_found":
|
||||
err = minio.ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
}
|
||||
case "cannot_delete_non_empty_bucket":
|
||||
err = minio.BucketNotEmpty{Bucket: bucket}
|
||||
}
|
||||
|
||||
// Special interpretation like this is required for Multipart sessions.
|
||||
if strings.Contains(msg, "No active upload for") && uploadID != "" {
|
||||
err = minio.InvalidUploadID{UploadID: uploadID}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Shutdown saves any gateway metadata to disk
|
||||
// if necessary and reload upon next restart.
|
||||
func (l *b2Objects) Shutdown(ctx context.Context) error {
|
||||
// TODO
|
||||
return nil
|
||||
}
|
||||
|
||||
// StorageInfo is not relevant to B2 backend.
|
||||
func (l *b2Objects) StorageInfo(ctx context.Context) (si minio.StorageInfo) {
|
||||
return si
|
||||
}
|
||||
|
||||
// MakeBucket creates a new container on B2 backend.
|
||||
func (l *b2Objects) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
|
||||
// location is ignored for B2 backend.
|
||||
|
||||
// All buckets are set to private by default.
|
||||
_, err := l.b2Client.CreateBucket(l.ctx, bucket, bucketTypePrivate, nil, nil)
|
||||
logger.LogIf(ctx, err)
|
||||
return b2ToObjectError(err, bucket)
|
||||
}
|
||||
|
||||
func (l *b2Objects) reAuthorizeAccount(ctx context.Context) error {
|
||||
client, err := b2.AuthorizeAccount(l.ctx, l.creds.AccessKey, l.creds.SecretKey, b2.Transport(minio.NewCustomHTTPTransport()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l.mu.Lock()
|
||||
l.b2Client.Update(client)
|
||||
l.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// listBuckets is a wrapper similar to ListBuckets, which re-authorizes
|
||||
// the account and updates the B2 client safely. Once successfully
|
||||
// authorized performs the call again and returns list of buckets.
|
||||
// For any errors which are not actionable we return an error.
|
||||
func (l *b2Objects) listBuckets(ctx context.Context, err error) ([]*b2.Bucket, error) {
|
||||
if err != nil {
|
||||
if b2.Action(err) != b2.ReAuthenticate {
|
||||
return nil, err
|
||||
}
|
||||
if rerr := l.reAuthorizeAccount(ctx); rerr != nil {
|
||||
return nil, rerr
|
||||
}
|
||||
}
|
||||
bktList, lerr := l.b2Client.ListBuckets(l.ctx)
|
||||
if lerr != nil {
|
||||
return l.listBuckets(ctx, lerr)
|
||||
}
|
||||
return bktList, nil
|
||||
}
|
||||
|
||||
// Bucket - is a helper which provides a *Bucket instance
|
||||
// for performing an API operation. B2 API doesn't
|
||||
// provide a direct way to access the bucket so we need
|
||||
// to employ following technique.
|
||||
func (l *b2Objects) Bucket(ctx context.Context, bucket string) (*b2.Bucket, error) {
|
||||
bktList, err := l.listBuckets(ctx, nil)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return nil, b2ToObjectError(err, bucket)
|
||||
}
|
||||
for _, bkt := range bktList {
|
||||
if bkt.Name == bucket {
|
||||
return bkt, nil
|
||||
}
|
||||
}
|
||||
return nil, minio.BucketNotFound{Bucket: bucket}
|
||||
}
|
||||
|
||||
// GetBucketInfo gets bucket metadata..
|
||||
func (l *b2Objects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, err error) {
|
||||
if _, err = l.Bucket(ctx, bucket); err != nil {
|
||||
return bi, err
|
||||
}
|
||||
return minio.BucketInfo{
|
||||
Name: bucket,
|
||||
Created: time.Unix(0, 0),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ListBuckets lists all B2 buckets
|
||||
func (l *b2Objects) ListBuckets(ctx context.Context) ([]minio.BucketInfo, error) {
|
||||
bktList, err := l.listBuckets(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var bktInfo []minio.BucketInfo
|
||||
for _, bkt := range bktList {
|
||||
bktInfo = append(bktInfo, minio.BucketInfo{
|
||||
Name: bkt.Name,
|
||||
Created: time.Unix(0, 0),
|
||||
})
|
||||
}
|
||||
return bktInfo, nil
|
||||
}
|
||||
|
||||
// DeleteBucket deletes a bucket on B2
|
||||
func (l *b2Objects) DeleteBucket(ctx context.Context, bucket string) error {
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = bkt.DeleteBucket(l.ctx)
|
||||
logger.LogIf(ctx, err)
|
||||
return b2ToObjectError(err, bucket)
|
||||
}
|
||||
|
||||
// ListObjects lists all objects in B2 bucket filtered by prefix, returns upto at max 1000 entries at a time.
|
||||
func (l *b2Objects) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return loi, err
|
||||
}
|
||||
files, next, lerr := bkt.ListFileNames(l.ctx, maxKeys, marker, prefix, delimiter)
|
||||
if lerr != nil {
|
||||
logger.LogIf(ctx, lerr)
|
||||
return loi, b2ToObjectError(lerr, bucket)
|
||||
}
|
||||
loi.IsTruncated = next != ""
|
||||
loi.NextMarker = next
|
||||
for _, file := range files {
|
||||
switch file.Status {
|
||||
case "folder":
|
||||
loi.Prefixes = append(loi.Prefixes, file.Name)
|
||||
case "upload":
|
||||
loi.Objects = append(loi.Objects, minio.ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: file.Name,
|
||||
ModTime: file.Timestamp,
|
||||
Size: file.Size,
|
||||
ETag: minio.ToS3ETag(file.Info.ID),
|
||||
ContentType: file.Info.ContentType,
|
||||
UserDefined: file.Info.Info,
|
||||
})
|
||||
}
|
||||
}
|
||||
return loi, nil
|
||||
}
|
||||
|
||||
// ListObjectsV2 lists all objects in B2 bucket filtered by prefix, returns upto max 1000 entries at a time.
|
||||
func (l *b2Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int,
|
||||
fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, err error) {
|
||||
// fetchOwner is not supported and unused.
|
||||
marker := continuationToken
|
||||
if marker == "" {
|
||||
// B2's continuation token is an object name to "start at" rather than "start after"
|
||||
// startAfter plus the lowest character B2 supports is used so that the startAfter
|
||||
// object isn't included in the results
|
||||
marker = startAfter + " "
|
||||
}
|
||||
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return loi, err
|
||||
}
|
||||
files, next, lerr := bkt.ListFileNames(l.ctx, maxKeys, marker, prefix, delimiter)
|
||||
if lerr != nil {
|
||||
logger.LogIf(ctx, lerr)
|
||||
return loi, b2ToObjectError(lerr, bucket)
|
||||
}
|
||||
loi.IsTruncated = next != ""
|
||||
loi.ContinuationToken = continuationToken
|
||||
loi.NextContinuationToken = next
|
||||
for _, file := range files {
|
||||
switch file.Status {
|
||||
case "folder":
|
||||
loi.Prefixes = append(loi.Prefixes, file.Name)
|
||||
case "upload":
|
||||
loi.Objects = append(loi.Objects, minio.ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: file.Name,
|
||||
ModTime: file.Timestamp,
|
||||
Size: file.Size,
|
||||
ETag: minio.ToS3ETag(file.Info.ID),
|
||||
ContentType: file.Info.ContentType,
|
||||
UserDefined: file.Info.Info,
|
||||
})
|
||||
}
|
||||
}
|
||||
return loi, nil
|
||||
}
|
||||
|
||||
// GetObjectNInfo - returns object info and locked object ReadCloser
|
||||
func (l *b2Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, opts minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
|
||||
var objInfo minio.ObjectInfo
|
||||
objInfo, err = l.GetObjectInfo(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var startOffset, length int64
|
||||
startOffset, length, err = rs.GetOffsetLength(objInfo.Size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
err := l.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag, opts)
|
||||
pw.CloseWithError(err)
|
||||
}()
|
||||
// Setup cleanup function to cause the above go-routine to
|
||||
// exit in case of partial read
|
||||
pipeCloser := func() { pr.Close() }
|
||||
return minio.NewGetObjectReaderFromReader(pr, objInfo, pipeCloser), nil
|
||||
}
|
||||
|
||||
// GetObject reads an object from B2. Supports additional
|
||||
// parameters like offset and length which are synonymous with
|
||||
// HTTP Range requests.
|
||||
//
|
||||
// startOffset indicates the starting read location of the object.
|
||||
// length indicates the total length of the object.
|
||||
func (l *b2Objects) GetObject(ctx context.Context, bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reader, err := bkt.DownloadFileByName(l.ctx, object, startOffset, length)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return b2ToObjectError(err, bucket, object)
|
||||
}
|
||||
defer reader.Close()
|
||||
_, err = io.Copy(writer, reader)
|
||||
logger.LogIf(ctx, err)
|
||||
return b2ToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
// GetObjectInfo reads object info and replies back ObjectInfo
|
||||
func (l *b2Objects) GetObjectInfo(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return objInfo, err
|
||||
}
|
||||
f, err := bkt.DownloadFileByName(l.ctx, object, 0, 1)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return objInfo, b2ToObjectError(err, bucket, object)
|
||||
}
|
||||
f.Close()
|
||||
fi, err := bkt.File(f.ID, object).GetFileInfo(l.ctx)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return objInfo, b2ToObjectError(err, bucket, object)
|
||||
}
|
||||
return minio.ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: object,
|
||||
ETag: minio.ToS3ETag(fi.ID),
|
||||
Size: fi.Size,
|
||||
ModTime: fi.Timestamp,
|
||||
ContentType: fi.ContentType,
|
||||
UserDefined: fi.Info,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// In B2 - You must always include the X-Bz-Content-Sha1 header with
|
||||
// your upload request. The value you provide can be:
|
||||
// (1) the 40-character hex checksum of the file,
|
||||
// (2) the string hex_digits_at_end, or
|
||||
// (3) the string do_not_verify.
|
||||
// For more reference - https://www.backblaze.com/b2/docs/uploading.html
|
||||
//
|
||||
// In our case we are going to use (2) option
|
||||
const sha1AtEOF = "hex_digits_at_end"
|
||||
|
||||
// With the second option mentioned above, you append the 40-character hex sha1
|
||||
// to the end of the request body, immediately after the contents of the file
|
||||
// being uploaded. Note that the content length is the size of the file plus 40
|
||||
// of the original size of the reader.
|
||||
//
|
||||
// newB2Reader implements a B2 compatible reader by wrapping the hash.Reader into
|
||||
// a new io.Reader which will emit out the sha1 hex digits at io.EOF.
|
||||
// It also means that your overall content size is now original size + 40 bytes.
|
||||
// Additionally this reader also verifies Hash encapsulated inside hash.Reader
|
||||
// at io.EOF if the verification failed we return an error and do not send
|
||||
// the content to server.
|
||||
func newB2Reader(r *h2.Reader, size int64) *Reader {
|
||||
return &Reader{
|
||||
r: r,
|
||||
size: size,
|
||||
sha1Hash: sha1.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// Reader - is a Reader wraps the hash.Reader which will emit out the sha1
|
||||
// hex digits at io.EOF. It also means that your overall content size is
|
||||
// now original size + 40 bytes. Additionally this reader also verifies
|
||||
// Hash encapsulated inside hash.Reader at io.EOF if the verification
|
||||
// failed we return an error and do not send the content to server.
|
||||
type Reader struct {
|
||||
r *h2.Reader
|
||||
size int64
|
||||
sha1Hash hash.Hash
|
||||
|
||||
isEOF bool
|
||||
buf *strings.Reader
|
||||
}
|
||||
|
||||
// Size - Returns the total size of Reader.
|
||||
func (nb *Reader) Size() int64 { return nb.size + 40 }
|
||||
func (nb *Reader) Read(p []byte) (int, error) {
|
||||
if nb.isEOF {
|
||||
return nb.buf.Read(p)
|
||||
}
|
||||
// Read into hash to update the on going checksum.
|
||||
n, err := io.TeeReader(nb.r, nb.sha1Hash).Read(p)
|
||||
if err == io.EOF {
|
||||
// Stream is not corrupted on this end
|
||||
// now fill in the last 40 bytes of sha1 hex
|
||||
// so that the server can verify the stream on
|
||||
// their end.
|
||||
err = nil
|
||||
nb.isEOF = true
|
||||
nb.buf = strings.NewReader(fmt.Sprintf("%x", nb.sha1Hash.Sum(nil)))
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// PutObject uploads the single upload to B2 backend by using *b2_upload_file* API, uploads upto 5GiB.
|
||||
func (l *b2Objects) PutObject(ctx context.Context, bucket string, object string, r *minio.PutObjReader, metadata map[string]string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
||||
data := r.Reader
|
||||
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return objInfo, err
|
||||
}
|
||||
contentType := metadata["content-type"]
|
||||
delete(metadata, "content-type")
|
||||
|
||||
var u *b2.URL
|
||||
u, err = bkt.GetUploadURL(l.ctx)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return objInfo, b2ToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
hr := newB2Reader(data, data.Size())
|
||||
var f *b2.File
|
||||
f, err = u.UploadFile(l.ctx, hr, int(hr.Size()), object, contentType, sha1AtEOF, metadata)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return objInfo, b2ToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
var fi *b2.FileInfo
|
||||
fi, err = f.GetFileInfo(l.ctx)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return objInfo, b2ToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
return minio.ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: object,
|
||||
ETag: minio.ToS3ETag(fi.ID),
|
||||
Size: fi.Size,
|
||||
ModTime: fi.Timestamp,
|
||||
ContentType: fi.ContentType,
|
||||
UserDefined: fi.Info,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DeleteObject deletes a blob in bucket
|
||||
func (l *b2Objects) DeleteObject(ctx context.Context, bucket string, object string) error {
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reader, err := bkt.DownloadFileByName(l.ctx, object, 0, 1)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return b2ToObjectError(err, bucket, object)
|
||||
}
|
||||
io.Copy(ioutil.Discard, reader)
|
||||
reader.Close()
|
||||
err = bkt.File(reader.ID, object).DeleteFileVersion(l.ctx)
|
||||
logger.LogIf(ctx, err)
|
||||
return b2ToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
// ListMultipartUploads lists all multipart uploads.
|
||||
func (l *b2Objects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string,
|
||||
delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) {
|
||||
// keyMarker, prefix, delimiter are all ignored, Backblaze B2 doesn't support any
|
||||
// of these parameters only equivalent parameter is uploadIDMarker.
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return lmi, err
|
||||
}
|
||||
// The maximum number of files to return from this call.
|
||||
// The default value is 100, and the maximum allowed is 100.
|
||||
if maxUploads > 100 {
|
||||
maxUploads = 100
|
||||
}
|
||||
largeFiles, nextMarker, err := bkt.ListUnfinishedLargeFiles(l.ctx, uploadIDMarker, maxUploads)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return lmi, b2ToObjectError(err, bucket)
|
||||
}
|
||||
lmi = minio.ListMultipartsInfo{
|
||||
MaxUploads: maxUploads,
|
||||
}
|
||||
if nextMarker != "" {
|
||||
lmi.IsTruncated = true
|
||||
lmi.NextUploadIDMarker = nextMarker
|
||||
}
|
||||
for _, largeFile := range largeFiles {
|
||||
lmi.Uploads = append(lmi.Uploads, minio.MultipartInfo{
|
||||
Object: largeFile.Name,
|
||||
UploadID: largeFile.ID,
|
||||
Initiated: largeFile.Timestamp,
|
||||
})
|
||||
}
|
||||
return lmi, nil
|
||||
}
|
||||
|
||||
// NewMultipartUpload upload object in multiple parts, uses B2's LargeFile upload API.
|
||||
// Large files can range in size from 5MB to 10TB.
|
||||
// Each large file must consist of at least 2 parts, and all of the parts except the
|
||||
// last one must be at least 5MB in size. The last part must contain at least one byte.
|
||||
// For more information - https://www.backblaze.com/b2/docs/large_files.html
|
||||
func (l *b2Objects) NewMultipartUpload(ctx context.Context, bucket string, object string, metadata map[string]string, o minio.ObjectOptions) (string, error) {
|
||||
var uploadID string
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return uploadID, err
|
||||
}
|
||||
|
||||
contentType := metadata["content-type"]
|
||||
delete(metadata, "content-type")
|
||||
lf, err := bkt.StartLargeFile(l.ctx, object, contentType, metadata)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return uploadID, b2ToObjectError(err, bucket, object)
|
||||
}
|
||||
|
||||
return lf.ID, nil
|
||||
}
|
||||
|
||||
// PutObjectPart puts a part of object in bucket, uses B2's LargeFile upload API.
|
||||
func (l *b2Objects) PutObjectPart(ctx context.Context, bucket string, object string, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (pi minio.PartInfo, err error) {
|
||||
data := r.Reader
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return pi, err
|
||||
}
|
||||
|
||||
fc, err := bkt.File(uploadID, object).CompileParts(0, nil).GetUploadPartURL(l.ctx)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return pi, b2ToObjectError(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
hr := newB2Reader(data, data.Size())
|
||||
sha1, err := fc.UploadPart(l.ctx, hr, sha1AtEOF, int(hr.Size()), partID)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return pi, b2ToObjectError(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
return minio.PartInfo{
|
||||
PartNumber: partID,
|
||||
LastModified: minio.UTCNow(),
|
||||
ETag: minio.ToS3ETag(sha1),
|
||||
Size: data.Size(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ListObjectParts returns all object parts for specified object in specified bucket, uses B2's LargeFile upload API.
|
||||
func (l *b2Objects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (lpi minio.ListPartsInfo, err error) {
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return lpi, err
|
||||
}
|
||||
lpi = minio.ListPartsInfo{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
UploadID: uploadID,
|
||||
MaxParts: maxParts,
|
||||
PartNumberMarker: partNumberMarker,
|
||||
}
|
||||
// startPartNumber must be in the range 1 - 10000 for B2.
|
||||
partNumberMarker++
|
||||
partsList, next, err := bkt.File(uploadID, object).ListParts(l.ctx, partNumberMarker, maxParts)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return lpi, b2ToObjectError(err, bucket, object, uploadID)
|
||||
}
|
||||
if next != 0 {
|
||||
lpi.IsTruncated = true
|
||||
lpi.NextPartNumberMarker = next
|
||||
}
|
||||
for _, part := range partsList {
|
||||
lpi.Parts = append(lpi.Parts, minio.PartInfo{
|
||||
PartNumber: part.Number,
|
||||
ETag: minio.ToS3ETag(part.SHA1),
|
||||
Size: part.Size,
|
||||
})
|
||||
}
|
||||
return lpi, nil
|
||||
}
|
||||
|
||||
// AbortMultipartUpload aborts a on going multipart upload, uses B2's LargeFile upload API.
|
||||
func (l *b2Objects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error {
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = bkt.File(uploadID, object).CompileParts(0, nil).CancelLargeFile(l.ctx)
|
||||
logger.LogIf(ctx, err)
|
||||
return b2ToObjectError(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object, uses B2's LargeFile upload API.
|
||||
func (l *b2Objects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, err error) {
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return oi, err
|
||||
}
|
||||
hashes := make(map[int]string)
|
||||
for i, uploadedPart := range uploadedParts {
|
||||
// B2 requires contigous part numbers starting with 1, they do not support
|
||||
// hand picking part numbers, we return an S3 compatible error instead.
|
||||
if i+1 != uploadedPart.PartNumber {
|
||||
logger.LogIf(ctx, minio.InvalidPart{})
|
||||
return oi, b2ToObjectError(minio.InvalidPart{}, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
// Trim "-1" suffix in ETag as PutObjectPart() treats B2 returned SHA1 as ETag.
|
||||
hashes[uploadedPart.PartNumber] = strings.TrimSuffix(uploadedPart.ETag, "-1")
|
||||
}
|
||||
|
||||
if _, err = bkt.File(uploadID, object).CompileParts(0, hashes).FinishLargeFile(l.ctx); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return oi, b2ToObjectError(err, bucket, object, uploadID)
|
||||
}
|
||||
|
||||
return l.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{})
|
||||
}
|
||||
|
||||
// SetBucketPolicy - B2 supports 2 types of bucket policies:
|
||||
// bucketType.AllPublic - bucketTypeReadOnly means that anybody can download the files is the bucket;
|
||||
// bucketType.AllPrivate - bucketTypePrivate means that you need an authorization token to download them.
|
||||
// Default is AllPrivate for all buckets.
|
||||
func (l *b2Objects) SetBucketPolicy(ctx context.Context, bucket string, bucketPolicy *policy.Policy) error {
|
||||
policyInfo, err := minio.PolicyToBucketAccessPolicy(bucketPolicy)
|
||||
if err != nil {
|
||||
// This should not happen.
|
||||
return b2ToObjectError(err, bucket)
|
||||
}
|
||||
|
||||
var policies []minio.BucketAccessPolicy
|
||||
for prefix, policy := range miniogopolicy.GetPolicies(policyInfo.Statements, bucket, "") {
|
||||
policies = append(policies, minio.BucketAccessPolicy{
|
||||
Prefix: prefix,
|
||||
Policy: policy,
|
||||
})
|
||||
}
|
||||
prefix := bucket + "/*" // For all objects inside the bucket.
|
||||
if len(policies) != 1 {
|
||||
logger.LogIf(ctx, minio.NotImplemented{})
|
||||
return minio.NotImplemented{}
|
||||
}
|
||||
if policies[0].Prefix != prefix {
|
||||
logger.LogIf(ctx, minio.NotImplemented{})
|
||||
return minio.NotImplemented{}
|
||||
}
|
||||
if policies[0].Policy != miniogopolicy.BucketPolicyReadOnly {
|
||||
logger.LogIf(ctx, minio.NotImplemented{})
|
||||
return minio.NotImplemented{}
|
||||
}
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bkt.Type = bucketTypeReadOnly
|
||||
_, err = bkt.Update(l.ctx)
|
||||
logger.LogIf(ctx, err)
|
||||
return b2ToObjectError(err)
|
||||
}
|
||||
|
||||
// GetBucketPolicy, returns the current bucketType from B2 backend and convert
|
||||
// it into S3 compatible bucket policy info.
|
||||
func (l *b2Objects) GetBucketPolicy(ctx context.Context, bucket string) (*policy.Policy, error) {
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// bkt.Type can also be snapshot, but it is only allowed through B2 browser console,
|
||||
// just return back as policy not found for all cases.
|
||||
// CreateBucket always sets the value to allPrivate by default.
|
||||
if bkt.Type != bucketTypeReadOnly {
|
||||
return nil, minio.BucketPolicyNotFound{Bucket: bucket}
|
||||
}
|
||||
|
||||
return &policy.Policy{
|
||||
Version: policy.DefaultVersion,
|
||||
Statements: []policy.Statement{
|
||||
policy.NewStatement(
|
||||
policy.Allow,
|
||||
policy.NewPrincipal("*"),
|
||||
policy.NewActionSet(
|
||||
policy.GetBucketLocationAction,
|
||||
policy.ListBucketAction,
|
||||
policy.GetObjectAction,
|
||||
),
|
||||
policy.NewResourceSet(
|
||||
policy.NewResource(bucket, ""),
|
||||
policy.NewResource(bucket, "*"),
|
||||
),
|
||||
condition.NewFunctions(),
|
||||
),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DeleteBucketPolicy - resets the bucketType of bucket on B2 to 'allPrivate'.
|
||||
func (l *b2Objects) DeleteBucketPolicy(ctx context.Context, bucket string) error {
|
||||
bkt, err := l.Bucket(ctx, bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bkt.Type = bucketTypePrivate
|
||||
_, err = bkt.Update(l.ctx)
|
||||
logger.LogIf(ctx, err)
|
||||
return b2ToObjectError(err)
|
||||
}
|
||||
|
||||
// IsCompressionSupported returns whether compression is applicable for this layer.
|
||||
func (l *b2Objects) IsCompressionSupported() bool {
|
||||
return false
|
||||
}
|
||||
@@ -1,119 +0,0 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2017 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package b2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
b2 "github.com/minio/blazer/base"
|
||||
|
||||
minio "github.com/minio/minio/cmd"
|
||||
)
|
||||
|
||||
// Test b2 object error.
|
||||
func TestB2ObjectError(t *testing.T) {
|
||||
testCases := []struct {
|
||||
params []string
|
||||
b2Err error
|
||||
expectedErr error
|
||||
}{
|
||||
{
|
||||
[]string{}, nil, nil,
|
||||
},
|
||||
{
|
||||
[]string{}, fmt.Errorf("Not *Error"), fmt.Errorf("Not *Error"),
|
||||
},
|
||||
{
|
||||
[]string{}, fmt.Errorf("Non B2 Error"), fmt.Errorf("Non B2 Error"),
|
||||
},
|
||||
{
|
||||
[]string{"bucket"}, b2.Error{
|
||||
StatusCode: 1,
|
||||
Code: "duplicate_bucket_name",
|
||||
}, minio.BucketAlreadyOwnedByYou{
|
||||
Bucket: "bucket",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"bucket"}, b2.Error{
|
||||
StatusCode: 1,
|
||||
Code: "bad_request",
|
||||
}, minio.BucketNotFound{
|
||||
Bucket: "bucket",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"bucket", "object"}, b2.Error{
|
||||
StatusCode: 1,
|
||||
Code: "bad_request",
|
||||
}, minio.ObjectNameInvalid{
|
||||
Bucket: "bucket",
|
||||
Object: "object",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"bucket"}, b2.Error{
|
||||
StatusCode: 1,
|
||||
Code: "bad_bucket_id",
|
||||
}, minio.BucketNotFound{Bucket: "bucket"},
|
||||
},
|
||||
{
|
||||
[]string{"bucket", "object"}, b2.Error{
|
||||
StatusCode: 1,
|
||||
Code: "file_not_present",
|
||||
}, minio.ObjectNotFound{
|
||||
Bucket: "bucket",
|
||||
Object: "object",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"bucket", "object"}, b2.Error{
|
||||
StatusCode: 1,
|
||||
Code: "not_found",
|
||||
}, minio.ObjectNotFound{
|
||||
Bucket: "bucket",
|
||||
Object: "object",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"bucket"}, b2.Error{
|
||||
StatusCode: 1,
|
||||
Code: "cannot_delete_non_empty_bucket",
|
||||
}, minio.BucketNotEmpty{
|
||||
Bucket: "bucket",
|
||||
},
|
||||
},
|
||||
{
|
||||
[]string{"bucket", "object", "uploadID"}, b2.Error{
|
||||
StatusCode: 1,
|
||||
Message: "No active upload for",
|
||||
}, minio.InvalidUploadID{
|
||||
UploadID: "uploadID",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, testCase := range testCases {
|
||||
actualErr := b2ToObjectError(testCase.b2Err, testCase.params...)
|
||||
if actualErr != nil {
|
||||
if actualErr.Error() != testCase.expectedErr.Error() {
|
||||
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.expectedErr, actualErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -19,12 +19,9 @@ package gateway
|
||||
import (
|
||||
// Import all gateways.
|
||||
_ "github.com/minio/minio/cmd/gateway/azure"
|
||||
_ "github.com/minio/minio/cmd/gateway/b2"
|
||||
_ "github.com/minio/minio/cmd/gateway/gcs"
|
||||
_ "github.com/minio/minio/cmd/gateway/manta"
|
||||
_ "github.com/minio/minio/cmd/gateway/nas"
|
||||
_ "github.com/minio/minio/cmd/gateway/oss"
|
||||
_ "github.com/minio/minio/cmd/gateway/s3"
|
||||
_ "github.com/minio/minio/cmd/gateway/sia"
|
||||
// Add your gateway here.
|
||||
)
|
||||
|
||||
@@ -1,666 +0,0 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2017, 2018 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package manta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
triton "github.com/joyent/triton-go"
|
||||
"github.com/joyent/triton-go/authentication"
|
||||
terrors "github.com/joyent/triton-go/errors"
|
||||
"github.com/joyent/triton-go/storage"
|
||||
"github.com/minio/cli"
|
||||
minio "github.com/minio/minio/cmd"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
)
|
||||
|
||||
// stor is a namespace within manta where you store any documents that are deemed as private
|
||||
// and require access credentials to read them. Within the stor namespace, you can create any
|
||||
// number of directories and objects.
|
||||
const (
|
||||
mantaBackend = "manta"
|
||||
defaultMantaRoot = "/stor"
|
||||
defaultMantaURL = "https://us-east.manta.joyent.com"
|
||||
)
|
||||
|
||||
var mantaRoot = defaultMantaRoot
|
||||
|
||||
func init() {
|
||||
const mantaGatewayTemplate = `NAME:
|
||||
{{.HelpName}} - {{.Usage}}
|
||||
USAGE:
|
||||
{{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}} [ENDPOINT]
|
||||
{{if .VisibleFlags}}
|
||||
FLAGS:
|
||||
{{range .VisibleFlags}}{{.}}
|
||||
{{end}}{{end}}
|
||||
ENDPOINT:
|
||||
Manta server endpoint. Default ENDPOINT is https://us-east.manta.joyent.com
|
||||
|
||||
ENVIRONMENT VARIABLES:
|
||||
ACCESS:
|
||||
MINIO_ACCESS_KEY: The Manta account name.
|
||||
MINIO_SECRET_KEY: A KeyID associated with the Manta account.
|
||||
MANTA_KEY_MATERIAL: The path to the SSH Key associated with the Manta account if the MINIO_SECRET_KEY is not in SSH Agent.
|
||||
MANTA_SUBUSER: The username of a user who has limited access to your account.
|
||||
|
||||
BROWSER:
|
||||
MINIO_BROWSER: To disable web browser access, set this value to "off".
|
||||
|
||||
DOMAIN:
|
||||
MINIO_DOMAIN: To enable virtual-host-style requests, set this value to Minio host domain name.
|
||||
|
||||
CACHE:
|
||||
MINIO_CACHE_DRIVES: List of mounted drives or directories delimited by ";".
|
||||
MINIO_CACHE_EXCLUDE: List of cache exclusion patterns delimited by ";".
|
||||
MINIO_CACHE_EXPIRY: Cache expiry duration in days.
|
||||
MINIO_CACHE_MAXUSE: Maximum permitted usage of the cache in percentage (0-100).
|
||||
|
||||
EXAMPLES:
|
||||
1. Start minio gateway server for Manta Object Storage backend.
|
||||
$ export MINIO_ACCESS_KEY=manta_account_name
|
||||
$ export MINIO_SECRET_KEY=manta_key_id
|
||||
$ {{.HelpName}}
|
||||
|
||||
2. Start minio gateway server for Manta Object Storage backend on custom endpoint.
|
||||
$ export MINIO_ACCESS_KEY=manta_account_name
|
||||
$ export MINIO_SECRET_KEY=manta_key_id
|
||||
$ {{.HelpName}} https://us-west.manta.joyent.com
|
||||
|
||||
3. Start minio gateway server for Manta Object Storage backend without using SSH Agent.
|
||||
$ export MINIO_ACCESS_KEY=manta_account_name
|
||||
$ export MINIO_SECRET_KEY=manta_key_id
|
||||
$ export MANTA_KEY_MATERIAL=~/.ssh/custom_rsa
|
||||
$ {{.HelpName}}
|
||||
|
||||
4. Start minio gateway server for Manta Object Storage backend with edge caching enabled.
|
||||
$ export MINIO_ACCESS_KEY=manta_account_name
|
||||
$ export MINIO_SECRET_KEY=manta_key_id
|
||||
$ export MINIO_CACHE_DRIVES="/mnt/drive1;/mnt/drive2;/mnt/drive3;/mnt/drive4"
|
||||
$ export MINIO_CACHE_EXCLUDE="bucket1/*;*.png"
|
||||
$ export MINIO_CACHE_EXPIRY=40
|
||||
$ export MINIO_CACHE_MAXUSE=80
|
||||
$ {{.HelpName}}
|
||||
`
|
||||
|
||||
minio.RegisterGatewayCommand(cli.Command{
|
||||
Name: mantaBackend,
|
||||
Usage: "Manta Object Storage",
|
||||
Action: mantaGatewayMain,
|
||||
CustomHelpTemplate: mantaGatewayTemplate,
|
||||
HideHelpCommand: true,
|
||||
})
|
||||
}
|
||||
|
||||
func mantaGatewayMain(ctx *cli.Context) {
|
||||
args := ctx.Args()
|
||||
if !ctx.Args().Present() {
|
||||
args = cli.Args{"https://us-east.manta.joyent.com"}
|
||||
}
|
||||
|
||||
// Validate gateway arguments.
|
||||
logger.FatalIf(minio.ValidateGatewayArguments(ctx.GlobalString("address"), args.First()), "Invalid argument")
|
||||
|
||||
// Start the gateway..
|
||||
minio.StartGateway(ctx, &Manta{args.First()})
|
||||
}
|
||||
|
||||
// Manta implements Gateway.
|
||||
type Manta struct {
|
||||
host string
|
||||
}
|
||||
|
||||
// Name implements Gateway interface.
|
||||
func (g *Manta) Name() string {
|
||||
return mantaBackend
|
||||
}
|
||||
|
||||
// NewGatewayLayer returns manta gateway layer, implements ObjectLayer interface to
|
||||
// talk to manta remote backend.
|
||||
func (g *Manta) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
|
||||
var err error
|
||||
var secure bool
|
||||
var signer authentication.Signer
|
||||
var endpoint = defaultMantaURL
|
||||
ctx := context.Background()
|
||||
|
||||
if g.host != "" {
|
||||
endpoint, secure, err = minio.ParseGatewayEndpoint(g.host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if secure {
|
||||
endpoint = "https://" + endpoint
|
||||
} else {
|
||||
endpoint = "http://" + endpoint
|
||||
}
|
||||
}
|
||||
if overrideRoot, ok := os.LookupEnv("MANTA_ROOT"); ok {
|
||||
mantaRoot = overrideRoot
|
||||
}
|
||||
|
||||
keyMaterial := os.Getenv("MANTA_KEY_MATERIAL")
|
||||
|
||||
if keyMaterial == "" {
|
||||
input := authentication.SSHAgentSignerInput{
|
||||
KeyID: creds.SecretKey,
|
||||
AccountName: creds.AccessKey,
|
||||
}
|
||||
if userName, ok := os.LookupEnv("MANTA_SUBUSER"); ok {
|
||||
input.Username = userName
|
||||
}
|
||||
signer, err = authentication.NewSSHAgentSigner(input)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
var keyBytes []byte
|
||||
if _, err = os.Stat(keyMaterial); err == nil {
|
||||
keyBytes, err = ioutil.ReadFile(keyMaterial)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error reading key material from %s: %s",
|
||||
keyMaterial, err)
|
||||
}
|
||||
block, _ := pem.Decode(keyBytes)
|
||||
if block == nil {
|
||||
return nil, fmt.Errorf(
|
||||
"Failed to read key material '%s': no key found", keyMaterial)
|
||||
}
|
||||
|
||||
if block.Headers["Proc-Type"] == "4,ENCRYPTED" {
|
||||
return nil, fmt.Errorf(
|
||||
"Failed to read key '%s': password protected keys are\n"+
|
||||
"not currently supported. Please decrypt the key prior to use.", keyMaterial)
|
||||
}
|
||||
|
||||
} else {
|
||||
keyBytes = []byte(keyMaterial)
|
||||
}
|
||||
|
||||
input := authentication.PrivateKeySignerInput{
|
||||
KeyID: creds.SecretKey,
|
||||
PrivateKeyMaterial: keyBytes,
|
||||
AccountName: creds.AccessKey,
|
||||
}
|
||||
if userName, ok := os.LookupEnv("MANTA_SUBUSER"); ok {
|
||||
input.Username = userName
|
||||
}
|
||||
|
||||
signer, err = authentication.NewPrivateKeySigner(input)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
tc, err := storage.NewClient(&triton.ClientConfig{
|
||||
MantaURL: endpoint,
|
||||
AccountName: creds.AccessKey,
|
||||
Signers: []authentication.Signer{signer},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tc.Client.HTTPClient = &http.Client{
|
||||
Transport: minio.NewCustomHTTPTransport(),
|
||||
}
|
||||
|
||||
return &tritonObjects{
|
||||
client: tc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Production - Manta is production ready.
|
||||
func (g *Manta) Production() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// tritonObjects - Implements Object layer for Triton Manta storage
|
||||
type tritonObjects struct {
|
||||
minio.GatewayUnsupported
|
||||
client *storage.StorageClient
|
||||
}
|
||||
|
||||
// Shutdown - save any gateway metadata to disk
|
||||
// if necessary and reload upon next restart.
|
||||
func (t *tritonObjects) Shutdown(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// StorageInfo - Not relevant to Triton backend.
|
||||
func (t *tritonObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo) {
|
||||
return si
|
||||
}
|
||||
|
||||
//
|
||||
// ~~~ Buckets ~~~
|
||||
//
|
||||
|
||||
// MakeBucketWithLocation - Create a new directory within manta.
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#PutDirectory
|
||||
func (t *tritonObjects) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
|
||||
err := t.client.Dir().Put(ctx, &storage.PutDirectoryInput{
|
||||
DirectoryName: path.Join(mantaRoot, bucket),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetBucketInfo - Get directory metadata..
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#GetObject
|
||||
func (t *tritonObjects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, e error) {
|
||||
var info minio.BucketInfo
|
||||
resp, err := t.client.Objects().Get(ctx, &storage.GetObjectInput{
|
||||
ObjectPath: path.Join(mantaRoot, bucket),
|
||||
})
|
||||
if err != nil {
|
||||
return info, err
|
||||
}
|
||||
|
||||
return minio.BucketInfo{
|
||||
Name: bucket,
|
||||
Created: resp.LastModified,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ListBuckets - Lists all Manta directories, uses Manta equivalent
|
||||
// ListDirectories.
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#ListDirectory
|
||||
func (t *tritonObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInfo, err error) {
|
||||
dirs, err := t.client.Dir().List(ctx, &storage.ListDirectoryInput{
|
||||
DirectoryName: path.Join(mantaRoot),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, dir := range dirs.Entries {
|
||||
if dir.Type == "directory" {
|
||||
buckets = append(buckets, minio.BucketInfo{
|
||||
Name: dir.Name,
|
||||
Created: dir.ModifiedTime,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
// DeleteBucket - Delete a directory in Manta, uses Manta equivalent
|
||||
// DeleteDirectory.
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#DeleteDirectory
|
||||
func (t *tritonObjects) DeleteBucket(ctx context.Context, bucket string) error {
|
||||
return t.client.Dir().Delete(ctx, &storage.DeleteDirectoryInput{
|
||||
DirectoryName: path.Join(mantaRoot, bucket),
|
||||
})
|
||||
}
|
||||
|
||||
//
|
||||
// ~~~ Objects ~~~
|
||||
//
|
||||
|
||||
// ListObjects - Lists all objects in Manta with a container filtered by prefix
|
||||
// and marker, uses Manta equivalent ListDirectory.
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#ListDirectory
|
||||
func (t *tritonObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (result minio.ListObjectsInfo, err error) {
|
||||
var (
|
||||
dirName string
|
||||
objs *storage.ListDirectoryOutput
|
||||
input *storage.ListDirectoryInput
|
||||
|
||||
pathBase = path.Base(prefix)
|
||||
)
|
||||
|
||||
// Make sure to only request a Dir.List for the parent "directory" for a
|
||||
// given prefix first. We don't know if our prefix is referencing a
|
||||
// directory or file name and can't send file names into Dir.List because
|
||||
// that'll cause Manta to return file content in the response body. Dir.List
|
||||
// expects to parse out directory entries in JSON. So, try the first
|
||||
// directory name of the prefix path provided.
|
||||
if pathDir := path.Dir(prefix); pathDir == "." {
|
||||
dirName = path.Join(mantaRoot, bucket)
|
||||
} else {
|
||||
dirName = path.Join(mantaRoot, bucket, pathDir)
|
||||
}
|
||||
|
||||
if marker != "" {
|
||||
// Manta uses the marker as the key to start at rather than start after
|
||||
// A space is appended to the marker so that the corresponding object is not
|
||||
// included in the results
|
||||
marker += " "
|
||||
}
|
||||
|
||||
input = &storage.ListDirectoryInput{
|
||||
DirectoryName: dirName,
|
||||
Limit: uint64(maxKeys),
|
||||
Marker: marker,
|
||||
}
|
||||
objs, err = t.client.Dir().List(ctx, input)
|
||||
if err != nil {
|
||||
if terrors.IsResourceNotFoundError(err) {
|
||||
return result, nil
|
||||
}
|
||||
logger.LogIf(ctx, err)
|
||||
return result, err
|
||||
}
|
||||
|
||||
for _, obj := range objs.Entries {
|
||||
// If the base name of our prefix was found to be of type "directory"
|
||||
// than we need to pull the directory entries for that instead.
|
||||
if obj.Name == pathBase && obj.Type == "directory" {
|
||||
input.DirectoryName = path.Join(mantaRoot, bucket, prefix)
|
||||
objs, err = t.client.Dir().List(ctx, input)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return result, err
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
isTruncated := true // Always send a second request.
|
||||
if marker == "" && len(objs.Entries) < maxKeys {
|
||||
isTruncated = false
|
||||
} else if marker != "" && len(objs.Entries) < maxKeys {
|
||||
isTruncated = false
|
||||
}
|
||||
|
||||
for _, obj := range objs.Entries {
|
||||
if obj.Type == "directory" {
|
||||
result.Prefixes = append(result.Prefixes, obj.Name+delimiter)
|
||||
} else {
|
||||
result.Objects = append(result.Objects, minio.ObjectInfo{
|
||||
Name: obj.Name,
|
||||
Size: int64(obj.Size),
|
||||
ModTime: obj.ModifiedTime,
|
||||
ETag: obj.ETag,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
result.IsTruncated = isTruncated
|
||||
if isTruncated {
|
||||
result.NextMarker = result.Objects[len(result.Objects)-1].Name
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
//
|
||||
// ~~~ Objects ~~~
|
||||
//
|
||||
|
||||
// ListObjectsV2 - Lists all objects in Manta with a container filtered by prefix
|
||||
// and continuationToken, uses Manta equivalent ListDirectory.
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#ListDirectory
|
||||
func (t *tritonObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result minio.ListObjectsV2Info, err error) {
|
||||
var (
|
||||
dirName string
|
||||
objs *storage.ListDirectoryOutput
|
||||
input *storage.ListDirectoryInput
|
||||
|
||||
pathBase = path.Base(prefix)
|
||||
)
|
||||
|
||||
marker := continuationToken
|
||||
if marker == "" {
|
||||
marker = startAfter
|
||||
}
|
||||
|
||||
if marker != "" {
|
||||
// Manta uses the marker as the key to start at rather than start after.
|
||||
// A space is appended to the marker so that the corresponding object is not
|
||||
// included in the results
|
||||
marker += " "
|
||||
}
|
||||
|
||||
if pathDir := path.Dir(prefix); pathDir == "." {
|
||||
dirName = path.Join(mantaRoot, bucket)
|
||||
} else {
|
||||
dirName = path.Join(mantaRoot, bucket, pathDir)
|
||||
}
|
||||
|
||||
input = &storage.ListDirectoryInput{
|
||||
DirectoryName: dirName,
|
||||
Limit: uint64(maxKeys),
|
||||
Marker: marker,
|
||||
}
|
||||
objs, err = t.client.Dir().List(ctx, input)
|
||||
if err != nil {
|
||||
if terrors.IsResourceNotFoundError(err) {
|
||||
return result, nil
|
||||
}
|
||||
logger.LogIf(ctx, err)
|
||||
return result, err
|
||||
}
|
||||
|
||||
for _, obj := range objs.Entries {
|
||||
if obj.Name == pathBase && obj.Type == "directory" {
|
||||
input.DirectoryName = path.Join(mantaRoot, bucket, prefix)
|
||||
objs, err = t.client.Dir().List(ctx, input)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return result, err
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
isTruncated := true // Always send a second request.
|
||||
if continuationToken == "" && len(objs.Entries) < maxKeys {
|
||||
isTruncated = false
|
||||
} else if continuationToken != "" && len(objs.Entries) < maxKeys {
|
||||
isTruncated = false
|
||||
}
|
||||
|
||||
for _, obj := range objs.Entries {
|
||||
if obj.Type == "directory" {
|
||||
result.Prefixes = append(result.Prefixes, obj.Name+delimiter)
|
||||
} else {
|
||||
result.Objects = append(result.Objects, minio.ObjectInfo{
|
||||
Name: obj.Name,
|
||||
Size: int64(obj.Size),
|
||||
ModTime: obj.ModifiedTime,
|
||||
ETag: obj.ETag,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
result.IsTruncated = isTruncated
|
||||
if isTruncated {
|
||||
result.NextContinuationToken = result.Objects[len(result.Objects)-1].Name
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// GetObjectNInfo - returns object info and locked object ReadCloser
|
||||
func (t *tritonObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, opts minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
|
||||
var objInfo minio.ObjectInfo
|
||||
objInfo, err = t.GetObjectInfo(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var startOffset, length int64
|
||||
startOffset, length, err = rs.GetOffsetLength(objInfo.Size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
err := t.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag, opts)
|
||||
pw.CloseWithError(err)
|
||||
}()
|
||||
// Setup cleanup function to cause the above go-routine to
|
||||
// exit in case of partial read
|
||||
pipeCloser := func() { pr.Close() }
|
||||
return minio.NewGetObjectReaderFromReader(pr, objInfo, pipeCloser), nil
|
||||
}
|
||||
|
||||
// GetObject - Reads an object from Manta. Supports additional parameters like
|
||||
// offset and length which are synonymous with HTTP Range requests.
|
||||
//
|
||||
// startOffset indicates the starting read location of the object. length
|
||||
// indicates the total length of the object.
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#GetObject
|
||||
func (t *tritonObjects) GetObject(ctx context.Context, bucket, object string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
|
||||
// Start offset cannot be negative.
|
||||
if startOffset < 0 {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unexpected error"))
|
||||
return fmt.Errorf("Unexpected error")
|
||||
}
|
||||
|
||||
output, err := t.client.Objects().Get(ctx, &storage.GetObjectInput{
|
||||
ObjectPath: path.Join(mantaRoot, bucket, object),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer output.ObjectReader.Close()
|
||||
|
||||
// Read until startOffset and discard, Manta object storage doesn't support range GET requests yet.
|
||||
if _, err = io.CopyN(ioutil.Discard, output.ObjectReader, startOffset); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if length > 0 {
|
||||
_, err = io.Copy(writer, io.LimitReader(output.ObjectReader, length))
|
||||
} else {
|
||||
_, err = io.Copy(writer, output.ObjectReader)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// GetObjectInfo - reads blob metadata properties and replies back minio.ObjectInfo,
|
||||
// uses Triton equivalent GetBlobProperties.
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#GetObject
|
||||
func (t *tritonObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
||||
info, err := t.client.Objects().GetInfo(ctx, &storage.GetInfoInput{
|
||||
ObjectPath: path.Join(mantaRoot, bucket, object),
|
||||
})
|
||||
if err != nil {
|
||||
if terrors.IsStatusNotFoundCode(err) {
|
||||
return objInfo, minio.ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
}
|
||||
}
|
||||
|
||||
return objInfo, err
|
||||
}
|
||||
|
||||
return minio.ObjectInfo{
|
||||
Bucket: bucket,
|
||||
ContentType: info.ContentType,
|
||||
Size: int64(info.ContentLength),
|
||||
ETag: info.ETag,
|
||||
ModTime: info.LastModified,
|
||||
UserDefined: info.Metadata,
|
||||
IsDir: strings.HasSuffix(info.ContentType, "type=directory"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type dummySeeker struct {
|
||||
io.Reader
|
||||
}
|
||||
|
||||
func (d dummySeeker) Seek(offset int64, whence int) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// PutObject - Create a new blob with the incoming data, uses Triton equivalent
|
||||
// CreateBlockBlobFromReader.
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#PutObject
|
||||
func (t *tritonObjects) PutObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, metadata map[string]string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
||||
data := r.Reader
|
||||
if err = t.client.Objects().Put(ctx, &storage.PutObjectInput{
|
||||
ContentLength: uint64(data.Size()),
|
||||
ObjectPath: path.Join(mantaRoot, bucket, object),
|
||||
ContentType: metadata["content-type"],
|
||||
// TODO: Change to `string(data.md5sum)` if/when that becomes an exported field
|
||||
ContentMD5: metadata["content-md5"],
|
||||
ObjectReader: dummySeeker{data},
|
||||
ForceInsert: true,
|
||||
}); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return objInfo, err
|
||||
}
|
||||
if err = data.Verify(); err != nil {
|
||||
t.DeleteObject(ctx, bucket, object)
|
||||
logger.LogIf(ctx, err)
|
||||
return objInfo, err
|
||||
}
|
||||
|
||||
return t.GetObjectInfo(ctx, bucket, object, opts)
|
||||
}
|
||||
|
||||
// CopyObject - Copies a blob from source container to destination container.
|
||||
// Uses Manta Snaplinks API.
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#PutSnapLink
|
||||
func (t *tritonObjects) CopyObject(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
||||
if err = t.client.SnapLinks().Put(ctx, &storage.PutSnapLinkInput{
|
||||
SourcePath: path.Join(mantaRoot, srcBucket, srcObject),
|
||||
LinkPath: path.Join(mantaRoot, destBucket, destObject),
|
||||
}); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return objInfo, err
|
||||
}
|
||||
|
||||
return t.GetObjectInfo(ctx, destBucket, destObject, dstOpts)
|
||||
}
|
||||
|
||||
// DeleteObject - Delete a blob in Manta, uses Triton equivalent DeleteBlob API.
|
||||
//
|
||||
// https://apidocs.joyent.com/manta/api.html#DeleteObject
|
||||
func (t *tritonObjects) DeleteObject(ctx context.Context, bucket, object string) error {
|
||||
if err := t.client.Objects().Delete(ctx, &storage.DeleteObjectInput{
|
||||
ObjectPath: path.Join(mantaRoot, bucket, object),
|
||||
}); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsCompressionSupported returns whether compression is applicable for this layer.
|
||||
func (t *tritonObjects) IsCompressionSupported() bool {
|
||||
return false
|
||||
}
|
||||
@@ -1,658 +0,0 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2017, 2018 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package sia
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/fatih/color"
|
||||
"github.com/minio/cli"
|
||||
"github.com/minio/minio-go/pkg/set"
|
||||
minio "github.com/minio/minio/cmd"
|
||||
"github.com/minio/minio/cmd/logger"
|
||||
"github.com/minio/minio/pkg/auth"
|
||||
"github.com/minio/sha256-simd"
|
||||
)
|
||||
|
||||
const (
|
||||
siaBackend = "sia"
|
||||
)
|
||||
|
||||
type siaObjects struct {
|
||||
minio.GatewayUnsupported
|
||||
Address string // Address and port of Sia Daemon.
|
||||
TempDir string // Temporary storage location for file transfers.
|
||||
RootDir string // Root directory to store files on Sia.
|
||||
password string // Sia password for uploading content in authenticated manner.
|
||||
}
|
||||
|
||||
func init() {
|
||||
const siaGatewayTemplate = `NAME:
|
||||
{{.HelpName}} - {{.Usage}}
|
||||
|
||||
USAGE:
|
||||
{{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}} [SIA_DAEMON_ADDR]
|
||||
{{if .VisibleFlags}}
|
||||
FLAGS:
|
||||
{{range .VisibleFlags}}{{.}}
|
||||
{{end}}{{end}}
|
||||
ENVIRONMENT VARIABLES: (Default values in parenthesis)
|
||||
ACCESS:
|
||||
MINIO_ACCESS_KEY: Custom access key (Do not reuse same access keys on all instances)
|
||||
MINIO_SECRET_KEY: Custom secret key (Do not reuse same secret keys on all instances)
|
||||
|
||||
BROWSER:
|
||||
MINIO_BROWSER: To disable web browser access, set this value to "off".
|
||||
|
||||
DOMAIN:
|
||||
MINIO_DOMAIN: To enable virtual-host-style requests, set this value to Minio host domain name.
|
||||
|
||||
CACHE:
|
||||
MINIO_CACHE_DRIVES: List of mounted drives or directories delimited by ";".
|
||||
MINIO_CACHE_EXCLUDE: List of cache exclusion patterns delimited by ";".
|
||||
MINIO_CACHE_EXPIRY: Cache expiry duration in days.
|
||||
MINIO_CACHE_MAXUSE: Maximum permitted usage of the cache in percentage (0-100).
|
||||
|
||||
SIA_TEMP_DIR: The name of the local Sia temporary storage directory. (.sia_temp)
|
||||
SIA_API_PASSWORD: API password for Sia daemon. (default is empty)
|
||||
|
||||
EXAMPLES:
|
||||
1. Start minio gateway server for Sia backend.
|
||||
$ {{.HelpName}}
|
||||
|
||||
2. Start minio gateway server for Sia backend with edge caching enabled.
|
||||
$ export MINIO_CACHE_DRIVES="/mnt/drive1;/mnt/drive2;/mnt/drive3;/mnt/drive4"
|
||||
$ export MINIO_CACHE_EXCLUDE="bucket1/*;*.png"
|
||||
$ export MINIO_CACHE_EXPIRY=40
|
||||
$ export MINIO_CACHE_MAXUSE=80
|
||||
$ {{.HelpName}}
|
||||
`
|
||||
|
||||
minio.RegisterGatewayCommand(cli.Command{
|
||||
Name: siaBackend,
|
||||
Usage: "Sia Decentralized Cloud",
|
||||
Action: siaGatewayMain,
|
||||
CustomHelpTemplate: siaGatewayTemplate,
|
||||
HideHelpCommand: true,
|
||||
})
|
||||
}
|
||||
|
||||
// Handler for 'minio gateway sia' command line.
|
||||
func siaGatewayMain(ctx *cli.Context) {
|
||||
// Validate gateway arguments.
|
||||
host := ctx.Args().First()
|
||||
// Validate gateway arguments.
|
||||
logger.FatalIf(minio.ValidateGatewayArguments(ctx.GlobalString("address"), host), "Invalid argument")
|
||||
|
||||
minio.StartGateway(ctx, &Sia{host})
|
||||
}
|
||||
|
||||
// Sia implements Gateway.
|
||||
type Sia struct {
|
||||
host string // Sia daemon host address
|
||||
}
|
||||
|
||||
// Name implements Gateway interface.
|
||||
func (g *Sia) Name() string {
|
||||
return siaBackend
|
||||
}
|
||||
|
||||
// NewGatewayLayer returns Sia gateway layer, implements ObjectLayer interface to
|
||||
// talk to Sia backend.
|
||||
func (g *Sia) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
|
||||
sia := &siaObjects{
|
||||
Address: g.host,
|
||||
// RootDir uses access key directly, provides partitioning for
|
||||
// concurrent users talking to same sia daemon.
|
||||
RootDir: creds.AccessKey,
|
||||
TempDir: os.Getenv("SIA_TEMP_DIR"),
|
||||
password: os.Getenv("SIA_API_PASSWORD"),
|
||||
}
|
||||
|
||||
// If Address not provided on command line or ENV, default to:
|
||||
if sia.Address == "" {
|
||||
sia.Address = "127.0.0.1:9980"
|
||||
}
|
||||
|
||||
// If local Sia temp directory not specified, default to:
|
||||
if sia.TempDir == "" {
|
||||
sia.TempDir = ".sia_temp"
|
||||
}
|
||||
|
||||
var err error
|
||||
sia.TempDir, err = filepath.Abs(sia.TempDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create the temp directory with proper permissions.
|
||||
// Ignore error when dir already exists.
|
||||
if err = os.MkdirAll(sia.TempDir, 0700); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
colorBlue := color.New(color.FgBlue).SprintfFunc()
|
||||
colorBold := color.New(color.Bold).SprintFunc()
|
||||
|
||||
formatStr := "%" + fmt.Sprintf("%ds", len(sia.Address)+7)
|
||||
logger.StartupMessage(colorBlue("\nSia Configuration:"))
|
||||
logger.StartupMessage(colorBlue(" API Address:") + colorBold(fmt.Sprintf(formatStr, sia.Address)))
|
||||
logger.StartupMessage(colorBlue(" Staging Directory:") + colorBold(fmt.Sprintf(" %s", sia.TempDir)))
|
||||
|
||||
return sia, nil
|
||||
}
|
||||
|
||||
// Production - sia gateway is not ready for production use.
|
||||
func (g *Sia) Production() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// non2xx returns true for non-success HTTP status codes.
|
||||
func non2xx(code int) bool {
|
||||
return code < 200 || code > 299
|
||||
}
|
||||
|
||||
// decodeError returns the api.Error from a API response. This method should
|
||||
// only be called if the response's status code is non-2xx. The error returned
|
||||
// may not be of type api.Error in the event of an error unmarshalling the
|
||||
// JSON.
|
||||
type siaError struct {
|
||||
// Message describes the error in English. Typically it is set to
|
||||
// `err.Error()`. This field is required.
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
func (s siaError) Error() string {
|
||||
return s.Message
|
||||
}
|
||||
|
||||
func decodeError(resp *http.Response) error {
|
||||
// Error is a type that is encoded as JSON and returned in an API response in
|
||||
// the event of an error. Only the Message field is required. More fields may
|
||||
// be added to this struct in the future for better error reporting.
|
||||
var apiErr siaError
|
||||
if err := json.NewDecoder(resp.Body).Decode(&apiErr); err != nil {
|
||||
return err
|
||||
}
|
||||
return apiErr
|
||||
}
|
||||
|
||||
// MethodNotSupported - returned if call returned error.
|
||||
type MethodNotSupported struct {
|
||||
method string
|
||||
}
|
||||
|
||||
func (s MethodNotSupported) Error() string {
|
||||
return fmt.Sprintf("API call not recognized: %s", s.method)
|
||||
}
|
||||
|
||||
// apiGet wraps a GET request with a status code check, such that if the GET does
|
||||
// not return 2xx, the error will be read and returned. The response body is
|
||||
// not closed.
|
||||
func apiGet(ctx context.Context, addr, call, apiPassword string) (*http.Response, error) {
|
||||
req, err := http.NewRequest("GET", "http://"+addr+call, nil)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("User-Agent", "Sia-Agent")
|
||||
if apiPassword != "" {
|
||||
req.SetBasicAuth("", apiPassword)
|
||||
}
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return nil, err
|
||||
}
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
minio.CloseResponse(resp.Body)
|
||||
logger.LogIf(ctx, MethodNotSupported{call})
|
||||
return nil, MethodNotSupported{call}
|
||||
}
|
||||
if non2xx(resp.StatusCode) {
|
||||
err := decodeError(resp)
|
||||
minio.CloseResponse(resp.Body)
|
||||
logger.LogIf(ctx, err)
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// apiPost wraps a POST request with a status code check, such that if the POST
|
||||
// does not return 2xx, the error will be read and returned. The response body
|
||||
// is not closed.
|
||||
func apiPost(ctx context.Context, addr, call, vals, apiPassword string) (*http.Response, error) {
|
||||
req, err := http.NewRequest("POST", "http://"+addr+call, strings.NewReader(vals))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("User-Agent", "Sia-Agent")
|
||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||
if apiPassword != "" {
|
||||
req.SetBasicAuth("", apiPassword)
|
||||
}
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
minio.CloseResponse(resp.Body)
|
||||
return nil, MethodNotSupported{call}
|
||||
}
|
||||
|
||||
if non2xx(resp.StatusCode) {
|
||||
err := decodeError(resp)
|
||||
minio.CloseResponse(resp.Body)
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// post makes an API call and discards the response. An error is returned if
|
||||
// the response status is not 2xx.
|
||||
func post(ctx context.Context, addr, call, vals, apiPassword string) error {
|
||||
resp, err := apiPost(ctx, addr, call, vals, apiPassword)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
minio.CloseResponse(resp.Body)
|
||||
return nil
|
||||
}
|
||||
|
||||
// list makes a lists all the uploaded files, decodes the json response.
|
||||
func list(ctx context.Context, addr string, apiPassword string, obj *renterFiles) error {
|
||||
resp, err := apiGet(ctx, addr, "/renter/files", apiPassword)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer minio.CloseResponse(resp.Body)
|
||||
|
||||
if resp.StatusCode == http.StatusNoContent {
|
||||
logger.LogIf(ctx, fmt.Errorf("Expecting a response, but API returned %s", resp.Status))
|
||||
return fmt.Errorf("Expecting a response, but API returned %s", resp.Status)
|
||||
}
|
||||
err = json.NewDecoder(resp.Body).Decode(obj)
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// get makes an API call and discards the response. An error is returned if the
|
||||
// responsee status is not 2xx.
|
||||
func get(ctx context.Context, addr, call, apiPassword string) error {
|
||||
resp, err := apiGet(ctx, addr, call, apiPassword)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
minio.CloseResponse(resp.Body)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown saves any gateway metadata to disk
|
||||
// if necessary and reload upon next restart.
|
||||
func (s *siaObjects) Shutdown(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// StorageInfo is not relevant to Sia backend.
|
||||
func (s *siaObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo) {
|
||||
return si
|
||||
}
|
||||
|
||||
// MakeBucket creates a new container on Sia backend.
|
||||
func (s *siaObjects) MakeBucketWithLocation(ctx context.Context, bucket, location string) error {
|
||||
srcFile := path.Join(s.TempDir, minio.MustGetUUID())
|
||||
defer os.Remove(srcFile)
|
||||
|
||||
writer, err := os.Create(srcFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = io.Copy(writer, bytes.NewReader([]byte(""))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sha256sum := sha256.Sum256([]byte(bucket))
|
||||
var siaObj = path.Join(s.RootDir, bucket, hex.EncodeToString(sha256sum[:]))
|
||||
return post(ctx, s.Address, "/renter/upload/"+siaObj, "source="+srcFile, s.password)
|
||||
}
|
||||
|
||||
// GetBucketInfo gets bucket metadata.
|
||||
func (s *siaObjects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, err error) {
|
||||
sha256sum := sha256.Sum256([]byte(bucket))
|
||||
var siaObj = path.Join(s.RootDir, bucket, hex.EncodeToString(sha256sum[:]))
|
||||
|
||||
dstFile := path.Join(s.TempDir, minio.MustGetUUID())
|
||||
defer os.Remove(dstFile)
|
||||
|
||||
if err := get(ctx, s.Address, "/renter/download/"+siaObj+"?destination="+url.QueryEscape(dstFile), s.password); err != nil {
|
||||
return bi, err
|
||||
}
|
||||
return minio.BucketInfo{Name: bucket}, nil
|
||||
}
|
||||
|
||||
// ListBuckets will detect and return existing buckets on Sia.
|
||||
func (s *siaObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInfo, err error) {
|
||||
sObjs, serr := s.listRenterFiles(ctx, "")
|
||||
if serr != nil {
|
||||
return buckets, serr
|
||||
}
|
||||
|
||||
m := make(set.StringSet)
|
||||
|
||||
prefix := s.RootDir + "/"
|
||||
for _, sObj := range sObjs {
|
||||
if strings.HasPrefix(sObj.SiaPath, prefix) {
|
||||
siaObj := strings.TrimPrefix(sObj.SiaPath, prefix)
|
||||
idx := strings.Index(siaObj, "/")
|
||||
if idx > 0 {
|
||||
m.Add(siaObj[0:idx])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, bktName := range m.ToSlice() {
|
||||
buckets = append(buckets, minio.BucketInfo{
|
||||
Name: bktName,
|
||||
Created: time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
})
|
||||
}
|
||||
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
// DeleteBucket deletes a bucket on Sia.
|
||||
func (s *siaObjects) DeleteBucket(ctx context.Context, bucket string) error {
|
||||
sha256sum := sha256.Sum256([]byte(bucket))
|
||||
var siaObj = path.Join(s.RootDir, bucket, hex.EncodeToString(sha256sum[:]))
|
||||
|
||||
return post(ctx, s.Address, "/renter/delete/"+siaObj, "", s.password)
|
||||
}
|
||||
|
||||
func (s *siaObjects) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
|
||||
siaObjs, siaErr := s.listRenterFiles(ctx, bucket)
|
||||
if siaErr != nil {
|
||||
return loi, siaErr
|
||||
}
|
||||
|
||||
loi.IsTruncated = false
|
||||
loi.NextMarker = ""
|
||||
|
||||
root := s.RootDir + "/"
|
||||
|
||||
sha256sum := sha256.Sum256([]byte(bucket))
|
||||
// FIXME(harsha) - No paginated output supported for Sia backend right now, only prefix
|
||||
// based filtering. Once list renter files API supports paginated output we can support
|
||||
// paginated results here as well - until then Listing is an expensive operation.
|
||||
for _, sObj := range siaObjs {
|
||||
name := strings.TrimPrefix(sObj.SiaPath, path.Join(root, bucket)+"/")
|
||||
// Skip the file created specially when bucket was created.
|
||||
if name == hex.EncodeToString(sha256sum[:]) {
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(name, prefix) {
|
||||
loi.Objects = append(loi.Objects, minio.ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: name,
|
||||
Size: int64(sObj.Filesize),
|
||||
IsDir: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
return loi, nil
|
||||
}
|
||||
|
||||
// GetObjectNInfo - returns object info and locked object ReadCloser
|
||||
func (s *siaObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, opts minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
|
||||
var objInfo minio.ObjectInfo
|
||||
objInfo, err = s.GetObjectInfo(ctx, bucket, object, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var startOffset, length int64
|
||||
startOffset, length, err = rs.GetOffsetLength(objInfo.Size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
err := s.GetObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag, opts)
|
||||
pw.CloseWithError(err)
|
||||
}()
|
||||
// Setup cleanup function to cause the above go-routine to
|
||||
// exit in case of partial read
|
||||
pipeCloser := func() { pr.Close() }
|
||||
return minio.NewGetObjectReaderFromReader(pr, objInfo, pipeCloser), nil
|
||||
}
|
||||
|
||||
func (s *siaObjects) GetObject(ctx context.Context, bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
|
||||
dstFile := path.Join(s.TempDir, minio.MustGetUUID())
|
||||
defer os.Remove(dstFile)
|
||||
|
||||
var siaObj = path.Join(s.RootDir, bucket, object)
|
||||
if err := get(ctx, s.Address, "/renter/download/"+siaObj+"?destination="+url.QueryEscape(dstFile), s.password); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reader, err := os.Open(dstFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer reader.Close()
|
||||
st, err := reader.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
size := st.Size()
|
||||
if _, err = reader.Seek(startOffset, os.SEEK_SET); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// For negative length we read everything.
|
||||
if length < 0 {
|
||||
length = size - startOffset
|
||||
}
|
||||
|
||||
bufSize := int64(1 * humanize.MiByte)
|
||||
if bufSize > length {
|
||||
bufSize = length
|
||||
}
|
||||
|
||||
// Reply back invalid range if the input offset and length fall out of range.
|
||||
if startOffset > size || startOffset+length > size {
|
||||
logger.LogIf(ctx, minio.InvalidRange{
|
||||
OffsetBegin: startOffset,
|
||||
OffsetEnd: length,
|
||||
ResourceSize: size,
|
||||
})
|
||||
return minio.InvalidRange{
|
||||
OffsetBegin: startOffset,
|
||||
OffsetEnd: length,
|
||||
ResourceSize: size,
|
||||
}
|
||||
}
|
||||
|
||||
// Allocate a staging buffer.
|
||||
buf := make([]byte, int(bufSize))
|
||||
|
||||
_, err = io.CopyBuffer(writer, io.LimitReader(reader, length), buf)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// findSiaObject retrieves the siaObjectInfo for the Sia object with the given
|
||||
// Sia path name.
|
||||
func (s *siaObjects) findSiaObject(ctx context.Context, bucket, object string) (siaObjectInfo, error) {
|
||||
siaPath := path.Join(s.RootDir, bucket, object)
|
||||
|
||||
sObjs, err := s.listRenterFiles(ctx, "")
|
||||
if err != nil {
|
||||
return siaObjectInfo{}, err
|
||||
}
|
||||
|
||||
for _, sObj := range sObjs {
|
||||
if sObj.SiaPath == siaPath {
|
||||
return sObj, nil
|
||||
}
|
||||
}
|
||||
logger.LogIf(ctx, minio.ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
})
|
||||
return siaObjectInfo{}, minio.ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
}
|
||||
}
|
||||
|
||||
// GetObjectInfo reads object info and replies back ObjectInfo
|
||||
func (s *siaObjects) GetObjectInfo(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (minio.ObjectInfo, error) {
|
||||
so, err := s.findSiaObject(ctx, bucket, object)
|
||||
if err != nil {
|
||||
return minio.ObjectInfo{}, err
|
||||
}
|
||||
|
||||
// Metadata about sia objects is just quite minimal. Sia only provides file size.
|
||||
return minio.ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: object,
|
||||
ModTime: time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
Size: int64(so.Filesize),
|
||||
IsDir: false,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// PutObject creates a new object with the incoming data,
|
||||
func (s *siaObjects) PutObject(ctx context.Context, bucket string, object string, r *minio.PutObjReader, metadata map[string]string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
||||
data := r.Reader
|
||||
srcFile := path.Join(s.TempDir, minio.MustGetUUID())
|
||||
writer, err := os.Create(srcFile)
|
||||
if err != nil {
|
||||
return objInfo, err
|
||||
}
|
||||
|
||||
wsize, err := io.CopyN(writer, data, data.Size())
|
||||
if err != nil {
|
||||
os.Remove(srcFile)
|
||||
return objInfo, err
|
||||
}
|
||||
|
||||
if err = post(ctx, s.Address, "/renter/upload/"+path.Join(s.RootDir, bucket, object), "source="+srcFile, s.password); err != nil {
|
||||
os.Remove(srcFile)
|
||||
return objInfo, err
|
||||
}
|
||||
defer s.deleteTempFileWhenUploadCompletes(ctx, srcFile, bucket, object)
|
||||
|
||||
return minio.ObjectInfo{
|
||||
Name: object,
|
||||
Bucket: bucket,
|
||||
ModTime: time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
Size: wsize,
|
||||
ETag: minio.GenETag(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DeleteObject deletes a blob in bucket
|
||||
func (s *siaObjects) DeleteObject(ctx context.Context, bucket string, object string) error {
|
||||
// Tell Sia daemon to delete the object
|
||||
var siaObj = path.Join(s.RootDir, bucket, object)
|
||||
return post(ctx, s.Address, "/renter/delete/"+siaObj, "", s.password)
|
||||
}
|
||||
|
||||
// siaObjectInfo represents object info stored on Sia
|
||||
type siaObjectInfo struct {
|
||||
SiaPath string `json:"siapath"`
|
||||
LocalPath string `json:"localpath"`
|
||||
Filesize uint64 `json:"filesize"`
|
||||
Available bool `json:"available"`
|
||||
Renewing bool `json:"renewing"`
|
||||
Redundancy float64 `json:"redundancy"`
|
||||
UploadProgress float64 `json:"uploadprogress"`
|
||||
}
|
||||
|
||||
type renterFiles struct {
|
||||
Files []siaObjectInfo `json:"files"`
|
||||
}
|
||||
|
||||
// listRenterFiles will return a list of existing objects in the bucket provided
|
||||
func (s *siaObjects) listRenterFiles(ctx context.Context, bucket string) (siaObjs []siaObjectInfo, err error) {
|
||||
// Get list of all renter files
|
||||
var rf renterFiles
|
||||
if err = list(ctx, s.Address, s.password, &rf); err != nil {
|
||||
return siaObjs, err
|
||||
}
|
||||
|
||||
var prefix string
|
||||
root := s.RootDir + "/"
|
||||
if bucket == "" {
|
||||
prefix = root
|
||||
} else {
|
||||
prefix = root + bucket + "/"
|
||||
}
|
||||
|
||||
for _, f := range rf.Files {
|
||||
if strings.HasPrefix(f.SiaPath, prefix) {
|
||||
siaObjs = append(siaObjs, f)
|
||||
}
|
||||
}
|
||||
|
||||
return siaObjs, nil
|
||||
}
|
||||
|
||||
// deleteTempFileWhenUploadCompletes checks the status of a Sia file upload
|
||||
// until it reaches 100% upload progress, then deletes the local temp copy from
|
||||
// the filesystem.
|
||||
func (s *siaObjects) deleteTempFileWhenUploadCompletes(ctx context.Context, tempFile string, bucket, object string) {
|
||||
var soi siaObjectInfo
|
||||
// Wait until 100% upload instead of 1x redundancy because if we delete
|
||||
// after 1x redundancy, the user has to pay the cost of other hosts
|
||||
// redistributing the file.
|
||||
for soi.UploadProgress < 100.0 {
|
||||
var err error
|
||||
soi, err = s.findSiaObject(ctx, bucket, object)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Sleep between each check so that we're not hammering
|
||||
// the Sia daemon with requests.
|
||||
time.Sleep(15 * time.Second)
|
||||
}
|
||||
|
||||
os.Remove(tempFile)
|
||||
}
|
||||
|
||||
// IsCompressionSupported returns whether compression is applicable for this layer.
|
||||
func (s *siaObjects) IsCompressionSupported() bool {
|
||||
return false
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
/*
|
||||
* Minio Cloud Storage, (C) 2017 Minio, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package sia
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSianon2xx(t *testing.T) {
|
||||
for i := 0; i < 1000; i++ {
|
||||
actual := non2xx(i)
|
||||
expected := i < 200 || i > 299
|
||||
|
||||
if actual != expected {
|
||||
t.Errorf("Test case %d: non2xx(%d) returned %t but expected %t", i+1, i, actual, expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user