S3 Select: optimize output (#8238)

Queue output items and reuse them.
Remove the unneeded type system in sql and just use the Go type system.

In best case this is more than an order of magnitude speedup:

```
BenchmarkSelectAll_1M-12    	       1	1841049400 ns/op	274299728 B/op	 4198522 allocs/op
BenchmarkSelectAll_1M-12    	      14	  84833400 ns/op	169228346 B/op	 3146541 allocs/op
```
This commit is contained in:
Klaus Post
2019-09-16 17:26:27 -07:00
committed by kannappanr
parent 017456df63
commit c9b8bd8de2
13 changed files with 556 additions and 231 deletions

View File

@@ -61,7 +61,8 @@ const (
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
// make a buffer with a reasonable capacity.
return bytes.NewBuffer(make([]byte, 0, maxRecordSize))
},
}
@@ -341,7 +342,10 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error {
if err != nil {
return err
}
err = bufioWriter.Flush()
if err != nil {
return err
}
buf.Truncate(buf.Len() - 1)
buf.WriteString(s3Select.Output.CSVArgs.RecordDelimiter)
@@ -370,25 +374,33 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
writer := newMessageWriter(w, getProgressFunc)
var inputRecord sql.Record
var outputRecord sql.Record
var outputQueue []sql.Record
// Create queue based on the type.
if s3Select.statement.IsAggregated() {
outputQueue = make([]sql.Record, 0, 1)
} else {
outputQueue = make([]sql.Record, 0, 100)
}
var err error
sendRecord := func() bool {
if outputRecord == nil {
return true
}
buf := bufPool.Get().(*bytes.Buffer)
buf.Reset()
if err = s3Select.marshal(buf, outputRecord); err != nil {
bufPool.Put(buf)
return false
}
if buf.Len() > maxRecordSize {
writer.FinishWithError("OverMaxRecordSize", "The length of a record in the input or result is greater than maxCharsPerRecord of 1 MB.")
bufPool.Put(buf)
return false
for _, outputRecord := range outputQueue {
if outputRecord == nil {
continue
}
before := buf.Len()
if err = s3Select.marshal(buf, outputRecord); err != nil {
bufPool.Put(buf)
return false
}
if buf.Len()-before > maxRecordSize {
writer.FinishWithError("OverMaxRecordSize", "The length of a record in the input or result is greater than maxCharsPerRecord of 1 MB.")
bufPool.Put(buf)
return false
}
}
if err = writer.SendRecord(buf); err != nil {
@@ -397,7 +409,7 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
bufPool.Put(buf)
return false
}
outputQueue = outputQueue[:0]
return true
}
@@ -417,14 +429,15 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
}
if s3Select.statement.IsAggregated() {
outputRecord = s3Select.outputRecord()
outputRecord := s3Select.outputRecord()
if err = s3Select.statement.AggregateResult(outputRecord); err != nil {
break
}
outputQueue = append(outputQueue, outputRecord)
}
if !sendRecord() {
break
}
if !sendRecord() {
break
}
if err = writer.Finish(s3Select.getProgress()); err != nil {
@@ -443,10 +456,33 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) {
break
}
} else {
outputRecord = s3Select.outputRecord()
if outputRecord, err = s3Select.statement.Eval(inputRecord, outputRecord); err != nil {
var outputRecord sql.Record
// We will attempt to reuse the records in the table.
// The type of these should not change.
// The queue should always have at least one entry left for this to work.
outputQueue = outputQueue[:len(outputQueue)+1]
if t := outputQueue[len(outputQueue)-1]; t != nil {
// If the output record is already set, we reuse it.
outputRecord = t
outputRecord.Reset()
} else {
// Create new one
outputRecord = s3Select.outputRecord()
outputQueue[len(outputQueue)-1] = outputRecord
}
if err = s3Select.statement.Eval(inputRecord, outputRecord); err != nil {
break
}
if outputRecord == nil {
// This should not be written.
// Remove it from the queue.
outputQueue = outputQueue[:len(outputQueue)-1]
continue
}
if len(outputQueue) < cap(outputQueue) {
continue
}
if !sendRecord() {
break