Add Support for Manta Object Storage as a Gateway (#5025)

Manta is an Object Storage by [Joyent](https://www.joyent.com/)

This PR adds initial support for Manta. It is intended as non-production 
ready so that feedback can be obtained.
This commit is contained in:
Paul Stack
2017-12-20 10:07:56 +02:00
committed by Nitish Tiwari
parent 1f77708a30
commit 7d75d61621
69 changed files with 20974 additions and 0 deletions

51
vendor/github.com/joyent/triton-go/storage/client.go generated vendored Normal file
View File

@@ -0,0 +1,51 @@
package storage
import (
triton "github.com/joyent/triton-go"
"github.com/joyent/triton-go/client"
)
type StorageClient struct {
Client *client.Client
}
func newStorageClient(client *client.Client) *StorageClient {
return &StorageClient{
Client: client,
}
}
// NewClient returns a new client for working with Storage endpoints and
// resources within CloudAPI
func NewClient(config *triton.ClientConfig) (*StorageClient, error) {
// TODO: Utilize config interface within the function itself
client, err := client.New(config.TritonURL, config.MantaURL, config.AccountName, config.Signers...)
if err != nil {
return nil, err
}
return newStorageClient(client), nil
}
// Dir returns a DirectoryClient used for accessing functions pertaining to
// Directories functionality of the Manta API.
func (c *StorageClient) Dir() *DirectoryClient {
return &DirectoryClient{c.Client}
}
// Jobs returns a JobClient used for accessing functions pertaining to Jobs
// functionality of the Triton Object Storage API.
func (c *StorageClient) Jobs() *JobClient {
return &JobClient{c.Client}
}
// Objects returns an ObjectsClient used for accessing functions pertaining to
// Objects functionality of the Triton Object Storage API.
func (c *StorageClient) Objects() *ObjectsClient {
return &ObjectsClient{c.Client}
}
// SnapLinks returns an SnapLinksClient used for accessing functions pertaining to
// SnapLinks functionality of the Triton Object Storage API.
func (c *StorageClient) SnapLinks() *SnapLinksClient {
return &SnapLinksClient{c.Client}
}

199
vendor/github.com/joyent/triton-go/storage/directory.go generated vendored Normal file
View File

@@ -0,0 +1,199 @@
package storage
import (
"bufio"
"context"
"encoding/json"
"net/http"
"net/url"
"path"
"strconv"
"time"
"github.com/hashicorp/errwrap"
"github.com/joyent/triton-go/client"
)
type DirectoryClient struct {
client *client.Client
}
// DirectoryEntry represents an object or directory in Manta.
type DirectoryEntry struct {
ETag string `json:"etag"`
ModifiedTime time.Time `json:"mtime"`
Name string `json:"name"`
Size uint64 `json:"size"`
Type string `json:"type"`
}
// ListDirectoryInput represents parameters to a List operation.
type ListDirectoryInput struct {
DirectoryName string
Limit uint64
Marker string
}
// ListDirectoryOutput contains the outputs of a List operation.
type ListDirectoryOutput struct {
Entries []*DirectoryEntry
ResultSetSize uint64
}
// List lists the contents of a directory on the Triton Object Store service.
func (s *DirectoryClient) List(ctx context.Context, input *ListDirectoryInput) (*ListDirectoryOutput, error) {
absPath := absFileInput(s.client.AccountName, input.DirectoryName)
query := &url.Values{}
if input.Limit != 0 {
query.Set("limit", strconv.FormatUint(input.Limit, 10))
}
if input.Marker != "" {
query.Set("manta_path", input.Marker)
}
reqInput := client.RequestInput{
Method: http.MethodGet,
Path: string(absPath),
Query: query,
}
respBody, respHeader, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if err != nil {
return nil, errwrap.Wrapf("Error executing List request: {{err}}", err)
}
defer respBody.Close()
var results []*DirectoryEntry
scanner := bufio.NewScanner(respBody)
for scanner.Scan() {
current := &DirectoryEntry{}
if err := json.Unmarshal(scanner.Bytes(), current); err != nil {
return nil, errwrap.Wrapf("error decoding list response: {{err}}", err)
}
results = append(results, current)
}
if err := scanner.Err(); err != nil {
return nil, errwrap.Wrapf("error decoding list responses: {{err}}", err)
}
output := &ListDirectoryOutput{
Entries: results,
}
resultSetSize, err := strconv.ParseUint(respHeader.Get("Result-Set-Size"), 10, 64)
if err == nil {
output.ResultSetSize = resultSetSize
}
return output, nil
}
// PutDirectoryInput represents parameters to a Put operation.
type PutDirectoryInput struct {
DirectoryName string
}
// Put puts a directoy into the Triton Object Storage service is an idempotent
// create-or-update operation. Your private namespace starts at /:login, and you
// can create any nested set of directories or objects within it.
func (s *DirectoryClient) Put(ctx context.Context, input *PutDirectoryInput) error {
absPath := absFileInput(s.client.AccountName, input.DirectoryName)
headers := &http.Header{}
headers.Set("Content-Type", "application/json; type=directory")
reqInput := client.RequestInput{
Method: http.MethodPut,
Path: string(absPath),
Headers: headers,
}
respBody, _, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return errwrap.Wrapf("Error executing Put request: {{err}}", err)
}
return nil
}
// DeleteDirectoryInput represents parameters to a Delete operation.
type DeleteDirectoryInput struct {
DirectoryName string
ForceDelete bool //Will recursively delete all child directories and objects
}
// Delete deletes a directory on the Triton Object Storage. The directory must
// be empty.
func (s *DirectoryClient) Delete(ctx context.Context, input *DeleteDirectoryInput) error {
absPath := absFileInput(s.client.AccountName, input.DirectoryName)
if input.ForceDelete {
err := deleteAll(*s, ctx, absPath)
if err != nil {
return err
}
} else {
err := deleteDirectory(*s, ctx, absPath)
if err != nil {
return err
}
}
return nil
}
func deleteAll(c DirectoryClient, ctx context.Context, directoryPath _AbsCleanPath) error {
objs, err := c.List(ctx, &ListDirectoryInput{
DirectoryName: string(directoryPath),
})
if err != nil {
return err
}
for _, obj := range objs.Entries {
newPath := absFileInput(c.client.AccountName, path.Join(string(directoryPath), obj.Name))
if obj.Type == "directory" {
err := deleteDirectory(c, ctx, newPath)
if err != nil {
return deleteAll(c, ctx, newPath)
}
} else {
return deleteObject(c, ctx, newPath)
}
}
return nil
}
func deleteDirectory(c DirectoryClient, ctx context.Context, directoryPath _AbsCleanPath) error {
reqInput := client.RequestInput{
Method: http.MethodDelete,
Path: string(directoryPath),
}
respBody, _, err := c.client.ExecuteRequestStorage(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return errwrap.Wrapf("Error executing DeleteDirectory request: {{err}}", err)
}
return nil
}
func deleteObject(c DirectoryClient, ctx context.Context, path _AbsCleanPath) error {
objClient := &ObjectsClient{
client: c.client,
}
err := objClient.Delete(ctx, &DeleteObjectInput{
ObjectPath: string(path),
})
if err != nil {
return err
}
return nil
}

440
vendor/github.com/joyent/triton-go/storage/job.go generated vendored Normal file
View File

@@ -0,0 +1,440 @@
package storage
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/hashicorp/errwrap"
"github.com/joyent/triton-go/client"
)
type JobClient struct {
client *client.Client
}
const (
JobStateDone = "done"
JobStateRunning = "running"
)
// JobPhase represents the specification for a map or reduce phase of a Manta
// job.
type JobPhase struct {
// Type is the type of phase. Must be `map` or `reduce`.
Type string `json:"type,omitempty"`
// Assets is an array of objects to be placed in your compute zones.
Assets []string `json:"assets,omitempty"`
// Exec is the shell statement to execute. It may be any valid shell
// command, including pipelines and other shell syntax. You can also
// execute programs stored in the service by including them in "assets"
// and referencing them as /assets/$manta_path.
Exec string `json:"exec"`
// Init is a shell statement to execute in each compute zone before
// any tasks are executed. The same constraints apply as to Exec.
Init string `json:"init"`
// ReducerCount is an optional number of reducers for this phase. The
// default value if not specified is 1. The maximum value is 1024.
ReducerCount uint `json:"count,omitempty"`
// Memory is the amount of DRAM in MB to be allocated to the compute
// zone. Valid values are 256, 512, 1024, 2048, 4096 or 8192.
Memory uint64 `json:"memory,omitempty"`
// Disk is the amount of disk space in GB to be allocated to the compute
// zone. Valid values are 2, 4, 8, 16, 32, 64, 128, 256, 512 or 1024.
Disk uint64 `json:"disk,omitempty"`
}
// JobSummary represents the summary of a compute job in Manta.
type JobSummary struct {
ModifiedTime time.Time `json:"mtime"`
ID string `json:"name"`
}
// Job represents a compute job in Manta.
type Job struct {
ID string `json:"id"`
Name string `json:"name"`
Phases []*JobPhase `json:"phases"`
State string `json:"state"`
Cancelled bool `json:"cancelled"`
InputDone bool `json:"inputDone"`
CreatedTime time.Time `json:"timeCreated"`
DoneTime time.Time `json:"timeDone"`
Transient bool `json:"transient"`
Stats *JobStats `json:"stats"`
}
// JobStats represents statistics for a compute job in Manta.
type JobStats struct {
Errors uint64 `json:"errors"`
Outputs uint64 `json:"outputs"`
Retries uint64 `json:"retries"`
Tasks uint64 `json:"tasks"`
TasksDone uint64 `json:"tasksDone"`
}
// CreateJobInput represents parameters to a CreateJob operation.
type CreateJobInput struct {
Name string `json:"name"`
Phases []*JobPhase `json:"phases"`
}
// CreateJobOutput contains the outputs of a CreateJob operation.
type CreateJobOutput struct {
JobID string
}
// CreateJob submits a new job to be executed. This call is not
// idempotent, so calling it twice will create two jobs.
func (s *JobClient) Create(ctx context.Context, input *CreateJobInput) (*CreateJobOutput, error) {
path := fmt.Sprintf("/%s/jobs", s.client.AccountName)
reqInput := client.RequestInput{
Method: http.MethodPost,
Path: path,
Body: input,
}
respBody, respHeaders, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return nil, errwrap.Wrapf("Error executing CreateJob request: {{err}}", err)
}
jobURI := respHeaders.Get("Location")
parts := strings.Split(jobURI, "/")
jobID := parts[len(parts)-1]
response := &CreateJobOutput{
JobID: jobID,
}
return response, nil
}
// AddJobInputs represents parameters to a AddJobInputs operation.
type AddJobInputsInput struct {
JobID string
ObjectPaths []string
}
// AddJobInputs submits inputs to an already created job.
func (s *JobClient) AddInputs(ctx context.Context, input *AddJobInputsInput) error {
path := fmt.Sprintf("/%s/jobs/%s/live/in", s.client.AccountName, input.JobID)
headers := &http.Header{}
headers.Set("Content-Type", "text/plain")
reader := strings.NewReader(strings.Join(input.ObjectPaths, "\n"))
reqInput := client.RequestNoEncodeInput{
Method: http.MethodPost,
Path: path,
Headers: headers,
Body: reader,
}
respBody, _, err := s.client.ExecuteRequestNoEncode(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return errwrap.Wrapf("Error executing AddJobInputs request: {{err}}", err)
}
return nil
}
// EndJobInputInput represents parameters to a EndJobInput operation.
type EndJobInputInput struct {
JobID string
}
// EndJobInput submits inputs to an already created job.
func (s *JobClient) EndInput(ctx context.Context, input *EndJobInputInput) error {
path := fmt.Sprintf("/%s/jobs/%s/live/in/end", s.client.AccountName, input.JobID)
reqInput := client.RequestNoEncodeInput{
Method: http.MethodPost,
Path: path,
}
respBody, _, err := s.client.ExecuteRequestNoEncode(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return errwrap.Wrapf("Error executing EndJobInput request: {{err}}", err)
}
return nil
}
// CancelJobInput represents parameters to a CancelJob operation.
type CancelJobInput struct {
JobID string
}
// CancelJob cancels a job from doing any further work. Cancellation
// is asynchronous and "best effort"; there is no guarantee the job
// will actually stop. For example, short jobs where input is already
// closed will likely still run to completion.
//
// This is however useful when:
// - input is still open
// - you have a long-running job
func (s *JobClient) Cancel(ctx context.Context, input *CancelJobInput) error {
path := fmt.Sprintf("/%s/jobs/%s/live/cancel", s.client.AccountName, input.JobID)
reqInput := client.RequestNoEncodeInput{
Method: http.MethodPost,
Path: path,
}
respBody, _, err := s.client.ExecuteRequestNoEncode(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return errwrap.Wrapf("Error executing CancelJob request: {{err}}", err)
}
return nil
}
// ListJobsInput represents parameters to a ListJobs operation.
type ListJobsInput struct {
RunningOnly bool
Limit uint64
Marker string
}
// ListJobsOutput contains the outputs of a ListJobs operation.
type ListJobsOutput struct {
Jobs []*JobSummary
ResultSetSize uint64
}
// ListJobs returns the list of jobs you currently have.
func (s *JobClient) List(ctx context.Context, input *ListJobsInput) (*ListJobsOutput, error) {
path := fmt.Sprintf("/%s/jobs", s.client.AccountName)
query := &url.Values{}
if input.RunningOnly {
query.Set("state", "running")
}
if input.Limit != 0 {
query.Set("limit", strconv.FormatUint(input.Limit, 10))
}
if input.Marker != "" {
query.Set("manta_path", input.Marker)
}
reqInput := client.RequestInput{
Method: http.MethodGet,
Path: path,
Query: query,
}
respBody, respHeader, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return nil, errwrap.Wrapf("Error executing ListJobs request: {{err}}", err)
}
var results []*JobSummary
for {
current := &JobSummary{}
decoder := json.NewDecoder(respBody)
if err = decoder.Decode(&current); err != nil {
if err == io.EOF {
break
}
return nil, errwrap.Wrapf("Error decoding ListJobs response: {{err}}", err)
}
results = append(results, current)
}
output := &ListJobsOutput{
Jobs: results,
}
resultSetSize, err := strconv.ParseUint(respHeader.Get("Result-Set-Size"), 10, 64)
if err == nil {
output.ResultSetSize = resultSetSize
}
return output, nil
}
// GetJobInput represents parameters to a GetJob operation.
type GetJobInput struct {
JobID string
}
// GetJobOutput contains the outputs of a GetJob operation.
type GetJobOutput struct {
Job *Job
}
// GetJob returns the list of jobs you currently have.
func (s *JobClient) Get(ctx context.Context, input *GetJobInput) (*GetJobOutput, error) {
path := fmt.Sprintf("/%s/jobs/%s/live/status", s.client.AccountName, input.JobID)
reqInput := client.RequestInput{
Method: http.MethodGet,
Path: path,
}
respBody, _, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return nil, errwrap.Wrapf("Error executing GetJob request: {{err}}", err)
}
job := &Job{}
decoder := json.NewDecoder(respBody)
if err = decoder.Decode(&job); err != nil {
return nil, errwrap.Wrapf("Error decoding GetJob response: {{err}}", err)
}
return &GetJobOutput{
Job: job,
}, nil
}
// GetJobOutputInput represents parameters to a GetJobOutput operation.
type GetJobOutputInput struct {
JobID string
}
// GetJobOutputOutput contains the outputs for a GetJobOutput operation. It is your
// responsibility to ensure that the io.ReadCloser Items is closed.
type GetJobOutputOutput struct {
ResultSetSize uint64
Items io.ReadCloser
}
// GetJobOutput returns the current "live" set of outputs from a job. Think of
// this like `tail -f`. If error is nil (i.e. the operation is successful), it is
// your responsibility to close the io.ReadCloser named Items in the output.
func (s *JobClient) GetOutput(ctx context.Context, input *GetJobOutputInput) (*GetJobOutputOutput, error) {
path := fmt.Sprintf("/%s/jobs/%s/live/out", s.client.AccountName, input.JobID)
reqInput := client.RequestInput{
Method: http.MethodGet,
Path: path,
}
respBody, respHeader, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return nil, errwrap.Wrapf("Error executing GetJobOutput request: {{err}}", err)
}
output := &GetJobOutputOutput{
Items: respBody,
}
resultSetSize, err := strconv.ParseUint(respHeader.Get("Result-Set-Size"), 10, 64)
if err == nil {
output.ResultSetSize = resultSetSize
}
return output, nil
}
// GetJobInputInput represents parameters to a GetJobOutput operation.
type GetJobInputInput struct {
JobID string
}
// GetJobInputOutput contains the outputs for a GetJobOutput operation. It is your
// responsibility to ensure that the io.ReadCloser Items is closed.
type GetJobInputOutput struct {
ResultSetSize uint64
Items io.ReadCloser
}
// GetJobInput returns the current "live" set of inputs from a job. Think of
// this like `tail -f`. If error is nil (i.e. the operation is successful), it is
// your responsibility to close the io.ReadCloser named Items in the output.
func (s *JobClient) GetInput(ctx context.Context, input *GetJobInputInput) (*GetJobInputOutput, error) {
path := fmt.Sprintf("/%s/jobs/%s/live/in", s.client.AccountName, input.JobID)
reqInput := client.RequestInput{
Method: http.MethodGet,
Path: path,
}
respBody, respHeader, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return nil, errwrap.Wrapf("Error executing GetJobInput request: {{err}}", err)
}
output := &GetJobInputOutput{
Items: respBody,
}
resultSetSize, err := strconv.ParseUint(respHeader.Get("Result-Set-Size"), 10, 64)
if err == nil {
output.ResultSetSize = resultSetSize
}
return output, nil
}
// GetJobFailuresInput represents parameters to a GetJobFailures operation.
type GetJobFailuresInput struct {
JobID string
}
// GetJobFailuresOutput contains the outputs for a GetJobFailures operation. It is your
// responsibility to ensure that the io.ReadCloser Items is closed.
type GetJobFailuresOutput struct {
ResultSetSize uint64
Items io.ReadCloser
}
// GetJobFailures returns the current "live" set of outputs from a job. Think of
// this like `tail -f`. If error is nil (i.e. the operation is successful), it is
// your responsibility to close the io.ReadCloser named Items in the output.
func (s *JobClient) GetFailures(ctx context.Context, input *GetJobFailuresInput) (*GetJobFailuresOutput, error) {
path := fmt.Sprintf("/%s/jobs/%s/live/fail", s.client.AccountName, input.JobID)
reqInput := client.RequestInput{
Method: http.MethodGet,
Path: path,
}
respBody, respHeader, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return nil, errwrap.Wrapf("Error executing GetJobFailures request: {{err}}", err)
}
output := &GetJobFailuresOutput{
Items: respBody,
}
resultSetSize, err := strconv.ParseUint(respHeader.Get("Result-Set-Size"), 10, 64)
if err == nil {
output.ResultSetSize = resultSetSize
}
return output, nil
}

