diff --git a/bucket-handlers.go b/bucket-handlers.go index 1ce48bd83..22401f1ed 100644 --- a/bucket-handlers.go +++ b/bucket-handlers.go @@ -558,7 +558,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h writeErrorResponse(w, r, apiErr, r.URL.Path) return } - objInfo, err := api.ObjectAPI.PutObject(bucket, object, -1, fileBody, nil) + md5Sum, err := api.ObjectAPI.PutObject(bucket, object, -1, fileBody, nil) if err != nil { errorIf(err.Trace(), "PutObject failed.", nil) switch err.ToGoError().(type) { @@ -577,8 +577,8 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h } return } - if objInfo.MD5Sum != "" { - w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"") + if md5Sum != "" { + w.Header().Set("ETag", "\""+md5Sum+"\"") } writeSuccessResponse(w, nil) } diff --git a/fs-dir-common.go b/fs-dir-common.go index 569db3705..6db10d0e8 100644 --- a/fs-dir-common.go +++ b/fs-dir-common.go @@ -127,12 +127,25 @@ func treeWalk(bucketDir, prefixDir, entryPrefixMatch, marker string, recursive b } } - // readDirAll returns entries that begins with entryPrefixMatch - dirents, err := readDirAll(filepath.Join(bucketDir, prefixDir), entryPrefixMatch) + // Entry prefix match function. + prefixMatchFn := func(dirent fsDirent) bool { + if dirent.IsDir() || dirent.IsRegular() { + // Does dirent name has reserved prefixes or suffixes. + hasReserved := hasReservedPrefix(dirent.name) || hasReservedSuffix(dirent.name) + // All dirents which match prefix and do not have reserved + // keywords in them are valid entries. + return strings.HasPrefix(dirent.name, entryPrefixMatch) && !hasReserved + } + return false + } + + // scandir returns entries that begins with entryPrefixMatch + dirents, err := scandir(filepath.Join(bucketDir, prefixDir), prefixMatchFn, true) if err != nil { send(treeWalkResult{err: err}) return false } + // example: // If markerDir="four/" searchDirents() returns the index of "four/" in the sorted // dirents list. We skip all the dirent entries till "four/" diff --git a/fs-dir-nix.go b/fs-dir-nix.go index 68b4e2bd7..e50970519 100644 --- a/fs-dir-nix.go +++ b/fs-dir-nix.go @@ -23,7 +23,6 @@ import ( "path/filepath" "runtime" "sort" - "strings" "syscall" "unsafe" ) @@ -48,7 +47,7 @@ func clen(n []byte) int { // parseDirents - inspired from // https://golang.org/src/syscall/syscall_.go -func parseDirents(buf []byte) []fsDirent { +func parseDirents(dirPath string, buf []byte) []fsDirent { bufidx := 0 dirents := []fsDirent{} for bufidx < len(buf) { @@ -87,7 +86,15 @@ func parseDirents(buf []byte) []fsDirent { case syscall.DT_SOCK: mode = os.ModeSocket case syscall.DT_UNKNOWN: - mode = 0xffffffff + // On Linux XFS does not implement d_type for on disk + // format << v5. Fall back to Stat(). + if fi, err := os.Stat(filepath.Join(dirPath, name)); err == nil { + mode = fi.Mode() + } else { + // Caller listing would fail, if Stat failed but we + // won't crash the server. + mode = 0xffffffff + } } dirents = append(dirents, fsDirent{ @@ -98,38 +105,6 @@ func parseDirents(buf []byte) []fsDirent { return dirents } -// Read all directory entries, returns a list of lexically sorted -// entries. -func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) { - buf := make([]byte, readDirentBufSize) - f, err := os.Open(readDirPath) - if err != nil { - return nil, err - } - defer f.Close() - dirents := []fsDirent{} - for { - nbuf, err := syscall.ReadDirent(int(f.Fd()), buf) - if err != nil { - return nil, err - } - if nbuf <= 0 { - break - } - for _, dirent := range parseDirents(buf[:nbuf]) { - if dirent.IsDir() { - dirent.name += string(os.PathSeparator) - dirent.size = 0 - } - if strings.HasPrefix(dirent.name, entryPrefixMatch) { - dirents = append(dirents, dirent) - } - } - } - sort.Sort(byDirentName(dirents)) - return dirents, nil -} - // scans the directory dirPath, calling filter() on each directory // entry. Entries for which filter() returns true are stored, lexically // sorted using sort.Sort(). If filter is NULL, all entries are selected. @@ -152,12 +127,13 @@ func scandir(dirPath string, filter func(fsDirent) bool, namesOnly bool) ([]fsDi if nbuf <= 0 { break } - for _, dirent := range parseDirents(buf[:nbuf]) { + for _, dirent := range parseDirents(dirPath, buf[:nbuf]) { if !namesOnly { dirent.name = filepath.Join(dirPath, dirent.name) } if dirent.IsDir() { dirent.name += string(os.PathSeparator) + dirent.size = 0 } if filter == nil || filter(dirent) { dirents = append(dirents, dirent) diff --git a/fs-dir-others.go b/fs-dir-others.go index 93d5b9429..210fa2361 100644 --- a/fs-dir-others.go +++ b/fs-dir-others.go @@ -23,46 +23,8 @@ import ( "os" "path/filepath" "sort" - "strings" ) -// Read all directory entries, returns a list of lexically sorted entries. -func readDirAll(readDirPath, entryPrefixMatch string) ([]fsDirent, error) { - f, err := os.Open(readDirPath) - if err != nil { - return nil, err - } - defer f.Close() - var dirents []fsDirent - for { - fis, err := f.Readdir(1000) - if err != nil { - if err == io.EOF { - break - } - return nil, err - } - for _, fi := range fis { - dirent := fsDirent{ - name: fi.Name(), - modTime: fi.ModTime(), - size: fi.Size(), - mode: fi.Mode(), - } - if dirent.IsDir() { - dirent.name += string(os.PathSeparator) - dirent.size = 0 - } - if strings.HasPrefix(fi.Name(), entryPrefixMatch) { - dirents = append(dirents, dirent) - } - } - } - // Sort dirents. - sort.Sort(byDirentName(dirents)) - return dirents, nil -} - // scans the directory dirPath, calling filter() on each directory // entry. Entries for which filter() returns true are stored, lexically // sorted using sort.Sort(). If filter is NULL, all entries are selected. diff --git a/fs-utils.go b/fs-utils.go index 14ac6f1c7..bf3da0a3b 100644 --- a/fs-utils.go +++ b/fs-utils.go @@ -18,6 +18,8 @@ package main import ( "regexp" + "runtime" + "strings" "unicode/utf8" ) @@ -27,7 +29,17 @@ var validVolname = regexp.MustCompile(`^.{3,63}$`) // isValidVolname verifies a volname name in accordance with object // layer requirements. func isValidVolname(volname string) bool { - return validVolname.MatchString(volname) + if !validVolname.MatchString(volname) { + return false + } + switch runtime.GOOS { + case "windows": + // Volname shouldn't have reserved characters on windows in it. + return !strings.ContainsAny(volname, "/\\:*?\"<>|") + default: + // Volname shouldn't have '/' in it. + return !strings.ContainsAny(volname, "/") + } } // Keeping this as lower bound value supporting Linux, Darwin and Windows operating systems. @@ -54,3 +66,35 @@ func isValidPrefix(prefix string) bool { // Verify if prefix is a valid path. return isValidPath(prefix) } + +// List of reserved words for files, includes old and new ones. +var reservedKeywords = []string{ + "$multiparts", + "$tmpobject", + "$tmpfile", + // Add new reserved words if any used in future. +} + +// hasReservedPrefix - returns true if name has a reserved keyword suffixed. +func hasReservedSuffix(name string) (isReserved bool) { + for _, reservedKey := range reservedKeywords { + if strings.HasSuffix(name, reservedKey) { + isReserved = true + break + } + isReserved = false + } + return isReserved +} + +// hasReservedPrefix - has reserved prefix. +func hasReservedPrefix(name string) (isReserved bool) { + for _, reservedKey := range reservedKeywords { + if strings.HasPrefix(name, reservedKey) { + isReserved = true + break + } + isReserved = false + } + return isReserved +} diff --git a/fs.go b/fs.go index 36ec72f4f..e1af593f4 100644 --- a/fs.go +++ b/fs.go @@ -18,7 +18,6 @@ package main import ( "io" - "io/ioutil" "os" "path/filepath" "strings" @@ -46,7 +45,6 @@ type fsStorage struct { diskPath string diskInfo disk.Info minFreeDisk int64 - rwLock *sync.RWMutex listObjectMap map[listParams][]*treeWalker listObjectMapMutex *sync.Mutex } @@ -98,7 +96,6 @@ func newFS(diskPath string) (StorageAPI, error) { minFreeDisk: 5, // Minimum 5% disk should be free. listObjectMap: make(map[listParams][]*treeWalker), listObjectMapMutex: &sync.Mutex{}, - rwLock: &sync.RWMutex{}, } return fs, nil } @@ -121,65 +118,6 @@ func checkDiskFree(diskPath string, minFreeDisk int64) (err error) { return nil } -// checkVolumeArg - will convert incoming volume names to -// corresponding valid volume names on the backend in a platform -// compatible way for all operating systems. If volume is not found -// an error is generated. -func (s fsStorage) checkVolumeArg(volume string) (string, error) { - if !isValidVolname(volume) { - return "", errInvalidArgument - } - volumeDir := filepath.Join(s.diskPath, volume) - _, err := os.Stat(volumeDir) - if err == nil { - return volumeDir, nil - } - if os.IsNotExist(err) { - var volumes []os.FileInfo - volumes, err = ioutil.ReadDir(s.diskPath) - if err != nil { - return volumeDir, errVolumeNotFound - } - for _, vol := range volumes { - if vol.IsDir() { - // Verify if lowercase version of the volume - // is equal to the incoming volume, then use the proper name. - if strings.ToLower(vol.Name()) == volume { - volumeDir = filepath.Join(s.diskPath, vol.Name()) - return volumeDir, nil - } - } - } - return volumeDir, errVolumeNotFound - } else if os.IsPermission(err) { - return volumeDir, errVolumeAccessDenied - } - return volumeDir, err -} - -// Make a volume entry. -func (s fsStorage) MakeVol(volume string) (err error) { - volumeDir, err := s.checkVolumeArg(volume) - if err == nil { - // Volume already exists, return error. - return errVolumeExists - } - - // Validate if disk is free. - if e := checkDiskFree(s.diskPath, s.minFreeDisk); e != nil { - return e - } - - // If volume not found create it. - if err == errVolumeNotFound { - // Make a volume entry. - return os.Mkdir(volumeDir, 0700) - } - - // For all other errors return here. - return err -} - // removeDuplicateVols - remove duplicate volumes. func removeDuplicateVols(vols []VolInfo) []VolInfo { length := len(vols) - 1 @@ -201,38 +139,115 @@ func removeDuplicateVols(vols []VolInfo) []VolInfo { return vols } -// ListVols - list volumes. -func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) { - files, err := ioutil.ReadDir(s.diskPath) +// gets all the unique directories from diskPath. +func getAllUniqueVols(dirPath string) ([]VolInfo, error) { + volumeFn := func(dirent fsDirent) bool { + // Return all directories. + return dirent.IsDir() + } + namesOnly := false // Returned dirent names are absolute. + dirents, err := scandir(dirPath, volumeFn, namesOnly) if err != nil { return nil, err } - for _, file := range files { - if !file.IsDir() { - // If not directory, ignore all file types. - continue + var volsInfo []VolInfo + for _, dirent := range dirents { + fi, err := os.Stat(dirent.name) + if err != nil { + return nil, err } + volsInfo = append(volsInfo, VolInfo{ + Name: fi.Name(), + // As os.Stat() doesn't carry other than ModTime(), use + // ModTime() as CreatedTime. + Created: fi.ModTime(), + }) + } + volsInfo = removeDuplicateVols(volsInfo) + return volsInfo, nil +} + +// getVolumeDir - will convert incoming volume names to +// corresponding valid volume names on the backend in a platform +// compatible way for all operating systems. If volume is not found +// an error is generated. +func (s fsStorage) getVolumeDir(volume string) (string, error) { + if !isValidVolname(volume) { + return "", errInvalidArgument + } + volumeDir := filepath.Join(s.diskPath, volume) + _, err := os.Stat(volumeDir) + if err == nil { + return volumeDir, nil + } + if os.IsNotExist(err) { + var volsInfo []VolInfo + volsInfo, err = getAllUniqueVols(s.diskPath) + if err != nil { + return volumeDir, errVolumeNotFound + } + for _, vol := range volsInfo { + // Verify if lowercase version of the volume + // is equal to the incoming volume, then use the proper name. + if strings.ToLower(vol.Name) == volume { + volumeDir = filepath.Join(s.diskPath, vol.Name) + return volumeDir, nil + } + } + return volumeDir, errVolumeNotFound + } else if os.IsPermission(err) { + return volumeDir, errVolumeAccessDenied + } + return volumeDir, err +} + +// Make a volume entry. +func (s fsStorage) MakeVol(volume string) (err error) { + volumeDir, err := s.getVolumeDir(volume) + if err == nil { + // Volume already exists, return error. + return errVolumeExists + } + + // Validate if disk is free. + if e := checkDiskFree(s.diskPath, s.minFreeDisk); e != nil { + return e + } + + // If volume not found create it. + if err == errVolumeNotFound { + // Make a volume entry. + return os.Mkdir(volumeDir, 0700) + } + + // For all other errors return here. + return err +} + +// ListVols - list volumes. +func (s fsStorage) ListVols() (volsInfo []VolInfo, err error) { + volsInfo, err = getAllUniqueVols(s.diskPath) + if err != nil { + return nil, err + } + for _, vol := range volsInfo { // Volname on case sensitive fs backends can come in as // capitalized, but object layer cannot consume it // directly. Convert it as we see fit. - volName := strings.ToLower(file.Name()) - // Modtime is used as created time. - createdTime := file.ModTime() + volName := strings.ToLower(vol.Name) volInfo := VolInfo{ Name: volName, - Created: createdTime, + Created: vol.Created, } volsInfo = append(volsInfo, volInfo) } - // Remove duplicated volume entries. - volsInfo = removeDuplicateVols(volsInfo) return volsInfo, nil } // StatVol - get volume info. func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) { // Verify if volume is valid and it exists. - volumeDir, err := s.checkVolumeArg(volume) + volumeDir, err := s.getVolumeDir(volume) if err != nil { return VolInfo{}, err } @@ -245,8 +260,8 @@ func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) { } return VolInfo{}, err } - // Modtime is used as created time since operating systems lack a - // portable way of knowing the actual created time of a directory. + // As os.Stat() doesn't carry other than ModTime(), use ModTime() + // as CreatedTime. createdTime := st.ModTime() return VolInfo{ Name: volume, @@ -257,13 +272,24 @@ func (s fsStorage) StatVol(volume string) (volInfo VolInfo, err error) { // DeleteVol - delete a volume. func (s fsStorage) DeleteVol(volume string) error { // Verify if volume is valid and it exists. - volumeDir, err := s.checkVolumeArg(volume) + volumeDir, err := s.getVolumeDir(volume) if err != nil { return err } err = os.Remove(volumeDir) - if err != nil && os.IsNotExist(err) { - return errVolumeNotFound + if err != nil { + if os.IsNotExist(err) { + return errVolumeNotFound + } else if strings.Contains(err.Error(), "directory is not empty") { + // On windows the string is slightly different, handle it + // here. + return errVolumeNotEmpty + } else if strings.Contains(err.Error(), "directory not empty") { + // Hopefully for all other operating systems, this is + // assumed to be consistent. + return errVolumeNotEmpty + } + return err } return err } @@ -302,18 +328,10 @@ func (s *fsStorage) lookupTreeWalk(params listParams) *treeWalker { return nil } -// List of special prefixes for files, includes old and new ones. -var specialPrefixes = []string{ - "$multipart", - "$tmpobject", - "$tmpfile", - // Add new special prefixes if any used. -} - // List operation. func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, count int) ([]FileInfo, bool, error) { // Verify if volume is valid and it exists. - volumeDir, err := s.checkVolumeArg(volume) + volumeDir, err := s.getVolumeDir(volume) if err != nil { return nil, true, err } @@ -373,16 +391,6 @@ func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, coun } fileInfo := walkResult.fileInfo fileInfo.Name = filepath.ToSlash(fileInfo.Name) - // TODO: Find a proper place to skip these files. - // Skip temporary files. - for _, specialPrefix := range specialPrefixes { - if strings.Contains(fileInfo.Name, specialPrefix) { - if walkResult.end { - return fileInfos, true, nil - } - continue - } - } fileInfos = append(fileInfos, fileInfo) // We have listed everything return. if walkResult.end { @@ -397,7 +405,7 @@ func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, coun // ReadFile - read a file at a given offset. func (s fsStorage) ReadFile(volume string, path string, offset int64) (readCloser io.ReadCloser, err error) { - volumeDir, err := s.checkVolumeArg(volume) + volumeDir, err := s.getVolumeDir(volume) if err != nil { return nil, err } @@ -430,7 +438,7 @@ func (s fsStorage) ReadFile(volume string, path string, offset int64) (readClose // CreateFile - create a file at path. func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { - volumeDir, err := s.checkVolumeArg(volume) + volumeDir, err := s.getVolumeDir(volume) if err != nil { return nil, err } @@ -449,7 +457,7 @@ func (s fsStorage) CreateFile(volume, path string) (writeCloser io.WriteCloser, // StatFile - get file info. func (s fsStorage) StatFile(volume, path string) (file FileInfo, err error) { - volumeDir, err := s.checkVolumeArg(volume) + volumeDir, err := s.getVolumeDir(volume) if err != nil { return FileInfo{}, err } @@ -520,7 +528,7 @@ func deleteFile(basePath, deletePath string) error { // DeleteFile - delete a file at path. func (s fsStorage) DeleteFile(volume, path string) error { - volumeDir, err := s.checkVolumeArg(volume) + volumeDir, err := s.getVolumeDir(volume) if err != nil { return err } diff --git a/network-fs.go b/network-fs.go index a160c4fd4..3872deaf0 100644 --- a/network-fs.go +++ b/network-fs.go @@ -30,6 +30,7 @@ import ( ) type networkFS struct { + netScheme string netAddr string netPath string rpcClient *rpc.Client @@ -37,8 +38,7 @@ type networkFS struct { } const ( - connected = "200 Connected to Go RPC" - dialTimeoutSecs = 30 // 30 seconds. + storageRPCPath = reservedBucket + "/rpc/storage" ) // splits network path into its components Address and Path. @@ -49,6 +49,29 @@ func splitNetPath(networkPath string) (netAddr, netPath string) { return netAddr, netPath } +// Converts rpc.ServerError to underlying error. This function is +// written so that the storageAPI errors are consistent across network +// disks as well. +func toStorageErr(err error) error { + switch err.Error() { + case errVolumeNotFound.Error(): + return errVolumeNotFound + case errVolumeExists.Error(): + return errVolumeExists + case errFileNotFound.Error(): + return errFileNotFound + case errIsNotRegular.Error(): + return errIsNotRegular + case errVolumeNotEmpty.Error(): + return errVolumeNotEmpty + case errFileAccessDenied.Error(): + return errFileAccessDenied + case errVolumeAccessDenied.Error(): + return errVolumeAccessDenied + } + return err +} + // Initialize new network file system. func newNetworkFS(networkPath string) (StorageAPI, error) { // Input validation. @@ -60,7 +83,7 @@ func newNetworkFS(networkPath string) (StorageAPI, error) { netAddr, netPath := splitNetPath(networkPath) // Dial minio rpc storage http path. - rpcClient, err := rpc.DialHTTPPath("tcp", netAddr, "/minio/rpc/storage") + rpcClient, err := rpc.DialHTTPPath("tcp", netAddr, storageRPCPath) if err != nil { return nil, err } @@ -76,6 +99,7 @@ func newNetworkFS(networkPath string) (StorageAPI, error) { // Initialize network storage. ndisk := &networkFS{ + netScheme: "http", // TODO: fix for ssl rpc support. netAddr: netAddr, netPath: netPath, rpcClient: rpcClient, @@ -90,10 +114,7 @@ func newNetworkFS(networkPath string) (StorageAPI, error) { func (n networkFS) MakeVol(volume string) error { reply := GenericReply{} if err := n.rpcClient.Call("Storage.MakeVolHandler", volume, &reply); err != nil { - if err.Error() == errVolumeExists.Error() { - return errVolumeExists - } - return err + return toStorageErr(err) } return nil } @@ -111,10 +132,7 @@ func (n networkFS) ListVols() (vols []VolInfo, err error) { // StatVol - get current Stat volume info. func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) { if err = n.rpcClient.Call("Storage.StatVolHandler", volume, &volInfo); err != nil { - if err.Error() == errVolumeNotFound.Error() { - return VolInfo{}, errVolumeNotFound - } - return VolInfo{}, err + return VolInfo{}, toStorageErr(err) } return volInfo, nil } @@ -123,10 +141,7 @@ func (n networkFS) StatVol(volume string) (volInfo VolInfo, err error) { func (n networkFS) DeleteVol(volume string) error { reply := GenericReply{} if err := n.rpcClient.Call("Storage.DeleteVolHandler", volume, &reply); err != nil { - if err.Error() == errVolumeNotFound.Error() { - return errVolumeNotFound - } - return err + return toStorageErr(err) } return nil } @@ -136,9 +151,9 @@ func (n networkFS) DeleteVol(volume string) error { // CreateFile - create file. func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) { writeURL := new(url.URL) - writeURL.Scheme = "http" // TODO fix this. + writeURL.Scheme = n.netScheme writeURL.Host = n.netAddr - writeURL.Path = fmt.Sprintf("/minio/rpc/storage/upload/%s", urlpath.Join(volume, path)) + writeURL.Path = fmt.Sprintf("%s/upload/%s", storageRPCPath, urlpath.Join(volume, path)) contentType := "application/octet-stream" readCloser, writeCloser := io.Pipe() @@ -149,11 +164,16 @@ func (n networkFS) CreateFile(volume, path string) (writeCloser io.WriteCloser, return } if resp != nil { - if resp.StatusCode != http.StatusNotFound { - readCloser.CloseWithError(errFileNotFound) + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { + readCloser.CloseWithError(errFileNotFound) + return + } + readCloser.CloseWithError(errors.New("Invalid response.")) return } - readCloser.CloseWithError(errors.New("Invalid response.")) + // Close the reader. + readCloser.Close() } }() return writeCloser, nil @@ -165,14 +185,7 @@ func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error) Vol: volume, Path: path, }, &fileInfo); err != nil { - if err.Error() == errVolumeNotFound.Error() { - return FileInfo{}, errVolumeNotFound - } else if err.Error() == errFileNotFound.Error() { - return FileInfo{}, errFileNotFound - } else if err.Error() == errIsNotRegular.Error() { - return FileInfo{}, errFileNotFound - } - return FileInfo{}, err + return FileInfo{}, toStorageErr(err) } return fileInfo, nil } @@ -180,9 +193,9 @@ func (n networkFS) StatFile(volume, path string) (fileInfo FileInfo, err error) // ReadFile - reads a file. func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io.ReadCloser, err error) { readURL := new(url.URL) - readURL.Scheme = "http" // TODO fix this. + readURL.Scheme = n.netScheme readURL.Host = n.netAddr - readURL.Path = fmt.Sprintf("/minio/rpc/storage/download/%s", urlpath.Join(volume, path)) + readURL.Path = fmt.Sprintf("%s/download/%s", storageRPCPath, urlpath.Join(volume, path)) readQuery := make(url.Values) readQuery.Set("offset", strconv.FormatInt(offset, 10)) readURL.RawQuery = readQuery.Encode() @@ -190,11 +203,13 @@ func (n networkFS) ReadFile(volume string, path string, offset int64) (reader io if err != nil { return nil, err } - if resp.StatusCode != http.StatusOK { - if resp.StatusCode == http.StatusNotFound { - return nil, errFileNotFound + if resp != nil { + if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound { + return nil, errFileNotFound + } + return nil, errors.New("Invalid response") } - return nil, errors.New("Invalid response") } return resp.Body, nil } @@ -209,16 +224,10 @@ func (n networkFS) ListFiles(volume, prefix, marker string, recursive bool, coun Recursive: recursive, Count: count, }, &listFilesReply); err != nil { - if err.Error() == errVolumeNotFound.Error() { - return nil, true, errVolumeNotFound - } - return nil, true, err + return nil, true, toStorageErr(err) } - // List of files. - files = listFilesReply.Files - // EOF. - eof = listFilesReply.EOF - return files, eof, nil + // Return successfully unmarshalled results. + return listFilesReply.Files, listFilesReply.EOF, nil } // DeleteFile - Delete a file at path. @@ -228,12 +237,7 @@ func (n networkFS) DeleteFile(volume, path string) (err error) { Vol: volume, Path: path, }, &reply); err != nil { - if err.Error() == errVolumeNotFound.Error() { - return errVolumeNotFound - } else if err.Error() == errFileNotFound.Error() { - return errFileNotFound - } - return err + return toStorageErr(err) } return nil } diff --git a/object-api-multipart.go b/object-api-multipart.go index bbb442b17..6c6980b0f 100644 --- a/object-api-multipart.go +++ b/object-api-multipart.go @@ -26,7 +26,6 @@ import ( "strings" "github.com/minio/minio/pkg/probe" - "github.com/minio/minio/pkg/safe" "github.com/skyrings/skyring-common/tools/uuid" ) @@ -72,6 +71,14 @@ func (o objectAPI) ListMultipartUploads(bucket, prefix, keyMarker, uploadIDMarke if !IsValidObjectPrefix(prefix) { return ListMultipartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: prefix}) } + if _, e := o.storage.StatVol(minioMetaVolume); e != nil { + if e == errVolumeNotFound { + e = o.storage.MakeVol(minioMetaVolume) + if e != nil { + return ListMultipartsInfo{}, probe.NewError(e) + } + } + } // Verify if delimiter is anything other than '/', which we do not support. if delimiter != "" && delimiter != slashPathSeparator { return ListMultipartsInfo{}, probe.NewError(UnsupportedDelimiter{ @@ -253,14 +260,14 @@ func (o objectAPI) NewMultipartUpload(bucket, object string) (string, *probe.Err return "", probe.NewError(e) } uploadID := uuid.String() - uploadIDFile := path.Join(bucket, object, uploadID) - if _, e = o.storage.StatFile(minioMetaVolume, uploadIDFile); e != nil { + uploadIDPath := path.Join(bucket, object, uploadID) + if _, e = o.storage.StatFile(minioMetaVolume, uploadIDPath); e != nil { if e != errFileNotFound { return "", probe.NewError(e) } - // uploadIDFile doesn't exist, so create empty file to reserve the name + // uploadIDPath doesn't exist, so create empty file to reserve the name var w io.WriteCloser - if w, e = o.storage.CreateFile(minioMetaVolume, uploadIDFile); e == nil { + if w, e = o.storage.CreateFile(minioMetaVolume, uploadIDPath); e == nil { if e = w.Close(); e != nil { return "", probe.NewError(e) } @@ -269,19 +276,23 @@ func (o objectAPI) NewMultipartUpload(bucket, object string) (string, *probe.Err } return uploadID, nil } - // uploadIDFile already exists. + // uploadIDPath already exists. // loop again to try with different uuid generated. } } -func (o objectAPI) isUploadIDExist(bucket, object, uploadID string) (bool, error) { - st, e := o.storage.StatFile(minioMetaVolume, path.Join(bucket, object, uploadID)) +// isUploadIDExists - verify if a given uploadID exists and is valid. +func (o objectAPI) isUploadIDExists(bucket, object, uploadID string) (bool, error) { + uploadIDPath := path.Join(bucket, object, uploadID) + st, e := o.storage.StatFile(minioMetaVolume, uploadIDPath) if e != nil { + // Upload id does not exist. if e == errFileNotFound { return false, nil } return false, e } + // Upload id exists and is a regular file. return st.Mode.IsRegular(), nil } @@ -293,7 +304,7 @@ func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, si if !IsValidObjectName(object) { return "", probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - if status, e := o.isUploadIDExist(bucket, object, uploadID); e != nil { + if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil { return "", probe.NewError(e) } else if !status { return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) @@ -324,12 +335,12 @@ func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, si // Instantiate checksum hashers and create a multiwriter. if size > 0 { if _, e = io.CopyN(multiWriter, data, size); e != nil { - fileWriter.(*safe.File).CloseAndRemove() + safeCloseAndRemove(fileWriter) return "", probe.NewError(e) } } else { if _, e = io.Copy(multiWriter, data); e != nil { - fileWriter.(*safe.File).CloseAndRemove() + safeCloseAndRemove(fileWriter) return "", probe.NewError(e) } } @@ -337,7 +348,7 @@ func (o objectAPI) PutObjectPart(bucket, object, uploadID string, partID int, si newMD5Hex := hex.EncodeToString(md5Writer.Sum(nil)) if md5Hex != "" { if newMD5Hex != md5Hex { - fileWriter.(*safe.File).CloseAndRemove() + safeCloseAndRemove(fileWriter) return "", probe.NewError(BadDigest{md5Hex, newMD5Hex}) } } @@ -356,7 +367,16 @@ func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMa if !IsValidObjectName(object) { return ListPartsInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - if status, e := o.isUploadIDExist(bucket, object, uploadID); e != nil { + // Create minio meta volume, if it doesn't exist yet. + if _, e := o.storage.StatVol(minioMetaVolume); e != nil { + if e == errVolumeNotFound { + e = o.storage.MakeVol(minioMetaVolume) + if e != nil { + return ListPartsInfo{}, probe.NewError(e) + } + } + } + if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil { return ListPartsInfo{}, probe.NewError(e) } else if !status { return ListPartsInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) @@ -364,8 +384,11 @@ func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMa result := ListPartsInfo{} marker := "" nextPartNumberMarker := 0 + uploadIDPath := path.Join(bucket, object, uploadID) + // Figure out the marker for the next subsequent calls, if the + // partNumberMarker is already set. if partNumberMarker > 0 { - fileInfos, _, e := o.storage.ListFiles(minioMetaVolume, path.Join(bucket, object, uploadID)+"."+strconv.Itoa(partNumberMarker)+".", "", false, 1) + fileInfos, _, e := o.storage.ListFiles(minioMetaVolume, uploadIDPath+"."+strconv.Itoa(partNumberMarker)+".", "", false, 1) if e != nil { return result, probe.NewError(e) } @@ -374,7 +397,7 @@ func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMa } marker = fileInfos[0].Name } - fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, path.Join(bucket, object, uploadID)+".", marker, false, maxParts) + fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, uploadIDPath+".", marker, false, maxParts) if e != nil { return result, probe.NewError(InvalidPart{}) } @@ -404,55 +427,89 @@ func (o objectAPI) ListObjectParts(bucket, object, uploadID string, partNumberMa return result, nil } -func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (ObjectInfo, *probe.Error) { +// Create an s3 compatible MD5sum for complete multipart transaction. +func makeS3MD5(md5Strs ...string) (string, *probe.Error) { + var finalMD5Bytes []byte + for _, md5Str := range md5Strs { + md5Bytes, e := hex.DecodeString(md5Str) + if e != nil { + return "", probe.NewError(e) + } + finalMD5Bytes = append(finalMD5Bytes, md5Bytes...) + } + md5Hasher := md5.New() + md5Hasher.Write(finalMD5Bytes) + s3MD5 := fmt.Sprintf("%s-%d", hex.EncodeToString(md5Hasher.Sum(nil)), len(md5Strs)) + return s3MD5, nil +} + +func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadID string, parts []completePart) (string, *probe.Error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) + return "", probe.NewError(BucketNameInvalid{Bucket: bucket}) } if !IsValidObjectName(object) { - return ObjectInfo{}, probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) + return "", probe.NewError(ObjectNameInvalid{ + Bucket: bucket, + Object: object, + }) } - if status, e := o.isUploadIDExist(bucket, object, uploadID); e != nil { - return ObjectInfo{}, probe.NewError(e) + if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil { + return "", probe.NewError(e) } else if !status { - return ObjectInfo{}, probe.NewError(InvalidUploadID{UploadID: uploadID}) + return "", probe.NewError(InvalidUploadID{UploadID: uploadID}) } fileWriter, e := o.storage.CreateFile(bucket, object) if e != nil { - return ObjectInfo{}, nil + if e == errVolumeNotFound { + return "", probe.NewError(BucketNotFound{ + Bucket: bucket, + }) + } else if e == errIsNotRegular { + return "", probe.NewError(ObjectExistsAsPrefix{ + Bucket: bucket, + Prefix: object, + }) + } + return "", probe.NewError(e) } + + var md5Sums []string for _, part := range parts { partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, part.PartNumber, part.ETag) var fileReader io.ReadCloser fileReader, e = o.storage.ReadFile(minioMetaVolume, path.Join(bucket, object, partSuffix), 0) if e != nil { - return ObjectInfo{}, probe.NewError(e) + if e == errFileNotFound { + return "", probe.NewError(InvalidPart{}) + } + return "", probe.NewError(e) } _, e = io.Copy(fileWriter, fileReader) if e != nil { - return ObjectInfo{}, probe.NewError(e) + return "", probe.NewError(e) } e = fileReader.Close() if e != nil { - return ObjectInfo{}, probe.NewError(e) + return "", probe.NewError(e) } + md5Sums = append(md5Sums, part.ETag) } e = fileWriter.Close() if e != nil { - return ObjectInfo{}, probe.NewError(e) + return "", probe.NewError(e) } - fi, e := o.storage.StatFile(bucket, object) - if e != nil { - return ObjectInfo{}, probe.NewError(e) + + // Save the s3 md5. + s3MD5, err := makeS3MD5(md5Sums...) + if err != nil { + return "", err.Trace(md5Sums...) } + + // Cleanup all the parts. o.removeMultipartUpload(bucket, object, uploadID) - return ObjectInfo{ - Bucket: bucket, - Name: object, - ModTime: fi.ModTime, - Size: fi.Size, - IsDir: false, - }, nil + + return s3MD5, nil } func (o objectAPI) removeMultipartUpload(bucket, object, uploadID string) *probe.Error { @@ -465,8 +522,8 @@ func (o objectAPI) removeMultipartUpload(bucket, object, uploadID string) *probe } marker := "" for { - uploadIDFile := path.Join(bucket, object, uploadID) - fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, uploadIDFile, marker, false, 1000) + uploadIDPath := path.Join(bucket, object, uploadID) + fileInfos, eof, e := o.storage.ListFiles(minioMetaVolume, uploadIDPath, marker, false, 1000) if e != nil { return probe.NewError(ObjectNotFound{Bucket: bucket, Object: object}) } @@ -489,14 +546,14 @@ func (o objectAPI) AbortMultipartUpload(bucket, object, uploadID string) *probe. if !IsValidObjectName(object) { return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object}) } - if status, e := o.isUploadIDExist(bucket, object, uploadID); e != nil { + if status, e := o.isUploadIDExists(bucket, object, uploadID); e != nil { return probe.NewError(e) } else if !status { return probe.NewError(InvalidUploadID{UploadID: uploadID}) } - e := o.removeMultipartUpload(bucket, object, uploadID) - if e != nil { - return e.Trace() + err := o.removeMultipartUpload(bucket, object, uploadID) + if err != nil { + return err.Trace(bucket, object, uploadID) } return nil } diff --git a/object-api.go b/object-api.go index e0e54ae8f..bd2ad1ff2 100644 --- a/object-api.go +++ b/object-api.go @@ -19,6 +19,7 @@ package main import ( "crypto/md5" "encoding/hex" + "errors" "io" "path/filepath" "strings" @@ -67,7 +68,7 @@ func (o objectAPI) GetBucketInfo(bucket string) (BucketInfo, *probe.Error) { return BucketInfo{}, probe.NewError(e) } return BucketInfo{ - Name: vi.Name, + Name: bucket, Created: vi.Created, }, nil } @@ -102,6 +103,8 @@ func (o objectAPI) DeleteBucket(bucket string) *probe.Error { if e := o.storage.DeleteVol(bucket); e != nil { if e == errVolumeNotFound { return probe.NewError(BucketNotFound{Bucket: bucket}) + } else if e == errVolumeNotEmpty { + return probe.NewError(BucketNotEmpty{Bucket: bucket}) } return probe.NewError(e) } @@ -161,8 +164,8 @@ func (o objectAPI) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Erro } } return ObjectInfo{ - Bucket: fi.Volume, - Name: fi.Name, + Bucket: bucket, + Name: object, ModTime: fi.ModTime, Size: fi.Size, IsDir: fi.Mode.IsDir(), @@ -171,13 +174,28 @@ func (o objectAPI) GetObjectInfo(bucket, object string) (ObjectInfo, *probe.Erro }, nil } -func (o objectAPI) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (ObjectInfo, *probe.Error) { +// safeCloseAndRemove - safely closes and removes underlying temporary +// file writer if possible. +func safeCloseAndRemove(writer io.WriteCloser) error { + // If writer is a safe file, Attempt to close and remove. + safeWriter, ok := writer.(*safe.File) + if ok { + return safeWriter.CloseAndRemove() + } + pipeWriter, ok := writer.(*io.PipeWriter) + if ok { + return pipeWriter.CloseWithError(errors.New("Close and error out.")) + } + return nil +} + +func (o objectAPI) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, *probe.Error) { // Verify if bucket is valid. if !IsValidBucketName(bucket) { - return ObjectInfo{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) + return "", probe.NewError(BucketNameInvalid{Bucket: bucket}) } if !IsValidObjectName(object) { - return ObjectInfo{}, probe.NewError(ObjectNameInvalid{ + return "", probe.NewError(ObjectNameInvalid{ Bucket: bucket, Object: object, }) @@ -185,16 +203,16 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R fileWriter, e := o.storage.CreateFile(bucket, object) if e != nil { if e == errVolumeNotFound { - return ObjectInfo{}, probe.NewError(BucketNotFound{ + return "", probe.NewError(BucketNotFound{ Bucket: bucket, }) } else if e == errIsNotRegular { - return ObjectInfo{}, probe.NewError(ObjectExistsAsPrefix{ + return "", probe.NewError(ObjectExistsAsPrefix{ Bucket: bucket, Prefix: object, }) } - return ObjectInfo{}, probe.NewError(e) + return "", probe.NewError(e) } // Initialize md5 writer. @@ -206,13 +224,13 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R // Instantiate checksum hashers and create a multiwriter. if size > 0 { if _, e = io.CopyN(multiWriter, data, size); e != nil { - fileWriter.(*safe.File).CloseAndRemove() - return ObjectInfo{}, probe.NewError(e) + safeCloseAndRemove(fileWriter) + return "", probe.NewError(e) } } else { if _, e = io.Copy(multiWriter, data); e != nil { - fileWriter.(*safe.File).CloseAndRemove() - return ObjectInfo{}, probe.NewError(e) + safeCloseAndRemove(fileWriter) + return "", probe.NewError(e) } } @@ -224,35 +242,17 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R } if md5Hex != "" { if newMD5Hex != md5Hex { - fileWriter.(*safe.File).CloseAndRemove() - return ObjectInfo{}, probe.NewError(BadDigest{md5Hex, newMD5Hex}) + safeCloseAndRemove(fileWriter) + return "", probe.NewError(BadDigest{md5Hex, newMD5Hex}) } } e = fileWriter.Close() if e != nil { - return ObjectInfo{}, probe.NewError(e) - } - fi, e := o.storage.StatFile(bucket, object) - if e != nil { - return ObjectInfo{}, probe.NewError(e) + return "", probe.NewError(e) } - contentType := "application/octet-stream" - if objectExt := filepath.Ext(object); objectExt != "" { - content, ok := mimedb.DB[strings.ToLower(strings.TrimPrefix(objectExt, "."))] - if ok { - contentType = content.ContentType - } - } - - return ObjectInfo{ - Bucket: fi.Volume, - Name: fi.Name, - ModTime: fi.ModTime, - Size: fi.Size, - ContentType: contentType, - MD5Sum: newMD5Hex, - }, nil + // Return md5sum. + return newMD5Hex, nil } func (o objectAPI) DeleteObject(bucket, object string) *probe.Error { @@ -267,6 +267,12 @@ func (o objectAPI) DeleteObject(bucket, object string) *probe.Error { if e == errVolumeNotFound { return probe.NewError(BucketNotFound{Bucket: bucket}) } + if e == errFileNotFound { + return probe.NewError(ObjectNotFound{ + Bucket: bucket, + Object: object, + }) + } return probe.NewError(e) } return nil @@ -311,10 +317,13 @@ func (o objectAPI) ListObjects(bucket, prefix, marker, delimiter string, maxKeys } result := ListObjectsInfo{IsTruncated: !eof} for _, fileInfo := range fileInfos { - result.NextMarker = fileInfo.Name - if fileInfo.Mode.IsDir() { - result.Prefixes = append(result.Prefixes, fileInfo.Name) - continue + // With delimiter set we fill in NextMarker and Prefixes. + if delimiter == slashPathSeparator { + result.NextMarker = fileInfo.Name + if fileInfo.Mode.IsDir() { + result.Prefixes = append(result.Prefixes, fileInfo.Name) + continue + } } result.Objects = append(result.Objects, ObjectInfo{ Name: fileInfo.Name, diff --git a/object-handlers.go b/object-handlers.go index 865a1981a..3424db5b0 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -457,9 +457,8 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re metadata["md5Sum"] = hex.EncodeToString(md5Bytes) // Create the object. - objInfo, err = api.ObjectAPI.PutObject(bucket, object, size, readCloser, metadata) + md5Sum, err := api.ObjectAPI.PutObject(bucket, object, size, readCloser, metadata) if err != nil { - errorIf(err.Trace(), "PutObject failed.", nil) switch err.ToGoError().(type) { case RootPathFull: writeErrorResponse(w, r, ErrRootPathFull, r.URL.Path) @@ -474,11 +473,20 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re case ObjectExistsAsPrefix: writeErrorResponse(w, r, ErrObjectExistsAsPrefix, r.URL.Path) default: + errorIf(err.Trace(), "PutObject failed.", nil) writeErrorResponse(w, r, ErrInternalError, r.URL.Path) } return } - response := generateCopyObjectResponse(objInfo.MD5Sum, objInfo.ModTime) + + objInfo, err = api.ObjectAPI.GetObjectInfo(bucket, object) + if err != nil { + errorIf(err.Trace(), "GetObjectInfo failed.", nil) + writeErrorResponse(w, r, ErrInternalError, r.URL.Path) + return + } + + response := generateCopyObjectResponse(md5Sum, objInfo.ModTime) encodedSuccessResponse := encodeResponse(response) // write headers setCommonHeaders(w) @@ -613,7 +621,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req return } - var objInfo ObjectInfo + var md5Sum string switch getRequestAuthType(r) { default: // For all unknown auth types return error. @@ -626,7 +634,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req return } // Create anonymous object. - objInfo, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, nil) + md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, r.Body, nil) case authTypePresigned, authTypeSigned: // Initialize a pipe for data pipe line. reader, writer := io.Pipe() @@ -665,7 +673,7 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req // Make sure we hex encode here. metadata["md5"] = hex.EncodeToString(md5Bytes) // Create object. - objInfo, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata) + md5Sum, err = api.ObjectAPI.PutObject(bucket, object, size, reader, metadata) } if err != nil { errorIf(err.Trace(), "PutObject failed.", nil) @@ -693,8 +701,8 @@ func (api objectAPIHandlers) PutObjectHandler(w http.ResponseWriter, r *http.Req } return } - if objInfo.MD5Sum != "" { - w.Header().Set("ETag", "\""+objInfo.MD5Sum+"\"") + if md5Sum != "" { + w.Header().Set("ETag", "\""+md5Sum+"\"") } writeSuccessResponse(w, nil) } @@ -965,6 +973,8 @@ func (api objectAPIHandlers) ListObjectPartsHandler(w http.ResponseWriter, r *ht writeErrorResponse(w, r, ErrNoSuchKey, r.URL.Path) case InvalidUploadID: writeErrorResponse(w, r, ErrNoSuchUpload, r.URL.Path) + case InvalidPart: + writeErrorResponse(w, r, ErrInvalidPart, r.URL.Path) default: writeErrorResponse(w, r, ErrInternalError, r.URL.Path) } @@ -987,7 +997,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite // Get upload id. uploadID, _, _, _ := getObjectResources(r.URL.Query()) - var objInfo ObjectInfo + var md5Sum string var err *probe.Error switch getRequestAuthType(r) { default: @@ -1030,7 +1040,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite completeParts = append(completeParts, part) } // Complete multipart upload. - objInfo, err = api.ObjectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts) + md5Sum, err = api.ObjectAPI.CompleteMultipartUpload(bucket, object, uploadID, completeParts) if err != nil { errorIf(err.Trace(), "CompleteMultipartUpload failed.", nil) switch err.ToGoError().(type) { @@ -1058,7 +1068,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite // Get object location. location := getLocation(r) // Generate complete multipart response. - response := generateCompleteMultpartUploadResponse(bucket, object, location, objInfo.MD5Sum) + response := generateCompleteMultpartUploadResponse(bucket, object, location, md5Sum) encodedSuccessResponse := encodeResponse(response) // Write headers. setCommonHeaders(w) diff --git a/object-utils.go b/object-utils.go index 9be07b0f8..1c31becb2 100644 --- a/object-utils.go +++ b/object-utils.go @@ -81,8 +81,8 @@ func IsValidObjectName(object string) bool { // IsValidObjectPrefix verifies whether the prefix is a valid object name. // Its valid to have a empty prefix. func IsValidObjectPrefix(object string) bool { - // Prefix can be empty. - if object == "" { + // Prefix can be empty or "/". + if object == "" || object == "/" { return true } // Verify if prefix is a valid object name. diff --git a/object_api_suite_test.go b/object_api_suite_test.go index 635aae54c..859533a7f 100644 --- a/object_api_suite_test.go +++ b/object_api_suite_test.go @@ -77,9 +77,9 @@ func testMultipartObjectCreation(c *check.C, create func() *objectAPI) { c.Assert(calculatedMD5sum, check.Equals, expectedMD5Sumhex) completedParts.Parts = append(completedParts.Parts, completePart{PartNumber: i, ETag: calculatedMD5sum}) } - objInfo, err := obj.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts) + md5Sum, err := obj.CompleteMultipartUpload("bucket", "key", uploadID, completedParts.Parts) c.Assert(err, check.IsNil) - c.Assert(objInfo.MD5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10") + c.Assert(md5Sum, check.Equals, "3605d84b1c43b1a664aa7c0d5082d271-10") } func testMultipartObjectAbort(c *check.C, create func() *objectAPI) { @@ -133,9 +133,9 @@ func testMultipleObjectCreation(c *check.C, create func() *objectAPI) { objects[key] = []byte(randomString) metadata := make(map[string]string) metadata["md5Sum"] = expectedMD5Sumhex - objInfo, err := obj.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata) + md5Sum, err := obj.PutObject("bucket", key, int64(len(randomString)), bytes.NewBufferString(randomString), metadata) c.Assert(err, check.IsNil) - c.Assert(objInfo.MD5Sum, check.Equals, expectedMD5Sumhex) + c.Assert(md5Sum, check.Equals, expectedMD5Sumhex) } for key, value := range objects { diff --git a/storage-errors.go b/storage-errors.go index 5b50ed9b9..0e78ffdab 100644 --- a/storage-errors.go +++ b/storage-errors.go @@ -33,6 +33,9 @@ var errIsNotRegular = errors.New("Not a regular file type.") // errVolumeNotFound - cannot find the volume. var errVolumeNotFound = errors.New("Volume not found.") +// errVolumeNotEmpty - volume not empty. +var errVolumeNotEmpty = errors.New("Volume is not empty.") + // errVolumeAccessDenied - cannot access volume, insufficient // permissions. var errVolumeAccessDenied = errors.New("Volume access denied.") diff --git a/storage-rpc-server.go b/storage-rpc-server.go index 760c8b0d9..1dce8337a 100644 --- a/storage-rpc-server.go +++ b/storage-rpc-server.go @@ -4,11 +4,9 @@ import ( "io" "net/http" "net/rpc" - "os" "strconv" router "github.com/gorilla/mux" - "github.com/minio/minio/pkg/safe" ) // Storage server implements rpc primitives to facilitate exporting a @@ -58,8 +56,12 @@ func (s *storageServer) ListFilesHandler(arg *ListFilesArgs, reply *ListFilesRep if err != nil { return err } + + // Fill reply structure. reply.Files = files reply.EOF = eof + + // Return success. return nil } @@ -95,12 +97,18 @@ func registerStorageRPCRouter(mux *router.Router, storageAPI StorageAPI) { path := vars["path"] writeCloser, err := stServer.storage.CreateFile(volume, path) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + httpErr := http.StatusInternalServerError + if err == errVolumeNotFound { + httpErr = http.StatusNotFound + } else if err == errIsNotRegular { + httpErr = http.StatusConflict + } + http.Error(w, err.Error(), httpErr) return } reader := r.Body if _, err = io.Copy(writeCloser, reader); err != nil { - writeCloser.(*safe.File).CloseAndRemove() + safeCloseAndRemove(writeCloser) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -120,14 +128,22 @@ func registerStorageRPCRouter(mux *router.Router, storageAPI StorageAPI) { readCloser, err := stServer.storage.ReadFile(volume, path, offset) if err != nil { httpErr := http.StatusBadRequest - if os.IsNotExist(err) { + if err == errVolumeNotFound { + httpErr = http.StatusNotFound + } else if err == errFileNotFound { httpErr = http.StatusNotFound } http.Error(w, err.Error(), httpErr) return } + + // Copy reader to writer. io.Copy(w, readCloser) + + // Flush out any remaining buffers to client. w.(http.Flusher).Flush() + + // Close the reader. readCloser.Close() }) }