/*
 * MinIO Object Storage (c) 2021 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package s3

import (
	"bytes"
	"context"
	"io"
	"net/http"
	"path"
	"strconv"
	"strings"
	"time"

	"github.com/minio/minio-go/v7/pkg/encrypt"
	minio "github.com/minio/minio/cmd"

	"github.com/minio/minio/internal/logger"
)

const (
	// name of custom multipart metadata file for s3 backend.
	gwdareMetaJSON string = "dare.meta"

	// name of temporary per part metadata file
	gwpartMetaJSON string = "part.meta"
	// custom multipart files are stored under the defaultMinioGWPrefix
	defaultMinioGWPrefix     = ".minio"
	defaultGWContentFileName = "data"
)

// s3EncObjects is a wrapper around s3Objects and implements gateway calls for
// custom large objects encrypted at the gateway
type s3EncObjects struct {
	s3Objects
}

/*
 NOTE:
 Custom gateway encrypted objects are stored on backend as follows:
	 obj/.minio/data   <= encrypted content
	 obj/.minio/dare.meta  <= metadata

 When a multipart upload operation is in progress, the metadata set during
 NewMultipartUpload is stored in obj/.minio/uploadID/dare.meta and each
 UploadPart operation saves additional state of the part's encrypted ETag and
 encrypted size in obj/.minio/uploadID/part1/part.meta

 All the part metadata and temp dare.meta are cleaned up when upload completes
*/

// ListObjects lists all blobs in S3 bucket filtered by prefix
func (l *s3EncObjects) ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, e error) {
	var startAfter string
	res, err := l.ListObjectsV2(ctx, bucket, prefix, marker, delimiter, maxKeys, false, startAfter)
	if err != nil {
		return loi, err
	}
	loi.IsTruncated = res.IsTruncated
	loi.NextMarker = res.NextContinuationToken
	loi.Objects = res.Objects
	loi.Prefixes = res.Prefixes
	return loi, nil

}

// ListObjectsV2 lists all blobs in S3 bucket filtered by prefix
func (l *s3EncObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, e error) {

	var objects []minio.ObjectInfo
	var prefixes []string
	var isTruncated bool

	// filter out objects that contain a .minio prefix, but is not a dare.meta metadata file.
	for {
		loi, e = l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, fetchOwner, startAfter)
		if e != nil {
			return loi, minio.ErrorRespToObjectError(e, bucket)
		}

		continuationToken = loi.NextContinuationToken
		isTruncated = loi.IsTruncated

		for _, obj := range loi.Objects {
			startAfter = obj.Name

			if !isGWObject(obj.Name) {
				continue
			}
			// get objectname and ObjectInfo from the custom metadata file
			if strings.HasSuffix(obj.Name, gwdareMetaJSON) {
				objSlice := strings.Split(obj.Name, minio.SlashSeparator+defaultMinioGWPrefix)
				gwMeta, e := l.getGWMetadata(ctx, bucket, getDareMetaPath(objSlice[0]))
				if e != nil {
					continue
				}
				oInfo := gwMeta.ToObjectInfo(bucket, objSlice[0])
				objects = append(objects, oInfo)
			} else {
				objects = append(objects, obj)
			}
			if maxKeys > 0 && len(objects) > maxKeys {
				break
			}
		}
		for _, p := range loi.Prefixes {
			objName := strings.TrimSuffix(p, minio.SlashSeparator)
			gm, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(objName))
			// if prefix is actually a custom multi-part object, append it to objects
			if err == nil {
				objects = append(objects, gm.ToObjectInfo(bucket, objName))
				continue
			}
			isPrefix := l.isPrefix(ctx, bucket, p, fetchOwner, startAfter)
			if isPrefix {
				prefixes = append(prefixes, p)
			}
		}
		if (maxKeys > 0 && len(objects) > maxKeys) || !loi.IsTruncated {
			break
		}
	}

	loi.IsTruncated = isTruncated
	loi.ContinuationToken = continuationToken
	loi.Objects = make([]minio.ObjectInfo, 0)
	loi.Prefixes = make([]string, 0)
	loi.Objects = append(loi.Objects, objects...)

	for _, pfx := range prefixes {
		if pfx != prefix {
			loi.Prefixes = append(loi.Prefixes, pfx)
		}
	}
	// Set continuation token if s3 returned truncated list
	if isTruncated {
		if len(objects) > 0 {
			loi.NextContinuationToken = objects[len(objects)-1].Name
		}
	}
	return loi, nil
}

