/* * MinIO Cloud Storage, (C) 2017-2020 MinIO, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package b2 import ( "context" "crypto/sha1" "fmt" "hash" "io" "net/http" "strings" "sync" "time" b2 "github.com/kurin/blazer/base" "github.com/minio/cli" miniogopolicy "github.com/minio/minio-go/v6/pkg/policy" "github.com/minio/minio/cmd/logger" "github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/bucket/policy" "github.com/minio/minio/pkg/bucket/policy/condition" h2 "github.com/minio/minio/pkg/hash" minio "github.com/minio/minio/cmd" ) // Supported bucket types by B2 backend. const ( bucketTypePrivate = "allPrivate" bucketTypeReadOnly = "allPublic" b2Backend = "b2" ) func init() { const b2GatewayTemplate = `NAME: {{.HelpName}} - {{.Usage}} USAGE: {{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}} {{if .VisibleFlags}} FLAGS: {{range .VisibleFlags}}{{.}} {{end}}{{end}} EXAMPLES: 1. Start minio gateway server for B2 backend {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}accountID {{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}applicationKey {{.Prompt}} {{.HelpName}} 2. Start minio gateway server for B2 backend with edge caching enabled {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ACCESS_KEY{{.AssignmentOperator}}accountID {{.Prompt}} {{.EnvVarSetCommand}} MINIO_SECRET_KEY{{.AssignmentOperator}}applicationKey {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_DRIVES{{.AssignmentOperator}}"/mnt/drive1,/mnt/drive2,/mnt/drive3,/mnt/drive4" {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_EXCLUDE{{.AssignmentOperator}}"bucket1/*,*.png" {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_QUOTA{{.AssignmentOperator}}90 {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_AFTER{{.AssignmentOperator}}3 {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_LOW{{.AssignmentOperator}}75 {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_HIGH{{.AssignmentOperator}}85 {{.Prompt}} {{.HelpName}} ` minio.RegisterGatewayCommand(cli.Command{ Name: b2Backend, Usage: "Backblaze B2", Action: b2GatewayMain, CustomHelpTemplate: b2GatewayTemplate, HideHelpCommand: true, }) } // Handler for 'minio gateway b2' command line. func b2GatewayMain(ctx *cli.Context) { minio.StartGateway(ctx, &B2{ strictS3Compat: ctx.IsSet("compat") || ctx.GlobalIsSet("compat"), }) } // B2 implements MinIO Gateway type B2 struct { strictS3Compat bool } // Name implements Gateway interface. func (g *B2) Name() string { return b2Backend } // NewGatewayLayer returns b2 gateway layer, implements ObjectLayer interface to // talk to B2 remote backend. func (g *B2) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) { ctx := minio.GlobalContext client, err := b2.AuthorizeAccount(ctx, creds.AccessKey, creds.SecretKey, b2.Transport(minio.NewGatewayHTTPTransport())) if err != nil { return nil, err } return &b2Objects{ creds: creds, b2Client: client, httpClient: &http.Client{ Transport: minio.NewGatewayHTTPTransport(), }, ctx: ctx, strictS3Compat: g.strictS3Compat, }, nil } // Production - Ready for production use. func (g *B2) Production() bool { return true } // b2Object implements gateway for MinIO and BackBlaze B2 compatible object storage servers. type b2Objects struct { minio.GatewayUnsupported mu sync.Mutex creds auth.Credentials b2Client *b2.B2 httpClient *http.Client ctx context.Context buckets []*b2.Bucket strictS3Compat bool } // Convert B2 errors to minio object layer errors. func b2ToObjectError(err error, params ...string) error { if err == nil { return nil } code, msgCode, msg := b2.MsgCode(err) if code == 0 { // We don't interpret non B2 errors. B2 errors have statusCode // to help us convert them to S3 object errors. return err } objErr := b2MsgCodeToObjectError(code, msgCode, msg, params...) if objErr == nil { return err } return objErr } func b2MsgCodeToObjectError(code int, msgCode string, msg string, params ...string) error { // Following code is a non-exhaustive check to convert // B2 errors into S3 compatible errors. // // For a more complete information - https://www.backblaze.com/b2/docs/ var err error bucket := "" object := "" uploadID := "" if len(params) >= 1 { bucket = params[0] } if len(params) == 2 { object = params[1] } if len(params) == 3 { uploadID = params[2] } switch msgCode { case "duplicate_bucket_name": err = minio.BucketAlreadyOwnedByYou{Bucket: bucket} case "bad_request": if object != "" { err = minio.ObjectNameInvalid{ Bucket: bucket, Object: object, } } else if bucket != "" { err = minio.BucketNotFound{Bucket: bucket} } case "bad_json": if object != "" { err = minio.ObjectNameInvalid{ Bucket: bucket, Object: object, } } else if bucket != "" { err = minio.BucketNameInvalid{Bucket: bucket} } case "bad_bucket_id": err = minio.BucketNotFound{Bucket: bucket} case "file_not_present", "not_found": err = minio.ObjectNotFound{ Bucket: bucket, Object: object, } case "cannot_delete_non_empty_bucket": err = minio.BucketNotEmpty{Bucket: bucket} } // Special interpretation like this is required for Multipart sessions. if strings.Contains(msg, "No active upload for") && uploadID != "" { err = minio.InvalidUploadID{UploadID: uploadID} } return err } // Shutdown saves any gateway metadata to disk // if necessary and reload upon next restart. func (l *b2Objects) Shutdown(ctx context.Context) error { return nil } // StorageInfo is not relevant to B2 backend. func (l *b2Objects) StorageInfo(ctx context.Context, _ bool) (si minio.StorageInfo) { si.Backend.Type = minio.BackendGateway si.Backend.GatewayOnline = minio.IsBackendOnline(ctx, l.httpClient, "https://api.backblazeb2.com/b2api/v1") return si } // MakeBucket creates a new container on B2 backend. func (l *b2Objects) MakeBucketWithLocation(ctx context.Context, bucket, location string) error { // location is ignored for B2 backend. // All buckets are set to private by default. _, err := l.b2Client.CreateBucket(l.ctx, bucket, bucketTypePrivate, nil, nil) logger.LogIf(ctx, err) return b2ToObjectError(err, bucket) } func (l *b2Objects) reAuthorizeAccount(ctx context.Context) error { client, err := b2.AuthorizeAccount(l.ctx, l.creds.AccessKey, l.creds.SecretKey, b2.Transport(minio.NewGatewayHTTPTransport())) if err != nil { return err } l.mu.Lock() l.b2Client.Update(client) l.mu.Unlock() return nil } // getETag returns an S3-compatible md5sum-ETag if requested and possible. func (l *b2Objects) getETag(f *b2.File, fi *b2.FileInfo) string { if l.strictS3Compat && fi.MD5 != "" { return fi.MD5 } return minio.ToS3ETag(f.ID) } // listBuckets is a wrapper similar to ListBuckets, which re-authorizes // the account and updates the B2 client safely. Once successfully // authorized performs the call again and returns list of buckets. // For any errors which are not actionable we return an error. func (l *b2Objects) listBuckets(ctx context.Context, err error) ([]*b2.Bucket, error) { if err != nil { if b2.Action(err) != b2.ReAuthenticate { return nil, err } if rerr := l.reAuthorizeAccount(ctx); rerr != nil { return nil, rerr } } if len(l.buckets) == 0 { bktList, lerr := l.b2Client.ListBuckets(l.ctx) if lerr != nil { return l.listBuckets(ctx, lerr) } l.buckets = bktList } return l.buckets, nil } // Bucket - is a helper which provides a *Bucket instance // for performing an API operation. B2 API doesn't // provide a direct way to access the bucket so we need // to employ following technique. func (l *b2Objects) Bucket(ctx context.Context, bucket string) (*b2.Bucket, error) { bktList, err := l.listBuckets(ctx, nil) if err != nil { logger.LogIf(ctx, err) return nil, b2ToObjectError(err, bucket) } for _, bkt := range bktList { if bkt.Name == bucket { return bkt, nil } } return nil, minio.BucketNotFound{Bucket: bucket} } // GetBucketInfo gets bucket metadata.. func (l *b2Objects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, err error) { if _, err = l.Bucket(ctx, bucket); err != nil { return bi, err } return minio.BucketInfo{ Name: bucket, Created: time.Unix(0, 0), }, nil } // ListBuckets lists all B2 buckets func (l *b2Objects) ListBuckets(ctx context.Context) ([]minio.BucketInfo, error) { bktList, err := l.listBuckets(ctx, nil) if err != nil { return nil, err } var bktInfo []minio.BucketInfo for _, bkt := range bktList { bktInfo = append(bktInfo, minio.BucketInfo{ Name: bkt.Name, Created: time.Unix(0, 0), }) } return bktInfo, nil } // DeleteBucket deletes a bucket on B2 func (l *b2Objects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error { bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } err = bkt.DeleteBucket(l.ctx) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } err = bkt.DeleteBucket(l.ctx) logger.LogIf(ctx, err) return b2ToObjectError(err, bucket) } logger.LogIf(ctx, err) return b2ToObjectError(err, bucket) } // ListObjects lists all objects in B2 bucket filtered by prefix, returns upto at max 1000 entries at a time. func (l *b2Objects) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) { bkt, err := l.Bucket(ctx, bucket) if err != nil { return loi, err } files, next, lerr := bkt.ListFileNames(l.ctx, maxKeys, marker, prefix, delimiter) if lerr != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return loi, err } files, next, lerr = bkt.ListFileNames(l.ctx, maxKeys, marker, prefix, delimiter) if lerr != nil { logger.LogIf(ctx, lerr) return loi, b2ToObjectError(lerr, bucket) } } loi.IsTruncated = next != "" loi.NextMarker = next for _, file := range files { switch file.Status { case "folder": loi.Prefixes = append(loi.Prefixes, file.Name) case "upload": loi.Objects = append(loi.Objects, minio.ObjectInfo{ Bucket: bucket, Name: file.Name, ModTime: file.Timestamp, Size: file.Size, ETag: l.getETag(file, file.Info), ContentType: file.Info.ContentType, UserDefined: file.Info.Info, }) } } return loi, nil } // ListObjectsV2 lists all objects in B2 bucket filtered by prefix, returns upto max 1000 entries at a time. func (l *b2Objects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, err error) { // fetchOwner is not supported and unused. marker := continuationToken if marker == "" { // B2's continuation token is an object name to "start at" rather than "start after" // startAfter plus the lowest character B2 supports is used so that the startAfter // object isn't included in the results marker = startAfter + " " } bkt, err := l.Bucket(ctx, bucket) if err != nil { return loi, err } files, next, lerr := bkt.ListFileNames(l.ctx, maxKeys, marker, prefix, delimiter) if lerr != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return loi, err } files, next, lerr = bkt.ListFileNames(l.ctx, maxKeys, marker, prefix, delimiter) if lerr != nil { logger.LogIf(ctx, lerr) return loi, b2ToObjectError(lerr, bucket) } } loi.IsTruncated = next != "" loi.ContinuationToken = continuationToken loi.NextContinuationToken = next for _, file := range files { switch file.Status { case "folder": loi.Prefixes = append(loi.Prefixes, file.Name) case "upload": loi.Objects = append(loi.Objects, minio.ObjectInfo{ Bucket: bucket, Name: file.Name, ModTime: file.Timestamp, Size: file.Size, ETag: l.getETag(file, file.Info), ContentType: file.Info.ContentType, UserDefined: file.Info.Info, }) } } return loi, nil } // GetObjectNInfo - returns object info and locked object ReadCloser func (l *b2Objects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, opts minio.ObjectOptions) (gr *minio.GetObjectReader, err error) { var objInfo minio.ObjectInfo objInfo, err = l.GetObjectInfo(ctx, bucket, object, opts) if err != nil { return nil, err } var startOffset, length int64 startOffset, length, err = rs.GetOffsetLength(objInfo.Size) if err != nil { return nil, err } pr, pw := io.Pipe() go func() { err := l.GetObject(ctx, bucket, object, startOffset, length, pw, "", opts) pw.CloseWithError(err) }() // Setup cleanup function to cause the above go-routine to // exit in case of partial read pipeCloser := func() { pr.Close() } return minio.NewGetObjectReaderFromReader(pr, objInfo, opts.CheckCopyPrecondFn, pipeCloser) } // GetObject reads an object from B2. Supports additional // parameters like offset and length which are synonymous with // HTTP Range requests. // // startOffset indicates the starting read location of the object. // length indicates the total length of the object. func (l *b2Objects) GetObject(ctx context.Context, bucket string, object string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error { bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } reader, err := bkt.DownloadFileByName(l.ctx, object, startOffset, length, false) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } reader, err = bkt.DownloadFileByName(l.ctx, object, startOffset, length, false) if err != nil { logger.LogIf(ctx, err) return b2ToObjectError(err, bucket, object) } } defer reader.Close() _, err = io.Copy(writer, reader) logger.LogIf(ctx, err) return b2ToObjectError(err, bucket, object) } // GetObjectInfo reads object info and replies back ObjectInfo func (l *b2Objects) GetObjectInfo(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) { bkt, err := l.Bucket(ctx, bucket) if err != nil { return objInfo, err } f, _, err := bkt.ListFileNames(l.ctx, 1, object, "", "") if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return objInfo, err } f, _, err = bkt.ListFileNames(l.ctx, 1, object, "", "") if err != nil { logger.LogIf(ctx, err) return objInfo, b2ToObjectError(err, bucket, object) } } // B2's list will return the next item in the bucket if the object doesn't // exist so we need to perform a name check too if len(f) != 1 || (len(f) == 1 && f[0].Name != object) { return objInfo, minio.ObjectNotFound{ Bucket: bucket, Object: object, } } fi, err := bkt.File(f[0].ID, object).GetFileInfo(l.ctx) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return objInfo, err } fi, err = bkt.File(f[0].ID, object).GetFileInfo(l.ctx) if err != nil { logger.LogIf(ctx, err) return objInfo, b2ToObjectError(err, bucket, object) } } return minio.ObjectInfo{ Bucket: bucket, Name: object, ETag: l.getETag(f[0], fi), Size: fi.Size, ModTime: fi.Timestamp, ContentType: fi.ContentType, UserDefined: fi.Info, }, nil } // In B2 - You must always include the X-Bz-Content-Sha1 header with // your upload request. The value you provide can be: // (1) the 40-character hex checksum of the file, // (2) the string hex_digits_at_end, or // (3) the string do_not_verify. // For more reference - https://www.backblaze.com/b2/docs/uploading.html // // In our case we are going to use (2) option const sha1AtEOF = "hex_digits_at_end" // With the second option mentioned above, you append the 40-character hex sha1 // to the end of the request body, immediately after the contents of the file // being uploaded. Note that the content length is the size of the file plus 40 // of the original size of the reader. // // newB2Reader implements a B2 compatible reader by wrapping the hash.Reader into // a new io.Reader which will emit out the sha1 hex digits at io.EOF. // It also means that your overall content size is now original size + 40 bytes. // Additionally this reader also verifies Hash encapsulated inside hash.Reader // at io.EOF if the verification failed we return an error and do not send // the content to server. func newB2Reader(r *h2.Reader, size int64) *Reader { return &Reader{ r: r, size: size, sha1Hash: sha1.New(), } } // Reader - is a Reader wraps the hash.Reader which will emit out the sha1 // hex digits at io.EOF. It also means that your overall content size is // now original size + 40 bytes. Additionally this reader also verifies // Hash encapsulated inside hash.Reader at io.EOF if the verification // failed we return an error and do not send the content to server. type Reader struct { r *h2.Reader size int64 sha1Hash hash.Hash isEOF bool buf *strings.Reader } // Size - Returns the total size of Reader. func (nb *Reader) Size() int64 { return nb.size + 40 } func (nb *Reader) Read(p []byte) (int, error) { if nb.isEOF { return nb.buf.Read(p) } // Read into hash to update the on going checksum. n, err := io.TeeReader(nb.r, nb.sha1Hash).Read(p) if err == io.EOF { // Stream is not corrupted on this end // now fill in the last 40 bytes of sha1 hex // so that the server can verify the stream on // their end. err = nil nb.isEOF = true nb.buf = strings.NewReader(fmt.Sprintf("%x", nb.sha1Hash.Sum(nil))) } return n, err } // PutObject uploads the single upload to B2 backend by using *b2_upload_file* API, uploads upto 5GiB. func (l *b2Objects) PutObject(ctx context.Context, bucket string, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) { data := r.Reader bkt, err := l.Bucket(ctx, bucket) if err != nil { return objInfo, err } contentType := opts.UserDefined["content-type"] delete(opts.UserDefined, "content-type") var u *b2.URL u, err = bkt.GetUploadURL(l.ctx) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return objInfo, err } u, err = bkt.GetUploadURL(l.ctx) if err != nil { logger.LogIf(ctx, err) return objInfo, b2ToObjectError(err, bucket, object) } } hr := newB2Reader(data, data.Size()) var f *b2.File f, err = u.UploadFile(l.ctx, hr, int(hr.Size()), object, contentType, sha1AtEOF, opts.UserDefined) if err != nil { logger.LogIf(ctx, err) return objInfo, b2ToObjectError(err, bucket, object) } var fi *b2.FileInfo fi, err = f.GetFileInfo(l.ctx) if err != nil { logger.LogIf(ctx, err) return objInfo, b2ToObjectError(err, bucket, object) } return minio.ObjectInfo{ Bucket: bucket, Name: object, ETag: l.getETag(f, fi), Size: fi.Size, ModTime: fi.Timestamp, ContentType: fi.ContentType, UserDefined: fi.Info, }, nil } // DeleteObject deletes a blob in bucket func (l *b2Objects) DeleteObject(ctx context.Context, bucket string, object string) error { bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } // If we hide the file we'll conform to B2's versioning policy, it also // saves an additional call to check if the file exists first _, err = bkt.HideFile(l.ctx, object) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } _, err = bkt.HideFile(l.ctx, object) logger.LogIf(ctx, err) return b2ToObjectError(err, bucket, object) } logger.LogIf(ctx, err) return b2ToObjectError(err, bucket, object) } func (l *b2Objects) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) { errs := make([]error, len(objects)) for idx, object := range objects { errs[idx] = l.DeleteObject(ctx, bucket, object) } return errs, nil } // ListMultipartUploads lists all multipart uploads. func (l *b2Objects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) { // keyMarker, prefix, delimiter are all ignored, Backblaze B2 doesn't support any // of these parameters only equivalent parameter is uploadIDMarker. bkt, err := l.Bucket(ctx, bucket) if err != nil { return lmi, err } // The maximum number of files to return from this call. // The default value is 100, and the maximum allowed is 100. if maxUploads > 100 { maxUploads = 100 } largeFiles, nextMarker, err := bkt.ListUnfinishedLargeFiles(l.ctx, maxUploads, uploadIDMarker) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return lmi, err } largeFiles, nextMarker, err = bkt.ListUnfinishedLargeFiles(l.ctx, maxUploads, uploadIDMarker) if err != nil { logger.LogIf(ctx, err) return lmi, b2ToObjectError(err, bucket) } } lmi = minio.ListMultipartsInfo{ MaxUploads: maxUploads, } if nextMarker != "" { lmi.IsTruncated = true lmi.NextUploadIDMarker = nextMarker } for _, largeFile := range largeFiles { lmi.Uploads = append(lmi.Uploads, minio.MultipartInfo{ Object: largeFile.Name, UploadID: largeFile.ID, Initiated: largeFile.Timestamp, }) } return lmi, nil } // NewMultipartUpload upload object in multiple parts, uses B2's LargeFile upload API. // Large files can range in size from 5MB to 10TB. // Each large file must consist of at least 2 parts, and all of the parts except the // last one must be at least 5MB in size. The last part must contain at least one byte. // For more information - https://www.backblaze.com/b2/docs/large_files.html func (l *b2Objects) NewMultipartUpload(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (string, error) { var uploadID string bkt, err := l.Bucket(ctx, bucket) if err != nil { return uploadID, err } contentType := opts.UserDefined["content-type"] delete(opts.UserDefined, "content-type") lf, err := bkt.StartLargeFile(l.ctx, object, contentType, opts.UserDefined) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return uploadID, err } lf, err = bkt.StartLargeFile(l.ctx, object, contentType, opts.UserDefined) if err != nil { logger.LogIf(ctx, err) return uploadID, b2ToObjectError(err, bucket, object) } } return lf.ID, nil } // PutObjectPart puts a part of object in bucket, uses B2's LargeFile upload API. func (l *b2Objects) PutObjectPart(ctx context.Context, bucket string, object string, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (pi minio.PartInfo, err error) { data := r.Reader bkt, err := l.Bucket(ctx, bucket) if err != nil { return pi, err } fc, err := bkt.File(uploadID, object).CompileParts(0, nil).GetUploadPartURL(l.ctx) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return pi, err } fc, err = bkt.File(uploadID, object).CompileParts(0, nil).GetUploadPartURL(l.ctx) if err != nil { logger.LogIf(ctx, err) return pi, b2ToObjectError(err, bucket, object, uploadID) } } hr := newB2Reader(data, data.Size()) _, err = fc.UploadPart(l.ctx, hr, sha1AtEOF, int(hr.Size()), partID) if err != nil { logger.LogIf(ctx, err) return pi, b2ToObjectError(err, bucket, object, uploadID) } return minio.PartInfo{ PartNumber: partID, LastModified: minio.UTCNow(), ETag: minio.ToS3ETag(fmt.Sprintf("%x", hr.sha1Hash.Sum(nil))), Size: data.Size(), }, nil } // ListObjectParts returns all object parts for specified object in specified bucket, uses B2's LargeFile upload API. func (l *b2Objects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (lpi minio.ListPartsInfo, err error) { bkt, err := l.Bucket(ctx, bucket) if err != nil { return lpi, err } lpi = minio.ListPartsInfo{ Bucket: bucket, Object: object, UploadID: uploadID, MaxParts: maxParts, PartNumberMarker: partNumberMarker, } // startPartNumber must be in the range 1 - 10000 for B2. partNumberMarker++ partsList, next, err := bkt.File(uploadID, object).ListParts(l.ctx, partNumberMarker, maxParts) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return lpi, err } partsList, next, err = bkt.File(uploadID, object).ListParts(l.ctx, partNumberMarker, maxParts) if err != nil { logger.LogIf(ctx, err) return lpi, b2ToObjectError(err, bucket, object, uploadID) } } if next != 0 { lpi.IsTruncated = true lpi.NextPartNumberMarker = next } for _, part := range partsList { lpi.Parts = append(lpi.Parts, minio.PartInfo{ PartNumber: part.Number, ETag: minio.ToS3ETag(part.SHA1), Size: part.Size, }) } return lpi, nil } // AbortMultipartUpload aborts a on going multipart upload, uses B2's LargeFile upload API. func (l *b2Objects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error { bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } err = bkt.File(uploadID, object).CompileParts(0, nil).CancelLargeFile(l.ctx) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } err = bkt.File(uploadID, object).CompileParts(0, nil).CancelLargeFile(l.ctx) logger.LogIf(ctx, err) return b2ToObjectError(err, bucket, object, uploadID) } logger.LogIf(ctx, err) return b2ToObjectError(err, bucket, object, uploadID) } // CompleteMultipartUpload completes ongoing multipart upload and finalizes object, uses B2's LargeFile upload API. func (l *b2Objects) CompleteMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, err error) { bkt, err := l.Bucket(ctx, bucket) if err != nil { return oi, err } hashes := make(map[int]string) for i, uploadedPart := range uploadedParts { // B2 requires contigous part numbers starting with 1, they do not support // hand picking part numbers, we return an S3 compatible error instead. if i+1 != uploadedPart.PartNumber { logger.LogIf(ctx, minio.InvalidPart{}) return oi, b2ToObjectError(minio.InvalidPart{}, bucket, object, uploadID) } // Trim "-1" suffix in ETag as PutObjectPart() treats B2 returned SHA1 as ETag. hashes[uploadedPart.PartNumber] = strings.TrimSuffix(uploadedPart.ETag, "-1") } if _, err = bkt.File(uploadID, object).CompileParts(0, hashes).FinishLargeFile(l.ctx); err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return oi, err } if _, err = bkt.File(uploadID, object).CompileParts(0, hashes).FinishLargeFile(l.ctx); err != nil { logger.LogIf(ctx, err) return oi, b2ToObjectError(err, bucket, object, uploadID) } } return l.GetObjectInfo(ctx, bucket, object, minio.ObjectOptions{}) } // SetBucketPolicy - B2 supports 2 types of bucket policies: // bucketType.AllPublic - bucketTypeReadOnly means that anybody can download the files is the bucket; // bucketType.AllPrivate - bucketTypePrivate means that you need an authorization token to download them. // Default is AllPrivate for all buckets. func (l *b2Objects) SetBucketPolicy(ctx context.Context, bucket string, bucketPolicy *policy.Policy) error { policyInfo, err := minio.PolicyToBucketAccessPolicy(bucketPolicy) if err != nil { // This should not happen. return b2ToObjectError(err, bucket) } var policies []minio.BucketAccessPolicy for prefix, policy := range miniogopolicy.GetPolicies(policyInfo.Statements, bucket, "") { policies = append(policies, minio.BucketAccessPolicy{ Prefix: prefix, Policy: policy, }) } prefix := bucket + "/*" // For all objects inside the bucket. if len(policies) != 1 { logger.LogIf(ctx, minio.NotImplemented{}) return minio.NotImplemented{} } if policies[0].Prefix != prefix { logger.LogIf(ctx, minio.NotImplemented{}) return minio.NotImplemented{} } if policies[0].Policy != miniogopolicy.BucketPolicyReadOnly { logger.LogIf(ctx, minio.NotImplemented{}) return minio.NotImplemented{} } bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } bkt.Type = bucketTypeReadOnly _, err = bkt.Update(l.ctx) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } bkt.Type = bucketTypeReadOnly _, err = bkt.Update(l.ctx) logger.LogIf(ctx, err) return b2ToObjectError(err) } logger.LogIf(ctx, err) return b2ToObjectError(err) } // GetBucketPolicy, returns the current bucketType from B2 backend and convert // it into S3 compatible bucket policy info. func (l *b2Objects) GetBucketPolicy(ctx context.Context, bucket string) (*policy.Policy, error) { bkt, err := l.Bucket(ctx, bucket) if err != nil { return nil, err } // bkt.Type can also be snapshot, but it is only allowed through B2 browser console, // just return back as policy not found for all cases. // CreateBucket always sets the value to allPrivate by default. if bkt.Type != bucketTypeReadOnly { return nil, minio.BucketPolicyNotFound{Bucket: bucket} } return &policy.Policy{ Version: policy.DefaultVersion, Statements: []policy.Statement{ policy.NewStatement( policy.Allow, policy.NewPrincipal("*"), policy.NewActionSet( policy.GetBucketLocationAction, policy.ListBucketAction, policy.GetObjectAction, ), policy.NewResourceSet( policy.NewResource(bucket, ""), policy.NewResource(bucket, "*"), ), condition.NewFunctions(), ), }, }, nil } // DeleteBucketPolicy - resets the bucketType of bucket on B2 to 'allPrivate'. func (l *b2Objects) DeleteBucketPolicy(ctx context.Context, bucket string) error { bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } bkt.Type = bucketTypePrivate _, err = bkt.Update(l.ctx) if err != nil { l.buckets = []*b2.Bucket{} bkt, err := l.Bucket(ctx, bucket) if err != nil { return err } bkt.Type = bucketTypePrivate _, err = bkt.Update(l.ctx) logger.LogIf(ctx, err) return b2ToObjectError(err) } logger.LogIf(ctx, err) return b2ToObjectError(err) } // IsCompressionSupported returns whether compression is applicable for this layer. func (l *b2Objects) IsCompressionSupported() bool { return false } // IsReady returns whether the layer is ready to take requests. func (l *b2Objects) IsReady(ctx context.Context) bool { return minio.IsBackendOnline(ctx, l.httpClient, "https://api.backblazeb2.com/b2api/v1") }