mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
Initial implementation of Google Cloud Storage
This commit is contained in:
parent
4be609eb82
commit
3379f005a5
@ -16,24 +16,117 @@
|
|||||||
|
|
||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import "io"
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func toGCSPublicURL(bucket, object string) string {
|
||||||
|
return fmt.Sprintf("https://storage.googleapis.com/%s/%s", bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AnonPutObject creates a new object anonymously with the incoming data,
|
||||||
|
func (l *gcsGateway) AnonPutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string, sha256sum string) (ObjectInfo, error) {
|
||||||
|
|
||||||
|
return ObjectInfo{}, NotImplemented{}
|
||||||
|
}
|
||||||
|
|
||||||
// AnonGetObject - Get object anonymously
|
// AnonGetObject - Get object anonymously
|
||||||
func (l *gcsGateway) AnonGetObject(bucket string, key string, startOffset int64, length int64, writer io.Writer) error {
|
func (l *gcsGateway) AnonGetObject(bucket string, object string, startOffset int64, length int64, writer io.Writer) error {
|
||||||
return NotImplemented{}
|
req, err := http.NewRequest("GET", toGCSPublicURL(bucket, object), nil)
|
||||||
|
if err != nil {
|
||||||
|
return gcsToObjectError(traceError(err), bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
|
if length > 0 && startOffset > 0 {
|
||||||
|
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1))
|
||||||
|
} else if startOffset > 0 {
|
||||||
|
req.Header.Add("Range", fmt.Sprintf("bytes=%d-", startOffset))
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return gcsToObjectError(traceError(err), bucket, object)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusPartialContent && resp.StatusCode != http.StatusOK {
|
||||||
|
return gcsToObjectError(traceError(anonErrToObjectErr(resp.StatusCode, bucket, object)), bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = io.Copy(writer, resp.Body)
|
||||||
|
return gcsToObjectError(traceError(err), bucket, object)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AnonGetObjectInfo - Get object info anonymously
|
// AnonGetObjectInfo - Get object info anonymously
|
||||||
func (l *gcsGateway) AnonGetObjectInfo(bucket string, object string) (ObjectInfo, error) {
|
func (l *gcsGateway) AnonGetObjectInfo(bucket string, object string) (objInfo ObjectInfo, err error) {
|
||||||
return ObjectInfo{}, NotImplemented{}
|
resp, err := http.Head(toGCSPublicURL(bucket, object))
|
||||||
|
if err != nil {
|
||||||
|
return objInfo, gcsToObjectError(traceError(err), bucket, object)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
fmt.Println(resp.StatusCode)
|
||||||
|
return objInfo, gcsToObjectError(traceError(anonErrToObjectErr(resp.StatusCode, bucket, object)), bucket, object)
|
||||||
|
}
|
||||||
|
|
||||||
|
var contentLength int64
|
||||||
|
contentLengthStr := resp.Header.Get("Content-Length")
|
||||||
|
if contentLengthStr != "" {
|
||||||
|
contentLength, err = strconv.ParseInt(contentLengthStr, 0, 64)
|
||||||
|
if err != nil {
|
||||||
|
return objInfo, gcsToObjectError(traceError(errUnexpected), bucket, object)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t, err := time.Parse(time.RFC1123, resp.Header.Get("Last-Modified"))
|
||||||
|
if err != nil {
|
||||||
|
return objInfo, traceError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
objInfo.ModTime = t
|
||||||
|
objInfo.Bucket = bucket
|
||||||
|
objInfo.UserDefined = make(map[string]string)
|
||||||
|
if resp.Header.Get("Content-Encoding") != "" {
|
||||||
|
objInfo.UserDefined["Content-Encoding"] = resp.Header.Get("Content-Encoding")
|
||||||
|
}
|
||||||
|
objInfo.UserDefined["Content-Type"] = resp.Header.Get("Content-Type")
|
||||||
|
objInfo.MD5Sum = resp.Header.Get("Etag")
|
||||||
|
objInfo.ModTime = t
|
||||||
|
objInfo.Name = object
|
||||||
|
objInfo.Size = contentLength
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// AnonListObjects - List objects anonymously
|
// AnonListObjects - List objects anonymously
|
||||||
func (l *gcsGateway) AnonListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
|
func (l *gcsGateway) AnonListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
|
||||||
return ListObjectsInfo{}, NotImplemented{}
|
result, err := l.anonClient.ListObjects(bucket, prefix, marker, delimiter, maxKeys)
|
||||||
|
if err != nil {
|
||||||
|
return ListObjectsInfo{}, s3ToObjectError(traceError(err), bucket)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fromMinioClientListBucketResult(bucket, result), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// AnonGetBucketInfo - Get bucket metadata anonymously.
|
// AnonGetBucketInfo - Get bucket metadata anonymously.
|
||||||
func (l *gcsGateway) AnonGetBucketInfo(bucket string) (BucketInfo, error) {
|
func (l *gcsGateway) AnonGetBucketInfo(bucket string) (bucketInfo BucketInfo, err error) {
|
||||||
return BucketInfo{}, NotImplemented{}
|
resp, err := http.Head(toGCSPublicURL(bucket, ""))
|
||||||
|
if err != nil {
|
||||||
|
return bucketInfo, gcsToObjectError(traceError(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return bucketInfo, gcsToObjectError(traceError(anonErrToObjectErr(resp.StatusCode, bucket)), bucket)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Last-Modified date being returned by GCS
|
||||||
|
return BucketInfo{
|
||||||
|
Name: bucket,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -37,6 +37,17 @@ import (
|
|||||||
"github.com/minio/minio-go/pkg/policy"
|
"github.com/minio/minio-go/pkg/policy"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrNotValidMultipartIdentifier the multipart identifier is not in the correct form
|
||||||
|
ErrNotValidMultipartIdentifier = errors.New("Not a valid multipart identifier")
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ZZZZMinioPrefix is used for metadata and multiparts. The prefix is being filtered out,
|
||||||
|
// hence the naming of ZZZZ (last prefix)
|
||||||
|
ZZZZMinioPrefix = "ZZZZ-Minio"
|
||||||
|
)
|
||||||
|
|
||||||
// Convert Minio errors to minio object layer errors.
|
// Convert Minio errors to minio object layer errors.
|
||||||
func gcsToObjectError(err error, params ...string) error {
|
func gcsToObjectError(err error, params ...string) error {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -62,8 +73,6 @@ func gcsToObjectError(err error, params ...string) error {
|
|||||||
object = params[1]
|
object = params[1]
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("%+v\n ok=%v code=%v, message=%v body=%v reason=%v bucket=%v object=%v\n", err.Error(), ok, "", "", "", "", bucket, object)
|
|
||||||
|
|
||||||
// in some cases just a plain error is being returned
|
// in some cases just a plain error is being returned
|
||||||
switch err.Error() {
|
switch err.Error() {
|
||||||
case "storage: bucket doesn't exist":
|
case "storage: bucket doesn't exist":
|
||||||
@ -81,17 +90,15 @@ func gcsToObjectError(err error, params ...string) error {
|
|||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
googleApiErr, ok := err.(*googleapi.Error)
|
googleAPIErr, ok := err.(*googleapi.Error)
|
||||||
if !ok {
|
if !ok {
|
||||||
// We don't interpret non Minio errors. As minio errors will
|
// We don't interpret non Minio errors. As minio errors will
|
||||||
// have StatusCode to help to convert to object errors.
|
// have StatusCode to help to convert to object errors.
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("%+v\n ok=%v code=%v, message=%v body=%v reason=%v bucket=%v object=%v\n", googleApiErr, ok, googleApiErr.Code, googleApiErr.Message, googleApiErr.Body, googleApiErr.Errors[0].Reason, bucket, object)
|
reason := googleAPIErr.Errors[0].Reason
|
||||||
|
message := googleAPIErr.Errors[0].Message
|
||||||
reason := googleApiErr.Errors[0].Reason
|
|
||||||
message := googleApiErr.Errors[0].Message
|
|
||||||
|
|
||||||
switch reason {
|
switch reason {
|
||||||
case "forbidden":
|
case "forbidden":
|
||||||
@ -104,7 +111,6 @@ func gcsToObjectError(err error, params ...string) error {
|
|||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
}
|
}
|
||||||
case "notFound":
|
case "notFound":
|
||||||
fmt.Println(object, bucket)
|
|
||||||
if object != "" {
|
if object != "" {
|
||||||
err = ObjectNotFound{
|
err = ObjectNotFound{
|
||||||
Bucket: bucket,
|
Bucket: bucket,
|
||||||
@ -141,7 +147,6 @@ func gcsToObjectError(err error, params ...string) error {
|
|||||||
// gcsGateway - Implements gateway for Minio and GCS compatible object storage servers.
|
// gcsGateway - Implements gateway for Minio and GCS compatible object storage servers.
|
||||||
type gcsGateway struct {
|
type gcsGateway struct {
|
||||||
client *storage.Client
|
client *storage.Client
|
||||||
Client *minio.Core
|
|
||||||
anonClient *minio.Core
|
anonClient *minio.Core
|
||||||
projectID string
|
projectID string
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
@ -149,8 +154,6 @@ type gcsGateway struct {
|
|||||||
|
|
||||||
// newGCSGateway returns gcs gatewaylayer
|
// newGCSGateway returns gcs gatewaylayer
|
||||||
func newGCSGateway(endpoint string, projectID, secretKey string, secure bool) (GatewayLayer, error) {
|
func newGCSGateway(endpoint string, projectID, secretKey string, secure bool) (GatewayLayer, error) {
|
||||||
fmt.Println(secretKey)
|
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Creates a client.
|
// Creates a client.
|
||||||
@ -159,12 +162,16 @@ func newGCSGateway(endpoint string, projectID, secretKey string, secure bool) (G
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
anonClient, err := minio.NewCore("storage.googleapis.com", "", "", secure)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &gcsGateway{
|
return &gcsGateway{
|
||||||
client: client,
|
client: client,
|
||||||
projectID: "minio-166400",
|
projectID: "minio-166400",
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
Client: nil,
|
anonClient: anonClient,
|
||||||
anonClient: nil,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -248,12 +255,6 @@ func (l *gcsGateway) DeleteBucket(bucket string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
// ZZZZ_MINIO is used for metadata and multiparts. The prefix is being filtered out,
|
|
||||||
// hence the naming of ZZZZ (last prefix)
|
|
||||||
ZZZZ_MINIO_PREFIX = "ZZZZ-Minio"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ListObjects - lists all blobs in GCS bucket filtered by prefix
|
// ListObjects - lists all blobs in GCS bucket filtered by prefix
|
||||||
func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
|
func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, delimiter string, maxKeys int) (ListObjectsInfo, error) {
|
||||||
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
|
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
|
||||||
@ -274,7 +275,7 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de
|
|||||||
|
|
||||||
attrs, _ := it.Next()
|
attrs, _ := it.Next()
|
||||||
if attrs == nil {
|
if attrs == nil {
|
||||||
} else if attrs.Prefix == ZZZZ_MINIO_PREFIX {
|
} else if attrs.Prefix == ZZZZMinioPrefix {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -292,7 +293,7 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de
|
|||||||
return ListObjectsInfo{}, gcsToObjectError(traceError(err), bucket, prefix)
|
return ListObjectsInfo{}, gcsToObjectError(traceError(err), bucket, prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
if attrs.Prefix == ZZZZ_MINIO_PREFIX {
|
if attrs.Prefix == ZZZZMinioPrefix {
|
||||||
// we don't return our metadata prefix
|
// we don't return our metadata prefix
|
||||||
continue
|
continue
|
||||||
} else if attrs.Prefix != "" {
|
} else if attrs.Prefix != "" {
|
||||||
@ -405,7 +406,7 @@ func fromGCSObjectInfo(attrs *storage.ObjectAttrs) ObjectInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetObjectInfo - reads object info and replies back ObjectInfo
|
// GetObjectInfo - reads object info and replies back ObjectInfo
|
||||||
func (l *gcsGateway) GetObjectInfo(bucket string, object string) (objInfo ObjectInfo, err error) {
|
func (l *gcsGateway) GetObjectInfo(bucket string, object string) (ObjectInfo, error) {
|
||||||
// if we want to mimic S3 behavior exactly, we need to verify if bucket exists first,
|
// if we want to mimic S3 behavior exactly, we need to verify if bucket exists first,
|
||||||
// otherwise gcs will just return object not exist in case of non-existing bucket
|
// otherwise gcs will just return object not exist in case of non-existing bucket
|
||||||
if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
|
if _, err := l.client.Bucket(bucket).Attrs(l.ctx); err != nil {
|
||||||
@ -500,7 +501,7 @@ func (l *gcsGateway) DeleteObject(bucket string, object string) error {
|
|||||||
// ListMultipartUploads - lists all multipart uploads.
|
// ListMultipartUploads - lists all multipart uploads.
|
||||||
func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
|
func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (ListMultipartsInfo, error) {
|
||||||
// TODO: implement prefix and prefixes, how does this work for Multiparts??
|
// TODO: implement prefix and prefixes, how does this work for Multiparts??
|
||||||
prefix = fmt.Sprintf("%s/multipart-", ZZZZ_MINIO_PREFIX)
|
prefix = fmt.Sprintf("%s/multipart-", ZZZZMinioPrefix)
|
||||||
|
|
||||||
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
|
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
|
||||||
|
|
||||||
@ -561,11 +562,11 @@ func (l *gcsGateway) ListMultipartUploads(bucket string, prefix string, keyMarke
|
|||||||
func fromGCSMultipartKey(s string) (key, uploadID string, partID int, err error) {
|
func fromGCSMultipartKey(s string) (key, uploadID string, partID int, err error) {
|
||||||
parts := strings.Split(s, "-")
|
parts := strings.Split(s, "-")
|
||||||
if parts[0] != "multipart" {
|
if parts[0] != "multipart" {
|
||||||
return "", "", 0, errors.New("Not a valid multipart identifier.")
|
return "", "", 0, ErrNotValidMultipartIdentifier
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(parts) != 4 {
|
if len(parts) != 4 {
|
||||||
return "", "", 0, errors.New("Not a valid multipart identifier.")
|
return "", "", 0, ErrNotValidMultipartIdentifier
|
||||||
}
|
}
|
||||||
|
|
||||||
key = parts[1]
|
key = parts[1]
|
||||||
@ -584,7 +585,7 @@ func toGCSMultipartKey(key string, uploadID string, partID int) string {
|
|||||||
// explicitly notes that uploaded parts with same number are being overwritten
|
// explicitly notes that uploaded parts with same number are being overwritten
|
||||||
|
|
||||||
// parts are allowed to be numbered from 1 to 10,000 (inclusive)
|
// parts are allowed to be numbered from 1 to 10,000 (inclusive)
|
||||||
return fmt.Sprintf("%s/multipart-%s-%s-%05d", ZZZZ_MINIO_PREFIX, key, uploadID, partID)
|
return fmt.Sprintf("%s/multipart-%s-%s-%05d", ZZZZMinioPrefix, key, uploadID, partID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMultipartUpload - upload object in multiple parts
|
// NewMultipartUpload - upload object in multiple parts
|
||||||
@ -629,7 +630,7 @@ func (l *gcsGateway) PutObjectPart(bucket string, key string, uploadID string, p
|
|||||||
func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (ListPartsInfo, error) {
|
func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string, partNumberMarker int, maxParts int) (ListPartsInfo, error) {
|
||||||
// TODO: support partNumberMarker
|
// TODO: support partNumberMarker
|
||||||
|
|
||||||
prefix := fmt.Sprintf("%s/multipart-%s-%s", ZZZZ_MINIO_PREFIX, key, uploadID)
|
prefix := fmt.Sprintf("%s/multipart-%s-%s", ZZZZMinioPrefix, key, uploadID)
|
||||||
delimiter := "/"
|
delimiter := "/"
|
||||||
|
|
||||||
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
|
it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{Delimiter: delimiter, Prefix: prefix, Versions: false})
|
||||||
@ -682,7 +683,7 @@ func (l *gcsGateway) ListObjectParts(bucket string, key string, uploadID string,
|
|||||||
|
|
||||||
// AbortMultipartUpload aborts a ongoing multipart upload
|
// AbortMultipartUpload aborts a ongoing multipart upload
|
||||||
func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error {
|
func (l *gcsGateway) AbortMultipartUpload(bucket string, key string, uploadID string) error {
|
||||||
prefix := fmt.Sprintf("%s/multipart-%s-%s", ZZZZ_MINIO_PREFIX, key, uploadID)
|
prefix := fmt.Sprintf("%s/multipart-%s-%s", ZZZZMinioPrefix, key, uploadID)
|
||||||
delimiter := "/"
|
delimiter := "/"
|
||||||
|
|
||||||
// delete part zero, ignoring errors here, we want to clean up all remains
|
// delete part zero, ignoring errors here, we want to clean up all remains
|
||||||
|
Loading…
Reference in New Issue
Block a user