// isGWObject returns true if it is a custom object
func isGWObject(objName string) bool {
	isEncrypted := strings.Contains(objName, defaultMinioGWPrefix)
	if !isEncrypted {
		return true
	}
	// ignore temp part.meta files
	if strings.Contains(objName, gwpartMetaJSON) {
		return false
	}

	pfxSlice := strings.Split(objName, minio.SlashSeparator)
	var i1, i2 int
	for i := len(pfxSlice) - 1; i >= 0; i-- {
		p := pfxSlice[i]
		if p == defaultMinioGWPrefix {
			i1 = i
		}
		if p == gwdareMetaJSON {
			i2 = i
		}
		if i1 > 0 && i2 > 0 {
			break
		}
	}
	// incomplete uploads would have a uploadID between defaultMinioGWPrefix and gwdareMetaJSON
	return i2 > 0 && i1 > 0 && i2-i1 == 1
}

// isPrefix returns true if prefix exists and is not an incomplete multipart upload entry
func (l *s3EncObjects) isPrefix(ctx context.Context, bucket, prefix string, fetchOwner bool, startAfter string) bool {
	var continuationToken, delimiter string

	for {
		loi, e := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, fetchOwner, startAfter)
		if e != nil {
			return false
		}
		for _, obj := range loi.Objects {
			if isGWObject(obj.Name) {
				return true
			}
		}

		continuationToken = loi.NextContinuationToken
		if !loi.IsTruncated {
			break
		}
	}
	return false
}

// GetObject reads an object from S3. Supports additional
// parameters like offset and length which are synonymous with
// HTTP Range requests.
func (l *s3EncObjects) GetObject(ctx context.Context, bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
	return l.getObject(ctx, bucket, key, startOffset, length, writer, etag, opts)
}

func (l *s3EncObjects) isGWEncrypted(ctx context.Context, bucket, object string) bool {
	_, err := l.s3Objects.GetObjectInfo(ctx, bucket, getDareMetaPath(object), minio.ObjectOptions{})
	return err == nil
}

// getDaremetadata fetches dare.meta from s3 backend and marshals into a structured format.
func (l *s3EncObjects) getGWMetadata(ctx context.Context, bucket, metaFileName string) (m gwMetaV1, err error) {
	oi, err1 := l.s3Objects.GetObjectInfo(ctx, bucket, metaFileName, minio.ObjectOptions{})
	if err1 != nil {
		return m, err1
	}
	var buffer bytes.Buffer
	err = l.s3Objects.getObject(ctx, bucket, metaFileName, 0, oi.Size, &buffer, oi.ETag, minio.ObjectOptions{})
	if err != nil {
		return m, err
	}
	return readGWMetadata(ctx, buffer)
}

// writes dare metadata to the s3 backend
func (l *s3EncObjects) writeGWMetadata(ctx context.Context, bucket, metaFileName string, m gwMetaV1, o minio.ObjectOptions) error {
	reader, err := getGWMetadata(ctx, bucket, metaFileName, m)
	if err != nil {
		logger.LogIf(ctx, err)
		return err
	}
	_, err = l.s3Objects.PutObject(ctx, bucket, metaFileName, reader, o)
	return err
}

// returns path of temporary metadata json file for the upload
func getTmpDareMetaPath(object, uploadID string) string {
	return path.Join(getGWMetaPath(object), uploadID, gwdareMetaJSON)
}

// returns path of metadata json file for encrypted objects
func getDareMetaPath(object string) string {
	return path.Join(getGWMetaPath(object), gwdareMetaJSON)
}

