mirror of
https://github.com/minio/minio.git
synced 2025-04-12 07:22:18 -04:00
erasure: avoid io.Copy in hotpaths to reduce allocation (#11213)
This commit is contained in:
parent
c4131c2798
commit
c4b1d394d6
@ -186,7 +186,6 @@ func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
|
|||||||
readerIndex++
|
readerIndex++
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
if p.canDecode(newBuf) {
|
if p.canDecode(newBuf) {
|
||||||
p.offset += p.shardSize
|
p.offset += p.shardSize
|
||||||
if healRequired != 0 {
|
if healRequired != 0 {
|
||||||
|
@ -17,7 +17,6 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
@ -77,20 +76,25 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
|
|||||||
// from subsequent blocks.
|
// from subsequent blocks.
|
||||||
offset = 0
|
offset = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// We have written all the blocks, write the last remaining block.
|
// We have written all the blocks, write the last remaining block.
|
||||||
if write < int64(len(block)) {
|
if write < int64(len(block)) {
|
||||||
n, err := io.Copy(dst, bytes.NewReader(block[:write]))
|
n, err := dst.Write(block[:write])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.ErrClosedPipe {
|
if err != io.ErrClosedPipe {
|
||||||
logger.LogIf(ctx, err)
|
logger.LogIf(ctx, err)
|
||||||
}
|
}
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
totalWritten += n
|
if int64(n) != write {
|
||||||
|
return 0, io.ErrShortWrite
|
||||||
|
}
|
||||||
|
totalWritten += int64(n)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy the block.
|
// Copy the block.
|
||||||
n, err := io.Copy(dst, bytes.NewReader(block))
|
n, err := dst.Write(block)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
|
// The writer will be closed incase of range queries, which will emit ErrClosedPipe.
|
||||||
if err != io.ErrClosedPipe {
|
if err != io.ErrClosedPipe {
|
||||||
@ -99,11 +103,15 @@ func writeDataBlocks(ctx context.Context, dst io.Writer, enBlocks [][]byte, data
|
|||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if n != len(block) {
|
||||||
|
return 0, io.ErrShortWrite
|
||||||
|
}
|
||||||
|
|
||||||
// Decrement output size.
|
// Decrement output size.
|
||||||
write -= n
|
write -= int64(n)
|
||||||
|
|
||||||
// Increment written.
|
// Increment written.
|
||||||
totalWritten += n
|
totalWritten += int64(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Success.
|
// Success.
|
||||||
|
@ -432,6 +432,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer gr.Close()
|
defer gr.Close()
|
||||||
|
|
||||||
objInfo := gr.ObjInfo
|
objInfo := gr.ObjInfo
|
||||||
|
|
||||||
// filter object lock metadata if permission does not permit
|
// filter object lock metadata if permission does not permit
|
||||||
|
@ -319,7 +319,13 @@ func initAllSubsystems(ctx context.Context, newObject ObjectLayer) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if globalIsErasure {
|
if globalIsErasure {
|
||||||
logger.Info(fmt.Sprintf("Verifying %d buckets are consistent across drives...", len(buckets)))
|
if len(buckets) > 0 {
|
||||||
|
if len(buckets) == 1 {
|
||||||
|
logger.Info(fmt.Sprintf("Verifying if %d bucket is consistent across drives...", len(buckets)))
|
||||||
|
} else {
|
||||||
|
logger.Info(fmt.Sprintf("Verifying if %d buckets are consistent across drives...", len(buckets)))
|
||||||
|
}
|
||||||
|
}
|
||||||
for _, bucket := range buckets {
|
for _, bucket := range buckets {
|
||||||
if _, err = newObject.HealBucket(ctx, bucket.Name, madmin.HealOpts{}); err != nil {
|
if _, err = newObject.HealBucket(ctx, bucket.Name, madmin.HealOpts{}); err != nil {
|
||||||
return fmt.Errorf("Unable to list buckets to heal: %w", err)
|
return fmt.Errorf("Unable to list buckets to heal: %w", err)
|
||||||
|
@ -64,6 +64,9 @@ const (
|
|||||||
// Size of each buffer.
|
// Size of each buffer.
|
||||||
readAheadBufSize = 1 << 20
|
readAheadBufSize = 1 << 20
|
||||||
|
|
||||||
|
// Small file threshold below which the metadata accompanies the data.
|
||||||
|
smallFileThreshold = 32 * humanize.KiByte
|
||||||
|
|
||||||
// XL metadata file carries per object metadata.
|
// XL metadata file carries per object metadata.
|
||||||
xlStorageFormatFile = "xl.meta"
|
xlStorageFormatFile = "xl.meta"
|
||||||
)
|
)
|
||||||
@ -1174,7 +1177,7 @@ func (s *xlStorage) ReadVersion(ctx context.Context, volume, path, versionID str
|
|||||||
// - object has not yet transitioned
|
// - object has not yet transitioned
|
||||||
// - object size lesser than 32KiB
|
// - object size lesser than 32KiB
|
||||||
// - object has maximum of 1 parts
|
// - object has maximum of 1 parts
|
||||||
if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size < 32*humanize.KiByte && len(fi.Parts) == 1 {
|
if fi.TransitionStatus == "" && fi.DataDir != "" && fi.Size < smallFileThreshold && len(fi.Parts) == 1 {
|
||||||
fi.Data, err = s.readAllData(volumeDir, pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", fi.Parts[0].Number)))
|
fi.Data, err = s.readAllData(volumeDir, pathJoin(volumeDir, path, fi.DataDir, fmt.Sprintf("part.%d", fi.Parts[0].Number)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fi, err
|
return fi, err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user