feat: add lambda transformation functions target (#16507)

This commit is contained in:
Harshavardhana
2023-03-07 08:12:41 -08:00
committed by GitHub
parent ee54643004
commit 901887e6bf
29 changed files with 2130 additions and 70 deletions

View File

@@ -2429,7 +2429,7 @@ func assignPoolNumbers(servers []madmin.ServerProperties) {
func fetchLambdaInfo() []map[string][]madmin.TargetIDStatus {
lambdaMap := make(map[string][]madmin.TargetIDStatus)
for _, tgt := range globalConfigTargetList.Targets() {
for _, tgt := range globalNotifyTargetList.Targets() {
targetIDStatus := make(map[string]madmin.Status)
active, _ := tgt.IsActive()
targetID := tgt.ID()

View File

@@ -42,6 +42,7 @@ import (
objectlock "github.com/minio/minio/internal/bucket/object/lock"
"github.com/minio/minio/internal/bucket/versioning"
levent "github.com/minio/minio/internal/config/lambda/event"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/hash"
"github.com/minio/pkg/bucket/policy"
@@ -411,6 +412,10 @@ const (
ErrInvalidChecksum
// Lambda functions
ErrLambdaARNInvalid
ErrLambdaARNNotFound
apiErrCodeEnd // This is used only for the testing code
)
@@ -1964,6 +1969,16 @@ var errorCodes = errorCodeMap{
Description: "Invalid checksum provided.",
HTTPStatusCode: http.StatusBadRequest,
},
ErrLambdaARNInvalid: {
Code: "LambdaARNInvalid",
Description: "The specified lambda ARN is invalid",
HTTPStatusCode: http.StatusBadRequest,
},
ErrLambdaARNNotFound: {
Code: "LambdaARNNotFound",
Description: "The specified lambda ARN does not exist",
HTTPStatusCode: http.StatusNotFound,
},
ErrPolicyAlreadyAttached: {
Code: "XMinioPolicyAlreadyAttached",
Description: "The specified policy is already attached.",
@@ -1987,15 +2002,15 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
// Only return ErrClientDisconnected if the provided context is actually canceled.
// This way downstream context.Canceled will still report ErrOperationTimedOut
if contextCanceled(ctx) {
if ctx.Err() == context.Canceled {
return ErrClientDisconnected
}
if contextCanceled(ctx) && errors.Is(ctx.Err(), context.Canceled) {
return ErrClientDisconnected
}
switch err {
case errInvalidArgument:
apiErr = ErrAdminInvalidArgument
case errNoSuchPolicy:
apiErr = ErrAdminNoSuchPolicy
case errNoSuchUser:
apiErr = ErrAdminNoSuchUser
case errNoSuchServiceAccount:
@@ -2024,6 +2039,10 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
apiErr = ErrAdminInvalidSecretKey
case errInvalidStorageClass:
apiErr = ErrInvalidStorageClass
case errErasureReadQuorum:
apiErr = ErrSlowDown
case errErasureWriteQuorum:
apiErr = ErrSlowDown
// SSE errors
case errInvalidEncryptionParameters:
apiErr = ErrInvalidEncryptionParameters
@@ -2071,10 +2090,6 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
apiErr = ErrObjectLockInvalidHeaders
case objectlock.ErrMalformedXML:
apiErr = ErrMalformedXML
default:
if errors.Is(err, errNoSuchPolicy) {
apiErr = ErrAdminNoSuchPolicy
}
}
// Compression errors
@@ -2089,7 +2104,7 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
// etcd specific errors, a key is always a bucket for us return
// ErrNoSuchBucket in such a case.
if err == dns.ErrNoEntriesFound {
if errors.Is(err, dns.ErrNoEntriesFound) {
return ErrNoSuchBucket
}
@@ -2214,6 +2229,10 @@ func toAPIErrorCode(ctx context.Context, err error) (apiErr APIErrorCode) {
apiErr = ErrARNNotification
case *event.ErrARNNotFound:
apiErr = ErrARNNotification
case *levent.ErrInvalidARN:
apiErr = ErrLambdaARNInvalid
case *levent.ErrARNNotFound:
apiErr = ErrLambdaARNNotFound
case *event.ErrUnknownRegion:
apiErr = ErrRegionNotification
case *event.ErrInvalidFilterName:
@@ -2277,33 +2296,17 @@ func toAPIError(ctx context.Context, err error) APIError {
}
apiErr := errorCodes.ToAPIErr(toAPIErrorCode(ctx, err))
e, ok := err.(dns.ErrInvalidBucketName)
if ok {
code := toAPIErrorCode(ctx, e)
apiErr = errorCodes.ToAPIErrWithErr(code, e)
}
if apiErr.Code == "NotImplemented" {
if e, ok := err.(NotImplemented); ok {
desc := e.Error()
if desc == "" {
desc = apiErr.Description
}
apiErr = APIError{
Code: apiErr.Code,
Description: desc,
HTTPStatusCode: apiErr.HTTPStatusCode,
}
return apiErr
switch apiErr.Code {
case "NotImplemented":
desc := fmt.Sprintf("%s (%v)", apiErr.Description, err)
apiErr = APIError{
Code: apiErr.Code,
Description: desc,
HTTPStatusCode: apiErr.HTTPStatusCode,
}
}
if apiErr.Code == "XMinioBackendDown" {
case "XMinioBackendDown":
apiErr.Description = fmt.Sprintf("%s (%v)", apiErr.Description, err)
return apiErr
}
if apiErr.Code == "InternalError" {
case "InternalError":
// If we see an internal error try to interpret
// any underlying errors if possible depending on
// their internal error types.
@@ -2318,22 +2321,20 @@ func toAPIError(ctx context.Context, err error) APIError {
}
case *xml.SyntaxError:
apiErr = APIError{
Code: "MalformedXML",
Description: fmt.Sprintf("%s (%s)", errorCodes[ErrMalformedXML].Description,
e.Error()),
Code: "MalformedXML",
Description: fmt.Sprintf("%s (%s)", errorCodes[ErrMalformedXML].Description, e),
HTTPStatusCode: errorCodes[ErrMalformedXML].HTTPStatusCode,
}
case url.EscapeError:
apiErr = APIError{
Code: "XMinioInvalidObjectName",
Description: fmt.Sprintf("%s (%s)", errorCodes[ErrInvalidObjectName].Description,
e.Error()),
Code: "XMinioInvalidObjectName",
Description: fmt.Sprintf("%s (%s)", errorCodes[ErrInvalidObjectName].Description, e),
HTTPStatusCode: http.StatusBadRequest,
}
case versioning.Error:
apiErr = APIError{
Code: "IllegalVersioningConfigurationException",
Description: fmt.Sprintf("Versioning configuration specified in the request is invalid. (%s)", e.Error()),
Description: fmt.Sprintf("Versioning configuration specified in the request is invalid. (%s)", e),
HTTPStatusCode: http.StatusBadRequest,
}
case lifecycle.Error:
@@ -2399,19 +2400,7 @@ func toAPIError(ctx context.Context, err error) APIError {
// Add more other SDK related errors here if any in future.
default:
//nolint:gocritic
if errors.Is(err, errMalformedEncoding) {
apiErr = APIError{
Code: "BadRequest",
Description: err.Error(),
HTTPStatusCode: http.StatusBadRequest,
}
} else if errors.Is(err, errChunkTooBig) {
apiErr = APIError{
Code: "BadRequest",
Description: err.Error(),
HTTPStatusCode: http.StatusBadRequest,
}
} else if errors.Is(err, strconv.ErrRange) {
if errors.Is(err, errMalformedEncoding) || errors.Is(err, errChunkTooBig) || errors.Is(err, strconv.ErrRange) {
apiErr = APIError{
Code: "BadRequest",
Description: err.Error(),

View File

@@ -285,7 +285,10 @@ func registerAPIRouter(router *mux.Router) {
// GetObjectLegalHold
router.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
collectAPIStats("getobjectlegalhold", maxClients(gz(httpTraceAll(api.GetObjectLegalHoldHandler))))).Queries("legal-hold", "")
// GetObject - note gzip compression is *not* added due to Range requests.
// GetObject with lambda ARNs
router.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
collectAPIStats("getobject", maxClients(gz(httpTraceHdrs(api.GetObjectLambdaHandler))))).Queries("lambdaArn", "{lambdaArn:.*}")
// GetObject
router.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
collectAPIStats("getobject", maxClients(gz(httpTraceHdrs(api.GetObjectHandler)))))
// CopyObject

File diff suppressed because one or more lines are too long

View File

@@ -661,6 +661,7 @@ func handleCommonEnvVars() {
}
u.Path = "" // remove any path component such as `/`
globalMinioEndpoint = u.String()
globalMinioEndpointURL = u
}
globalFSOSync, err = config.ParseBool(env.Get(config.EnvFSOSync, config.EnableOff))

View File

@@ -37,6 +37,7 @@ import (
"github.com/minio/minio/internal/config/identity/openid"
idplugin "github.com/minio/minio/internal/config/identity/plugin"
xtls "github.com/minio/minio/internal/config/identity/tls"
"github.com/minio/minio/internal/config/lambda"
"github.com/minio/minio/internal/config/notify"
"github.com/minio/minio/internal/config/policy/opa"
polplugin "github.com/minio/minio/internal/config/policy/plugin"
@@ -74,6 +75,9 @@ func initHelp() {
for k, v := range notify.DefaultNotificationKVS {
kvs[k] = v
}
for k, v := range lambda.DefaultLambdaKVS {
kvs[k] = v
}
if globalIsErasure {
kvs[config.StorageClassSubSys] = storageclass.DefaultKVS
kvs[config.HealSubSys] = heal.DefaultKVS
@@ -196,6 +200,11 @@ func initHelp() {
Description: "publish bucket notifications to Redis datastores",
MultipleTargets: true,
},
config.HelpKV{
Key: config.LambdaWebhookSubSys,
Description: "manage remote lambda functions",
MultipleTargets: true,
},
config.HelpKV{
Key: config.EtcdSubSys,
Description: "persist IAM assets externally to etcd",
@@ -246,6 +255,7 @@ func initHelp() {
config.NotifyRedisSubSys: notify.HelpRedis,
config.NotifyWebhookSubSys: notify.HelpWebhook,
config.NotifyESSubSys: notify.HelpES,
config.LambdaWebhookSubSys: lambda.HelpWebhook,
config.SubnetSubSys: subnet.HelpSubnet,
config.CallhomeSubSys: callhome.HelpCallhome,
}
@@ -387,6 +397,13 @@ func validateSubSysConfig(s config.Config, subSys string, objAPI ObjectLayer) er
return err
}
}
if config.LambdaSubSystems.Contains(subSys) {
if err := lambda.TestSubSysLambdaTargets(GlobalContext, s, subSys, NewHTTPTransport()); err != nil {
return err
}
}
return nil
}
@@ -423,6 +440,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize remote webhook DNS config %w", err))
}
if err == nil && dnsURL != "" {
bootstrapTrace("initialize remote bucket DNS store")
globalDNSConfig, err = dns.NewOperatorDNS(dnsURL,
dns.Authentication(dnsUser, dnsPass),
dns.RootCAs(globalRootCAs))
@@ -437,6 +455,7 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
}
if etcdCfg.Enabled {
bootstrapTrace("initialize etcd store")
globalEtcdClient, err = etcd.New(etcdCfg)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize etcd config: %w", err))
@@ -495,12 +514,18 @@ func lookupConfigs(s config.Config, objAPI ObjectLayer) {
transport := NewHTTPTransport()
bootstrapTrace("lookup the event notification targets")
globalConfigTargetList, err = notify.FetchEnabledTargets(GlobalContext, s, transport)
bootstrapTrace("initialize the event notification targets")
globalNotifyTargetList, err = notify.FetchEnabledTargets(GlobalContext, s, transport)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize notification target(s): %w", err))
}
bootstrapTrace("initialize the lambda targets")
globalLambdaTargetList, err = lambda.FetchEnabledTargets(GlobalContext, s, transport)
if err != nil {
logger.LogIf(ctx, fmt.Errorf("Unable to initialize lambda target(s): %w", err))
}
bootstrapTrace("applying the dynamic configuration")
// Apply dynamic config values
if err := applyDynamicConfig(ctx, objAPI, s); err != nil {

View File

@@ -101,7 +101,7 @@ func (evnot *EventNotifier) InitBucketTargets(ctx context.Context, objAPI Object
return errServerNotInitialized
}
if err := evnot.targetList.Add(globalConfigTargetList.Targets()...); err != nil {
if err := evnot.targetList.Add(globalNotifyTargetList.Targets()...); err != nil {
return err
}

View File

@@ -49,6 +49,7 @@ import (
xhttp "github.com/minio/minio/internal/http"
etcd "go.etcd.io/etcd/client/v3"
levent "github.com/minio/minio/internal/config/lambda/event"
"github.com/minio/minio/internal/event"
"github.com/minio/minio/internal/pubsub"
"github.com/minio/pkg/certs"
@@ -174,7 +175,8 @@ var (
globalMinioConsoleHost = ""
// Holds the possible host endpoint.
globalMinioEndpoint = ""
globalMinioEndpoint = ""
globalMinioEndpointURL *xnet.URL
// globalConfigSys server config system.
globalConfigSys *ConfigSys
@@ -182,7 +184,8 @@ var (
globalNotificationSys *NotificationSys
globalEventNotifier *EventNotifier
globalConfigTargetList *event.TargetList
globalNotifyTargetList *event.TargetList
globalLambdaTargetList *levent.TargetList
globalBucketMetadataSys *BucketMetadataSys
globalBucketMonitor *bandwidth.Monitor

View File

@@ -131,6 +131,7 @@ const (
iamSubsystem MetricSubsystem = "iam"
kmsSubsystem MetricSubsystem = "kms"
notifySubsystem MetricSubsystem = "notify"
lambdaSubsystem MetricSubsystem = "lambda"
auditSubsystem MetricSubsystem = "audit"
)
@@ -1656,8 +1657,8 @@ func getNotificationMetrics() *MetricsGroup {
cacheInterval: 10 * time.Second,
}
mg.RegisterRead(func(ctx context.Context) []Metric {
stats := globalConfigTargetList.Stats()
metrics := make([]Metric, 0, 1+len(stats.TargetStats))
nstats := globalNotifyTargetList.Stats()
metrics := make([]Metric, 0, 1+len(nstats.TargetStats))
metrics = append(metrics, Metric{
Description: MetricDescription{
Namespace: minioNamespace,
@@ -1666,9 +1667,9 @@ func getNotificationMetrics() *MetricsGroup {
Help: "Number of concurrent async Send calls active to all targets",
Type: gaugeMetric,
},
Value: float64(stats.CurrentSendCalls),
Value: float64(nstats.CurrentSendCalls),
})
for _, st := range stats.TargetStats {
for _, st := range nstats.TargetStats {
metrics = append(metrics, Metric{
Description: MetricDescription{
Namespace: minioNamespace,
@@ -1681,6 +1682,43 @@ func getNotificationMetrics() *MetricsGroup {
Value: float64(st.CurrentQueue),
})
}
lstats := globalLambdaTargetList.Stats()
for _, st := range lstats.TargetStats {
metrics = append(metrics, Metric{
Description: MetricDescription{
Namespace: minioNamespace,
Subsystem: lambdaSubsystem,
Name: "active_requests",
Help: "Number of in progress requests",
},
VariableLabels: map[string]string{"target_id": st.ID.ID, "target_name": st.ID.Name},
Value: float64(st.ActiveRequests),
})
metrics = append(metrics, Metric{
Description: MetricDescription{
Namespace: minioNamespace,
Subsystem: lambdaSubsystem,
Name: "total_requests",
Help: "Total number of requests sent since start",
Type: counterMetric,
},
VariableLabels: map[string]string{"target_id": st.ID.ID, "target_name": st.ID.Name},
Value: float64(st.TotalRequests),
})
metrics = append(metrics, Metric{
Description: MetricDescription{
Namespace: minioNamespace,
Subsystem: lambdaSubsystem,
Name: "failed_requests",
Help: "Total number of requests that failed to send since start",
Type: counterMetric,
},
VariableLabels: map[string]string{"target_id": st.ID.ID, "target_name": st.ID.Name},
Value: float64(st.FailedRequests),
})
}
// Audit and system:
audit := logger.CurrentStats()
for id, st := range audit {

View File

@@ -0,0 +1,291 @@
// Copyright (c) 2015-2023 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"crypto/subtle"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/klauspost/compress/gzhttp"
"github.com/lithammer/shortuuid/v4"
miniogo "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/mux"
"github.com/minio/pkg/bucket/policy"
"github.com/minio/minio/internal/auth"
levent "github.com/minio/minio/internal/config/lambda/event"
xhttp "github.com/minio/minio/internal/http"
"github.com/minio/minio/internal/logger"
)
func getLambdaEventData(bucket, object string, cred auth.Credentials, r *http.Request) (levent.Event, error) {
host := globalLocalNodeName
secure := globalIsTLS
if globalMinioEndpointURL != nil {
host = globalMinioEndpointURL.Host
secure = globalMinioEndpointURL.Scheme == "https"
}
duration := time.Since(cred.Expiration)
if cred.Expiration.IsZero() {
duration = time.Hour
}
clnt, err := miniogo.New(host, &miniogo.Options{
Creds: credentials.NewStaticV4(cred.AccessKey, cred.SecretKey, cred.SessionToken),
Secure: secure,
Transport: globalRemoteTargetTransport,
Region: globalSite.Region,
})
if err != nil {
return levent.Event{}, err
}
reqParams := url.Values{}
if partNumberStr := r.Form.Get("partNumber"); partNumberStr != "" {
reqParams.Set("partNumber", partNumberStr)
}
for k := range supportedHeadGetReqParams {
if v := r.Form.Get(k); v != "" {
reqParams.Set(k, v)
}
}
extraHeaders := http.Header{}
if rng := r.Header.Get(xhttp.Range); rng != "" {
extraHeaders.Set(xhttp.Range, r.Header.Get(xhttp.Range))
}
u, err := clnt.PresignHeader(r.Context(), http.MethodGet, bucket, object, duration, reqParams, extraHeaders)
if err != nil {
return levent.Event{}, err
}
token, err := authenticateNode(cred.AccessKey, cred.SecretKey, u.RawQuery)
if err != nil {
return levent.Event{}, err
}
eventData := levent.Event{
GetObjectContext: &levent.GetObjectContext{
InputS3URL: u.String(),
OutputRoute: shortuuid.New(),
OutputToken: token,
},
UserRequest: levent.UserRequest{
URL: r.URL.String(),
Headers: r.Header.Clone(),
},
UserIdentity: levent.Identity{
Type: "IAMUser",
PrincipalID: cred.AccessKey,
AccessKeyID: cred.SecretKey,
},
}
return eventData, nil
}
var statusTextToCode = map[string]int{
"Continue": http.StatusContinue,
"Switching Protocols": http.StatusSwitchingProtocols,
"Processing": http.StatusProcessing,
"Early Hints": http.StatusEarlyHints,
"OK": http.StatusOK,
"Created": http.StatusCreated,
"Accepted": http.StatusAccepted,
"Non-Authoritative Information": http.StatusNonAuthoritativeInfo,
"No Content": http.StatusNoContent,
"Reset Content": http.StatusResetContent,
"Partial Content": http.StatusPartialContent,
"Multi-Status": http.StatusMultiStatus,
"Already Reported": http.StatusAlreadyReported,
"IM Used": http.StatusIMUsed,
"Multiple Choices": http.StatusMultipleChoices,
"Moved Permanently": http.StatusMovedPermanently,
"Found": http.StatusFound,
"See Other": http.StatusSeeOther,
"Not Modified": http.StatusNotModified,
"Use Proxy": http.StatusUseProxy,
"Temporary Redirect": http.StatusTemporaryRedirect,
"Permanent Redirect": http.StatusPermanentRedirect,
"Bad Request": http.StatusBadRequest,
"Unauthorized": http.StatusUnauthorized,
"Payment Required": http.StatusPaymentRequired,
"Forbidden": http.StatusForbidden,
"Not Found": http.StatusNotFound,
"Method Not Allowed": http.StatusMethodNotAllowed,
"Not Acceptable": http.StatusNotAcceptable,
"Proxy Authentication Required": http.StatusProxyAuthRequired,
"Request Timeout": http.StatusRequestTimeout,
"Conflict": http.StatusConflict,
"Gone": http.StatusGone,
"Length Required": http.StatusLengthRequired,
"Precondition Failed": http.StatusPreconditionFailed,
"Request Entity Too Large": http.StatusRequestEntityTooLarge,
"Request URI Too Long": http.StatusRequestURITooLong,
"Unsupported Media Type": http.StatusUnsupportedMediaType,
"Requested Range Not Satisfiable": http.StatusRequestedRangeNotSatisfiable,
"Expectation Failed": http.StatusExpectationFailed,
"I'm a teapot": http.StatusTeapot,
"Misdirected Request": http.StatusMisdirectedRequest,
"Unprocessable Entity": http.StatusUnprocessableEntity,
"Locked": http.StatusLocked,
"Failed Dependency": http.StatusFailedDependency,
"Too Early": http.StatusTooEarly,
"Upgrade Required": http.StatusUpgradeRequired,
"Precondition Required": http.StatusPreconditionRequired,
"Too Many Requests": http.StatusTooManyRequests,
"Request Header Fields Too Large": http.StatusRequestHeaderFieldsTooLarge,
"Unavailable For Legal Reasons": http.StatusUnavailableForLegalReasons,
"Internal Server Error": http.StatusInternalServerError,
"Not Implemented": http.StatusNotImplemented,
"Bad Gateway": http.StatusBadGateway,
"Service Unavailable": http.StatusServiceUnavailable,
"Gateway Timeout": http.StatusGatewayTimeout,
"HTTP Version Not Supported": http.StatusHTTPVersionNotSupported,
"Variant Also Negotiates": http.StatusVariantAlsoNegotiates,
"Insufficient Storage": http.StatusInsufficientStorage,
"Loop Detected": http.StatusLoopDetected,
"Not Extended": http.StatusNotExtended,
"Network Authentication Required": http.StatusNetworkAuthenticationRequired,
}
// StatusCode returns a HTTP Status code for the HTTP text. It returns -1
// if the text is unknown.
func StatusCode(text string) int {
if code, ok := statusTextToCode[text]; ok {
return code
}
return -1
}
func fwdHeadersToS3(h http.Header, w http.ResponseWriter) {
const trim = "x-amz-fwd-header-"
for k, v := range h {
if strings.HasPrefix(strings.ToLower(k), trim) {
w.Header()[k[len(trim):]] = v
}
}
}
func fwdStatusToAPIError(resp *http.Response) *APIError {
if status := resp.Header.Get(xhttp.AmzFwdStatus); status != "" && StatusCode(status) > -1 {
apiErr := &APIError{
HTTPStatusCode: StatusCode(status),
Description: resp.Header.Get(xhttp.AmzFwdErrorMessage),
Code: resp.Header.Get(xhttp.AmzFwdErrorCode),
}
if apiErr.HTTPStatusCode == http.StatusOK {
return nil
}
return apiErr
}
return nil
}
// GetObjectLamdbaHandler - GET Object with transformed data via lambda functions
// ----------
// This implementation of the GET operation applies lambda functions and returns the
// response generated via the lambda functions. To use this API, you must have READ access
// to the object.
func (api objectAPIHandlers) GetObjectLambdaHandler(w http.ResponseWriter, r *http.Request) {
ctx := newContext(r, w, "GetObjectLambda")
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
objectAPI := api.ObjectAPI()
if objectAPI == nil {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrServerNotInitialized), r.URL)
return
}
vars := mux.Vars(r)
bucket := vars["bucket"]
object, err := unescapePath(vars["object"])
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
// Check for auth type to return S3 compatible error.
cred, _, s3Error := checkRequestAuthTypeCredential(ctx, r, policy.GetObjectAction)
if s3Error != ErrNone {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL)
return
}
target, err := globalLambdaTargetList.Lookup(r.Form.Get("lambdaArn"))
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
eventData, err := getLambdaEventData(bucket, object, cred, r)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
resp, err := target.Send(eventData)
if err != nil {
writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
return
}
defer resp.Body.Close()
if eventData.GetObjectContext.OutputRoute != resp.Header.Get(xhttp.AmzRequestRoute) {
tokenErr := errorCodes.ToAPIErr(ErrInvalidRequest)
tokenErr.Description = "The request route included in the request is invalid"
writeErrorResponse(ctx, w, tokenErr, r.URL)
return
}
if subtle.ConstantTimeCompare([]byte(resp.Header.Get(xhttp.AmzRequestToken)), []byte(eventData.GetObjectContext.OutputToken)) != 1 {
tokenErr := errorCodes.ToAPIErr(ErrInvalidToken)
tokenErr.Description = "The request token included in the request is invalid"
writeErrorResponse(ctx, w, tokenErr, r.URL)
return
}
// Set all the relevant lambda forward headers if found.
fwdHeadersToS3(resp.Header, w)
if apiErr := fwdStatusToAPIError(resp); apiErr != nil {
writeErrorResponse(ctx, w, *apiErr, r.URL)
return
}
if resp.StatusCode != http.StatusOK {
writeErrorResponse(ctx, w, APIError{
Code: "LambdaFunctionError",
HTTPStatusCode: resp.StatusCode,
Description: "unexpected failure reported from lambda function",
}, r.URL)
return
}
if !globalAPIConfig.shouldGzipObjects() {
w.Header().Set(gzhttp.HeaderNoCompression, "true")
}
io.Copy(w, resp.Body)
}

View File

@@ -136,6 +136,7 @@ func printServerCommonMsg(apiEndpoints []string) {
}
}
printEventNotifiers()
printLambdaTargets()
if globalBrowserEnabled {
consoleEndpointStr := strings.Join(stripStandardPorts(getConsoleEndpoints(), globalMinioConsoleHost), " ")
@@ -152,6 +153,18 @@ func printObjectAPIMsg() {
logger.Info(color.Blue("\nDocumentation: ") + "https://min.io/docs/minio/linux/index.html")
}
func printLambdaTargets() {
if globalLambdaTargetList == nil || globalLambdaTargetList.Empty() {
return
}
arnMsg := color.Blue("Object Lambda ARNs: ")
for _, arn := range globalLambdaTargetList.List(globalSite.Region) {
arnMsg += color.Bold(fmt.Sprintf("%s ", arn))
}
logger.Info(arnMsg + "\n")
}
// Prints bucket notification configurations.
func printEventNotifiers() {
if globalNotificationSys == nil {
@@ -168,7 +181,7 @@ func printEventNotifiers() {
arnMsg += color.Bold(fmt.Sprintf("%s ", arn))
}
logger.Info(arnMsg)
logger.Info(arnMsg + "\n")
}
// Prints startup message for command line access. Prints link to our documentation