Mirror of https://github.com/minio/minio.git
fix: [fs] CompleteMultipart use trie structure for partMatch (#10522)
Performance improves by around 100x or more:

```
go test -v -run NONE -bench BenchmarkGetPartFile
goos: linux
goarch: amd64
pkg: github.com/minio/minio/cmd
BenchmarkGetPartFileWithTrie
BenchmarkGetPartFileWithTrie-4   1000000000    0.140 ns/op    0 B/op    0 allocs/op
PASS
ok      github.com/minio/minio/cmd      1.737s
```

fixes #10520
Commit: 3831cc9e3b
Parent: 230fc0d186
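The gist of the change, before the diffs below: part files in the upload directory are named `<5-digit part number>.<etag>.<size>`, and each part in the complete-multipart request has to be resolved to one of those entries. Loading the directory entries into `pkg/trie` once and prefix-matching `<part number>.<etag>.` replaces the per-part linear `strings.HasPrefix` scan. A minimal standalone sketch of that lookup, using the post-commit `pkg/trie` API; the sample entry names are taken from the test data in the diff, everything else is illustrative:

```go
package main

import (
	"fmt"

	"github.com/minio/minio/pkg/trie"
)

func main() {
	// Upload directory entries: fs.json plus one file per uploaded part,
	// named "<5-digit part number>.<etag>.<size>".
	entries := []string{
		"fs.json",
		"00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864",
		"00002.d73d8ab724016dfb051e2d3584495c54.32891137",
	}

	// Build the trie once per complete-multipart call.
	entriesTrie := trie.NewTrie()
	for _, entry := range entries {
		entriesTrie.Insert(entry)
	}

	// Resolve a part by the prefix "<part number>.<etag>." instead of
	// scanning every entry with strings.HasPrefix.
	prefix := fmt.Sprintf("%.5d.%s.", 2, "d73d8ab724016dfb051e2d3584495c54")
	if matches := entriesTrie.PrefixMatch(prefix); len(matches) > 0 {
		fmt.Println("part file:", matches[0])
	} else {
		fmt.Println("part missing")
	}
}
```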
FS backend (CompleteMultipartUpload):

```diff
@@ -32,6 +32,7 @@ import (
 	jsoniter "github.com/json-iterator/go"
 	"github.com/minio/minio/cmd/logger"
 	mioutil "github.com/minio/minio/pkg/ioutil"
+	"github.com/minio/minio/pkg/trie"
 )
 
 // Returns EXPORT/.minio.sys/multipart/SHA256/UPLOADID
@@ -577,7 +578,10 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 	// Calculate s3 compatible md5sum for complete multipart.
 	s3MD5 := getCompleteMultipartMD5(parts)
 
-	partSize := int64(-1) // Used later to ensure that all parts sizes are same.
+	// ensure that part ETag is canonicalized to strip off extraneous quotes
+	for i := range parts {
+		parts[i].ETag = canonicalizeETag(parts[i].ETag)
+	}
 
 	fsMeta := fsMetaV1{}
 
@@ -591,16 +595,17 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 		return oi, err
 	}
 
-	// ensure that part ETag is canonicalized to strip off extraneous quotes
-	for i := range parts {
-		parts[i].ETag = canonicalizeETag(parts[i].ETag)
+	// Create entries trie structure for prefix match
+	entriesTrie := trie.NewTrie()
+	for _, entry := range entries {
+		entriesTrie.Insert(entry)
 	}
 
 	// Save consolidated actual size.
 	var objectActualSize int64
 	// Validate all parts and then commit to disk.
 	for i, part := range parts {
-		partFile := getPartFile(entries, part.PartNumber, part.ETag)
+		partFile := getPartFile(entriesTrie, part.PartNumber, part.ETag)
 		if partFile == "" {
 			return oi, InvalidPart{
 				PartNumber: part.PartNumber,
@@ -628,9 +633,6 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 			}
 			return oi, err
 		}
-		if partSize == -1 {
-			partSize = actualSize
-		}
 
 		fsMeta.Parts[i] = ObjectPartInfo{
 			Number: part.PartNumber,
@@ -695,8 +697,16 @@ func (fs *FSObjects) CompleteMultipartUpload(ctx context.Context, bucket string,
 			fsRemoveFile(ctx, file.filePath)
 		}
 		for _, part := range parts {
-			partPath := getPartFile(entries, part.PartNumber, part.ETag)
-			if err = mioutil.AppendFile(appendFilePath, pathJoin(uploadIDDir, partPath), globalFSOSync); err != nil {
+			partFile := getPartFile(entriesTrie, part.PartNumber, part.ETag)
+			if partFile == "" {
+				logger.LogIf(ctx, fmt.Errorf("%.5d.%s missing will not proceed",
+					part.PartNumber, part.ETag))
+				return oi, InvalidPart{
+					PartNumber: part.PartNumber,
+					GotETag:    part.ETag,
+				}
+			}
+			if err = mioutil.AppendFile(appendFilePath, pathJoin(uploadIDDir, partFile), globalFSOSync); err != nil {
 				logger.LogIf(ctx, err)
 				return oi, toObjectErr(err)
 			}
```
cmd/main.go (10 lines changed):

```diff
@@ -102,21 +102,19 @@ func newApp(name string) *cli.App {
 
 	findClosestCommands := func(command string) []string {
 		var closestCommands []string
-		for _, value := range commandsTree.PrefixMatch(command) {
-			closestCommands = append(closestCommands, value.(string))
-		}
+		closestCommands = append(closestCommands, commandsTree.PrefixMatch(command)...)
 
 		sort.Strings(closestCommands)
 		// Suggest other close commands - allow missed, wrongly added and
 		// even transposed characters
 		for _, value := range commandsTree.Walk(commandsTree.Root()) {
-			if sort.SearchStrings(closestCommands, value.(string)) < len(closestCommands) {
+			if sort.SearchStrings(closestCommands, value) < len(closestCommands) {
 				continue
 			}
 			// 2 is arbitrary and represents the max
 			// allowed number of typed errors
-			if words.DamerauLevenshteinDistance(command, value.(string)) < 2 {
-				closestCommands = append(closestCommands, value.(string))
+			if words.DamerauLevenshteinDistance(command, value) < 2 {
+				closestCommands = append(closestCommands, value)
 			}
 		}
 
```
Object API utilities (getPartFile):

```diff
@@ -45,6 +45,7 @@ import (
 	"github.com/minio/minio/cmd/logger"
 	"github.com/minio/minio/pkg/hash"
 	"github.com/minio/minio/pkg/ioutil"
+	"github.com/minio/minio/pkg/trie"
 	"github.com/minio/minio/pkg/wildcard"
 )
 
@@ -487,13 +488,12 @@ func hasPattern(patterns []string, matchStr string) bool {
 }
 
 // Returns the part file name which matches the partNumber and etag.
-func getPartFile(entries []string, partNumber int, etag string) string {
-	for _, entry := range entries {
-		if strings.HasPrefix(entry, fmt.Sprintf("%.5d.%s.", partNumber, etag)) {
-			return entry
-		}
+func getPartFile(entriesTrie *trie.Trie, partNumber int, etag string) (partFile string) {
+	for _, match := range entriesTrie.PrefixMatch(fmt.Sprintf("%.5d.%s.", partNumber, etag)) {
+		partFile = match
+		break
 	}
-	return ""
+	return partFile
 }
 
 // Returns the compressed offset which should be skipped.
```
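A short usage sketch of the new helper (hypothetical test code in package cmd, not part of the commit): the caller builds the trie from the upload directory listing and treats an empty return as a missing part, which CompleteMultipartUpload above maps to InvalidPart. The test name is illustrative; the entry names reuse the data from the removed TestGetPartFile table.

```go
package cmd

import (
	"testing"

	"github.com/minio/minio/pkg/trie"
)

// Hypothetical example exercising the new getPartFile signature.
func TestGetPartFileWithTrieExample(t *testing.T) {
	// Sample upload directory listing.
	entriesTrie := trie.NewTrie()
	for _, entry := range []string{
		"fs.json",
		"00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864",
		"00002.d73d8ab724016dfb051e2d3584495c54.32891137",
	} {
		entriesTrie.Insert(entry)
	}

	// A matching part number/ETag pair resolves to the on-disk part file.
	if got := getPartFile(entriesTrie, 1, "8a034f82cb9cb31140d87d3ce2a9ede3"); got != "00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864" {
		t.Fatalf("unexpected part file: %q", got)
	}

	// A mismatched pair yields "", signalling a missing part.
	if got := getPartFile(entriesTrie, 1, "d73d8ab724016dfb051e2d3584495c54"); got != "" {
		t.Fatalf("expected empty part file, got %q", got)
	}
}
```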
Object API utilities test:

```diff
@@ -18,6 +18,7 @@ package cmd
 
 import (
 	"bytes"
+	"fmt"
 	"io"
 	"net/http"
 	"reflect"
@@ -27,6 +28,7 @@ import (
 	"github.com/klauspost/compress/s2"
 	"github.com/minio/minio/cmd/config/compress"
 	"github.com/minio/minio/cmd/crypto"
+	"github.com/minio/minio/pkg/trie"
 )
 
 // Tests validate bucket name.
@@ -440,40 +442,22 @@ func TestExcludeForCompression(t *testing.T) {
 	}
 }
 
-// Test getPartFile function.
-func TestGetPartFile(t *testing.T) {
-	testCases := []struct {
-		entries    []string
-		partNumber int
-		etag       string
-		result     string
-	}{
-		{
-			entries:    []string{"00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864", "fs.json", "00002.d73d8ab724016dfb051e2d3584495c54.32891137"},
-			partNumber: 1,
-			etag:       "8a034f82cb9cb31140d87d3ce2a9ede3",
-			result:     "00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864",
-		},
-		{
-			entries:    []string{"00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864", "fs.json", "00002.d73d8ab724016dfb051e2d3584495c54.32891137"},
-			partNumber: 2,
-			etag:       "d73d8ab724016dfb051e2d3584495c54",
-			result:     "00002.d73d8ab724016dfb051e2d3584495c54.32891137",
-		},
-		{
-			entries:    []string{"00001.8a034f82cb9cb31140d87d3ce2a9ede3.67108864", "fs.json", "00002.d73d8ab724016dfb051e2d3584495c54.32891137"},
-			partNumber: 1,
-			etag:       "d73d8ab724016dfb051e2d3584495c54",
-			result:     "",
-		},
-	}
-	for i, test := range testCases {
-		got := getPartFile(test.entries, test.partNumber, test.etag)
-		if got != test.result {
-			t.Errorf("Test %d - expected %s but received %s",
-				i+1, test.result, got)
-		}
-	}
-}
+func BenchmarkGetPartFileWithTrie(b *testing.B) {
+	b.ResetTimer()
+
+	entriesTrie := trie.NewTrie()
+	for i := 1; i <= 10000; i++ {
+		entriesTrie.Insert(fmt.Sprintf("%.5d.8a034f82cb9cb31140d87d3ce2a9ede3.67108864", i))
+	}
+
+	for i := 1; i <= 10000; i++ {
+		partFile := getPartFile(entriesTrie, i, "8a034f82cb9cb31140d87d3ce2a9ede3")
+		if partFile == "" {
+			b.Fatal("partFile returned is empty")
+		}
+	}
+
+	b.ReportAllocs()
+}
 
 func TestGetActualSize(t *testing.T) {
```
pkg/trie:

```diff
@@ -21,7 +21,7 @@ package trie
 // Node trie tree node container carries value and children.
 type Node struct {
 	exists bool
-	value  interface{}
+	value  string
 	child  map[rune]*Node // runes as child.
 }
 
@@ -29,7 +29,7 @@ type Node struct {
 func newNode() *Node {
 	return &Node{
 		exists: false,
-		value:  nil,
+		value:  "",
 		child:  make(map[rune]*Node),
 	}
 }
@@ -65,16 +65,16 @@ func (t *Trie) Insert(key string) {
 }
 
 // PrefixMatch - prefix match.
-func (t *Trie) PrefixMatch(key string) []interface{} {
+func (t *Trie) PrefixMatch(key string) []string {
 	node, _ := t.findNode(key)
-	if node != nil {
-		return t.Walk(node)
+	if node == nil {
+		return nil
 	}
-	return []interface{}{}
+	return t.Walk(node)
 }
 
 // Walk the tree.
-func (t *Trie) Walk(node *Node) (ret []interface{}) {
+func (t *Trie) Walk(node *Node) (ret []string) {
 	if node.exists {
 		ret = append(ret, node.value)
 	}
```
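For reference, a minimal sketch of the string-typed API after this change, exercising only the methods the diffs above rely on (NewTrie, Insert, PrefixMatch, Walk, Root); the keys are illustrative, and Walk's output order is not guaranteed, so the example only checks the count.

```go
package main

import (
	"fmt"

	"github.com/minio/minio/pkg/trie"
)

func main() {
	commandsTree := trie.NewTrie()
	for _, cmd := range []string{"server", "gateway", "version"} {
		commandsTree.Insert(cmd)
	}

	// PrefixMatch now returns []string, so callers such as
	// findClosestCommands no longer need value.(string) assertions.
	fmt.Println(commandsTree.PrefixMatch("ser")) // [server]

	// Walk from the root visits every inserted key.
	fmt.Println(len(commandsTree.Walk(commandsTree.Root()))) // 3
}
```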