// returns path of temporary part metadata file for multipart uploads
func getPartMetaPath(object, uploadID string, partID int) string {
	return path.Join(object, defaultMinioGWPrefix, uploadID, strconv.Itoa(partID), gwpartMetaJSON)
}

// deletes the custom dare metadata file saved at the backend
func (l *s3EncObjects) deleteGWMetadata(ctx context.Context, bucket, metaFileName string) (minio.ObjectInfo, error) {
	return l.s3Objects.DeleteObject(ctx, bucket, metaFileName, minio.ObjectOptions{})
}

func (l *s3EncObjects) getObject(ctx context.Context, bucket string, key string, startOffset int64, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
	var o minio.ObjectOptions
	if minio.GlobalGatewaySSE.SSEC() {
		o = opts
	}
	dmeta, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(key))
	if err != nil {
		// unencrypted content
		return l.s3Objects.getObject(ctx, bucket, key, startOffset, length, writer, etag, o)
	}
	if startOffset < 0 {
		logger.LogIf(ctx, minio.InvalidRange{})
	}

	// For negative length read everything.
	if length < 0 {
		length = dmeta.Stat.Size - startOffset
	}
	// Reply back invalid range if the input offset and length fall out of range.
	if startOffset > dmeta.Stat.Size || startOffset+length > dmeta.Stat.Size {
		logger.LogIf(ctx, minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size})
		return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
	}
	// Get start part index and offset.
	_, partOffset, err := dmeta.ObjectToPartOffset(ctx, startOffset)
	if err != nil {
		return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
	}

	// Calculate endOffset according to length
	endOffset := startOffset
	if length > 0 {
		endOffset += length - 1
	}

	// Get last part index to read given length.
	if _, _, err := dmeta.ObjectToPartOffset(ctx, endOffset); err != nil {
		return minio.InvalidRange{OffsetBegin: startOffset, OffsetEnd: length, ResourceSize: dmeta.Stat.Size}
	}
	return l.s3Objects.getObject(ctx, bucket, key, partOffset, endOffset, writer, dmeta.ETag, o)
}

// GetObjectNInfo - returns object info and locked object ReadCloser
func (l *s3EncObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, o minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
	var opts minio.ObjectOptions
	if minio.GlobalGatewaySSE.SSEC() {
		opts = o
	}
	objInfo, err := l.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		return l.s3Objects.GetObjectNInfo(ctx, bucket, object, rs, h, lockType, opts)
	}
	fn, off, length, err := minio.NewGetObjectReader(rs, objInfo, opts)
	if err != nil {
		return nil, minio.ErrorRespToObjectError(err, bucket, object)
	}
	if l.isGWEncrypted(ctx, bucket, object) {
		object = getGWContentPath(object)
	}
	pr, pw := io.Pipe()
	go func() {
		// Do not set an `If-Match` header for the ETag when
		// the ETag is encrypted. The ETag at the backend never
		// matches an encrypted ETag and there is in any case
		// no way to make two consecutive S3 calls safe for concurrent
		// access.
		// However,  the encrypted object changes concurrently then the
		// gateway will not be able to decrypt it since the key (obtained
		// from dare.meta) will not work for any new created object. Therefore,
		// we will in any case not return invalid data to the client.
		etag := objInfo.ETag
		if len(etag) > 32 && strings.Count(etag, "-") == 0 {
			etag = ""
		}
		err := l.getObject(ctx, bucket, object, off, length, pw, etag, opts)
		pw.CloseWithError(err)
	}()

	// Setup cleanup function to cause the above go-routine to
	// exit in case of partial read
	pipeCloser := func() { pr.Close() }
	return fn(pr, h, pipeCloser)
}

// GetObjectInfo reads object info and replies back ObjectInfo
// For custom gateway encrypted large objects, the ObjectInfo is retrieved from the dare.meta file.
func (l *s3EncObjects) GetObjectInfo(ctx context.Context, bucket string, object string, o minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
	var opts minio.ObjectOptions
	if minio.GlobalGatewaySSE.SSEC() {
		opts = o
	}

	gwMeta, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(object))
	if err != nil {
		return l.s3Objects.GetObjectInfo(ctx, bucket, object, opts)
	}
	return gwMeta.ToObjectInfo(bucket, object), nil
}

