mirror of
				https://github.com/minio/minio.git
				synced 2025-10-30 00:05:02 -04:00 
			
		
		
		
	Merge pull request #1186 from balamurugana/devel
api: refactor list object handling in fs backend
This commit is contained in:
		
						commit
						af295f3600
					
				
							
								
								
									
										245
									
								
								pkg/fs/dir.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										245
									
								
								pkg/fs/dir.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,245 @@ | |||||||
|  | /* | ||||||
|  |  * Minio Cloud Storage, (C) 2016 Minio, Inc. | ||||||
|  |  * | ||||||
|  |  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  |  * you may not use this file except in compliance with the License. | ||||||
|  |  * You may obtain a copy of the License at | ||||||
|  |  * | ||||||
|  |  *     http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|  |  * | ||||||
|  |  * Unless required by applicable law or agreed to in writing, software | ||||||
|  |  * distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  |  * See the License for the specific language governing permissions and | ||||||
|  |  * limitations under the License. | ||||||
|  |  */ | ||||||
|  | 
 | ||||||
|  | package fs | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"io" | ||||||
|  | 	"os" | ||||||
|  | 	"path/filepath" | ||||||
|  | 	"sort" | ||||||
|  | 	"strings" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | const ( | ||||||
|  | 	// listObjectsLimit - maximum list objects limit | ||||||
|  | 	listObjectsLimit = 1000 | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // IsDirEmpty - returns whether given directory is empty or not | ||||||
|  | func IsDirEmpty(dirname string) (status bool, err error) { | ||||||
|  | 	f, err := os.Open(dirname) | ||||||
|  | 	if err == nil { | ||||||
|  | 		defer f.Close() | ||||||
|  | 		if _, err = f.Readdirnames(1); err == io.EOF { | ||||||
|  | 			status = true | ||||||
|  | 			err = nil | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // IsDirExist - returns whether given directory is exist or not | ||||||
|  | func IsDirExist(dirname string) (status bool, err error) { | ||||||
|  | 	fi, err := os.Lstat(dirname) | ||||||
|  | 	if err == nil { | ||||||
|  | 		status = fi.IsDir() | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // byName implements sort.Interface for sorting os.FileInfo list | ||||||
|  | type byName []os.FileInfo | ||||||
|  | 
 | ||||||
|  | func (f byName) Len() int      { return len(f) } | ||||||
|  | func (f byName) Swap(i, j int) { f[i], f[j] = f[j], f[i] } | ||||||
|  | func (f byName) Less(i, j int) bool { | ||||||
|  | 	n1 := f[i].Name() | ||||||
|  | 	if f[i].IsDir() { | ||||||
|  | 		n1 = n1 + string(os.PathSeparator) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	n2 := f[j].Name() | ||||||
|  | 	if f[j].IsDir() { | ||||||
|  | 		n2 = n2 + string(os.PathSeparator) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return n1 < n2 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // ObjectInfo - object info | ||||||
|  | type ObjectInfo struct { | ||||||
|  | 	Name         string | ||||||
|  | 	ModifiedTime time.Time | ||||||
|  | 	Checksum     string | ||||||
|  | 	Size         int64 | ||||||
|  | 	IsDir        bool | ||||||
|  | 	Err          error | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // readDir - read 'scanDir' directory.  It returns list of ObjectInfo where | ||||||
|  | //           each object name is appended with 'namePrefix' | ||||||
|  | func readDir(scanDir, namePrefix string) (objInfos []ObjectInfo) { | ||||||
|  | 	f, err := os.Open(scanDir) | ||||||
|  | 	if err != nil { | ||||||
|  | 		objInfos = append(objInfos, ObjectInfo{Err: err}) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	fis, err := f.Readdir(-1) | ||||||
|  | 	if err != nil { | ||||||
|  | 		f.Close() | ||||||
|  | 		objInfos = append(objInfos, ObjectInfo{Err: err}) | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	f.Close() | ||||||
|  | 	sort.Sort(byName(fis)) | ||||||
|  | 
 | ||||||
|  | 	// make []ObjectInfo from []FileInfo | ||||||
|  | 	for _, fi := range fis { | ||||||
|  | 		name := fi.Name() | ||||||
|  | 		if namePrefix != "" { | ||||||
|  | 			name = namePrefix + "/" + name | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if fi.IsDir() { | ||||||
|  | 			name += "/" | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		objInfos = append(objInfos, ObjectInfo{ | ||||||
|  | 			Name:         name, | ||||||
|  | 			ModifiedTime: fi.ModTime(), | ||||||
|  | 			Checksum:     "", | ||||||
|  | 			Size:         fi.Size(), | ||||||
|  | 			IsDir:        fi.IsDir(), | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // ObjectInfoChannel - object info channel | ||||||
|  | type ObjectInfoChannel struct { | ||||||
|  | 	ch        <-chan ObjectInfo | ||||||
|  | 	objInfo   *ObjectInfo | ||||||
|  | 	closed    bool | ||||||
|  | 	timeoutCh <-chan struct{} | ||||||
|  | 	timedOut  bool | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (oic *ObjectInfoChannel) Read() (ObjectInfo, bool) { | ||||||
|  | 	if oic.closed { | ||||||
|  | 		return ObjectInfo{}, false | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if oic.objInfo == nil { | ||||||
|  | 		// first read | ||||||
|  | 		if oi, ok := <-oic.ch; ok { | ||||||
|  | 			oic.objInfo = &oi | ||||||
|  | 		} else { | ||||||
|  | 			oic.closed = true | ||||||
|  | 			return ObjectInfo{}, false | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	retObjInfo := *oic.objInfo | ||||||
|  | 	status := true | ||||||
|  | 	oic.objInfo = nil | ||||||
|  | 
 | ||||||
|  | 	// read once more to know whether it was last read | ||||||
|  | 	if oi, ok := <-oic.ch; ok { | ||||||
|  | 		oic.objInfo = &oi | ||||||
|  | 	} else { | ||||||
|  | 		oic.closed = true | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return retObjInfo, status | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // IsClosed - return whether channel is closed or not | ||||||
|  | func (oic ObjectInfoChannel) IsClosed() bool { | ||||||
|  | 	if oic.objInfo != nil { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return oic.closed | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // IsTimedOut - return whether channel is closed due to timeout | ||||||
|  | func (oic ObjectInfoChannel) IsTimedOut() bool { | ||||||
|  | 	if oic.timedOut { | ||||||
|  | 		return true | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	select { | ||||||
|  | 	case _, ok := <-oic.timeoutCh: | ||||||
|  | 		if ok { | ||||||
|  | 			oic.timedOut = true | ||||||
|  | 			return true | ||||||
|  | 		} | ||||||
|  | 		return false | ||||||
|  | 	default: | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // treeWalk - walk into 'scanDir' recursively when 'recursive' is true. | ||||||
|  | //            It uses 'bucketDir' to get name prefix for object name. | ||||||
|  | func treeWalk(scanDir, bucketDir string, recursive bool) ObjectInfoChannel { | ||||||
|  | 	objectInfoCh := make(chan ObjectInfo, listObjectsLimit) | ||||||
|  | 	timeoutCh := make(chan struct{}, 1) | ||||||
|  | 
 | ||||||
|  | 	// goroutine - retrieves directory entries, makes ObjectInfo and sends into the channel | ||||||
|  | 	go func() { | ||||||
|  | 		defer close(objectInfoCh) | ||||||
|  | 		defer close(timeoutCh) | ||||||
|  | 
 | ||||||
|  | 		// send function - returns true if ObjectInfo is sent | ||||||
|  | 		//                 within (time.Second * 15) else false on time-out | ||||||
|  | 		send := func(oi ObjectInfo) bool { | ||||||
|  | 			timer := time.After(time.Second * 15) | ||||||
|  | 			select { | ||||||
|  | 			case objectInfoCh <- oi: | ||||||
|  | 				return true | ||||||
|  | 			case <-timer: | ||||||
|  | 				timeoutCh <- struct{}{} | ||||||
|  | 				return false | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		namePrefix := strings.Replace(filepath.ToSlash(scanDir), filepath.ToSlash(bucketDir), "", 1) | ||||||
|  | 		if strings.HasPrefix(namePrefix, "/") { | ||||||
|  | 			/* remove beginning "/" */ | ||||||
|  | 			namePrefix = namePrefix[1:] | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		for objInfos := readDir(scanDir, namePrefix); len(objInfos) > 0; { | ||||||
|  | 			var objInfo ObjectInfo | ||||||
|  | 			objInfo, objInfos = objInfos[0], objInfos[1:] | ||||||
|  | 			if !send(objInfo) { | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			if objInfo.IsDir && recursive { | ||||||
|  | 				scanDir := filepath.Join(bucketDir, filepath.FromSlash(objInfo.Name)) | ||||||
|  | 
 | ||||||
|  | 				namePrefix = strings.Replace(filepath.ToSlash(scanDir), filepath.ToSlash(bucketDir), "", 1) | ||||||
|  | 				if strings.HasPrefix(namePrefix, "/") { | ||||||
|  | 					/* remove beginning "/" */ | ||||||
|  | 					namePrefix = namePrefix[1:] | ||||||
|  | 				} | ||||||
|  | 
 | ||||||
|  | 				objInfos = append(readDir(scanDir, namePrefix), objInfos...) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | 
 | ||||||
|  | 	return ObjectInfoChannel{ch: objectInfoCh, timeoutCh: timeoutCh} | ||||||
|  | } | ||||||
| @ -17,313 +17,118 @@ | |||||||
| package fs | package fs | ||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"errors" | 	"fmt" | ||||||
| 	"hash/fnv" |  | ||||||
| 	"net/url" | 	"net/url" | ||||||
| 	"os" | 	"os" | ||||||
| 	"path/filepath" | 	"path/filepath" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" |  | ||||||
| 
 | 
 | ||||||
| 	"github.com/minio/minio/pkg/ioutils" |  | ||||||
| 	"github.com/minio/minio/pkg/probe" | 	"github.com/minio/minio/pkg/probe" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // listObjectsParams - list objects input parameters. |  | ||||||
| type listObjectsParams struct { |  | ||||||
| 	// Bucket name to list the objects for. |  | ||||||
| 	Bucket string |  | ||||||
| 	// list all objects with this parameter as common prefix. |  | ||||||
| 	Prefix string |  | ||||||
| 	// list all objects starting with object after marker in |  | ||||||
| 	// lexicographical order. |  | ||||||
| 	Marker string |  | ||||||
| 	// list all objects until the first occurrence of the delimtier |  | ||||||
| 	// after the prefix. |  | ||||||
| 	Delimiter string |  | ||||||
| 	// maximum number of objects returned per listObjects() |  | ||||||
| 	// operation. |  | ||||||
| 	MaxKeys int |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // listServiceReq |  | ||||||
| type listServiceReq struct { |  | ||||||
| 	reqParams listObjectsParams |  | ||||||
| 	respCh    chan ListObjectsResult |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| type listWorkerReq struct { |  | ||||||
| 	respCh chan ListObjectsResult |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // listObjects - list objects lists objects up to maxKeys for a given prefix. |  | ||||||
| func (fs Filesystem) listObjects(bucket, prefix, marker, delimiter string, maxKeys int) (chan<- listWorkerReq, *probe.Error) { |  | ||||||
| 	quitWalker := make(chan bool) |  | ||||||
| 	reqCh := make(chan listWorkerReq) |  | ||||||
| 	walkerCh := make(chan ObjectMetadata, 2000) |  | ||||||
| 	go func() { |  | ||||||
| 		defer close(walkerCh) |  | ||||||
| 		var walkPath string |  | ||||||
| 		bucketPath := filepath.Join(fs.path, bucket) |  | ||||||
| 		// Bucket path prefix should always end with a separator. |  | ||||||
| 		bucketPathPrefix := bucketPath + string(os.PathSeparator) |  | ||||||
| 		prefixPath := bucketPathPrefix + prefix |  | ||||||
| 		st, e := os.Stat(prefixPath) |  | ||||||
| 		if e != nil { |  | ||||||
| 			if os.IsNotExist(e) { |  | ||||||
| 				walkPath = bucketPath |  | ||||||
| 			} |  | ||||||
| 		} else { |  | ||||||
| 			if st.IsDir() && !strings.HasSuffix(prefix, delimiter) { |  | ||||||
| 				walkPath = bucketPath |  | ||||||
| 			} else { |  | ||||||
| 				walkPath = prefixPath |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 		ioutils.FTW(walkPath, func(path string, info os.FileInfo, e error) error { |  | ||||||
| 			// For any error return right here. |  | ||||||
| 			if e != nil { |  | ||||||
| 				return e |  | ||||||
| 			} |  | ||||||
| 			// Skip special temporary files, kept for multipart transaction. |  | ||||||
| 			if strings.Contains(path, "$multiparts") || strings.Contains(path, "$tmpobject") { |  | ||||||
| 				return nil |  | ||||||
| 			} |  | ||||||
| 			// We don't need to list the walk path if its a directory. |  | ||||||
| 			if path == walkPath && info.IsDir() { |  | ||||||
| 				return nil |  | ||||||
| 			} |  | ||||||
| 			// Skip all directories if there is no delimiter. |  | ||||||
| 			if info.IsDir() && delimiter == "" { |  | ||||||
| 				return nil |  | ||||||
| 			} |  | ||||||
| 			// For all incoming directories add a ending separator. |  | ||||||
| 			if info.IsDir() { |  | ||||||
| 				path = path + string(os.PathSeparator) |  | ||||||
| 			} |  | ||||||
| 			// Extract object name. |  | ||||||
| 			objectName := strings.TrimPrefix(path, bucketPathPrefix) |  | ||||||
| 			if strings.HasPrefix(objectName, prefix) { |  | ||||||
| 				object := ObjectMetadata{ |  | ||||||
| 					Object:       objectName, |  | ||||||
| 					LastModified: info.ModTime(), |  | ||||||
| 					Mode:         info.Mode(), |  | ||||||
| 					Size:         info.Size(), |  | ||||||
| 				} |  | ||||||
| 				select { |  | ||||||
| 				// Send object on walker channel. |  | ||||||
| 				case walkerCh <- object: |  | ||||||
| 				case <-quitWalker: |  | ||||||
| 					// Returning error ends the file tree Walk(). |  | ||||||
| 					return errors.New("Quit list worker.") |  | ||||||
| 				} |  | ||||||
| 				// If delimiter is set, we stop if current path is a directory. |  | ||||||
| 				if delimiter != "" && info.IsDir() { |  | ||||||
| 					return ioutils.ErrSkipDir |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 			return nil |  | ||||||
| 		}) |  | ||||||
| 	}() |  | ||||||
| 
 |  | ||||||
| 	go func() { |  | ||||||
| 		for { |  | ||||||
| 			select { |  | ||||||
| 			// Timeout after 30 seconds if request did not arrive for |  | ||||||
| 			// the given list parameters. |  | ||||||
| 			case <-time.After(30 * time.Second): |  | ||||||
| 				quitWalker <- true // Quit file path walk if running. |  | ||||||
| 				// Send back the hash for this request. |  | ||||||
| 				fs.timeoutReqCh <- fnvSum(bucket, prefix, marker, delimiter) |  | ||||||
| 				return |  | ||||||
| 			case req, ok := <-reqCh: |  | ||||||
| 				if !ok { |  | ||||||
| 					// If the request channel is closed, no more |  | ||||||
| 					// requests return here. |  | ||||||
| 					return |  | ||||||
| 				} |  | ||||||
| 				resp := ListObjectsResult{} |  | ||||||
| 				var count int |  | ||||||
| 				for { |  | ||||||
| 					// We have read all the keys necessary by now. We |  | ||||||
| 					// cleanly break out. |  | ||||||
| 					if count == maxKeys { |  | ||||||
| 						if delimiter != "" { |  | ||||||
| 							// Set the next marker for the next request. |  | ||||||
| 							// This element is set only if you have delimiter set. |  | ||||||
| 							// If response does not include the NextMaker and it is |  | ||||||
| 							// truncated, you can use the value of the last Key in the |  | ||||||
| 							// response as the marker in the subsequent request to get the |  | ||||||
| 							// next set of object keys. |  | ||||||
| 							if len(resp.Objects) > 0 { |  | ||||||
| 								// NextMarker is only set when there |  | ||||||
| 								// are more than maxKeys worth of |  | ||||||
| 								// objects for a given prefix path. |  | ||||||
| 								resp.NextMarker = resp.Objects[len(resp.Objects)-1:][0].Object |  | ||||||
| 							} |  | ||||||
| 						} |  | ||||||
| 						resp.IsTruncated = len(walkerCh) > 0 |  | ||||||
| 						break |  | ||||||
| 					} |  | ||||||
| 					object, walkerOK := <-walkerCh |  | ||||||
| 					// If the channel is closed return right here. |  | ||||||
| 					if !walkerOK { |  | ||||||
| 						break |  | ||||||
| 					} |  | ||||||
| 					// Verify if the object is lexically smaller than |  | ||||||
| 					// the marker, we will skip those objects. |  | ||||||
| 					if marker != "" { |  | ||||||
| 						if marker >= object.Object { |  | ||||||
| 							continue |  | ||||||
| 						} else { |  | ||||||
| 							// Reset marker so that we avoid comparing |  | ||||||
| 							// again and again in a loop unecessarily. |  | ||||||
| 							marker = "" |  | ||||||
| 						} |  | ||||||
| 					} |  | ||||||
| 					if delimiter != "" { |  | ||||||
| 						// Prefixes are only valid wth delimiters, and |  | ||||||
| 						// for filesystem backend they are only valid |  | ||||||
| 						// if they are directories. |  | ||||||
| 						if object.Mode.IsDir() { |  | ||||||
| 							resp.Prefixes = append(resp.Prefixes, object.Object) |  | ||||||
| 						} else { |  | ||||||
| 							// Rest of them are treated as objects. |  | ||||||
| 							resp.Objects = append(resp.Objects, object) |  | ||||||
| 						} |  | ||||||
| 					} else { |  | ||||||
| 						// In-case of no delimiters, there are no |  | ||||||
| 						// prefixes - all are considered to be objects. |  | ||||||
| 						resp.Objects = append(resp.Objects, object) |  | ||||||
| 					} |  | ||||||
| 					count++ // Bump the number. |  | ||||||
| 				} |  | ||||||
| 				// Set the marker right here for the new set of the |  | ||||||
| 				// values coming in the from the client. |  | ||||||
| 				marker = resp.NextMarker |  | ||||||
| 				req.respCh <- resp |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
| 	return reqCh, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // fnvSum calculates a hash for concatenation of all input strings. |  | ||||||
| func fnvSum(elements ...string) uint32 { |  | ||||||
| 	fnvHash := fnv.New32a() |  | ||||||
| 	for _, element := range elements { |  | ||||||
| 		fnvHash.Write([]byte(element)) |  | ||||||
| 	} |  | ||||||
| 	return fnvHash.Sum32() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // listObjectsService - list objects service manages various incoming |  | ||||||
| // list object requests by delegating them to an existing listObjects |  | ||||||
| // routine or initializes a new listObjects routine. |  | ||||||
| func (fs *Filesystem) listObjectsService() *probe.Error { |  | ||||||
| 	// Initialize list service request channel. |  | ||||||
| 	listServiceReqCh := make(chan listServiceReq) |  | ||||||
| 	fs.listServiceReqCh = listServiceReqCh |  | ||||||
| 
 |  | ||||||
| 	// Initialize timeout request channel to receive request hashes of |  | ||||||
| 	// timed-out requests. |  | ||||||
| 	timeoutReqCh := make(chan uint32) |  | ||||||
| 	fs.timeoutReqCh = timeoutReqCh |  | ||||||
| 
 |  | ||||||
| 	// Initialize request hash to list worker map. |  | ||||||
| 	reqToListWorkerReqCh := make(map[uint32]chan<- listWorkerReq) |  | ||||||
| 
 |  | ||||||
| 	// Start service in a go routine. |  | ||||||
| 	go func() { |  | ||||||
| 		for { |  | ||||||
| 			select { |  | ||||||
| 			case reqHash := <-timeoutReqCh: |  | ||||||
| 				// For requests which have timed-out, close the worker |  | ||||||
| 				// channels proactively, this may happen for idle |  | ||||||
| 				// workers once in 10seconds. |  | ||||||
| 				listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash] |  | ||||||
| 				if ok { |  | ||||||
| 					close(listWorkerReqCh) |  | ||||||
| 				} |  | ||||||
| 				delete(reqToListWorkerReqCh, reqHash) |  | ||||||
| 			case srvReq := <-listServiceReqCh: |  | ||||||
| 				// Save the params for readability. |  | ||||||
| 				bucket := srvReq.reqParams.Bucket |  | ||||||
| 				prefix := srvReq.reqParams.Prefix |  | ||||||
| 				marker := srvReq.reqParams.Marker |  | ||||||
| 				delimiter := srvReq.reqParams.Delimiter |  | ||||||
| 				maxKeys := srvReq.reqParams.MaxKeys |  | ||||||
| 
 |  | ||||||
| 				// Generate hash. |  | ||||||
| 				reqHash := fnvSum(bucket, prefix, marker, delimiter) |  | ||||||
| 				listWorkerReqCh, ok := reqToListWorkerReqCh[reqHash] |  | ||||||
| 				if !ok { |  | ||||||
| 					var err *probe.Error |  | ||||||
| 					listWorkerReqCh, err = fs.listObjects(bucket, prefix, marker, delimiter, maxKeys) |  | ||||||
| 					if err != nil { |  | ||||||
| 						srvReq.respCh <- ListObjectsResult{} |  | ||||||
| 						return |  | ||||||
| 					} |  | ||||||
| 					reqToListWorkerReqCh[reqHash] = listWorkerReqCh |  | ||||||
| 				} |  | ||||||
| 				respCh := make(chan ListObjectsResult) |  | ||||||
| 				listWorkerReqCh <- listWorkerReq{respCh} |  | ||||||
| 				resp, ok := <-respCh |  | ||||||
| 				if !ok { |  | ||||||
| 					srvReq.respCh <- ListObjectsResult{} |  | ||||||
| 					return |  | ||||||
| 				} |  | ||||||
| 				delete(reqToListWorkerReqCh, reqHash) |  | ||||||
| 				if !resp.IsTruncated { |  | ||||||
| 					close(listWorkerReqCh) |  | ||||||
| 				} |  | ||||||
| 				srvReq.respCh <- resp |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // ListObjects - lists all objects for a given prefix, returns up to | // ListObjects - lists all objects for a given prefix, returns up to | ||||||
| // maxKeys number of objects per call. | // maxKeys number of objects per call. | ||||||
| func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) { | func (fs Filesystem) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (ListObjectsResult, *probe.Error) { | ||||||
|  | 	result := ListObjectsResult{} | ||||||
|  | 
 | ||||||
| 	// Input validation. | 	// Input validation. | ||||||
| 	if !IsValidBucketName(bucket) { | 	if !IsValidBucketName(bucket) { | ||||||
| 		return ListObjectsResult{}, probe.NewError(BucketNameInvalid{Bucket: bucket}) | 		return result, probe.NewError(BucketNameInvalid{Bucket: bucket}) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	bucket = fs.denormalizeBucket(bucket) | 	bucket = fs.denormalizeBucket(bucket) | ||||||
| 	rootPrefix := filepath.Join(fs.path, bucket) | 
 | ||||||
| 	// Check bucket exists. | 	if status, err := IsDirExist(filepath.Join(fs.path, bucket)); !status { | ||||||
| 	if _, e := os.Stat(rootPrefix); e != nil { | 		if err == nil { | ||||||
| 		if os.IsNotExist(e) { | 			return result, probe.NewError(BucketNotFound{Bucket: bucket}) | ||||||
| 			return ListObjectsResult{}, probe.NewError(BucketNotFound{Bucket: bucket}) | 		} else if os.IsNotExist(err) { | ||||||
|  | 			return result, probe.NewError(BucketNotFound{Bucket: bucket}) | ||||||
|  | 		} else { | ||||||
|  | 			return result, probe.NewError(err) | ||||||
| 		} | 		} | ||||||
| 		return ListObjectsResult{}, probe.NewError(e) |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// Unescape the marker values. | 	if delimiter != "" && delimiter != "/" { | ||||||
| 	markerUnescaped, e := url.QueryUnescape(marker) | 		return result, probe.NewError(fmt.Errorf("delimiter '%s' is not supported", delimiter)) | ||||||
| 	if e != nil { |  | ||||||
| 		return ListObjectsResult{}, probe.NewError(e) |  | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	reqParams := listObjectsParams{} | 	if marker != "" { | ||||||
| 	reqParams.Bucket = bucket | 		if markerUnescaped, err := url.QueryUnescape(marker); err == nil { | ||||||
| 	reqParams.Prefix = filepath.FromSlash(prefix) | 			marker = markerUnescaped | ||||||
| 	reqParams.Marker = filepath.FromSlash(markerUnescaped) | 		} else { | ||||||
| 	reqParams.Delimiter = filepath.FromSlash(delimiter) | 			return result, probe.NewError(err) | ||||||
| 	reqParams.MaxKeys = maxKeys | 		} | ||||||
| 
 | 
 | ||||||
| 	respCh := make(chan ListObjectsResult) | 		if !strings.HasPrefix(marker, prefix) { | ||||||
| 	fs.listServiceReqCh <- listServiceReq{reqParams, respCh} | 			return result, probe.NewError(fmt.Errorf("marker '%s' and prefix '%s' do not match", marker, prefix)) | ||||||
| 	resp := <-respCh | 		} | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
| 	for i := range resp.Prefixes { | 	if maxKeys <= 0 || maxKeys > listObjectsLimit { | ||||||
| 		resp.Prefixes[i] = filepath.ToSlash(resp.Prefixes[i]) | 		maxKeys = listObjectsLimit | ||||||
| 	} | 	} | ||||||
| 	for i := range resp.Objects { | 
 | ||||||
| 		resp.Objects[i].Object = filepath.ToSlash(resp.Objects[i].Object) | 	bucketDir := filepath.Join(fs.path, bucket) | ||||||
|  | 
 | ||||||
|  | 	recursive := true | ||||||
|  | 	skipDir := true | ||||||
|  | 	if delimiter == "/" { | ||||||
|  | 		skipDir = false | ||||||
|  | 		recursive = false | ||||||
| 	} | 	} | ||||||
| 	return resp, nil | 
 | ||||||
|  | 	prefixDir := filepath.Dir(filepath.FromSlash(prefix)) | ||||||
|  | 	rootDir := filepath.Join(bucketDir, prefixDir) | ||||||
|  | 
 | ||||||
|  | 	objectInfoCh := fs.popListObjectCh(ListObjectParams{bucket, delimiter, marker, prefix}) | ||||||
|  | 	if objectInfoCh == nil { | ||||||
|  | 		ch := treeWalk(rootDir, bucketDir, recursive) | ||||||
|  | 		objectInfoCh = &ch | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	nextMarker := "" | ||||||
|  | 	for i := 0; i < maxKeys; { | ||||||
|  | 		objInfo, ok := objectInfoCh.Read() | ||||||
|  | 		if !ok { | ||||||
|  | 			// closed channel | ||||||
|  | 			return result, nil | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if objInfo.Err != nil { | ||||||
|  | 			return ListObjectsResult{}, probe.NewError(objInfo.Err) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if strings.Contains(objInfo.Name, "$multiparts") || strings.Contains(objInfo.Name, "$tmpobject") { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if objInfo.IsDir && skipDir { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if strings.HasPrefix(objInfo.Name, prefix) { | ||||||
|  | 			if objInfo.Name > marker { | ||||||
|  | 				if objInfo.IsDir { | ||||||
|  | 					result.Prefixes = append(result.Prefixes, objInfo.Name) | ||||||
|  | 				} else { | ||||||
|  | 					result.Objects = append(result.Objects, ObjectMetadata{ | ||||||
|  | 						Bucket:       bucket, | ||||||
|  | 						Object:       objInfo.Name, | ||||||
|  | 						LastModified: objInfo.ModifiedTime, | ||||||
|  | 						Size:         objInfo.Size, | ||||||
|  | 					}) | ||||||
|  | 				} | ||||||
|  | 				nextMarker = objInfo.Name | ||||||
|  | 				i++ | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if !objectInfoCh.IsClosed() { | ||||||
|  | 		result.IsTruncated = true | ||||||
|  | 		result.NextMarker = nextMarker | ||||||
|  | 		fs.pushListObjectCh(ListObjectParams{bucket, delimiter, nextMarker, prefix}, *objectInfoCh) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return result, nil | ||||||
| } | } | ||||||
|  | |||||||
							
								
								
									
										64
									
								
								pkg/fs/fs.go
									
									
									
									
									
								
							
							
						
						
									
										64
									
								
								pkg/fs/fs.go
									
									
									
									
									
								
							| @ -25,14 +25,59 @@ import ( | |||||||
| 	"github.com/minio/minio/pkg/probe" | 	"github.com/minio/minio/pkg/probe" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | // ListObjectParams - list object params used for list object map | ||||||
|  | type ListObjectParams struct { | ||||||
|  | 	bucket    string | ||||||
|  | 	delimiter string | ||||||
|  | 	marker    string | ||||||
|  | 	prefix    string | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // Filesystem - local variables | // Filesystem - local variables | ||||||
| type Filesystem struct { | type Filesystem struct { | ||||||
| 	path             string | 	path               string | ||||||
| 	minFreeDisk      int64 | 	minFreeDisk        int64 | ||||||
| 	rwLock           *sync.RWMutex | 	rwLock             *sync.RWMutex | ||||||
| 	multiparts       *Multiparts | 	multiparts         *Multiparts | ||||||
| 	listServiceReqCh chan<- listServiceReq | 	listObjectMap      map[ListObjectParams][]ObjectInfoChannel | ||||||
| 	timeoutReqCh     chan<- uint32 | 	listObjectMapMutex *sync.Mutex | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (fs *Filesystem) pushListObjectCh(params ListObjectParams, ch ObjectInfoChannel) { | ||||||
|  | 	fs.listObjectMapMutex.Lock() | ||||||
|  | 	defer fs.listObjectMapMutex.Unlock() | ||||||
|  | 
 | ||||||
|  | 	channels := []ObjectInfoChannel{ch} | ||||||
|  | 	if _, ok := fs.listObjectMap[params]; ok { | ||||||
|  | 		channels = append(fs.listObjectMap[params], ch) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	fs.listObjectMap[params] = channels | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (fs *Filesystem) popListObjectCh(params ListObjectParams) *ObjectInfoChannel { | ||||||
|  | 	fs.listObjectMapMutex.Lock() | ||||||
|  | 	defer fs.listObjectMapMutex.Unlock() | ||||||
|  | 
 | ||||||
|  | 	if channels, ok := fs.listObjectMap[params]; ok { | ||||||
|  | 		for i, channel := range channels { | ||||||
|  | 			if !channel.IsTimedOut() { | ||||||
|  | 				chs := channels[i+1:] | ||||||
|  | 				if len(chs) > 0 { | ||||||
|  | 					fs.listObjectMap[params] = chs | ||||||
|  | 				} else { | ||||||
|  | 					delete(fs.listObjectMap, params) | ||||||
|  | 				} | ||||||
|  | 
 | ||||||
|  | 				return &channel | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		// As all channels are timed out, delete the map entry | ||||||
|  | 		delete(fs.listObjectMap, params) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // MultipartSession holds active session information | // MultipartSession holds active session information | ||||||
| @ -83,10 +128,9 @@ func New(rootPath string, minFreeDisk int64) (Filesystem, *probe.Error) { | |||||||
| 	// minium free disk required for i/o operations to succeed. | 	// minium free disk required for i/o operations to succeed. | ||||||
| 	fs.minFreeDisk = minFreeDisk | 	fs.minFreeDisk = minFreeDisk | ||||||
| 
 | 
 | ||||||
| 	// Start list goroutine. | 	fs.listObjectMap = make(map[ListObjectParams][]ObjectInfoChannel) | ||||||
| 	if err = fs.listObjectsService(); err != nil { | 	fs.listObjectMapMutex = &sync.Mutex{} | ||||||
| 		return Filesystem{}, err.Trace(rootPath) | 
 | ||||||
| 	} |  | ||||||
| 	// Return here. | 	// Return here. | ||||||
| 	return fs, nil | 	return fs, nil | ||||||
| } | } | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user