api: Change ListenBucketNotification with new API format. (#2791)

Take prefix, suffix and events as query params.
This commit is contained in:
Harshavardhana 2016-09-27 12:50:32 -07:00
parent 9417614a8e
commit ca3022d545
4 changed files with 272 additions and 19 deletions

View File

@ -38,6 +38,7 @@ func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string,
// Parse bucket url queries for ListObjects V2. // Parse bucket url queries for ListObjects V2.
func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimiter string, fetchOwner bool, maxkeys int, encodingType string) { func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimiter string, fetchOwner bool, maxkeys int, encodingType string) {
prefix = values.Get("prefix") prefix = values.Get("prefix")
token = values.Get("continuation-token")
startAfter = values.Get("start-after") startAfter = values.Get("start-after")
delimiter = values.Get("delimiter") delimiter = values.Get("delimiter")
if values.Get("max-keys") != "" { if values.Get("max-keys") != "" {
@ -45,11 +46,8 @@ func getListObjectsV2Args(values url.Values) (prefix, token, startAfter, delimit
} else { } else {
maxkeys = maxObjectList maxkeys = maxObjectList
} }
fetchOwner = values.Get("fetch-owner") == "true"
encodingType = values.Get("encoding-type") encodingType = values.Get("encoding-type")
token = values.Get("continuation-token")
if values.Get("fetch-owner") == "true" {
fetchOwner = true
}
return return
} }
@ -80,3 +78,11 @@ func getObjectResources(values url.Values) (uploadID string, partNumberMarker, m
encodingType = values.Get("encoding-type") encodingType = values.Get("encoding-type")
return return
} }
// Parse listen bucket notification resources.
func getListenBucketNotificationResources(values url.Values) (prefix string, suffix string, events []string) {
prefix = values.Get("prefix")
suffix = values.Get("suffix")
events = values["events"]
return prefix, suffix, events
}

190
cmd/api-resources_test.go Normal file
View File

