package storage

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/hashicorp/errwrap"
	"github.com/joyent/triton-go/client"
)

// JobClient provides access to the Manta compute job (map/reduce) APIs.
type JobClient struct {
	client *client.Client
}

// Job states reported in Job.State.
const (
	JobStateDone    = "done"
	JobStateRunning = "running"
)

// JobPhase represents the specification for a map or reduce phase of a Manta
// job.
type JobPhase struct {
	// Type is the type of phase. Must be `map` or `reduce`.
	Type string `json:"type,omitempty"`

	// Assets is an array of objects to be placed in your compute zones.
	Assets []string `json:"assets,omitempty"`

	// Exec is the shell statement to execute. It may be any valid shell
	// command, including pipelines and other shell syntax. You can also
	// execute programs stored in the service by including them in "assets"
	// and referencing them as /assets/$manta_path.
	Exec string `json:"exec"`

	// Init is a shell statement to execute in each compute zone before
	// any tasks are executed. The same constraints apply as to Exec.
	Init string `json:"init"`

	// ReducerCount is an optional number of reducers for this phase. The
	// default value if not specified is 1. The maximum value is 1024.
	ReducerCount uint `json:"count,omitempty"`

	// Memory is the amount of DRAM in MB to be allocated to the compute
	// zone. Valid values are 256, 512, 1024, 2048, 4096 or 8192.
	Memory uint64 `json:"memory,omitempty"`

	// Disk is the amount of disk space in GB to be allocated to the compute
	// zone. Valid values are 2, 4, 8, 16, 32, 64, 128, 256, 512 or 1024.
	Disk uint64 `json:"disk,omitempty"`
}

// JobSummary represents the summary of a compute job in Manta.
type JobSummary struct {
	ModifiedTime time.Time `json:"mtime"`
	ID           string    `json:"name"`
}

// Job represents a compute job in Manta.
type Job struct {
	ID          string      `json:"id"`
	Name        string      `json:"name"`
	Phases      []*JobPhase `json:"phases"`
	State       string      `json:"state"`
	Cancelled   bool        `json:"cancelled"`
	InputDone   bool        `json:"inputDone"`
	CreatedTime time.Time   `json:"timeCreated"`
	DoneTime    time.Time   `json:"timeDone"`
	Transient   bool        `json:"transient"`
	Stats       *JobStats   `json:"stats"`
}

// JobStats represents statistics for a compute job in Manta.
type JobStats struct {
	Errors    uint64 `json:"errors"`
	Outputs   uint64 `json:"outputs"`
	Retries   uint64 `json:"retries"`
	Tasks     uint64 `json:"tasks"`
	TasksDone uint64 `json:"tasksDone"`
}

// CreateJobInput represents parameters to a CreateJob operation.
type CreateJobInput struct {
	Name   string      `json:"name"`
	Phases []*JobPhase `json:"phases"`
}

// CreateJobOutput contains the outputs of a CreateJob operation.
type CreateJobOutput struct {
	JobID string
}

// Create submits a new job to be executed. This call is not
// idempotent, so calling it twice will create two jobs.
func (s *JobClient) Create(ctx context.Context, input *CreateJobInput) (*CreateJobOutput, error) {
	path := fmt.Sprintf("/%s/jobs", s.client.AccountName)

	reqInput := client.RequestInput{
		Method: http.MethodPost,
		Path:   path,
		Body:   input,
	}
	respBody, respHeaders, err := s.client.ExecuteRequestStorage(ctx, reqInput)
	if respBody != nil {
		defer respBody.Close()
	}
	if err != nil {
		return nil, errwrap.Wrapf("Error executing CreateJob request: {{err}}", err)
	}

	// The ID of the new job is the last path segment of the Location header.
	jobURI := respHeaders.Get("Location")
	parts := strings.Split(jobURI, "/")
	jobID := parts[len(parts)-1]

	response := &CreateJobOutput{
		JobID: jobID,
	}

	return response, nil
}
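// The sketch below is illustrative only and is not part of the upstream
// triton-go API: it shows one way Create might be used to submit a two-phase
// map/reduce job. The job name and phase commands are invented for the
// example, and jc is assumed to be a *JobClient obtained from a configured
// storage client.
func exampleCreateJob(ctx context.Context, jc *JobClient) (string, error) {
	created, err := jc.Create(ctx, &CreateJobInput{
		Name: "example-line-count", // hypothetical job name
		Phases: []*JobPhase{
			// Map phase: run a shell command against each input object.
			{Type: "map", Exec: "wc -l"},
			// Reduce phase: aggregate the per-object counts into one total.
			{Type: "reduce", Exec: "awk '{s+=$1} END {print s}'", ReducerCount: 1},
		},
	})
	if err != nil {
		return "", err
	}
	// The job ID is parsed from the Location header of the response.
	return created.JobID, nil
}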
// AddJobInputsInput represents parameters to an AddJobInputs operation.
type AddJobInputsInput struct {
	JobID       string
	ObjectPaths []string
}

// AddInputs submits inputs to an already created job.
func (s *JobClient) AddInputs(ctx context.Context, input *AddJobInputsInput) error {
	path := fmt.Sprintf("/%s/jobs/%s/live/in", s.client.AccountName, input.JobID)

	headers := &http.Header{}
	headers.Set("Content-Type", "text/plain")

	// Inputs are submitted as a newline-separated list of object paths.
	reader := strings.NewReader(strings.Join(input.ObjectPaths, "\n"))

	reqInput := client.RequestNoEncodeInput{
		Method:  http.MethodPost,
		Path:    path,
		Headers: headers,
		Body:    reader,
	}
	respBody, _, err := s.client.ExecuteRequestNoEncode(ctx, reqInput)
	if respBody != nil {
		defer respBody.Close()
	}
	if err != nil {
		return errwrap.Wrapf("Error executing AddJobInputs request: {{err}}", err)
	}

	return nil
}

// EndJobInputInput represents parameters to an EndJobInput operation.
type EndJobInputInput struct {
	JobID string
}

// EndInput closes input for an already created job. Once input is closed,
// no further objects may be added to the job.
func (s *JobClient) EndInput(ctx context.Context, input *EndJobInputInput) error {
	path := fmt.Sprintf("/%s/jobs/%s/live/in/end", s.client.AccountName, input.JobID)

	reqInput := client.RequestNoEncodeInput{
		Method: http.MethodPost,
		Path:   path,
	}
	respBody, _, err := s.client.ExecuteRequestNoEncode(ctx, reqInput)
	if respBody != nil {
		defer respBody.Close()
	}
	if err != nil {
		return errwrap.Wrapf("Error executing EndJobInput request: {{err}}", err)
	}

	return nil
}

// CancelJobInput represents parameters to a CancelJob operation.
type CancelJobInput struct {
	JobID string
}

// Cancel stops a job from doing any further work. Cancellation
// is asynchronous and "best effort"; there is no guarantee the job
// will actually stop. For example, short jobs where input is already
// closed will likely still run to completion.
//
// This is however useful when:
//   - input is still open
//   - you have a long-running job
func (s *JobClient) Cancel(ctx context.Context, input *CancelJobInput) error {
	path := fmt.Sprintf("/%s/jobs/%s/live/cancel", s.client.AccountName, input.JobID)

	reqInput := client.RequestNoEncodeInput{
		Method: http.MethodPost,
		Path:   path,
	}
	respBody, _, err := s.client.ExecuteRequestNoEncode(ctx, reqInput)
	if respBody != nil {
		defer respBody.Close()
	}
	if err != nil {
		return errwrap.Wrapf("Error executing CancelJob request: {{err}}", err)
	}

	return nil
}

// ListJobsInput represents parameters to a ListJobs operation.
type ListJobsInput struct {
	RunningOnly bool
	Limit       uint64
	Marker      string
}

// ListJobsOutput contains the outputs of a ListJobs operation.
type ListJobsOutput struct {
	Jobs          []*JobSummary
	ResultSetSize uint64
}
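// Illustrative sketch, not part of the upstream API: feed a job its inputs
// and then close input so the job can run to completion. The object paths
// are hypothetical, and jc is assumed to be a configured *JobClient.
func exampleSubmitInputs(ctx context.Context, jc *JobClient, jobID string) error {
	// Inputs are posted as newline-separated Manta object paths to
	// /:account/jobs/:id/live/in.
	err := jc.AddInputs(ctx, &AddJobInputsInput{
		JobID: jobID,
		ObjectPaths: []string{
			"/account/stor/logs/2017-01-01.log", // hypothetical paths
			"/account/stor/logs/2017-01-02.log",
		},
	})
	if err != nil {
		return err
	}

	// Closing input tells Manta that no further objects will be added.
	return jc.EndInput(ctx, &EndJobInputInput{JobID: jobID})
}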
// List returns the list of jobs you currently have.
func (s *JobClient) List(ctx context.Context, input *ListJobsInput) (*ListJobsOutput, error) {
	path := fmt.Sprintf("/%s/jobs", s.client.AccountName)

	query := &url.Values{}
	if input.RunningOnly {
		query.Set("state", "running")
	}
	if input.Limit != 0 {
		query.Set("limit", strconv.FormatUint(input.Limit, 10))
	}
	if input.Marker != "" {
		query.Set("manta_path", input.Marker)
	}

	reqInput := client.RequestInput{
		Method: http.MethodGet,
		Path:   path,
		Query:  query,
	}
	respBody, respHeader, err := s.client.ExecuteRequestStorage(ctx, reqInput)
	if respBody != nil {
		defer respBody.Close()
	}
	if err != nil {
		return nil, errwrap.Wrapf("Error executing ListJobs request: {{err}}", err)
	}

	// The response body is a stream of newline-separated JSON objects, so
	// decode from a single decoder until EOF.
	var results []*JobSummary
	decoder := json.NewDecoder(respBody)
	for {
		current := &JobSummary{}
		if err = decoder.Decode(current); err != nil {
			if err == io.EOF {
				break
			}
			return nil, errwrap.Wrapf("Error decoding ListJobs response: {{err}}", err)
		}
		results = append(results, current)
	}

	output := &ListJobsOutput{
		Jobs: results,
	}

	resultSetSize, err := strconv.ParseUint(respHeader.Get("Result-Set-Size"), 10, 64)
	if err == nil {
		output.ResultSetSize = resultSetSize
	}

	return output, nil
}

// GetJobInput represents parameters to a GetJob operation.
type GetJobInput struct {
	JobID string
}

// GetJobOutput contains the outputs of a GetJob operation.
type GetJobOutput struct {
	Job *Job
}

// Get returns the current status of a single job.
func (s *JobClient) Get(ctx context.Context, input *GetJobInput) (*GetJobOutput, error) {
	path := fmt.Sprintf("/%s/jobs/%s/live/status", s.client.AccountName, input.JobID)

	reqInput := client.RequestInput{
		Method: http.MethodGet,
		Path:   path,
	}
	respBody, _, err := s.client.ExecuteRequestStorage(ctx, reqInput)
	if respBody != nil {
		defer respBody.Close()
	}
	if err != nil {
		return nil, errwrap.Wrapf("Error executing GetJob request: {{err}}", err)
	}

	job := &Job{}
	decoder := json.NewDecoder(respBody)
	if err = decoder.Decode(job); err != nil {
		return nil, errwrap.Wrapf("Error decoding GetJob response: {{err}}", err)
	}

	return &GetJobOutput{
		Job: job,
	}, nil
}

// GetJobOutputInput represents parameters to a GetJobOutput operation.
type GetJobOutputInput struct {
	JobID string
}

// GetJobOutputOutput contains the outputs for a GetJobOutput operation. It is
// your responsibility to ensure that the io.ReadCloser Items is closed.
type GetJobOutputOutput struct {
	ResultSetSize uint64
	Items         io.ReadCloser
}

// GetOutput returns the current "live" set of outputs from a job. Think of
// this like `tail -f`. If error is nil (i.e. the operation is successful), it
// is your responsibility to close the io.ReadCloser named Items in the output.
func (s *JobClient) GetOutput(ctx context.Context, input *GetJobOutputInput) (*GetJobOutputOutput, error) {
	path := fmt.Sprintf("/%s/jobs/%s/live/out", s.client.AccountName, input.JobID)

	reqInput := client.RequestInput{
		Method: http.MethodGet,
		Path:   path,
	}
	respBody, respHeader, err := s.client.ExecuteRequestStorage(ctx, reqInput)
	if err != nil {
		if respBody != nil {
			respBody.Close()
		}
		return nil, errwrap.Wrapf("Error executing GetJobOutput request: {{err}}", err)
	}

	// Items is handed to the caller, who is responsible for closing it.
	output := &GetJobOutputOutput{
		Items: respBody,
	}

	resultSetSize, err := strconv.ParseUint(respHeader.Get("Result-Set-Size"), 10, 64)
	if err == nil {
		output.ResultSetSize = resultSetSize
	}

	return output, nil
}
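// Illustrative sketch, not part of the upstream API: poll Get until the job
// reaches the "done" state, then stream the newline-separated output object
// paths from GetOutput into w. The one-second poll interval is arbitrary, and
// jc is assumed to be a configured *JobClient.
func exampleWaitAndFetchOutputs(ctx context.Context, jc *JobClient, jobID string, w io.Writer) error {
	for {
		status, err := jc.Get(ctx, &GetJobInput{JobID: jobID})
		if err != nil {
			return err
		}
		if status.Job.State == JobStateDone {
			break
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(1 * time.Second):
		}
	}

	out, err := jc.GetOutput(ctx, &GetJobOutputInput{JobID: jobID})
	if err != nil {
		return err
	}
	// The caller of GetOutput owns Items and must close it.
	defer out.Items.Close()

	// Items is a newline-separated list of Manta object paths produced by
	// the job's final phase.
	_, err = io.Copy(w, out.Items)
	return err
}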
// GetJobInputInput represents parameters to a GetJobInput operation.
type GetJobInputInput struct {
	JobID string
}

// GetJobInputOutput contains the outputs for a GetJobInput operation. It is
// your responsibility to ensure that the io.ReadCloser Items is closed.
type GetJobInputOutput struct {
	ResultSetSize uint64
	Items         io.ReadCloser
}

// GetInput returns the current "live" set of inputs from a job. Think of
// this like `tail -f`. If error is nil (i.e. the operation is successful), it
// is your responsibility to close the io.ReadCloser named Items in the output.
func (s *JobClient) GetInput(ctx context.Context, input *GetJobInputInput) (*GetJobInputOutput, error) {
	path := fmt.Sprintf("/%s/jobs/%s/live/in", s.client.AccountName, input.JobID)

	reqInput := client.RequestInput{
		Method: http.MethodGet,
		Path:   path,
	}
	respBody, respHeader, err := s.client.ExecuteRequestStorage(ctx, reqInput)
	if err != nil {
		if respBody != nil {
			respBody.Close()
		}
		return nil, errwrap.Wrapf("Error executing GetJobInput request: {{err}}", err)
	}

	// Items is handed to the caller, who is responsible for closing it.
	output := &GetJobInputOutput{
		Items: respBody,
	}

	resultSetSize, err := strconv.ParseUint(respHeader.Get("Result-Set-Size"), 10, 64)
	if err == nil {
		output.ResultSetSize = resultSetSize
	}

	return output, nil
}

// GetJobFailuresInput represents parameters to a GetJobFailures operation.
type GetJobFailuresInput struct {
	JobID string
}

// GetJobFailuresOutput contains the outputs for a GetJobFailures operation. It
// is your responsibility to ensure that the io.ReadCloser Items is closed.
type GetJobFailuresOutput struct {
	ResultSetSize uint64
	Items         io.ReadCloser
}

// GetFailures returns the current "live" set of failed inputs from a job.
// Think of this like `tail -f`. If error is nil (i.e. the operation is
// successful), it is your responsibility to close the io.ReadCloser named
// Items in the output.
func (s *JobClient) GetFailures(ctx context.Context, input *GetJobFailuresInput) (*GetJobFailuresOutput, error) {
	path := fmt.Sprintf("/%s/jobs/%s/live/fail", s.client.AccountName, input.JobID)

	reqInput := client.RequestInput{
		Method: http.MethodGet,
		Path:   path,
	}
	respBody, respHeader, err := s.client.ExecuteRequestStorage(ctx, reqInput)
	if err != nil {
		if respBody != nil {
			respBody.Close()
		}
		return nil, errwrap.Wrapf("Error executing GetJobFailures request: {{err}}", err)
	}

	// Items is handed to the caller, who is responsible for closing it.
	output := &GetJobFailuresOutput{
		Items: respBody,
	}

	resultSetSize, err := strconv.ParseUint(respHeader.Get("Result-Set-Size"), 10, 64)
	if err == nil {
		output.ResultSetSize = resultSetSize
	}

	return output, nil
}
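// Illustrative sketch, not part of the upstream API: after a job finishes,
// stream the failed input paths into w and report the server-side count.
// Assumes jc is a configured *JobClient; the io.Writer destination is the
// caller's choice.
func exampleReportFailures(ctx context.Context, jc *JobClient, jobID string, w io.Writer) (uint64, error) {
	failures, err := jc.GetFailures(ctx, &GetJobFailuresInput{JobID: jobID})
	if err != nil {
		return 0, err
	}
	// The caller of GetFailures owns Items and must close it.
	defer failures.Items.Close()

	// Items is a newline-separated list of the input objects that failed;
	// the Result-Set-Size header, when present, is surfaced as ResultSetSize.
	if _, err := io.Copy(w, failures.Items); err != nil {
		return 0, err
	}
	return failures.ResultSetSize, nil
}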