mirror of
https://github.com/minio/minio.git
synced 2025-02-08 20:28:08 -05:00
Previously, scanMultipartDir() returned an object info channel and a timer channel, where the timer channel was used to check whether the object info channel was still alive. This caused a race condition: the timeout could fire while the object info channel was still in use. This patch fixes the issue by removing the timer channel and using the object info channel directly; each object info now carries an End bool field that indicates whether the received object info is the last one.
281 lines
7.9 KiB
Go
281 lines
7.9 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package main
|
|
|
|
import (
|
|
"errors"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
func scanMultipartDir(bucketDir, prefixPath, markerPath, uploadIDMarker string, recursive bool) <-chan multipartObjectInfo {
|
|
objectInfoCh := make(chan multipartObjectInfo, listObjectsLimit)
|
|
|
|
// TODO: check if bucketDir is absolute path
|
|
scanDir := bucketDir
|
|
dirDepth := bucketDir
|
|
|
|
if prefixPath != "" {
|
|
if !filepath.IsAbs(prefixPath) {
|
|
tmpPrefixPath := filepath.Join(bucketDir, prefixPath)
|
|
if strings.HasSuffix(prefixPath, string(os.PathSeparator)) {
|
|
tmpPrefixPath += string(os.PathSeparator)
|
|
}
|
|
prefixPath = tmpPrefixPath
|
|
}
|
|
|
|
// TODO: check if prefixPath starts with bucketDir
|
|
|
|
// Case #1: if prefixPath is /mnt/mys3/mybucket/2012/photos/paris, then
|
|
// dirDepth is /mnt/mys3/mybucket/2012/photos
|
|
// Case #2: if prefixPath is /mnt/mys3/mybucket/2012/photos/, then
|
|
// dirDepth is /mnt/mys3/mybucket/2012/photos
|
|
dirDepth = filepath.Dir(prefixPath)
|
|
scanDir = dirDepth
|
|
} else {
|
|
prefixPath = bucketDir
|
|
}
|
|
|
|
if markerPath != "" {
|
|
if !filepath.IsAbs(markerPath) {
|
|
tmpMarkerPath := filepath.Join(bucketDir, markerPath)
|
|
if strings.HasSuffix(markerPath, string(os.PathSeparator)) {
|
|
tmpMarkerPath += string(os.PathSeparator)
|
|
}
|
|
|
|
markerPath = tmpMarkerPath
|
|
}
|
|
|
|
// TODO: check markerPath must be a file
|
|
if uploadIDMarker != "" {
|
|
markerPath = filepath.Join(markerPath, uploadIDMarker+multipartUploadIDSuffix)
|
|
}
|
|
|
|
// TODO: check if markerPath starts with bucketDir
|
|
// TODO: check if markerPath starts with prefixPath
|
|
|
|
// Case #1: if markerPath is /mnt/mys3/mybucket/2012/photos/gophercon.png, then
|
|
// scanDir is /mnt/mys3/mybucket/2012/photos
|
|
// Case #2: if markerPath is /mnt/mys3/mybucket/2012/photos/gophercon.png/1fbd117a-268a-4ed0-85c9-8cc3888cbf20.uploadid, then
|
|
// scanDir is /mnt/mys3/mybucket/2012/photos/gophercon.png
|
|
// Case #3: if markerPath is /mnt/mys3/mybucket/2012/photos/, then
|
|
// scanDir is /mnt/mys3/mybucket/2012/photos
|
|
|
|
scanDir = filepath.Dir(markerPath)
|
|
} else {
|
|
markerPath = bucketDir
|
|
}
|
|
|
|
// Have bucketDir ends with os.PathSeparator
|
|
if !strings.HasSuffix(bucketDir, string(os.PathSeparator)) {
|
|
bucketDir += string(os.PathSeparator)
|
|
}
|
|
|
|
// Remove os.PathSeparator if scanDir ends with
|
|
if strings.HasSuffix(scanDir, string(os.PathSeparator)) {
|
|
scanDir = filepath.Dir(scanDir)
|
|
}
|
|
|
|
// goroutine - retrieves directory entries, makes ObjectInfo and sends into the channel.
|
|
go func() {
|
|
defer close(objectInfoCh)
|
|
|
|
// send function - returns true if ObjectInfo is sent
|
|
// within (time.Second * 15) else false on timeout.
|
|
send := func(oi multipartObjectInfo) bool {
|
|
timer := time.After(time.Second * 15)
|
|
select {
|
|
case objectInfoCh <- oi:
|
|
return true
|
|
case <-timer:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// filter function - filters directory entries matching multipart uploadids, prefix and marker
|
|
direntFilterFn := func(dirent fsDirent) bool {
|
|
// check if dirent is a directory (or) dirent is a regular file and it's name ends with Upload ID suffix string
|
|
if dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix)) {
|
|
// return if dirent's name starts with prefixPath and lexically higher than markerPath
|
|
return strings.HasPrefix(dirent.name, prefixPath) && dirent.name > markerPath
|
|
}
|
|
return false
|
|
}
|
|
|
|
// filter function - filters directory entries matching multipart uploadids
|
|
subDirentFilterFn := func(dirent fsDirent) bool {
|
|
// check if dirent is a directory (or) dirent is a regular file and it's name ends with Upload ID suffix string
|
|
return dirent.IsDir() || (dirent.IsRegular() && strings.HasSuffix(dirent.name, multipartUploadIDSuffix))
|
|
}
|
|
|
|
// lastObjInfo is used to save last object info which is sent at last with End=true
|
|
var lastObjInfo *multipartObjectInfo
|
|
|
|
sendError := func(err error) {
|
|
if lastObjInfo != nil {
|
|
if !send(*lastObjInfo) {
|
|
// as we got error sending lastObjInfo, we can't send the error
|
|
return
|
|
}
|
|
}
|
|
|
|
send(multipartObjectInfo{Err: err, End: true})
|
|
return
|
|
}
|
|
|
|
for {
|
|
dirents, err := scandir(scanDir, direntFilterFn, false)
|
|
if err != nil {
|
|
sendError(err)
|
|
return
|
|
}
|
|
|
|
var dirent fsDirent
|
|
for len(dirents) > 0 {
|
|
dirent, dirents = dirents[0], dirents[1:]
|
|
if dirent.IsRegular() {
|
|
// Handle uploadid file
|
|
name := strings.Replace(filepath.Dir(dirent.name), bucketDir, "", 1)
|
|
if name == "" {
|
|
// This should not happen ie uploadid file should not be in bucket directory
|
|
sendError(errors.New("Corrupted metadata"))
|
|
return
|
|
}
|
|
|
|
uploadID := strings.Split(filepath.Base(dirent.name), multipartUploadIDSuffix)[0]
|
|
|
|
// Solaris and older unixes have modTime to be
|
|
// empty, fallback to os.Stat() to fill missing values.
|
|
if dirent.modTime.IsZero() {
|
|
if fi, e := os.Stat(dirent.name); e == nil {
|
|
dirent.modTime = fi.ModTime()
|
|
} else {
|
|
sendError(e)
|
|
return
|
|
}
|
|
}
|
|
|
|
objInfo := multipartObjectInfo{
|
|
Name: name,
|
|
UploadID: uploadID,
|
|
ModifiedTime: dirent.modTime,
|
|
}
|
|
|
|
// as we got new object info, send last object info and keep new object info as last object info
|
|
if lastObjInfo != nil {
|
|
if !send(*lastObjInfo) {
|
|
return
|
|
}
|
|
}
|
|
lastObjInfo = &objInfo
|
|
|
|
continue
|
|
}
|
|
|
|
// Fetch sub dirents.
|
|
subDirents, err := scandir(dirent.name, subDirentFilterFn, false)
|
|
if err != nil {
|
|
sendError(err)
|
|
return
|
|
}
|
|
|
|
subDirFound := false
|
|
uploadIDDirents := []fsDirent{}
|
|
// If subDirents has a directory, then current dirent needs to be sent
|
|
for _, subdirent := range subDirents {
|
|
if subdirent.IsDir() {
|
|
subDirFound = true
|
|
|
|
if recursive {
|
|
break
|
|
}
|
|
}
|
|
|
|
if !recursive && subdirent.IsRegular() {
|
|
uploadIDDirents = append(uploadIDDirents, subdirent)
|
|
}
|
|
}
|
|
|
|
// Send directory only for non-recursive listing
|
|
if !recursive && (subDirFound || len(subDirents) == 0) {
|
|
// Solaris and older unixes have modTime to be
|
|
// empty, fallback to os.Stat() to fill missing values.
|
|
if dirent.modTime.IsZero() {
|
|
if fi, e := os.Stat(dirent.name); e == nil {
|
|
dirent.modTime = fi.ModTime()
|
|
} else {
|
|
sendError(e)
|
|
return
|
|
}
|
|
}
|
|
|
|
objInfo := multipartObjectInfo{
|
|
Name: strings.Replace(dirent.name, bucketDir, "", 1),
|
|
ModifiedTime: dirent.modTime,
|
|
IsDir: true,
|
|
}
|
|
|
|
// as we got new object info, send last object info and keep new object info as last object info
|
|
if lastObjInfo != nil {
|
|
if !send(*lastObjInfo) {
|
|
return
|
|
}
|
|
}
|
|
lastObjInfo = &objInfo
|
|
}
|
|
|
|
if recursive {
|
|
dirents = append(subDirents, dirents...)
|
|
} else {
|
|
dirents = append(uploadIDDirents, dirents...)
|
|
}
|
|
}
|
|
|
|
if !recursive {
|
|
break
|
|
}
|
|
|
|
markerPath = scanDir + string(os.PathSeparator)
|
|
if scanDir = filepath.Dir(scanDir); scanDir < dirDepth {
|
|
break
|
|
}
|
|
}
|
|
|
|
if lastObjInfo != nil {
|
|
// we got last object
|
|
lastObjInfo.End = true
|
|
if !send(*lastObjInfo) {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return objectInfoCh
|
|
}
|
|
|
|
// multipartObjectInfo is a single item streamed by scanMultipartDir: either
// an upload ID entry, a directory entry, or a terminating error.
type multipartObjectInfo struct {
	// Name is the entry's path with the bucket directory stripped;
	// UploadID is the upload ID file's base name up to the upload ID suffix
	// (left empty for directory entries).
	Name, UploadID string
	// ModifiedTime is the modification time of the underlying entry.
	ModifiedTime time.Time
	// IsDir is true when the item describes a directory.
	IsDir bool
	// Err carries the failure when the listing was aborted.
	Err error
	// End is true on the final item sent on the channel.
	End bool
}
|