Use a buffer to collect SQL Select result rows (#7158)

Batching records into a single SQL Select message in the response
leads to significant speed up as the message header overhead is made
negligible.

This change leads to a speed up of 3-5x for queries that select many
small records.
This commit is contained in:
Aditya Manthramurthy
2019-01-28 20:00:18 -08:00
committed by Harshavardhana
parent 2786055df4
commit 91c839ad28
3 changed files with 59 additions and 9 deletions

View File

@@ -224,6 +224,9 @@ type messageWriter struct {
getProgressFunc func() (int64, int64)
bytesReturned int64
payloadBuffer []byte
payloadBufferIndex int
dataCh chan []byte
doneCh chan struct{}
closeCh chan struct{}
@@ -320,6 +323,25 @@ func (writer *messageWriter) close() {
}
}
const (
bufLength = maxRecordSize
)
// collectRecord - collects records into a buffer, and when it is
// full, sends a message with the collected payload.
func (writer *messageWriter) collectRecord(data []byte) (err error) {
freeSpace := bufLength - writer.payloadBufferIndex
if len(data) > freeSpace {
err = writer.FlushRecords()
if err != nil {
return err
}
}
copy(writer.payloadBuffer[writer.payloadBufferIndex:], data)
writer.payloadBufferIndex += len(data)
return nil
}
func (writer *messageWriter) send(data []byte) error {
err := func() error {
if atomic.LoadUint32(&writer.stopped) == 1 {
@@ -343,11 +365,17 @@ func (writer *messageWriter) send(data []byte) error {
}
func (writer *messageWriter) SendRecords(payload []byte) error {
err := writer.send(newRecordsMessage(payload))
if err == nil {
atomic.AddInt64(&writer.bytesReturned, int64(len(payload)))
return writer.collectRecord(payload)
}
func (writer *messageWriter) FlushRecords() (err error) {
err = writer.send(newRecordsMessage(writer.payloadBuffer[0:writer.payloadBufferIndex]))
if err != nil {
return err
}
return err
atomic.AddInt64(&writer.bytesReturned, int64(writer.payloadBufferIndex))
writer.payloadBufferIndex = 0
return nil
}
func (writer *messageWriter) SendStats(bytesScanned, bytesProcessed int64) error {
@@ -375,6 +403,8 @@ func newMessageWriter(w http.ResponseWriter, getProgressFunc func() (bytesScanne
writer: w,
getProgressFunc: getProgressFunc,
payloadBuffer: make([]byte, bufLength),
dataCh: make(chan []byte),
doneCh: make(chan struct{}),
closeCh: make(chan struct{}),