diff --git a/docs/select/README.md b/docs/select/README.md index bc5504262..6f0f2e41f 100644 --- a/docs/select/README.md +++ b/docs/select/README.md @@ -106,3 +106,4 @@ For a more detailed SELECT SQL reference, please see [here](https://docs.aws.ama - Large numbers (outside of the signed 64-bit range) are not yet supported. - The Date [functions](https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-date.html) `DATE_ADD`, `DATE_DIFF`, `EXTRACT` and `UTCNOW` along with type conversion using `CAST` to the `TIMESTAMP` data type are currently supported. - AWS S3's [reserved keywords](https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-keyword-list.html) list is not yet respected. +- CSV input fields (even quoted) cannot contain newlines even if `RecordDelimiter` is something else. diff --git a/go.mod b/go.mod index dc82d7821..421cf6ab3 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/hashicorp/vault v1.1.0 github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf github.com/json-iterator/go v1.1.7 + github.com/klauspost/compress v1.5.0 github.com/klauspost/pgzip v1.2.1 github.com/klauspost/readahead v1.3.0 github.com/klauspost/reedsolomon v1.9.1 diff --git a/pkg/s3select/csv/reader.go b/pkg/s3select/csv/reader.go index e14f85851..bdf24d922 100644 --- a/pkg/s3select/csv/reader.go +++ b/pkg/s3select/csv/reader.go @@ -17,89 +17,66 @@ package csv import ( + "bufio" "bytes" "encoding/csv" "fmt" "io" + "runtime" + "sync" "github.com/minio/minio/pkg/s3select/sql" ) -type recordReader struct { - reader io.Reader - recordDelimiter []byte - oneByte []byte - useOneByte bool -} - -func (rr *recordReader) Read(p []byte) (n int, err error) { - if rr.useOneByte { - p[0] = rr.oneByte[0] - rr.useOneByte = false - n, err = rr.reader.Read(p[1:]) - n++ - } else { - n, err = rr.reader.Read(p) - } - - if err != nil { - return 0, err - } - - if string(rr.recordDelimiter) == "\n" { - return n, nil - } - 
- for { - i := bytes.Index(p, rr.recordDelimiter) - if i < 0 { - break - } - - p[i] = '\n' - if len(rr.recordDelimiter) > 1 { - p = append(p[:i+1], p[i+len(rr.recordDelimiter):]...) - n-- - } - } - - if len(rr.recordDelimiter) == 1 || p[n-1] != rr.recordDelimiter[0] { - return n, nil - } - - if _, err = rr.reader.Read(rr.oneByte); err != nil { - return 0, err - } - - if rr.oneByte[0] == rr.recordDelimiter[1] { - p[n-1] = '\n' - return n, nil - } - - rr.useOneByte = true - return n, nil -} - // Reader - CSV record reader for S3Select. type Reader struct { args *ReaderArgs - readCloser io.ReadCloser - csvReader *csv.Reader - columnNames []string - nameIndexMap map[string]int64 + readCloser io.ReadCloser // raw input + buf *bufio.Reader // input to the splitter + columnNames []string // names of columns + nameIndexMap map[string]int64 // name to column index + current [][]string // current block of results to be returned + recordsRead int // number of records read in current slice + input chan *queueItem // input for workers + queue chan *queueItem // output from workers in order + err error // global error state, only touched by Reader.Read + bufferPool sync.Pool // pool of []byte objects for input + csvDstPool sync.Pool // pool of [][]string used for output + close chan struct{} // used for shutting down the splitter before end of stream + readerWg sync.WaitGroup // used to keep track of async reader. +} + +// queueItem is an item in the queue. +type queueItem struct { + input []byte // raw input sent to the worker + dst chan [][]string // result of block decode + err error // any error encountered will be set here } // Read - reads single record. -func (r *Reader) Read() (sql.Record, error) { - csvRecord, err := r.csvReader.Read() - if err != nil { - if err != io.EOF { - return nil, errCSVParsingError(err) +// Once Read is called the previous record should no longer be referenced. 
+func (r *Reader) Read(dst sql.Record) (sql.Record, error) { + // If we have any records left, return these before any error. + for len(r.current) <= r.recordsRead { + if r.err != nil { + return nil, r.err } - - return nil, err + // Move to next block + item, ok := <-r.queue + if !ok { + r.err = io.EOF + return nil, r.err + } + //lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer. + r.csvDstPool.Put(r.current) + r.current = <-item.dst + r.err = item.err + r.recordsRead = 0 } + csvRecord := r.current[r.recordsRead] + r.recordsRead++ + // If no column names are set, use _(index) if r.columnNames == nil { r.columnNames = make([]string, len(csvRecord)) for i := range csvRecord { @@ -107,67 +84,225 @@ func (r *Reader) Read() (sql.Record, error) { } } + // If no index map, add that. if r.nameIndexMap == nil { r.nameIndexMap = make(map[string]int64) for i := range r.columnNames { r.nameIndexMap[r.columnNames[i]] = int64(i) } } + dstRec, ok := dst.(*Record) + if !ok { + dstRec = &Record{} + } + dstRec.columnNames = r.columnNames + dstRec.csvRecord = csvRecord + dstRec.nameIndexMap = r.nameIndexMap - return &Record{ - columnNames: r.columnNames, - csvRecord: csvRecord, - nameIndexMap: r.nameIndexMap, - }, nil + return dstRec, nil } -// Close - closes underlaying reader. +// Close - closes underlying reader. func (r *Reader) Close() error { + if r.close != nil { + close(r.close) + r.close = nil + r.readerWg.Wait() + } + r.recordsRead = len(r.current) + if r.err == nil { + r.err = io.EOF + } return r.readCloser.Close() } +// nextSplit will attempt to skip a number of bytes and +// return the buffer until the next newline occurs. +// The last block will be sent along with an io.EOF. 
+func (r *Reader) nextSplit(skip int, dst []byte) ([]byte, error) { + if cap(dst) < skip { + dst = make([]byte, 0, skip+1024) + } + dst = dst[:skip] + if skip > 0 { + n, err := io.ReadFull(r.buf, dst) + if err != nil && err != io.ErrUnexpectedEOF { + // If an EOF happens after reading some but not all the bytes, + // ReadFull returns ErrUnexpectedEOF. + return dst[:n], err + } + dst = dst[:n] + if err == io.ErrUnexpectedEOF { + return dst, io.EOF + } + } + // Read until next line. + in, err := r.buf.ReadBytes('\n') + dst = append(dst, in...) + return dst, err +} + +// csvSplitSize is the size of each block. +// Blocks will read this much and find the first following newline. +// 128KB appears to be a very reasonable default. +const csvSplitSize = 128 << 10 + +// startReaders will read the header if needed and spin up a parser +// and a number of workers based on GOMAXPROCS. +// If an error is returned no goroutines have been started and r.err will have been set. +func (r *Reader) startReaders(in io.Reader, newReader func(io.Reader) *csv.Reader) error { + if r.args.FileHeaderInfo != none { + // Read column names + // Get one line. + b, err := r.nextSplit(0, nil) + if err != nil { + r.err = err + return err + } + reader := newReader(bytes.NewReader(b)) + record, err := reader.Read() + if err != nil { + r.err = err + if err != io.EOF { + r.err = errCSVParsingError(err) + return errCSVParsingError(err) + } + return err + } + + if r.args.FileHeaderInfo == use { + // Copy column names since records will be reused. + columns := append(make([]string, 0, len(record)), record...) 
+ r.columnNames = columns + } + } + + r.bufferPool.New = func() interface{} { + return make([]byte, csvSplitSize+1024) + } + + // Create queue + r.queue = make(chan *queueItem, runtime.GOMAXPROCS(0)) + r.input = make(chan *queueItem, runtime.GOMAXPROCS(0)) + r.readerWg.Add(1) + + // Start splitter + go func() { + defer close(r.input) + defer close(r.queue) + defer r.readerWg.Done() + for { + next, err := r.nextSplit(csvSplitSize, r.bufferPool.Get().([]byte)) + q := queueItem{ + input: next, + dst: make(chan [][]string, 1), + err: err, + } + select { + case <-r.close: + return + case r.queue <- &q: + } + + select { + case <-r.close: + return + case r.input <- &q: + } + if err != nil { + // Exit on any error. + return + } + } + }() + + // Start parsers + for i := 0; i < runtime.GOMAXPROCS(0); i++ { + go func() { + for in := range r.input { + if len(in.input) == 0 { + in.dst <- nil + continue + } + dst, ok := r.csvDstPool.Get().([][]string) + if !ok { + dst = make([][]string, 0, 1000) + } + + cr := newReader(bytes.NewBuffer(in.input)) + all := dst[:0] + err := func() error { + // Read all records until EOF or another error. + for { + record, err := cr.Read() + if err == io.EOF { + return nil + } + if err != nil { + return errCSVParsingError(err) + } + var recDst []string + if len(dst) > len(all) { + recDst = dst[len(all)] + } + if cap(recDst) < len(record) { + recDst = make([]string, len(record)) + } + recDst = recDst[:len(record)] + copy(recDst, record) + all = append(all, recDst) + } + }() + if err != nil { + in.err = err + } + // We don't need the input any more. + //lint:ignore SA6002 Using pointer would allocate more since we would have to copy slice header before taking a pointer. + r.bufferPool.Put(in.input) + in.input = nil + in.dst <- all + } + }() + } + return nil + +} + // NewReader - creates new CSV reader using readCloser. 
func NewReader(readCloser io.ReadCloser, args *ReaderArgs) (*Reader, error) { if args == nil || args.IsEmpty() { panic(fmt.Errorf("empty args passed %v", args)) } - - csvReader := csv.NewReader(&recordReader{ - reader: readCloser, - recordDelimiter: []byte(args.RecordDelimiter), - oneByte: []byte{0}, - }) - csvReader.Comma = []rune(args.FieldDelimiter)[0] - csvReader.Comment = []rune(args.CommentCharacter)[0] - csvReader.FieldsPerRecord = -1 - // If LazyQuotes is true, a quote may appear in an unquoted field and a - // non-doubled quote may appear in a quoted field. - csvReader.LazyQuotes = true - // We do not trim leading space to keep consistent with s3. - csvReader.TrimLeadingSpace = false + csvIn := io.Reader(readCloser) + if args.RecordDelimiter != "\n" { + csvIn = &recordTransform{ + reader: readCloser, + recordDelimiter: []byte(args.RecordDelimiter), + oneByte: make([]byte, len(args.RecordDelimiter)-1), + } + } r := &Reader{ args: args, + buf: bufio.NewReaderSize(csvIn, csvSplitSize*2), readCloser: readCloser, - csvReader: csvReader, + close: make(chan struct{}), } - if args.FileHeaderInfo == none { - return r, nil + // Assume args are validated by ReaderArgs.UnmarshalXML() + newCsvReader := func(r io.Reader) *csv.Reader { + ret := csv.NewReader(r) + ret.Comma = []rune(args.FieldDelimiter)[0] + ret.Comment = []rune(args.CommentCharacter)[0] + ret.FieldsPerRecord = -1 + // If LazyQuotes is true, a quote may appear in an unquoted field and a + // non-doubled quote may appear in a quoted field. + ret.LazyQuotes = true + // We do not trim leading space to keep consistent with s3. 
+ ret.TrimLeadingSpace = false + ret.ReuseRecord = true + return ret } - record, err := csvReader.Read() - if err != nil { - if err != io.EOF { - return nil, errCSVParsingError(err) - } - - return nil, err - } - - if args.FileHeaderInfo == use { - r.columnNames = record - } - - return r, nil + return r, r.startReaders(csvIn, newCsvReader) } diff --git a/pkg/s3select/csv/reader_test.go b/pkg/s3select/csv/reader_test.go index 81853e42f..81a39a5f9 100644 --- a/pkg/s3select/csv/reader_test.go +++ b/pkg/s3select/csv/reader_test.go @@ -18,11 +18,16 @@ package csv import ( "bytes" + "errors" + "fmt" "io" "io/ioutil" + "path/filepath" + "reflect" "strings" "testing" + "github.com/klauspost/compress/zip" "github.com/minio/minio/pkg/s3select/sql" ) @@ -49,12 +54,12 @@ func TestRead(t *testing.T) { QuoteCharacter: defaultQuoteCharacter, QuoteEscapeCharacter: defaultQuoteEscapeCharacter, CommentCharacter: defaultCommentCharacter, - AllowQuotedRecordDelimiter: true, + AllowQuotedRecordDelimiter: false, unmarshaled: true, }) for { - record, err = r.Read() + record, err = r.Read(record) if err != nil { break } @@ -72,3 +77,555 @@ func TestRead(t *testing.T) { } } } + +type tester interface { + Fatal(...interface{}) +} + +func openTestFile(t tester, file string) []byte { + f, err := ioutil.ReadFile(filepath.Join("testdata/testdata.zip")) + if err != nil { + t.Fatal(err) + } + z, err := zip.NewReader(bytes.NewReader(f), int64(len(f))) + if err != nil { + t.Fatal(err) + } + for _, f := range z.File { + if f.Name == file { + rc, err := f.Open() + if err != nil { + t.Fatal(err) + } + defer rc.Close() + b, err := ioutil.ReadAll(rc) + if err != nil { + t.Fatal(err) + } + return b + } + } + t.Fatal(file, "not found in testdata/testdata.zip") + return nil +} + +func TestReadExtended(t *testing.T) { + cases := []struct { + file string + recordDelimiter string + fieldDelimiter string + header bool + wantColumns []string + wantTenFields string + totalFields int + }{ + { + file: 
"nyc-taxi-data-100k.csv", + recordDelimiter: "\n", + fieldDelimiter: ",", + header: true, + wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"}, + wantTenFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804 +3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100,1025100,E,MN36,Washington Heights South,3801 +3389226,2,2014-03-26 17:13:28,2014-03-26 17:19:07,N,1,-73.949493408203125,40.793506622314453,-73.943374633789063,40.786155700683594,1,0.82,5.5,1,0.5,0,0,,,7,1,1,75,75,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1387,164,1,Manhattan,016400,1016400,E,MN33,East Harlem South,3804 
+3389227,2,2014-03-14 21:07:19,2014-03-14 21:11:41,N,1,-73.950538635253906,40.792228698730469,-73.940811157226563,40.809253692626953,1,1.40,6,0.5,0.5,0,0,,,7,2,1,75,42,green,0.00,0.0,0.0,46,22,5.59,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1184,208,1,Manhattan,020800,1020800,E,MN03,Central Harlem North-Polo Grounds,3803 +3389228,1,2014-03-28 13:52:56,2014-03-28 14:29:01,N,1,-73.950569152832031,40.792312622070313,-73.868507385253906,40.688491821289063,2,16.10,46,0,0.5,0,5.33,,,51.83,2,,75,63,green,0.04,0.0,0.0,62,37,5.37,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1544,1182.02,3,Brooklyn,118202,3118202,E,BK83,Cypress Hills-City Line,4008 +3389229,2,2014-03-07 09:46:32,2014-03-07 09:55:01,N,1,-73.952301025390625,40.789798736572266,-73.935806274414062,40.794448852539063,1,1.67,8,0,0.5,2,0,,,10.5,1,1,75,74,green,0.00,3.9,0.0,37,26,7.83,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1553,178,1,Manhattan,017800,1017800,E,MN34,East Harlem North,3804 +3389230,2,2014-03-17 18:23:05,2014-03-17 18:28:38,N,1,-73.952346801757813,40.789844512939453,-73.946319580078125,40.783851623535156,5,0.95,5.5,1,0.5,0.65,0,,,7.65,1,1,75,263,green,0.00,0.0,0.0,35,23,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,32,156.01,1,Manhattan,015601,1015601,I,MN32,Yorkville,3805 +3389231,1,2014-03-19 19:09:36,2014-03-19 19:12:20,N,1,-73.952377319335938,40.789779663085938,-73.947494506835938,40.796474456787109,1,0.50,4,1,0.5,1,0,,,6.5,1,,75,75,green,0.92,0.0,0.0,46,32,7.16,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1401,174.02,1,Manhattan,017402,1017402,E,MN33,East Harlem South,3804 +3389232,2,2014-03-20 19:06:28,2014-03-20 19:21:35,N,1,-73.952583312988281,40.789516448974609,-73.985870361328125,40.776973724365234,2,3.04,13,1,0.5,2.8,0,,,17.3,1,1,75,143,green,0.00,0.0,0.0,54,40,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem 
South,3804,1742,155,1,Manhattan,015500,1015500,I,MN14,Lincoln Square,3806 +3389233,2,2014-03-29 09:38:12,2014-03-29 09:44:16,N,1,-73.952728271484375,40.789501190185547,-73.950935363769531,40.775600433349609,1,1.10,6.5,0,0.5,1.3,0,,,8.3,1,1,75,263,green,1.81,0.0,0.0,59,43,10.74,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,2048,138,1,Manhattan,013800,1013800,I,MN32,Yorkville,3805 +`, + totalFields: 308*2 + 1, + }, { + file: "nyc-taxi-data-tabs-100k.csv", + recordDelimiter: "\n", + fieldDelimiter: "\t", + header: true, + wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"}, + wantTenFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804 +3389225,2,2014-03-31 09:42:15,2014-03-31 
10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100,1025100,E,MN36,Washington Heights South,3801 +3389226,2,2014-03-26 17:13:28,2014-03-26 17:19:07,N,1,-73.949493408203125,40.793506622314453,-73.943374633789063,40.786155700683594,1,0.82,5.5,1,0.5,0,0,,,7,1,1,75,75,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1387,164,1,Manhattan,016400,1016400,E,MN33,East Harlem South,3804 +3389227,2,2014-03-14 21:07:19,2014-03-14 21:11:41,N,1,-73.950538635253906,40.792228698730469,-73.940811157226563,40.809253692626953,1,1.40,6,0.5,0.5,0,0,,,7,2,1,75,42,green,0.00,0.0,0.0,46,22,5.59,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1184,208,1,Manhattan,020800,1020800,E,MN03,Central Harlem North-Polo Grounds,3803 +3389228,1,2014-03-28 13:52:56,2014-03-28 14:29:01,N,1,-73.950569152832031,40.792312622070313,-73.868507385253906,40.688491821289063,2,16.10,46,0,0.5,0,5.33,,,51.83,2,,75,63,green,0.04,0.0,0.0,62,37,5.37,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1544,1182.02,3,Brooklyn,118202,3118202,E,BK83,Cypress Hills-City Line,4008 +3389229,2,2014-03-07 09:46:32,2014-03-07 09:55:01,N,1,-73.952301025390625,40.789798736572266,-73.935806274414062,40.794448852539063,1,1.67,8,0,0.5,2,0,,,10.5,1,1,75,74,green,0.00,3.9,0.0,37,26,7.83,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1553,178,1,Manhattan,017800,1017800,E,MN34,East Harlem North,3804 +3389230,2,2014-03-17 18:23:05,2014-03-17 18:28:38,N,1,-73.952346801757813,40.789844512939453,-73.946319580078125,40.783851623535156,5,0.95,5.5,1,0.5,0.65,0,,,7.65,1,1,75,263,green,0.00,0.0,0.0,35,23,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,32,156.01,1,Manhattan,015601,1015601,I,MN32,Yorkville,3805 
+3389231,1,2014-03-19 19:09:36,2014-03-19 19:12:20,N,1,-73.952377319335938,40.789779663085938,-73.947494506835938,40.796474456787109,1,0.50,4,1,0.5,1,0,,,6.5,1,,75,75,green,0.92,0.0,0.0,46,32,7.16,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1401,174.02,1,Manhattan,017402,1017402,E,MN33,East Harlem South,3804 +3389232,2,2014-03-20 19:06:28,2014-03-20 19:21:35,N,1,-73.952583312988281,40.789516448974609,-73.985870361328125,40.776973724365234,2,3.04,13,1,0.5,2.8,0,,,17.3,1,1,75,143,green,0.00,0.0,0.0,54,40,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1742,155,1,Manhattan,015500,1015500,I,MN14,Lincoln Square,3806 +3389233,2,2014-03-29 09:38:12,2014-03-29 09:44:16,N,1,-73.952728271484375,40.789501190185547,-73.950935363769531,40.775600433349609,1,1.10,6.5,0,0.5,1.3,0,,,8.3,1,1,75,263,green,1.81,0.0,0.0,59,43,10.74,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,2048,138,1,Manhattan,013800,1013800,I,MN32,Yorkville,3805 +`, + totalFields: 308*2 + 1, + }, { + file: "nyc-taxi-data-100k-single-delim.csv", + recordDelimiter: "^", + fieldDelimiter: ",", + header: true, + wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", 
"dropoff_ntaname", "dropoff_puma"}, + wantTenFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804 +3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100,1025100,E,MN36,Washington Heights South,3801 +3389226,2,2014-03-26 17:13:28,2014-03-26 17:19:07,N,1,-73.949493408203125,40.793506622314453,-73.943374633789063,40.786155700683594,1,0.82,5.5,1,0.5,0,0,,,7,1,1,75,75,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1387,164,1,Manhattan,016400,1016400,E,MN33,East Harlem South,3804 +3389227,2,2014-03-14 21:07:19,2014-03-14 21:11:41,N,1,-73.950538635253906,40.792228698730469,-73.940811157226563,40.809253692626953,1,1.40,6,0.5,0.5,0,0,,,7,2,1,75,42,green,0.00,0.0,0.0,46,22,5.59,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1184,208,1,Manhattan,020800,1020800,E,MN03,Central Harlem North-Polo Grounds,3803 +3389228,1,2014-03-28 13:52:56,2014-03-28 14:29:01,N,1,-73.950569152832031,40.792312622070313,-73.868507385253906,40.688491821289063,2,16.10,46,0,0.5,0,5.33,,,51.83,2,,75,63,green,0.04,0.0,0.0,62,37,5.37,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1544,1182.02,3,Brooklyn,118202,3118202,E,BK83,Cypress Hills-City Line,4008 +3389229,2,2014-03-07 09:46:32,2014-03-07 09:55:01,N,1,-73.952301025390625,40.789798736572266,-73.935806274414062,40.794448852539063,1,1.67,8,0,0.5,2,0,,,10.5,1,1,75,74,green,0.00,3.9,0.0,37,26,7.83,1267,168,1,Manhattan,016800,1016800,E,MN33,East 
Harlem South,3804,1553,178,1,Manhattan,017800,1017800,E,MN34,East Harlem North,3804 +3389230,2,2014-03-17 18:23:05,2014-03-17 18:28:38,N,1,-73.952346801757813,40.789844512939453,-73.946319580078125,40.783851623535156,5,0.95,5.5,1,0.5,0.65,0,,,7.65,1,1,75,263,green,0.00,0.0,0.0,35,23,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,32,156.01,1,Manhattan,015601,1015601,I,MN32,Yorkville,3805 +3389231,1,2014-03-19 19:09:36,2014-03-19 19:12:20,N,1,-73.952377319335938,40.789779663085938,-73.947494506835938,40.796474456787109,1,0.50,4,1,0.5,1,0,,,6.5,1,,75,75,green,0.92,0.0,0.0,46,32,7.16,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1401,174.02,1,Manhattan,017402,1017402,E,MN33,East Harlem South,3804 +3389232,2,2014-03-20 19:06:28,2014-03-20 19:21:35,N,1,-73.952583312988281,40.789516448974609,-73.985870361328125,40.776973724365234,2,3.04,13,1,0.5,2.8,0,,,17.3,1,1,75,143,green,0.00,0.0,0.0,54,40,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1742,155,1,Manhattan,015500,1015500,I,MN14,Lincoln Square,3806 +3389233,2,2014-03-29 09:38:12,2014-03-29 09:44:16,N,1,-73.952728271484375,40.789501190185547,-73.950935363769531,40.775600433349609,1,1.10,6.5,0,0.5,1.3,0,,,8.3,1,1,75,263,green,1.81,0.0,0.0,59,43,10.74,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,2048,138,1,Manhattan,013800,1013800,I,MN32,Yorkville,3805 +`, + totalFields: 308*2 + 1, + }, { + file: "nyc-taxi-data-100k-multi-delim.csv", + recordDelimiter: "^Y", + fieldDelimiter: ",", + header: true, + wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", "dropoff", "cab_type", "precipitation", 
"snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"}, + wantTenFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804 +3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100,1025100,E,MN36,Washington Heights South,3801 +3389226,2,2014-03-26 17:13:28,2014-03-26 17:19:07,N,1,-73.949493408203125,40.793506622314453,-73.943374633789063,40.786155700683594,1,0.82,5.5,1,0.5,0,0,,,7,1,1,75,75,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1387,164,1,Manhattan,016400,1016400,E,MN33,East Harlem South,3804 +3389227,2,2014-03-14 21:07:19,2014-03-14 21:11:41,N,1,-73.950538635253906,40.792228698730469,-73.940811157226563,40.809253692626953,1,1.40,6,0.5,0.5,0,0,,,7,2,1,75,42,green,0.00,0.0,0.0,46,22,5.59,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1184,208,1,Manhattan,020800,1020800,E,MN03,Central Harlem North-Polo Grounds,3803 +3389228,1,2014-03-28 13:52:56,2014-03-28 
14:29:01,N,1,-73.950569152832031,40.792312622070313,-73.868507385253906,40.688491821289063,2,16.10,46,0,0.5,0,5.33,,,51.83,2,,75,63,green,0.04,0.0,0.0,62,37,5.37,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1544,1182.02,3,Brooklyn,118202,3118202,E,BK83,Cypress Hills-City Line,4008 +3389229,2,2014-03-07 09:46:32,2014-03-07 09:55:01,N,1,-73.952301025390625,40.789798736572266,-73.935806274414062,40.794448852539063,1,1.67,8,0,0.5,2,0,,,10.5,1,1,75,74,green,0.00,3.9,0.0,37,26,7.83,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1553,178,1,Manhattan,017800,1017800,E,MN34,East Harlem North,3804 +3389230,2,2014-03-17 18:23:05,2014-03-17 18:28:38,N,1,-73.952346801757813,40.789844512939453,-73.946319580078125,40.783851623535156,5,0.95,5.5,1,0.5,0.65,0,,,7.65,1,1,75,263,green,0.00,0.0,0.0,35,23,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,32,156.01,1,Manhattan,015601,1015601,I,MN32,Yorkville,3805 +3389231,1,2014-03-19 19:09:36,2014-03-19 19:12:20,N,1,-73.952377319335938,40.789779663085938,-73.947494506835938,40.796474456787109,1,0.50,4,1,0.5,1,0,,,6.5,1,,75,75,green,0.92,0.0,0.0,46,32,7.16,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1401,174.02,1,Manhattan,017402,1017402,E,MN33,East Harlem South,3804 +3389232,2,2014-03-20 19:06:28,2014-03-20 19:21:35,N,1,-73.952583312988281,40.789516448974609,-73.985870361328125,40.776973724365234,2,3.04,13,1,0.5,2.8,0,,,17.3,1,1,75,143,green,0.00,0.0,0.0,54,40,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1742,155,1,Manhattan,015500,1015500,I,MN14,Lincoln Square,3806 +3389233,2,2014-03-29 09:38:12,2014-03-29 09:44:16,N,1,-73.952728271484375,40.789501190185547,-73.950935363769531,40.775600433349609,1,1.10,6.5,0,0.5,1.3,0,,,8.3,1,1,75,263,green,1.81,0.0,0.0,59,43,10.74,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,2048,138,1,Manhattan,013800,1013800,I,MN32,Yorkville,3805 +`, + totalFields: 308*2 + 1, + }, { + 
file: "nyc-taxi-data-noheader-100k.csv", + recordDelimiter: "\n", + fieldDelimiter: ",", + header: false, + wantColumns: []string{"_1", "_2", "_3", "_4", "_5", "_6", "_7", "_8", "_9", "_10", "_11", "_12", "_13", "_14", "_15", "_16", "_17", "_18", "_19", "_20", "_21", "_22", "_23", "_24", "_25", "_26", "_27", "_28", "_29", "_30", "_31", "_32", "_33", "_34", "_35", "_36", "_37", "_38", "_39", "_40", "_41", "_42", "_43", "_44", "_45", "_46", "_47", "_48", "_49", "_50", "_51"}, + wantTenFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804 +3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100,1025100,E,MN36,Washington Heights South,3801 +3389226,2,2014-03-26 17:13:28,2014-03-26 17:19:07,N,1,-73.949493408203125,40.793506622314453,-73.943374633789063,40.786155700683594,1,0.82,5.5,1,0.5,0,0,,,7,1,1,75,75,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1387,164,1,Manhattan,016400,1016400,E,MN33,East Harlem South,3804 +3389227,2,2014-03-14 21:07:19,2014-03-14 21:11:41,N,1,-73.950538635253906,40.792228698730469,-73.940811157226563,40.809253692626953,1,1.40,6,0.5,0.5,0,0,,,7,2,1,75,42,green,0.00,0.0,0.0,46,22,5.59,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1184,208,1,Manhattan,020800,1020800,E,MN03,Central Harlem North-Polo Grounds,3803 +3389228,1,2014-03-28 13:52:56,2014-03-28 
14:29:01,N,1,-73.950569152832031,40.792312622070313,-73.868507385253906,40.688491821289063,2,16.10,46,0,0.5,0,5.33,,,51.83,2,,75,63,green,0.04,0.0,0.0,62,37,5.37,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1544,1182.02,3,Brooklyn,118202,3118202,E,BK83,Cypress Hills-City Line,4008 +3389229,2,2014-03-07 09:46:32,2014-03-07 09:55:01,N,1,-73.952301025390625,40.789798736572266,-73.935806274414062,40.794448852539063,1,1.67,8,0,0.5,2,0,,,10.5,1,1,75,74,green,0.00,3.9,0.0,37,26,7.83,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1553,178,1,Manhattan,017800,1017800,E,MN34,East Harlem North,3804 +3389230,2,2014-03-17 18:23:05,2014-03-17 18:28:38,N,1,-73.952346801757813,40.789844512939453,-73.946319580078125,40.783851623535156,5,0.95,5.5,1,0.5,0.65,0,,,7.65,1,1,75,263,green,0.00,0.0,0.0,35,23,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,32,156.01,1,Manhattan,015601,1015601,I,MN32,Yorkville,3805 +3389231,1,2014-03-19 19:09:36,2014-03-19 19:12:20,N,1,-73.952377319335938,40.789779663085938,-73.947494506835938,40.796474456787109,1,0.50,4,1,0.5,1,0,,,6.5,1,,75,75,green,0.92,0.0,0.0,46,32,7.16,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1401,174.02,1,Manhattan,017402,1017402,E,MN33,East Harlem South,3804 +3389232,2,2014-03-20 19:06:28,2014-03-20 19:21:35,N,1,-73.952583312988281,40.789516448974609,-73.985870361328125,40.776973724365234,2,3.04,13,1,0.5,2.8,0,,,17.3,1,1,75,143,green,0.00,0.0,0.0,54,40,8.05,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1742,155,1,Manhattan,015500,1015500,I,MN14,Lincoln Square,3806 +3389233,2,2014-03-29 09:38:12,2014-03-29 09:44:16,N,1,-73.952728271484375,40.789501190185547,-73.950935363769531,40.775600433349609,1,1.10,6.5,0,0.5,1.3,0,,,8.3,1,1,75,263,green,1.81,0.0,0.0,59,43,10.74,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,2048,138,1,Manhattan,013800,1013800,I,MN32,Yorkville,3805 +`, + totalFields: 308 * 2, + }, + } + + 
for i, c := range cases { + t.Run(c.file, func(t *testing.T) { + + var err error + var record sql.Record + var result bytes.Buffer + input := openTestFile(t, c.file) + // Get above block size. + input = append(input, input...) + args := ReaderArgs{ + FileHeaderInfo: use, + RecordDelimiter: c.recordDelimiter, + FieldDelimiter: c.fieldDelimiter, + QuoteCharacter: defaultQuoteCharacter, + QuoteEscapeCharacter: defaultQuoteEscapeCharacter, + CommentCharacter: defaultCommentCharacter, + AllowQuotedRecordDelimiter: false, + unmarshaled: true, + } + if !c.header { + args.FileHeaderInfo = none + } + r, _ := NewReader(ioutil.NopCloser(bytes.NewReader(input)), &args) + fields := 0 + for { + record, err = r.Read(record) + if err != nil { + break + } + if fields < 10 { + // Write with fixed delimiters, newlines. + err := record.WriteCSV(&result, ',') + if err != nil { + t.Error(err) + } + } + fields++ + } + r.Close() + if err != io.EOF { + t.Fatalf("Case %d failed with %s", i, err) + } + if !reflect.DeepEqual(r.columnNames, c.wantColumns) { + t.Errorf("Case %d failed: expected %#v, got result %#v", i, c.wantColumns, r.columnNames) + } + if result.String() != c.wantTenFields { + t.Errorf("Case %d failed: expected %v, got result %v", i, c.wantTenFields, result.String()) + } + if fields != c.totalFields { + t.Errorf("Case %d failed: expected %v results %v", i, c.totalFields, fields) + } + }) + } +} + +type errReader struct { + err error +} + +func (e errReader) Read(p []byte) (n int, err error) { + return 0, e.err +} + +func TestReadFailures(t *testing.T) { + customErr := errors.New("unable to read file :(") + cases := []struct { + file string + recordDelimiter string + fieldDelimiter string + sendErr error + header bool + wantColumns []string + wantFields string + wantErr error + }{ + { + file: "truncated-records.csv", + recordDelimiter: "^Y", + fieldDelimiter: ",", + header: true, + wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", 
"store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", "dropoff", "cab_type", "precipitation", "snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"}, + wantFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804 +3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100 +`, + wantErr: io.EOF, + }, + { + file: "truncated-records.csv", + recordDelimiter: "^Y", + fieldDelimiter: ",", + sendErr: customErr, + header: true, + wantColumns: []string{"trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime", "store_and_fwd_flag", "rate_code_id", "pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude", "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "ehail_fee", "improvement_surcharge", "total_amount", "payment_type", "trip_type", "pickup", 
"dropoff", "cab_type", "precipitation", "snow_depth", "snowfall", "max_temp", "min_temp", "wind", "pickup_nyct2010_gid", "pickup_ctlabel", "pickup_borocode", "pickup_boroname", "pickup_ct2010", "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode", "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid", "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname", "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil", "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"}, + wantFields: `3389224,2,2014-03-26 00:26:15,2014-03-26 00:28:38,N,1,-73.950431823730469,40.792251586914063,-73.938949584960937,40.794425964355469,1,0.84,4.5,0.5,0.5,1,0,,,6.5,1,1,75,74,green,0.00,0.0,0.0,36,24,11.86,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,1828,180,1,Manhattan,018000,1018000,E,MN34,East Harlem North,3804 +3389225,2,2014-03-31 09:42:15,2014-03-31 10:01:17,N,1,-73.950340270996094,40.792228698730469,-73.941970825195313,40.842235565185547,1,4.47,17.5,0,0.5,0,0,,,18,2,1,75,244,green,0.16,0.0,0.0,56,36,8.28,1267,168,1,Manhattan,016800,1016800,E,MN33,East Harlem South,3804,911,251,1,Manhattan,025100 +`, + wantErr: customErr, + }, + { + // This works since LazyQuotes is true: + file: "invalid-badbarequote.csv", + recordDelimiter: "\n", + fieldDelimiter: ",", + sendErr: nil, + header: true, + wantColumns: []string{"header1", "header2", "header3"}, + wantFields: "ok1,ok2,ok3\n" + `"a ""word""",b` + "\n", + wantErr: io.EOF, + }, + { + // This works since LazyQuotes is true: + file: "invalid-baddoubleq.csv", + recordDelimiter: "\n", + fieldDelimiter: ",", + sendErr: nil, + header: true, + wantColumns: []string{"header1", "header2", "header3"}, + wantFields: "ok1,ok2,ok3\n" + `"a""""b",c` + "\n", + wantErr: io.EOF, + }, + { + // This works since LazyQuotes is true: + file: "invalid-badextraq.csv", + recordDelimiter: "\n", + fieldDelimiter: ",", + sendErr: nil, + header: true, + wantColumns: []string{"header1", "header2", "header3"}, + wantFields: 
"ok1,ok2,ok3\n" + `a word,"b"""` + "\n", + wantErr: io.EOF, + }, + { + // This works since LazyQuotes is true: + file: "invalid-badstartline.csv", + recordDelimiter: "\n", + fieldDelimiter: ",", + sendErr: nil, + header: true, + wantColumns: []string{"header1", "header2", "header3"}, + wantFields: "ok1,ok2,ok3\n" + `a,"b` + "\n" + `c""d,e` + "\n\"\n", + wantErr: io.EOF, + }, + { + // This works since LazyQuotes is true: + file: "invalid-badstartline2.csv", + recordDelimiter: "\n", + fieldDelimiter: ",", + sendErr: nil, + header: true, + wantColumns: []string{"header1", "header2", "header3"}, + wantFields: "ok1,ok2,ok3\n" + `a,b` + "\n" + `"d` + "\n\ne\"\n", + wantErr: io.EOF, + }, + { + // This works since LazyQuotes is true: + file: "invalid-badtrailingq.csv", + recordDelimiter: "\n", + fieldDelimiter: ",", + sendErr: nil, + header: true, + wantColumns: []string{"header1", "header2", "header3"}, + wantFields: "ok1,ok2,ok3\n" + `a word,"b"""` + "\n", + wantErr: io.EOF, + }, + { + // This works since LazyQuotes is true: + file: "invalid-crlfquoted.csv", + recordDelimiter: "\n", + fieldDelimiter: ",", + sendErr: nil, + header: true, + wantColumns: []string{"header1", "header2", "header3"}, + wantFields: "ok1,ok2,ok3\n" + `"foo""bar"` + "\n", + wantErr: io.EOF, + }, + { + // This works since LazyQuotes is true: + file: "invalid-csv.csv", + recordDelimiter: "\n", + fieldDelimiter: ",", + sendErr: nil, + header: true, + wantColumns: []string{"header1", "header2", "header3"}, + wantFields: "ok1,ok2,ok3\n" + `"a""""b",c` + "\n", + wantErr: io.EOF, + }, + { + // This works since LazyQuotes is true, but output is very weird. 
+ file: "invalid-oddquote.csv", + recordDelimiter: "\n", + fieldDelimiter: ",", + sendErr: nil, + header: true, + wantColumns: []string{"header1", "header2", "header3"}, + wantFields: "ok1,ok2,ok3\n" + `""""""",b,c` + "\n\"\n", + wantErr: io.EOF, + }, + { + // Test when file ends with a half separator + file: "endswithhalfsep.csv", + recordDelimiter: "%!", + fieldDelimiter: ",", + sendErr: nil, + header: false, + wantColumns: []string{"_1", "_2", "_3"}, + wantFields: "a,b,c\na2,b2,c2%\n", + wantErr: io.EOF, + }, + } + + for i, c := range cases { + t.Run(c.file, func(t *testing.T) { + + var err error + var record sql.Record + var result bytes.Buffer + input := openTestFile(t, c.file) + args := ReaderArgs{ + FileHeaderInfo: use, + RecordDelimiter: c.recordDelimiter, + FieldDelimiter: c.fieldDelimiter, + QuoteCharacter: defaultQuoteCharacter, + QuoteEscapeCharacter: defaultQuoteEscapeCharacter, + CommentCharacter: defaultCommentCharacter, + AllowQuotedRecordDelimiter: false, + unmarshaled: true, + } + if !c.header { + args.FileHeaderInfo = none + } + inr := io.Reader(bytes.NewReader(input)) + if c.sendErr != nil { + inr = io.MultiReader(inr, errReader{c.sendErr}) + } + r, _ := NewReader(ioutil.NopCloser(inr), &args) + fields := 0 + for { + record, err = r.Read(record) + if err != nil { + break + } + // Write with fixed delimiters, newlines. 
+ err := record.WriteCSV(&result, ',') + if err != nil { + t.Error(err) + } + fields++ + } + r.Close() + if err != c.wantErr { + t.Fatalf("Case %d failed with %s", i, err) + } + if !reflect.DeepEqual(r.columnNames, c.wantColumns) { + t.Errorf("Case %d failed: expected \n%#v, got result \n%#v", i, c.wantColumns, r.columnNames) + } + if result.String() != c.wantFields { + t.Errorf("Case %d failed: expected \n%v\nGot result \n%v", i, c.wantFields, result.String()) + } + }) + } +} + +func BenchmarkReaderBasic(b *testing.B) { + args := ReaderArgs{ + FileHeaderInfo: use, + RecordDelimiter: "\n", + FieldDelimiter: ",", + QuoteCharacter: defaultQuoteCharacter, + QuoteEscapeCharacter: defaultQuoteEscapeCharacter, + CommentCharacter: defaultCommentCharacter, + AllowQuotedRecordDelimiter: false, + unmarshaled: true, + } + f := openTestFile(b, "nyc-taxi-data-100k.csv") + r, err := NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args) + if err != nil { + b.Fatalf("Reading init failed with %s", err) + } + defer r.Close() + b.ReportAllocs() + b.ResetTimer() + b.SetBytes(int64(len(f))) + var record sql.Record + for i := 0; i < b.N; i++ { + r, err = NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args) + if err != nil { + b.Fatalf("Reading init failed with %s", err) + } + for err == nil { + record, err = r.Read(record) + if err != nil && err != io.EOF { + b.Fatalf("Reading failed with %s", err) + } + } + r.Close() + } +} + +func BenchmarkReaderHuge(b *testing.B) { + args := ReaderArgs{ + FileHeaderInfo: use, + RecordDelimiter: "\n", + FieldDelimiter: ",", + QuoteCharacter: defaultQuoteCharacter, + QuoteEscapeCharacter: defaultQuoteEscapeCharacter, + CommentCharacter: defaultCommentCharacter, + AllowQuotedRecordDelimiter: false, + unmarshaled: true, + } + for n := 0; n < 11; n++ { + f := openTestFile(b, "nyc-taxi-data-100k.csv") + want := 309 + for i := 0; i < n; i++ { + f = append(f, f...) 
+ want *= 2 + } + b.Run(fmt.Sprint(len(f)/(1<<10), "K"), func(b *testing.B) { + b.ReportAllocs() + b.SetBytes(int64(len(f))) + b.ResetTimer() + var record sql.Record + for i := 0; i < b.N; i++ { + r, err := NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args) + if err != nil { + b.Fatalf("Reading init failed with %s", err) + } + + got := 0 + for err == nil { + record, err = r.Read(record) + if err != nil && err != io.EOF { + b.Fatalf("Reading failed with %s", err) + } + got++ + } + r.Close() + if got != want { + b.Errorf("want %d records, got %d", want, got) + } + } + }) + } +} + +func BenchmarkReaderReplace(b *testing.B) { + args := ReaderArgs{ + FileHeaderInfo: use, + RecordDelimiter: "^", + FieldDelimiter: ",", + QuoteCharacter: defaultQuoteCharacter, + QuoteEscapeCharacter: defaultQuoteEscapeCharacter, + CommentCharacter: defaultCommentCharacter, + AllowQuotedRecordDelimiter: false, + unmarshaled: true, + } + f := openTestFile(b, "nyc-taxi-data-100k-single-delim.csv") + r, err := NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args) + if err != nil { + b.Fatalf("Reading init failed with %s", err) + } + defer r.Close() + b.ReportAllocs() + b.ResetTimer() + b.SetBytes(int64(len(f))) + var record sql.Record + for i := 0; i < b.N; i++ { + r, err = NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args) + if err != nil { + b.Fatalf("Reading init failed with %s", err) + } + + for err == nil { + record, err = r.Read(record) + if err != nil && err != io.EOF { + b.Fatalf("Reading failed with %s", err) + } + } + r.Close() + } +} + +func BenchmarkReaderReplaceTwo(b *testing.B) { + args := ReaderArgs{ + FileHeaderInfo: use, + RecordDelimiter: "^Y", + FieldDelimiter: ",", + QuoteCharacter: defaultQuoteCharacter, + QuoteEscapeCharacter: defaultQuoteEscapeCharacter, + CommentCharacter: defaultCommentCharacter, + AllowQuotedRecordDelimiter: false, + unmarshaled: true, + } + f := openTestFile(b, "nyc-taxi-data-100k-multi-delim.csv") + r, err := 
NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args) + if err != nil { + b.Fatalf("Reading init failed with %s", err) + } + defer r.Close() + b.ReportAllocs() + b.ResetTimer() + b.SetBytes(int64(len(f))) + var record sql.Record + for i := 0; i < b.N; i++ { + r, err = NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &args) + if err != nil { + b.Fatalf("Reading init failed with %s", err) + } + + for err == nil { + record, err = r.Read(record) + if err != nil && err != io.EOF { + b.Fatalf("Reading failed with %s", err) + } + } + r.Close() + } +} diff --git a/pkg/s3select/csv/record.go b/pkg/s3select/csv/record.go index 3152749a6..1a53ea478 100644 --- a/pkg/s3select/csv/record.go +++ b/pkg/s3select/csv/record.go @@ -27,7 +27,7 @@ import ( "github.com/minio/minio/pkg/s3select/sql" ) -// Record - is CSV record. +// Record - is a CSV record. type Record struct { columnNames []string csvRecord []string diff --git a/pkg/s3select/csv/recordtransform.go b/pkg/s3select/csv/recordtransform.go new file mode 100644 index 000000000..18b5ea48f --- /dev/null +++ b/pkg/s3select/csv/recordtransform.go @@ -0,0 +1,93 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package csv + +import ( + "bytes" + "io" +) + +// recordTransform will convert records to always have newline records. +type recordTransform struct { + reader io.Reader + // recordDelimiter can be up to 2 characters. 
+ recordDelimiter []byte + oneByte []byte + useOneByte bool +} + +func (rr *recordTransform) Read(p []byte) (n int, err error) { + if rr.useOneByte { + p[0] = rr.oneByte[0] + rr.useOneByte = false + n, err = rr.reader.Read(p[1:]) + n++ + } else { + n, err = rr.reader.Read(p) + } + + if err != nil { + return n, err + } + + // Do nothing if record-delimiter is already newline. + if string(rr.recordDelimiter) == "\n" { + return n, nil + } + + // Change record delimiters to newline. + if len(rr.recordDelimiter) == 1 { + for idx := 0; idx < len(p); { + i := bytes.Index(p[idx:], rr.recordDelimiter) + if i < 0 { + break + } + idx += i + p[idx] = '\n' + } + return n, nil + } + + // 2 characters... + for idx := 0; idx < len(p); { + i := bytes.Index(p[idx:], rr.recordDelimiter) + if i < 0 { + break + } + idx += i + + p[idx] = '\n' + p = append(p[:idx+1], p[idx+2:]...) + n-- + } + + if p[n-1] != rr.recordDelimiter[0] { + return n, nil + } + + if _, err = rr.reader.Read(rr.oneByte); err != nil { + return n, err + } + + if rr.oneByte[0] == rr.recordDelimiter[1] { + p[n-1] = '\n' + return n, nil + } + + rr.useOneByte = true + return n, nil +} diff --git a/pkg/s3select/csv/testdata/testdata.zip b/pkg/s3select/csv/testdata/testdata.zip new file mode 100644 index 000000000..e4519334f Binary files /dev/null and b/pkg/s3select/csv/testdata/testdata.zip differ diff --git a/pkg/s3select/json/reader.go b/pkg/s3select/json/reader.go index b29365ccd..4be0fad0c 100644 --- a/pkg/s3select/json/reader.go +++ b/pkg/s3select/json/reader.go @@ -33,7 +33,7 @@ type Reader struct { } // Read - reads single record. 
-func (r *Reader) Read() (sql.Record, error) { +func (r *Reader) Read(dst sql.Record) (sql.Record, error) { v, ok := <-r.valueCh if !ok { if err := r.decoder.Err(); err != nil { @@ -55,15 +55,25 @@ func (r *Reader) Read() (sql.Record, error) { kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v.Value}} } - return &Record{ - KVS: kvs, - SelectFormat: sql.SelectFmtJSON, - }, nil + dstRec, ok := dst.(*Record) + if !ok { + dstRec = &Record{} + } + dstRec.KVS = kvs + dstRec.SelectFormat = sql.SelectFmtJSON + return dstRec, nil } -// Close - closes underlaying reader. +// Close - closes underlying reader. func (r *Reader) Close() error { - return r.readCloser.Close() + // Close the input. + // Potentially racy if the stream decoder is still reading. + err := r.readCloser.Close() + for range r.valueCh { + // Drain values so we don't leak a goroutine. + // Since we have closed the input, it should fail rather quickly. + } + return err } // NewReader - creates new JSON reader using readCloser. diff --git a/pkg/s3select/json/reader_test.go b/pkg/s3select/json/reader_test.go index a0a9e277c..130fde9df 100644 --- a/pkg/s3select/json/reader_test.go +++ b/pkg/s3select/json/reader_test.go @@ -17,26 +17,30 @@ package json import ( + "bytes" "io" "io/ioutil" "os" "path/filepath" "testing" + + "github.com/minio/minio/pkg/s3select/sql" ) func TestNewReader(t *testing.T) { - files, err := ioutil.ReadDir("data") + files, err := ioutil.ReadDir("testdata") if err != nil { t.Fatal(err) } for _, file := range files { - f, err := os.Open(filepath.Join("data", file.Name())) + f, err := os.Open(filepath.Join("testdata", file.Name())) if err != nil { t.Fatal(err) } r := NewReader(f, &ReaderArgs{}) + var record sql.Record for { - _, err = r.Read() + record, err = r.Read(record) if err != nil { break } @@ -47,3 +51,35 @@ func TestNewReader(t *testing.T) { } } } + +func BenchmarkReader(b *testing.B) { + files, err := ioutil.ReadDir("testdata") + if err != nil { + b.Fatal(err) + } + for _, file := 
range files { + b.Run(file.Name(), func(b *testing.B) { + f, err := ioutil.ReadFile(filepath.Join("testdata", file.Name())) + if err != nil { + b.Fatal(err) + } + b.SetBytes(int64(len(f))) + b.ReportAllocs() + b.ResetTimer() + var record sql.Record + for i := 0; i < b.N; i++ { + r := NewReader(ioutil.NopCloser(bytes.NewBuffer(f)), &ReaderArgs{}) + for { + record, err = r.Read(record) + if err != nil { + break + } + } + r.Close() + if err != io.EOF { + b.Fatalf("Reading failed with %s, %s", err, file.Name()) + } + } + }) + } +} diff --git a/pkg/s3select/json/data/10.json b/pkg/s3select/json/testdata/10.json similarity index 100% rename from pkg/s3select/json/data/10.json rename to pkg/s3select/json/testdata/10.json diff --git a/pkg/s3select/json/data/11.json b/pkg/s3select/json/testdata/11.json similarity index 100% rename from pkg/s3select/json/data/11.json rename to pkg/s3select/json/testdata/11.json diff --git a/pkg/s3select/json/data/12.json b/pkg/s3select/json/testdata/12.json similarity index 100% rename from pkg/s3select/json/data/12.json rename to pkg/s3select/json/testdata/12.json diff --git a/pkg/s3select/json/data/2.json b/pkg/s3select/json/testdata/2.json similarity index 100% rename from pkg/s3select/json/data/2.json rename to pkg/s3select/json/testdata/2.json diff --git a/pkg/s3select/json/data/3.json b/pkg/s3select/json/testdata/3.json similarity index 100% rename from pkg/s3select/json/data/3.json rename to pkg/s3select/json/testdata/3.json diff --git a/pkg/s3select/json/data/4.json b/pkg/s3select/json/testdata/4.json similarity index 100% rename from pkg/s3select/json/data/4.json rename to pkg/s3select/json/testdata/4.json diff --git a/pkg/s3select/json/data/5.json b/pkg/s3select/json/testdata/5.json similarity index 100% rename from pkg/s3select/json/data/5.json rename to pkg/s3select/json/testdata/5.json diff --git a/pkg/s3select/json/data/6.json b/pkg/s3select/json/testdata/6.json similarity index 100% rename from pkg/s3select/json/data/6.json 
rename to pkg/s3select/json/testdata/6.json diff --git a/pkg/s3select/json/data/7.json b/pkg/s3select/json/testdata/7.json similarity index 100% rename from pkg/s3select/json/data/7.json rename to pkg/s3select/json/testdata/7.json diff --git a/pkg/s3select/json/data/8.json b/pkg/s3select/json/testdata/8.json similarity index 100% rename from pkg/s3select/json/data/8.json rename to pkg/s3select/json/testdata/8.json diff --git a/pkg/s3select/json/data/9.json b/pkg/s3select/json/testdata/9.json similarity index 100% rename from pkg/s3select/json/data/9.json rename to pkg/s3select/json/testdata/9.json diff --git a/pkg/s3select/parquet/reader.go b/pkg/s3select/parquet/reader.go index 57d7a1deb..c63967bc3 100644 --- a/pkg/s3select/parquet/reader.go +++ b/pkg/s3select/parquet/reader.go @@ -33,7 +33,7 @@ type Reader struct { } // Read - reads single record. -func (r *Reader) Read() (rec sql.Record, rerr error) { +func (r *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) { parquetRecord, err := r.reader.Read() if err != nil { if err != io.EOF { @@ -73,11 +73,20 @@ func (r *Reader) Read() (rec sql.Record, rerr error) { return true } + // Apply our range parquetRecord.Range(f) - return &jsonfmt.Record{KVS: kvs, SelectFormat: sql.SelectFmtParquet}, nil + + // Reuse destination if we can. + dstRec, ok := dst.(*jsonfmt.Record) + if !ok { + dstRec = &jsonfmt.Record{} + } + dstRec.SelectFormat = sql.SelectFmtParquet + dstRec.KVS = kvs + return dstRec, nil } -// Close - closes underlaying readers. +// Close - closes underlying readers. func (r *Reader) Close() error { return r.reader.Close() } diff --git a/pkg/s3select/select.go b/pkg/s3select/select.go index 923d3a920..a594d57f9 100644 --- a/pkg/s3select/select.go +++ b/pkg/s3select/select.go @@ -34,7 +34,9 @@ import ( ) type recordReader interface { - Read() (sql.Record, error) + // Read a record. + // dst is optional but will be used if valid. 
+ Read(dst sql.Record) (sql.Record, error) Close() error } @@ -399,6 +401,7 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) { return true } + var rec sql.Record for { if s3Select.statement.LimitReached() { if err = writer.Finish(s3Select.getProgress()); err != nil { @@ -408,7 +411,7 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) { break } - if inputRecord, err = s3Select.recordReader.Read(); err != nil { + if rec, err = s3Select.recordReader.Read(rec); err != nil { if err != io.EOF { break } @@ -431,7 +434,7 @@ func (s3Select *S3Select) Evaluate(w http.ResponseWriter) { break } - if inputRecord, err = s3Select.statement.EvalFrom(s3Select.Input.format, inputRecord); err != nil { + if inputRecord, err = s3Select.statement.EvalFrom(s3Select.Input.format, rec); err != nil { break } diff --git a/pkg/s3select/select_test.go b/pkg/s3select/select_test.go index 3231356ee..569e47415 100644 --- a/pkg/s3select/select_test.go +++ b/pkg/s3select/select_test.go @@ -18,6 +18,7 @@ package s3select import ( "bytes" + "fmt" "io" "io/ioutil" "net/http" @@ -108,26 +109,29 @@ func TestCSVInput(t *testing.T) { 2.5,baz,true `) - for _, testCase := range testTable { - s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML)) - if err != nil { - t.Fatal(err) - } + for i, testCase := range testTable { + t.Run(fmt.Sprint(i), func(t *testing.T) { + s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML)) + if err != nil { + t.Fatal(err) + } - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(csvData)), nil - }); err != nil { - t.Fatal(err) - } + if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { + return ioutil.NopCloser(bytes.NewReader(csvData)), nil + }); err != nil { + t.Fatal(err) + } - w := &testResponseWriter{} - s3Select.Evaluate(w) - s3Select.Close() + w := &testResponseWriter{} + s3Select.Evaluate(w) + s3Select.Close() - if 
!reflect.DeepEqual(w.response, testCase.expectedResult) { - t.Fatalf("received response does not match with expected reply") - } + if !reflect.DeepEqual(w.response, testCase.expectedResult) { + t.Errorf("received response does not match with expected reply\ngot: %#v\nwant:%#v", w.response, testCase.expectedResult) + } + }) } + } func TestJSONInput(t *testing.T) { @@ -191,26 +195,27 @@ func TestJSONInput(t *testing.T) { {"three":true,"two":"baz","one":2.5} `) - for _, testCase := range testTable { + for i, testCase := range testTable { + t.Run(fmt.Sprint(i), func(t *testing.T) { + s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML)) + if err != nil { + t.Fatal(err) + } - s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML)) - if err != nil { - t.Fatal(err) - } + if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { + return ioutil.NopCloser(bytes.NewReader(jsonData)), nil + }); err != nil { + t.Fatal(err) + } - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(jsonData)), nil - }); err != nil { - t.Fatal(err) - } + w := &testResponseWriter{} + s3Select.Evaluate(w) + s3Select.Close() - w := &testResponseWriter{} - s3Select.Evaluate(w) - s3Select.Close() - - if !reflect.DeepEqual(w.response, testCase.expectedResult) { - t.Fatalf("received response does not match with expected reply") - } + if !reflect.DeepEqual(w.response, testCase.expectedResult) { + t.Errorf("received response does not match with expected reply\ngot: %s\nwant:%s", string(w.response), string(testCase.expectedResult)) + } + }) } } @@ -268,45 +273,47 @@ func TestParquetInput(t *testing.T) { }, } - for _, testCase := range testTable { - getReader := func(offset int64, length int64) (io.ReadCloser, error) { - testdataFile := "testdata.parquet" - file, err := os.Open(testdataFile) + for i, testCase := range testTable { + t.Run(fmt.Sprint(i), func(t *testing.T) { + getReader := func(offset 
int64, length int64) (io.ReadCloser, error) { + testdataFile := "testdata.parquet" + file, err := os.Open(testdataFile) + if err != nil { + return nil, err + } + + fi, err := file.Stat() + if err != nil { + return nil, err + } + + if offset < 0 { + offset = fi.Size() + offset + } + + if _, err = file.Seek(offset, os.SEEK_SET); err != nil { + return nil, err + } + + return file, nil + } + + s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML)) if err != nil { - return nil, err + t.Fatal(err) } - fi, err := file.Stat() - if err != nil { - return nil, err + if err = s3Select.Open(getReader); err != nil { + t.Fatal(err) } - if offset < 0 { - offset = fi.Size() + offset + w := &testResponseWriter{} + s3Select.Evaluate(w) + s3Select.Close() + + if !reflect.DeepEqual(w.response, testCase.expectedResult) { + t.Errorf("received response does not match with expected reply\ngot: %#v\nwant:%#v", w.response, testCase.expectedResult) } - - if _, err = file.Seek(offset, os.SEEK_SET); err != nil { - return nil, err - } - - return file, nil - } - - s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML)) - if err != nil { - t.Fatal(err) - } - - if err = s3Select.Open(getReader); err != nil { - t.Fatal(err) - } - - w := &testResponseWriter{} - s3Select.Evaluate(w) - s3Select.Close() - - if !reflect.DeepEqual(w.response, testCase.expectedResult) { - t.Fatalf("received response does not match with expected reply") - } + }) } }