mirror of
https://github.com/minio/minio.git
synced 2025-01-23 12:43:16 -05:00
155a90403a
This PR also does backend format change to 1.0.1 from 1.0.0. Backward compatible changes are still kept to read the 'md5Sum' key. But all new objects will be stored with the same details under 'etag'. Fixes #4312
237 lines
6.0 KiB
Go
237 lines
6.0 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2016, 2017 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"sync"
|
|
|
|
humanize "github.com/dustin/go-humanize"
|
|
)
|
|
|
|
const (
|
|
// Block size used for all internal operations version 1.
|
|
blockSizeV1 = 10 * humanize.MiByte
|
|
|
|
// Staging buffer read size for all internal operations version 1.
|
|
readSizeV1 = 1 * humanize.MiByte
|
|
|
|
// Buckets meta prefix.
|
|
bucketMetaPrefix = "buckets"
|
|
|
|
// Objects meta prefix.
|
|
objectMetaPrefix = "objects"
|
|
|
|
// ETag (hex encoded md5sum) of empty string.
|
|
emptyETag = "d41d8cd98f00b204e9800998ecf8427e"
|
|
)
|
|
|
|
// Global object layer mutex, used for safely updating object layer.
|
|
var globalObjLayerMutex *sync.RWMutex
|
|
|
|
// Global object layer, only accessed by newObjectLayerFn().
|
|
var globalObjectAPI ObjectLayer
|
|
|
|
func init() {
|
|
// Initialize this once per server initialization.
|
|
globalObjLayerMutex = &sync.RWMutex{}
|
|
}
|
|
|
|
// Check if the disk is remote.
|
|
func isRemoteDisk(disk StorageAPI) bool {
|
|
_, ok := disk.(*networkStorage)
|
|
return ok
|
|
}
|
|
|
|
// Checks if the object is a directory, this logic uses
|
|
// if size == 0 and object ends with slashSeparator then
|
|
// returns true.
|
|
func isObjectDir(object string, size int64) bool {
|
|
return hasSuffix(object, slashSeparator) && size == 0
|
|
}
|
|
|
|
// Converts just bucket, object metadata into ObjectInfo datatype.
|
|
func dirObjectInfo(bucket, object string, size int64, metadata map[string]string) ObjectInfo {
|
|
// This is a special case with size as '0' and object ends with
|
|
// a slash separator, we treat it like a valid operation and
|
|
// return success.
|
|
etag := metadata["etag"]
|
|
delete(metadata, "etag")
|
|
if etag == "" {
|
|
etag = emptyETag
|
|
}
|
|
|
|
return ObjectInfo{
|
|
Bucket: bucket,
|
|
Name: object,
|
|
ModTime: UTCNow(),
|
|
ContentType: "application/octet-stream",
|
|
IsDir: true,
|
|
Size: size,
|
|
ETag: etag,
|
|
UserDefined: metadata,
|
|
}
|
|
}
|
|
|
|
// House keeping code for FS/XL and distributed Minio setup.
|
|
func houseKeeping(storageDisks []StorageAPI) error {
|
|
var wg = &sync.WaitGroup{}
|
|
|
|
// Initialize errs to collect errors inside go-routine.
|
|
var errs = make([]error, len(storageDisks))
|
|
|
|
// Initialize all disks in parallel.
|
|
for index, disk := range storageDisks {
|
|
if disk == nil {
|
|
continue
|
|
}
|
|
// Skip remote disks.
|
|
if isRemoteDisk(disk) {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(index int, disk StorageAPI) {
|
|
// Indicate this wait group is done.
|
|
defer wg.Done()
|
|
|
|
// Cleanup all temp entries upon start.
|
|
err := cleanupDir(disk, minioMetaTmpBucket, "")
|
|
if err != nil {
|
|
if !isErrIgnored(errorCause(err), errDiskNotFound, errVolumeNotFound, errFileNotFound) {
|
|
errs[index] = err
|
|
}
|
|
}
|
|
}(index, disk)
|
|
}
|
|
|
|
// Wait for all cleanup to finish.
|
|
wg.Wait()
|
|
|
|
// Return upon first error.
|
|
for _, err := range errs {
|
|
if err == nil {
|
|
continue
|
|
}
|
|
return toObjectErr(err, minioMetaTmpBucket, "*")
|
|
}
|
|
|
|
// Return success here.
|
|
return nil
|
|
}
|
|
|
|
// Depending on the disk type network or local, initialize storage API.
|
|
func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
|
|
if endpoint.IsLocal {
|
|
return newPosix(endpoint.Path)
|
|
}
|
|
|
|
return newStorageRPC(endpoint), nil
|
|
}
|
|
|
|
var initMetaVolIgnoredErrs = append(baseIgnoredErrs, errVolumeExists)
|
|
|
|
// Initializes meta volume on all input storage disks.
|
|
func initMetaVolume(storageDisks []StorageAPI) error {
|
|
// This happens for the first time, but keep this here since this
|
|
// is the only place where it can be made expensive optimizing all
|
|
// other calls. Create minio meta volume, if it doesn't exist yet.
|
|
var wg = &sync.WaitGroup{}
|
|
|
|
// Initialize errs to collect errors inside go-routine.
|
|
var errs = make([]error, len(storageDisks))
|
|
|
|
// Initialize all disks in parallel.
|
|
for index, disk := range storageDisks {
|
|
if disk == nil {
|
|
// Ignore create meta volume on disks which are not found.
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(index int, disk StorageAPI) {
|
|
// Indicate this wait group is done.
|
|
defer wg.Done()
|
|
|
|
// Attempt to create `.minio.sys`.
|
|
err := disk.MakeVol(minioMetaBucket)
|
|
if err != nil {
|
|
if !isErrIgnored(err, initMetaVolIgnoredErrs...) {
|
|
errs[index] = err
|
|
return
|
|
}
|
|
}
|
|
err = disk.MakeVol(minioMetaTmpBucket)
|
|
if err != nil {
|
|
if !isErrIgnored(err, initMetaVolIgnoredErrs...) {
|
|
errs[index] = err
|
|
return
|
|
}
|
|
}
|
|
err = disk.MakeVol(minioMetaMultipartBucket)
|
|
if err != nil {
|
|
if !isErrIgnored(err, initMetaVolIgnoredErrs...) {
|
|
errs[index] = err
|
|
return
|
|
}
|
|
}
|
|
}(index, disk)
|
|
}
|
|
|
|
// Wait for all cleanup to finish.
|
|
wg.Wait()
|
|
|
|
// Return upon first error.
|
|
for _, err := range errs {
|
|
if err == nil {
|
|
continue
|
|
}
|
|
return toObjectErr(err, minioMetaBucket)
|
|
}
|
|
|
|
// Return success here.
|
|
return nil
|
|
}
|
|
|
|
// Cleanup a directory recursively.
|
|
func cleanupDir(storage StorageAPI, volume, dirPath string) error {
|
|
var delFunc func(string) error
|
|
// Function to delete entries recursively.
|
|
delFunc = func(entryPath string) error {
|
|
if !hasSuffix(entryPath, slashSeparator) {
|
|
// Delete the file entry.
|
|
return traceError(storage.DeleteFile(volume, entryPath))
|
|
}
|
|
|
|
// If it's a directory, list and call delFunc() for each entry.
|
|
entries, err := storage.ListDir(volume, entryPath)
|
|
// If entryPath prefix never existed, safe to ignore.
|
|
if err == errFileNotFound {
|
|
return nil
|
|
} else if err != nil { // For any other errors fail.
|
|
return traceError(err)
|
|
} // else on success..
|
|
|
|
// Recurse and delete all other entries.
|
|
for _, entry := range entries {
|
|
if err = delFunc(pathJoin(entryPath, entry)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
err := delFunc(retainSlash(pathJoin(dirPath)))
|
|
return err
|
|
}
|