mirror of
https://github.com/minio/minio.git
synced 2025-01-11 23:13:23 -05:00
S3 Select: Concurrent LINES delimited json parsing (#8610)
The speedup is ~5x on a 6 core CPU
This commit is contained in:
parent
3b67f629a4
commit
bf3a97d3aa
227
pkg/s3select/json/preader.go
Normal file
227
pkg/s3select/json/preader.go
Normal file
@ -0,0 +1,227 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package json
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"io"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"github.com/bcicen/jstream"
|
||||
"github.com/minio/minio/pkg/s3select/sql"
|
||||
)
|
||||
|
||||
// PReader - JSON record reader for S3Select.
|
||||
// Operates concurrently on line-delimited JSON.
|
||||
type PReader struct {
|
||||
args *ReaderArgs
|
||||
readCloser io.ReadCloser // raw input
|
||||
buf *bufio.Reader // input to the splitter
|
||||
current []jstream.KVS // current block of results to be returned
|
||||
recordsRead int // number of records read in current slice
|
||||
input chan *queueItem // input for workers
|
||||
queue chan *queueItem // output from workers in order
|
||||
err error // global error state, only touched by Reader.Read
|
||||
bufferPool sync.Pool // pool of []byte objects for input
|
||||
kvDstPool sync.Pool // pool of []jstream.KV used for output
|
||||
close chan struct{} // used for shutting down the splitter before end of stream
|
||||
readerWg sync.WaitGroup // used to keep track of async reader.
|
||||
}
|
||||
|
||||
// queueItem is an item in the queue.
|
||||
type queueItem struct {
|
||||
input []byte // raw input sent to the worker
|
||||
dst chan []jstream.KVS // result of block decode
|
||||
err error // any error encountered will be set here
|
||||
}
|
||||
|
||||
// Read - reads single record.
|
||||
// Once Read is called the previous record should no longer be referenced.
|
||||
func (r *PReader) Read(dst sql.Record) (sql.Record, error) {
|
||||
// If we have have any records left, return these before any error.
|
||||
for len(r.current) <= r.recordsRead {
|
||||
if r.err != nil {
|
||||
return nil, r.err
|
||||
}
|
||||
// Move to next block
|
||||
item, ok := <-r.queue
|
||||
if !ok {
|
||||
r.err = io.EOF
|
||||
return nil, r.err
|
||||
}
|
||||
//lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer.
|
||||
r.kvDstPool.Put(r.current)
|
||||
r.current = <-item.dst
|
||||
r.err = item.err
|
||||
r.recordsRead = 0
|
||||
}
|
||||
kvRecord := r.current[r.recordsRead]
|
||||
r.recordsRead++
|
||||
|
||||
dstRec, ok := dst.(*Record)
|
||||
if !ok {
|
||||
dstRec = &Record{}
|
||||
}
|
||||
dstRec.KVS = kvRecord
|
||||
dstRec.SelectFormat = sql.SelectFmtJSON
|
||||
return dstRec, nil
|
||||
}
|
||||
|
||||
// Close - closes underlying reader.
|
||||
func (r *PReader) Close() error {
|
||||
if r.close != nil {
|
||||
close(r.close)
|
||||
r.readerWg.Wait()
|
||||
r.close = nil
|
||||
}
|
||||
r.recordsRead = len(r.current)
|
||||
if r.err == nil {
|
||||
r.err = io.EOF
|
||||
}
|
||||
return r.readCloser.Close()
|
||||
}
|
||||
|
||||
// nextSplit will attempt to skip a number of bytes and
|
||||
// return the buffer until the next newline occurs.
|
||||
// The last block will be sent along with an io.EOF.
|
||||
func (r *PReader) nextSplit(skip int, dst []byte) ([]byte, error) {
|
||||
if cap(dst) < skip {
|
||||
dst = make([]byte, 0, skip+1024)
|
||||
}
|
||||
dst = dst[:skip]
|
||||
if skip > 0 {
|
||||
n, err := io.ReadFull(r.buf, dst)
|
||||
if err != nil && err != io.ErrUnexpectedEOF {
|
||||
// If an EOF happens after reading some but not all the bytes,
|
||||
// ReadFull returns ErrUnexpectedEOF.
|
||||
return dst[:n], err
|
||||
}
|
||||
dst = dst[:n]
|
||||
if err == io.ErrUnexpectedEOF {
|
||||
return dst, io.EOF
|
||||
}
|
||||
}
|
||||
// Read until next line.
|
||||
in, err := r.buf.ReadBytes('\n')
|
||||
dst = append(dst, in...)
|
||||
return dst, err
|
||||
}
|
||||
|
||||
// jsonSplitSize is the size of each block.
|
||||
// Blocks will read this much and find the first following newline.
|
||||
// 128KB appears to be a very reasonable default.
|
||||
const jsonSplitSize = 128 << 10
|
||||
|
||||
// startReaders will read the header if needed and spin up a parser
|
||||
// and a number of workers based on GOMAXPROCS.
|
||||
// If an error is returned no goroutines have been started and r.err will have been set.
|
||||
func (r *PReader) startReaders() {
|
||||
r.bufferPool.New = func() interface{} {
|
||||
return make([]byte, jsonSplitSize+1024)
|
||||
}
|
||||
|
||||
// Create queue
|
||||
r.queue = make(chan *queueItem, runtime.GOMAXPROCS(0))
|
||||
r.input = make(chan *queueItem, runtime.GOMAXPROCS(0))
|
||||
r.readerWg.Add(1)
|
||||
|
||||
// Start splitter
|
||||
go func() {
|
||||
defer close(r.input)
|
||||
defer close(r.queue)
|
||||
defer r.readerWg.Done()
|
||||
for {
|
||||
next, err := r.nextSplit(jsonSplitSize, r.bufferPool.Get().([]byte))
|
||||
q := queueItem{
|
||||
input: next,
|
||||
dst: make(chan []jstream.KVS, 1),
|
||||
err: err,
|
||||
}
|
||||
select {
|
||||
case <-r.close:
|
||||
return
|
||||
case r.queue <- &q:
|
||||
}
|
||||
|
||||
select {
|
||||
case <-r.close:
|
||||
return
|
||||
case r.input <- &q:
|
||||
}
|
||||
if err != nil {
|
||||
// Exit on any error.
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Start parsers
|
||||
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
|
||||
go func() {
|
||||
for in := range r.input {
|
||||
if len(in.input) == 0 {
|
||||
in.dst <- nil
|
||||
continue
|
||||
}
|
||||
dst, ok := r.kvDstPool.Get().([]jstream.KVS)
|
||||
if !ok {
|
||||
dst = make([]jstream.KVS, 0, 1000)
|
||||
}
|
||||
|
||||
d := jstream.NewDecoder(bytes.NewBuffer(in.input), 0).ObjectAsKVS()
|
||||
stream := d.Stream()
|
||||
all := dst[:0]
|
||||
for mv := range stream {
|
||||
var kvs jstream.KVS
|
||||
if mv.ValueType == jstream.Object {
|
||||
// This is a JSON object type (that preserves key
|
||||
// order)
|
||||
kvs = mv.Value.(jstream.KVS)
|
||||
} else {
|
||||
// To be AWS S3 compatible Select for JSON needs to
|
||||
// output non-object JSON as single column value
|
||||
// i.e. a map with `_1` as key and value as the
|
||||
// non-object.
|
||||
kvs = jstream.KVS{jstream.KV{Key: "_1", Value: mv.Value}}
|
||||
}
|
||||
all = append(all, kvs)
|
||||
}
|
||||
// We don't need the input any more.
|
||||
//lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer.
|
||||
r.bufferPool.Put(in.input)
|
||||
in.input = nil
|
||||
in.err = d.Err()
|
||||
in.dst <- all
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// NewPReader - creates new parallel JSON reader using readCloser.
|
||||
// Should only be used for LINES types.
|
||||
func NewPReader(readCloser io.ReadCloser, args *ReaderArgs) *PReader {
|
||||
r := &PReader{
|
||||
args: args,
|
||||
buf: bufio.NewReaderSize(readCloser, jsonSplitSize*2),
|
||||
readCloser: readCloser,
|
||||
close: make(chan struct{}),
|
||||
}
|
||||
r.startReaders()
|
||||
return r
|
||||
}
|
106
pkg/s3select/json/preader_test.go
Normal file
106
pkg/s3select/json/preader_test.go
Normal file
@ -0,0 +1,106 @@
|
||||
/*
|
||||
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package json
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/minio/minio/pkg/s3select/sql"
|
||||
)
|
||||
|
||||
func TestNewPReader(t *testing.T) {
|
||||
files, err := ioutil.ReadDir("testdata")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, file := range files {
|
||||
t.Run(file.Name(), func(t *testing.T) {
|
||||
f, err := os.Open(filepath.Join("testdata", file.Name()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := NewPReader(f, &ReaderArgs{})
|
||||
var record sql.Record
|
||||
for {
|
||||
record, err = r.Read(record)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
r.Close()
|
||||
if err != io.EOF {
|
||||
t.Fatalf("Reading failed with %s, %s", err, file.Name())
|
||||
}
|
||||
})
|
||||
|
||||
t.Run(file.Name()+"-close", func(t *testing.T) {
|
||||
f, err := os.Open(filepath.Join("testdata", file.Name()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r := NewPReader(f, &ReaderArgs{})
|
||||
r.Close()
|
||||
var record sql.Record
|
||||
for {
|
||||
record, err = r.Read(record)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != io.EOF {
|
||||
t.Fatalf("Reading failed with %s, %s", err, file.Name())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPReader(b *testing.B) {
|
||||
files, err := ioutil.ReadDir("testdata")
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
for _, file := range files {
|
||||
b.Run(file.Name(), func(b *testing.B) {
|
||||
f, err := ioutil.ReadFile(filepath.Join("testdata", file.Name()))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
b.SetBytes(int64(len(f)))
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
var record sql.Record
|
||||
for i := 0; i < b.N; i++ {
|
||||
r := NewPReader(ioutil.NopCloser(bytes.NewBuffer(f)), &ReaderArgs{})
|
||||
for {
|
||||
record, err = r.Read(record)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
r.Close()
|
||||
if err != io.EOF {
|
||||
b.Fatalf("Reading failed with %s, %s", err, file.Name())
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -316,7 +316,11 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos
|
||||
return err
|
||||
}
|
||||
|
||||
s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs)
|
||||
if strings.EqualFold(s3Select.Input.JSONArgs.ContentType, "lines") {
|
||||
s3Select.recordReader = json.NewPReader(s3Select.progressReader, &s3Select.Input.JSONArgs)
|
||||
} else {
|
||||
s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs)
|
||||
}
|
||||
return nil
|
||||
case parquetFormat:
|
||||
var err error
|
||||
|
Loading…
Reference in New Issue
Block a user