// CopyObject copies an object from source bucket to a destination bucket.
func (l *s3EncObjects) CopyObject(ctx context.Context, srcBucket string, srcObject string, dstBucket string, dstObject string, srcInfo minio.ObjectInfo, s, d minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
	cpSrcDstSame := path.Join(srcBucket, srcObject) == path.Join(dstBucket, dstObject)
	if cpSrcDstSame {
		var gwMeta gwMetaV1
		if s.ServerSideEncryption != nil && d.ServerSideEncryption != nil &&
			((s.ServerSideEncryption.Type() == encrypt.SSEC && d.ServerSideEncryption.Type() == encrypt.SSEC) ||
				(s.ServerSideEncryption.Type() == encrypt.S3 && d.ServerSideEncryption.Type() == encrypt.S3)) {
			gwMeta, err = l.getGWMetadata(ctx, srcBucket, getDareMetaPath(srcObject))
			if err != nil {
				return
			}
			header := make(http.Header)
			if d.ServerSideEncryption != nil {
				d.ServerSideEncryption.Marshal(header)
			}
			for k, v := range header {
				srcInfo.UserDefined[k] = v[0]
			}
			gwMeta.Meta = srcInfo.UserDefined
			if err = l.writeGWMetadata(ctx, dstBucket, getDareMetaPath(dstObject), gwMeta, minio.ObjectOptions{}); err != nil {
				return objInfo, minio.ErrorRespToObjectError(err)
			}
			return gwMeta.ToObjectInfo(dstBucket, dstObject), nil
		}
	}
	dstOpts := minio.ObjectOptions{ServerSideEncryption: d.ServerSideEncryption, UserDefined: srcInfo.UserDefined}
	return l.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, dstOpts)
}

// DeleteObject deletes a blob in bucket
// For custom gateway encrypted large objects, cleans up encrypted content and metadata files
// from the backend.
func (l *s3EncObjects) DeleteObject(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (minio.ObjectInfo, error) {
	// Get dare meta json
	if _, err := l.getGWMetadata(ctx, bucket, getDareMetaPath(object)); err != nil {
		logger.LogIf(minio.GlobalContext, err)
		return l.s3Objects.DeleteObject(ctx, bucket, object, opts)
	}
	// delete encrypted object
	l.s3Objects.DeleteObject(ctx, bucket, getGWContentPath(object), opts)
	return l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
}

func (l *s3EncObjects) DeleteObjects(ctx context.Context, bucket string, objects []minio.ObjectToDelete, opts minio.ObjectOptions) ([]minio.DeletedObject, []error) {
	errs := make([]error, len(objects))
	dobjects := make([]minio.DeletedObject, len(objects))
	for idx, object := range objects {
		_, errs[idx] = l.DeleteObject(ctx, bucket, object.ObjectName, opts)
		if errs[idx] == nil {
			dobjects[idx] = minio.DeletedObject{
				ObjectName: object.ObjectName,
			}
		}
	}
	return dobjects, errs
}

// ListMultipartUploads lists all multipart uploads.
func (l *s3EncObjects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, e error) {

	lmi, e = l.s3Objects.ListMultipartUploads(ctx, bucket, prefix, keyMarker, uploadIDMarker, delimiter, maxUploads)
	if e != nil {
		return
	}
	lmi.KeyMarker = strings.TrimSuffix(lmi.KeyMarker, getGWContentPath(minio.SlashSeparator))
	lmi.NextKeyMarker = strings.TrimSuffix(lmi.NextKeyMarker, getGWContentPath(minio.SlashSeparator))
	for i := range lmi.Uploads {
		lmi.Uploads[i].Object = strings.TrimSuffix(lmi.Uploads[i].Object, getGWContentPath(minio.SlashSeparator))
	}
	return
}

// NewMultipartUpload uploads object in multiple parts
func (l *s3EncObjects) NewMultipartUpload(ctx context.Context, bucket string, object string, o minio.ObjectOptions) (uploadID string, err error) {
	var sseOpts encrypt.ServerSide
	if o.ServerSideEncryption == nil {
		return l.s3Objects.NewMultipartUpload(ctx, bucket, object, minio.ObjectOptions{UserDefined: o.UserDefined})
	}
	// Decide if sse options needed to be passed to backend
	if (minio.GlobalGatewaySSE.SSEC() && o.ServerSideEncryption.Type() == encrypt.SSEC) ||
		(minio.GlobalGatewaySSE.SSES3() && o.ServerSideEncryption.Type() == encrypt.S3) {
		sseOpts = o.ServerSideEncryption
	}

	uploadID, err = l.s3Objects.NewMultipartUpload(ctx, bucket, getGWContentPath(object), minio.ObjectOptions{ServerSideEncryption: sseOpts})
	if err != nil {
		return
	}
	// Create uploadID and write a temporary dare.meta object under object/uploadID prefix
	gwmeta := newGWMetaV1()
	gwmeta.Meta = o.UserDefined
	gwmeta.Stat.ModTime = time.Now().UTC()
	err = l.writeGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID), gwmeta, minio.ObjectOptions{})
	if err != nil {
		return uploadID, minio.ErrorRespToObjectError(err)
	}
	return uploadID, nil
}

