Fix: Support Unicode delimiters in s3 select (#7931)

This commit is contained in:
Kanagaraj M 2019-07-17 23:40:18 +05:30 committed by kannappanr
parent a57c747667
commit 12353caf35
2 changed files with 187 additions and 4 deletions

View File

@ -292,6 +292,188 @@ func testListObjects(s3Client *s3.S3) {
successLogger(function, args, startTime).Info() successLogger(function, args, startTime).Info()
} }
func testSelectObject(s3Client *s3.S3) {
startTime := time.Now()
function := "testSelectObject"
bucket := randString(60, rand.NewSource(time.Now().UnixNano()), "aws-sdk-go-test-")
object1 := "object1.csv"
object2 := "object2.csv"
args := map[string]interface{}{
"bucketName": bucket,
"objectName1": object1,
"objectName2": object2,
}
_, err := s3Client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucket),
})
if err != nil {
failureLog(function, args, startTime, "", "AWS SDK Go CreateBucket Failed", err).Fatal()
return
}
// Test comma field seperator
inputCsv1 := `year,gender,ethnicity,firstname,count,rank
2011,FEMALE,ASIAN AND PACIFIC ISLANDER,SOPHIA,119,1
2011,FEMALE,ASIAN AND PACIFIC ISLANDER,CHLOE,106,2
2011,FEMALE,ASIAN AND PACIFIC ISLANDER,EMILY,93,3
2011,FEMALE,ASIAN AND PACIFIC ISLANDER,OLIVIA,89,4
2011,FEMALE,ASIAN AND PACIFIC ISLANDER,EMMA,75,5
2011,FEMALE,ASIAN AND PACIFIC ISLANDER,ISABELLA,67,6
2011,FEMALE,ASIAN AND PACIFIC ISLANDER,TIFFANY,54,7
2011,FEMALE,ASIAN AND PACIFIC ISLANDER,ASHLEY,52,8
2011,FEMALE,ASIAN AND PACIFIC ISLANDER,FIONA,48,9
2011,FEMALE,ASIAN AND PACIFIC ISLANDER,ANGELA,47,10
`
outputCSV1 := `2011
2011
2011
2011
2011
2011
2011
2011
2011
2011
`
putInput1 := &s3.PutObjectInput{
Body: aws.ReadSeekCloser(strings.NewReader(inputCsv1)),
Bucket: aws.String(bucket),
Key: aws.String(object1),
}
_, err = s3Client.PutObject(putInput1)
defer cleanup(s3Client, bucket, object1, function, args, startTime, true)
params := &s3.SelectObjectContentInput{
Bucket: &bucket,
Key: &object1,
ExpressionType: aws.String(s3.ExpressionTypeSql),
Expression: aws.String("SELECT s._1 FROM S3Object s"),
RequestProgress: &s3.RequestProgress{},
InputSerialization: &s3.InputSerialization{
CompressionType: aws.String("NONE"),
CSV: &s3.CSVInput{
FileHeaderInfo: aws.String(s3.FileHeaderInfoIgnore),
FieldDelimiter: aws.String(","),
RecordDelimiter: aws.String("\n"),
},
},
OutputSerialization: &s3.OutputSerialization{
CSV: &s3.CSVOutput{},
},
}
resp, err := s3Client.SelectObjectContent(params)
if err != nil {
failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go Select object failed %v", err), err).Fatal()
return
}
defer resp.EventStream.Close()
payload := ""
for event := range resp.EventStream.Events() {
switch v := event.(type) {
case *s3.RecordsEvent:
// s3.RecordsEvent.Records is a byte slice of select records
payload = string(v.Payload)
}
}
if err := resp.EventStream.Err(); err != nil {
failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go Select object failed %v", err), err).Fatal()
return
}
if payload != outputCSV1 {
failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go Select object output mismatch %v", payload), errors.New("AWS S3 select object mismatch")).Fatal()
return
}
// Test unicode field seperator
inputCsv2 := `"year""gender""ethnicity""firstname""count""rank"
"2011""FEMALE""ASIAN AND PACIFIC ISLANDER""SOPHIA""119""1"
"2011""FEMALE""ASIAN AND PACIFIC ISLANDER""CHLOE""106""2"
"2011""FEMALE""ASIAN AND PACIFIC ISLANDER""EMILY""93""3"
"2011""FEMALE""ASIAN AND PACIFIC ISLANDER""OLIVIA""89""4"
"2011""FEMALE""ASIAN AND PACIFIC ISLANDER""EMMA""75""5"
"2011""FEMALE""ASIAN AND PACIFIC ISLANDER""ISABELLA""67""6"
"2011""FEMALE""ASIAN AND PACIFIC ISLANDER""TIFFANY""54""7"
"2011""FEMALE""ASIAN AND PACIFIC ISLANDER""ASHLEY""52""8"
"2011""FEMALE""ASIAN AND PACIFIC ISLANDER""FIONA""48""9"
"2011""FEMALE""ASIAN AND PACIFIC ISLANDER""ANGELA""47""10"
`
outputCSV2 := `2011
2011
2011
2011
2011
2011
2011
2011
2011
2011
`
putInput2 := &s3.PutObjectInput{
Body: aws.ReadSeekCloser(strings.NewReader(inputCsv2)),
Bucket: aws.String(bucket),
Key: aws.String(object2),
}
_, err = s3Client.PutObject(putInput2)
defer cleanup(s3Client, bucket, object2, function, args, startTime, false)
params2 := &s3.SelectObjectContentInput{
Bucket: &bucket,
Key: &object2,
ExpressionType: aws.String(s3.ExpressionTypeSql),
Expression: aws.String("SELECT s._1 FROM S3Object s"),
RequestProgress: &s3.RequestProgress{},
InputSerialization: &s3.InputSerialization{
CompressionType: aws.String("NONE"),
CSV: &s3.CSVInput{
FileHeaderInfo: aws.String(s3.FileHeaderInfoIgnore),
FieldDelimiter: aws.String("╦"),
RecordDelimiter: aws.String("\n"),
},
},
OutputSerialization: &s3.OutputSerialization{
CSV: &s3.CSVOutput{},
},
}
resp, err = s3Client.SelectObjectContent(params2)
if err != nil {
failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go Select object failed for unicode separator %v", err), err).Fatal()
return
}
defer resp.EventStream.Close()
for event := range resp.EventStream.Events() {
switch v := event.(type) {
case *s3.RecordsEvent:
// s3.RecordsEvent.Records is a byte slice of select records
payload = string(v.Payload)
}
}
if err := resp.EventStream.Err(); err != nil {
failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go Select object failed for unicode separator %v", err), err).Fatal()
return
}
if payload != outputCSV2 {
failureLog(function, args, startTime, "", fmt.Sprintf("AWS SDK Go Select object output mismatch %v", payload), errors.New("AWS S3 select object mismatch")).Fatal()
return
}
successLogger(function, args, startTime).Info()
}
func main() { func main() {
endpoint := os.Getenv("SERVER_ENDPOINT") endpoint := os.Getenv("SERVER_ENDPOINT")
accessKey := os.Getenv("ACCESS_KEY") accessKey := os.Getenv("ACCESS_KEY")
@ -325,4 +507,5 @@ func main() {
// execute tests // execute tests
testPresignedPutInvalidHash(s3Client) testPresignedPutInvalidHash(s3Client)
testListObjects(s3Client) testListObjects(s3Client)
testSelectObject(s3Client)
} }

View File

@ -72,7 +72,7 @@ func (args *ReaderArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) err
return errInvalidFileHeaderInfo(fmt.Errorf("invalid FileHeaderInfo '%v'", parsedArgs.FileHeaderInfo)) return errInvalidFileHeaderInfo(fmt.Errorf("invalid FileHeaderInfo '%v'", parsedArgs.FileHeaderInfo))
} }
switch len(parsedArgs.RecordDelimiter) { switch len([]rune(parsedArgs.RecordDelimiter)) {
case 0: case 0:
parsedArgs.RecordDelimiter = defaultRecordDelimiter parsedArgs.RecordDelimiter = defaultRecordDelimiter
case 1, 2: case 1, 2:
@ -80,7 +80,7 @@ func (args *ReaderArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) err
return fmt.Errorf("invalid RecordDelimiter '%v'", parsedArgs.RecordDelimiter) return fmt.Errorf("invalid RecordDelimiter '%v'", parsedArgs.RecordDelimiter)
} }
switch len(parsedArgs.FieldDelimiter) { switch len([]rune(parsedArgs.FieldDelimiter)) {
case 0: case 0:
parsedArgs.FieldDelimiter = defaultFieldDelimiter parsedArgs.FieldDelimiter = defaultFieldDelimiter
case 1: case 1:
@ -154,7 +154,7 @@ func (args *WriterArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) err
return errInvalidQuoteFields(fmt.Errorf("invalid QuoteFields '%v'", parsedArgs.QuoteFields)) return errInvalidQuoteFields(fmt.Errorf("invalid QuoteFields '%v'", parsedArgs.QuoteFields))
} }
switch len(parsedArgs.RecordDelimiter) { switch len([]rune(parsedArgs.RecordDelimiter)) {
case 0: case 0:
parsedArgs.RecordDelimiter = defaultRecordDelimiter parsedArgs.RecordDelimiter = defaultRecordDelimiter
case 1, 2: case 1, 2:
@ -162,7 +162,7 @@ func (args *WriterArgs) UnmarshalXML(d *xml.Decoder, start xml.StartElement) err
return fmt.Errorf("invalid RecordDelimiter '%v'", parsedArgs.RecordDelimiter) return fmt.Errorf("invalid RecordDelimiter '%v'", parsedArgs.RecordDelimiter)
} }
switch len(parsedArgs.FieldDelimiter) { switch len([]rune(parsedArgs.FieldDelimiter)) {
case 0: case 0:
parsedArgs.FieldDelimiter = defaultFieldDelimiter parsedArgs.FieldDelimiter = defaultFieldDelimiter
case 1: case 1: