mirror of
https://github.com/minio/minio.git
synced 2024-12-26 07:05:55 -05:00
b330c2c57e
Advantages: this avoids the hundreds of stat calls that are needed for each upload operation in FS/NAS gateway mode when uploading a large multipart object, and dramatically increases performance for multipart uploads by avoiding recursive calls. For other gateways it simplifies the approach, since the azure, gcs and hdfs gateways don't capture any specific metadata during upload that needs handler validation for encryption/compression. Erasure coding was already optimized; this additionally just avoids small allocations of a large data structure. Fixes #7206
794 lines
28 KiB
Go
794 lines
28 KiB
Go
/*
|
|
* MinIO Cloud Storage, (C) 2018 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package s3
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"io"
|
|
"net/http"
|
|
"path"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/minio/minio-go/v6/pkg/encrypt"
|
|
minio "github.com/minio/minio/cmd"
|
|
|
|
"github.com/minio/minio/cmd/logger"
|
|
)
|
|
|
|
// Names used to lay out gateway-managed files on the s3 backend.
const (
	// gwdareMetaJSON is the name of the custom multipart metadata file for
	// the s3 backend.
	gwdareMetaJSON string = "dare.meta"

	// gwpartMetaJSON is the name of the temporary per part metadata file.
	gwpartMetaJSON string = "part.meta"

	// defaultMinioGWPrefix is the prefix under which the custom multipart
	// files are stored.
	defaultMinioGWPrefix = ".minio"

	// defaultGWContentFileName is the file name that getGWContentPath joins
	// onto defaultMinioGWPrefix.
	defaultGWContentFileName = "data"
)
|
|
|
|
// s3EncObjects is a wrapper around s3Objects and implements gateway calls for
|
|
// custom large objects encrypted at the gateway
|
|
type s3EncObjects struct {
|
|
s3Objects
|
|
}
|
|
|
|
/*
 NOTE:
 Custom gateway encrypted objects are stored on the backend as follows:
	 obj/.minio/data      <= encrypted content
	 obj/.minio/dare.meta <= metadata

 While a multipart upload is in progress, the metadata set during
 NewMultipartUpload is stored in obj/.minio/<uploadID>/dare.meta, and each
 UploadPart operation saves additional state (the part's encrypted ETag and
 encrypted size) in obj/.minio/<uploadID>/<partID>/part.meta

 All the part metadata and the temporary dare.meta are cleaned up when the
 upload completes.
*/
|
|
|
|
// ListObjects lists all blobs in S3 bucket filtered by prefix
|
|
func (l *s3EncObjects) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, e error) {
|
|
var continuationToken, startAfter string
|
|
res, err := l.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, maxKeys, false, startAfter)
|
|
if err != nil {
|
|
return loi, err
|
|
}
|
|
loi.IsTruncated = res.IsTruncated
|
|
loi.NextMarker = res.NextContinuationToken
|
|
loi.Objects = res.Objects
|
|
loi.Prefixes = res.Prefixes
|
|
return loi, nil
|
|
|
|
}
|
|
|
|
// ListObjectsV2 lists all blobs in S3 bucket filtered by prefix
|
|
func (l *s3EncObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, e error) {
|
|
|
|
var objects []minio.ObjectInfo
|
|
var prefixes []string
|
|
var isTruncated bool
|
|
|
|
// filter out objects that contain a .minio prefix, but is not a dare.meta metadata file.
|
|
for {
|
|
loi, e = l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, fetchOwner, startAfter)
|
|
if e != nil {
|
|
return loi, minio.ErrorRespToObjectError(e, bucket)
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
startAfter = obj.Name
|
|
continuationToken = loi.NextContinuationToken
|
|
isTruncated = loi.IsTruncated
|
|
|
|
if !isGWObject(obj.Name) {
|
|
continue
|
|
}
|
|
// get objectname and ObjectInfo from the custom metadata file
|
|
if strings.HasSuffix(obj.Name, gwdareMetaJSON) {
|
|
objSlice := strings.Split(obj.Name, minio.SlashSeparator+defaultMinioGWPrefix)
|
|
gwMeta, e := l.getGWMetadata(ctx, bucket, getDareMetaPath(objSlice[0]))
|
|
if e != nil {
|
|
continue
|
|
}
|
|
oInfo := gwMeta.ToObjectInfo(bucket, objSlice[0])
|
|
objects = append(objects, oInfo)
|
|
} else {
|
|
objects = append(objects, obj)
|
|
}
|
|
if len(objects) > maxKeys {
|
|
break
|
|
}
|
|
}
|
|
for _, p := range loi.Prefixes {
|
|
objName := strings.TrimSuffix(p, minio.SlashSeparator)
|
|
gm, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(objName))
|
|
// if prefix is actually a custom multi-part object, append it to objects
|
|
if err == nil {
|
|
objects = append(objects, gm.ToObjectInfo(bucket, objName))
|
|
continue
|
|
}
|
|
isPrefix := l.isPrefix(ctx, bucket, p, fetchOwner, startAfter)
|
|
if isPrefix {
|
|
prefixes = append(prefixes, p)
|
|
}
|
|
}
|
|
if (len(objects) > maxKeys) || !loi.IsTruncated {
|
|
break
|
|
}
|
|
}
|
|
|
|
loi.IsTruncated = isTruncated
|
|
loi.ContinuationToken = continuationToken
|
|
loi.Objects = make([]minio.ObjectInfo, 0)
|
|
loi.Prefixes = make([]string, 0)
|
|
loi.Objects = append(loi.Objects, objects...)
|
|
|
|
for _, pfx := range prefixes {
|
|
if pfx != prefix {
|
|
loi.Prefixes = append(loi.Prefixes, pfx)
|
|
}
|
|
}
|
|
// Set continuation token if s3 returned truncated list
|
|
if isTruncated {
|
|
if len(objects) > 0 {
|
|
loi.NextContinuationToken = objects[len(objects)-1].Name
|
|
}
|
|
}
|
|
return loi, nil
|
|
}
|
|
|
|
// isGWObject returns true if it is a custom object
|
|
func isGWObject(objName string) bool {
|
|
isEncrypted := strings.Contains(objName, defaultMinioGWPrefix)
|
|
if !isEncrypted {
|
|
return true
|
|
}
|
|
// ignore temp part.meta files
|
|
if strings.Contains(objName, gwpartMetaJSON) {
|
|
return false
|
|
}
|
|
|
|
pfxSlice := strings.Split(objName, minio.SlashSeparator)
|
|
var i1, i2 int
|
|
for i := len(pfxSlice) - 1; i >= 0; i-- {
|
|
p := pfxSlice[i]
|
|
if p == defaultMinioGWPrefix {
|
|
i1 = i
|
|
}
|
|
if p == gwdareMetaJSON {
|
|
i2 = i
|
|
}
|
|
if i1 > 0 && i2 > 0 {
|
|
break
|
|
}
|
|
}
|
|
// incomplete uploads would have a uploadID between defaultMinioGWPrefix and gwdareMetaJSON
|
|
return i2 > 0 && i1 > 0 && i2-i1 == 1
|
|
}
|
|
|
|
// isPrefix returns true if prefix exists and is not an incomplete multipart upload entry
|
|
func (l *s3EncObjects) isPrefix(ctx context.Context, bucket, prefix string, fetchOwner bool, startAfter string) bool {
|
|
var continuationToken, delimiter string
|
|
|
|
for {
|
|
loi, e := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, fetchOwner, startAfter)
|
|
if e != nil {
|
|
return false
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
if isGWObject(obj.Name) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
continuationToken = loi.NextContinuationToken
|
|
if !loi.IsTruncated {
|
|
break
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// GetObject reads an object from S3. Supports additional
|
|
// parameters like offset and length which are synonymous with
|
|
// HTTP Range requests.
|
|
func (l *s3EncObjects) GetObject(ctx context.Context, bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
|
|
return l.getObject(ctx, bucket, key, startOffset, length, writer, etag, opts)
|
|
}
|
|
|
|
func (l *s3EncObjects) isGWEncrypted(ctx context.Context, bucket, object string) bool {
|
|
_, err := l.s3Objects.GetObjectInfo(ctx, bucket, getDareMetaPath(object), minio.ObjectOptions{})
|
|
return err == nil
|
|
}
|
|
|
|
// getDaremetadata fetches dare.meta from s3 backend and marshals into a structured format.
|
|
func (l *s3EncObjects) getGWMetadata(ctx context.Context, bucket, metaFileName string) (m gwMetaV1, err error) {
|
|
oi, err1 := l.s3Objects.GetObjectInfo(ctx, bucket, metaFileName, minio.ObjectOptions{})
|
|
if err1 != nil {
|
|
return m, err1
|
|
}
|
|
var buffer bytes.Buffer
|
|
err = l.s3Objects.GetObject(ctx, bucket, metaFileName, 0, oi.Size, &buffer, oi.ETag, minio.ObjectOptions{})
|
|
if err != nil {
|
|
return m, err
|
|
}
|
|
return readGWMetadata(ctx, buffer)
|
|
}
|
|
|
|
// writes dare metadata to the s3 backend
|
|
func (l *s3EncObjects) writeGWMetadata(ctx context.Context, bucket, metaFileName string, m gwMetaV1, o minio.ObjectOptions) error {
|
|
reader, err := getGWMetadata(ctx, bucket, metaFileName, m)
|
|
if err != nil {
|
|
logger.LogIf(ctx, err)
|
|
return err
|
|
}
|
|
_, err = l.s3Objects.PutObject(ctx, bucket, metaFileName, reader, o)
|
|
return err
|
|
}
|
|
|
|
// returns path of temporary metadata json file for the upload
|
|
func getTmpDareMetaPath(object, uploadID string) string {
|
|
return path.Join(getGWMetaPath(object), uploadID, gwdareMetaJSON)
|
|
}
|
|
|
|
// returns path of metadata json file for encrypted objects
|
|
func getDareMetaPath(object string) string {
|
|
return path.Join(getGWMetaPath(object), gwdareMetaJSON)
|
|
}
|
|
|
|
// returns path of temporary part metadata file for multipart uploads
|
|
func getPartMetaPath(object, uploadID string, partID int) string {
|
|
return path.Join(object, defaultMinioGWPrefix, uploadID, strconv.Itoa(partID), gwpartMetaJSON)
|
|
}
|
|
|
|
// deletes the custom dare metadata file saved at the backend
|
|
func (l *s3EncObjects) deleteGWMetadata(ctx context.Context, bucket, metaFileName string) error {
|
|
return l.s3Objects.DeleteObject(ctx, bucket, metaFileName)
|
|
}
|
|
|
|
func (l *s3EncObjects) getObject(ctx context.Context, bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
|
|
var o minio.ObjectOptions
|
|
if minio.GlobalGatewaySSE.SSEC() {
|
|
o = opts
|
|
}
|
|
dmeta, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(key))
|
|
if err != nil {
|
|
// unencrypted content
|
|
return l.s3Objects.GetObject(ctx, bucket, key, startOffset, length, writer, etag, o)
|
|
}
|
|
if startOffset < 0 {
|
|
logger.LogIf(ctx, minio.InvalidRange{})
|
|
}
|
|
|
|
// For negative length read everything.
|
|
if length < 0 {
|
|
length = dmeta.Stat.Size - startOffset
|
|
}
|
|
// Reply back invalid range if the input offset and length fall out of range.
|
|
if startOffset > dmeta.Stat.Size || startOffset+length > dmeta.Stat.Size {
|
|
logger.LogIf(ctx, minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size})
|
|
return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
|
|
}
|
|
// Get start part index and offset.
|
|
_, partOffset, err := dmeta.ObjectToPartOffset(ctx, startOffset)
|
|
if err != nil {
|
|
return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
|
|
}
|
|
|
|
// Calculate endOffset according to length
|
|
endOffset := startOffset
|
|
if length > 0 {
|
|
endOffset += length - 1
|
|
}
|
|
|
|
// Get last part index to read given length.
|
|
if _, _, err := dmeta.ObjectToPartOffset(ctx, endOffset); err != nil {
|
|
return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
|
|
}
|
|
return l.s3Objects.GetObject(ctx, bucket, key, partOffset, endOffset, writer, dmeta.ETag, o)
|
|
}
|
|
|
|
// GetObjectNInfo - returns object info and locked object ReadCloser
|
|
func (l *s3EncObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, o minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
|
|
var opts minio.ObjectOptions
|
|
if minio.GlobalGatewaySSE.SSEC() {
|
|
opts = o
|
|
}
|
|
objInfo, err := l.GetObjectInfo(ctx, bucket, object, opts)
|
|
if err != nil {
|
|
return l.s3Objects.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
|
|
}
|
|
fn, off, length, err := minio.NewGetObjectReader(rs, objInfo, o)
|
|
if err != nil {
|
|
return nil, minio.ErrorRespToObjectError(err)
|
|
}
|
|
if l.isGWEncrypted(ctx, bucket, object) {
|
|
object = getGWContentPath(object)
|
|
}
|
|
pr, pw := io.Pipe()
|
|
go func() {
|
|
err := l.getObject(ctx, bucket, object, off, length, pw, objInfo.ETag, opts)
|
|
pw.CloseWithError(err)
|
|
}()
|
|
|
|
// Setup cleanup function to cause the above go-routine to
|
|
// exit in case of partial read
|
|
pipeCloser := func() { pr.Close() }
|
|
return fn(pr, h, o.CheckCopyPrecondFn, pipeCloser)
|
|
}
|
|
|
|
// GetObjectInfo reads object info and replies back ObjectInfo
|
|
// For custom gateway encrypted large objects, the ObjectInfo is retrieved from the dare.meta file.
|
|
func (l *s3EncObjects) GetObjectInfo(ctx context.Context, bucket string, object string, o minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
|
var opts minio.ObjectOptions
|
|
if minio.GlobalGatewaySSE.SSEC() {
|
|
opts = o
|
|
}
|
|
|
|
gwMeta, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(object))
|
|
if err != nil {
|
|
return l.s3Objects.GetObjectInfo(ctx, bucket, object, opts)
|
|
}
|
|
return gwMeta.ToObjectInfo(bucket, object), nil
|
|
}
|
|
|
|
// CopyObject copies an object from source bucket to a destination bucket.
|
|
func (l *s3EncObjects) CopyObject(ctx context.Context, srcBucket string, srcObject string, dstBucket string, dstObject string, srcInfo minio.ObjectInfo, s, d minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
|
cpSrcDstSame := strings.EqualFold(path.Join(srcBucket, srcObject), path.Join(dstBucket, dstObject))
|
|
if cpSrcDstSame {
|
|
var gwMeta gwMetaV1
|
|
if s.ServerSideEncryption != nil && d.ServerSideEncryption != nil &&
|
|
((s.ServerSideEncryption.Type() == encrypt.SSEC && d.ServerSideEncryption.Type() == encrypt.SSEC) ||
|
|
(s.ServerSideEncryption.Type() == encrypt.S3 && d.ServerSideEncryption.Type() == encrypt.S3)) {
|
|
gwMeta, err = l.getGWMetadata(ctx, srcBucket, getDareMetaPath(srcObject))
|
|
if err != nil {
|
|
return
|
|
}
|
|
header := make(http.Header)
|
|
if d.ServerSideEncryption != nil {
|
|
d.ServerSideEncryption.Marshal(header)
|
|
}
|
|
for k, v := range header {
|
|
srcInfo.UserDefined[k] = v[0]
|
|
}
|
|
gwMeta.Meta = srcInfo.UserDefined
|
|
if err = l.writeGWMetadata(ctx, dstBucket, getDareMetaPath(dstObject), gwMeta, minio.ObjectOptions{}); err != nil {
|
|
return objInfo, minio.ErrorRespToObjectError(err)
|
|
}
|
|
return gwMeta.ToObjectInfo(dstBucket, dstObject), nil
|
|
}
|
|
}
|
|
dstOpts := minio.ObjectOptions{ServerSideEncryption: d.ServerSideEncryption, UserDefined: srcInfo.UserDefined}
|
|
return l.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, dstOpts)
|
|
}
|
|
|
|
// DeleteObject deletes a blob in bucket
|
|
// For custom gateway encrypted large objects, cleans up encrypted content and metadata files
|
|
// from the backend.
|
|
func (l *s3EncObjects) DeleteObject(ctx context.Context, bucket string, object string) error {
|
|
|
|
// Get dare meta json
|
|
if _, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(object)); err != nil {
|
|
return l.s3Objects.DeleteObject(ctx, bucket, object)
|
|
}
|
|
// delete encrypted object
|
|
l.s3Objects.DeleteObject(ctx, bucket, getGWContentPath(object))
|
|
return l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
|
|
}
|
|
|
|
// ListMultipartUploads lists all multipart uploads.
|
|
func (l *s3EncObjects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, e error) {
|
|
|
|
lmi, e = l.s3Objects.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
|
|
if e != nil {
|
|
return
|
|
}
|
|
lmi.KeyMarker = strings.TrimSuffix(lmi.KeyMarker, getGWContentPath(minio.SlashSeparator))
|
|
lmi.NextKeyMarker = strings.TrimSuffix(lmi.NextKeyMarker, getGWContentPath(minio.SlashSeparator))
|
|
for i := range lmi.Uploads {
|
|
lmi.Uploads[i].Object = strings.TrimSuffix(lmi.Uploads[i].Object, getGWContentPath(minio.SlashSeparator))
|
|
}
|
|
return
|
|
}
|
|
|
|
// NewMultipartUpload uploads object in multiple parts
|
|
func (l *s3EncObjects) NewMultipartUpload(ctx context.Context, bucket string, object string, o minio.ObjectOptions) (uploadID string, err error) {
|
|
var sseOpts encrypt.ServerSide
|
|
if o.ServerSideEncryption == nil {
|
|
return l.s3Objects.NewMultipartUpload(ctx, bucket, object, minio.ObjectOptions{UserDefined: o.UserDefined})
|
|
}
|
|
// Decide if sse options needed to be passed to backend
|
|
if (minio.GlobalGatewaySSE.SSEC() && o.ServerSideEncryption.Type() == encrypt.SSEC) ||
|
|
(minio.GlobalGatewaySSE.SSES3() && o.ServerSideEncryption.Type() == encrypt.S3) {
|
|
sseOpts = o.ServerSideEncryption
|
|
}
|
|
|
|
uploadID, err = l.s3Objects.NewMultipartUpload(ctx, bucket, getGWContentPath(object), minio.ObjectOptions{ServerSideEncryption: sseOpts})
|
|
if err != nil {
|
|
return
|
|
}
|
|
// Create uploadID and write a temporary dare.meta object under object/uploadID prefix
|
|
gwmeta := newGWMetaV1()
|
|
gwmeta.Meta = o.UserDefined
|
|
gwmeta.Stat.ModTime = time.Now().UTC()
|
|
err = l.writeGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID), gwmeta, minio.ObjectOptions{})
|
|
if err != nil {
|
|
return uploadID, minio.ErrorRespToObjectError(err)
|
|
}
|
|
return uploadID, nil
|
|
}
|
|
|
|
// PutObject creates a new object with the incoming data,
|
|
func (l *s3EncObjects) PutObject(ctx context.Context, bucket string, object string, data *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
|
|
var sseOpts encrypt.ServerSide
|
|
// Decide if sse options needed to be passed to backend
|
|
if opts.ServerSideEncryption != nil &&
|
|
((minio.GlobalGatewaySSE.SSEC() && opts.ServerSideEncryption.Type() == encrypt.SSEC) ||
|
|
(minio.GlobalGatewaySSE.SSES3() && opts.ServerSideEncryption.Type() == encrypt.S3) ||
|
|
opts.ServerSideEncryption.Type() == encrypt.KMS) {
|
|
sseOpts = opts.ServerSideEncryption
|
|
}
|
|
if opts.ServerSideEncryption == nil {
|
|
defer l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
|
|
defer l.DeleteObject(ctx, bucket, getGWContentPath(object))
|
|
return l.s3Objects.PutObject(ctx, bucket, object, data, minio.ObjectOptions{UserDefined: opts.UserDefined})
|
|
}
|
|
|
|
oi, err := l.s3Objects.PutObject(ctx, bucket, getGWContentPath(object), data, minio.ObjectOptions{ServerSideEncryption: sseOpts})
|
|
if err != nil {
|
|
return objInfo, minio.ErrorRespToObjectError(err)
|
|
}
|
|
|
|
gwMeta := newGWMetaV1()
|
|
gwMeta.Meta = make(map[string]string)
|
|
for k, v := range opts.UserDefined {
|
|
gwMeta.Meta[k] = v
|
|
}
|
|
encMD5 := data.MD5CurrentHexString()
|
|
|
|
gwMeta.ETag = encMD5
|
|
gwMeta.Stat.Size = oi.Size
|
|
gwMeta.Stat.ModTime = time.Now().UTC()
|
|
if err = l.writeGWMetadata(ctx, bucket, getDareMetaPath(object), gwMeta, minio.ObjectOptions{}); err != nil {
|
|
return objInfo, minio.ErrorRespToObjectError(err)
|
|
}
|
|
objInfo = gwMeta.ToObjectInfo(bucket, object)
|
|
// delete any unencrypted content of the same name created previously
|
|
l.s3Objects.DeleteObject(ctx, bucket, object)
|
|
return objInfo, nil
|
|
}
|
|
|
|
// PutObjectPart puts a part of object in bucket
|
|
func (l *s3EncObjects) PutObjectPart(ctx context.Context, bucket string, object string, uploadID string, partID int, data *minio.PutObjReader, opts minio.ObjectOptions) (pi minio.PartInfo, e error) {
|
|
|
|
if opts.ServerSideEncryption == nil {
|
|
return l.s3Objects.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
|
|
}
|
|
|
|
var s3Opts minio.ObjectOptions
|
|
// for sse-s3 encryption options should not be passed to backend
|
|
if opts.ServerSideEncryption != nil && opts.ServerSideEncryption.Type() == encrypt.SSEC && minio.GlobalGatewaySSE.SSEC() {
|
|
s3Opts = opts
|
|
}
|
|
|
|
uploadPath := getTmpGWMetaPath(object, uploadID)
|
|
tmpDareMeta := path.Join(uploadPath, gwdareMetaJSON)
|
|
_, err := l.s3Objects.GetObjectInfo(ctx, bucket, tmpDareMeta, minio.ObjectOptions{})
|
|
if err != nil {
|
|
return pi, minio.InvalidUploadID{UploadID: uploadID}
|
|
}
|
|
|
|
pi, e = l.s3Objects.PutObjectPart(ctx, bucket, getGWContentPath(object), uploadID, partID, data, s3Opts)
|
|
if e != nil {
|
|
return
|
|
}
|
|
gwMeta := newGWMetaV1()
|
|
gwMeta.Parts = make([]minio.ObjectPartInfo, 1)
|
|
// Add incoming part.
|
|
gwMeta.Parts[0] = minio.ObjectPartInfo{
|
|
Number: partID,
|
|
ETag: pi.ETag,
|
|
Size: pi.Size,
|
|
}
|
|
gwMeta.ETag = data.MD5CurrentHexString() // encrypted ETag
|
|
gwMeta.Stat.Size = pi.Size
|
|
gwMeta.Stat.ModTime = pi.LastModified
|
|
|
|
if err = l.writeGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, partID), gwMeta, minio.ObjectOptions{}); err != nil {
|
|
return pi, minio.ErrorRespToObjectError(err)
|
|
}
|
|
return minio.PartInfo{
|
|
Size: gwMeta.Stat.Size,
|
|
ETag: minio.CanonicalizeETag(gwMeta.ETag),
|
|
LastModified: gwMeta.Stat.ModTime,
|
|
PartNumber: partID,
|
|
}, nil
|
|
}
|
|
|
|
// CopyObjectPart creates a part in a multipart upload by copying
|
|
// existing object or a part of it.
|
|
func (l *s3EncObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject, uploadID string,
|
|
partID int, startOffset, length int64, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (p minio.PartInfo, err error) {
|
|
return l.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, srcInfo.PutObjReader, dstOpts)
|
|
}
|
|
|
|
// GetMultipartInfo returns multipart info of the uploadId of the object
|
|
func (l *s3EncObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
|
|
result.Bucket = bucket
|
|
result.Object = object
|
|
result.UploadID = uploadID
|
|
// We do not store parts uploaded so far in the dare.meta. Only CompleteMultipartUpload finalizes the parts under upload prefix.Otherwise,
|
|
// there could be situations of dare.meta getting corrupted by competing upload parts.
|
|
dm, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
|
|
if err != nil {
|
|
return l.s3Objects.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
|
|
}
|
|
result.UserDefined = dm.ToObjectInfo(bucket, object).UserDefined
|
|
return result, nil
|
|
}
|
|
|
|
// ListObjectParts returns all object parts for specified object in specified bucket
|
|
func (l *s3EncObjects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (lpi minio.ListPartsInfo, e error) {
|
|
// We do not store parts uploaded so far in the dare.meta. Only CompleteMultipartUpload finalizes the parts under upload prefix.Otherwise,
|
|
// there could be situations of dare.meta getting corrupted by competing upload parts.
|
|
dm, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
|
|
if err != nil {
|
|
return l.s3Objects.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
|
|
}
|
|
|
|
lpi, err = l.s3Objects.ListObjectParts(ctx, bucket, getGWContentPath(object), uploadID, partNumberMarker, maxParts, opts)
|
|
if err != nil {
|
|
return lpi, err
|
|
}
|
|
for i, part := range lpi.Parts {
|
|
partMeta, err := l.getGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, part.PartNumber))
|
|
if err != nil || len(partMeta.Parts) == 0 {
|
|
return lpi, minio.InvalidPart{}
|
|
}
|
|
lpi.Parts[i].ETag = partMeta.ETag
|
|
}
|
|
lpi.UserDefined = dm.ToObjectInfo(bucket, object).UserDefined
|
|
lpi.Object = object
|
|
return lpi, nil
|
|
}
|
|
|
|
// AbortMultipartUpload aborts a ongoing multipart upload
|
|
func (l *s3EncObjects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string) error {
|
|
if _, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID)); err != nil {
|
|
return l.s3Objects.AbortMultipartUpload(ctx, bucket, object, uploadID)
|
|
}
|
|
|
|
if err := l.s3Objects.AbortMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID); err != nil {
|
|
return err
|
|
}
|
|
|
|
uploadPrefix := getTmpGWMetaPath(object, uploadID)
|
|
var continuationToken, startAfter, delimiter string
|
|
for {
|
|
loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, uploadPrefix, continuationToken, delimiter, 1000, false, startAfter)
|
|
if err != nil {
|
|
return minio.InvalidUploadID{UploadID: uploadID}
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
if err := l.s3Objects.DeleteObject(ctx, bucket, obj.Name); err != nil {
|
|
return minio.ErrorRespToObjectError(err)
|
|
}
|
|
startAfter = obj.Name
|
|
}
|
|
continuationToken = loi.NextContinuationToken
|
|
if !loi.IsTruncated {
|
|
break
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
|
|
func (l *s3EncObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, e error) {
|
|
|
|
tmpMeta, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
|
|
if err != nil {
|
|
oi, e = l.s3Objects.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
|
|
if e == nil {
|
|
// delete any encrypted version of object that might exist
|
|
defer l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
|
|
defer l.DeleteObject(ctx, bucket, getGWContentPath(object))
|
|
}
|
|
return oi, e
|
|
}
|
|
gwMeta := newGWMetaV1()
|
|
gwMeta.Meta = make(map[string]string)
|
|
for k, v := range tmpMeta.Meta {
|
|
gwMeta.Meta[k] = v
|
|
}
|
|
// Allocate parts similar to incoming slice.
|
|
gwMeta.Parts = make([]minio.ObjectPartInfo, len(uploadedParts))
|
|
|
|
bkUploadedParts := make([]minio.CompletePart, len(uploadedParts))
|
|
// Calculate full object size.
|
|
var objectSize int64
|
|
|
|
// Validate each part and then commit to disk.
|
|
for i, part := range uploadedParts {
|
|
partMeta, err := l.getGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, part.PartNumber))
|
|
if err != nil || len(partMeta.Parts) == 0 {
|
|
return oi, minio.InvalidPart{}
|
|
}
|
|
bkUploadedParts[i] = minio.CompletePart{PartNumber: part.PartNumber, ETag: partMeta.Parts[0].ETag}
|
|
gwMeta.Parts[i] = partMeta.Parts[0]
|
|
objectSize += partMeta.Parts[0].Size
|
|
}
|
|
oi, e = l.s3Objects.CompleteMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID, bkUploadedParts, opts)
|
|
if e != nil {
|
|
return oi, e
|
|
}
|
|
|
|
//delete any unencrypted version of object that might be on the backend
|
|
defer l.s3Objects.DeleteObject(ctx, bucket, object)
|
|
|
|
// Save the final object size and modtime.
|
|
gwMeta.Stat.Size = objectSize
|
|
gwMeta.Stat.ModTime = time.Now().UTC()
|
|
gwMeta.ETag = oi.ETag
|
|
|
|
if err = l.writeGWMetadata(ctx, bucket, getDareMetaPath(object), gwMeta, minio.ObjectOptions{}); err != nil {
|
|
return oi, minio.ErrorRespToObjectError(err)
|
|
}
|
|
// Clean up any uploaded parts that are not being committed by this CompleteMultipart operation
|
|
var continuationToken, startAfter, delimiter string
|
|
uploadPrefix := getTmpGWMetaPath(object, uploadID)
|
|
done := false
|
|
for {
|
|
loi, lerr := l.s3Objects.ListObjectsV2(ctx, bucket, uploadPrefix, continuationToken, delimiter, 1000, false, startAfter)
|
|
if lerr != nil {
|
|
break
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
if !strings.HasPrefix(obj.Name, uploadPrefix) {
|
|
done = true
|
|
break
|
|
}
|
|
startAfter = obj.Name
|
|
l.s3Objects.DeleteObject(ctx, bucket, obj.Name)
|
|
}
|
|
continuationToken = loi.NextContinuationToken
|
|
if !loi.IsTruncated || done {
|
|
break
|
|
}
|
|
}
|
|
|
|
return gwMeta.ToObjectInfo(bucket, object), nil
|
|
}
|
|
|
|
// getTmpGWMetaPath returns the prefix under which uploads in progress are stored on backend
|
|
func getTmpGWMetaPath(object, uploadID string) string {
|
|
return path.Join(object, defaultMinioGWPrefix, uploadID)
|
|
}
|
|
|
|
// getGWMetaPath returns the prefix under which custom object metadata and object are stored on backend after upload completes
|
|
func getGWMetaPath(object string) string {
|
|
return path.Join(object, defaultMinioGWPrefix)
|
|
}
|
|
|
|
// getGWContentPath returns the prefix under which custom object is stored on backend after upload completes
|
|
func getGWContentPath(object string) string {
|
|
return path.Join(object, defaultMinioGWPrefix, defaultGWContentFileName)
|
|
}
|
|
|
|
// Clean-up the stale incomplete encrypted multipart uploads. Should be run in a Go routine.
|
|
func (l *s3EncObjects) cleanupStaleEncMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
|
|
ticker := time.NewTicker(cleanupInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
l.cleanupStaleEncMultipartUploadsOnGW(ctx, expiry)
|
|
}
|
|
}
|
|
}
|
|
|
|
// cleanupStaleMultipartUploads removes old custom encryption multipart uploads on backend
|
|
func (l *s3EncObjects) cleanupStaleEncMultipartUploadsOnGW(ctx context.Context, expiry time.Duration) {
|
|
for {
|
|
buckets, err := l.s3Objects.ListBuckets(ctx)
|
|
if err != nil {
|
|
break
|
|
}
|
|
for _, b := range buckets {
|
|
expParts := l.getStalePartsForBucket(ctx, b.Name, expiry)
|
|
for k := range expParts {
|
|
l.s3Objects.DeleteObject(ctx, b.Name, k)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *s3EncObjects) getStalePartsForBucket(ctx context.Context, bucket string, expiry time.Duration) (expParts map[string]string) {
|
|
var prefix, continuationToken, delimiter, startAfter string
|
|
expParts = make(map[string]string)
|
|
now := time.Now()
|
|
for {
|
|
loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, false, startAfter)
|
|
if err != nil {
|
|
break
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
startAfter = obj.Name
|
|
if !strings.Contains(obj.Name, defaultMinioGWPrefix) {
|
|
continue
|
|
}
|
|
|
|
if isGWObject(obj.Name) {
|
|
continue
|
|
}
|
|
|
|
// delete temporary part.meta or dare.meta files for incomplete uploads that are past expiry
|
|
if (strings.HasSuffix(obj.Name, gwpartMetaJSON) || strings.HasSuffix(obj.Name, gwdareMetaJSON)) &&
|
|
now.Sub(obj.ModTime) > expiry {
|
|
expParts[obj.Name] = ""
|
|
}
|
|
}
|
|
continuationToken = loi.NextContinuationToken
|
|
if !loi.IsTruncated {
|
|
break
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (l *s3EncObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
|
|
var prefix, continuationToken, delimiter, startAfter string
|
|
expParts := make(map[string]string)
|
|
|
|
for {
|
|
loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, false, startAfter)
|
|
if err != nil {
|
|
break
|
|
}
|
|
for _, obj := range loi.Objects {
|
|
startAfter = obj.Name
|
|
if !strings.Contains(obj.Name, defaultMinioGWPrefix) {
|
|
return minio.BucketNotEmpty{}
|
|
}
|
|
if isGWObject(obj.Name) {
|
|
return minio.BucketNotEmpty{}
|
|
}
|
|
// delete temporary part.meta or dare.meta files for incomplete uploads
|
|
if strings.HasSuffix(obj.Name, gwpartMetaJSON) || strings.HasSuffix(obj.Name, gwdareMetaJSON) {
|
|
expParts[obj.Name] = ""
|
|
}
|
|
}
|
|
continuationToken = loi.NextContinuationToken
|
|
if !loi.IsTruncated {
|
|
break
|
|
}
|
|
}
|
|
for k := range expParts {
|
|
l.s3Objects.DeleteObject(ctx, bucket, k)
|
|
}
|
|
err := l.Client.RemoveBucket(bucket)
|
|
if err != nil {
|
|
return minio.ErrorRespToObjectError(err, bucket)
|
|
}
|
|
return nil
|
|
}
|