// PutObject creates a new object with the incoming data,
func (l *s3EncObjects) PutObject(ctx context.Context, bucket string, object string, data *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
	var sseOpts encrypt.ServerSide
	// Decide if sse options needed to be passed to backend
	if opts.ServerSideEncryption != nil &&
		((minio.GlobalGatewaySSE.SSEC() && opts.ServerSideEncryption.Type() == encrypt.SSEC) ||
			(minio.GlobalGatewaySSE.SSES3() && opts.ServerSideEncryption.Type() == encrypt.S3) ||
			opts.ServerSideEncryption.Type() == encrypt.KMS) {
		sseOpts = opts.ServerSideEncryption
	}
	if opts.ServerSideEncryption == nil {
		defer l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
		defer l.DeleteObject(ctx, bucket, getGWContentPath(object), opts)
		return l.s3Objects.PutObject(ctx, bucket, object, data, minio.ObjectOptions{UserDefined: opts.UserDefined})
	}

	oi, err := l.s3Objects.PutObject(ctx, bucket, getGWContentPath(object), data, minio.ObjectOptions{ServerSideEncryption: sseOpts})
	if err != nil {
		return objInfo, minio.ErrorRespToObjectError(err)
	}

	gwMeta := newGWMetaV1()
	gwMeta.Meta = make(map[string]string)
	for k, v := range opts.UserDefined {
		gwMeta.Meta[k] = v
	}
	encMD5 := data.MD5CurrentHexString()

	gwMeta.ETag = encMD5
	gwMeta.Stat.Size = oi.Size
	gwMeta.Stat.ModTime = time.Now().UTC()
	if err = l.writeGWMetadata(ctx, bucket, getDareMetaPath(object), gwMeta, minio.ObjectOptions{}); err != nil {
		return objInfo, minio.ErrorRespToObjectError(err)
	}
	objInfo = gwMeta.ToObjectInfo(bucket, object)
	// delete any unencrypted content of the same name created previously
	l.s3Objects.DeleteObject(ctx, bucket, object, opts)
	return objInfo, nil
}

