optimize ftp/sftp upload() implementations to avoid CPU load (#19552)

This commit is contained in:
Harshavardhana 2024-04-19 05:23:42 -07:00 committed by GitHub
parent 928c0181bf
commit 72f5cb577e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 40 additions and 39 deletions

View File

@ -25,6 +25,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"path"
"strings" "strings"
"time" "time"
@ -33,6 +34,7 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio/internal/auth" "github.com/minio/minio/internal/auth"
xioutil "github.com/minio/minio/internal/ioutil" xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/pkg/v2/mimedb"
ftp "goftp.io/server/v2" ftp "goftp.io/server/v2"
) )
@ -103,7 +105,7 @@ type ftpMetrics struct{}
var globalFtpMetrics ftpMetrics var globalFtpMetrics ftpMetrics
func ftpTrace(s *ftp.Context, startTime time.Time, source, path string, err error) madmin.TraceInfo { func ftpTrace(s *ftp.Context, startTime time.Time, source, objPath string, err error) madmin.TraceInfo {
var errStr string var errStr string
if err != nil { if err != nil {
errStr = err.Error() errStr = err.Error()
@ -114,7 +116,7 @@ func ftpTrace(s *ftp.Context, startTime time.Time, source, path string, err erro
NodeName: globalLocalNodeName, NodeName: globalLocalNodeName,
FuncName: fmt.Sprintf("ftp USER=%s COMMAND=%s PARAM=%s ISLOGIN=%t, Source=%s", s.Sess.LoginUser(), s.Cmd, s.Param, s.Sess.IsLogin(), source), FuncName: fmt.Sprintf("ftp USER=%s COMMAND=%s PARAM=%s ISLOGIN=%t, Source=%s", s.Sess.LoginUser(), s.Cmd, s.Param, s.Sess.IsLogin(), source),
Duration: time.Since(startTime), Duration: time.Since(startTime),
Path: path, Path: objPath,
Error: errStr, Error: errStr,
} }
} }
@ -128,18 +130,18 @@ func (m *ftpMetrics) log(s *ftp.Context, paths ...string) func(err error) {
} }
// Stat implements ftpDriver // Stat implements ftpDriver
func (driver *ftpDriver) Stat(ctx *ftp.Context, path string) (fi os.FileInfo, err error) { func (driver *ftpDriver) Stat(ctx *ftp.Context, objPath string) (fi os.FileInfo, err error) {
stopFn := globalFtpMetrics.log(ctx, path) stopFn := globalFtpMetrics.log(ctx, objPath)
defer stopFn(err) defer stopFn(err)
if path == SlashSeparator { if objPath == SlashSeparator {
return &minioFileInfo{ return &minioFileInfo{
p: SlashSeparator, p: SlashSeparator,
isDir: true, isDir: true,
}, nil }, nil
} }
bucket, object := path2BucketObject(path) bucket, object := path2BucketObject(objPath)
if bucket == "" { if bucket == "" {
return nil, errors.New("bucket name cannot be empty") return nil, errors.New("bucket name cannot be empty")
} }
@ -186,8 +188,8 @@ func (driver *ftpDriver) Stat(ctx *ftp.Context, path string) (fi os.FileInfo, er
} }
// ListDir implements ftpDriver // ListDir implements ftpDriver
func (driver *ftpDriver) ListDir(ctx *ftp.Context, path string, callback func(os.FileInfo) error) (err error) { func (driver *ftpDriver) ListDir(ctx *ftp.Context, objPath string, callback func(os.FileInfo) error) (err error) {
stopFn := globalFtpMetrics.log(ctx, path) stopFn := globalFtpMetrics.log(ctx, objPath)
defer stopFn(err) defer stopFn(err)
clnt, err := driver.getMinIOClient(ctx) clnt, err := driver.getMinIOClient(ctx)
@ -198,7 +200,7 @@ func (driver *ftpDriver) ListDir(ctx *ftp.Context, path string, callback func(os
cctx, cancel := context.WithCancel(context.Background()) cctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
bucket, prefix := path2BucketObject(path) bucket, prefix := path2BucketObject(objPath)
if bucket == "" { if bucket == "" {
buckets, err := clnt.ListBuckets(cctx) buckets, err := clnt.ListBuckets(cctx)
if err != nil { if err != nil {
@ -365,11 +367,11 @@ func (driver *ftpDriver) getMinIOClient(ctx *ftp.Context) (*minio.Client, error)
} }
// DeleteDir implements ftpDriver // DeleteDir implements ftpDriver
func (driver *ftpDriver) DeleteDir(ctx *ftp.Context, path string) (err error) { func (driver *ftpDriver) DeleteDir(ctx *ftp.Context, objPath string) (err error) {
stopFn := globalFtpMetrics.log(ctx, path) stopFn := globalFtpMetrics.log(ctx, objPath)
defer stopFn(err) defer stopFn(err)
bucket, prefix := path2BucketObject(path) bucket, prefix := path2BucketObject(objPath)
if bucket == "" { if bucket == "" {
return errors.New("deleting all buckets not allowed") return errors.New("deleting all buckets not allowed")
} }
@ -415,11 +417,11 @@ func (driver *ftpDriver) DeleteDir(ctx *ftp.Context, path string) (err error) {
} }
// DeleteFile implements ftpDriver // DeleteFile implements ftpDriver
func (driver *ftpDriver) DeleteFile(ctx *ftp.Context, path string) (err error) { func (driver *ftpDriver) DeleteFile(ctx *ftp.Context, objPath string) (err error) {
stopFn := globalFtpMetrics.log(ctx, path) stopFn := globalFtpMetrics.log(ctx, objPath)
defer stopFn(err) defer stopFn(err)
bucket, object := path2BucketObject(path) bucket, object := path2BucketObject(objPath)
if bucket == "" { if bucket == "" {
return errors.New("bucket name cannot be empty") return errors.New("bucket name cannot be empty")
} }
@ -433,19 +435,19 @@ func (driver *ftpDriver) DeleteFile(ctx *ftp.Context, path string) (err error) {
} }
// Rename implements ftpDriver // Rename implements ftpDriver
func (driver *ftpDriver) Rename(ctx *ftp.Context, fromPath string, toPath string) (err error) { func (driver *ftpDriver) Rename(ctx *ftp.Context, fromObjPath string, toObjPath string) (err error) {
stopFn := globalFtpMetrics.log(ctx, fromPath, toPath) stopFn := globalFtpMetrics.log(ctx, fromObjPath, toObjPath)
defer stopFn(err) defer stopFn(err)
return NotImplemented{} return NotImplemented{}
} }
// MakeDir implements ftpDriver // MakeDir implements ftpDriver
func (driver *ftpDriver) MakeDir(ctx *ftp.Context, path string) (err error) { func (driver *ftpDriver) MakeDir(ctx *ftp.Context, objPath string) (err error) {
stopFn := globalFtpMetrics.log(ctx, path) stopFn := globalFtpMetrics.log(ctx, objPath)
defer stopFn(err) defer stopFn(err)
bucket, prefix := path2BucketObject(path) bucket, prefix := path2BucketObject(objPath)
if bucket == "" { if bucket == "" {
return errors.New("bucket name cannot be empty") return errors.New("bucket name cannot be empty")
} }
@ -461,21 +463,18 @@ func (driver *ftpDriver) MakeDir(ctx *ftp.Context, path string) (err error) {
dirPath := buildMinioDir(prefix) dirPath := buildMinioDir(prefix)
_, err = clnt.PutObject(context.Background(), bucket, dirPath, bytes.NewReader([]byte("")), 0, _, err = clnt.PutObject(context.Background(), bucket, dirPath, bytes.NewReader([]byte("")), 0, minio.PutObjectOptions{
// Always send Content-MD5 to succeed with bucket with DisableContentSha256: true,
// locking enabled. There is no performance hit since })
// this is always an empty object
minio.PutObjectOptions{SendContentMd5: true},
)
return err return err
} }
// GetFile implements ftpDriver // GetFile implements ftpDriver
func (driver *ftpDriver) GetFile(ctx *ftp.Context, path string, offset int64) (n int64, rc io.ReadCloser, err error) { func (driver *ftpDriver) GetFile(ctx *ftp.Context, objPath string, offset int64) (n int64, rc io.ReadCloser, err error) {
stopFn := globalFtpMetrics.log(ctx, path) stopFn := globalFtpMetrics.log(ctx, objPath)
defer stopFn(err) defer stopFn(err)
bucket, object := path2BucketObject(path) bucket, object := path2BucketObject(objPath)
if bucket == "" { if bucket == "" {
return 0, nil, errors.New("bucket name cannot be empty") return 0, nil, errors.New("bucket name cannot be empty")
} }
@ -510,11 +509,11 @@ func (driver *ftpDriver) GetFile(ctx *ftp.Context, path string, offset int64) (n
} }
// PutFile implements ftpDriver // PutFile implements ftpDriver
func (driver *ftpDriver) PutFile(ctx *ftp.Context, path string, data io.Reader, offset int64) (n int64, err error) { func (driver *ftpDriver) PutFile(ctx *ftp.Context, objPath string, data io.Reader, offset int64) (n int64, err error) {
stopFn := globalFtpMetrics.log(ctx, path) stopFn := globalFtpMetrics.log(ctx, objPath)
defer stopFn(err) defer stopFn(err)
bucket, object := path2BucketObject(path) bucket, object := path2BucketObject(objPath)
if bucket == "" { if bucket == "" {
return 0, errors.New("bucket name cannot be empty") return 0, errors.New("bucket name cannot be empty")
} }
@ -530,8 +529,8 @@ func (driver *ftpDriver) PutFile(ctx *ftp.Context, path string, data io.Reader,
} }
info, err := clnt.PutObject(context.Background(), bucket, object, data, -1, minio.PutObjectOptions{ info, err := clnt.PutObject(context.Background(), bucket, object, data, -1, minio.PutObjectOptions{
ContentType: "application/octet-stream", ContentType: mimedb.TypeByExtension(path.Ext(object)),
SendContentMd5: true, DisableContentSha256: true,
}) })
return info.Size, err return info.Size, err
} }

View File

@ -24,6 +24,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"path"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -33,6 +34,7 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio/internal/auth" "github.com/minio/minio/internal/auth"
xioutil "github.com/minio/minio/internal/ioutil" xioutil "github.com/minio/minio/internal/ioutil"
"github.com/minio/pkg/v2/mimedb"
"github.com/pkg/sftp" "github.com/pkg/sftp"
"golang.org/x/crypto/ssh" "golang.org/x/crypto/ssh"
) )
@ -319,7 +321,10 @@ func (f *sftpDriver) Filewrite(r *sftp.Request) (w io.WriterAt, err error) {
} }
wa.wg.Add(1) wa.wg.Add(1)
go func() { go func() {
_, err := clnt.PutObject(r.Context(), bucket, object, pr, -1, minio.PutObjectOptions{SendContentMd5: true}) _, err := clnt.PutObject(r.Context(), bucket, object, pr, -1, minio.PutObjectOptions{
ContentType: mimedb.TypeByExtension(path.Ext(object)),
DisableContentSha256: true,
})
pr.CloseWithError(err) pr.CloseWithError(err)
wa.wg.Done() wa.wg.Done()
}() }()
@ -399,10 +404,7 @@ func (f *sftpDriver) Filecmd(r *sftp.Request) (err error) {
dirPath := buildMinioDir(prefix) dirPath := buildMinioDir(prefix)
_, err = clnt.PutObject(context.Background(), bucket, dirPath, bytes.NewReader([]byte("")), 0, _, err = clnt.PutObject(context.Background(), bucket, dirPath, bytes.NewReader([]byte("")), 0,
// Always send Content-MD5 to succeed with bucket with minio.PutObjectOptions{DisableContentSha256: true},
// locking enabled. There is no performance hit since
// this is always an empty object
minio.PutObjectOptions{SendContentMd5: true},
) )
return err return err
} }