mirror of
https://github.com/minio/minio.git
synced 2024-12-23 21:55:53 -05:00
fix: add more optimizations to putMetacacheObject() (#12916)
- avoid extra lookup for 'xl.meta' since we are definitely sure that it doesn't exist. - use this in newMultipartUpload() as well - also additionally do not write with O_DSYNC to avoid loading the drives, instead create 'xl.meta' for listing operations without O_DSYNC since these are ephemeral objects. - do the same with newMultipartUpload() since it gets synced when the PutObjectPart() is attempted, we do not need to tax newMultipartUpload() instead.
This commit is contained in:
parent
39f81d2c5b
commit
40a2fa8e81
@ -347,8 +347,9 @@ func (er erasureObjects) newMultipartUpload(ctx context.Context, bucket string,
|
||||
// Fill all the necessary metadata.
|
||||
// Update `xl.meta` content on each disks.
|
||||
for index := range partsMetadata {
|
||||
partsMetadata[index].Metadata = opts.UserDefined
|
||||
partsMetadata[index].Fresh = true
|
||||
partsMetadata[index].ModTime = modTime
|
||||
partsMetadata[index].Metadata = opts.UserDefined
|
||||
}
|
||||
|
||||
uploadID := mustGetUUID()
|
||||
|
@ -677,11 +677,7 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
|
||||
onlineDisks[i] = nil
|
||||
continue
|
||||
}
|
||||
if len(inlineBuffers) > 0 && inlineBuffers[i] != nil {
|
||||
partsMetadata[i].Data = inlineBuffers[i].Bytes()
|
||||
} else {
|
||||
partsMetadata[i].Data = nil
|
||||
}
|
||||
partsMetadata[i].Data = inlineBuffers[i].Bytes()
|
||||
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize())
|
||||
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
|
||||
PartNumber: 1,
|
||||
@ -695,9 +691,10 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
|
||||
// Fill all the necessary metadata.
|
||||
// Update `xl.meta` content on each disks.
|
||||
for index := range partsMetadata {
|
||||
partsMetadata[index].Metadata = opts.UserDefined
|
||||
partsMetadata[index].Size = n
|
||||
partsMetadata[index].Fresh = true
|
||||
partsMetadata[index].ModTime = modTime
|
||||
partsMetadata[index].Metadata = opts.UserDefined
|
||||
}
|
||||
|
||||
// Set an additional header when data is inlined.
|
||||
@ -714,33 +711,11 @@ func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *
|
||||
}
|
||||
}
|
||||
|
||||
g := errgroup.WithNErrs(len(onlineDisks))
|
||||
|
||||
// Rename file on all underlying storage disks.
|
||||
for index := range onlineDisks {
|
||||
index := index
|
||||
g.Go(func() error {
|
||||
if onlineDisks[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
// Pick one FileInfo for a disk at index.
|
||||
fi := partsMetadata[index]
|
||||
// Assign index when index is initialized
|
||||
if fi.Erasure.Index == 0 {
|
||||
fi.Erasure.Index = index + 1
|
||||
}
|
||||
|
||||
if fi.IsValid() {
|
||||
return onlineDisks[index].WriteMetadata(ctx, minioMetaBucket, key, fi)
|
||||
}
|
||||
return errFileCorrupt
|
||||
}, index)
|
||||
if _, err = writeUniqueFileInfo(ctx, onlineDisks, minioMetaBucket, key, partsMetadata, writeQuorum); err != nil {
|
||||
return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key)
|
||||
}
|
||||
|
||||
// Wait for all renames to finish.
|
||||
errs := g.Wait()
|
||||
|
||||
return fi.ToObjectInfo(minioMetaBucket, key), reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
|
||||
return fi.ToObjectInfo(minioMetaBucket, key), nil
|
||||
}
|
||||
|
||||
// PutObject - creates an object upon reading from the input stream
|
||||
|
@ -179,6 +179,8 @@ type FileInfo struct {
|
||||
|
||||
NumVersions int
|
||||
SuccessorModTime time.Time
|
||||
|
||||
Fresh bool // indicates this is a first time call to write FileInfo.
|
||||
}
|
||||
|
||||
// InlineData returns true if object contents are inlined alongside its metadata.
|
||||
|
@ -550,8 +550,8 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
if zb0001 != 24 {
|
||||
err = msgp.ArrayError{Wanted: 24, Got: zb0001}
|
||||
if zb0001 != 25 {
|
||||
err = msgp.ArrayError{Wanted: 25, Got: zb0001}
|
||||
return
|
||||
}
|
||||
z.Volume, err = dc.ReadString()
|
||||
@ -715,13 +715,18 @@ func (z *FileInfo) DecodeMsg(dc *msgp.Reader) (err error) {
|
||||
err = msgp.WrapError(err, "SuccessorModTime")
|
||||
return
|
||||
}
|
||||
z.Fresh, err = dc.ReadBool()
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "Fresh")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// EncodeMsg implements msgp.Encodable
|
||||
func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
// array header, size 24
|
||||
err = en.Append(0xdc, 0x0, 0x18)
|
||||
// array header, size 25
|
||||
err = en.Append(0xdc, 0x0, 0x19)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -864,14 +869,19 @@ func (z *FileInfo) EncodeMsg(en *msgp.Writer) (err error) {
|
||||
err = msgp.WrapError(err, "SuccessorModTime")
|
||||
return
|
||||
}
|
||||
err = en.WriteBool(z.Fresh)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "Fresh")
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// MarshalMsg implements msgp.Marshaler
|
||||
func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.Require(b, z.Msgsize())
|
||||
// array header, size 24
|
||||
o = append(o, 0xdc, 0x0, 0x18)
|
||||
// array header, size 25
|
||||
o = append(o, 0xdc, 0x0, 0x19)
|
||||
o = msgp.AppendString(o, z.Volume)
|
||||
o = msgp.AppendString(o, z.Name)
|
||||
o = msgp.AppendString(o, z.VersionID)
|
||||
@ -911,6 +921,7 @@ func (z *FileInfo) MarshalMsg(b []byte) (o []byte, err error) {
|
||||
o = msgp.AppendBytes(o, z.Data)
|
||||
o = msgp.AppendInt(o, z.NumVersions)
|
||||
o = msgp.AppendTime(o, z.SuccessorModTime)
|
||||
o = msgp.AppendBool(o, z.Fresh)
|
||||
return
|
||||
}
|
||||
|
||||
@ -922,8 +933,8 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
err = msgp.WrapError(err)
|
||||
return
|
||||
}
|
||||
if zb0001 != 24 {
|
||||
err = msgp.ArrayError{Wanted: 24, Got: zb0001}
|
||||
if zb0001 != 25 {
|
||||
err = msgp.ArrayError{Wanted: 25, Got: zb0001}
|
||||
return
|
||||
}
|
||||
z.Volume, bts, err = msgp.ReadStringBytes(bts)
|
||||
@ -1087,6 +1098,11 @@ func (z *FileInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
|
||||
err = msgp.WrapError(err, "SuccessorModTime")
|
||||
return
|
||||
}
|
||||
z.Fresh, bts, err = msgp.ReadBoolBytes(bts)
|
||||
if err != nil {
|
||||
err = msgp.WrapError(err, "Fresh")
|
||||
return
|
||||
}
|
||||
o = bts
|
||||
return
|
||||
}
|
||||
@ -1104,7 +1120,7 @@ func (z *FileInfo) Msgsize() (s int) {
|
||||
for za0003 := range z.Parts {
|
||||
s += z.Parts[za0003].Msgsize()
|
||||
}
|
||||
s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + msgp.BytesPrefixSize + len(z.Data) + msgp.IntSize + msgp.TimeSize
|
||||
s += z.Erasure.Msgsize() + msgp.BoolSize + msgp.StringPrefixSize + len(z.DeleteMarkerReplicationStatus) + msgp.StringPrefixSize + len(string(z.VersionPurgeStatus)) + msgp.BytesPrefixSize + len(z.Data) + msgp.IntSize + msgp.TimeSize + msgp.BoolSize
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,7 @@
|
||||
package cmd
|
||||
|
||||
const (
|
||||
storageRESTVersion = "v38" // Remove CheckFile API
|
||||
storageRESTVersion = "v39" // Add FileInfo.Fresh field
|
||||
storageRESTVersionPrefix = SlashSeparator + storageRESTVersion
|
||||
storageRESTPrefix = minioReservedBucketPath + "/storage"
|
||||
)
|
||||
|
@ -953,6 +953,24 @@ func (s *xlStorage) UpdateMetadata(ctx context.Context, volume, path string, fi
|
||||
|
||||
// WriteMetadata - writes FileInfo metadata for path at `xl.meta`
|
||||
func (s *xlStorage) WriteMetadata(ctx context.Context, volume, path string, fi FileInfo) error {
|
||||
if fi.Fresh {
|
||||
var xlMeta xlMetaV2
|
||||
if err := xlMeta.AddVersion(fi); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
buf, err := xlMeta.AppendTo(nil)
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
return err
|
||||
}
|
||||
// First writes for special situations do not write to stable storage.
|
||||
// this is currently used by
|
||||
// - emphemeral objects such as objects created during listObjects() calls
|
||||
// - newMultipartUpload() call..
|
||||
return s.writeAll(ctx, volume, pathJoin(path, xlStorageFormatFile), buf, false)
|
||||
}
|
||||
|
||||
buf, err := s.ReadAll(ctx, volume, pathJoin(path, xlStorageFormatFile))
|
||||
if err != nil && err != errFileNotFound {
|
||||
return err
|
||||
@ -1288,7 +1306,7 @@ func (s *xlStorage) ReadFile(ctx context.Context, volume string, path string, of
|
||||
return int64(len(buffer)), nil
|
||||
}
|
||||
|
||||
func (s *xlStorage) openFile(filePath string, mode int) (f *os.File, err error) {
|
||||
func (s *xlStorage) openFileSync(filePath string, mode int) (f *os.File, err error) {
|
||||
// Create top level directories if they don't exist.
|
||||
// with mode 0777 mkdir honors system umask.
|
||||
if err = mkdirAll(pathutil.Dir(filePath), 0777); err != nil {
|
||||
@ -1315,6 +1333,33 @@ func (s *xlStorage) openFile(filePath string, mode int) (f *os.File, err error)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func (s *xlStorage) openFileNoSync(filePath string, mode int) (f *os.File, err error) {
|
||||
// Create top level directories if they don't exist.
|
||||
// with mode 0777 mkdir honors system umask.
|
||||
if err = mkdirAll(pathutil.Dir(filePath), 0777); err != nil {
|
||||
return nil, osErrToFileErr(err)
|
||||
}
|
||||
|
||||
w, err := OpenFile(filePath, mode, 0666)
|
||||
if err != nil {
|
||||
// File path cannot be verified since one of the parents is a file.
|
||||
switch {
|
||||
case isSysErrIsDir(err):
|
||||
return nil, errIsNotRegular
|
||||
case osIsPermission(err):
|
||||
return nil, errFileAccessDenied
|
||||
case isSysErrIO(err):
|
||||
return nil, errFaultyDisk
|
||||
case isSysErrTooManyFiles(err):
|
||||
return nil, errTooManyOpenFiles
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// To support O_DIRECT reads for erasure backends.
|
||||
type odirectReader struct {
|
||||
f *os.File
|
||||
@ -1525,7 +1570,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
|
||||
if fileSize >= 0 && fileSize <= smallFileThreshold {
|
||||
// For streams smaller than 128KiB we simply write them as O_DSYNC (fdatasync)
|
||||
// and not O_DIRECT to avoid the complexities of aligned I/O.
|
||||
w, err := s.openFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_EXCL)
|
||||
w, err := s.openFileSync(filePath, os.O_CREATE|os.O_WRONLY|os.O_EXCL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1585,7 +1630,7 @@ func (s *xlStorage) CreateFile(ctx context.Context, volume, path string, fileSiz
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *xlStorage) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
|
||||
func (s *xlStorage) writeAll(ctx context.Context, volume string, path string, b []byte, sync bool) (err error) {
|
||||
volumeDir, err := s.getVolDir(volume)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1596,7 +1641,12 @@ func (s *xlStorage) WriteAll(ctx context.Context, volume string, path string, b
|
||||
return err
|
||||
}
|
||||
|
||||
w, err := s.openFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
|
||||
var w *os.File
|
||||
if sync {
|
||||
w, err = s.openFileSync(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
|
||||
} else {
|
||||
w, err = s.openFileNoSync(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1614,6 +1664,10 @@ func (s *xlStorage) WriteAll(ctx context.Context, volume string, path string, b
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *xlStorage) WriteAll(ctx context.Context, volume string, path string, b []byte) (err error) {
|
||||
return s.writeAll(ctx, volume, path, b, true)
|
||||
}
|
||||
|
||||
// AppendFile - append a byte array at path, if file doesn't exist at
|
||||
// path this call explicitly creates it.
|
||||
func (s *xlStorage) AppendFile(ctx context.Context, volume string, path string, buf []byte) (err error) {
|
||||
@ -1642,7 +1696,7 @@ func (s *xlStorage) AppendFile(ctx context.Context, volume string, path string,
|
||||
var w *os.File
|
||||
// Create file if not found. Not doing O_DIRECT here to avoid the code that does buffer aligned writes.
|
||||
// AppendFile() is only used by healing code to heal objects written in old format.
|
||||
w, err = s.openFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY)
|
||||
w, err = s.openFileSync(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user