mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fs/object: Fix issues from review comments.
This commit is contained in:
parent
149c6ca094
commit
be002ac01e
@ -558,7 +558,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
||||
writeErrorResponse(w, r, apiErr, r.URL.Path)
|
||||
return
|
||||
}
|
||||
objInfo, err := api.ObjectAPI.PutObject(bucket, object, -1, fileBody, nil)
|
||||
md5Sum, err := api.ObjectAPI.PutObject(bucket, object, -1, fileBody, nil)
|
||||
if err != nil {
|
||||
errorIf(err.Trace(), "PutObject failed.", nil)
|
||||
switch err.ToGoError().(type) {
|
||||
@ -577,8 +577,8 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
|
||||
}
|
||||
return
|
||||
}
|
||||
if objInfo.MD5Sum != "" {
|
||||
w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"")
|
||||
if md5Sum != "" {
|
||||
w.Header().Set("ETag", "\""+md5Sum+"\"")
|
||||
}
|
||||
writeSuccessResponse(w, nil)
|
||||
}
|
||||
|
@ -127,12 +127,25 @@ func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive b
|
||||
}
|
||||
}
|
||||
|
||||
// readDirAll returns entries that begins with entryPrefixMatch
|
||||
dirents, err := readDirAll(filepath.Join(bucketDir, prefixDir), entryPrefixMatch)
|
||||
// Entry prefix match function.
|
||||
prefixMatchFn := func(dirent fsDirent) bool {
|
||||
if dirent.IsDir() || dirent.IsRegular() {
|
||||
// Does dirent name has reserved prefixes or suffixes.
|
||||
hasReserved := hasReservedPrefix(dirent.name) || hasReservedSuffix(dirent.name)
|
||||
// All dirents which match prefix and do not have reserved
|
||||
// keywords in them are valid entries.
|
||||
return strings.HasPrefix(dirent.name, entryPrefixMatch) && !hasReserved
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// scandir returns entries that begins with entryPrefixMatch
|
||||
dirents, err := scandir(filepath.Join(bucketDir, prefixDir), prefixMatchFn, true)
|
||||
if err != nil {
|
||||
send(treeWalkResult{err: err})
|
||||
return false
|
||||
}
|
||||
|
||||
// example:
|
||||
// If markerDir="four/" searchDirents() returns the index of "four/" in the sorted
|
||||
// dirents list. We skip all the dirent entries till "four/"
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"syscall"
|
||||
"unsafe"
|
||||
)
|
||||
@ -48,7 +47,7 @@ func clen(n []byte) int {
|
||||
|
||||
// parseDirents - inspired from
|
||||
// https://golang.org/src/syscall/syscall_<os>.go
|
||||
func parseDirents(buf []byte) []fsDirent {
|
||||
func parseDirents(dirPath string, buf []byte) []fsDirent {
|
||||
bufidx := 0
|
||||
dirents := []fsDirent{}
|
||||
for bufidx < len(buf) {
|
||||
@ -87,7 +86,15 @@ func parseDirents(buf []byte) []fsDirent {
|
||||
case syscall.DT_SOCK:
|
||||
mode = os.ModeSocket
|
||||
case syscall.DT_UNKNOWN:
|
||||
mode = 0xffffffff
|
||||
// On Linux XFS does not implement d_type for on disk
|
||||
// format << v5. Fall back to Stat().
|
||||
if fi, err := os.Stat(filepath.Join(dirPath, name)); err == nil {
|
||||
mode = fi.Mode()
|
||||
} else {
|
||||
// Caller listing would fail, if Stat failed but we
|
||||
// won't crash the server.
|
||||
mode = 0xffffffff
|
||||
}
|
||||
}
|
||||
|
||||
dirents = append(dirents, fsDirent{
|
||||
@ -98,38 +105,6 @@ func parseDirents(buf []byte) []fsDirent {
|
||||
return dirents
|
||||
}
|
||||
|
||||
// Read all directory entries, returns a list of lexically sorted
|
||||
// entries.
|
||||
func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) {
|
||||
buf := make([]byte, readDirentBufSize)
|
||||
f, err := os.Open(readDirPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
dirents := []fsDirent{}
|
||||
for {
|
||||
nbuf, err := syscall.ReadDirent(int(f.Fd()), buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if nbuf <= 0 {
|
||||
break
|
||||
}
|
||||
for _, dirent := range parseDirents(buf[:nbuf]) {
|
||||
if dirent.IsDir() {
|
||||
dirent.name += string(os.PathSeparator)
|
||||
dirent.size = 0
|
||||
}
|
||||
if strings.HasPrefix(dirent.name, entryPrefixMatch) {
|
||||
dirents = append(dirents, dirent)
|
||||
}
|
||||
}
|
||||
}
|
||||
sort.Sort(byDirentName(dirents))
|
||||
return dirents, nil
|
||||
}
|
||||
|
||||
// scans the directory dirPath, calling filter() on each directory
|
||||
// entry. Entries for which filter() returns true are stored, lexically
|
||||
// sorted using sort.Sort(). If filter is NULL, all entries are selected.
|
||||
@ -152,12 +127,13 @@ func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDi
|
||||
if nbuf <= 0 {
|
||||
break
|
||||
}
|
||||
for _, dirent := range parseDirents(buf[:nbuf]) {
|
||||
for _, dirent := range parseDirents(dirPath, buf[:nbuf]) {
|
||||
if !namesOnly {
|
||||
dirent.name = filepath.Join(dirPath, dirent.name)
|
||||
}
|
||||
if dirent.IsDir() {
|
||||
dirent.name += string(os.PathSeparator)
|
||||
dirent.size = 0
|
||||
}
|
||||
if filter == nil || filter(dirent) {
|
||||
dirents = append(dirents, dirent)
|
||||
|
@ -23,46 +23,8 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Read all directory entries, returns a list of lexically sorted entries.
|
||||
func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) {
|
||||
f, err := os.Open(readDirPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
var dirents []fsDirent
|
||||
for {
|
||||
fis, err := f.Readdir(1000)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
for _, fi := range fis {
|
||||
dirent := fsDirent{
|
||||
name: fi.Name(),
|
||||
modTime: fi.ModTime(),
|
||||
size: fi.Size(),
|
||||
mode: fi.Mode(),
|
||||
}
|
||||
if dirent.IsDir() {
|
||||
dirent.name += string(os.PathSeparator)
|
||||
dirent.size = 0
|
||||
}
|
||||
if strings.HasPrefix(fi.Name(), entryPrefixMatch) {
|
||||
dirents = append(dirents, dirent)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Sort dirents.
|
||||
sort.Sort(byDirentName(dirents))
|
||||
return dirents, nil
|
||||
}
|
||||
|
||||
// scans the directory dirPath, calling filter() on each directory
|
||||
// entry. Entries for which filter() returns true are stored, lexically
|
||||
// sorted using sort.Sort(). If filter is NULL, all entries are selected.
|
||||
|
46
fs-utils.go
46
fs-utils.go
@ -18,6 +18,8 @@ package main
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"runtime"
|
||||
"strings"
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
@ -27,7 +29,17 @@ var validVolname = regexp.MustCompile(`^.{3,63}$`)
|
||||
// isValidVolname verifies a volname name in accordance with object
|
||||
// layer requirements.
|
||||
func isValidVolname(volname string) bool {
|
||||
return validVolname.MatchString(volname)
|
||||
if !validVolname.MatchString(volname) {
|
||||
return false
|
||||
}
|
||||
switch runtime.GOOS {
|
||||
case "windows":
|
||||
// Volname shouldn't have reserved characters on windows in it.
|
||||
return !strings.ContainsAny(volname, "/\\:*?\"<>|")
|
||||
default:
|
||||
// Volname shouldn't have '/' in it.
|
||||
return !strings.ContainsAny(volname, "/")
|
||||
}
|
||||
}
|
||||
|
||||
// Keeping this as lower bound value supporting Linux, Darwin and Windows operating systems.
|
||||
@ -54,3 +66,35 @@ func isValidPrefix(prefix string) bool {
|
||||
// Verify if prefix is a valid path.
|
||||
return isValidPath(prefix)
|
||||
}
|
||||
|
||||
// List of reserved words for files, includes old and new ones.
|
||||
var reservedKeywords = []string{
|
||||
"$multiparts",
|
||||
"$tmpobject",
|
||||
"$tmpfile",
|
||||
// Add new reserved words if any used in future.
|
||||
}
|
||||
|
||||
// hasReservedPrefix - returns true if name has a reserved keyword suffixed.
|
||||
func hasReservedSuffix(name string) (isReserved bool) {
|
||||
for _, reservedKey := range reservedKeywords {
|
||||
if strings.HasSuffix(name, reservedKey) {
|
||||
isReserved = true
|
||||
break
|
||||
}
|
||||
isReserved = false
|
||||
}
|
||||
return isReserved
|
||||
}
|
||||
|
||||
// hasReservedPrefix - has reserved prefix.
|
||||
func hasReservedPrefix(name string) (isReserved bool) {
|
||||
for _, reservedKey := range reservedKeywords {
|
||||
if strings.HasPrefix(name, reservedKey) {
|
||||
isReserved = true
|
||||
break
|
||||
}
|
||||
isReserved = false
|
||||
}
|
||||
return isReserved
|
||||
}
|
||||
|
216
fs.go
216
fs.go
@ -18,7 +18,6 @@ package main
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@ -46,7 +45,6 @@ type fsStorage struct {
|
||||
diskPath string
|
||||
diskInfo disk.Info
|
||||
minFreeDisk int64
|
||||
rwLock *sync.RWMutex
|
||||
listObjectMap map[listParams][]*treeWalker
|
||||
listObjectMapMutex *sync.Mutex
|
||||
}
|
||||
@ -98,7 +96,6 @@ func newFS(diskPath string) (StorageAPI, error) {
|
||||
minFreeDisk: 5, // Minimum 5% disk should be free.
|
||||
listObjectMap: make(map[listParams][]*treeWalker),
|
||||
listObjectMapMutex: &sync.Mutex{},
|
||||
rwLock: &sync.RWMutex{},
|
||||
}
|
||||
return fs, nil
|
||||
}
|
||||
@ -121,65 +118,6 @@ func checkDiskFree(diskPath string, minFreeDisk int64) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkVolumeArg - will convert incoming volume names to
|
||||
// corresponding valid volume names on the backend in a platform
|
||||
// compatible way for all operating systems. If volume is not found
|
||||
// an error is generated.
|
||||
func (s fsStorage) checkVolumeArg(volume string) (string, error) {
|
||||
if !isValidVolname(volume) {
|
||||
return "", errInvalidArgument
|
||||
}
|
||||
volumeDir := filepath.Join(s.diskPath, volume)
|
||||
_, err := os.Stat(volumeDir)
|
||||
if err == nil {
|
||||
return volumeDir, nil
|
||||
}
|
||||
if os.IsNotExist(err) {
|
||||
var volumes []os.FileInfo
|
||||
volumes, err = ioutil.ReadDir(s.diskPath)
|
||||
if err != nil {
|
||||
return volumeDir, errVolumeNotFound
|
||||
}
|
||||
for _, vol := range volumes {
|
||||
if vol.IsDir() {
|
||||
// Verify if lowercase version of the volume
|
||||
// is equal to the incoming volume, then use the proper name.
|
||||
if strings.ToLower(vol.Name()) == volume {
|
||||
volumeDir = filepath.Join(s.diskPath, vol.Name())
|
||||
return volumeDir, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return volumeDir, errVolumeNotFound
|
||||
} else if os.IsPermission(err) {
|
||||
return volumeDir, errVolumeAccessDenied
|
||||
}
|
||||
return volumeDir, err
|
||||
}
|
||||
|
||||
// Make a volume entry.
|
||||
func (s fsStorage) MakeVol(volume string) (err error) {
|
||||
volumeDir, err := s.checkVolumeArg(volume)
|
||||
if err == nil {
|
||||
// Volume already exists, return error.
|
||||
return errVolumeExists
|
||||
}
|
||||
|
||||
// Validate if disk is free.
|
||||
if e := checkDiskFree(s.diskPath, s.minFreeDisk); e != nil {
|
||||
return e
|
||||
}
|
||||
|
||||
// If volume not found create it.
|
||||
if err == errVolumeNotFound {
|
||||
// Make a volume entry.
|
||||
return os.Mkdir(volumeDir, 0700)
|
||||
}
|
||||
|
||||
// For all other errors return here.
|
||||
return err
|
||||
}
|
||||
|
||||
// removeDuplicateVols - remove duplicate volumes.
|
||||
func removeDuplicateVols(vols []VolInfo) []VolInfo {
|
||||
length := len(vols) - 1
|
||||
@ -201,38 +139,115 @@ func removeDuplicateVols(vols []VolInfo) []VolInfo {
|
||||
return vols
|
||||
}
|
||||
|
||||
// ListVols - list volumes.
|
||||
func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) {
|
||||
files, err := ioutil.ReadDir(s.diskPath)
|
||||
// gets all the unique directories from diskPath.
|
||||
func getAllUniqueVols(dirPath string) ([]VolInfo, error) {
|
||||
volumeFn := func(dirent fsDirent) bool {
|
||||
// Return all directories.
|
||||
return dirent.IsDir()
|
||||
}
|
||||
namesOnly := false // Returned dirent names are absolute.
|
||||
dirents, err := scandir(dirPath, volumeFn, namesOnly)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, file := range files {
|
||||
if !file.IsDir() {
|
||||
// If not directory, ignore all file types.
|
||||
continue
|
||||
var volsInfo []VolInfo
|
||||
for _, dirent := range dirents {
|
||||
fi, err := os.Stat(dirent.name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
volsInfo = append(volsInfo, VolInfo{
|
||||
Name: fi.Name(),
|
||||
// As os.Stat() doesn't carry other than ModTime(), use
|
||||
// ModTime() as CreatedTime.
|
||||
Created: fi.ModTime(),
|
||||
})
|
||||
}
|
||||
volsInfo = removeDuplicateVols(volsInfo)
|
||||
return volsInfo, nil
|
||||
}
|
||||
|
||||
// getVolumeDir - will convert incoming volume names to
|
||||
// corresponding valid volume names on the backend in a platform
|
||||
// compatible way for all operating systems. If volume is not found
|
||||
// an error is generated.
|
||||
func (s fsStorage) getVolumeDir(volume string) (string, error) {
|
||||
if !isValidVolname(volume) {
|
||||
return "", errInvalidArgument
|
||||
}
|
||||
volumeDir := filepath.Join(s.diskPath, volume)
|
||||
_, err := os.Stat(volumeDir)
|
||||
if err == nil {
|
||||
return volumeDir, nil
|
||||
}
|
||||
if os.IsNotExist(err) {
|
||||
var volsInfo []VolInfo
|
||||
volsInfo, err = getAllUniqueVols(s.diskPath)
|
||||
if err != nil {
|
||||
return volumeDir, errVolumeNotFound
|
||||
}
|
||||
for _, vol := range volsInfo {
|
||||
// Verify if lowercase version of the volume
|
||||
// is equal to the incoming volume, then use the proper name.
|
||||
if strings.ToLower(vol.Name) == volume {
|
||||
volumeDir = filepath.Join(s.diskPath, vol.Name)
|
||||
return volumeDir, nil
|
||||
}
|
||||
}
|
||||
return volumeDir, errVolumeNotFound
|
||||
} else if os.IsPermission(err) {
|
||||
return volumeDir, errVolumeAccessDenied
|
||||
}
|
||||
return volumeDir, err
|
||||
}
|
||||
|
||||
// Make a volume entry.
|
||||
func (s fsStorage) MakeVol(volume string) (err error) {
|
||||
volumeDir, err := s.getVolumeDir(volume)
|
||||
if err == nil {
|
||||
// Volume already exists, return error.
|
||||
return errVolumeExists
|
||||
}
|
||||
|
||||
// Validate if disk is free.
|
||||
if e := checkDiskFree(s.diskPath, s.minFreeDisk); e != nil {
|
||||
return e
|
||||
}
|
||||
|
||||
// If volume not found create it.
|
||||
if err == errVolumeNotFound {
|
||||
// Make a volume entry.
|
||||
return os.Mkdir(volumeDir, 0700)
|
||||
}
|
||||
|
||||
// For all other errors return here.
|
||||
return err
|
||||
}
|
||||
|
||||
// ListVols - list volumes.
|
||||
func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) {
|
||||
volsInfo, err = getAllUniqueVols(s.diskPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, vol := range volsInfo {
|
||||
// Volname on case sensitive fs backends can come in as
|
||||
// capitalized, but object layer cannot consume it
|
||||
// directly. Convert it as we see fit.
|
||||
volName := strings.ToLower(file.Name())
|
||||
// Modtime is used as created time.
|
||||
createdTime := file.ModTime()
|
||||
volName := strings.ToLower(vol.Name)
|
||||
volInfo := VolInfo{
|
||||
Name: volName,
|
||||
Created: createdTime,
|
||||
Created: vol.Created,
|
||||
}
|
||||
volsInfo = append(volsInfo, volInfo)
|
||||
}
|
||||
// Remove duplicated volume entries.
|
||||
volsInfo = removeDuplicateVols(volsInfo)
|
||||
return volsInfo, nil
|
||||
}
|
||||
|
||||
// StatVol - get volume info.
|
||||
func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) {
|
||||
// Verify if volume is valid and it exists.
|
||||
volumeDir, err := s.checkVolumeArg(volume)
|
||||
volumeDir, err := s.getVolumeDir(volume)
|
||||
if err != nil {
|
||||
return VolInfo{}, err
|
||||
}
|
||||
@ -245,8 +260,8 @@ func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) {
|
||||
}
|
||||
return VolInfo{}, err
|
||||
}
|
||||
// Modtime is used as created time since operating systems lack a
|
||||
// portable way of knowing the actual created time of a directory.
|
||||
// As os.Stat() doesn't carry other than ModTime(), use ModTime()
|
||||
// as CreatedTime.
|
||||
createdTime := st.ModTime()
|
||||
return VolInfo{
|
||||
Name: volume,
|
||||
@ -257,13 +272,24 @@ func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) {
|
||||
// DeleteVol - delete a volume.
|
||||
func (s fsStorage) DeleteVol(volume string) error {
|
||||
// Verify if volume is valid and it exists.
|
||||
volumeDir, err := s.checkVolumeArg(volume)
|
||||
volumeDir, err := s.getVolumeDir(volume)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = os.Remove(volumeDir)
|
||||
if err != nil && os.IsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return errVolumeNotFound
|
||||
} else if strings.Contains(err.Error(), "directory is not empty") {
|
||||
// On windows the string is slightly different, handle it
|
||||
// here.
|
||||
return errVolumeNotEmpty
|
||||
} else if strings.Contains(err.Error(), "directory not empty") {
|
||||
// Hopefully for all other operating systems, this is
|
||||
// assumed to be consistent.
|
||||
return errVolumeNotEmpty
|
||||
}
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -302,18 +328,10 @@ func (s *fsStorage) lookupTreeWalk(params listParams) *treeWalker {
|
||||
return nil
|
||||
}
|
||||
|
||||
// List of special prefixes for files, includes old and new ones.
|
||||
var specialPrefixes = []string{
|
||||
"$multipart",
|
||||
"$tmpobject",
|
||||
"$tmpfile",
|
||||
// Add new special prefixes if any used.
|
||||
}
|
||||
|
||||
// List operation.
|
||||
func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, count int) ([]FileInfo, bool, error) {
|
||||
// Verify if volume is valid and it exists.
|
||||
volumeDir, err := s.checkVolumeArg(volume)
|
||||
volumeDir, err := s.getVolumeDir(volume)
|
||||
if err != nil {
|
||||
return nil, true, err
|
||||
}
|
||||
@ -373,16 +391,6 @@ func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, coun
|
||||
}
|
||||
fileInfo := walkResult.fileInfo
|
||||
fileInfo.Name = filepath.ToSlash(fileInfo.Name)
|
||||
// TODO: Find a proper place to skip these files.
|
||||
// Skip temporary files.
|
||||
for _, specialPrefix := range specialPrefixes {
|
||||
if strings.Contains(fileInfo.Name, specialPrefix) {
|
||||
if walkResult.end {
|
||||
return fileInfos, true, nil
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
fileInfos = append(fileInfos, fileInfo)
|
||||
// We have listed everything return.
|
||||
if walkResult.end {
|
||||
@ -397,7 +405,7 @@ func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, coun
|
||||
|
||||
// ReadFile - read a file at a given offset.
|
||||
func (s fsStorage) ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error) {
|
||||
volumeDir, err := s.checkVolumeArg(volume)
|
||||
volumeDir, err := s.getVolumeDir(volume)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -430,7 +438,7 @@ func (s fsStorage) ReadFile(volume string, path string, offset int64) (readClose
|
||||
|
||||
// CreateFile - create a file at path.
|
||||
func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
|
||||
volumeDir, err := s.checkVolumeArg(volume)
|
||||
volumeDir, err := s.getVolumeDir(volume)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -449,7 +457,7 @@ func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser,
|
||||
|
||||
// StatFile - get file info.
|
||||
func (s fsStorage) StatFile(volume, path string) (file FileInfo, err error) {
|
||||
volumeDir, err := s.checkVolumeArg(volume)
|
||||
volumeDir, err := s.getVolumeDir(volume)
|
||||
if err != nil {
|
||||
return FileInfo{}, err
|
||||
}
|
||||
@ -520,7 +528,7 @@ func deleteFile(basePath, deletePath string) error {
|
||||
|
||||
// DeleteFile - delete a file at path.
|
||||
func (s fsStorage) DeleteFile(volume, path string) error {
|
||||
volumeDir, err := s.checkVolumeArg(volume)
|
||||
volumeDir, err := s.getVolumeDir(volume)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
102
network-fs.go
102
network-fs.go
@ -30,6 +30,7 @@ import (
|
||||
)
|
||||
|
||||
type networkFS struct {
|
||||
netScheme string
|
||||
netAddr string
|
||||
netPath string
|
||||
rpcClient *rpc.Client
|
||||
@ -37,8 +38,7 @@ type networkFS struct {
|
||||
}
|
||||
|
||||
const (
|
||||
connected = "200 Connected to Go RPC"
|
||||
dialTimeoutSecs = 30 // 30 seconds.
|
||||
storageRPCPath = reservedBucket + "/rpc/storage"
|
||||
)
|
||||
|
||||
// splits network path into its components Address and Path.
|
||||
@ -49,6 +49,29 @@ func splitNetPath(networkPath string) (netAddr, netPath string) {
|
||||
return netAddr, netPath
|
||||
}
|
||||
|
||||
// Converts rpc.ServerError to underlying error. This function is
|
||||
// written so that the storageAPI errors are consistent across network
|
||||
// disks as well.
|
||||
func toStorageErr(err error) error {
|
||||
switch err.Error() {
|
||||
case errVolumeNotFound.Error():
|
||||
return errVolumeNotFound
|
||||
case errVolumeExists.Error():
|
||||
return errVolumeExists
|
||||
case errFileNotFound.Error():
|
||||
return errFileNotFound
|
||||
case errIsNotRegular.Error():
|
||||
return errIsNotRegular
|
||||
case errVolumeNotEmpty.Error():
|
||||
return errVolumeNotEmpty
|
||||
case errFileAccessDenied.Error():
|
||||
return errFileAccessDenied
|
||||
case errVolumeAccessDenied.Error():
|
||||
return errVolumeAccessDenied
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize new network file system.
|
||||
func newNetworkFS(networkPath string) (StorageAPI, error) {
|
||||
// Input validation.
|
||||
@ -60,7 +83,7 @@ func newNetworkFS(networkPath string) (StorageAPI, error) {
|
||||
netAddr, netPath := splitNetPath(networkPath)
|
||||
|
||||
// Dial minio rpc storage http path.
|
||||
rpcClient, err := rpc.DialHTTPPath("tcp", netAddr, "/minio/rpc/storage")
|
||||
rpcClient, err := rpc.DialHTTPPath("tcp", netAddr, storageRPCPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -76,6 +99,7 @@ func newNetworkFS(networkPath string) (StorageAPI, error) {
|
||||
|
||||
// Initialize network storage.
|
||||
ndisk := &networkFS{
|
||||
netScheme: "http", // TODO: fix for ssl rpc support.
|
||||
netAddr: netAddr,
|
||||
netPath: netPath,
|
||||
rpcClient: rpcClient,
|
||||
@ -90,10 +114,7 @@ func newNetworkFS(networkPath string) (StorageAPI, error) {
|
||||
func (n networkFS) MakeVol(volume string) error {
|
||||
reply := GenericReply{}
|
||||
if err := n.rpcClient.Call("Storage.MakeVolHandler", volume, &reply); err != nil {
|
||||
if err.Error() == errVolumeExists.Error() {
|
||||
return errVolumeExists
|
||||
}
|
||||
return err
|
||||
return toStorageErr(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -111,10 +132,7 @@ func (n networkFS) ListVols() (vols []VolInfo, err error) {
|
||||
// StatVol - get current Stat volume info.
|
||||
func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) {
|
||||
if err = n.rpcClient.Call("Storage.StatVolHandler", volume, &volInfo); err != nil {
|
||||
if err.Error() == errVolumeNotFound.Error() {
|
||||
return VolInfo{}, errVolumeNotFound
|
||||
}
|
||||
return VolInfo{}, err
|
||||
return VolInfo{}, toStorageErr(err)
|
||||
}
|
||||
return volInfo, nil
|
||||
}
|
||||
@ -123,10 +141,7 @@ func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) {
|
||||
func (n networkFS) DeleteVol(volume string) error {
|
||||
reply := GenericReply{}
|
||||
if err := n.rpcClient.Call("Storage.DeleteVolHandler", volume, &reply); err != nil {
|
||||
if err.Error() == errVolumeNotFound.Error() {
|
||||
return errVolumeNotFound
|
||||
}
|
||||
return err
|
||||
return toStorageErr(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -136,9 +151,9 @@ func (n networkFS) DeleteVol(volume string) error {
|
||||
// CreateFile - create file.
|
||||
func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
|
||||
writeURL := new(url.URL)
|
||||
writeURL.Scheme = "http" // TODO fix this.
|
||||
writeURL.Scheme = n.netScheme
|
||||
writeURL.Host = n.netAddr
|
||||
writeURL.Path = fmt.Sprintf("/minio/rpc/storage/upload/%s", urlpath.Join(volume, path))
|
||||
writeURL.Path = fmt.Sprintf("%s/upload/%s", storageRPCPath, urlpath.Join(volume, path))
|
||||
|
||||
contentType := "application/octet-stream"
|
||||
readCloser, writeCloser := io.Pipe()
|
||||
@ -149,11 +164,16 @@ func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser,
|
||||
return
|
||||
}
|
||||
if resp != nil {
|
||||
if resp.StatusCode != http.StatusNotFound {
|
||||
readCloser.CloseWithError(errFileNotFound)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
readCloser.CloseWithError(errFileNotFound)
|
||||
return
|
||||
}
|
||||
readCloser.CloseWithError(errors.New("Invalid response."))
|
||||
return
|
||||
}
|
||||
readCloser.CloseWithError(errors.New("Invalid response."))
|
||||
// Close the reader.
|
||||
readCloser.Close()
|
||||
}
|
||||
}()
|
||||
return writeCloser, nil
|
||||
@ -165,14 +185,7 @@ func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error)
|
||||
Vol: volume,
|
||||
Path: path,
|
||||
}, &fileInfo); err != nil {
|
||||
if err.Error() == errVolumeNotFound.Error() {
|
||||
return FileInfo{}, errVolumeNotFound
|
||||
} else if err.Error() == errFileNotFound.Error() {
|
||||
return FileInfo{}, errFileNotFound
|
||||
} else if err.Error() == errIsNotRegular.Error() {
|
||||
return FileInfo{}, errFileNotFound
|
||||
}
|
||||
return FileInfo{}, err
|
||||
return FileInfo{}, toStorageErr(err)
|
||||
}
|
||||
return fileInfo, nil
|
||||
}
|
||||
@ -180,9 +193,9 @@ func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error)
|
||||
// ReadFile - reads a file.
|
||||
func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) {
|
||||
readURL := new(url.URL)
|
||||
readURL.Scheme = "http" // TODO fix this.
|
||||
readURL.Scheme = n.netScheme
|
||||
readURL.Host = n.netAddr
|
||||
readURL.Path = fmt.Sprintf("/minio/rpc/storage/download/%s", urlpath.Join(volume, path))
|
||||
readURL.Path = fmt.Sprintf("%s/download/%s", storageRPCPath, urlpath.Join(volume, path))
|
||||
readQuery := make(url.Values)
|
||||
readQuery.Set("offset", strconv.FormatInt(offset, 10))
|
||||
readURL.RawQuery = readQuery.Encode()
|
||||
@ -190,11 +203,13 @@ func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return nil, errFileNotFound
|
||||
if resp != nil {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return nil, errFileNotFound
|
||||
}
|
||||
return nil, errors.New("Invalid response")
|
||||
}
|
||||
return nil, errors.New("Invalid response")
|
||||
}
|
||||
return resp.Body, nil
|
||||
}
|
||||
@ -209,16 +224,10 @@ func (n networkFS) ListFiles(volume, prefix, marker string, recursive bool, coun
|
||||
Recursive: recursive,
|
||||
Count: count,
|
||||
}, &listFilesReply); err != nil {
|
||||
if err.Error() == errVolumeNotFound.Error() {
|
||||
return nil, true, errVolumeNotFound
|
||||
}
|
||||
return nil, true, err
|
||||
return nil, true, toStorageErr(err)
|
||||
}
|
||||
// List of files.
|
||||
files = listFilesReply.Files
|
||||
// EOF.
|
||||
eof = listFilesReply.EOF
|
||||
return files, eof, nil
|
||||
// Return successfully unmarshalled results.
|
||||
return listFilesReply.Files, listFilesReply.EOF, nil
|
||||
}
|
||||
|
||||
// DeleteFile - Delete a file at path.
|
||||
@ -228,12 +237,7 @@ func (n networkFS) DeleteFile(volume, path string) (err error) {
|
||||
Vol: volume,
|
||||
Path: path,
|
||||
}, &reply); err != nil {
|
||||
if err.Error() == errVolumeNotFound.Error() {
|
||||
return errVolumeNotFound
|
||||
} else if err.Error() == errFileNotFound.Error() {
|
||||
return errFileNotFound
|
||||
}
|
||||
return err
|
||||
return toStorageErr(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/minio/minio/pkg/probe"
|
||||
"github.com/minio/minio/pkg/safe"
|
||||
"github.com/skyrings/skyring-common/tools/uuid"
|
||||
)
|
||||
|
||||
@ -72,6 +71,14 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke
|
||||
if !IsValidObjectPrefix(prefix) {
|
||||
return ListMultipartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: prefix})
|
||||
}
|
||||
if _, e := o.storage.StatVol(minioMetaVolume); e != nil {
|
||||
if e == errVolumeNotFound {
|
||||
e = o.storage.MakeVol(minioMetaVolume)
|
||||
if e != nil {
|
||||
return ListMultipartsInfo{}, probe.NewError(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Verify if delimiter is anything other than '/', which we do not support.
|
||||
if delimiter != "" && delimiter != slashPathSeparator {
|
||||
return ListMultipartsInfo{}, probe.NewError(UnsupportedDelimiter{
|
||||
@ -253,14 +260,14 @@ func (o objectAPI) NewMultipartUpload(bucket, object string) (string, *probe.Err
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
uploadID := uuid.String()
|
||||
uploadIDFile := path.Join(bucket, object, uploadID)
|
||||
if _, e = o.storage.StatFile(minioMetaVolume, uploadIDFile); e != nil {
|
||||
uploadIDPath := path.Join(bucket, object, uploadID)
|
||||
if _, e = o.storage.StatFile(minioMetaVolume, uploadIDPath); e != nil {
|
||||
if e != errFileNotFound {
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
// uploadIDFile doesn't exist, so create empty file to reserve the name
|
||||
// uploadIDPath doesn't exist, so create empty file to reserve the name
|
||||
var w io.WriteCloser
|
||||
if w, e = o.storage.CreateFile(minioMetaVolume, uploadIDFile); e == nil {
|
||||
if w, e = o.storage.CreateFile(minioMetaVolume, uploadIDPath); e == nil {
|
||||
if e = w.Close(); e != nil {
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
@ -269,19 +276,23 @@ func (o objectAPI) NewMultipartUpload(bucket, object string) (string, *probe.Err
|
||||
}
|
||||
return uploadID, nil
|
||||
}
|
||||
// uploadIDFile already exists.
|
||||
// uploadIDPath already exists.
|
||||
// loop again to try with different uuid generated.
|
||||
}
|
||||
}
|
||||
|
||||
func (o objectAPI) isUploadIDExist(bucket, object, uploadID string) (bool, error) {
|
||||
st, e := o.storage.StatFile(minioMetaVolume, path.Join(bucket, object, uploadID))
|
||||
// isUploadIDExists - verify if a given uploadID exists and is valid.
|
||||
func (o objectAPI) isUploadIDExists(bucket, object, uploadID string) (bool, error) {
|
||||
uploadIDPath := path.Join(bucket, object, uploadID)
|
||||
st, e := o.storage.StatFile(minioMetaVolume, uploadIDPath)
|
||||
if e != nil {
|
||||
// Upload id does not exist.
|
||||
if e == errFileNotFound {
|
||||
return false, nil
|
||||
}
|
||||
return false, e
|
||||
}
|
||||
// Upload id exists and is a regular file.
|
||||
return st.Mode.IsRegular(), nil
|
||||
}
|
||||
|
||||
@ -293,7 +304,7 @@ func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, si
|
||||
if !IsValidObjectName(object) {
|
||||
return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
if status, e := o.isUploadIDExist(bucket, object, uploadID); e != nil {
|
||||
if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil {
|
||||
return "", probe.NewError(e)
|
||||
} else if !status {
|
||||
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
@ -324,12 +335,12 @@ func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, si
|
||||
// Instantiate checksum hashers and create a multiwriter.
|
||||
if size > 0 {
|
||||
if _, e = io.CopyN(multiWriter, data, size); e != nil {
|
||||
fileWriter.(*safe.File).CloseAndRemove()
|
||||
safeCloseAndRemove(fileWriter)
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
} else {
|
||||
if _, e = io.Copy(multiWriter, data); e != nil {
|
||||
fileWriter.(*safe.File).CloseAndRemove()
|
||||
safeCloseAndRemove(fileWriter)
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
}
|
||||
@ -337,7 +348,7 @@ func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, si
|
||||
newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil))
|
||||
if md5Hex != "" {
|
||||
if newMD5Hex != md5Hex {
|
||||
fileWriter.(*safe.File).CloseAndRemove()
|
||||
safeCloseAndRemove(fileWriter)
|
||||
return "", probe.NewError(BadDigest{md5Hex, newMD5Hex})
|
||||
}
|
||||
}
|
||||
@ -356,7 +367,16 @@ func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMa
|
||||
if !IsValidObjectName(object) {
|
||||
return ListPartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
if status, e := o.isUploadIDExist(bucket, object, uploadID); e != nil {
|
||||
// Create minio meta volume, if it doesn't exist yet.
|
||||
if _, e := o.storage.StatVol(minioMetaVolume); e != nil {
|
||||
if e == errVolumeNotFound {
|
||||
e = o.storage.MakeVol(minioMetaVolume)
|
||||
if e != nil {
|
||||
return ListPartsInfo{}, probe.NewError(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil {
|
||||
return ListPartsInfo{}, probe.NewError(e)
|
||||
} else if !status {
|
||||
return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
@ -364,8 +384,11 @@ func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMa
|
||||
result := ListPartsInfo{}
|
||||
marker := ""
|
||||
nextPartNumberMarker := 0
|
||||
uploadIDPath := path.Join(bucket, object, uploadID)
|
||||
// Figure out the marker for the next subsequent calls, if the
|
||||
// partNumberMarker is already set.
|
||||
if partNumberMarker > 0 {
|
||||
fileInfos, _, e := o.storage.ListFiles(minioMetaVolume, path.Join(bucket, object, uploadID)+"."+strconv.Itoa(partNumberMarker)+".", "", false, 1)
|
||||
fileInfos, _, e := o.storage.ListFiles(minioMetaVolume, uploadIDPath+"."+strconv.Itoa(partNumberMarker)+".", "", false, 1)
|
||||
if e != nil {
|
||||
return result, probe.NewError(e)
|
||||
}
|
||||
@ -374,7 +397,7 @@ func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMa
|
||||
}
|
||||
marker = fileInfos[0].Name
|
||||
}
|
||||
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, path.Join(bucket, object, uploadID)+".", marker, false, maxParts)
|
||||
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, uploadIDPath+".", marker, false, maxParts)
|
||||
if e != nil {
|
||||
return result, probe.NewError(InvalidPart{})
|
||||
}
|
||||
@ -404,55 +427,89 @@ func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMa
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) {
|
||||
// Create an s3 compatible MD5sum for complete multipart transaction.
|
||||
func makeS3MD5(md5Strs ...string) (string, *probe.Error) {
|
||||
var finalMD5Bytes []byte
|
||||
for _, md5Str := range md5Strs {
|
||||
md5Bytes, e := hex.DecodeString(md5Str)
|
||||
if e != nil {
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
finalMD5Bytes = append(finalMD5Bytes, md5Bytes...)
|
||||
}
|
||||
md5Hasher := md5.New()
|
||||
md5Hasher.Write(finalMD5Bytes)
|
||||
s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(md5Strs))
|
||||
return s3MD5, nil
|
||||
}
|
||||
|
||||
func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, *probe.Error) {
|
||||
// Verify if bucket is valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
if !IsValidObjectName(object) {
|
||||
return ObjectInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
return "", probe.NewError(ObjectNameInvalid{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
})
|
||||
}
|
||||
if status, e := o.isUploadIDExist(bucket, object, uploadID); e != nil {
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil {
|
||||
return "", probe.NewError(e)
|
||||
} else if !status {
|
||||
return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
fileWriter, e := o.storage.CreateFile(bucket, object)
|
||||
if e != nil {
|
||||
return ObjectInfo{}, nil
|
||||
if e == errVolumeNotFound {
|
||||
return "", probe.NewError(BucketNotFound{
|
||||
Bucket: bucket,
|
||||
})
|
||||
} else if e == errIsNotRegular {
|
||||
return "", probe.NewError(ObjectExistsAsPrefix{
|
||||
Bucket: bucket,
|
||||
Prefix: object,
|
||||
})
|
||||
}
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
|
||||
var md5Sums []string
|
||||
for _, part := range parts {
|
||||
partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, part.PartNumber, part.ETag)
|
||||
var fileReader io.ReadCloser
|
||||
fileReader, e = o.storage.ReadFile(minioMetaVolume, path.Join(bucket, object, partSuffix), 0)
|
||||
if e != nil {
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
if e == errFileNotFound {
|
||||
return "", probe.NewError(InvalidPart{})
|
||||
}
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
_, e = io.Copy(fileWriter, fileReader)
|
||||
if e != nil {
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
e = fileReader.Close()
|
||||
if e != nil {
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
md5Sums = append(md5Sums, part.ETag)
|
||||
}
|
||||
e = fileWriter.Close()
|
||||
if e != nil {
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
fi, e := o.storage.StatFile(bucket, object)
|
||||
if e != nil {
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
|
||||
// Save the s3 md5.
|
||||
s3MD5, err := makeS3MD5(md5Sums...)
|
||||
if err != nil {
|
||||
return "", err.Trace(md5Sums...)
|
||||
}
|
||||
|
||||
// Cleanup all the parts.
|
||||
o.removeMultipartUpload(bucket, object, uploadID)
|
||||
return ObjectInfo{
|
||||
Bucket: bucket,
|
||||
Name: object,
|
||||
ModTime: fi.ModTime,
|
||||
Size: fi.Size,
|
||||
IsDir: false,
|
||||
}, nil
|
||||
|
||||
return s3MD5, nil
|
||||
}
|
||||
|
||||
func (o objectAPI) removeMultipartUpload(bucket, object, uploadID string) *probe.Error {
|
||||
@ -465,8 +522,8 @@ func (o objectAPI) removeMultipartUpload(bucket, object, uploadID string) *probe
|
||||
}
|
||||
marker := ""
|
||||
for {
|
||||
uploadIDFile := path.Join(bucket, object, uploadID)
|
||||
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, uploadIDFile, marker, false, 1000)
|
||||
uploadIDPath := path.Join(bucket, object, uploadID)
|
||||
fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, uploadIDPath, marker, false, 1000)
|
||||
if e != nil {
|
||||
return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object})
|
||||
}
|
||||
@ -489,14 +546,14 @@ func (o objectAPI) AbortMultipartUpload(bucket, object, uploadID string) *probe.
|
||||
if !IsValidObjectName(object) {
|
||||
return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
|
||||
}
|
||||
if status, e := o.isUploadIDExist(bucket, object, uploadID); e != nil {
|
||||
if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil {
|
||||
return probe.NewError(e)
|
||||
} else if !status {
|
||||
return probe.NewError(InvalidUploadID{UploadID: uploadID})
|
||||
}
|
||||
e := o.removeMultipartUpload(bucket, object, uploadID)
|
||||
if e != nil {
|
||||
return e.Trace()
|
||||
err := o.removeMultipartUpload(bucket, object, uploadID)
|
||||
if err != nil {
|
||||
return err.Trace(bucket, object, uploadID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ package main
|
||||
import (
|
||||
"crypto/md5"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@ -67,7 +68,7 @@ func (o objectAPI) GetBucketInfo(bucket string) (BucketInfo, *probe.Error) {
|
||||
return BucketInfo{}, probe.NewError(e)
|
||||
}
|
||||
return BucketInfo{
|
||||
Name: vi.Name,
|
||||
Name: bucket,
|
||||
Created: vi.Created,
|
||||
}, nil
|
||||
}
|
||||
@ -102,6 +103,8 @@ func (o objectAPI) DeleteBucket(bucket string) *probe.Error {
|
||||
if e := o.storage.DeleteVol(bucket); e != nil {
|
||||
if e == errVolumeNotFound {
|
||||
return probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
} else if e == errVolumeNotEmpty {
|
||||
return probe.NewError(BucketNotEmpty{Bucket: bucket})
|
||||
}
|
||||
return probe.NewError(e)
|
||||
}
|
||||
@ -161,8 +164,8 @@ func (o objectAPI) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Erro
|
||||
}
|
||||
}
|
||||
return ObjectInfo{
|
||||
Bucket: fi.Volume,
|
||||
Name: fi.Name,
|
||||
Bucket: bucket,
|
||||
Name: object,
|
||||
ModTime: fi.ModTime,
|
||||
Size: fi.Size,
|
||||
IsDir: fi.Mode.IsDir(),
|
||||
@ -171,13 +174,28 @@ func (o objectAPI) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Erro
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (o objectAPI) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (ObjectInfo, *probe.Error) {
|
||||
// safeCloseAndRemove - safely closes and removes underlying temporary
|
||||
// file writer if possible.
|
||||
func safeCloseAndRemove(writer io.WriteCloser) error {
|
||||
// If writer is a safe file, Attempt to close and remove.
|
||||
safeWriter, ok := writer.(*safe.File)
|
||||
if ok {
|
||||
return safeWriter.CloseAndRemove()
|
||||
}
|
||||
pipeWriter, ok := writer.(*io.PipeWriter)
|
||||
if ok {
|
||||
return pipeWriter.CloseWithError(errors.New("Close and error out."))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o objectAPI) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, *probe.Error) {
|
||||
// Verify if bucket is valid.
|
||||
if !IsValidBucketName(bucket) {
|
||||
return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
return "", probe.NewError(BucketNameInvalid{Bucket: bucket})
|
||||
}
|
||||
if !IsValidObjectName(object) {
|
||||
return ObjectInfo{}, probe.NewError(ObjectNameInvalid{
|
||||
return "", probe.NewError(ObjectNameInvalid{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
})
|
||||
@ -185,16 +203,16 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R
|
||||
fileWriter, e := o.storage.CreateFile(bucket, object)
|
||||
if e != nil {
|
||||
if e == errVolumeNotFound {
|
||||
return ObjectInfo{}, probe.NewError(BucketNotFound{
|
||||
return "", probe.NewError(BucketNotFound{
|
||||
Bucket: bucket,
|
||||
})
|
||||
} else if e == errIsNotRegular {
|
||||
return ObjectInfo{}, probe.NewError(ObjectExistsAsPrefix{
|
||||
return "", probe.NewError(ObjectExistsAsPrefix{
|
||||
Bucket: bucket,
|
||||
Prefix: object,
|
||||
})
|
||||
}
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
|
||||
// Initialize md5 writer.
|
||||
@ -206,13 +224,13 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R
|
||||
// Instantiate checksum hashers and create a multiwriter.
|
||||
if size > 0 {
|
||||
if _, e = io.CopyN(multiWriter, data, size); e != nil {
|
||||
fileWriter.(*safe.File).CloseAndRemove()
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
safeCloseAndRemove(fileWriter)
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
} else {
|
||||
if _, e = io.Copy(multiWriter, data); e != nil {
|
||||
fileWriter.(*safe.File).CloseAndRemove()
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
safeCloseAndRemove(fileWriter)
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
}
|
||||
|
||||
@ -224,35 +242,17 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R
|
||||
}
|
||||
if md5Hex != "" {
|
||||
if newMD5Hex != md5Hex {
|
||||
fileWriter.(*safe.File).CloseAndRemove()
|
||||
return ObjectInfo{}, probe.NewError(BadDigest{md5Hex, newMD5Hex})
|
||||
safeCloseAndRemove(fileWriter)
|
||||
return "", probe.NewError(BadDigest{md5Hex, newMD5Hex})
|
||||
}
|
||||
}
|
||||
e = fileWriter.Close()
|
||||
if e != nil {
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
}
|
||||
fi, e := o.storage.StatFile(bucket, object)
|
||||
if e != nil {
|
||||
return ObjectInfo{}, probe.NewError(e)
|
||||
return "", probe.NewError(e)
|
||||
}
|
||||
|
||||
contentType := "application/octet-stream"
|
||||
if objectExt := filepath.Ext(object); objectExt != "" {
|
||||
content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))]
|
||||
if ok {
|
||||
contentType = content.ContentType
|
||||
}
|
||||
}
|
||||
|
||||
return ObjectInfo{
|
||||
Bucket: fi.Volume,
|
||||
Name: fi.Name,
|
||||
ModTime: fi.ModTime,
|
||||
Size: fi.Size,
|
||||
ContentType: contentType,
|
||||
MD5Sum: newMD5Hex,
|
||||
}, nil
|
||||
// Return md5sum.
|
||||
return newMD5Hex, nil
|
||||
}
|
||||
|
||||
func (o objectAPI) DeleteObject(bucket, object string) *probe.Error {
|
||||
@ -267,6 +267,12 @@ func (o objectAPI) DeleteObject(bucket, object string) *probe.Error {
|
||||
if e == errVolumeNotFound {
|
||||
return probe.NewError(BucketNotFound{Bucket: bucket})
|
||||
}
|
||||
if e == errFileNotFound {
|
||||
return probe.NewError(ObjectNotFound{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
})
|
||||
}
|
||||
return probe.NewError(e)
|
||||
}
|
||||
return nil
|
||||
@ -311,10 +317,13 @@ func (o objectAPI) ListObjects(bucket, prefix, marker, delimiter string, maxKeys
|
||||
}
|
||||
result := ListObjectsInfo{IsTruncated: !eof}
|
||||
for _, fileInfo := range fileInfos {
|
||||
result.NextMarker = fileInfo.Name
|
||||
if fileInfo.Mode.IsDir() {
|
||||
result.Prefixes = append(result.Prefixes, fileInfo.Name)
|
||||
continue
|
||||
// With delimiter set we fill in NextMarker and Prefixes.
|
||||
if delimiter == slashPathSeparator {
|
||||
result.NextMarker = fileInfo.Name
|
||||
if fileInfo.Mode.IsDir() {
|
||||
result.Prefixes = append(result.Prefixes, fileInfo.Name)
|
||||
continue
|
||||
}
|
||||
}
|
||||
result.Objects = append(result.Objects, ObjectInfo{
|
||||
Name: fileInfo.Name,
|
||||
|
@ -457,9 +457,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
metadata["md5Sum"] = hex.EncodeToString(md5Bytes)
|
||||
|
||||
// Create the object.
|
||||
objInfo, err = api.ObjectAPI.PutObject(bucket, object, size, readCloser, metadata)
|
||||
md5Sum, err := api.ObjectAPI.PutObject(bucket, object, size, readCloser, metadata)
|
||||
if err != nil {
|
||||
errorIf(err.Trace(), "PutObject failed.", nil)
|
||||
switch err.ToGoError().(type) {
|
||||
case RootPathFull:
|
||||
writeErrorResponse(w, r, ErrRootPathFull, r.URL.Path)
|
||||
@ -474,11 +473,20 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re
|
||||
case ObjectExistsAsPrefix:
|
||||
writeErrorResponse(w, r, ErrObjectExistsAsPrefix, r.URL.Path)
|
||||
default:
|
||||
errorIf(err.Trace(), "PutObject failed.", nil)
|
||||
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
|
||||
}
|
||||
return
|
||||
}
|
||||
response := generateCopyObjectResponse(objInfo.MD5Sum, objInfo.ModTime)
|
||||
|
||||
objInfo, err = api.ObjectAPI.GetObjectInfo(bucket, object)
|
||||
if err != nil {
|
||||
errorIf(err.Trace(), "GetObjectInfo failed.", nil)
|
||||
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
|
||||
return
|
||||
}
|
||||
|
||||
response := generateCopyObjectResponse(md5Sum, objInfo.ModTime)
|
||||
encodedSuccessResponse := encodeResponse(response)
|
||||
// write headers
|
||||
setCommonHeaders(w)
|
||||
@ -613,7 +621,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
return
|
||||
}
|
||||
|
||||
var objInfo ObjectInfo
|
||||
var md5Sum string
|
||||
switch getRequestAuthType(r) {
|
||||
default:
|
||||
// For all unknown auth types return error.
|
||||
@ -626,7 +634,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
return
|
||||
}
|
||||
// Create anonymous object.
|
||||
objInfo, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, nil)
|
||||
md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, nil)
|
||||
case authTypePresigned, authTypeSigned:
|
||||
// Initialize a pipe for data pipe line.
|
||||
reader, writer := io.Pipe()
|
||||
@ -665,7 +673,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
// Make sure we hex encode here.
|
||||
metadata["md5"] = hex.EncodeToString(md5Bytes)
|
||||
// Create object.
|
||||
objInfo, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata)
|
||||
md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata)
|
||||
}
|
||||
if err != nil {
|
||||
errorIf(err.Trace(), "PutObject failed.", nil)
|
||||
@ -693,8 +701,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
return
|
||||
}
|
||||
if objInfo.MD5Sum != "" {
|
||||
w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"")
|
||||
if md5Sum != "" {
|
||||
w.Header().Set("ETag", "\""+md5Sum+"\"")
|
||||
}
|
||||
writeSuccessResponse(w, nil)
|
||||
}
|
||||
@ -965,6 +973,8 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht
|
||||
writeErrorResponse(w, r, ErrNoSuchKey, r.URL.Path)
|
||||
case InvalidUploadID:
|
||||
writeErrorResponse(w, r, ErrNoSuchUpload, r.URL.Path)
|
||||
case InvalidPart:
|
||||
writeErrorResponse(w, r, ErrInvalidPart, r.URL.Path)
|
||||
default:
|
||||
writeErrorResponse(w, r, ErrInternalError, r.URL.Path)
|
||||
}
|
||||
@ -987,7 +997,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
// Get upload id.
|
||||
uploadID, _, _, _ := getObjectResources(r.URL.Query())
|
||||
|
||||
var objInfo ObjectInfo
|
||||
var md5Sum string
|
||||
var err *probe.Error
|
||||
switch getRequestAuthType(r) {
|
||||
default:
|
||||
@ -1030,7 +1040,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
completeParts = append(completeParts, part)
|
||||
}
|
||||
// Complete multipart upload.
|
||||
objInfo, err = api.ObjectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts)
|
||||
md5Sum, err = api.ObjectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts)
|
||||
if err != nil {
|
||||
errorIf(err.Trace(), "CompleteMultipartUpload failed.", nil)
|
||||
switch err.ToGoError().(type) {
|
||||
@ -1058,7 +1068,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
// Get object location.
|
||||
location := getLocation(r)
|
||||
// Generate complete multipart response.
|
||||
response := generateCompleteMultpartUploadResponse(bucket, object, location, objInfo.MD5Sum)
|
||||
response := generateCompleteMultpartUploadResponse(bucket, object, location, md5Sum)
|
||||
encodedSuccessResponse := encodeResponse(response)
|
||||
// Write headers.
|
||||
setCommonHeaders(w)
|
||||
|
@ -81,8 +81,8 @@ func IsValidObjectName(object string) bool {
|
||||
// IsValidObjectPrefix verifies whether the prefix is a valid object name.
|
||||
// Its valid to have a empty prefix.
|
||||
func IsValidObjectPrefix(object string) bool {
|
||||
// Prefix can be empty.
|
||||
if object == "" {
|
||||
// Prefix can be empty or "/".
|
||||
if object == "" || object == "/" {
|
||||
return true
|
||||
}
|
||||
// Verify if prefix is a valid object name.
|
||||
|
@ -77,9 +77,9 @@ func testMultipartObjectCreation(c *check.C, create func() *objectAPI) {
|
||||
c.Assert(calculatedMD5sum, check.Equals, expectedMD5Sumhex)
|
||||
completedParts.Parts = append(completedParts.Parts, completePart{PartNumber: i, ETag: calculatedMD5sum})
|
||||
}
|
||||
objInfo, err := obj.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts)
|
||||
md5Sum, err := obj.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts)
|
||||
c.Assert(err, check.IsNil)
|
||||
c.Assert(objInfo.MD5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10")
|
||||
c.Assert(md5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10")
|
||||
}
|
||||
|
||||
func testMultipartObjectAbort(c *check.C, create func() *objectAPI) {
|
||||
@ -133,9 +133,9 @@ func testMultipleObjectCreation(c *check.C, create func() *objectAPI) {
|
||||
objects[key] = []byte(randomString)
|
||||
metadata := make(map[string]string)
|
||||
metadata["md5Sum"] = expectedMD5Sumhex
|
||||
objInfo, err := obj.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata)
|
||||
md5Sum, err := obj.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata)
|
||||
c.Assert(err, check.IsNil)
|
||||
c.Assert(objInfo.MD5Sum, check.Equals, expectedMD5Sumhex)
|
||||
c.Assert(md5Sum, check.Equals, expectedMD5Sumhex)
|
||||
}
|
||||
|
||||
for key, value := range objects {
|
||||
|
@ -33,6 +33,9 @@ var errIsNotRegular = errors.New("Not a regular file type.")
|
||||
// errVolumeNotFound - cannot find the volume.
|
||||
var errVolumeNotFound = errors.New("Volume not found.")
|
||||
|
||||
// errVolumeNotEmpty - volume not empty.
|
||||
var errVolumeNotEmpty = errors.New("Volume is not empty.")
|
||||
|
||||
// errVolumeAccessDenied - cannot access volume, insufficient
|
||||
// permissions.
|
||||
var errVolumeAccessDenied = errors.New("Volume access denied.")
|
||||
|
@ -4,11 +4,9 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/rpc"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
router "github.com/gorilla/mux"
|
||||
"github.com/minio/minio/pkg/safe"
|
||||
)
|
||||
|
||||
// Storage server implements rpc primitives to facilitate exporting a
|
||||
@ -58,8 +56,12 @@ func (s *storageServer) ListFilesHandler(arg *ListFilesArgs, reply *ListFilesRep
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Fill reply structure.
|
||||
reply.Files = files
|
||||
reply.EOF = eof
|
||||
|
||||
// Return success.
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -95,12 +97,18 @@ func registerStorageRPCRouter(mux *router.Router, storageAPI StorageAPI) {
|
||||
path := vars["path"]
|
||||
writeCloser, err := stServer.storage.CreateFile(volume, path)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
httpErr := http.StatusInternalServerError
|
||||
if err == errVolumeNotFound {
|
||||
httpErr = http.StatusNotFound
|
||||
} else if err == errIsNotRegular {
|
||||
httpErr = http.StatusConflict
|
||||
}
|
||||
http.Error(w, err.Error(), httpErr)
|
||||
return
|
||||
}
|
||||
reader := r.Body
|
||||
if _, err = io.Copy(writeCloser, reader); err != nil {
|
||||
writeCloser.(*safe.File).CloseAndRemove()
|
||||
safeCloseAndRemove(writeCloser)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
@ -120,14 +128,22 @@ func registerStorageRPCRouter(mux *router.Router, storageAPI StorageAPI) {
|
||||
readCloser, err := stServer.storage.ReadFile(volume, path, offset)
|
||||
if err != nil {
|
||||
httpErr := http.StatusBadRequest
|
||||
if os.IsNotExist(err) {
|
||||
if err == errVolumeNotFound {
|
||||
httpErr = http.StatusNotFound
|
||||
} else if err == errFileNotFound {
|
||||
httpErr = http.StatusNotFound
|
||||
}
|
||||
http.Error(w, err.Error(), httpErr)
|
||||
return
|
||||
}
|
||||
|
||||
// Copy reader to writer.
|
||||
io.Copy(w, readCloser)
|
||||
|
||||
// Flush out any remaining buffers to client.
|
||||
w.(http.Flusher).Flush()
|
||||
|
||||
// Close the reader.
|
||||
readCloser.Close()
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user