// PutObjectPart puts a part of object in bucket
func (l *s3EncObjects) PutObjectPart(ctx context.Context, bucket string, object string, uploadID string, partID int, data *minio.PutObjReader, opts minio.ObjectOptions) (pi minio.PartInfo, e error) {

	if opts.ServerSideEncryption == nil {
		return l.s3Objects.PutObjectPart(ctx, bucket, object, uploadID, partID, data, opts)
	}

	var s3Opts minio.ObjectOptions
	// for sse-s3 encryption options should not be passed to backend
	if opts.ServerSideEncryption != nil && opts.ServerSideEncryption.Type() == encrypt.SSEC && minio.GlobalGatewaySSE.SSEC() {
		s3Opts = opts
	}

	uploadPath := getTmpGWMetaPath(object, uploadID)
	tmpDareMeta := path.Join(uploadPath, gwdareMetaJSON)
	_, err := l.s3Objects.GetObjectInfo(ctx, bucket, tmpDareMeta, minio.ObjectOptions{})
	if err != nil {
		return pi, minio.InvalidUploadID{UploadID: uploadID}
	}

	pi, e = l.s3Objects.PutObjectPart(ctx, bucket, getGWContentPath(object), uploadID, partID, data, s3Opts)
	if e != nil {
		return
	}
	gwMeta := newGWMetaV1()
	gwMeta.Parts = make([]minio.ObjectPartInfo, 1)
	// Add incoming part.
	gwMeta.Parts[0] = minio.ObjectPartInfo{
		Number: partID,
		ETag:   pi.ETag,
		Size:   pi.Size,
	}
	gwMeta.ETag = data.MD5CurrentHexString() // encrypted ETag
	gwMeta.Stat.Size = pi.Size
	gwMeta.Stat.ModTime = pi.LastModified

	if err = l.writeGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, partID), gwMeta, minio.ObjectOptions{}); err != nil {
		return pi, minio.ErrorRespToObjectError(err)
	}
	return minio.PartInfo{
		Size:         gwMeta.Stat.Size,
		ETag:         minio.CanonicalizeETag(gwMeta.ETag),
		LastModified: gwMeta.Stat.ModTime,
		PartNumber:   partID,
	}, nil
}

// CopyObjectPart creates a part in a multipart upload by copying
// existing object or a part of it.
func (l *s3EncObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject, uploadID string,
	partID int, startOffset, length int64, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (p minio.PartInfo, err error) {
	return l.PutObjectPart(ctx, destBucket, destObject, uploadID, partID, srcInfo.PutObjReader, dstOpts)
}

// GetMultipartInfo returns multipart info of the uploadId of the object
func (l *s3EncObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
	result.Bucket = bucket
	result.Object = object
	result.UploadID = uploadID
	// We do not store parts uploaded so far in the dare.meta. Only CompleteMultipartUpload finalizes the parts under upload prefix.Otherwise,
	// there could be situations of dare.meta getting corrupted by competing upload parts.
	dm, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
	if err != nil {
		return l.s3Objects.GetMultipartInfo(ctx, bucket, object, uploadID, opts)
	}
	result.UserDefined = dm.ToObjectInfo(bucket, object).UserDefined
	return result, nil
}

// ListObjectParts returns all object parts for specified object in specified bucket
func (l *s3EncObjects) ListObjectParts(ctx context.Context, bucket string, object string, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (lpi minio.ListPartsInfo, e error) {
	// We do not store parts uploaded so far in the dare.meta. Only CompleteMultipartUpload finalizes the parts under upload prefix.Otherwise,
	// there could be situations of dare.meta getting corrupted by competing upload parts.
	dm, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
	if err != nil {
		return l.s3Objects.ListObjectParts(ctx, bucket, object, uploadID, partNumberMarker, maxParts, opts)
	}

	lpi, err = l.s3Objects.ListObjectParts(ctx, bucket, getGWContentPath(object), uploadID, partNumberMarker, maxParts, opts)
	if err != nil {
		return lpi, err
	}
	for i, part := range lpi.Parts {
		partMeta, err := l.getGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, part.PartNumber))
		if err != nil || len(partMeta.Parts) == 0 {
			return lpi, minio.InvalidPart{}
		}
		lpi.Parts[i].ETag = partMeta.ETag
	}
	lpi.UserDefined = dm.ToObjectInfo(bucket, object).UserDefined
	lpi.Object = object
	return lpi, nil
}