388
vendor/github.com/joyent/triton-go/storage/objects.go generated vendored Normal file
View File

@@ -0,0 +1,388 @@
package storage
import (
"context"
"errors"
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/hashicorp/errwrap"
"github.com/joyent/triton-go/client"
)
type ObjectsClient struct {
client *client.Client
}
// GetObjectInput represents parameters to a GetObject operation.
type GetInfoInput struct {
ObjectPath string
Headers map[string]string
}
// GetObjectOutput contains the outputs for a GetObject operation. It is your
// responsibility to ensure that the io.ReadCloser ObjectReader is closed.
type GetInfoOutput struct {
ContentLength uint64
ContentType string
LastModified time.Time
ContentMD5 string
ETag string
Metadata map[string]string
}
// GetInfo sends a HEAD request to an object in the Manta service. This function
// does not return a response body.
func (s *ObjectsClient) GetInfo(ctx context.Context, input *GetInfoInput) (*GetInfoOutput, error) {
absPath := absFileInput(s.client.AccountName, input.ObjectPath)
headers := &http.Header{}
for key, value := range input.Headers {
headers.Set(key, value)
}
reqInput := client.RequestInput{
Method: http.MethodHead,
Path: string(absPath),
Headers: headers,
}
_, respHeaders, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if err != nil {
return nil, errwrap.Wrapf("Error executing get info request: {{err}}", err)
}
response := &GetInfoOutput{
ContentType: respHeaders.Get("Content-Type"),
ContentMD5: respHeaders.Get("Content-MD5"),
ETag: respHeaders.Get("Etag"),
}
lastModified, err := time.Parse(time.RFC1123, respHeaders.Get("Last-Modified"))
if err == nil {
response.LastModified = lastModified
}
contentLength, err := strconv.ParseUint(respHeaders.Get("Content-Length"), 10, 64)
if err == nil {
response.ContentLength = contentLength
}
metadata := map[string]string{}
for key, values := range respHeaders {
if strings.HasPrefix(key, "m-") {
metadata[key] = strings.Join(values, ", ")
}
}
response.Metadata = metadata
return response, nil
}
// IsDir is a convenience wrapper around the GetInfo function which takes an
// ObjectPath and returns a boolean whether or not the object is a directory
// type in Manta. Returns an error if GetInfo failed upstream for some reason.
func (s *ObjectsClient) IsDir(ctx context.Context, objectPath string) (bool, error) {
info, err := s.GetInfo(ctx, &GetInfoInput{
ObjectPath: objectPath,
})
if err != nil {
return false, err
}
if info != nil {
return strings.HasSuffix(info.ContentType, "type=directory"), nil
}
return false, nil
}
// GetObjectInput represents parameters to a GetObject operation.
type GetObjectInput struct {
ObjectPath string
Headers map[string]string
}
// GetObjectOutput contains the outputs for a GetObject operation. It is your
// responsibility to ensure that the io.ReadCloser ObjectReader is closed.
type GetObjectOutput struct {
ContentLength uint64
ContentType string
LastModified time.Time
ContentMD5 string
ETag string
Metadata map[string]string
ObjectReader io.ReadCloser
}
// Get retrieves an object from the Manta service. If error is nil (i.e. the
// call returns successfully), it is your responsibility to close the
// io.ReadCloser named ObjectReader in the operation output.
func (s *ObjectsClient) Get(ctx context.Context, input *GetObjectInput) (*GetObjectOutput, error) {
absPath := absFileInput(s.client.AccountName, input.ObjectPath)
headers := &http.Header{}
for key, value := range input.Headers {
headers.Set(key, value)
}
reqInput := client.RequestInput{
Method: http.MethodGet,
Path: string(absPath),
Headers: headers,
}
respBody, respHeaders, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if err != nil {
return nil, errwrap.Wrapf("Error executing Get request: {{err}}", err)
}
response := &GetObjectOutput{
ContentType: respHeaders.Get("Content-Type"),
ContentMD5: respHeaders.Get("Content-MD5"),
ETag: respHeaders.Get("Etag"),
ObjectReader: respBody,
}
lastModified, err := time.Parse(time.RFC1123, respHeaders.Get("Last-Modified"))
if err == nil {
response.LastModified = lastModified
}
contentLength, err := strconv.ParseUint(respHeaders.Get("Content-Length"), 10, 64)
if err == nil {
response.ContentLength = contentLength
}
metadata := map[string]string{}
for key, values := range respHeaders {
if strings.HasPrefix(key, "m-") {
metadata[key] = strings.Join(values, ", ")
}
}
response.Metadata = metadata
return response, nil
}
// DeleteObjectInput represents parameters to a DeleteObject operation.
type DeleteObjectInput struct {
ObjectPath string
Headers map[string]string
}
// DeleteObject deletes an object.
func (s *ObjectsClient) Delete(ctx context.Context, input *DeleteObjectInput) error {
absPath := absFileInput(s.client.AccountName, input.ObjectPath)
headers := &http.Header{}
for key, value := range input.Headers {
headers.Set(key, value)
}
reqInput := client.RequestInput{
Method: http.MethodDelete,
Path: string(absPath),
Headers: headers,
}
respBody, _, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return errwrap.Wrapf("Error executing Delete request: {{err}}", err)
}
return nil
}
// PutObjectMetadataInput represents parameters to a PutObjectMetadata operation.
type PutObjectMetadataInput struct {
ObjectPath string
ContentType string
Metadata map[string]string
}
// PutObjectMetadata allows you to overwrite the HTTP headers for an already
// existing object, without changing the data. Note this is an idempotent "replace"
// operation, so you must specify the complete set of HTTP headers you want
// stored on each request.
//
// You cannot change "critical" headers:
// - Content-Length
// - Content-MD5
// - Durability-Level
func (s *ObjectsClient) PutMetadata(ctx context.Context, input *PutObjectMetadataInput) error {
absPath := absFileInput(s.client.AccountName, input.ObjectPath)
query := &url.Values{}
query.Set("metadata", "true")
headers := &http.Header{}
headers.Set("Content-Type", input.ContentType)
for key, value := range input.Metadata {
headers.Set(key, value)
}
reqInput := client.RequestInput{
Method: http.MethodPut,
Path: string(absPath),
Query: query,
Headers: headers,
}
respBody, _, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return errwrap.Wrapf("Error executing PutMetadata request: {{err}}", err)
}
return nil
}
// PutObjectInput represents parameters to a PutObject operation.
type PutObjectInput struct {
ObjectPath string
DurabilityLevel uint64
ContentType string
ContentMD5 string
IfMatch string
IfModifiedSince *time.Time
ContentLength uint64
MaxContentLength uint64
ObjectReader io.Reader
Headers map[string]string
ForceInsert bool //Force the creation of the directory tree
}
func (s *ObjectsClient) Put(ctx context.Context, input *PutObjectInput) error {
absPath := absFileInput(s.client.AccountName, input.ObjectPath)
if input.ForceInsert {
// IsDir() uses a path relative to the account
absDirName := _AbsCleanPath(path.Dir(string(absPath)))
exists, err := checkDirectoryTreeExists(*s, ctx, absDirName)
if err != nil {
return err
}
if !exists {
err := createDirectory(*s, ctx, absDirName)
if err != nil {
return err
}
return putObject(*s, ctx, input, absPath)
}
}
return putObject(*s, ctx, input, absPath)
}
// _AbsCleanPath is an internal type that means the input has been
// path.Clean()'ed and is an absolute path.
type _AbsCleanPath string
func absFileInput(accountName, objPath string) _AbsCleanPath {
cleanInput := path.Clean(objPath)
if strings.HasPrefix(cleanInput, path.Join("/", accountName, "/")) {
return _AbsCleanPath(cleanInput)
}
cleanAbs := path.Clean(path.Join("/", accountName, objPath))
return _AbsCleanPath(cleanAbs)
}
func putObject(c ObjectsClient, ctx context.Context, input *PutObjectInput, absPath _AbsCleanPath) error {
if input.MaxContentLength != 0 && input.ContentLength != 0 {
return errors.New("ContentLength and MaxContentLength may not both be set to non-zero values.")
}
headers := &http.Header{}
for key, value := range input.Headers {
headers.Set(key, value)
}
if input.DurabilityLevel != 0 {
headers.Set("Durability-Level", strconv.FormatUint(input.DurabilityLevel, 10))
}
if input.ContentType != "" {
headers.Set("Content-Type", input.ContentType)
}
if input.ContentMD5 != "" {
headers.Set("Content-MD$", input.ContentMD5)
}
if input.IfMatch != "" {
headers.Set("If-Match", input.IfMatch)
}
if input.IfModifiedSince != nil {
headers.Set("If-Modified-Since", input.IfModifiedSince.Format(time.RFC1123))
}
if input.ContentLength != 0 {
headers.Set("Content-Length", strconv.FormatUint(input.ContentLength, 10))
}
if input.MaxContentLength != 0 {
headers.Set("Max-Content-Length", strconv.FormatUint(input.MaxContentLength, 10))
}
reqInput := client.RequestNoEncodeInput{
Method: http.MethodPut,
Path: string(absPath),
Headers: headers,
Body: input.ObjectReader,
}
respBody, _, err := c.client.ExecuteRequestNoEncode(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return errwrap.Wrapf("Error executing Put request: {{err}}", err)
}
return nil
}
func createDirectory(c ObjectsClient, ctx context.Context, absPath _AbsCleanPath) error {
dirClient := &DirectoryClient{
client: c.client,
}
// An abspath starts w/ a leading "/" which gets added to the slice as an
// empty string. Start all array math at 1.
parts := strings.Split(string(absPath), "/")
if len(parts) < 2 {
return errors.New("no path components to create directory")
}
folderPath := parts[1]
// Don't attempt to create a manta account as a directory
for i := 2; i < len(parts); i++ {
part := parts[i]
folderPath = path.Clean(path.Join("/", folderPath, part))
err := dirClient.Put(ctx, &PutDirectoryInput{
DirectoryName: folderPath,
})
if err != nil {
return err
}
}
return nil
}
func checkDirectoryTreeExists(c ObjectsClient, ctx context.Context, absPath _AbsCleanPath) (bool, error) {
exists, err := c.IsDir(ctx, string(absPath))
if err != nil {
errType := &client.MantaError{}
if errwrap.ContainsType(err, errType) {
mantaErr := errwrap.GetType(err, errType).(*client.MantaError)
if mantaErr.StatusCode == http.StatusNotFound {
return false, nil
}
}
return false, err
}
if exists {
return true, nil
}
return false, nil
}

