Mirror of https://github.com/minio/minio.git (synced 2025-01-11 15:03:22 -05:00)
support proper values for listMultipartUploads/listParts (#9970)
When the object KMS is configured with auto-encryption, there were issues when using the Docker registry; this had gone unnoticed for a while. This PR fixes that compatibility issue.

Additionally, fix the infinite-loop issue in the continuation-token implementation that was missed as part of #9939.

Also, generate the heal token as a client-facing value instead of one remembered by the server, which allows the server to stay stateless with respect to the token's behavior.
This commit is contained in:
parent: 03b84091fc
commit: cdb0e6ffed
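Both fixes in this diff lean on one token convention: a plain server-side token becomes a client-facing value by appending "@" plus the index of the node that issued it, so the next request carrying that token can be proxied back to the node that owns the state. Below is a minimal sketch of that round trip; buildRequestToken and splitRequestToken are hypothetical names that mirror the fmt.Sprintf("%s@%d", ...) calls and the reworked parseRequestToken in the diff, not functions from the MinIO code base.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// buildRequestToken tags a token with the local node index,
// mirroring the fmt.Sprintf("%s@%d", ...) calls in this commit.
func buildRequestToken(token string, nodeIndex int) string {
	return fmt.Sprintf("%s@%d", token, nodeIndex)
}

// splitRequestToken mirrors the reworked parseRequestToken below:
// empty or malformed tokens are returned unchanged with index -1
// instead of being reported as an error.
func splitRequestToken(token string) (subToken string, nodeIndex int) {
	if token == "" {
		return token, -1
	}
	i := strings.Index(token, "@")
	if i < 0 {
		return token, -1
	}
	idx, err := strconv.Atoi(token[i+1:])
	if err != nil {
		return token, -1
	}
	return token[:i], idx
}

func main() {
	tok := buildRequestToken("6d29f2ad-62d4-44d5-9a55-6e82cb069c2a", 2)
	sub, idx := splitRequestToken(tok)
	fmt.Println(sub, idx) // 6d29f2ad-62d4-44d5-9a55-6e82cb069c2a 2
}

Because parsing never fails, a continuation token without a node suffix simply falls through to the local server instead of erroring out or re-issuing the same token, which is what closes the infinite-loop hole mentioned above.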
@@ -652,19 +652,15 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	if globalIsDistErasure {
-		// Analyze the heal token and route the request accordingly
-		_, nodeIndex, parsed := parseRequestToken(hip.clientToken)
-		if parsed {
-			if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) {
-				return
-			}
-		} else {
-			apiErr := errorCodes.ToAPIErr(ErrHealInvalidClientToken)
-			writeErrorResponseJSON(ctx, w, apiErr, r.URL)
-			return
-		}
-	}
+	// Analyze the heal token and route the request accordingly
+	token, success := proxyRequestByToken(ctx, w, r, hip.clientToken)
+	if success {
+		return
+	}
+	hip.clientToken = token
+	// if request was not successful, try this server locally if token
+	// is not found the call will fail anyways. if token is empty
+	// try this server to generate a new token.
 
 	type healResp struct {
 		respBytes []byte
@@ -736,8 +732,12 @@ func (a adminAPIHandlers) HealHandler(w http.ResponseWriter, r *http.Request) {
 	if hip.clientToken == "" && !hip.forceStart && !hip.forceStop {
 		nh, exists := globalAllHealState.getHealSequence(healPath)
 		if exists && !nh.hasEnded() && len(nh.currentStatus.Items) > 0 {
+			clientToken := nh.clientToken
+			if globalIsDistErasure {
+				clientToken = fmt.Sprintf("%s@%d", nh.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints))
+			}
 			b, err := json.Marshal(madmin.HealStartSuccess{
-				ClientToken: nh.clientToken,
+				ClientToken: clientToken,
 				ClientAddress: nh.clientAddress,
 				StartTime: nh.startTime,
 			})
@@ -155,8 +155,13 @@ func (ahs *allHealState) stopHealSequence(path string) ([]byte, APIError) {
 			StartTime: UTCNow(),
 		}
 	} else {
+		clientToken := he.clientToken
+		if globalIsDistErasure {
+			clientToken = fmt.Sprintf("%s@%d", he.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints))
+		}
+
 		hsp = madmin.HealStopSuccess{
-			ClientToken: he.clientToken,
+			ClientToken: clientToken,
 			ClientAddress: he.clientAddress,
 			StartTime: he.startTime,
 		}
@@ -232,8 +237,13 @@ func (ahs *allHealState) LaunchNewHealSequence(h *healSequence) (
 	// Launch top-level background heal go-routine
 	go h.healSequenceStart()
 
+	clientToken := h.clientToken
+	if globalIsDistErasure {
+		clientToken = fmt.Sprintf("%s@%d", h.clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints))
+	}
+
 	b, err := json.Marshal(madmin.HealStartSuccess{
-		ClientToken: h.clientToken,
+		ClientToken: clientToken,
 		ClientAddress: h.clientAddress,
 		StartTime: h.startTime,
 	})
@@ -371,9 +381,6 @@ func newHealSequence(ctx context.Context, bucket, objPrefix, clientAddr string,
 	ctx, cancel := context.WithCancel(logger.SetReqInfo(ctx, reqInfo))
 
 	clientToken := mustGetUUID()
-	if globalIsDistErasure {
-		clientToken = fmt.Sprintf("%s@%d", clientToken, GetProxyEndpointLocalIndex(globalProxyEndpoints))
-	}
 
 	return &healSequence{
 		respCh: make(chan healResult),
@@ -161,12 +161,10 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt
 	}
 
 	// Analyze continuation token and route the request accordingly
-	subToken, nodeIndex, parsed := parseRequestToken(token)
-	if parsed {
-		if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) {
-			return
-		}
-		token = subToken
-	}
+	var success bool
+	token, success = proxyRequestByToken(ctx, w, r, token)
+	if success {
+		return
+	}
 
 	listObjectsV2 := objectAPI.ListObjectsV2
@@ -192,7 +190,10 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *htt
 	}
 
 	// The next continuation token has id@node_index format to optimize paginated listing
-	nextContinuationToken := fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex())
+	nextContinuationToken := listObjectsV2Info.NextContinuationToken
+	if nextContinuationToken != "" && listObjectsV2Info.IsTruncated {
+		nextContinuationToken = fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex())
+	}
 
 	response := generateListObjectsV2Response(bucket, prefix, token, nextContinuationToken, startAfter,
 		delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated,
@@ -246,12 +247,10 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
 	}
 
 	// Analyze continuation token and route the request accordingly
-	subToken, nodeIndex, parsed := parseRequestToken(token)
-	if parsed {
-		if proxyRequestByNodeIndex(ctx, w, r, nodeIndex) {
-			return
-		}
-		token = subToken
-	}
+	var success bool
+	token, success = proxyRequestByToken(ctx, w, r, token)
+	if success {
+		return
+	}
 
 	listObjectsV2 := objectAPI.ListObjectsV2
@@ -277,7 +276,10 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
 	}
 
 	// The next continuation token has id@node_index format to optimize paginated listing
-	nextContinuationToken := fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex())
+	nextContinuationToken := listObjectsV2Info.NextContinuationToken
+	if nextContinuationToken != "" && listObjectsV2Info.IsTruncated {
+		nextContinuationToken = fmt.Sprintf("%s@%d", listObjectsV2Info.NextContinuationToken, getLocalNodeIndex())
+	}
 
 	response := generateListObjectsV2Response(bucket, prefix, token, nextContinuationToken, startAfter,
 		delimiter, encodingType, fetchOwner, listObjectsV2Info.IsTruncated,
@@ -299,17 +301,28 @@ func getLocalNodeIndex() int {
 	return -1
 }
 
-func parseRequestToken(token string) (subToken string, nodeIndex int, success bool) {
+func parseRequestToken(token string) (subToken string, nodeIndex int) {
+	if token == "" {
+		return token, -1
+	}
 	i := strings.Index(token, "@")
 	if i < 0 {
-		return "", -1, false
+		return token, -1
 	}
 	nodeIndex, err := strconv.Atoi(token[i+1:])
 	if err != nil {
-		return "", -1, false
+		return token, -1
 	}
 	subToken = token[:i]
-	return subToken, nodeIndex, true
+	return subToken, nodeIndex
 }
 
+func proxyRequestByToken(ctx context.Context, w http.ResponseWriter, r *http.Request, token string) (string, bool) {
+	subToken, nodeIndex := parseRequestToken(token)
+	if nodeIndex > 0 {
+		return subToken, proxyRequestByNodeIndex(ctx, w, r, nodeIndex)
+	}
+	return subToken, false
+}
+
 func proxyRequestByNodeIndex(ctx context.Context, w http.ResponseWriter, r *http.Request, index int) (success bool) {
@@ -25,6 +25,7 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/minio/minio-go/v6/pkg/set"
 	xhttp "github.com/minio/minio/cmd/http"
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/mimedb"
@@ -75,37 +76,101 @@ func (er erasureObjects) removeObjectPart(bucket, object, uploadID, dataDir stri
 // not support prefix based listing, this is a deliberate attempt
 // towards simplification of multipart APIs.
 // The resulting ListMultipartsInfo structure is unmarshalled directly as XML.
-func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, e error) {
+func (er erasureObjects) ListMultipartUploads(ctx context.Context, bucket, object, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result ListMultipartsInfo, err error) {
 	result.MaxUploads = maxUploads
 	result.KeyMarker = keyMarker
 	result.Prefix = object
 	result.Delimiter = delimiter
 
+	var uploadIDs []string
 	for _, disk := range er.getLoadBalancedDisks() {
 		if disk == nil {
 			continue
 		}
-		uploadIDs, err := disk.ListDir(minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
+		uploadIDs, err = disk.ListDir(minioMetaMultipartBucket, er.getMultipartSHADir(bucket, object), -1)
 		if err != nil {
 			if err == errDiskNotFound {
 				continue
 			}
 			if err == errFileNotFound {
 				return result, nil
 			}
 			logger.LogIf(ctx, err)
-			return result, err
-		}
-		for i := range uploadIDs {
-			uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
-		}
-		sort.Strings(uploadIDs)
-		for _, uploadID := range uploadIDs {
-			if len(result.Uploads) == maxUploads {
-				break
-			}
-			result.Uploads = append(result.Uploads, MultipartInfo{Object: object, UploadID: uploadID})
+			return result, toObjectErr(err, bucket, object)
 		}
 		break
 	}
+
+	for i := range uploadIDs {
+		uploadIDs[i] = strings.TrimSuffix(uploadIDs[i], SlashSeparator)
+	}
+
+	// S3 spec says uploadIDs should be sorted based on initiated time, we need
+	// to read the metadata entry.
+	var uploads []MultipartInfo
+
+	populatedUploadIds := set.NewStringSet()
+
+retry:
+	for _, disk := range er.getLoadBalancedDisks() {
+		if disk == nil {
+			continue
+		}
+		for _, uploadID := range uploadIDs {
+			if populatedUploadIds.Contains(uploadID) {
+				continue
+			}
+			fi, err := disk.ReadVersion(minioMetaMultipartBucket, pathJoin(er.getUploadIDDir(bucket, object, uploadID)), "")
+			if err != nil {
+				if err == errDiskNotFound || err == errFileNotFound {
+					goto retry
+				}
+				return result, toObjectErr(err, bucket, object)
+			}
+			populatedUploadIds.Add(uploadID)
+			uploads = append(uploads, MultipartInfo{
+				Object: object,
+				UploadID: uploadID,
+				Initiated: fi.ModTime,
+			})
+		}
+		break
+	}
+
+	sort.Slice(uploads, func(i int, j int) bool {
+		return uploads[i].Initiated.Before(uploads[j].Initiated)
+	})
+
+	uploadIndex := 0
+	if uploadIDMarker != "" {
+		for uploadIndex < len(uploads) {
+			if uploads[uploadIndex].UploadID != uploadIDMarker {
+				uploadIndex++
+				continue
+			}
+			if uploads[uploadIndex].UploadID == uploadIDMarker {
+				uploadIndex++
+				break
+			}
+			uploadIndex++
+		}
+	}
+	for uploadIndex < len(uploads) {
+		result.Uploads = append(result.Uploads, uploads[uploadIndex])
+		result.NextUploadIDMarker = uploads[uploadIndex].UploadID
+		uploadIndex++
+		if len(result.Uploads) == maxUploads {
+			break
+		}
+	}
+
+	result.IsTruncated = uploadIndex < len(uploads)
+
+	if !result.IsTruncated {
+		result.NextKeyMarker = ""
+		result.NextUploadIDMarker = ""
+	}
+
 	return result, nil
 }
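The rewritten ListMultipartUploads above reads each upload's metadata so it can sort by initiation time (as the S3 spec requires) and then pages through the sorted list with uploadIDMarker and maxUploads. The following standalone sketch shows only that pagination step; multipartUpload and pageUploads are illustrative names, not part of the MinIO code.

package main

import (
	"fmt"
	"sort"
	"time"
)

// multipartUpload is an illustrative stand-in for MultipartInfo.
type multipartUpload struct {
	UploadID  string
	Initiated time.Time
}

// pageUploads mimics the marker-based pagination in the new
// ListMultipartUploads: sort by initiation time, skip everything up to
// and including the marker, then return at most maxUploads entries
// together with the next marker and a truncation flag.
func pageUploads(uploads []multipartUpload, marker string, maxUploads int) (page []multipartUpload, nextMarker string, truncated bool) {
	sort.Slice(uploads, func(i, j int) bool {
		return uploads[i].Initiated.Before(uploads[j].Initiated)
	})
	i := 0
	if marker != "" {
		for i < len(uploads) {
			i++
			if uploads[i-1].UploadID == marker {
				break
			}
		}
	}
	for i < len(uploads) && len(page) < maxUploads {
		page = append(page, uploads[i])
		nextMarker = uploads[i].UploadID
		i++
	}
	if i >= len(uploads) {
		nextMarker = "" // final page: no marker to hand back
	}
	return page, nextMarker, i < len(uploads)
}

func main() {
	now := time.Now()
	uploads := []multipartUpload{
		{UploadID: "id-c", Initiated: now.Add(2 * time.Minute)},
		{UploadID: "id-a", Initiated: now},
		{UploadID: "id-b", Initiated: now.Add(time.Minute)},
	}
	page, next, more := pageUploads(uploads, "", 2)
	fmt.Println(len(page), next, more) // 2 id-b true
	page, next, more = pageUploads(uploads, next, 2)
	fmt.Println(len(page), next, more) // 1  false
}

The handler keeps the same invariants: NextUploadIDMarker is only meaningful while IsTruncated is true, and both marker fields are cleared once the final page has been returned.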
@@ -2326,13 +2326,20 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht
 				return
 			}
 		}
-		parts := make([]PartInfo, len(listPartsInfo.Parts))
-		for i, p := range listPartsInfo.Parts {
-			part := p
-			part.ETag = tryDecryptETag(objectEncryptionKey, p.ETag, ssec)
-			parts[i] = part
+		for i := range listPartsInfo.Parts {
+			curp := listPartsInfo.Parts[i]
+			curp.ETag = tryDecryptETag(objectEncryptionKey, curp.ETag, ssec)
+			if !ssec {
+				var partSize uint64
+				partSize, err = sio.DecryptedSize(uint64(curp.Size))
+				if err != nil {
+					writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL, guessIsBrowserReq(r))
+					return
+				}
+				curp.Size = int64(partSize)
+			}
+			listPartsInfo.Parts[i] = curp
 		}
-		listPartsInfo.Parts = parts
 	}
 
 	response := generateListPartsResponse(listPartsInfo, encodingType)
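For parts encrypted with auto-encryption (the !ssec branch above, i.e. SSE-S3/SSE-KMS), the handler now reports the plaintext part size by mapping the stored size through sio.DecryptedSize. A minimal sketch of that size round trip using github.com/minio/sio directly; the 5 MiB part size is an arbitrary example value.

package main

import (
	"fmt"

	"github.com/minio/sio"
)

func main() {
	// Pretend a client uploaded a 5 MiB part that was auto-encrypted.
	const plaintextSize = 5 * 1024 * 1024

	// EncryptedSize reports how large the DARE ciphertext is on disk.
	encSize, err := sio.EncryptedSize(plaintextSize)
	if err != nil {
		panic(err)
	}

	// DecryptedSize is the inverse mapping the handler now applies to
	// each listed part, so clients see the size they actually uploaded.
	decSize, err := sio.DecryptedSize(encSize)
	if err != nil {
		panic(err)
	}
	fmt.Printf("on disk: %d bytes, reported by ListParts: %d bytes\n", encSize, decSize)
}

Without the conversion, clients see ciphertext sizes instead of the sizes they uploaded, which is the compatibility problem the commit message describes for the Docker registry.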