From ec9bfd3aefda439fa4701da8746cd4f74b027f97 Mon Sep 17 00:00:00 2001 From: Yao Zongyou Date: Sat, 31 Aug 2019 15:07:40 +0800 Subject: [PATCH] speed up the performance of s3select on csv (#7945) --- pkg/s3select/csv/reader.go | 28 +++++++------ pkg/s3select/csv/reader_test.go | 12 +++--- pkg/s3select/csv/record.go | 22 +++++----- pkg/s3select/json/record.go | 24 +++++------ pkg/s3select/message.go | 20 ++++++--- pkg/s3select/select.go | 59 ++++++++++++++++++++++----- pkg/s3select/select_benchmark_test.go | 30 +++++++------- pkg/s3select/sql/record.go | 10 +++-- 8 files changed, 128 insertions(+), 77 deletions(-) diff --git a/pkg/s3select/csv/reader.go b/pkg/s3select/csv/reader.go index 3438d09d4..e14f85851 100644 --- a/pkg/s3select/csv/reader.go +++ b/pkg/s3select/csv/reader.go @@ -82,10 +82,11 @@ func (rr *recordReader) Read(p []byte) (n int, err error) { // Reader - CSV record reader for S3Select. type Reader struct { - args *ReaderArgs - readCloser io.ReadCloser - csvReader *csv.Reader - columnNames []string + args *ReaderArgs + readCloser io.ReadCloser + csvReader *csv.Reader + columnNames []string + nameIndexMap map[string]int64 } // Read - reads single record. @@ -99,23 +100,24 @@ func (r *Reader) Read() (sql.Record, error) { return nil, err } - columnNames := r.columnNames - if columnNames == nil { - columnNames = make([]string, len(csvRecord)) + if r.columnNames == nil { + r.columnNames = make([]string, len(csvRecord)) for i := range csvRecord { - columnNames[i] = fmt.Sprintf("_%v", i+1) + r.columnNames[i] = fmt.Sprintf("_%v", i+1) } } - nameIndexMap := make(map[string]int64) - for i := range columnNames { - nameIndexMap[columnNames[i]] = int64(i) + if r.nameIndexMap == nil { + r.nameIndexMap = make(map[string]int64) + for i := range r.columnNames { + r.nameIndexMap[r.columnNames[i]] = int64(i) + } } return &Record{ - columnNames: columnNames, + columnNames: r.columnNames, csvRecord: csvRecord, - nameIndexMap: nameIndexMap, + nameIndexMap: r.nameIndexMap, }, nil } diff --git a/pkg/s3select/csv/reader_test.go b/pkg/s3select/csv/reader_test.go index 68b222ef7..81853e42f 100644 --- a/pkg/s3select/csv/reader_test.go +++ b/pkg/s3select/csv/reader_test.go @@ -17,6 +17,7 @@ package csv import ( + "bytes" "io" "io/ioutil" "strings" @@ -39,6 +40,7 @@ func TestRead(t *testing.T) { for i, c := range cases { var err error var record sql.Record + var result bytes.Buffer r, _ := NewReader(ioutil.NopCloser(strings.NewReader(c.content)), &ReaderArgs{ FileHeaderInfo: none, @@ -51,22 +53,22 @@ func TestRead(t *testing.T) { unmarshaled: true, }) - result := "" for { record, err = r.Read() if err != nil { break } - s, _ := record.MarshalCSV([]rune(c.fieldDelimiter)[0]) - result += string(s) + c.recordDelimiter + record.WriteCSV(&result, []rune(c.fieldDelimiter)[0]) + result.Truncate(result.Len() - 1) + result.WriteString(c.recordDelimiter) } r.Close() if err != io.EOF { t.Fatalf("Case %d failed with %s", i, err) } - if result != c.content { - t.Errorf("Case %d failed: expected %v result %v", i, c.content, result) + if result.String() != c.content { + t.Errorf("Case %d failed: expected %v result %v", i, c.content, result.String()) } } } diff --git a/pkg/s3select/csv/record.go b/pkg/s3select/csv/record.go index 478b222a7..3152749a6 100644 --- a/pkg/s3select/csv/record.go +++ b/pkg/s3select/csv/record.go @@ -17,11 +17,11 @@ package csv import ( - "bytes" "encoding/csv" "encoding/json" "errors" "fmt" + "io" "github.com/bcicen/jstream" "github.com/minio/minio/pkg/s3select/sql" @@ -61,30 +61,28 @@ func (r *Record) Set(name string, value *sql.Value) error { return nil } -// MarshalCSV - encodes to CSV data. -func (r *Record) MarshalCSV(fieldDelimiter rune) ([]byte, error) { - buf := new(bytes.Buffer) - w := csv.NewWriter(buf) +// WriteCSV - encodes to CSV data. +func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune) error { + w := csv.NewWriter(writer) w.Comma = fieldDelimiter if err := w.Write(r.csvRecord); err != nil { - return nil, err + return err } w.Flush() if err := w.Error(); err != nil { - return nil, err + return err } - data := buf.Bytes() - return data[:len(data)-1], nil + return nil } -// MarshalJSON - encodes to JSON data. -func (r *Record) MarshalJSON() ([]byte, error) { +// WriteJSON - encodes to JSON data. +func (r *Record) WriteJSON(writer io.Writer) error { var kvs jstream.KVS = make([]jstream.KV, len(r.columnNames)) for i := 0; i < len(r.columnNames); i++ { kvs[i] = jstream.KV{Key: r.columnNames[i], Value: r.csvRecord[i]} } - return json.Marshal(kvs) + return json.NewEncoder(writer).Encode(kvs) } // Raw - returns the underlying data with format info. diff --git a/pkg/s3select/json/record.go b/pkg/s3select/json/record.go index 9c305621f..f252d5c29 100644 --- a/pkg/s3select/json/record.go +++ b/pkg/s3select/json/record.go @@ -17,11 +17,11 @@ package json import ( - "bytes" "encoding/csv" "encoding/json" "errors" "fmt" + "io" "strings" "github.com/bcicen/jstream" @@ -77,8 +77,8 @@ func (r *Record) Set(name string, value *sql.Value) error { return nil } -// MarshalCSV - encodes to CSV data. -func (r *Record) MarshalCSV(fieldDelimiter rune) ([]byte, error) { +// WriteCSV - encodes to CSV data. +func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune) error { var csvRecord []string for _, kv := range r.KVS { var columnValue string @@ -90,24 +90,22 @@ func (r *Record) MarshalCSV(fieldDelimiter rune) ([]byte, error) { case RawJSON: columnValue = string([]byte(val)) default: - return nil, errors.New("Cannot marshal unhandled type") + return errors.New("Cannot marshal unhandled type") } csvRecord = append(csvRecord, columnValue) } - buf := new(bytes.Buffer) - w := csv.NewWriter(buf) + w := csv.NewWriter(writer) w.Comma = fieldDelimiter if err := w.Write(csvRecord); err != nil { - return nil, err + return err } w.Flush() if err := w.Error(); err != nil { - return nil, err + return err } - data := buf.Bytes() - return data[:len(data)-1], nil + return nil } // Raw - returns the underlying representation. @@ -115,9 +113,9 @@ func (r *Record) Raw() (sql.SelectObjectFormat, interface{}) { return r.SelectFormat, r.KVS } -// MarshalJSON - encodes to JSON data. -func (r *Record) MarshalJSON() ([]byte, error) { - return json.Marshal(r.KVS) +// WriteJSON - encodes to JSON data. +func (r *Record) WriteJSON(writer io.Writer) error { + return json.NewEncoder(writer).Encode(r.KVS) } // Replace the underlying buffer of json data. diff --git a/pkg/s3select/message.go b/pkg/s3select/message.go index cac179393..4b6f2b6cc 100644 --- a/pkg/s3select/message.go +++ b/pkg/s3select/message.go @@ -242,7 +242,7 @@ type messageWriter struct { payloadBuffer []byte payloadBufferIndex int - payloadCh chan []byte + payloadCh chan *bytes.Buffer finBytesScanned, finBytesProcessed int64 @@ -308,10 +308,10 @@ func (writer *messageWriter) start() { } writer.write(endMessage) } else { - for len(payload) > 0 { - copiedLen := copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload) + for payload.Len() > 0 { + copiedLen := copy(writer.payloadBuffer[writer.payloadBufferIndex:], payload.Bytes()) writer.payloadBufferIndex += copiedLen - payload = payload[copiedLen:] + payload.Next(copiedLen) // If buffer is filled, flush it now! freeSpace := bufLength - writer.payloadBufferIndex @@ -322,6 +322,8 @@ func (writer *messageWriter) start() { } } } + + bufPool.Put(payload) } case <-recordStagingTicker.C: @@ -349,10 +351,16 @@ func (writer *messageWriter) start() { if progressTicker != nil { progressTicker.Stop() } + + // Whatever drain the payloadCh to prevent from memory leaking. + for len(writer.payloadCh) > 0 { + payload := <-writer.payloadCh + bufPool.Put(payload) + } } // Sends a single whole record. -func (writer *messageWriter) SendRecord(payload []byte) error { +func (writer *messageWriter) SendRecord(payload *bytes.Buffer) error { select { case writer.payloadCh <- payload: return nil @@ -409,7 +417,7 @@ func newMessageWriter(w http.ResponseWriter, getProgressFunc func() (bytesScanne getProgressFunc: getProgressFunc, payloadBuffer: make([]byte, bufLength), - payloadCh: make(chan []byte), + payloadCh: make(chan *bytes.Buffer), errCh: make(chan []byte), doneCh: make(chan struct{}), diff --git a/pkg/s3select/select.go b/pkg/s3select/select.go index 04ffce196..923d3a920 100644 --- a/pkg/s3select/select.go +++ b/pkg/s3select/select.go @@ -17,11 +17,15 @@ package s3select import ( + "bufio" + "bytes" "encoding/xml" "fmt" "io" + "io/ioutil" "net/http" "strings" + "sync" "github.com/minio/minio/pkg/s3select/csv" "github.com/minio/minio/pkg/s3select/json" @@ -53,6 +57,20 @@ const ( maxRecordSize = 1 << 20 // 1 MiB ) +var bufPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + +var bufioWriterPool = sync.Pool{ + New: func() interface{} { + // ioutil.Discard is just used to create the writer. Actual destination + // writer is set later by Reset() before using it. + return bufio.NewWriter(ioutil.Discard) + }, +} + // UnmarshalXML - decodes XML data. func (c *CompressionType) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { var s string @@ -306,22 +324,36 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos panic(fmt.Errorf("unknown input format '%v'", s3Select.Input.format)) } -func (s3Select *S3Select) marshal(record sql.Record) ([]byte, error) { +func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error { switch s3Select.Output.format { case csvFormat: - data, err := record.MarshalCSV([]rune(s3Select.Output.CSVArgs.FieldDelimiter)[0]) + // Use bufio Writer to prevent csv.Writer from allocating a new buffer. + bufioWriter := bufioWriterPool.Get().(*bufio.Writer) + defer func() { + bufioWriter.Reset(ioutil.Discard) + bufioWriterPool.Put(bufioWriter) + }() + + bufioWriter.Reset(buf) + err := record.WriteCSV(bufioWriter, []rune(s3Select.Output.CSVArgs.FieldDelimiter)[0]) if err != nil { - return nil, err + return err } - return append(data, []byte(s3Select.Output.CSVArgs.RecordDelimiter)...), nil + buf.Truncate(buf.Len() - 1) + buf.WriteString(s3Select.Output.CSVArgs.RecordDelimiter) + + return nil case jsonFormat: - data, err := record.MarshalJSON() + err := record.WriteJSON(buf) if err != nil { - return nil, err + return err } - return append(data, []byte(s3Select.Output.JSONArgs.RecordDelimiter)...), nil + buf.Truncate(buf.Len() - 1) + buf.WriteString(s3Select.Output.JSONArgs.RecordDelimiter) + + return nil } panic(fmt.Errorf("unknown output format '%v'", s3Select.Output.format)) @@ -338,24 +370,29 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) { var inputRecord sql.Record var outputRecord sql.Record var err error - var data []byte sendRecord := func() bool { if outputRecord == nil { return true } - if data, err = s3Select.marshal(outputRecord); err != nil { + buf := bufPool.Get().(*bytes.Buffer) + buf.Reset() + + if err = s3Select.marshal(buf, outputRecord); err != nil { + bufPool.Put(buf) return false } - if len(data) > maxRecordSize { + if buf.Len() > maxRecordSize { writer.FinishWithError("OverMaxRecordSize", "The length of a record in the input or result is greater than maxCharsPerRecord of 1 MB.") + bufPool.Put(buf) return false } - if err = writer.SendRecord(data); err != nil { + if err = writer.SendRecord(buf); err != nil { // FIXME: log this error. err = nil + bufPool.Put(buf) return false } diff --git a/pkg/s3select/select_benchmark_test.go b/pkg/s3select/select_benchmark_test.go index 37187c493..e5e6ebf38 100644 --- a/pkg/s3select/select_benchmark_test.go +++ b/pkg/s3select/select_benchmark_test.go @@ -99,26 +99,28 @@ func benchmarkSelect(b *testing.B, count int, query string) { `) - s3Select, err := NewS3Select(bytes.NewReader(requestXML)) - if err != nil { - b.Fatal(err) - } - csvData := genSampleCSVData(count) b.ResetTimer() b.ReportAllocs() - for i := 0; i < b.N; i++ { - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(csvData)), nil - }); err != nil { - b.Fatal(err) - } + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + s3Select, err := NewS3Select(bytes.NewReader(requestXML)) + if err != nil { + b.Fatal(err) + } - s3Select.Evaluate(&nullResponseWriter{}) - s3Select.Close() - } + if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { + return ioutil.NopCloser(bytes.NewReader(csvData)), nil + }); err != nil { + b.Fatal(err) + } + + s3Select.Evaluate(&nullResponseWriter{}) + s3Select.Close() + } + }) } func benchmarkSelectAll(b *testing.B, count int) { diff --git a/pkg/s3select/sql/record.go b/pkg/s3select/sql/record.go index 733ebd0ee..60604a383 100644 --- a/pkg/s3select/sql/record.go +++ b/pkg/s3select/sql/record.go @@ -16,7 +16,11 @@ package sql -import "github.com/bcicen/jstream" +import ( + "io" + + "github.com/bcicen/jstream" +) // SelectObjectFormat specifies the format of the underlying data type SelectObjectFormat int @@ -36,8 +40,8 @@ const ( type Record interface { Get(name string) (*Value, error) Set(name string, value *Value) error - MarshalCSV(fieldDelimiter rune) ([]byte, error) - MarshalJSON() ([]byte, error) + WriteCSV(writer io.Writer, fieldDelimiter rune) error + WriteJSON(writer io.Writer) error // Returns underlying representation Raw() (SelectObjectFormat, interface{})