81
vendor/github.com/joyent/triton-go/storage/signing.go generated vendored Normal file
View File

@@ -0,0 +1,81 @@
package storage
import (
"bytes"
"fmt"
"net/url"
"strconv"
"strings"
"time"
"github.com/hashicorp/errwrap"
)
// SignURLInput represents parameters to a SignURL operation.
type SignURLInput struct {
ValidityPeriod time.Duration
Method string
ObjectPath string
}
// SignURLOutput contains the outputs of a SignURL operation. To simply
// access the signed URL, use the SignedURL method.
type SignURLOutput struct {
host string
objectPath string
Method string
Algorithm string
Signature string
Expires string
KeyID string
}
// SignedURL returns a signed URL for the given scheme. Valid schemes are
// `http` and `https`.
func (output *SignURLOutput) SignedURL(scheme string) string {
query := &url.Values{}
query.Set("algorithm", output.Algorithm)
query.Set("expires", output.Expires)
query.Set("keyId", output.KeyID)
query.Set("signature", output.Signature)
sUrl := url.URL{}
sUrl.Scheme = scheme
sUrl.Host = output.host
sUrl.Path = output.objectPath
sUrl.RawQuery = query.Encode()
return sUrl.String()
}
// SignURL creates a time-expiring URL that can be shared with others.
// This is useful to generate HTML links, for example.
func (s *StorageClient) SignURL(input *SignURLInput) (*SignURLOutput, error) {
output := &SignURLOutput{
host: s.Client.MantaURL.Host,
objectPath: fmt.Sprintf("/%s%s", s.Client.AccountName, input.ObjectPath),
Method: input.Method,
Algorithm: strings.ToUpper(s.Client.Authorizers[0].DefaultAlgorithm()),
Expires: strconv.FormatInt(time.Now().Add(input.ValidityPeriod).Unix(), 10),
KeyID: fmt.Sprintf("/%s/keys/%s", s.Client.AccountName, s.Client.Authorizers[0].KeyFingerprint()),
}
toSign := bytes.Buffer{}
toSign.WriteString(input.Method + "\n")
toSign.WriteString(s.Client.MantaURL.Host + "\n")
toSign.WriteString(fmt.Sprintf("/%s%s\n", s.Client.AccountName, input.ObjectPath))
query := &url.Values{}
query.Set("algorithm", output.Algorithm)
query.Set("expires", output.Expires)
query.Set("keyId", output.KeyID)
toSign.WriteString(query.Encode())
signature, _, err := s.Client.Authorizers[0].SignRaw(toSign.String())
if err != nil {
return nil, errwrap.Wrapf("Error signing string: {{err}}", err)
}
output.Signature = signature
return output, nil
}

