/* * MinIO Object Storage (c) 2021 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package gcs import ( "context" "encoding/base64" "encoding/hex" "encoding/json" "errors" "fmt" "io" "io/ioutil" "math" "net/http" "os" "path" "regexp" "strconv" "strings" "time" "cloud.google.com/go/storage" humanize "github.com/dustin/go-humanize" "github.com/minio/cli" "github.com/minio/madmin-go" miniogopolicy "github.com/minio/minio-go/v7/pkg/policy" minio "github.com/minio/minio/cmd" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/bucket/policy" "github.com/minio/pkg/bucket/policy/condition" "github.com/minio/pkg/env" "google.golang.org/api/googleapi" "google.golang.org/api/iterator" "google.golang.org/api/option" ) var ( // Project ID format is not valid. errGCSInvalidProjectID = fmt.Errorf("GCS project id is either empty or invalid") // Project ID not found errGCSProjectIDNotFound = fmt.Errorf("Unknown project id") // Invalid format. errGCSFormat = fmt.Errorf("Unknown format") ) const ( // Path where multipart objects are saved. // If we change the backend format we will use a different url path like /multipart/v2 // but we will not migrate old data. gcsMinioMultipartPathV1 = minio.GatewayMinioSysTmp + "multipart/v1" // Multipart meta file. gcsMinioMultipartMeta = "gcs.json" // gcs.json version number gcsMinioMultipartMetaCurrentVersion = "1" // token prefixed with GCS returned marker to differentiate // from user supplied marker. 
gcsTokenPrefix = "{minio}" // Maximum component object count to create a composite object. // Refer https://cloud.google.com/storage/docs/composite-objects gcsMaxComponents = 32 // Every 24 hours we scan minio.sys.tmp to delete expired multiparts in minio.sys.tmp gcsCleanupInterval = time.Hour * 24 // The cleanup routine deletes files older than 2 weeks in minio.sys.tmp gcsMultipartExpiry = time.Hour * 24 * 14 // Project ID key in credentials.json gcsProjectIDKey = "project_id" ) func init() { const gcsGatewayTemplate = `NAME: {{.HelpName}} - {{.Usage}} USAGE: {{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}} [PROJECTID] {{if .VisibleFlags}} FLAGS: {{range .VisibleFlags}}{{.}} {{end}}{{end}} PROJECTID: optional GCS project-id expected GOOGLE_APPLICATION_CREDENTIALS env is not set GOOGLE_APPLICATION_CREDENTIALS: path to credentials.json, generated it from here https://developers.google.com/identity/protocols/application-default-credentials EXAMPLES: 1. Start minio gateway server for GCS backend {{.Prompt}} {{.EnvVarSetCommand}} GOOGLE_APPLICATION_CREDENTIALS{{.AssignmentOperator}}/path/to/credentials.json {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_USER{{.AssignmentOperator}}accesskey {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_PASSWORD{{.AssignmentOperator}}secretkey {{.Prompt}} {{.HelpName}} mygcsprojectid 2. 
Start minio gateway server for GCS backend with edge caching enabled {{.Prompt}} {{.EnvVarSetCommand}} GOOGLE_APPLICATION_CREDENTIALS{{.AssignmentOperator}}/path/to/credentials.json {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_USER{{.AssignmentOperator}}accesskey {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_PASSWORD{{.AssignmentOperator}}secretkey {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_DRIVES{{.AssignmentOperator}}"/mnt/drive1,/mnt/drive2,/mnt/drive3,/mnt/drive4" {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_EXCLUDE{{.AssignmentOperator}}"bucket1/*;*.png" {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_AFTER{{.AssignmentOperator}}3 {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_LOW{{.AssignmentOperator}}75 {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_HIGH{{.AssignmentOperator}}85 {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_QUOTA{{.AssignmentOperator}}90 {{.Prompt}} {{.HelpName}} mygcsprojectid ` minio.RegisterGatewayCommand(cli.Command{ Name: minio.GCSBackendGateway, Usage: "Google Cloud Storage", Action: gcsGatewayMain, CustomHelpTemplate: gcsGatewayTemplate, HideHelpCommand: true, }) } // Handler for 'minio gateway gcs' command line. func gcsGatewayMain(ctx *cli.Context) { projectID := ctx.Args().First() if projectID == "" && os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") == "" { logger.LogIf(minio.GlobalContext, errGCSProjectIDNotFound, logger.Application) cli.ShowCommandHelpAndExit(ctx, minio.GCSBackendGateway, 1) } if projectID != "" && !isValidGCSProjectIDFormat(projectID) { reqInfo := (&logger.ReqInfo{}).AppendTags("projectID", ctx.Args().First()) contxt := logger.SetReqInfo(minio.GlobalContext, reqInfo) logger.LogIf(contxt, errGCSInvalidProjectID, logger.Application) cli.ShowCommandHelpAndExit(ctx, minio.GCSBackendGateway, 1) } minio.StartGateway(ctx, &GCS{projectID}) } // GCS implements Azure. type GCS struct { projectID string } // Name returns the name of gcs ObjectLayer. 
func (g *GCS) Name() string {
	return minio.GCSBackendGateway
}

// NewGatewayLayer builds the GCS backed ObjectLayer.
//
// When no project id was supplied on the command line it is read from the
// file named by GOOGLE_APPLICATION_CREDENTIALS. A background goroutine that
// cleans up stale entries in minio.sys.tmp is started before returning.
func (g *GCS) NewGatewayLayer(creds madmin.Credentials) (minio.ObjectLayer, error) {
	ctx := minio.GlobalContext

	if g.projectID == "" {
		// Project ID was not provided on the command line, figure it out
		// from the credentials.json file.
		projectID, err := gcsParseProjectID(env.Get("GOOGLE_APPLICATION_CREDENTIALS", ""))
		g.projectID = projectID
		if err != nil {
			return nil, err
		}
	}

	metrics := minio.NewMetrics()

	// NOTE(review): this metrics transport is only attached to httpClient
	// below; the storage client is created without it. Confirm that is intended.
	transport := &minio.MetricsTransport{
		Transport: minio.NewGatewayHTTPTransport(),
		Metrics:   metrics,
	}

	// Initialize a GCS client.
	// Send user-agent in this format for Google to obtain usage insights while participating in the
	// Google Cloud Technology Partners (https://cloud.google.com/partners/)
	userAgent := fmt.Sprintf("MinIO/%s (GPN:MinIO;)", minio.Version)
	client, err := storage.NewClient(ctx, option.WithUserAgent(userAgent))
	if err != nil {
		return nil, err
	}

	gateway := &gcsGateway{
		client:     client,
		projectID:  g.projectID,
		metrics:    metrics,
		httpClient: &http.Client{Transport: transport},
	}

	// Start background process to cleanup old files in minio.sys.tmp
	go gateway.CleanupGCSMinioSysTmp(ctx)
	return gateway, nil
}

// gcsMultipartMetaV1 is the content of the gcs.json object written when a
// multipart upload is initiated. It records which bucket/object the upload
// id belongs to.
type gcsMultipartMetaV1 struct {
	Version string `json:"version"` // Version number
	Bucket  string `json:"bucket"`  // Bucket name
	Object  string `json:"object"`  // Object name
}

// gcsMultipartMetaName returns the name of the multipart meta object
// for the given upload id.
func gcsMultipartMetaName(uploadID string) string {
	return gcsMinioMultipartPathV1 + "/" + uploadID + "/" + gcsMinioMultipartMeta
}

// gcsMultipartDataName returns the name of the object holding one uploaded
// part; the part number is zero padded to five digits and followed by the etag.
func gcsMultipartDataName(uploadID string, partNumber int, etag string) string {
	const layout = "%s/%s/%05d.%s"
	return fmt.Sprintf(layout, gcsMinioMultipartPathV1, uploadID, partNumber, etag)
}

// gcsToObjectError converts GCS errors to minio object layer errors.
func gcsToObjectError(err error, params ...string) error { if err == nil { return nil } bucket := "" object := "" uploadID := "" if len(params) >= 1 { bucket = params[0] } if len(params) == 2 { object = params[1] } if len(params) == 3 { uploadID = params[2] } // in some cases just a plain error is being returned switch err.Error() { case "storage: bucket doesn't exist": err = minio.BucketNotFound{ Bucket: bucket, } return err case "storage: object doesn't exist": if uploadID != "" { err = minio.InvalidUploadID{ UploadID: uploadID, } } else { err = minio.ObjectNotFound{ Bucket: bucket, Object: object, } } return err } googleAPIErr, ok := err.(*googleapi.Error) if !ok { // We don't interpret non MinIO errors. As minio errors will // have StatusCode to help to convert to object errors. return err } if len(googleAPIErr.Errors) == 0 { return err } reason := googleAPIErr.Errors[0].Reason message := googleAPIErr.Errors[0].Message switch reason { case "required": // Anonymous users does not have storage.xyz access to project 123. fallthrough case "keyInvalid": fallthrough case "forbidden": err = minio.PrefixAccessDenied{ Bucket: bucket, Object: object, } case "invalid": err = minio.BucketNameInvalid{ Bucket: bucket, } case "notFound": if object != "" { err = minio.ObjectNotFound{ Bucket: bucket, Object: object, } break } err = minio.BucketNotFound{Bucket: bucket} case "conflict": if message == "You already own this bucket. Please select another name." { err = minio.BucketAlreadyOwnedByYou{Bucket: bucket} break } if message == "Sorry, that name is not available. Please try a different one." { err = minio.BucketAlreadyExists{Bucket: bucket} break } err = minio.BucketNotEmpty{Bucket: bucket} } return err } // gcsProjectIDRegex defines a valid gcs project id format var gcsProjectIDRegex = regexp.MustCompile("^[a-z][a-z0-9-]{5,29}$") // isValidGCSProjectIDFormat - checks if a given project id format is valid or not. 
// Project IDs must start with a lowercase letter and can have lowercase ASCII letters, // digits or hyphens. Project IDs must be between 6 and 30 characters. // Ref: https://cloud.google.com/resource-manager/reference/rest/v1/projects#Project (projectId section) func isValidGCSProjectIDFormat(projectID string) bool { // Checking projectID format return gcsProjectIDRegex.MatchString(projectID) } // gcsGateway - Implements gateway for MinIO and GCS compatible object storage servers. type gcsGateway struct { minio.GatewayUnsupported client *storage.Client httpClient *http.Client metrics *minio.BackendMetrics projectID string } // Returns projectID from the GOOGLE_APPLICATION_CREDENTIALS file. func gcsParseProjectID(credsFile string) (projectID string, err error) { contents, err := ioutil.ReadFile(credsFile) if err != nil { return projectID, err } googleCreds := make(map[string]string) if err = json.Unmarshal(contents, &googleCreds); err != nil { return projectID, err } return googleCreds[gcsProjectIDKey], err } // GetMetrics returns this gateway's metrics func (l *gcsGateway) GetMetrics(ctx context.Context) (*minio.BackendMetrics, error) { return l.metrics, nil } // Cleanup old files in minio.sys.tmp of the given bucket. func (l *gcsGateway) CleanupGCSMinioSysTmpBucket(ctx context.Context, bucket string) { it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{Prefix: minio.GatewayMinioSysTmp, Versions: false}) for { attrs, err := it.Next() if err != nil { if err != iterator.Done { reqInfo := &logger.ReqInfo{BucketName: bucket} ctx := logger.SetReqInfo(minio.GlobalContext, reqInfo) logger.LogIf(ctx, err) } return } if time.Since(attrs.Updated) > gcsMultipartExpiry { // Delete files older than 2 weeks. 
err := l.client.Bucket(bucket).Object(attrs.Name).Delete(ctx) if err != nil { reqInfo := &logger.ReqInfo{BucketName: bucket, ObjectName: attrs.Name} ctx := logger.SetReqInfo(minio.GlobalContext, reqInfo) logger.LogIf(ctx, err) return } } } } // Cleanup old files in minio.sys.tmp of all buckets. func (l *gcsGateway) CleanupGCSMinioSysTmp(ctx context.Context) { for { it := l.client.Buckets(ctx, l.projectID) for { attrs, err := it.Next() if err != nil { break } l.CleanupGCSMinioSysTmpBucket(ctx, attrs.Name) } // Run the cleanup loop every 1 day. time.Sleep(gcsCleanupInterval) } } // Shutdown - save any gateway metadata to disk // if necessary and reload upon next restart. func (l *gcsGateway) Shutdown(ctx context.Context) error { return nil } // StorageInfo - Not relevant to GCS backend. func (l *gcsGateway) StorageInfo(ctx context.Context) (si minio.StorageInfo, _ []error) { si.Backend.Type = madmin.Gateway si.Backend.GatewayOnline = minio.IsBackendOnline(ctx, "storage.googleapis.com:443") return si, nil } // MakeBucketWithLocation - Create a new container on GCS backend. func (l *gcsGateway) MakeBucketWithLocation(ctx context.Context, bucket string, opts minio.BucketOptions) error { if opts.LockEnabled || opts.VersioningEnabled { return minio.NotImplemented{} } bkt := l.client.Bucket(bucket) // we'll default to the us multi-region in case of us-east-1 location := opts.Location if location == "us-east-1" { location = "us" } err := bkt.Create(ctx, l.projectID, &storage.BucketAttrs{ Location: location, }) logger.LogIf(ctx, err) return gcsToObjectError(err, bucket) } // GetBucketInfo - Get bucket metadata.. 
func (l *gcsGateway) GetBucketInfo(ctx context.Context, bucket string) (minio.BucketInfo, error) { attrs, err := l.client.Bucket(bucket).Attrs(ctx) if err != nil { logger.LogIf(ctx, err) return minio.BucketInfo{}, gcsToObjectError(err, bucket) } return minio.BucketInfo{ Name: attrs.Name, Created: attrs.Created, }, nil } // ListBuckets lists all buckets under your project-id on GCS. func (l *gcsGateway) ListBuckets(ctx context.Context) (buckets []minio.BucketInfo, err error) { it := l.client.Buckets(ctx, l.projectID) // Iterate and capture all the buckets. for { attrs, ierr := it.Next() if ierr == iterator.Done { break } if ierr != nil { return buckets, gcsToObjectError(ierr) } buckets = append(buckets, minio.BucketInfo{ Name: attrs.Name, Created: attrs.Created, }) } return buckets, nil } // DeleteBucket delete a bucket on GCS. func (l *gcsGateway) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { itObject := l.client.Bucket(bucket).Objects(ctx, &storage.Query{ Delimiter: minio.SlashSeparator, Versions: false, }) // We list the bucket and if we find any objects we return BucketNotEmpty error. If we // find only "minio.sys.tmp/" then we remove it before deleting the bucket. gcsMinioPathFound := false nonGCSMinioPathFound := false for { objAttrs, err := itObject.Next() if err == iterator.Done { break } if err != nil { logger.LogIf(ctx, err) return gcsToObjectError(err) } if objAttrs.Prefix == minio.GatewayMinioSysTmp { gcsMinioPathFound = true continue } nonGCSMinioPathFound = true break } if nonGCSMinioPathFound { logger.LogIf(ctx, minio.BucketNotEmpty{}) return gcsToObjectError(minio.BucketNotEmpty{}) } if gcsMinioPathFound { // Remove minio.sys.tmp before deleting the bucket. 
itObject = l.client.Bucket(bucket).Objects(ctx, &storage.Query{Versions: false, Prefix: minio.GatewayMinioSysTmp}) for { objAttrs, err := itObject.Next() if err == iterator.Done { break } if err != nil { logger.LogIf(ctx, err) return gcsToObjectError(err) } err = l.client.Bucket(bucket).Object(objAttrs.Name).Delete(ctx) if err != nil { logger.LogIf(ctx, err) return gcsToObjectError(err) } } } err := l.client.Bucket(bucket).Delete(ctx) logger.LogIf(ctx, err) return gcsToObjectError(err, bucket) } func toGCSPageToken(name string) string { length := uint16(len(name)) b := []byte{ 0xa, byte(length & 0xFF), } length = length >> 7 if length > 0 { b = append(b, byte(length&0xFF)) } b = append(b, []byte(name)...) return base64.StdEncoding.EncodeToString(b) } // Returns true if marker was returned by GCS, i.e prefixed with // ##minio by minio gcs minio. func isGCSMarker(marker string) bool { return strings.HasPrefix(marker, gcsTokenPrefix) } // ListObjects - lists all blobs in GCS bucket filtered by prefix func (l *gcsGateway) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (minio.ListObjectsInfo, error) { if maxKeys == 0 { return minio.ListObjectsInfo{}, nil } it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{ Delimiter: delimiter, Prefix: prefix, Versions: false, }) // To accommodate S3-compatible applications using // ListObjectsV1 to use object keys as markers to control the // listing of objects, we use the following encoding scheme to // distinguish between GCS continuation tokens and application // supplied markers. // // - NextMarker in ListObjectsV1 response is constructed by // prefixing "{minio}" to the GCS continuation token, // e.g, "{minio}CgRvYmoz" // // - Application supplied markers are transformed to a // GCS continuation token. // If application is using GCS continuation token we should // strip the gcsTokenPrefix we added. 
token := "" if marker != "" { if isGCSMarker(marker) { token = strings.TrimPrefix(marker, gcsTokenPrefix) } else { token = toGCSPageToken(marker) } } nextMarker := "" var prefixes []string var objects []minio.ObjectInfo var nextPageToken string var err error pager := iterator.NewPager(it, maxKeys, token) for { gcsObjects := make([]*storage.ObjectAttrs, 0) nextPageToken, err = pager.NextPage(&gcsObjects) if err != nil { logger.LogIf(ctx, err) return minio.ListObjectsInfo{}, gcsToObjectError(err, bucket, prefix) } for _, attrs := range gcsObjects { // Due to minio.GatewayMinioSysTmp keys being skipped, the number of objects + prefixes // returned may not total maxKeys. This behavior is compatible with the S3 spec which // allows the response to include less keys than maxKeys. if attrs.Prefix == minio.GatewayMinioSysTmp { // We don't return our metadata prefix. continue } if !strings.HasPrefix(prefix, minio.GatewayMinioSysTmp) { // If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries. // But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/ // which will be helpful to observe the "directory structure" for debugging purposes. 
if strings.HasPrefix(attrs.Prefix, minio.GatewayMinioSysTmp) || strings.HasPrefix(attrs.Name, minio.GatewayMinioSysTmp) { continue } } if attrs.Prefix != "" { prefixes = append(prefixes, attrs.Prefix) } else { objects = append(objects, fromGCSAttrsToObjectInfo(attrs)) } // The NextMarker property should only be set in the response if a delimiter is used if delimiter != "" { if attrs.Prefix > nextMarker { nextMarker = attrs.Prefix } else if attrs.Name > nextMarker { nextMarker = attrs.Name } } } // Exit the loop if at least one item can be returned from // the current page or there are no more pages available if nextPageToken == "" || len(prefixes)+len(objects) > 0 { break } } if nextPageToken == "" { nextMarker = "" } else if nextMarker != "" { nextMarker = gcsTokenPrefix + toGCSPageToken(nextMarker) } return minio.ListObjectsInfo{ IsTruncated: nextPageToken != "", NextMarker: nextMarker, Prefixes: prefixes, Objects: objects, }, nil } // ListObjectsV2 - lists all blobs in GCS bucket filtered by prefix func (l *gcsGateway) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (minio.ListObjectsV2Info, error) { if maxKeys == 0 { return minio.ListObjectsV2Info{ContinuationToken: continuationToken}, nil } it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{ Delimiter: delimiter, Prefix: prefix, Versions: false, }) token := continuationToken if token == "" && startAfter != "" { token = toGCSPageToken(startAfter) } var prefixes []string var objects []minio.ObjectInfo var nextPageToken string var err error pager := iterator.NewPager(it, maxKeys, token) for { gcsObjects := make([]*storage.ObjectAttrs, 0) nextPageToken, err = pager.NextPage(&gcsObjects) if err != nil { logger.LogIf(ctx, err) return minio.ListObjectsV2Info{}, gcsToObjectError(err, bucket, prefix) } for _, attrs := range gcsObjects { // Due to minio.GatewayMinioSysTmp keys being skipped, the number of objects + prefixes 
// returned may not total maxKeys. This behavior is compatible with the S3 spec which // allows the response to include less keys than maxKeys. if attrs.Prefix == minio.GatewayMinioSysTmp { // We don't return our metadata prefix. continue } if !strings.HasPrefix(prefix, minio.GatewayMinioSysTmp) { // If client lists outside gcsMinioPath then we filter out gcsMinioPath/* entries. // But if the client lists inside gcsMinioPath then we return the entries in gcsMinioPath/ // which will be helpful to observe the "directory structure" for debugging purposes. if strings.HasPrefix(attrs.Prefix, minio.GatewayMinioSysTmp) || strings.HasPrefix(attrs.Name, minio.GatewayMinioSysTmp) { continue } } if attrs.Prefix != "" { prefixes = append(prefixes, attrs.Prefix) } else { objects = append(objects, fromGCSAttrsToObjectInfo(attrs)) } } // Exit the loop if at least one item can be returned from // the current page or there are no more pages available if nextPageToken == "" || len(prefixes)+len(objects) > 0 { break } } return minio.ListObjectsV2Info{ IsTruncated: nextPageToken != "", ContinuationToken: continuationToken, NextContinuationToken: nextPageToken, Prefixes: prefixes, Objects: objects, }, nil } // GetObjectNInfo - returns object info and locked object ReadCloser func (l *gcsGateway) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, opts minio.ObjectOptions) (gr *minio.GetObjectReader, err error) { var objInfo minio.ObjectInfo objInfo, err = l.GetObjectInfo(ctx, bucket, object, opts) if err != nil { return nil, err } var startOffset, length int64 startOffset, length, err = rs.GetOffsetLength(objInfo.Size) if err != nil { return nil, err } pr, pw := io.Pipe() go func() { err := l.getObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag, opts) pw.CloseWithError(err) }() // Setup cleanup function to cause the above go-routine to // exit in case of partial read pipeCloser := func() { 
pr.Close() } return minio.NewGetObjectReaderFromReader(pr, objInfo, opts, pipeCloser) } // GetObject - reads an object from GCS. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. // // startOffset indicates the starting read location of the object. // length indicates the total length of the object. func (l *gcsGateway) getObject(ctx context.Context, bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error { // if we want to mimic S3 behavior exactly, we need to verify if bucket exists first, // otherwise gcs will just return object not exist in case of non-existing bucket if _, err := l.client.Bucket(bucket).Attrs(ctx); err != nil { logger.LogIf(ctx, err, logger.Application) return gcsToObjectError(err, bucket) } // GCS storage decompresses a gzipped object by default and returns the data. // Refer to https://cloud.google.com/storage/docs/transcoding#decompressive_transcoding // Need to set `Accept-Encoding` header to `gzip` when issuing a GetObject call, to be able // to download the object in compressed state. // Calling ReadCompressed with true accomplishes that. object := l.client.Bucket(bucket).Object(key).ReadCompressed(true) r, err := object.NewRangeReader(ctx, startOffset, length) if err != nil { logger.LogIf(ctx, err, logger.Application) return gcsToObjectError(err, bucket, key) } defer r.Close() if _, err := io.Copy(writer, r); err != nil { logger.LogIf(ctx, err) return gcsToObjectError(err, bucket, key) } return nil } // fromGCSAttrsToObjectInfo converts GCS BucketAttrs to gateway ObjectInfo func fromGCSAttrsToObjectInfo(attrs *storage.ObjectAttrs) minio.ObjectInfo { // All google cloud storage objects have a CRC32c hash, whereas composite objects may not have a MD5 hash // Refer https://cloud.google.com/storage/docs/hashes-etags. 
Use CRC32C for ETag metadata := make(map[string]string) var ( expiry time.Time e error ) for k, v := range attrs.Metadata { k = http.CanonicalHeaderKey(k) // Translate the GCS custom metadata prefix if strings.HasPrefix(k, "X-Goog-Meta-") { k = strings.Replace(k, "X-Goog-Meta-", "X-Amz-Meta-", 1) } if k == "Expires" { if expiry, e = time.Parse(http.TimeFormat, v); e == nil { expiry = expiry.UTC() } continue } metadata[k] = v } if attrs.ContentType != "" { metadata["Content-Type"] = attrs.ContentType } if attrs.ContentEncoding != "" { metadata["Content-Encoding"] = attrs.ContentEncoding } if attrs.CacheControl != "" { metadata["Cache-Control"] = attrs.CacheControl } if attrs.ContentDisposition != "" { metadata["Content-Disposition"] = attrs.ContentDisposition } if attrs.ContentLanguage != "" { metadata["Content-Language"] = attrs.ContentLanguage } etag := hex.EncodeToString(attrs.MD5) if etag == "" { etag = minio.ToS3ETag(fmt.Sprintf("%d", attrs.CRC32C)) } return minio.ObjectInfo{ Name: attrs.Name, Bucket: attrs.Bucket, ModTime: attrs.Updated, Size: attrs.Size, ETag: etag, UserDefined: metadata, ContentType: attrs.ContentType, ContentEncoding: attrs.ContentEncoding, Expires: expiry, } } // applyMetadataToGCSAttrs applies metadata to a GCS ObjectAttrs instance func applyMetadataToGCSAttrs(metadata map[string]string, attrs *storage.ObjectAttrs) { attrs.Metadata = make(map[string]string) for k, v := range metadata { k = http.CanonicalHeaderKey(k) switch { case strings.HasPrefix(k, "X-Amz-Meta-"): // Translate the S3 user-defined metadata prefix k = strings.Replace(k, "X-Amz-Meta-", "x-goog-meta-", 1) attrs.Metadata[k] = v case k == "Content-Type": attrs.ContentType = v case k == "Content-Encoding": attrs.ContentEncoding = v case k == "Cache-Control": attrs.CacheControl = v case k == "Content-Disposition": attrs.ContentDisposition = v case k == "Content-Language": attrs.ContentLanguage = v } } } // GetObjectInfo - reads object info and replies back ObjectInfo func (l 
*gcsGateway) GetObjectInfo(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (minio.ObjectInfo, error) { // if we want to mimic S3 behavior exactly, we need to verify if bucket exists first, // otherwise gcs will just return object not exist in case of non-existing bucket if _, err := l.client.Bucket(bucket).Attrs(ctx); err != nil { logger.LogIf(ctx, err, logger.Application) return minio.ObjectInfo{}, gcsToObjectError(err, bucket) } attrs, err := l.client.Bucket(bucket).Object(object).Attrs(ctx) if err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket, object) } return fromGCSAttrsToObjectInfo(attrs), nil } // PutObject - Create a new object with the incoming data, func (l *gcsGateway) PutObject(ctx context.Context, bucket string, key string, r *minio.PutObjReader, opts minio.ObjectOptions) (minio.ObjectInfo, error) { data := r.Reader nctx, cancel := context.WithCancel(ctx) defer cancel() // if we want to mimic S3 behavior exactly, we need to verify if bucket exists first, // otherwise gcs will just return object not exist in case of non-existing bucket if _, err := l.client.Bucket(bucket).Attrs(nctx); err != nil { logger.LogIf(ctx, err, logger.Application) return minio.ObjectInfo{}, gcsToObjectError(err, bucket) } object := l.client.Bucket(bucket).Object(key) w := object.NewWriter(nctx) // Disable "chunked" uploading in GCS client if the size of the data to be uploaded is below // the current chunk-size of the writer. This avoids an unnecessary memory allocation. if data.Size() < int64(w.ChunkSize) { w.ChunkSize = 0 } applyMetadataToGCSAttrs(opts.UserDefined, &w.ObjectAttrs) if _, err := io.Copy(w, data); err != nil { // Close the object writer upon error. logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key) } // Close the object writer upon success. 
if err := w.Close(); err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key) } return fromGCSAttrsToObjectInfo(w.Attrs()), nil } // CopyObject - Copies a blob from source container to destination container. func (l *gcsGateway) CopyObject(ctx context.Context, srcBucket string, srcObject string, destBucket string, destObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (minio.ObjectInfo, error) { if srcOpts.CheckPrecondFn != nil && srcOpts.CheckPrecondFn(srcInfo) { return minio.ObjectInfo{}, minio.PreConditionFailed{} } src := l.client.Bucket(srcBucket).Object(srcObject) dst := l.client.Bucket(destBucket).Object(destObject) copier := dst.CopierFrom(src) applyMetadataToGCSAttrs(srcInfo.UserDefined, &copier.ObjectAttrs) attrs, err := copier.Run(ctx) if err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, destBucket, destObject) } return fromGCSAttrsToObjectInfo(attrs), nil } // DeleteObject - Deletes a blob in bucket func (l *gcsGateway) DeleteObject(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (minio.ObjectInfo, error) { err := l.client.Bucket(bucket).Object(object).Delete(ctx) if err != nil { logger.LogIf(ctx, err) return minio.ObjectInfo{}, gcsToObjectError(err, bucket, object) } return minio.ObjectInfo{ Bucket: bucket, Name: object, }, nil } func (l *gcsGateway) DeleteObjects(ctx context.Context, bucket string, objects []minio.ObjectToDelete, opts minio.ObjectOptions) ([]minio.DeletedObject, []error) { errs := make([]error, len(objects)) dobjects := make([]minio.DeletedObject, len(objects)) for idx, object := range objects { _, errs[idx] = l.DeleteObject(ctx, bucket, object.ObjectName, opts) if errs[idx] == nil { dobjects[idx] = minio.DeletedObject{ ObjectName: object.ObjectName, } } } return dobjects, errs } // NewMultipartUpload - upload object in multiple parts func (l *gcsGateway) NewMultipartUpload(ctx context.Context, bucket 
string, key string, o minio.ObjectOptions) (uploadID string, err error) {
	// generate new uploadid
	uploadID = minio.MustGetUUID()

	// generate name for part zero
	meta := gcsMultipartMetaName(uploadID)

	w := l.client.Bucket(bucket).Object(meta).NewWriter(ctx)
	defer w.Close()

	applyMetadataToGCSAttrs(o.UserDefined, &w.ObjectAttrs)

	if err = json.NewEncoder(w).Encode(gcsMultipartMetaV1{
		gcsMinioMultipartMetaCurrentVersion,
		bucket,
		key,
	}); err != nil {
		logger.LogIf(ctx, err)
		return "", gcsToObjectError(err, bucket, key)
	}
	return uploadID, nil
}

// ListMultipartUploads - lists the multipart uploads whose object name is
// matched _exactly_ by the prefix.
//
// Every gcs.json found under gcsMinioMultipartPathV1 is read and decoded; an
// upload is reported only when the object name recorded in it equals prefix.
// keyMarker, uploadIDMarker, delimiter and maxUploads are echoed back in the
// result but are not used to filter or paginate, and the result is never
// marked as truncated.
func (l *gcsGateway) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (minio.ListMultipartsInfo, error) {
	// List objects under <bucket>/gcsMinioMultipartPathV1
	it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{
		Prefix: gcsMinioMultipartPathV1,
	})

	var uploads []minio.MultipartInfo

	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}

		if err != nil {
			logger.LogIf(ctx, err)
			return minio.ListMultipartsInfo{
				KeyMarker:      keyMarker,
				UploadIDMarker: uploadIDMarker,
				MaxUploads:     maxUploads,
				Prefix:         prefix,
				Delimiter:      delimiter,
			}, gcsToObjectError(err)
		}

		// Skip entries other than gcs.json
		if !strings.HasSuffix(attrs.Name, gcsMinioMultipartMeta) {
			continue
		}

		// Extract multipart upload information from gcs.json
		obj := l.client.Bucket(bucket).Object(attrs.Name)
		objReader, rErr := obj.NewReader(ctx)
		if rErr != nil {
			logger.LogIf(ctx, rErr)
			return minio.ListMultipartsInfo{}, rErr
		}

		var mpMeta gcsMultipartMetaV1
		decErr := json.NewDecoder(objReader).Decode(&mpMeta)
		// Close right away instead of deferring: a defer inside this loop
		// would keep one reader open per listed upload until the function
		// returns.
		objReader.Close()
		if decErr != nil {
			logger.LogIf(ctx, decErr)
			return minio.ListMultipartsInfo{}, decErr
		}

		if prefix != mpMeta.Object {
			continue
		}

		// Extract uploadId
		// E.g minio.sys.tmp/multipart/v1/d063ad89-fdc4-4ea3-a99e-22dba98151f5/gcs.json
		components := strings.SplitN(attrs.Name, minio.SlashSeparator, 5)
		if len(components) != 5 {
			compErr := errors.New("Invalid multipart upload format")
			logger.LogIf(ctx, compErr)
			return minio.ListMultipartsInfo{}, compErr
		}
		uploads = append(uploads, minio.MultipartInfo{
			Object:    mpMeta.Object,
			UploadID:  components[3],
			Initiated: attrs.Created,
		})
	}

	return minio.ListMultipartsInfo{
		KeyMarker:          keyMarker,
		UploadIDMarker:     uploadIDMarker,
		MaxUploads:         maxUploads,
		Prefix:             prefix,
		Delimiter:          delimiter,
		Uploads:            uploads,
		NextKeyMarker:      "",
		NextUploadIDMarker: "",
		IsTruncated:        false,
	}, nil
}

// Checks if minio.sys.tmp/multipart/v1/<upload-id>/gcs.json exists, returns
// an object layer compatible error upon any error.
func (l *gcsGateway) checkUploadIDExists(ctx context.Context, bucket string, key string, uploadID string) error {
	_, err := l.client.Bucket(bucket).Object(gcsMultipartMetaName(uploadID)).Attrs(ctx)
	logger.LogIf(ctx, err)
	return gcsToObjectError(err, bucket, key, uploadID)
}

// PutObjectPart puts a part of object in bucket.
//
// The part is stored as its own GCS object named by
// gcsMultipartDataName(uploadID, partNumber, etag). The ETag is the MD5 hex
// string reported by the reader, or a generated one when that is empty.
func (l *gcsGateway) PutObjectPart(ctx context.Context, bucket string, key string, uploadID string, partNumber int, r *minio.PutObjReader, opts minio.ObjectOptions) (minio.PartInfo, error) {
	data := r.Reader
	if err := l.checkUploadIDExists(ctx, bucket, key, uploadID); err != nil {
		return minio.PartInfo{}, err
	}
	etag := data.MD5HexString()
	if etag == "" {
		// Generate random ETag.
		etag = minio.GenETag()
	}
	object := l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID, partNumber, etag))
	w := object.NewWriter(ctx)
	// Disable "chunked" uploading in GCS client. If enabled, it can cause a corner case
	// where it tries to upload 0 bytes in the last chunk and get error from server.
	w.ChunkSize = 0
	if _, err := io.Copy(w, data); err != nil {
		// Make sure to close object writer upon error.
		w.Close()
		logger.LogIf(ctx, err)
		return minio.PartInfo{}, gcsToObjectError(err, bucket, key)
	}
	// Make sure to close the object writer upon success.
	if err := w.Close(); err != nil {
		logger.LogIf(ctx, err)
		return minio.PartInfo{}, gcsToObjectError(err, bucket, key)
	}
	return minio.PartInfo{
		PartNumber:   partNumber,
		ETag:         etag,
		LastModified: minio.UTCNow(),
		Size:         data.Size(),
	}, nil
}

// gcsGetPartInfo returns PartInfo of a given object part.
//
// The object name must split into exactly five slash-separated components,
// the last of which must have the form "<part-number>.<etag>"; otherwise an
// error is logged and returned.
func gcsGetPartInfo(ctx context.Context, attrs *storage.ObjectAttrs) (minio.PartInfo, error) {
	components := strings.SplitN(attrs.Name, minio.SlashSeparator, 5)
	if len(components) != 5 {
		err := errors.New("Invalid multipart upload format")
		logger.LogIf(ctx, err)
		return minio.PartInfo{}, err
	}

	partComps := strings.SplitN(components[4], ".", 2)
	if len(partComps) != 2 {
		err := errors.New("Invalid multipart part format")
		logger.LogIf(ctx, err)
		return minio.PartInfo{}, err
	}

	partNum, pErr := strconv.Atoi(partComps[0])
	if pErr != nil {
		// The conversion error is logged; callers get a generic error.
		logger.LogIf(ctx, pErr)
		return minio.PartInfo{}, errors.New("Invalid part number")
	}

	return minio.PartInfo{
		PartNumber:   partNum,
		LastModified: attrs.Updated,
		Size:         attrs.Size,
		ETag:         partComps[1],
	}, nil
}

// GetMultipartInfo returns multipart info of the uploadId of the object.
// It only echoes back bucket, object and uploadID; the backend is not
// consulted.
func (l *gcsGateway) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
	result.Bucket = bucket
	result.Object = object
	result.UploadID = uploadID
	return result, nil
}

// ListObjectParts returns all object parts for specified object in specified bucket.
//
// At most maxParts parts with a part number greater than partNumberMarker
// are returned. The listing is reported as truncated when collection stopped
// because maxParts was reached rather than because the iterator was
// exhausted; NextPartNumberMarker is then the number of the last returned
// part. When maxParts is zero or negative an empty, non-truncated listing is
// returned.
func (l *gcsGateway) ListObjectParts(ctx context.Context, bucket string, key string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (minio.ListPartsInfo, error) {
	it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{
		Prefix: path.Join(gcsMinioMultipartPathV1, uploadID),
	})

	var (
		count     int
		partInfos []minio.PartInfo
	)

	// With nothing requested the loop below never runs, so there is nothing
	// to continue from and the listing must not claim to be truncated.
	isTruncated := maxParts > 0
	for count < maxParts {
		attrs, err := it.Next()
		if err == iterator.Done {
			isTruncated = false
			break
		}

		if err != nil {
			logger.LogIf(ctx, err)
			return minio.ListPartsInfo{}, gcsToObjectError(err)
		}

		// The upload's meta file lives under the same prefix; it is not a part.
		if strings.HasSuffix(attrs.Name, gcsMinioMultipartMeta) {
			continue
		}

		partInfo, pErr := gcsGetPartInfo(ctx, attrs)
		if pErr != nil {
			logger.LogIf(ctx, pErr)
			return minio.ListPartsInfo{}, pErr
		}

		if partInfo.PartNumber <= partNumberMarker {
			continue
		}

		partInfos = append(partInfos, partInfo)
		count++
	}

	nextPartNumberMarker := 0
	if isTruncated && len(partInfos) > 0 {
		// Index the last collected element rather than maxParts-1 so an
		// empty slice can never be indexed.
		nextPartNumberMarker = partInfos[len(partInfos)-1].PartNumber
	}

	return minio.ListPartsInfo{
		Bucket:               bucket,
		Object:               key,
		UploadID:             uploadID,
		PartNumberMarker:     partNumberMarker,
		NextPartNumberMarker: nextPartNumberMarker,
		MaxParts:             maxParts,
		Parts:                partInfos,
		IsTruncated:          isTruncated,
	}, nil
}

// Called by AbortMultipartUpload and CompleteMultipartUpload for cleaning up.
// Deletes every object under <gcsMinioMultipartPathV1>/<uploadID>/; listing
// errors are returned, individual delete errors are ignored.
func (l *gcsGateway) cleanupMultipartUpload(ctx context.Context, bucket, key, uploadID string) error {
	prefix := fmt.Sprintf("%s/%s/", gcsMinioMultipartPathV1, uploadID)

	// iterate through all parts and delete them
	it := l.client.Bucket(bucket).Objects(ctx, &storage.Query{Prefix: prefix, Versions: false})

	for {
		attrs, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			logger.LogIf(ctx, err)
			return gcsToObjectError(err, bucket, key)
		}

		object := l.client.Bucket(bucket).Object(attrs.Name)
		// Ignore the error as parallel AbortMultipartUpload might have deleted it.
		_ = object.Delete(ctx)
	}

	return nil
}

// AbortMultipartUpload aborts an ongoing multipart upload: after verifying
// that the upload ID exists, all objects stored for it are removed.
func (l *gcsGateway) AbortMultipartUpload(ctx context.Context, bucket string, key string, uploadID string, opts minio.ObjectOptions) error {
	if err := l.checkUploadIDExists(ctx, bucket, key, uploadID); err != nil {
		return err
	}
	return l.cleanupMultipartUpload(ctx, bucket, key, uploadID)
}

// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
// Note that there is a limit (currently 32) to the number of components that can
// be composed in a single operation. There is a per-project rate limit (currently 200)
// to the number of source objects you can compose per second.
//
// The upload's gcs.json is validated first (version, bucket and key must
// match). Every part except the last must be at least 5MiB. When more than
// gcsMaxComponents parts are given they are first composed into intermediate
// objects of up to gcsMaxComponents parts each, and the final object is then
// composed from those intermediates. On success the upload's parts and meta
// file are cleaned up.
func (l *gcsGateway) CompleteMultipartUpload(ctx context.Context, bucket string, key string, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (minio.ObjectInfo, error) {
	meta := gcsMultipartMetaName(uploadID)
	object := l.client.Bucket(bucket).Object(meta)

	partZeroAttrs, err := object.Attrs(ctx)
	if err != nil {
		logger.LogIf(ctx, err)
		return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key, uploadID)
	}

	r, err := object.NewReader(ctx)
	if err != nil {
		logger.LogIf(ctx, err)
		return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key)
	}
	defer r.Close()

	// Check version compatibility of the meta file before compose()
	multipartMeta := gcsMultipartMetaV1{}
	if err = json.NewDecoder(r).Decode(&multipartMeta); err != nil {
		logger.LogIf(ctx, err)
		return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key)
	}

	if multipartMeta.Version != gcsMinioMultipartMetaCurrentVersion {
		logger.LogIf(ctx, errGCSFormat)
		return minio.ObjectInfo{}, gcsToObjectError(errGCSFormat, bucket, key)
	}

	// Validate if the gcs.json stores valid entries for the bucket and key.
	if multipartMeta.Bucket != bucket || multipartMeta.Object != key {
		return minio.ObjectInfo{}, gcsToObjectError(minio.InvalidUploadID{
			UploadID: uploadID,
		}, bucket, key)
	}

	parts := make([]*storage.ObjectHandle, 0, len(uploadedParts))
	partSizes := make([]int64, len(uploadedParts))
	for i, uploadedPart := range uploadedParts {
		// Build the handle once; it is used both for the size lookup and
		// as a compose source.
		part := l.client.Bucket(bucket).Object(gcsMultipartDataName(uploadID,
			uploadedPart.PartNumber, uploadedPart.ETag))
		partAttr, pErr := part.Attrs(ctx)
		if pErr != nil {
			logger.LogIf(ctx, pErr)
			return minio.ObjectInfo{}, gcsToObjectError(pErr, bucket, key, uploadID)
		}
		parts = append(parts, part)
		partSizes[i] = partAttr.Size
	}

	// Error out if parts except last part sizing < 5MiB.
	// An index loop is used so that an empty part list simply skips the
	// check instead of slicing with a negative bound.
	for i := 0; i < len(partSizes)-1; i++ {
		if partSizes[i] < 5*humanize.MiByte {
			tooSmall := minio.PartTooSmall{
				PartNumber: uploadedParts[i].PartNumber,
				PartSize:   partSizes[i],
				PartETag:   uploadedParts[i].ETag,
			}
			logger.LogIf(ctx, tooSmall)
			return minio.ObjectInfo{}, tooSmall
		}
	}

	// Returns name of the composed object.
	gcsMultipartComposeName := func(uploadID string, composeNumber int) string {
		return fmt.Sprintf("%s/tmp/%s/composed-object-%05d", minio.GatewayMinioSysTmp, uploadID, composeNumber)
	}

	composeCount := int(math.Ceil(float64(len(parts)) / float64(gcsMaxComponents)))
	if composeCount > 1 {
		// Create composes of every 32 parts.
		composeParts := make([]*storage.ObjectHandle, composeCount)
		for i := 0; i < composeCount; i++ {
			// Create 'composed-object-N' using next 32 parts.
			composeParts[i] = l.client.Bucket(bucket).Object(gcsMultipartComposeName(uploadID, i))
			start := i * gcsMaxComponents
			end := start + gcsMaxComponents
			if end > len(parts) {
				end = len(parts)
			}

			composer := composeParts[i].ComposerFrom(parts[start:end]...)
			composer.ContentType = partZeroAttrs.ContentType
			composer.Metadata = partZeroAttrs.Metadata

			if _, err = composer.Run(ctx); err != nil {
				logger.LogIf(ctx, err)
				return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key)
			}
		}

		// As composes are successfully created, final object needs to be created using composes.
		parts = composeParts
	}

	// The final object inherits its attributes from the upload's meta file.
	composer := l.client.Bucket(bucket).Object(key).ComposerFrom(parts...)
	composer.ContentType = partZeroAttrs.ContentType
	composer.ContentEncoding = partZeroAttrs.ContentEncoding
	composer.CacheControl = partZeroAttrs.CacheControl
	composer.ContentDisposition = partZeroAttrs.ContentDisposition
	composer.ContentLanguage = partZeroAttrs.ContentLanguage
	composer.Metadata = partZeroAttrs.Metadata
	attrs, err := composer.Run(ctx)
	if err != nil {
		logger.LogIf(ctx, err)
		return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key)
	}
	if err = l.cleanupMultipartUpload(ctx, bucket, key, uploadID); err != nil {
		return minio.ObjectInfo{}, gcsToObjectError(err, bucket, key)
	}
	return fromGCSAttrsToObjectInfo(attrs), nil
}

// SetBucketPolicy - Set policy on bucket.
//
// Only a single policy covering all objects of the bucket ("<bucket>/*") is
// supported; anything else yields minio.NotImplemented. A "none" policy
// removes the storage.AllUsers ACL entry, read-only and write-only policies
// grant storage.AllUsers the reader or writer role respectively.
func (l *gcsGateway) SetBucketPolicy(ctx context.Context, bucket string, bucketPolicy *policy.Policy) error {
	policyInfo, err := minio.PolicyToBucketAccessPolicy(bucketPolicy)
	if err != nil {
		logger.LogIf(ctx, err)
		return gcsToObjectError(err, bucket)
	}

	var policies []minio.BucketAccessPolicy
	for prefix, policy := range miniogopolicy.GetPolicies(policyInfo.Statements, bucket, "") {
		policies = append(policies, minio.BucketAccessPolicy{
			Prefix: prefix,
			Policy: policy,
		})
	}

	prefix := bucket + "/*" // For all objects inside the bucket.

	// Exactly one policy, applying to the whole bucket, is supported.
	if len(policies) != 1 || policies[0].Prefix != prefix {
		logger.LogIf(ctx, minio.NotImplemented{})
		return minio.NotImplemented{}
	}

	acl := l.client.Bucket(bucket).ACL()
	if policies[0].Policy == miniogopolicy.BucketPolicyNone {
		if err := acl.Delete(ctx, storage.AllUsers); err != nil {
			logger.LogIf(ctx, err)
			return gcsToObjectError(err, bucket)
		}
		return nil
	}

	var role storage.ACLRole
	switch policies[0].Policy {
	case miniogopolicy.BucketPolicyReadOnly:
		role = storage.RoleReader
	case miniogopolicy.BucketPolicyWriteOnly:
		role = storage.RoleWriter
	default:
		logger.LogIf(ctx, minio.NotImplemented{})
		return minio.NotImplemented{}
	}

	if err := acl.Set(ctx, storage.AllUsers, role); err != nil {
		logger.LogIf(ctx, err)
		return gcsToObjectError(err, bucket)
	}

	return nil
}

// GetBucketPolicy - Get policy on bucket.
//
// The policy is derived from the bucket ACL entries for storage.AllUsers: a
// reader role maps to the read actions, a writer role to the write actions.
// minio.BucketPolicyNotFound is returned when neither role is present.
func (l *gcsGateway) GetBucketPolicy(ctx context.Context, bucket string) (*policy.Policy, error) {
	rules, err := l.client.Bucket(bucket).ACL().List(ctx)
	if err != nil {
		return nil, gcsToObjectError(err, bucket)
	}

	var readOnly, writeOnly bool
	for _, r := range rules {
		// Only non-owner entries for all users contribute to the policy.
		if r.Entity != storage.AllUsers || r.Role == storage.RoleOwner {
			continue
		}

		switch r.Role {
		case storage.RoleReader:
			readOnly = true
		case storage.RoleWriter:
			writeOnly = true
		}
	}

	actionSet := policy.NewActionSet()
	if readOnly {
		actionSet.Add(policy.GetBucketLocationAction)
		actionSet.Add(policy.ListBucketAction)
		actionSet.Add(policy.GetObjectAction)
	}
	if writeOnly {
		actionSet.Add(policy.GetBucketLocationAction)
		actionSet.Add(policy.ListBucketMultipartUploadsAction)
		actionSet.Add(policy.AbortMultipartUploadAction)
		actionSet.Add(policy.DeleteObjectAction)
		actionSet.Add(policy.ListMultipartUploadPartsAction)
		actionSet.Add(policy.PutObjectAction)
	}

	// Return NoSuchBucketPolicy error, when policy is not set
	if len(actionSet) == 0 {
		return nil, gcsToObjectError(minio.BucketPolicyNotFound{}, bucket)
	}

	return &policy.Policy{
		Version: policy.DefaultVersion,
		Statements: []policy.Statement{
			policy.NewStatement(
				policy.Allow,
				policy.NewPrincipal("*"),
				actionSet,
				policy.NewResourceSet(
					policy.NewResource(bucket, ""),
					policy.NewResource(bucket, "*"),
				),
				condition.NewFunctions(),
			),
		},
	}, nil
}

// DeleteBucketPolicy - Delete all policies on bucket
func (l *gcsGateway) DeleteBucketPolicy(ctx context.Context, bucket string) error {
	// This only removes the storage.AllUsers policies
	if err := l.client.Bucket(bucket).ACL().Delete(ctx, storage.AllUsers); err != nil {
		return gcsToObjectError(err, bucket)
	}

	return nil
}

// IsCompressionSupported returns whether compression is applicable for this layer.
func (l *gcsGateway) IsCompressionSupported() bool {
	return false
}