@ -0,0 +1,190 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"net/url"
"testing"
)
// Test list objects resources V2.
func TestListObjectsV2Resources(t *testing.T) {
testCases := []struct {
values url.Values
prefix, token, startAfter, delimiter string
fetchOwner bool
maxKeys int
encodingType string
}{
{
values: url.Values{
"prefix": []string{"photos/"},
"continuation-token": []string{"token"},
"start-after": []string{"start-after"},
"delimiter": []string{"/"},
"fetch-owner": []string{"true"},
"max-keys": []string{"100"},
"encoding-type": []string{"gzip"},
},
prefix: "photos/",
token: "token",
startAfter: "start-after",
delimiter: "/",
fetchOwner: true,
maxKeys: 100,
encodingType: "gzip",
},
{
values: url.Values{
"prefix": []string{"photos/"},
"continuation-token": []string{"token"},
"start-after": []string{"start-after"},
"delimiter": []string{"/"},
"fetch-owner": []string{"true"},
"encoding-type": []string{"gzip"},
},
prefix: "photos/",
token: "token",
startAfter: "start-after",
delimiter: "/",
fetchOwner: true,
maxKeys: 1000,
encodingType: "gzip",
},
}
for i, testCase := range testCases {
prefix, token, startAfter, delimiter, fetchOwner, maxKeys, encodingType := getListObjectsV2Args(testCase.values)
if prefix != testCase.prefix {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.prefix, prefix)
}
if token != testCase.token {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.token, token)
}
if startAfter != testCase.startAfter {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.startAfter, startAfter)
}
if delimiter != testCase.delimiter {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.delimiter, delimiter)
}
if fetchOwner != testCase.fetchOwner {
t.Errorf("Test %d: Expected %t, got %t", i+1, testCase.fetchOwner, fetchOwner)
}
if maxKeys != testCase.maxKeys {
t.Errorf("Test %d: Expected %d, got %d", i+1, testCase.maxKeys, maxKeys)
}
if encodingType != testCase.encodingType {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.encodingType, encodingType)
}
}
}
// Test list objects resources V1.
func TestListObjectsV1Resources(t *testing.T) {
testCases := []struct {
values url.Values
prefix, marker, delimiter string
maxKeys int
encodingType string
}{
{
values: url.Values{
"prefix": []string{"photos/"},
"marker": []string{"test"},
"delimiter": []string{"/"},
"max-keys": []string{"100"},
"encoding-type": []string{"gzip"},
},
prefix: "photos/",
marker: "test",
delimiter: "/",
maxKeys: 100,
encodingType: "gzip",
},
{
values: url.Values{
"prefix": []string{"photos/"},
"marker": []string{"test"},
"delimiter": []string{"/"},
"encoding-type": []string{"gzip"},
},
prefix: "photos/",
marker: "test",
delimiter: "/",
maxKeys: 1000,
encodingType: "gzip",
},
}
for i, testCase := range testCases {
prefix, marker, delimiter, maxKeys, encodingType := getListObjectsV1Args(testCase.values)
if prefix != testCase.prefix {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.prefix, prefix)
}
if marker != testCase.marker {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.marker, marker)
}
if delimiter != testCase.delimiter {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.delimiter, delimiter)
}
if maxKeys != testCase.maxKeys {
t.Errorf("Test %d: Expected %d, got %d", i+1, testCase.maxKeys, maxKeys)
}
if encodingType != testCase.encodingType {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.encodingType, encodingType)
}
}
}
// Validates extracting information for object resources.
func TestGetObjectsResources(t *testing.T) {
testCases := []struct {
values url.Values
uploadID string
partNumberMarker, maxParts int
encodingType string
}{
{
values: url.Values{
"uploadId": []string{"11123-11312312311231-12313"},
"part-number-marker": []string{"1"},
"max-parts": []string{"1000"},
"encoding-type": []string{"gzip"},
},
uploadID: "11123-11312312311231-12313",
partNumberMarker: 1,
maxParts: 1000,
encodingType: "gzip",
},
}
for i, testCase := range testCases {
uploadID, partNumberMarker, maxParts, encodingType := getObjectResources(testCase.values)
if uploadID != testCase.uploadID {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.uploadID, uploadID)
}
if partNumberMarker != testCase.partNumberMarker {
t.Errorf("Test %d: Expected %d, got %d", i+1, testCase.partNumberMarker, partNumberMarker)
}
if maxParts != testCase.maxParts {
t.Errorf("Test %d: Expected %d, got %d", i+1, testCase.maxParts, maxParts)
}
if encodingType != testCase.encodingType {
t.Errorf("Test %d: Expected %s, got %s", i+1, testCase.encodingType, encodingType)
}
}
}

View File