// AbortMultipartUpload aborts a ongoing multipart upload
func (l *s3EncObjects) AbortMultipartUpload(ctx context.Context, bucket string, object string, uploadID string, opts minio.ObjectOptions) error {
	if _, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID)); err != nil {
		return l.s3Objects.AbortMultipartUpload(ctx, bucket, object, uploadID, opts)
	}

	if err := l.s3Objects.AbortMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID, opts); err != nil {
		return err
	}

	uploadPrefix := getTmpGWMetaPath(object, uploadID)
	var continuationToken, startAfter, delimiter string
	for {
		loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, uploadPrefix, continuationToken, delimiter, 1000, false, startAfter)
		if err != nil {
			return minio.InvalidUploadID{UploadID: uploadID}
		}
		for _, obj := range loi.Objects {
			if _, err := l.s3Objects.DeleteObject(ctx, bucket, obj.Name, minio.ObjectOptions{}); err != nil {
				return minio.ErrorRespToObjectError(err)
			}
			startAfter = obj.Name
		}
		continuationToken = loi.NextContinuationToken
		if !loi.IsTruncated {
			break
		}
	}
	return nil
}

// CompleteMultipartUpload completes ongoing multipart upload and finalizes object
func (l *s3EncObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart, opts minio.ObjectOptions) (oi minio.ObjectInfo, e error) {

	tmpMeta, err := l.getGWMetadata(ctx, bucket, getTmpDareMetaPath(object, uploadID))
	if err != nil {
		oi, e = l.s3Objects.CompleteMultipartUpload(ctx, bucket, object, uploadID, uploadedParts, opts)
		if e == nil {
			// delete any encrypted version of object that might exist
			defer l.deleteGWMetadata(ctx, bucket, getDareMetaPath(object))
			defer l.DeleteObject(ctx, bucket, getGWContentPath(object), opts)
		}
		return oi, e
	}
	gwMeta := newGWMetaV1()
	gwMeta.Meta = make(map[string]string)
	for k, v := range tmpMeta.Meta {
		gwMeta.Meta[k] = v
	}
	// Allocate parts similar to incoming slice.
	gwMeta.Parts = make([]minio.ObjectPartInfo, len(uploadedParts))

	bkUploadedParts := make([]minio.CompletePart, len(uploadedParts))
	// Calculate full object size.
	var objectSize int64

	// Validate each part and then commit to disk.
	for i, part := range uploadedParts {
		partMeta, err := l.getGWMetadata(ctx, bucket, getPartMetaPath(object, uploadID, part.PartNumber))
		if err != nil || len(partMeta.Parts) == 0 {
			return oi, minio.InvalidPart{}
		}
		bkUploadedParts[i] = minio.CompletePart{PartNumber: part.PartNumber, ETag: partMeta.Parts[0].ETag}
		gwMeta.Parts[i] = partMeta.Parts[0]
		objectSize += partMeta.Parts[0].Size
	}
	oi, e = l.s3Objects.CompleteMultipartUpload(ctx, bucket, getGWContentPath(object), uploadID, bkUploadedParts, opts)
	if e != nil {
		return oi, e
	}

	// delete any unencrypted version of object that might be on the backend
	defer l.s3Objects.DeleteObject(ctx, bucket, object, opts)

	// Save the final object size and modtime.
	gwMeta.Stat.Size = objectSize
	gwMeta.Stat.ModTime = time.Now().UTC()
	gwMeta.ETag = oi.ETag

	if err = l.writeGWMetadata(ctx, bucket, getDareMetaPath(object), gwMeta, minio.ObjectOptions{}); err != nil {
		return oi, minio.ErrorRespToObjectError(err)
	}
	// Clean up any uploaded parts that are not being committed by this CompleteMultipart operation
	var continuationToken, startAfter, delimiter string
	uploadPrefix := getTmpGWMetaPath(object, uploadID)
	done := false
	for {
		loi, lerr := l.s3Objects.ListObjectsV2(ctx, bucket, uploadPrefix, continuationToken, delimiter, 1000, false, startAfter)
		if lerr != nil {
			break
		}
		for _, obj := range loi.Objects {
			if !strings.HasPrefix(obj.Name, uploadPrefix) {
				done = true
				break
			}
			startAfter = obj.Name
			l.s3Objects.DeleteObject(ctx, bucket, obj.Name, opts)
		}
		continuationToken = loi.NextContinuationToken
		if !loi.IsTruncated || done {
			break
		}
	}

	return gwMeta.ToObjectInfo(bucket, object), nil
}

// getTmpGWMetaPath returns the prefix under which uploads in progress are stored on backend
func getTmpGWMetaPath(object, uploadID string) string {
	return path.Join(object, defaultMinioGWPrefix, uploadID)
}

// getGWMetaPath returns the prefix under which custom object metadata and object are stored on backend after upload completes
func getGWMetaPath(object string) string {
	return path.Join(object, defaultMinioGWPrefix)
}

// getGWContentPath returns the prefix under which custom object is stored on backend after upload completes
func getGWContentPath(object string) string {
	return path.Join(object, defaultMinioGWPrefix, defaultGWContentFileName)
}

// Clean-up the stale incomplete encrypted multipart uploads. Should be run in a Go routine.
func (l *s3EncObjects) cleanupStaleEncMultipartUploads(ctx context.Context, cleanupInterval, expiry time.Duration) {
	ticker := time.NewTicker(cleanupInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			l.cleanupStaleUploads(ctx, expiry)
		}
	}
}

// cleanupStaleUploads removes old custom encryption multipart uploads on backend
func (l *s3EncObjects) cleanupStaleUploads(ctx context.Context, expiry time.Duration) {
	buckets, err := l.s3Objects.ListBuckets(ctx)
	if err != nil {
		logger.LogIf(ctx, err)
		return
	}
	for _, b := range buckets {
		expParts := l.getStalePartsForBucket(ctx, b.Name, expiry)
		for k := range expParts {
			l.s3Objects.DeleteObject(ctx, b.Name, k, minio.ObjectOptions{})
		}
	}
}

func (l *s3EncObjects) getStalePartsForBucket(ctx context.Context, bucket string, expiry time.Duration) (expParts map[string]string) {
	var prefix, continuationToken, delimiter, startAfter string
	expParts = make(map[string]string)
	now := time.Now()
	for {
		loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, false, startAfter)
		if err != nil {
			logger.LogIf(ctx, err)
			break
		}
		for _, obj := range loi.Objects {
			startAfter = obj.Name
			if !strings.Contains(obj.Name, defaultMinioGWPrefix) {
				continue
			}

			if isGWObject(obj.Name) {
				continue
			}

			// delete temporary part.meta or dare.meta files for incomplete uploads that are past expiry
			if (strings.HasSuffix(obj.Name, gwpartMetaJSON) || strings.HasSuffix(obj.Name, gwdareMetaJSON)) &&
				now.Sub(obj.ModTime) > expiry {
				expParts[obj.Name] = ""
			}
		}
		continuationToken = loi.NextContinuationToken
		if !loi.IsTruncated {
			break
		}
	}
	return
}

func (l *s3EncObjects) DeleteBucket(ctx context.Context, bucket string, opts minio.DeleteBucketOptions) error {
	var prefix, continuationToken, delimiter, startAfter string
	expParts := make(map[string]string)

	for {
		loi, err := l.s3Objects.ListObjectsV2(ctx, bucket, prefix, continuationToken, delimiter, 1000, false, startAfter)
		if err != nil {
			break
		}
		for _, obj := range loi.Objects {
			startAfter = obj.Name
			if !strings.Contains(obj.Name, defaultMinioGWPrefix) {
				return minio.BucketNotEmpty{}
			}
			if isGWObject(obj.Name) {
				return minio.BucketNotEmpty{}
			}
			// delete temporary part.meta or dare.meta files for incomplete uploads
			if strings.HasSuffix(obj.Name, gwpartMetaJSON) || strings.HasSuffix(obj.Name, gwdareMetaJSON) {
				expParts[obj.Name] = ""
			}
		}
		continuationToken = loi.NextContinuationToken
		if !loi.IsTruncated {
			break
		}
	}
	for k := range expParts {
		l.s3Objects.DeleteObject(ctx, bucket, k, minio.ObjectOptions{})
	}
	err := l.Client.RemoveBucket(ctx, bucket)
	if err != nil {
		return minio.ErrorRespToObjectError(err, bucket)
	}
	return nil
}