From 4c7c571875e1c065dd1920dc7207cdf452dfddfd Mon Sep 17 00:00:00 2001
From: Harshavardhana
Date: Fri, 7 Dec 2018 14:55:32 -0800
Subject: [PATCH] Support JSON to CSV and CSV to JSON output format conversion (#6910)

This PR implements one of the pending items from issue #6286: through
the S3 Select API a user can now request CSV output for a JSON document,
and JSON output for a CSV document. The code is refactored slightly to
support this feature.
---
 cmd/object-handlers.go           | 12 +++---
 pkg/s3select/format/csv/csv.go   | 34 ++++++++++++---
 pkg/s3select/format/json/json.go | 48 ++++++++++++++++-----
 pkg/s3select/format/select.go    |  2 +
 pkg/s3select/input.go            | 74 ++++++++++++++++++++------------
 pkg/s3select/select.go           | 36 ++++++++++++----
 6 files changed, 150 insertions(+), 56 deletions(-)

diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go
index bbf60c782..afa57c4cc 100644
--- a/cmd/object-handlers.go
+++ b/cmd/object-handlers.go
@@ -218,11 +218,13 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r
 		writeErrorResponse(w, ErrInvalidFileHeaderInfo, r.URL, guessIsBrowserReq(r))
 		return
 	}
-	if selectReq.OutputSerialization.CSV.QuoteFields != s3select.CSVQuoteFieldsAlways &&
-		selectReq.OutputSerialization.CSV.QuoteFields != s3select.CSVQuoteFieldsAsNeeded &&
-		selectReq.OutputSerialization.CSV.QuoteFields != "" {
-		writeErrorResponse(w, ErrInvalidQuoteFields, r.URL, guessIsBrowserReq(r))
-		return
+	if selectReq.OutputSerialization.CSV != nil {
+		if selectReq.OutputSerialization.CSV.QuoteFields != s3select.CSVQuoteFieldsAlways &&
+			selectReq.OutputSerialization.CSV.QuoteFields != s3select.CSVQuoteFieldsAsNeeded &&
+			selectReq.OutputSerialization.CSV.QuoteFields != "" {
+			writeErrorResponse(w, ErrInvalidQuoteFields, r.URL, guessIsBrowserReq(r))
+			return
+		}
 	}
 	if len(selectReq.InputSerialization.CSV.RecordDelimiter) > 2 {
 		writeErrorResponse(w, ErrInvalidRequestParameter, r.URL, guessIsBrowserReq(r))
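The nil guard above exists because OutputSerialization.CSV is now optional: with JSON output supported, a request may omit the CSV block entirely, and the old unconditional dereference would panic. A minimal standalone sketch of the guarded check, using trimmed-down stand-in types rather than the actual server structs (the string literals stand in for the s3select constants):

```go
package main

import "fmt"

// csvOutput and outputSerialization are stand-ins for the real request
// structs; only the shape relevant to the guard is kept.
type csvOutput struct{ QuoteFields string }

type outputSerialization struct {
	CSV *csvOutput // nil when the request asks for JSON output instead
}

// validQuoteFields mirrors the guarded check in SelectObjectContentHandler;
// "ALWAYS"/"ASNEEDED" stand in for CSVQuoteFieldsAlways/AsNeeded.
func validQuoteFields(out outputSerialization) bool {
	if out.CSV != nil { // guard before touching CSV-only fields
		if out.CSV.QuoteFields != "ALWAYS" &&
			out.CSV.QuoteFields != "ASNEEDED" &&
			out.CSV.QuoteFields != "" {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(validQuoteFields(outputSerialization{}))                             // true: no CSV block present
	fmt.Println(validQuoteFields(outputSerialization{CSV: &csvOutput{"SOMETIMES"}})) // false: invalid QuoteFields
}
```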
diff --git a/pkg/s3select/format/csv/csv.go b/pkg/s3select/format/csv/csv.go
index e49eb6ef2..b08687e7e 100644
--- a/pkg/s3select/format/csv/csv.go
+++ b/pkg/s3select/format/csv/csv.go
@@ -57,9 +57,12 @@ type Options struct {
 	// SQL expression meant to be evaluated.
 	Expression string
 
-	// What the outputted CSV will be delimited by .
+	// Field delimiter for the output CSV.
 	OutputFieldDelimiter string
 
+	// Record delimiter for the output CSV.
+	OutputRecordDelimiter string
+
 	// Size of incoming object
 	StreamSize int64
 
@@ -68,6 +71,9 @@ type Options struct {
 
 	// Progress enabled, enable/disable progress messages.
 	Progress bool
+
+	// Output format type, supported values are CSV and JSON
+	OutputType format.Type
 }
 
 // cinput represents a record producing input from a formatted object.
@@ -147,6 +153,9 @@ func (reader *cinput) readHeader() error {
 		reader.firstRow = nil
 	} else {
 		reader.firstRow, readErr = reader.reader.Read()
+		if readErr != nil {
+			return format.ErrCSVParsingError
+		}
 		reader.header = make([]string, len(reader.firstRow))
 		for i := range reader.firstRow {
 			reader.header[i] = "_" + strconv.Itoa(i)
@@ -173,8 +182,13 @@ func (reader *cinput) Read() ([]byte, error) {
 	if dec != nil {
 		var data []byte
 		var err error
-		for i, value := range dec {
-			data, err = sjson.SetBytes(data, reader.header[i], value)
+		// Navigate column values in reverse order to preserve
+		// the input order for AWS S3 compatibility, because
+		// sjson adds JSON key/value pairs in first-in-last-out
+		// fashion. This should ideally be fixed in sjson; the
+		// following workaround circumvents the issue for now.
+		for i := len(dec) - 1; i >= 0; i-- {
+			data, err = sjson.SetBytes(data, reader.header[i], dec[i])
 			if err != nil {
 				return nil, err
 			}
@@ -184,11 +198,16 @@ func (reader *cinput) Read() ([]byte, error) {
 	return nil, nil
 }
 
-// OutputFieldDelimiter - returns the delimiter specified in input request
+// OutputFieldDelimiter - returns the requested output field delimiter.
 func (reader *cinput) OutputFieldDelimiter() string {
 	return reader.options.OutputFieldDelimiter
 }
 
+// OutputRecordDelimiter - returns the requested output record delimiter.
+func (reader *cinput) OutputRecordDelimiter() string {
+	return reader.options.OutputRecordDelimiter
+}
+
 // HasHeader - returns true or false depending upon the header.
 func (reader *cinput) HasHeader() bool {
 	return reader.options.HasHeader
@@ -285,11 +304,16 @@ func (reader *cinput) CreateProgressXML() (string, error) {
 	return xml.Header + string(out), nil
 }
 
-// Type - return the data format type {
+// Type - return the data format type
 func (reader *cinput) Type() format.Type {
 	return format.CSV
 }
 
+// OutputType - return the data format type
+func (reader *cinput) OutputType() format.Type {
+	return reader.options.OutputType
+}
+
 // ColNameErrs is a function which makes sure that the headers are requested are
 // present in the file otherwise it throws an error.
 func (reader *cinput) ColNameErrs(columnNames []string) error {
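To make the reverse-iteration workaround in cinput.Read concrete, here is a small self-contained sketch (the header and row values are invented; sjson.SetBytes is the same call the hunk uses) showing one CSV record folded into a JSON document:

```go
package main

import (
	"fmt"

	"github.com/tidwall/sjson"
)

func main() {
	header := []string{"name", "age", "city"} // column names from the CSV header
	row := []string{"alice", "30", "NYC"}     // one decoded CSV record

	var data []byte
	var err error
	// Walk the columns in reverse so the finished JSON document lists
	// keys in the original column order, mirroring cinput.Read above.
	for i := len(row) - 1; i >= 0; i-- {
		data, err = sjson.SetBytes(data, header[i], row[i])
		if err != nil {
			panic(err)
		}
	}
	// Per the first-in-last-out behavior noted in the patch comment,
	// this prints: {"name":"alice","age":"30","city":"NYC"}
	fmt.Println(string(data))
}
```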
diff --git a/pkg/s3select/format/json/json.go b/pkg/s3select/format/json/json.go
index e2e29e0ec..f9958a87d 100644
--- a/pkg/s3select/format/json/json.go
+++ b/pkg/s3select/format/json/json.go
@@ -22,6 +22,7 @@ import (
 	"io"
 
 	"github.com/minio/minio/pkg/s3select/format"
+	"github.com/tidwall/gjson"
 )
 
 // Options options are passed to the underlying encoding/json reader.
@@ -40,24 +41,32 @@ type Options struct {
 	// SQL expression meant to be evaluated.
 	Expression string
 
-	// What the outputted will be delimited by .
+	// Input record delimiter.
 	RecordDelimiter string
 
+	// Field delimiter for the output CSV.
+	OutputFieldDelimiter string
+
+	// Output record delimiter.
+	OutputRecordDelimiter string
+
 	// Size of incoming object
 	StreamSize int64
 
-	// True if Type is DOCUMENTS
-	Type bool
+	// True if DocumentType is DOCUMENTS
+	DocumentType bool
 
 	// Progress enabled, enable/disable progress messages.
 	Progress bool
+
+	// Output format type, supported values are CSV and JSON
+	OutputType format.Type
 }
 
 // jinput represents a record producing input from a formatted file or pipe.
 type jinput struct {
 	options         *Options
 	reader          *bufio.Reader
-	firstRow        []string
 	header          []string
 	minOutputLength int
 	stats           struct {
@@ -79,7 +88,6 @@ func New(opts *Options) (format.Select, error) {
 	reader.stats.BytesScanned = opts.StreamSize
 	reader.stats.BytesProcessed = 0
 	reader.stats.BytesReturned = 0
-
 	return reader, nil
 }
 
@@ -95,7 +103,7 @@ func (reader *jinput) UpdateBytesProcessed(size int64) {
 
 // Read the file and returns
 func (reader *jinput) Read() ([]byte, error) {
-	data, err := reader.reader.ReadBytes('\n')
+	data, _, err := reader.reader.ReadLine()
 	if err != nil {
 		if err == io.EOF || err == io.ErrClosedPipe {
 			err = nil
@@ -103,17 +111,32 @@ func (reader *jinput) Read() ([]byte, error) {
 			err = format.ErrJSONParsingError
 		}
 	}
+	if err == nil {
+		var header []string
+		gjson.ParseBytes(data).ForEach(func(key, value gjson.Result) bool {
+			header = append(header, key.String())
+			return true
+		})
+		reader.header = header
+	}
 	return data, err
 }
 
-// OutputFieldDelimiter - returns the delimiter specified in input request
+// OutputFieldDelimiter - returns the field delimiter from the request;
+// empty for JSON output, set when the output type is CSV.
 func (reader *jinput) OutputFieldDelimiter() string {
-	return ","
+	return reader.options.OutputFieldDelimiter
+}
+
+// OutputRecordDelimiter - returns the delimiter emitted after each output record.
+func (reader *jinput) OutputRecordDelimiter() string {
+	return reader.options.OutputRecordDelimiter
 }
 
 // HasHeader - returns true or false depending upon the header.
 func (reader *jinput) HasHeader() bool {
-	return false
+	return true
 }
 
 // Expression - return the Select Expression for
@@ -128,7 +151,7 @@ func (reader *jinput) UpdateBytesReturned(size int64) {
 
 // Header returns a nil in case of
 func (reader *jinput) Header() []string {
-	return nil
+	return reader.header
 }
 
 // CreateStatXML is the function which does the marshaling from the stat
@@ -171,6 +194,11 @@ func (reader *jinput) Type() format.Type {
 	return format.JSON
 }
 
+// OutputType - return the data format type
+func (reader *jinput) OutputType() format.Type {
+	return reader.options.OutputType
+}
+
 // ColNameErrs - this is a dummy function for JSON input type.
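The per-record key extraction added to jinput.Read is what makes JSON-to-CSV output possible: a JSON stream carries no fixed schema, so the header is rebuilt for every record. A self-contained sketch of that step (the sample record is invented; the gjson calls match the hunk above):

```go
package main

import (
	"fmt"

	"github.com/tidwall/gjson"
)

func main() {
	record := []byte(`{"name":"alice","age":30,"city":"NYC"}`)

	// Collect the top-level keys of one record, as jinput.Read does;
	// these become the Header() used to index CSV output columns.
	var header []string
	gjson.ParseBytes(record).ForEach(func(key, value gjson.Result) bool {
		header = append(header, key.String())
		return true // keep iterating over all keys
	})
	fmt.Println(header) // [name age city]
}
```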
 func (reader *jinput) ColNameErrs(columnNames []string) error {
 	return nil
diff --git a/pkg/s3select/format/select.go b/pkg/s3select/format/select.go
index e1c4d971a..a2a0ca47b 100644
--- a/pkg/s3select/format/select.go
+++ b/pkg/s3select/format/select.go
@@ -22,10 +22,12 @@ import "encoding/xml"
 // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectSELECTContent.html
 type Select interface {
 	Type() Type
+	OutputType() Type
 	Read() ([]byte, error)
 	Header() []string
 	HasHeader() bool
 	OutputFieldDelimiter() string
+	OutputRecordDelimiter() string
 	UpdateBytesProcessed(int64)
 	Expression() string
 	UpdateBytesReturned(int64)
diff --git a/pkg/s3select/input.go b/pkg/s3select/input.go
index 09d607e72..1cfb9f1a2 100644
--- a/pkg/s3select/input.go
+++ b/pkg/s3select/input.go
@@ -65,40 +65,60 @@ func New(reader io.Reader, size int64, req ObjectSelectRequest) (s3s format.Sele
 	// Initializating options for CSV
 	if req.InputSerialization.CSV != nil {
-		if req.OutputSerialization.CSV.FieldDelimiter == "" {
-			req.OutputSerialization.CSV.FieldDelimiter = ","
-		}
 		if req.InputSerialization.CSV.FileHeaderInfo == "" {
 			req.InputSerialization.CSV.FileHeaderInfo = CSVFileHeaderInfoNone
 		}
 		if req.InputSerialization.CSV.RecordDelimiter == "" {
 			req.InputSerialization.CSV.RecordDelimiter = "\n"
 		}
-		s3s, err = csv.New(&csv.Options{
-			HasHeader:            req.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse,
-			RecordDelimiter:      req.InputSerialization.CSV.RecordDelimiter,
-			FieldDelimiter:       req.InputSerialization.CSV.FieldDelimiter,
-			Comments:             req.InputSerialization.CSV.Comments,
-			Name:                 "S3Object", // Default table name for all objects
-			ReadFrom:             reader,
-			Compressed:           string(req.InputSerialization.CompressionType),
-			Expression:           cleanExpr(req.Expression),
-			OutputFieldDelimiter: req.OutputSerialization.CSV.FieldDelimiter,
-			StreamSize:           size,
-			HeaderOpt:            req.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse,
-			Progress:             req.RequestProgress.Enabled,
-		})
+		options := &csv.Options{
+			Name:            "S3Object", // Default table name for all objects
+			HasHeader:       req.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse,
+			RecordDelimiter: req.InputSerialization.CSV.RecordDelimiter,
+			FieldDelimiter:  req.InputSerialization.CSV.FieldDelimiter,
+			Comments:        req.InputSerialization.CSV.Comments,
+			ReadFrom:        reader,
+			Compressed:      string(req.InputSerialization.CompressionType),
+			Expression:      cleanExpr(req.Expression),
+			StreamSize:      size,
+			HeaderOpt:       req.InputSerialization.CSV.FileHeaderInfo == CSVFileHeaderInfoUse,
+			Progress:        req.RequestProgress.Enabled,
+		}
+		if req.OutputSerialization.CSV != nil {
+			if req.OutputSerialization.CSV.FieldDelimiter == "" {
+				req.OutputSerialization.CSV.FieldDelimiter = ","
+			}
+			options.OutputFieldDelimiter = req.OutputSerialization.CSV.FieldDelimiter
+			options.OutputRecordDelimiter = req.OutputSerialization.CSV.RecordDelimiter
+			options.OutputType = format.CSV
+		}
+		if req.OutputSerialization.JSON != nil {
+			options.OutputRecordDelimiter = req.OutputSerialization.JSON.RecordDelimiter
+			options.OutputType = format.JSON
+		}
+		// Initialize CSV input type
+		s3s, err = csv.New(options)
 	} else if req.InputSerialization.JSON != nil {
-		// Initializating options for JSON
-		s3s, err = json.New(&json.Options{
-			Name:       "S3Object", // Default table name for all objects
-			ReadFrom:   reader,
-			Compressed: string(req.InputSerialization.CompressionType),
-			Expression: cleanExpr(req.Expression),
-			StreamSize: size,
-			Type:       req.InputSerialization.JSON.Type == JSONTypeDocument,
-			Progress:   req.RequestProgress.Enabled,
-		})
+		options := &json.Options{
+			Name:         "S3Object", // Default table name for all objects
+			ReadFrom:     reader,
+			Compressed:   string(req.InputSerialization.CompressionType),
+			Expression:   cleanExpr(req.Expression),
+			StreamSize:   size,
+			DocumentType: req.InputSerialization.JSON.Type == JSONTypeDocument,
+			Progress:     req.RequestProgress.Enabled,
+		}
+		if req.OutputSerialization.JSON != nil {
+			options.OutputRecordDelimiter = req.OutputSerialization.JSON.RecordDelimiter
+			options.OutputType = format.JSON
+		}
+		if req.OutputSerialization.CSV != nil {
+			options.OutputFieldDelimiter = req.OutputSerialization.CSV.FieldDelimiter
+			options.OutputRecordDelimiter = req.OutputSerialization.CSV.RecordDelimiter
+			options.OutputType = format.CSV
+		}
+		// Initialize JSON input type
+		s3s, err = json.New(options)
 	}
 	return s3s, err
 }
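With the input.go wiring above, a caller can now ask for CSV input with JSON output purely through csv.Options. The sketch below uses only option fields visible in this patch plus the cinput.Read convention of a nil record at EOF; it is illustrative rather than a verified test of the package:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/minio/minio/pkg/s3select/format"
	"github.com/minio/minio/pkg/s3select/format/csv"
)

func main() {
	input := "name,age\nalice,30\nbob,25\n"
	s, err := csv.New(&csv.Options{
		Name:                  "S3Object", // default table name
		HasHeader:             true,
		HeaderOpt:             true,
		RecordDelimiter:       "\n",
		FieldDelimiter:        ",",
		ReadFrom:              strings.NewReader(input),
		Expression:            "SELECT * FROM S3Object",
		StreamSize:            int64(len(input)),
		OutputRecordDelimiter: "\n",
		OutputType:            format.JSON, // CSV in, JSON out
	})
	if err != nil {
		panic(err)
	}
	// cinput.Read emits each CSV row re-encoded as a JSON record and
	// returns a nil record once the input is exhausted.
	for {
		rec, err := s.Read()
		if err != nil || rec == nil {
			break
		}
		fmt.Print(string(rec), s.OutputRecordDelimiter())
	}
}
```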
diff --git a/pkg/s3select/select.go b/pkg/s3select/select.go
index 4046f4432..c371db3db 100644
--- a/pkg/s3select/select.go
+++ b/pkg/s3select/select.go
@@ -201,14 +201,19 @@ func processSelectReq(reqColNames []string, alias string, wc sqlparser.Expr, lre
 		lrecords = math.MaxInt64
 	}
 
-	columnsKv, err := columnsIndex(reqColNames, f)
-	if err != nil {
-		rowCh <- Row{
-			err: err,
+	var results []string
+	var columnsKv []columnKv
+	if f.Type() == format.CSV {
+		var err error
+		columnsKv, err = columnsIndex(reqColNames, f)
+		if err != nil {
+			rowCh <- Row{
+				err: err,
+			}
+			return
 		}
-		return
+		results = make([]string, len(columnsKv))
 	}
-	var results = make([]string, len(columnsKv))
 
 	for {
 		record, err := f.Read()
@@ -228,6 +233,19 @@ func processSelectReq(reqColNames []string, alias string, wc sqlparser.Expr, lre
 			return
 		}
 
+		// For JSON multi-line input type, columns need
+		// to be handled for each record.
+		if f.Type() == format.JSON {
+			columnsKv, err = columnsIndex(reqColNames, f)
+			if err != nil {
+				rowCh <- Row{
+					err: err,
+				}
+				return
+			}
+			results = make([]string, len(columnsKv))
+		}
+
 		f.UpdateBytesProcessed(int64(len(record)))
 
 		// Return in case the number of record reaches the LIMIT
@@ -250,17 +268,17 @@ func processSelectReq(reqColNames []string, alias string, wc sqlparser.Expr, lre
 		if condition {
 			// if its an asterix we just print everything in the row
 			if reqColNames[0] == "*" && fnNames[0] == "" {
-				switch f.Type() {
+				switch f.OutputType() {
 				case format.CSV:
 					for i, kv := range columnsKv {
 						results[i] = gjson.GetBytes(record, kv.Key).String()
 					}
 					rowCh <- Row{
-						record: strings.Join(results, f.OutputFieldDelimiter()) + "\n",
+						record: strings.Join(results, f.OutputFieldDelimiter()) + f.OutputRecordDelimiter(),
 					}
 				case format.JSON:
 					rowCh <- Row{
-						record: string(record) + "\n",
+						record: string(record) + f.OutputRecordDelimiter(),
 					}
 				}
 			} else if alias != "" {
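The switch on f.OutputType() above is where the conversion finally lands: records always arrive JSON-encoded, and for CSV output each requested column is projected out of the record and joined with the output delimiters. A standalone sketch of that projection step (the record and keys are invented; gjson.GetBytes and strings.Join are the calls the hunk uses):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/tidwall/gjson"
)

func main() {
	// Every record reaching this stage is JSON-encoded, whatever the
	// input format was; keys stands in for the columnsKv key strings.
	record := []byte(`{"name":"alice","age":30,"city":"NYC"}`)
	keys := []string{"name", "age", "city"}

	results := make([]string, len(keys))
	for i, k := range keys {
		results[i] = gjson.GetBytes(record, k).String()
	}
	// Join with the output field delimiter and terminate with the
	// output record delimiter, as the format.CSV case does.
	fmt.Print(strings.Join(results, ",") + "\n") // alice,30,NYC
}
```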