@ -63,7 +63,7 @@ func registerAPIRouter(mux *router.Router, api objectAPIHandlers) {
// GetBucketNotification // GetBucketNotification
bucket.Methods("GET").HandlerFunc(api.GetBucketNotificationHandler).Queries("notification", "") bucket.Methods("GET").HandlerFunc(api.GetBucketNotificationHandler).Queries("notification", "")
// ListenBucketNotification // ListenBucketNotification
bucket.Methods("GET").HandlerFunc(api.ListenBucketNotificationHandler).Queries("notificationARN", "{notificationARN:.*}") bucket.Methods("GET").HandlerFunc(api.ListenBucketNotificationHandler).Queries("events", "{events:.*}")
// ListMultipartUploads // ListMultipartUploads
bucket.Methods("GET").HandlerFunc(api.ListMultipartUploadsHandler).Queries("uploads", "") bucket.Methods("GET").HandlerFunc(api.ListMultipartUploadsHandler).Queries("uploads", "")
// ListObjectsV2 // ListObjectsV2

View File

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"encoding/xml" "encoding/xml"
"fmt"
"io" "io"
"net/http" "net/http"
"path" "path"
@ -231,13 +232,21 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
vars := mux.Vars(r) vars := mux.Vars(r)
bucket := vars["bucket"] bucket := vars["bucket"]
// Get notification ARN. // Parse listen bucket notification resources.
topicARN := r.URL.Query().Get("notificationARN") prefix, suffix, events := getListenBucketNotificationResources(r.URL.Query())
if topicARN == "" { if !IsValidObjectPrefix(prefix) || !IsValidObjectPrefix(suffix) {
writeErrorResponse(w, r, ErrARNNotification, r.URL.Path) writeErrorResponse(w, r, ErrFilterValueInvalid, r.URL.Path)
return return
} }
// Validate all the resource events.
for _, event := range events {
if errCode := checkEvent(event); errCode != ErrNone {
writeErrorResponse(w, r, errCode, r.URL.Path)
return
}
}
_, err := objAPI.GetBucketInfo(bucket) _, err := objAPI.GetBucketInfo(bucket)
if err != nil { if err != nil {
errorIf(err, "Unable to bucket info.") errorIf(err, "Unable to bucket info.")
@ -245,16 +254,64 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
return return
} }
notificationCfg := globalEventNotifier.GetBucketNotificationConfig(bucket) accountID := fmt.Sprintf("%d", time.Now().UTC().UnixNano())
if notificationCfg == nil { accountARN := "arn:minio:sns:" + serverConfig.GetRegion() + accountID + ":listen"
writeErrorResponse(w, r, ErrARNNotification, r.URL.Path) var filterRules []filterRule
return if prefix != "" {
filterRules = append(filterRules, filterRule{
Name: "prefix",
Value: prefix,
})
}
if suffix != "" {
filterRules = append(filterRules, filterRule{
Name: "suffix",
Value: suffix,
})
} }
// Set SNS notifications only if special "listen" sns is set in bucket // Fetch for existing notification configs and update topic configs.
// notification configs. nConfig := globalEventNotifier.GetBucketNotificationConfig(bucket)
if !isMinioSNSConfigured(topicARN, notificationCfg.TopicConfigs) { if nConfig == nil {
writeErrorResponse(w, r, ErrARNNotification, r.URL.Path) // No notification configs found, initialize.
nConfig = &notificationConfig{
TopicConfigs: []topicConfig{{
TopicARN: accountARN,
serviceConfig: serviceConfig{
Events: events,
Filter: struct {
Key keyFilter `xml:"S3Key,omitempty"`
}{
Key: keyFilter{
FilterRules: filterRules,
},
},
ID: "sns-" + accountID,
},
}},
}
} else {
// Previously set notification configs found append to
// existing topic configs.
nConfig.TopicConfigs = append(nConfig.TopicConfigs, topicConfig{
TopicARN: accountARN,
serviceConfig: serviceConfig{
Events: events,
Filter: struct {
Key keyFilter `xml:"S3Key,omitempty"`
}{
Key: keyFilter{
FilterRules: filterRules,
},
},
ID: "sns-" + accountID,
},
})
}
// Save bucket notification config.
if err = globalEventNotifier.SetBucketNotificationConfig(bucket, nConfig); err != nil {
writeErrorResponse(w, r, toAPIErrorCode(err), r.URL.Path)
return return
} }
@ -267,9 +324,9 @@ func (api objectAPIHandlers) ListenBucketNotificationHandler(w http.ResponseWrit
defer close(nEventCh) defer close(nEventCh)
// Set sns target. // Set sns target.
globalEventNotifier.SetSNSTarget(topicARN, nEventCh) globalEventNotifier.SetSNSTarget(accountARN, nEventCh)
// Remove sns listener after the writer has closed or the client disconnected. // Remove sns listener after the writer has closed or the client disconnected.
defer globalEventNotifier.RemoveSNSTarget(topicARN, nEventCh) defer globalEventNotifier.RemoveSNSTarget(accountARN, nEventCh)
// Start sending bucket notifications. // Start sending bucket notifications.
sendBucketNotification(w, nEventCh) sendBucketNotification(w, nEventCh)