Harshavardhana f248089523 api: Implement bucket notification. (#2271)
* Implement basic S3 notifications through queues

Supports multiple queues and three basic queue types:

1. NilQueue -- messages don't get sent anywhere
2. LogQueue -- messages get logged
3. AmqpQueue -- messages are sent to an AMQP queue

* api: Implement bucket notification.

Supports two different queue types

- AMQP
- ElasticSearch.

* Add support for redis
2016-07-23 22:51:12 -07:00

359 lines
9.3 KiB
Go

// Copyright 2012-2015 Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.
package elastic
import (
"errors"
"fmt"
"net/url"
"strings"
"gopkg.in/olivere/elastic.v3/uritemplates"
)
const (
// defaultKeepAlive is the value sent as the "scroll" request parameter
// when no keep-alive has been configured on the service or cursor.
defaultKeepAlive = "5m"
)
var (
// EOS is the sentinel error that marks the end of the stream (or scan).
// ScanCursor.Next returns it when there is nothing more to fetch.
EOS = errors.New("EOS")
// ErrNoScrollId is the sentinel error for a missing scroll ID.
// NOTE(review): nothing in the visible part of this file returns it;
// confirm where it is used before relying on it.
ErrNoScrollId = errors.New("no scrollId")
)
// ScanService manages a cursor through documents in Elasticsearch.
//
// It is configured through its chainable setter methods; calling Do
// issues the initial search request and returns a ScanCursor.
type ScanService struct {
client *Client // client used to perform the request and decode the response
indices []string // index names that form the first URL path segment
types []string // type names that form the second URL path segment
keepAlive string // "scroll" parameter; defaultKeepAlive is used when empty
searchSource *SearchSource // builder producing the request body
pretty bool // when true, the "pretty" parameter is sent
routing string // comma-separated routing values for the "routing" parameter
preference string // value stored by the Preference setter
size *int // optional "size" parameter; only sent when set and > 0
}
// NewScanService returns a ScanService bound to the given client.
// Until a different query or search source is supplied, the service
// scans with a match-all query.
func NewScanService(client *Client) *ScanService {
	return &ScanService{
		client:       client,
		searchSource: NewSearchSource().Query(NewMatchAllQuery()),
	}
}
// Index adds the name(s) of the index(es) to scan. Names accumulate
// across repeated calls.
func (s *ScanService) Index(indices ...string) *ScanService {
	names := s.indices
	if names == nil {
		// Start from an empty, non-nil slice on first use.
		names = []string{}
	}
	s.indices = append(names, indices...)
	return s
}
// Type restricts the scan to the given list of types. Types accumulate
// across repeated calls.
func (s *ScanService) Type(types ...string) *ScanService {
	names := s.types
	if names == nil {
		// Start from an empty, non-nil slice on first use.
		names = []string{}
	}
	s.types = append(names, types...)
	return s
}
// Scroll is an alias for KeepAlive: it sets how long the cursor is
// kept alive (e.g. "5m" for 5 minutes).
func (s *ScanService) Scroll(keepAlive string) *ScanService {
	return s.KeepAlive(keepAlive)
}
// KeepAlive sets the maximum time the cursor stays available before it
// expires (e.g. "5m" for 5 minutes). An empty value makes Do fall back
// to defaultKeepAlive.
func (s *ScanService) KeepAlive(keepAlive string) *ScanService {
	s.keepAlive = keepAlive
	return s
}
// Fields tells Elasticsearch to only load specific fields from a search hit.
// The field list is forwarded to the underlying search source.
// See http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-fields.html.
func (s *ScanService) Fields(fields ...string) *ScanService {
	src := s.searchSource.Fields(fields...)
	s.searchSource = src
	return s
}
// SearchSource sets the search source builder to use with this service.
// Passing nil resets the service to a fresh match-all search source.
func (s *ScanService) SearchSource(searchSource *SearchSource) *ScanService {
	if searchSource == nil {
		searchSource = NewSearchSource().Query(NewMatchAllQuery())
	}
	s.searchSource = searchSource
	return s
}
// Routing sets the routing values for the request. Multiple values are
// joined into a single comma-separated list, replacing any earlier one.
func (s *ScanService) Routing(routings ...string) *ScanService {
	joined := strings.Join(routings, ",")
	s.routing = joined
	return s
}
// Preference records the node or shard the operation should be
// performed on (default: "random").
func (s *ScanService) Preference(preference string) *ScanService {
	s.preference = preference
	return s
}
// Query sets the query to perform, e.g. MatchAllQuery, on the
// underlying search source.
func (s *ScanService) Query(query Query) *ScanService {
	src := s.searchSource.Query(query)
	s.searchSource = src
	return s
}
// PostFilter sets a filter that is executed last. It only affects the
// search hits, not facets. See
// http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-post-filter.html
// for details.
func (s *ScanService) PostFilter(postFilter Query) *ScanService {
	src := s.searchSource.PostFilter(postFilter)
	s.searchSource = src
	return s
}
// FetchSource controls whether the response should contain the stored
// _source for every hit.
func (s *ScanService) FetchSource(fetchSource bool) *ScanService {
	src := s.searchSource.FetchSource(fetchSource)
	s.searchSource = src
	return s
}
// FetchSourceContext specifies how the _source should be fetched by
// handing the given context to the underlying search source.
func (s *ScanService) FetchSourceContext(fetchSourceContext *FetchSourceContext) *ScanService {
	src := s.searchSource.FetchSourceContext(fetchSourceContext)
	s.searchSource = src
	return s
}
// Version, when set to true, asks for a version to be returned with
// each search hit.
// See http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-version.html.
func (s *ScanService) Version(version bool) *ScanService {
	src := s.searchSource.Version(version)
	s.searchSource = src
	return s
}
// Sort orders the results by the given field, ascending or descending.
// Use SortWithInfo to describe the sorting with a struct instead.
// See http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-sort.html
// for detailed documentation of sorting.
func (s *ScanService) Sort(field string, ascending bool) *ScanService {
	src := s.searchSource.Sort(field, ascending)
	s.searchSource = src
	return s
}
// SortWithInfo defines how to sort results using a SortInfo value.
// Use the Sort func for a shortcut.
// See http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-sort.html
// for detailed documentation of sorting.
func (s *ScanService) SortWithInfo(info SortInfo) *ScanService {
	src := s.searchSource.SortWithInfo(info)
	s.searchSource = src
	return s
}
// SortBy defines how to sort results using one or more Sorter values.
// Use the Sort func for a shortcut.
// See http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-sort.html
// for detailed documentation of sorting.
func (s *ScanService) SortBy(sorter ...Sorter) *ScanService {
	src := s.searchSource.SortBy(sorter...)
	s.searchSource = src
	return s
}
// Pretty lets the caller request indented JSON output. When enabled,
// the "pretty" parameter is added to the requests.
func (s *ScanService) Pretty(pretty bool) *ScanService {
	s.pretty = pretty
	return s
}
// Size sets the number of results to return per shard, not per request.
// A size of 10 that hits 5 shards will therefore return a maximum of
// 50 results per scan request.
func (s *ScanService) Size(size int) *ScanService {
	n := size
	s.size = &n
	return s
}
// Do executes the query and returns a "server-side cursor".
//
// The initial request is a POST to {indices}/{types}/_search, where the
// index and type names are URI-escaped and comma-joined:
//
//   - no indices, no types:  /_search
//   - indices only:          /{indices}/_search
//   - types only:            /_all/{types}/_search
//   - indices and types:     /{indices}/{types}/_search
//
// search_type=scan is only requested when the search source defines no
// sort order. The scroll parameter falls back to defaultKeepAlive when
// no keep-alive was configured. Size, routing and preference are sent
// only when they have been set.
//
// An error is returned when a name cannot be expanded, the search
// source cannot be serialized, the request fails, or the response
// cannot be decoded.
func (s *ScanService) Do() (*ScanCursor, error) {
	// Indices part
	indexPart := make([]string, 0, len(s.indices))
	for _, index := range s.indices {
		expanded, err := uritemplates.Expand("{index}", map[string]string{
			"index": index,
		})
		if err != nil {
			return nil, err
		}
		indexPart = append(indexPart, expanded)
	}

	// Types part
	typesPart := make([]string, 0, len(s.types))
	for _, typ := range s.types {
		expanded, err := uritemplates.Expand("{type}", map[string]string{
			"type": typ,
		})
		if err != nil {
			return nil, err
		}
		typesPart = append(typesPart, expanded)
	}

	// Build url. Segments are only emitted when non-empty so the path
	// never contains an empty "//" segment.
	path := ""
	switch {
	case len(indexPart) > 0:
		path += "/" + strings.Join(indexPart, ",")
	case len(typesPart) > 0:
		// Types without an index: address all indices explicitly so the
		// type list stays in the type position of the path.
		path += "/_all"
	}
	if len(typesPart) > 0 {
		path += "/" + strings.Join(typesPart, ",")
	}
	path += "/_search"

	// Parameters
	params := make(url.Values)
	if !s.searchSource.hasSort() {
		// TODO: ES 2.1 deprecates search_type=scan. See https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking_21_search_changes.html#_literal_search_type_scan_literal_deprecated.
		params.Set("search_type", "scan")
	}
	if s.pretty {
		params.Set("pretty", fmt.Sprintf("%v", s.pretty))
	}
	if s.keepAlive != "" {
		params.Set("scroll", s.keepAlive)
	} else {
		params.Set("scroll", defaultKeepAlive)
	}
	if s.size != nil && *s.size > 0 {
		params.Set("size", fmt.Sprintf("%d", *s.size))
	}
	if s.routing != "" {
		params.Set("routing", s.routing)
	}
	if s.preference != "" {
		// Previously the value stored by Preference was never sent.
		params.Set("preference", s.preference)
	}

	// Get response
	body, err := s.searchSource.Source()
	if err != nil {
		return nil, err
	}
	res, err := s.client.PerformRequest("POST", path, params, body)
	if err != nil {
		return nil, err
	}

	// Return result
	searchResult := new(SearchResult)
	if err := s.client.decoder.Decode(res.Body, searchResult); err != nil {
		return nil, err
	}
	return NewScanCursor(s.client, s.keepAlive, s.pretty, searchResult), nil
}
// ScanCursor represents a single page of results from
// an Elasticsearch Scan operation. Calling Next replaces Results
// with the following page.
type ScanCursor struct {
Results *SearchResult // most recently decoded response; its ScrollId drives the next request
client *Client // client used to perform scroll requests
keepAlive string // "scroll" parameter; defaultKeepAlive is used when empty
pretty bool // when true, the "pretty" parameter is sent
currentPage int // number of pages successfully fetched through Next
}
// NewScanCursor returns a new initialized ScanCursor that continues
// from the given search result using the given client, keep-alive and
// pretty settings.
func NewScanCursor(client *Client, keepAlive string, pretty bool, searchResult *SearchResult) *ScanCursor {
	cursor := new(ScanCursor)
	cursor.Results = searchResult
	cursor.client = client
	cursor.keepAlive = keepAlive
	cursor.pretty = pretty
	return cursor
}
// TotalHits is a convenience method that returns the number
// of hits the cursor will iterate through. It returns 0 when the
// cursor holds no result or the result carries no hits section.
func (c *ScanCursor) TotalHits() int64 {
	// Results is exported and NewScanCursor accepts a nil result, so
	// guard against it instead of panicking.
	if c.Results == nil || c.Results.Hits == nil {
		return 0
	}
	return c.Results.Hits.TotalHits
}
// Next fetches and returns the next page of search results. When all
// documents have been scanned it returns a nil result together with
// the EOS error.
//
// EOS is returned when the cursor holds no result, when the previously
// fetched page had no hits, or when there is no scroll ID to continue
// from. Any other error comes from performing the request or decoding
// the response.
//
// Usage:
//
//	for {
//		res, err := cursor.Next()
//		if err == elastic.EOS {
//			// End of stream (or scan)
//			break
//		}
//		if err != nil {
//			// Handle error
//		}
//		// Work with res
//	}
func (c *ScanCursor) Next() (*SearchResult, error) {
	// Results is exported and may be nil; without a previous result
	// there is no scroll ID to continue from.
	if c.Results == nil {
		return nil, EOS
	}
	// The result of the initial request is not inspected for hits; only
	// pages fetched by Next itself can signal the end through an empty page.
	if c.currentPage > 0 {
		hits := c.Results.Hits
		if hits == nil || len(hits.Hits) == 0 || hits.TotalHits == 0 {
			return nil, EOS
		}
	}
	if c.Results.ScrollId == "" {
		return nil, EOS
	}

	// Build url
	path := "/_search/scroll"

	// Parameters
	params := make(url.Values)
	if c.pretty {
		params.Set("pretty", fmt.Sprintf("%v", c.pretty))
	}
	if c.keepAlive != "" {
		params.Set("scroll", c.keepAlive)
	} else {
		params.Set("scroll", defaultKeepAlive)
	}

	// The scroll ID itself is sent as the request body.
	body := c.Results.ScrollId

	// Get response
	res, err := c.client.PerformRequest("POST", path, params, body)
	if err != nil {
		return nil, err
	}

	// Return result. The new result is seeded with the current scroll
	// ID, presumably so it survives when the response omits one.
	c.Results = &SearchResult{ScrollId: body}
	if err := c.client.decoder.Decode(res.Body, c.Results); err != nil {
		return nil, err
	}
	c.currentPage++
	return c.Results, nil
}