diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 9b6895059..e69dbdc71 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -33,6 +33,7 @@ import ( snappy "github.com/golang/snappy" "github.com/gorilla/mux" + "github.com/klauspost/readahead" miniogo "github.com/minio/minio-go" "github.com/minio/minio/cmd/crypto" "github.com/minio/minio/cmd/logger" @@ -251,7 +252,10 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r } } - s3s, err := s3select.New(gr, objInfo.Size, selectReq) + reader := readahead.NewReader(gr) + defer reader.Close() + + s3s, err := s3select.New(reader, objInfo.GetActualSize(), selectReq) if err != nil { writeErrorResponse(w, toAPIErrorCode(err), r.URL) return @@ -368,6 +372,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req return } defer gr.Close() + objInfo := gr.ObjInfo if objectAPI.IsEncryptionSupported() { diff --git a/pkg/s3select/format/csv/csv.go b/pkg/s3select/format/csv/csv.go index 431804676..990a153da 100644 --- a/pkg/s3select/format/csv/csv.go +++ b/pkg/s3select/format/csv/csv.go @@ -17,14 +17,12 @@ package csv import ( - "compress/bzip2" "encoding/csv" "encoding/xml" "io" "strconv" "strings" - gzip "github.com/klauspost/pgzip" "github.com/minio/minio/pkg/ioutil" "github.com/minio/minio/pkg/s3select/format" ) @@ -89,28 +87,13 @@ type cinput struct { // Otherwise, the returned reader can be reliably consumed with Read(). // until Read() return err. func New(opts *Options) (format.Select, error) { - myReader := opts.ReadFrom - var tempBytesScanned int64 - tempBytesScanned = 0 - switch opts.Compressed { - case "GZIP": - tempBytesScanned = opts.StreamSize - var err error - if myReader, err = gzip.NewReader(opts.ReadFrom); err != nil { - return nil, format.ErrTruncatedInput - } - case "BZIP2": - tempBytesScanned = opts.StreamSize - myReader = bzip2.NewReader(opts.ReadFrom) - } - // DelimitedReader treats custom record delimiter like `\r\n`,`\r`,`ab` etc and replaces it with `\n`. - normalizedReader := ioutil.NewDelimitedReader(myReader, []rune(opts.RecordDelimiter)) + normalizedReader := ioutil.NewDelimitedReader(opts.ReadFrom, []rune(opts.RecordDelimiter)) reader := &cinput{ options: opts, reader: csv.NewReader(normalizedReader), } - reader.stats.BytesScanned = tempBytesScanned + reader.stats.BytesScanned = opts.StreamSize reader.stats.BytesProcessed = 0 reader.stats.BytesReturned = 0 diff --git a/pkg/s3select/format/json/json.go b/pkg/s3select/format/json/json.go index fab48bcf5..6baf5ddd6 100644 --- a/pkg/s3select/format/json/json.go +++ b/pkg/s3select/format/json/json.go @@ -17,13 +17,11 @@ package json import ( - "compress/bzip2" "encoding/json" "encoding/xml" "io" jsoniter "github.com/json-iterator/go" - gzip "github.com/klauspost/pgzip" "github.com/minio/minio/pkg/s3select/format" ) @@ -75,26 +73,11 @@ type jinput struct { // Otherwise, the returned reader can be reliably consumed with jsonRead() // until jsonRead() returns nil. func New(opts *Options) (format.Select, error) { - myReader := opts.ReadFrom - var tempBytesScanned int64 - tempBytesScanned = 0 - switch opts.Compressed { - case "GZIP": - tempBytesScanned = opts.StreamSize - var err error - if myReader, err = gzip.NewReader(opts.ReadFrom); err != nil { - return nil, format.ErrTruncatedInput - } - case "BZIP2": - tempBytesScanned = opts.StreamSize - myReader = bzip2.NewReader(opts.ReadFrom) - } - reader := &jinput{ options: opts, - reader: jsoniter.NewDecoder(myReader), + reader: jsoniter.NewDecoder(opts.ReadFrom), } - reader.stats.BytesScanned = tempBytesScanned + reader.stats.BytesScanned = opts.StreamSize reader.stats.BytesProcessed = 0 reader.stats.BytesReturned = 0 diff --git a/pkg/s3select/helpers.go b/pkg/s3select/helpers.go index ae6375415..df8d5ad60 100644 --- a/pkg/s3select/helpers.go +++ b/pkg/s3select/helpers.go @@ -490,17 +490,6 @@ func likeConvert(pattern string, record string) (bool, error) { return true, nil } -// TrimQuotes allows the following to occur select "name", we need to trim the -// quotes to reference our map of columnNames. -func trimQuotes(s string) string { - if len(s) >= 2 { - if c := s[len(s)-1]; s[0] == c && (c == '"') { - return s[1 : len(s)-1] - } - } - return s -} - // cleanCol cleans a column name from the parser so that the name is returned to // original. func cleanCol(myCol string, alias string) string { @@ -641,7 +630,7 @@ func evaluateParserType(col *sqlparser.SQLVal) (interface{}, error) { // parseErrs is the function which handles all the errors that could occur // through use of function arguments such as column names in NULLIF -func parseErrs(columnNames []string, whereClause interface{}, alias string, myFuncs *SelectFuncs, f format.Select) error { +func parseErrs(columnNames []string, whereClause interface{}, alias string, myFuncs SelectFuncs, f format.Select) error { // Below code cleans up column names. processColumnNames(columnNames, alias, f) if columnNames[0] != "*" { diff --git a/pkg/s3select/input.go b/pkg/s3select/input.go index 3c8a2e430..36937c7fc 100644 --- a/pkg/s3select/input.go +++ b/pkg/s3select/input.go @@ -18,6 +18,8 @@ package s3select import ( "bytes" + "compress/bzip2" + "compress/gzip" "io" "net/http" "strings" @@ -26,6 +28,8 @@ import ( "github.com/minio/minio/pkg/s3select/format" "github.com/minio/minio/pkg/s3select/format/csv" "github.com/minio/minio/pkg/s3select/format/json" + + humanize "github.com/dustin/go-humanize" ) const ( @@ -61,7 +65,16 @@ func cleanExpr(expr string) string { } // New - initialize new select format -func New(gr io.Reader, size int64, req ObjectSelectRequest) (s3s format.Select, err error) { +func New(reader io.Reader, size int64, req ObjectSelectRequest) (s3s format.Select, err error) { + switch req.InputSerialization.CompressionType { + case SelectCompressionGZIP: + if reader, err = gzip.NewReader(reader); err != nil { + return nil, format.ErrTruncatedInput + } + case SelectCompressionBZIP: + reader = bzip2.NewReader(reader) + } + // Initializating options for CSV if req.InputSerialization.CSV != nil { if req.OutputSerialization.CSV.FieldDelimiter == "" { @@ -79,7 +92,7 @@ func New(gr io.Reader, size int64, req ObjectSelectRequest) (s3s format.Select, FieldDelimiter: req.InputSerialization.CSV.FieldDelimiter, Comments: req.InputSerialization.CSV.Comments, Name: "S3Object", // Default table name for all objects - ReadFrom: gr, + ReadFrom: reader, Compressed: string(req.InputSerialization.CompressionType), Expression: cleanExpr(req.Expression), OutputFieldDelimiter: req.OutputSerialization.CSV.FieldDelimiter, @@ -91,7 +104,7 @@ func New(gr io.Reader, size int64, req ObjectSelectRequest) (s3s format.Select, // Initializating options for JSON s3s, err = json.New(&json.Options{ Name: "S3Object", // Default table name for all objects - ReadFrom: gr, + ReadFrom: reader, Compressed: string(req.InputSerialization.CompressionType), Expression: cleanExpr(req.Expression), StreamSize: size, @@ -106,8 +119,8 @@ func New(gr io.Reader, size int64, req ObjectSelectRequest) (s3s format.Select, // response writer in a streaming fashion so that the client can actively use // the results before the query is finally finished executing. The func Execute(writer io.Writer, f format.Select) error { - myRow := make(chan *Row) - curBuf := bytes.NewBuffer(make([]byte, 1000000)) + myRow := make(chan Row, 1000) + curBuf := bytes.NewBuffer(make([]byte, humanize.MiByte)) curBuf.Reset() progressTicker := time.NewTicker(progressTime) continuationTimer := time.NewTimer(continuationTime) @@ -115,13 +128,11 @@ func Execute(writer io.Writer, f format.Select) error { defer continuationTimer.Stop() go runSelectParser(f, myRow) - for { select { case row, ok := <-myRow: if ok && row.err != nil { - errorMessage := writeErrorMessage(row.err, curBuf) - _, err := errorMessage.WriteTo(writer) + _, err := writeErrorMessage(row.err, curBuf).WriteTo(writer) flusher, okFlush := writer.(http.Flusher) if okFlush { flusher.Flush() @@ -133,8 +144,7 @@ func Execute(writer io.Writer, f format.Select) error { close(myRow) return nil } else if ok { - message := writeRecordMessage(row.record, curBuf) - _, err := message.WriteTo(writer) + _, err := writeRecordMessage(row.record, curBuf).WriteTo(writer) flusher, okFlush := writer.(http.Flusher) if okFlush { flusher.Flush() @@ -153,8 +163,7 @@ func Execute(writer io.Writer, f format.Select) error { if err != nil { return err } - statMessage := writeStatMessage(statPayload, curBuf) - _, err = statMessage.WriteTo(writer) + _, err = writeStatMessage(statPayload, curBuf).WriteTo(writer) flusher, ok := writer.(http.Flusher) if ok { flusher.Flush() @@ -163,8 +172,7 @@ func Execute(writer io.Writer, f format.Select) error { return err } curBuf.Reset() - message := writeEndMessage(curBuf) - _, err = message.WriteTo(writer) + _, err = writeEndMessage(curBuf).WriteTo(writer) flusher, ok = writer.(http.Flusher) if ok { flusher.Flush() @@ -182,8 +190,7 @@ func Execute(writer io.Writer, f format.Select) error { if err != nil { return err } - progressMessage := writeProgressMessage(progressPayload, curBuf) - _, err = progressMessage.WriteTo(writer) + _, err = writeProgressMessage(progressPayload, curBuf).WriteTo(writer) flusher, ok := writer.(http.Flusher) if ok { flusher.Flush() @@ -194,8 +201,7 @@ func Execute(writer io.Writer, f format.Select) error { curBuf.Reset() } case <-continuationTimer.C: - message := writeContinuationMessage(curBuf) - _, err := message.WriteTo(writer) + _, err := writeContinuationMessage(curBuf).WriteTo(writer) flusher, ok := writer.(http.Flusher) if ok { flusher.Flush() diff --git a/pkg/s3select/select.go b/pkg/s3select/select.go index b4c7398b2..c2f878a3d 100644 --- a/pkg/s3select/select.go +++ b/pkg/s3select/select.go @@ -36,13 +36,12 @@ type SelectFuncs struct { // RunSqlParser allows us to easily bundle all the functions from above and run // them in the appropriate order. -func runSelectParser(f format.Select, myRow chan *Row) { +func runSelectParser(f format.Select, myRow chan Row) { reqCols, alias, myLimit, whereClause, aggFunctionNames, myFuncs, myErr := ParseSelect(f) if myErr != nil { - rowStruct := &Row{ + myRow <- Row{ err: myErr, } - myRow <- rowStruct return } processSelectReq(reqCols, alias, whereClause, myLimit, aggFunctionNames, myRow, myFuncs, f) @@ -52,19 +51,18 @@ func runSelectParser(f format.Select, myRow chan *Row) { // ParseSelect parses the SELECT expression, and effectively tokenizes it into // its separate parts. It returns the requested column names,alias,limit of // records, and the where clause. -func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []string, *SelectFuncs, error) { - // return columnNames, alias, limitOfRecords, whereclause,coalStore, nil - - stmt, err := sqlparser.Parse(cleanExpr(f.Expression())) - // TODO Maybe can parse their errors a bit to return some more of the s3 errors - if err != nil { - return nil, "", 0, nil, nil, nil, ErrLexerInvalidChar - } - +func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []string, SelectFuncs, error) { + var sFuncs = SelectFuncs{} var whereClause interface{} var alias string var limit int64 - myFuncs := &SelectFuncs{} + + stmt, err := sqlparser.Parse(f.Expression()) + // TODO Maybe can parse their errors a bit to return some more of the s3 errors + if err != nil { + return nil, "", 0, nil, nil, sFuncs, ErrLexerInvalidChar + } + switch stmt := stmt.(type) { case *sqlparser.Select: // evaluates the where clause @@ -95,26 +93,26 @@ func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []strin case *sqlparser.StarExpr: columnNames[0] = "*" if smallerexpr.Name.CompliantName() != "count" { - return nil, "", 0, nil, nil, nil, ErrParseUnsupportedCallWithStar + return nil, "", 0, nil, nil, sFuncs, ErrParseUnsupportedCallWithStar } case *sqlparser.AliasedExpr: switch col := tempagg.Expr.(type) { case *sqlparser.BinaryExpr: - return nil, "", 0, nil, nil, nil, ErrParseNonUnaryAgregateFunctionCall + return nil, "", 0, nil, nil, sFuncs, ErrParseNonUnaryAgregateFunctionCall case *sqlparser.ColName: columnNames[i] = col.Name.CompliantName() } } // Case to deal with if COALESCE was used.. } else if supportedFunc(smallerexpr.Name.CompliantName()) { - if myFuncs.funcExpr == nil { - myFuncs.funcExpr = make([]*sqlparser.FuncExpr, len(stmt.SelectExprs)) - myFuncs.index = make([]int, len(stmt.SelectExprs)) + if sFuncs.funcExpr == nil { + sFuncs.funcExpr = make([]*sqlparser.FuncExpr, len(stmt.SelectExprs)) + sFuncs.index = make([]int, len(stmt.SelectExprs)) } - myFuncs.funcExpr[i] = smallerexpr - myFuncs.index[i] = i + sFuncs.funcExpr[i] = smallerexpr + sFuncs.index[i] = i } else { - return nil, "", 0, nil, nil, nil, ErrUnsupportedSQLOperation + return nil, "", 0, nil, nil, sFuncs, ErrUnsupportedSQLOperation } case *sqlparser.ColName: columnNames[i] = smallerexpr.Name.CompliantName() @@ -129,7 +127,7 @@ func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []strin for i := 0; i < len(stmt.From); i++ { switch smallerexpr := stmt.From[i].(type) { case *sqlparser.JoinTableExpr: - return nil, "", 0, nil, nil, nil, ErrParseMalformedJoin + return nil, "", 0, nil, nil, sFuncs, ErrParseMalformedJoin case *sqlparser.AliasedTableExpr: alias = smallerexpr.As.CompliantName() if alias == "" { @@ -147,23 +145,23 @@ func ParseSelect(f format.Select) ([]string, string, int64, interface{}, []strin } } if stmt.GroupBy != nil { - return nil, "", 0, nil, nil, nil, ErrParseUnsupportedLiteralsGroupBy + return nil, "", 0, nil, nil, sFuncs, ErrParseUnsupportedLiteralsGroupBy } if stmt.OrderBy != nil { - return nil, "", 0, nil, nil, nil, ErrParseUnsupportedToken + return nil, "", 0, nil, nil, sFuncs, ErrParseUnsupportedToken } - if err := parseErrs(columnNames, whereClause, alias, myFuncs, f); err != nil { - return nil, "", 0, nil, nil, nil, err + if err := parseErrs(columnNames, whereClause, alias, sFuncs, f); err != nil { + return nil, "", 0, nil, nil, sFuncs, err } - return columnNames, alias, limit, whereClause, functionNames, myFuncs, nil + return columnNames, alias, limit, whereClause, functionNames, sFuncs, nil } - return nil, "", 0, nil, nil, nil, nil + return nil, "", 0, nil, nil, sFuncs, nil } // This is the main function, It goes row by row and for records which validate // the where clause it currently prints the appropriate row given the requested // columns. -func processSelectReq(reqColNames []string, alias string, whereClause interface{}, limitOfRecords int64, functionNames []string, myRow chan *Row, myFunc *SelectFuncs, f format.Select) { +func processSelectReq(reqColNames []string, alias string, whereClause interface{}, limitOfRecords int64, functionNames []string, myRow chan Row, myFunc SelectFuncs, f format.Select) { counter := -1 var columns []string filtrCount := 0 @@ -183,18 +181,16 @@ func processSelectReq(reqColNames []string, alias string, whereClause interface{ for { record, err := f.Read() if err != nil { - rowStruct := &Row{ + myRow <- Row{ err: err, } - myRow <- rowStruct return } if record == nil { if functionFlag { - rowStruct := &Row{ + myRow <- Row{ record: aggFuncToStr(myAggVals, f) + "\n", } - myRow <- rowStruct } close(myRow) return @@ -210,10 +206,9 @@ func processSelectReq(reqColNames []string, alias string, whereClause interface{ myErr = ErrMissingHeaders } if myErr != nil { - rowStruct := &Row{ + myRow <- Row{ err: myErr, } - myRow <- rowStruct return } } else if counter == -1 && len(f.Header()) > 0 { @@ -232,28 +227,26 @@ func processSelectReq(reqColNames []string, alias string, whereClause interface{ // The call to the where function clause,ensures that the rows we print match our where clause. condition, myErr := matchesMyWhereClause(record, alias, whereClause) if myErr != nil { - rowStruct := &Row{ + myRow <- Row{ err: myErr, } - myRow <- rowStruct return } if condition { // if its an asterix we just print everything in the row if reqColNames[0] == "*" && functionNames[0] == "" { - var row *Row + var row Row switch f.Type() { case format.CSV: - row = &Row{ + row = Row{ record: strings.Join(convertToSlice(columnsMap, record, string(out)), f.OutputFieldDelimiter()) + "\n", } case format.JSON: - row = &Row{ + row = Row{ record: string(out) + "\n", } } myRow <- row - } else if alias != "" { // This is for dealing with the case of if we have to deal with a // request for a column with an index e.g A_1. @@ -269,16 +262,14 @@ func processSelectReq(reqColNames []string, alias string, whereClause interface{ // retrieve the correct part of the row. myQueryRow, myErr := processColNameIndex(string(out), reqColNames, columns, f) if myErr != nil { - rowStruct := &Row{ + myRow <- Row{ err: myErr, } - myRow <- rowStruct return } - rowStruct := &Row{ + myRow <- Row{ record: myQueryRow + "\n", } - myRow <- rowStruct } } else { // This code does aggregation if we were provided column names in the @@ -292,16 +283,14 @@ func processSelectReq(reqColNames []string, alias string, whereClause interface{ // names rather than indices. myQueryRow, myErr := processColNameLiteral(string(out), reqColNames, myFunc, f) if myErr != nil { - rowStruct := &Row{ + myRow <- Row{ err: myErr, } - myRow <- rowStruct return } - rowStruct := &Row{ + myRow <- Row{ record: myQueryRow + "\n", } - myRow <- rowStruct } } } @@ -357,7 +346,7 @@ func processColNameIndex(record string, reqColNames []string, columns []string, // processColNameLiteral is the function which creates the row for an name based // query. -func processColNameLiteral(record string, reqColNames []string, myFunc *SelectFuncs, f format.Select) (string, error) { +func processColNameLiteral(record string, reqColNames []string, myFunc SelectFuncs, f format.Select) (string, error) { row := make([]string, len(reqColNames)) for i := 0; i < len(reqColNames); i++ { // this is the case to deal with COALESCE. diff --git a/vendor/github.com/klauspost/readahead/LICENSE b/vendor/github.com/klauspost/readahead/LICENSE new file mode 100644 index 000000000..5cec7ee94 --- /dev/null +++ b/vendor/github.com/klauspost/readahead/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 Klaus Post + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/klauspost/readahead/README.md b/vendor/github.com/klauspost/readahead/README.md new file mode 100644 index 000000000..e0f7c387e --- /dev/null +++ b/vendor/github.com/klauspost/readahead/README.md @@ -0,0 +1,59 @@ +# readahead +Asynchronous read-ahead for Go readers + +This package will allow you to add readhead to any reader. This means a separate goroutine will perform reads from your upstream reader, so you can request from this reader without delay. + +This is helpful for splitting an input stream into concurrent processing, and also helps smooth out **bursts** of input or output. + +This should be fully transparent, except that once an error has been returned from the Reader, it will not recover. A panic will be caught and returned as an error. + +The readahead object also fulfills the [`io.WriterTo`](https://golang.org/pkg/io/#WriterTo) interface, which is likely to speed up `io.Copy` and other code that use the interface. + +See an introduction: [An Async Read-ahead Package for Go](https://blog.klauspost.com/an-async-read-ahead-package-for-go/) + +[![GoDoc][1]][2] [![Build Status][3]][4] + +[1]: https://godoc.org/github.com/klauspost/readahead?status.svg +[2]: https://godoc.org/github.com/klauspost/readahead +[3]: https://travis-ci.org/klauspost/readahead.svg +[4]: https://travis-ci.org/klauspost/readahead + +# usage + +To get the package use `go get -u github.com/klauspost/readahead`. + +Here is a simple example that does file copy. Error handling has been omitted for brevity. +```Go +input, _ := os.Open("input.txt") +output, _ := os.Create("output.txt") +defer input.Close() +defer output.Close() + +// Create a read-ahead Reader with default settings +ra := readahead.NewReader(input) +defer ra.Close() + +// Copy the content to our output +_, _ = io.Copy(output, ra) +``` + +# settings + +You can finetune the read-ahead for your specific use case, and adjust the number of buffers and the size of each buffer. + +The default the size of each buffer is 1MB, and there are 4 buffers. Do not make your buffers too small since there is a small overhead for passing buffers between goroutines. Other than that you are free to experiment with buffer sizes. + +# contributions + +On this project contributions in terms of new features is limited to: + +* Features that are widely usable and +* Features that have extensive tests + +This package is meant to be simple and stable, so therefore these strict requirements. + +The only feature I have considered is supporting the `io.Seeker` interface. I currently do not plan to add it myself, but if you can show a clean and well-tested way to implementing it, I will consider to merge it. If not, I will be happy to link to it. + +# license + +This package is released under the MIT license. See the supplied LICENSE file for more info. diff --git a/vendor/github.com/klauspost/readahead/reader.go b/vendor/github.com/klauspost/readahead/reader.go new file mode 100644 index 000000000..a77a1f5d1 --- /dev/null +++ b/vendor/github.com/klauspost/readahead/reader.go @@ -0,0 +1,282 @@ +// Copyright (c) 2015 Klaus Post, released under MIT License. See LICENSE file. + +// The readahead package will do asynchronous read-ahead from an input io.Reader +// and make the data available as an io.Reader. +// +// This should be fully transparent, except that once an error +// has been returned from the Reader, it will not recover. +// +// The readahead object also fulfills the io.WriterTo interface, which +// is likely to speed up copies. +// +// Package home: https://github.com/klauspost/readahead +// +package readahead + +import ( + "fmt" + "io" +) + +type reader struct { + in io.Reader // Input reader + closer io.Closer // Optional closer + ready chan *buffer // Buffers ready to be handed to the reader + reuse chan *buffer // Buffers to reuse for input reading + exit chan struct{} // Closes when finished + buffers int // Number of buffers + err error // If an error has occurred it is here + cur *buffer // Current buffer being served + exited chan struct{} // Channel is closed been the async reader shuts down +} + +// New returns a reader that will asynchronously read from +// the supplied reader into 4 buffers of 1MB each. +// +// It will start reading from the input at once, maybe even before this +// function has returned. +// +// The input can be read from the returned reader. +// When done use Close() to release the buffers. +func NewReader(rd io.Reader) io.ReadCloser { + if rd == nil { + return nil + } + + ret, err := NewReaderSize(rd, 4, 1<<20) + + // Should not be possible to trigger from other packages. + if err != nil { + panic("unexpected error:" + err.Error()) + } + return ret +} + +// New returns a reader that will asynchronously read from +// the supplied reader into 4 buffers of 1MB each. +// +// It will start reading from the input at once, maybe even before this +// function has returned. +// +// The input can be read from the returned reader. +// When done use Close() to release the buffers, +// which will also close the supplied closer. +func NewReadCloser(rd io.ReadCloser) io.ReadCloser { + if rd == nil { + return nil + } + + ret, err := NewReadCloserSize(rd, 4, 1<<20) + + // Should not be possible to trigger from other packages. + if err != nil { + panic("unexpected error:" + err.Error()) + } + return ret +} + +// NewReaderSize returns a reader with a custom number of buffers and size. +// buffers is the number of queued buffers and size is the size of each +// buffer in bytes. +func NewReaderSize(rd io.Reader, buffers, size int) (io.ReadCloser, error) { + if size <= 0 { + return nil, fmt.Errorf("buffer size too small") + } + if buffers <= 0 { + return nil, fmt.Errorf("number of buffers too small") + } + if rd == nil { + return nil, fmt.Errorf("nil input reader supplied") + } + a := &reader{} + a.init(rd, buffers, size) + return a, nil +} + +// NewReadCloserSize returns a reader with a custom number of buffers and size. +// buffers is the number of queued buffers and size is the size of each +// buffer in bytes. +func NewReadCloserSize(rc io.ReadCloser, buffers, size int) (io.ReadCloser, error) { + if size <= 0 { + return nil, fmt.Errorf("buffer size too small") + } + if buffers <= 0 { + return nil, fmt.Errorf("number of buffers too small") + } + if rc == nil { + return nil, fmt.Errorf("nil input reader supplied") + } + a := &reader{closer: rc} + a.init(rc, buffers, size) + return a, nil +} + +// initialize the reader +func (a *reader) init(rd io.Reader, buffers, size int) { + a.in = rd + a.ready = make(chan *buffer, buffers) + a.reuse = make(chan *buffer, buffers) + a.exit = make(chan struct{}, 0) + a.exited = make(chan struct{}, 0) + a.buffers = buffers + a.cur = nil + + // Create buffers + for i := 0; i < buffers; i++ { + a.reuse <- newBuffer(size) + } + + // Start async reader + go func() { + // Ensure that when we exit this is signalled. + defer close(a.exited) + for { + select { + case b := <-a.reuse: + err := b.read(a.in) + a.ready <- b + if err != nil { + close(a.ready) + return + } + case <-a.exit: + return + } + } + }() +} + +// fill will check if the current buffer is empty and fill it if it is. +// If an error was returned at the end of the current buffer it is returned. +func (a *reader) fill() (err error) { + if a.cur.isEmpty() { + if a.cur != nil { + a.reuse <- a.cur + a.cur = nil + } + b, ok := <-a.ready + if !ok { + return a.err + } + a.cur = b + } + return nil +} + +// Read will return the next available data. +func (a *reader) Read(p []byte) (n int, err error) { + // Swap buffer and maybe return error + err = a.fill() + if err != nil { + return 0, err + } + + // Copy what we can + n = copy(p, a.cur.buffer()) + a.cur.inc(n) + + // If at end of buffer, return any error, if present + if a.cur.isEmpty() { + a.err = a.cur.err + return n, a.err + } + return n, nil +} + +// WriteTo writes data to w until there's no more data to write or when an error occurs. +// The return value n is the number of bytes written. +// Any error encountered during the write is also returned. +func (a *reader) WriteTo(w io.Writer) (n int64, err error) { + n = 0 + for { + err = a.fill() + if err != nil { + return n, err + } + n2, err := w.Write(a.cur.buffer()) + a.cur.inc(n2) + n += int64(n2) + if err != nil { + return n, err + } + if a.cur.err != nil { + // io.Writer should return nil if we are at EOF. + if a.cur.err == io.EOF { + a.err = a.cur.err + return n, nil + } + a.err = a.cur.err + return n, a.cur.err + } + } +} + +// Close will ensure that the underlying async reader is shut down. +// It will also close the input supplied on newAsyncReader. +func (a *reader) Close() (err error) { + select { + case <-a.exited: + case a.exit <- struct{}{}: + <-a.exited + } + if a.closer != nil { + // Only call once + c := a.closer + a.closer = nil + return c.Close() + } + return nil +} + +// Internal buffer representing a single read. +// If an error is present, it must be returned +// once all buffer content has been served. +type buffer struct { + buf []byte + err error + offset int + size int +} + +func newBuffer(size int) *buffer { + return &buffer{buf: make([]byte, size), err: nil, size: size} +} + +// isEmpty returns true is offset is at end of +// buffer, or if the buffer is nil +func (b *buffer) isEmpty() bool { + if b == nil { + return true + } + if len(b.buf)-b.offset <= 0 { + return true + } + return false +} + +// read into start of the buffer from the supplied reader, +// resets the offset and updates the size of the buffer. +// Any error encountered during the read is returned. +func (b *buffer) read(rd io.Reader) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic reading: %v", r) + b.err = err + } + }() + var n int + n, b.err = rd.Read(b.buf[0:b.size]) + b.buf = b.buf[0:n] + b.offset = 0 + return b.err +} + +// Return the buffer at current offset +func (b *buffer) buffer() []byte { + return b.buf[b.offset:] +} + +// inc will increment the read offset +func (b *buffer) inc(n int) { + b.offset += n +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 16c7783a6..18941b366 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -562,6 +562,12 @@ "revision": "90b2c57fba35a1dd05cb40f9200722763808d99b", "revisionTime": "2018-06-06T15:09:39Z" }, + { + "checksumSHA1": "/8VtN8HUS0G235mhqfj2gRMi9Eg=", + "path": "github.com/klauspost/readahead", + "revision": "7f90b27d81113b71920c55b7a73a071dc81bdfd8", + "revisionTime": "2017-10-07T12:43:06Z" + }, { "checksumSHA1": "ehsrWipiGIWqa4To8TmelIx06vI=", "path": "github.com/klauspost/reedsolomon",