46
vendor/github.com/joyent/triton-go/storage/snaplink.go generated vendored Normal file
View File

@@ -0,0 +1,46 @@
package storage
import (
"context"
"fmt"
"net/http"
"github.com/hashicorp/errwrap"
"github.com/joyent/triton-go/client"
)
type SnapLinksClient struct {
client *client.Client
}
// PutSnapLinkInput represents parameters to a PutSnapLink operation.
type PutSnapLinkInput struct {
LinkPath string
SourcePath string
}
// PutSnapLink creates a SnapLink to an object.
func (s *SnapLinksClient) Put(ctx context.Context, input *PutSnapLinkInput) error {
linkPath := fmt.Sprintf("/%s%s", s.client.AccountName, input.LinkPath)
sourcePath := fmt.Sprintf("/%s%s", s.client.AccountName, input.SourcePath)
headers := &http.Header{}
headers.Set("Content-Type", "application/json; type=link")
headers.Set("location", sourcePath)
headers.Set("Accept", "~1.0")
headers.Set("Accept-Version", "application/json, */*")
reqInput := client.RequestInput{
Method: http.MethodPut,
Path: linkPath,
Headers: headers,
}
respBody, _, err := s.client.ExecuteRequestStorage(ctx, reqInput)
if respBody != nil {
defer respBody.Close()
}
if err != nil {
return errwrap.Wrapf("Error executing PutSnapLink request: {{err}}", err)
}
return nil
}