mirror of https://github.com/minio/minio.git
handle the actualSize() properly for PostUpload() (#20422)
postUpload() incorrectly saves the actual size as '-1'; we should save the correct size whenever possible. Bonus: fix the PutObjectPart() write locker; instead of taking the lock before we read the client stream, we should hold it only when we need to commit the parts.
This commit is contained in:
parent b963f36e1e
commit bc527eceda
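The diff below adds the same size fallback in both PutObjectPart and putObject. As a rough, self-contained illustration of that logic (the resolveActualSize helper, its signature, and the example values are hypothetical; only the branch structure and the sio.DecryptedSize call come from the change):

package main

import (
    "fmt"

    "github.com/minio/sio"
)

// resolveActualSize sketches the fallback added in this commit: declared is
// data.ActualSize() as sent by the client, n is the number of bytes written.
func resolveActualSize(declared, n int64, compressed, encrypted bool) int64 {
    if declared >= 0 {
        return declared // the client declared the size up front
    }
    switch {
    case compressed:
        // Nothing changes for a compressed stream; the size stays unknown here.
        return declared
    case encrypted:
        // Strip the sio/DARE encryption overhead from the bytes written to disk.
        if decSize, err := sio.DecryptedSize(uint64(n)); err == nil {
            return int64(decSize)
        }
        return declared
    default:
        // Plain stream: the bytes written are the actual object size.
        return n
    }
}

func main() {
    fmt.Println(resolveActualSize(-1, 5<<20, false, false)) // plain upload: 5 MiB
    fmt.Println(resolveActualSize(-1, 5<<20, false, true))  // encrypted upload: 5 MiB minus DARE overhead
    fmt.Println(resolveActualSize(-1, 5<<20, true, false))  // compressed upload: stays -1
}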
@@ -41,6 +41,7 @@ import (
     "github.com/minio/minio/internal/logger"
     "github.com/minio/pkg/v3/mimedb"
     "github.com/minio/pkg/v3/sync/errgroup"
+    "github.com/minio/sio"
 )

 func (er erasureObjects) getUploadIDDir(bucket, object, uploadID string) string {
@@ -590,19 +591,6 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
         return pi, toObjectErr(err, bucket, object, uploadID)
     }

-    // Write lock for this part ID, only hold it if we are planning to read from the
-    // stream to avoid any concurrent updates.
-    //
-    // Must be held throughout this call.
-    partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
-    plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
-    if err != nil {
-        return PartInfo{}, err
-    }
-
-    ctx = plkctx.Context()
-    defer partIDLock.Unlock(plkctx)
-
     onlineDisks := er.getDisks()
     writeQuorum := fi.WriteQuorum(er.defaultWQuorum())

@@ -716,11 +704,28 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
         index = opts.IndexCB()
     }

+    actualSize := data.ActualSize()
+    if actualSize < 0 {
+        _, encrypted := crypto.IsEncrypted(fi.Metadata)
+        compressed := fi.IsCompressed()
+        switch {
+        case compressed:
+            // ... nothing changes for compressed stream.
+        case encrypted:
+            decSize, err := sio.DecryptedSize(uint64(n))
+            if err == nil {
+                actualSize = int64(decSize)
+            }
+        default:
+            actualSize = n
+        }
+    }
+
     partInfo := ObjectPartInfo{
         Number:     partID,
         ETag:       md5hex,
         Size:       n,
-        ActualSize: data.ActualSize(),
+        ActualSize: actualSize,
         ModTime:    UTCNow(),
         Index:      index,
         Checksums:  r.ContentCRC(),
@@ -731,6 +736,19 @@ func (er erasureObjects) PutObjectPart(ctx context.Context, bucket, object, uplo
         return pi, toObjectErr(err, minioMetaMultipartBucket, partPath)
     }

+    // Write lock for this part ID, only hold it if we are planning to read from the
+    // stream to avoid any concurrent updates.
+    //
+    // Must be held throughout this call.
+    partIDLock := er.NewNSLock(bucket, pathJoin(object, uploadID, strconv.Itoa(partID)))
+    plkctx, err := partIDLock.GetLock(ctx, globalOperationTimeout)
+    if err != nil {
+        return PartInfo{}, err
+    }
+
+    ctx = plkctx.Context()
+    defer partIDLock.Unlock(plkctx)
+
     onlineDisks, err = er.renamePart(ctx, onlineDisks, minioMetaTmpBucket, tmpPartPath, minioMetaMultipartBucket, partPath, partFI, writeQuorum)
     if err != nil {
         if errors.Is(err, errFileNotFound) {
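For context on the locking change in the two PutObjectPart hunks above: the client stream is now drained to the temporary part location without the per-part lock held, and the lock is acquired only around the rename that commits the part. A rough, self-contained sketch of that pattern, using a plain sync.Mutex as a stand-in for MinIO's NSLock (the putPart helper, the partKey format, and the paths are purely illustrative):

package main

import (
    "fmt"
    "io"
    "os"
    "path/filepath"
    "strings"
    "sync"
)

// partLocks stands in for MinIO's per-part NSLock in this sketch.
var partLocks sync.Map

func putPart(partKey string, body io.Reader) error {
    // 1. Drain the (potentially slow) client stream first, with no lock held.
    tmp, err := os.CreateTemp("", "part-*")
    if err != nil {
        return err
    }
    defer os.Remove(tmp.Name()) // harmless no-op after a successful rename
    if _, err := io.Copy(tmp, body); err != nil {
        tmp.Close()
        return err
    }
    if err := tmp.Close(); err != nil {
        return err
    }

    // 2. Take the per-part lock only for the commit, i.e. the cheap rename.
    mu, _ := partLocks.LoadOrStore(partKey, &sync.Mutex{})
    mu.(*sync.Mutex).Lock()
    defer mu.(*sync.Mutex).Unlock()
    return os.Rename(tmp.Name(), filepath.Join(os.TempDir(), partKey))
}

func main() {
    fmt.Println(putPart("bucket.object.uploadid.part1", strings.NewReader("hello world")))
}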
@@ -1154,7 +1172,7 @@ func (er erasureObjects) CompleteMultipartUpload(ctx context.Context, bucket str
     // However, in case of encryption, the persisted part ETags don't match
     // what we have sent to the client during PutObjectPart. The reason is
     // that ETags are encrypted. Hence, the client will send a list of complete
-    // part ETags of which non can match the ETag of any part. For example
+    // part ETags of which may not match the ETag of any part. For example
     // ETag (client):          30902184f4e62dd8f98f0aaff810c626
     // ETag (server-internal): 20000f00ce5dc16e3f3b124f586ae1d88e9caa1c598415c2759bbb50e84a59f630902184f4e62dd8f98f0aaff810c626
     //
@@ -49,6 +49,7 @@ import (
     "github.com/minio/minio/internal/logger"
     "github.com/minio/pkg/v3/mimedb"
     "github.com/minio/pkg/v3/sync/errgroup"
+    "github.com/minio/sio"
 )

 // list all errors which can be ignored in object operations.
@@ -1468,6 +1469,23 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
         modTime = UTCNow()
     }

+    actualSize := data.ActualSize()
+    if actualSize < 0 {
+        _, encrypted := crypto.IsEncrypted(fi.Metadata)
+        compressed := fi.IsCompressed()
+        switch {
+        case compressed:
+            // ... nothing changes for compressed stream.
+        case encrypted:
+            decSize, err := sio.DecryptedSize(uint64(n))
+            if err == nil {
+                actualSize = int64(decSize)
+            }
+        default:
+            actualSize = n
+        }
+    }
+
     for i, w := range writers {
         if w == nil {
             onlineDisks[i] = nil
@@ -1479,7 +1497,7 @@ func (er erasureObjects) putObject(ctx context.Context, bucket string, object st
             partsMetadata[i].Data = nil
         }
         // No need to add checksum to part. We already have it on the object.
-        partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize(), modTime, compIndex, nil)
+        partsMetadata[i].AddObjectPart(1, "", n, actualSize, modTime, compIndex, nil)
         partsMetadata[i].Versioned = opts.Versioned || opts.VersionSuspended
     }
