Add full API tests, move storage/donut to donut, add disk tests as well

Harshavardhana
2015-07-03 00:29:54 -07:00
parent 7c37e9d06a
commit 8a4e7bcdcf
36 changed files with 1110 additions and 60 deletions

pkg/donut/.gitignore vendored Normal file

@@ -0,0 +1,2 @@
donut
build-constants.go

pkg/donut/LICENSE Normal file

@@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

pkg/donut/README.md Normal file

@@ -0,0 +1,3 @@
# Donut
donut - Donut (do not delete) on-disk format implementation released under [Apache license v2](./LICENSE).
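
A minimal usage sketch of the package, assuming only what the tests added in this commit exercise (the Config fields, New, MakeBucket and ListBuckets); the disk paths are hypothetical and must already exist:

package main

import (
	"fmt"
	"time"

	"github.com/minio/minio/pkg/donut"
)

func main() {
	conf := new(donut.Config)
	conf.DonutName = "test"
	// hypothetical, pre-created disk directories
	conf.NodeDiskMap = map[string][]string{"localhost": {"/tmp/disk0", "/tmp/disk1"}}
	conf.Expiration = time.Hour
	conf.MaxSize = 100000
	d, err := donut.New(conf)
	if err != nil {
		panic(err)
	}
	if err := d.MakeBucket("mybucket", "private"); err != nil {
		panic(err)
	}
	buckets, err := d.ListBuckets()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(buckets)) // 1
}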

pkg/donut/bucket.go Normal file

@@ -0,0 +1,544 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"bytes"
"fmt"
"hash"
"io"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"crypto/md5"
"encoding/hex"
"encoding/json"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/utils/crypto/sha512"
"github.com/minio/minio/pkg/utils/split"
)
const (
blockSize = 10 * 1024 * 1024
)
// internal struct carrying bucket-specific information
type bucket struct {
name string
acl string
time time.Time
donutName string
nodes map[string]node
lock *sync.RWMutex
}
// newBucket - instantiate a new bucket
func newBucket(bucketName, aclType, donutName string, nodes map[string]node) (bucket, BucketMetadata, error) {
errParams := map[string]string{
"bucketName": bucketName,
"donutName": donutName,
"aclType": aclType,
}
if strings.TrimSpace(bucketName) == "" || strings.TrimSpace(donutName) == "" {
return bucket{}, BucketMetadata{}, iodine.New(InvalidArgument{}, errParams)
}
b := bucket{}
t := time.Now().UTC()
b.name = bucketName
b.acl = aclType
b.time = t
b.donutName = donutName
b.nodes = nodes
b.lock = new(sync.RWMutex)
metadata := BucketMetadata{}
metadata.Version = bucketMetadataVersion
metadata.Name = bucketName
metadata.ACL = BucketACL(aclType)
metadata.Created = t
metadata.Metadata = make(map[string]string)
metadata.BucketObjects = make(map[string]interface{})
return b, metadata, nil
}
func (b bucket) getBucketName() string {
return b.name
}
func (b bucket) getBucketMetadataReaders() ([]io.ReadCloser, error) {
var readers []io.ReadCloser
for _, node := range b.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
readers = make([]io.ReadCloser, len(disks))
for order, disk := range disks {
bucketMetaDataReader, err := disk.OpenFile(filepath.Join(b.donutName, bucketMetadataConfig))
if err != nil {
return nil, iodine.New(err, nil)
}
readers[order] = bucketMetaDataReader
}
}
return readers, nil
}
func (b bucket) getBucketMetadata() (*AllBuckets, error) {
metadata := new(AllBuckets)
readers, err := b.getBucketMetadataReaders()
if err != nil {
return nil, iodine.New(err, nil)
}
for _, reader := range readers {
defer reader.Close()
}
for _, reader := range readers {
jdec := json.NewDecoder(reader)
if err := jdec.Decode(metadata); err != nil {
return nil, iodine.New(err, nil)
}
return metadata, nil
}
return nil, iodine.New(InvalidArgument{}, nil)
}
// GetObjectMetadata - get metadata for an object
func (b bucket) GetObjectMetadata(objectName string) (ObjectMetadata, error) {
b.lock.RLock()
defer b.lock.RUnlock()
return b.readObjectMetadata(objectName)
}
// ListObjects - list all objects
func (b bucket) ListObjects(prefix, marker, delimiter string, maxkeys int) (ListObjectsResults, error) {
b.lock.RLock()
defer b.lock.RUnlock()
if maxkeys <= 0 {
maxkeys = 1000
}
var isTruncated bool
var objects []string
bucketMetadata, err := b.getBucketMetadata()
if err != nil {
return ListObjectsResults{}, iodine.New(err, nil)
}
for objectName := range bucketMetadata.Buckets[b.getBucketName()].BucketObjects {
if strings.HasPrefix(objectName, strings.TrimSpace(prefix)) {
if objectName > marker {
objects = append(objects, objectName)
}
}
}
if strings.TrimSpace(prefix) != "" {
objects = TrimPrefix(objects, prefix)
}
var prefixes []string
var filteredObjects []string
if strings.TrimSpace(delimiter) != "" {
filteredObjects = HasNoDelimiter(objects, delimiter)
prefixes = HasDelimiter(objects, delimiter)
prefixes = SplitDelimiter(prefixes, delimiter)
prefixes = SortU(prefixes)
} else {
filteredObjects = objects
}
var results []string
var commonPrefixes []string
for _, commonPrefix := range prefixes {
commonPrefixes = append(commonPrefixes, prefix+commonPrefix)
}
filteredObjects = RemoveDuplicates(filteredObjects)
sort.Strings(filteredObjects)
for _, objectName := range filteredObjects {
if len(results) >= maxkeys {
isTruncated = true
break
}
results = append(results, prefix+objectName)
}
results = RemoveDuplicates(results)
commonPrefixes = RemoveDuplicates(commonPrefixes)
sort.Strings(commonPrefixes)
listObjects := ListObjectsResults{}
listObjects.Objects = make(map[string]ObjectMetadata)
listObjects.CommonPrefixes = commonPrefixes
listObjects.IsTruncated = isTruncated
for _, objectName := range results {
objMetadata, err := b.readObjectMetadata(normalizeObjectName(objectName))
if err != nil {
return ListObjectsResults{}, iodine.New(err, nil)
}
listObjects.Objects[objectName] = objMetadata
}
return listObjects, nil
}
// ReadObject - open an object to read
func (b bucket) ReadObject(objectName string) (reader io.ReadCloser, size int64, err error) {
b.lock.RLock()
defer b.lock.RUnlock()
reader, writer := io.Pipe()
// get list of objects
bucketMetadata, err := b.getBucketMetadata()
if err != nil {
return nil, 0, iodine.New(err, nil)
}
// check if object exists
if _, ok := bucketMetadata.Buckets[b.getBucketName()].BucketObjects[objectName]; !ok {
return nil, 0, iodine.New(ObjectNotFound{Object: objectName}, nil)
}
objMetadata, err := b.readObjectMetadata(normalizeObjectName(objectName))
if err != nil {
return nil, 0, iodine.New(err, nil)
}
// read and reply back to GetObject() request in a go-routine
go b.readObjectData(normalizeObjectName(objectName), writer, objMetadata)
return reader, objMetadata.Size, nil
}
// WriteObject - write a new object into bucket
func (b bucket) WriteObject(objectName string, objectData io.Reader, expectedMD5Sum string, metadata map[string]string) (ObjectMetadata, error) {
b.lock.Lock()
defer b.lock.Unlock()
if objectName == "" || objectData == nil {
return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil)
}
writers, err := b.getWriters(normalizeObjectName(objectName), "data")
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
sumMD5 := md5.New()
sum512 := sha512.New()
objMetadata := ObjectMetadata{}
objMetadata.Version = objectMetadataVersion
objMetadata.Created = time.Now().UTC()
// if the total number of writers is only '1', do not compute erasure
switch len(writers) == 1 {
case true:
mw := io.MultiWriter(writers[0], sumMD5, sum512)
totalLength, err := io.Copy(mw, objectData)
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
objMetadata.Size = totalLength
case false:
// calculate data and parity dictated by total number of writers
k, m, err := b.getDataAndParity(len(writers))
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
// write encoded data with k, m and writers
chunkCount, totalLength, err := b.writeObjectData(k, m, writers, objectData, sumMD5, sum512)
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
// donutMetadata section
objMetadata.BlockSize = blockSize
objMetadata.ChunkCount = chunkCount
objMetadata.DataDisks = k
objMetadata.ParityDisks = m
objMetadata.ErasureTechnique = "Cauchy"
objMetadata.Size = int64(totalLength)
}
objMetadata.Bucket = b.getBucketName()
objMetadata.Object = objectName
dataMD5sum := sumMD5.Sum(nil)
dataSHA512sum := sum512.Sum(nil)
objMetadata.MD5Sum = hex.EncodeToString(dataMD5sum)
objMetadata.SHA512Sum = hex.EncodeToString(dataSHA512sum)
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := b.isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), objMetadata.MD5Sum); err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
}
objMetadata.Metadata = metadata
// write object specific metadata
if err := b.writeObjectMetadata(normalizeObjectName(objectName), objMetadata); err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
// close all writers, when control flow reaches here
for _, writer := range writers {
writer.Close()
}
return objMetadata, nil
}
// isMD5SumEqual - returns an error if the md5sums mismatch, otherwise returns `nil`
func (b bucket) isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" {
expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum)
if err != nil {
return iodine.New(err, nil)
}
actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum)
if err != nil {
return iodine.New(err, nil)
}
if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) {
return iodine.New(BadDigest{}, nil)
}
return nil
}
return iodine.New(InvalidArgument{}, nil)
}
// writeObjectMetadata - write additional object metadata
func (b bucket) writeObjectMetadata(objectName string, objMetadata ObjectMetadata) error {
if objMetadata.Object == "" {
return iodine.New(InvalidArgument{}, nil)
}
objMetadataWriters, err := b.getWriters(objectName, objectMetadataConfig)
if err != nil {
return iodine.New(err, nil)
}
for _, objMetadataWriter := range objMetadataWriters {
defer objMetadataWriter.Close()
}
for _, objMetadataWriter := range objMetadataWriters {
jenc := json.NewEncoder(objMetadataWriter)
if err := jenc.Encode(&objMetadata); err != nil {
return iodine.New(err, nil)
}
}
return nil
}
// readObjectMetadata - read object metadata
func (b bucket) readObjectMetadata(objectName string) (ObjectMetadata, error) {
objMetadata := ObjectMetadata{}
if objectName == "" {
return ObjectMetadata{}, iodine.New(InvalidArgument{}, nil)
}
objMetadataReaders, err := b.getReaders(objectName, objectMetadataConfig)
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
for _, objMetadataReader := range objMetadataReaders {
defer objMetadataReader.Close()
}
for _, objMetadataReader := range objMetadataReaders {
jdec := json.NewDecoder(objMetadataReader)
if err := jdec.Decode(&objMetadata); err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
break
}
return objMetadata, nil
}
// TODO - This is a temporary normalization of objectNames, need to find a better way
//
// normalizeObjectName - all objectNames with "/" get normalized to a simple objectName
//
// example:
// user provided value - "this/is/my/deep/directory/structure"
// donut normalized value - "this-is-my-deep-directory-structure"
//
func normalizeObjectName(objectName string) string {
// replace every '/' with '-'
return strings.Replace(objectName, "/", "-", -1)
}
// getDataAndParity - calculate k, m (data and parity) values from number of disks
func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error) {
if totalWriters <= 1 {
return 0, 0, iodine.New(InvalidArgument{}, nil)
}
quotient := totalWriters / 2 // not using float or abs to let integer round off to lower value
// quotient cannot be bigger than (255 / 2) = 127
if quotient > 127 {
return 0, 0, iodine.New(ParityOverflow{}, nil)
}
remainder := totalWriters % 2 // will be 1 for odd and 0 for even numbers
k = uint8(quotient + remainder)
m = uint8(quotient)
return k, m, nil
}
// writeObjectData - erasure encode and write object data across all writers
func (b bucket) writeObjectData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, sumMD5, sum512 hash.Hash) (int, int, error) {
encoder, err := newEncoder(k, m, "Cauchy")
if err != nil {
return 0, 0, iodine.New(err, nil)
}
chunkCount := 0
totalLength := 0
for chunk := range split.Stream(objectData, 10*1024*1024) {
if chunk.Err != nil {
return 0, 0, iodine.New(chunk.Err, nil)
}
totalLength = totalLength + len(chunk.Data)
encodedBlocks, _ := encoder.Encode(chunk.Data)
sumMD5.Write(chunk.Data)
sum512.Write(chunk.Data)
for blockIndex, block := range encodedBlocks {
_, err := io.Copy(writers[blockIndex], bytes.NewBuffer(block))
if err != nil {
return 0, 0, iodine.New(err, nil)
}
}
chunkCount = chunkCount + 1
}
return chunkCount, totalLength, nil
}
// readObjectData - read object data from disks and stream it into the pipe writer
func (b bucket) readObjectData(objectName string, writer *io.PipeWriter, objMetadata ObjectMetadata) {
readers, err := b.getReaders(objectName, "data")
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
for _, reader := range readers {
defer reader.Close()
}
expectedMd5sum, err := hex.DecodeString(objMetadata.MD5Sum)
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
hasher := md5.New()
mwriter := io.MultiWriter(writer, hasher)
switch len(readers) == 1 {
case false:
if objMetadata.ErasureTechnique == "" {
writer.CloseWithError(iodine.New(MissingErasureTechnique{}, nil))
return
}
encoder, err := newEncoder(objMetadata.DataDisks, objMetadata.ParityDisks, objMetadata.ErasureTechnique)
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
totalLeft := objMetadata.Size
for i := 0; i < objMetadata.ChunkCount; i++ {
decodedData, err := b.decodeEncodedData(totalLeft, int64(objMetadata.BlockSize), readers, encoder, writer)
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
_, err = io.Copy(mwriter, bytes.NewBuffer(decodedData))
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
totalLeft = totalLeft - int64(objMetadata.BlockSize)
}
case true:
_, err := io.Copy(writer, readers[0])
if err != nil {
writer.CloseWithError(iodine.New(err, nil))
return
}
}
// check if decodedData md5sum matches
if !bytes.Equal(expectedMd5sum, hasher.Sum(nil)) {
writer.CloseWithError(iodine.New(ChecksumMismatch{}, nil))
return
}
writer.Close()
return
}
// decodeEncodedData - decode a single erasure coded block from all readers
func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadCloser, encoder encoder, writer *io.PipeWriter) ([]byte, error) {
var curBlockSize int64
if blockSize < totalLeft {
curBlockSize = blockSize
} else {
curBlockSize = totalLeft // totalLeft is smaller than blockSize here, so the int cast below is safe
}
curChunkSize, err := encoder.GetEncodedBlockLen(int(curBlockSize))
if err != nil {
return nil, iodine.New(err, nil)
}
encodedBytes := make([][]byte, len(readers))
for i, reader := range readers {
var bytesBuffer bytes.Buffer
_, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize))
if err != nil {
return nil, iodine.New(err, nil)
}
encodedBytes[i] = bytesBuffer.Bytes()
}
decodedData, err := encoder.Decode(encodedBytes, int(curBlockSize))
if err != nil {
return nil, iodine.New(err, nil)
}
return decodedData, nil
}
// getReaders - get readers for an object entry from all disks
func (b bucket) getReaders(objectName, objectMeta string) ([]io.ReadCloser, error) {
var readers []io.ReadCloser
nodeSlice := 0
for _, node := range b.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
readers = make([]io.ReadCloser, len(disks))
for order, disk := range disks {
bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, order)
objectPath := filepath.Join(b.donutName, bucketSlice, objectName, objectMeta)
objectSlice, err := disk.OpenFile(objectPath)
if err != nil {
return nil, iodine.New(err, nil)
}
readers[order] = objectSlice
}
nodeSlice = nodeSlice + 1
}
return readers, nil
}
// getWriters - get writers for an object entry on all disks
func (b bucket) getWriters(objectName, objectMeta string) ([]io.WriteCloser, error) {
var writers []io.WriteCloser
nodeSlice := 0
for _, node := range b.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
writers = make([]io.WriteCloser, len(disks))
for order, disk := range disks {
bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, order)
objectPath := filepath.Join(b.donutName, bucketSlice, objectName, objectMeta)
objectSlice, err := disk.CreateFile(objectPath)
if err != nil {
return nil, iodine.New(err, nil)
}
writers[order] = objectSlice
}
nodeSlice = nodeSlice + 1
}
return writers, nil
}
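
For reference, getDataAndParity above is plain integer halving, with the odd disk going to data. A quick table-driven sanity check (a sketch, not part of this commit; it must live in package donut since the method is unexported):

package donut

import "testing"

func TestDataAndParity(t *testing.T) {
	b := bucket{}
	cases := []struct{ disks, k, m int }{
		{4, 2, 2},  // even: split in half
		{15, 8, 7}, // odd: the extra disk goes to data
		{16, 8, 8},
	}
	for _, c := range cases {
		k, m, err := b.getDataAndParity(c.disks)
		if err != nil || int(k) != c.k || int(m) != c.m {
			t.Fatalf("disks=%d: got k=%d m=%d err=%v", c.disks, k, m, err)
		}
	}
}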

pkg/donut/common.go Normal file

@@ -0,0 +1,123 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"bufio"
"bytes"
"io"
"sort"
"strings"
)
// ProxyWriter implements io.Writer to trap written bytes
type ProxyWriter struct {
writer io.Writer
writtenBytes []byte
}
func (r *ProxyWriter) Write(p []byte) (n int, err error) {
n, err = r.writer.Write(p)
if err != nil {
return
}
r.writtenBytes = append(r.writtenBytes, p[0:n]...)
return
}
// NewProxyWriter - wrap around a given writer with ProxyWriter
func NewProxyWriter(w io.Writer) *ProxyWriter {
return &ProxyWriter{writer: w, writtenBytes: nil}
}
// Delimiter returns the prefix of object up to and including the first occurrence of delimiter
func Delimiter(object, delimiter string) string {
readBuffer := bytes.NewBufferString(object)
reader := bufio.NewReader(readBuffer)
stringReader := strings.NewReader(delimiter)
delimited, _ := stringReader.ReadByte()
delimitedStr, _ := reader.ReadString(delimited)
return delimitedStr
}
// RemoveDuplicates removes duplicate elements from a slice
func RemoveDuplicates(slice []string) []string {
newSlice := []string{}
seen := make(map[string]struct{})
for _, val := range slice {
if _, ok := seen[val]; !ok {
newSlice = append(newSlice, val)
seen[val] = struct{}{} // avoiding byte allocation
}
}
return newSlice
}
// TrimPrefix trims off a prefix string from all the elements in a given slice
func TrimPrefix(objects []string, prefix string) []string {
var results []string
for _, object := range objects {
results = append(results, strings.TrimPrefix(object, prefix))
}
return results
}
// HasNoDelimiter returns a new slice with only the elements that do not contain the delimiter
func HasNoDelimiter(objects []string, delim string) []string {
var results []string
for _, object := range objects {
if !strings.Contains(object, delim) {
results = append(results, object)
}
}
return results
}
// HasDelimiter returns a new slice with only the elements that contain the delimiter
func HasDelimiter(objects []string, delim string) []string {
var results []string
for _, object := range objects {
if strings.Contains(object, delim) {
results = append(results, object)
}
}
return results
}
// SplitDelimiter returns a new slice with each element truncated after its first delimiter
func SplitDelimiter(objects []string, delim string) []string {
var results []string
for _, object := range objects {
parts := strings.Split(object, delim)
results = append(results, parts[0]+delim)
}
return results
}
// SortU sorts a slice in lexical order, removing duplicate elements
func SortU(objects []string) []string {
objectMap := make(map[string]string)
for _, v := range objects {
objectMap[v] = v
}
var results []string
for k := range objectMap {
results = append(results, k)
}
sort.Strings(results)
return results
}
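
These helpers are composed by bucket.ListObjects to build S3-style common prefixes. A small runnable example of that pipeline (an illustration, not part of the diff):

package donut

import "fmt"

func ExampleSplitDelimiter() {
	objects := []string{"photos/2015/jan.jpg", "photos/2015/feb.jpg", "readme.txt"}
	// same steps ListObjects performs: keep delimited names, cut at the
	// first delimiter, then sort and deduplicate
	prefixes := SortU(SplitDelimiter(HasDelimiter(objects, "/"), "/"))
	fmt.Println(prefixes, HasNoDelimiter(objects, "/"))
	// Output: [photos/] [readme.txt]
}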

pkg/donut/config.go Normal file

@@ -0,0 +1,87 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"os/user"
"path/filepath"
"time"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/quick"
)
// getDonutConfigPath get donut config file path
func getDonutConfigPath() (string, error) {
u, err := user.Current()
if err != nil {
return "", iodine.New(err, nil)
}
donutConfigPath := filepath.Join(u.HomeDir, ".minio", "donut.json")
return donutConfigPath, nil
}
// SaveConfig saves donut config
func SaveConfig(a *Config) error {
donutConfigPath, err := getDonutConfigPath()
if err != nil {
return iodine.New(err, nil)
}
qc, err := quick.New(a)
if err != nil {
return iodine.New(err, nil)
}
if err := qc.Save(donutConfigPath); err != nil {
return iodine.New(err, nil)
}
return nil
}
// LoadConfig loads donut config
func LoadConfig() (*Config, error) {
donutConfigPath, err := getDonutConfigPath()
if err != nil {
return nil, iodine.New(err, nil)
}
a := &Config{}
a.Version = "0.0.1"
qc, err := quick.New(a)
if err != nil {
return nil, iodine.New(err, nil)
}
if err := qc.Load(donutConfigPath); err != nil {
return nil, iodine.New(err, nil)
}
return qc.Data().(*Config), nil
}
// LoadDonut loads donut from config
func LoadDonut() (Interface, error) {
conf, err := LoadConfig()
if err != nil {
conf = &Config{
Version: "0.0.1",
MaxSize: 512000000,
Expiration: 1 * time.Hour,
}
}
donut, err := New(conf)
if err != nil {
return nil, iodine.New(err, nil)
}
return donut, nil
}
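
A round-trip sketch of the two config entry points above: SaveConfig persists ~/.minio/donut.json via the quick package and LoadConfig reads it back (values taken from the defaults in LoadDonut):

package main

import (
	"fmt"
	"time"

	"github.com/minio/minio/pkg/donut"
)

func main() {
	conf := new(donut.Config)
	conf.Version = "0.0.1"
	conf.MaxSize = 512000000
	conf.Expiration = 1 * time.Hour
	if err := donut.SaveConfig(conf); err != nil {
		panic(err)
	}
	loaded, err := donut.LoadConfig()
	if err != nil {
		panic(err)
	}
	fmt.Println(loaded.Version, loaded.MaxSize, loaded.Expiration)
}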

pkg/donut/date.go Normal file

@@ -0,0 +1,78 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"errors"
"fmt"
"strconv"
"strings"
)
// Date - [0000-00-00]
type Date struct {
Year int16
Month byte
Day byte
}
// String returns the date in yyyy-mm-dd format
func (d Date) String() string {
return fmt.Sprintf("%04d-%02d-%02d", d.Year, d.Month, d.Day)
}
// IsZero returns true if the date is 0000-00-00
func (d Date) IsZero() bool {
return d.Day == 0 && d.Month == 0 && d.Year == 0
}
// parseDate converts a string date in YYYY-MM-DD format to Date.
// Leading and trailing spaces are ignored; a malformed string returns an error.
func parseDate(str string) (d Date, err error) {
str = strings.TrimSpace(str)
if str == "0000-00-00" {
return
}
var (
y, m, n int
)
if len(str) != 10 || str[4] != '-' || str[7] != '-' {
err = errors.New("Invalid 0000-00-00 style DATE string: " + str)
return
}
if y, err = strconv.Atoi(str[0:4]); err != nil {
return
}
if m, err = strconv.Atoi(str[5:7]); err != nil {
return
}
if m < 1 || m > 12 {
err = errors.New("Invalid 0000-00-00 style DATE string: " + str)
return
}
if n, err = strconv.Atoi(str[8:10]); err != nil {
return
}
if n < 1 || n > 31 {
err = errors.New("Invalid 0000-00-00 style DATE string: " + str)
return
}
d.Year = int16(y)
d.Month = byte(m)
d.Day = byte(n)
return
}
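
A few parseDate cases as a sketch (in-package, since parseDate is unexported; not part of this commit):

package donut

import "testing"

func TestParseDateSketch(t *testing.T) {
	d, err := parseDate("2015-07-03")
	if err != nil || d.Year != 2015 || d.Month != 7 || d.Day != 3 {
		t.Fatalf("got %v, %v", d, err)
	}
	// the zero date parses to the zero value, not an error
	if d, err := parseDate("0000-00-00"); err != nil || !d.IsZero() {
		t.Fatalf("got %v, %v", d, err)
	}
	// wrong separators are rejected
	if _, err := parseDate("2015/07/03"); err == nil {
		t.Fatal("expected an error for a malformed date")
	}
}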

pkg/donut/definitions.go Normal file

@@ -0,0 +1,73 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import "time"
// ObjectMetadata container for object on donut system
type ObjectMetadata struct {
// version
Version string `json:"version"`
// object metadata
Created time.Time `json:"created"`
Bucket string `json:"bucket"`
Object string `json:"object"`
Size int64 `json:"size"`
// erasure
DataDisks uint8 `json:"sys.erasureK"`
ParityDisks uint8 `json:"sys.erasureM"`
ErasureTechnique string `json:"sys.erasureTechnique"`
BlockSize int `json:"sys.blockSize"`
ChunkCount int `json:"sys.chunkCount"`
// checksums
MD5Sum string `json:"sys.md5sum"`
SHA512Sum string `json:"sys.sha512sum"`
// metadata
Metadata map[string]string `json:"metadata"`
}
// Metadata container for donut metadata
type Metadata struct {
Version string `json:"version"`
}
// AllBuckets container for all buckets
type AllBuckets struct {
Version string `json:"version"`
Buckets map[string]BucketMetadata `json:"buckets"`
}
// BucketMetadata container for bucket level metadata
type BucketMetadata struct {
Version string `json:"version"`
Name string `json:"name"`
ACL BucketACL `json:"acl"`
Created time.Time `json:"created"`
Metadata map[string]string `json:"metadata"`
BucketObjects map[string]interface{} `json:"objects"`
}
// ListObjectsResults container for list objects response
type ListObjectsResults struct {
Objects map[string]ObjectMetadata `json:"objects"`
CommonPrefixes []string `json:"commonPrefixes"`
IsTruncated bool `json:"isTruncated"`
}
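
The sys.* JSON tags keep system-generated fields namespaced apart from user metadata. An illustrative in-package marshal (field values are hypothetical) showing the resulting keys:

package donut

import (
	"encoding/json"
	"fmt"
	"time"
)

// The struct tags above yield keys like "sys.erasureK" and "sys.md5sum"
// alongside plain "bucket"/"object" keys.
func ExampleObjectMetadata() {
	m := ObjectMetadata{
		Version: "1.0.0",
		Created: time.Date(2015, 7, 3, 0, 0, 0, 0, time.UTC),
		Bucket:  "foo",
		Object:  "bar",
		Size:    11,
	}
	buf, _ := json.Marshal(m)
	fmt.Println(string(buf))
}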

pkg/donut/disk/disk.go Normal file

@@ -0,0 +1,177 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package disk
import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"syscall"
"github.com/minio/minio/pkg/iodine"
)
// Disk container for disk parameters
type Disk struct {
path string
fsInfo map[string]string
}
// New - instantiate new disk
func New(diskPath string) (Disk, error) {
if diskPath == "" {
return Disk{}, iodine.New(InvalidArgument{}, nil)
}
s := syscall.Statfs_t{}
err := syscall.Statfs(diskPath, &s)
if err != nil {
return Disk{}, iodine.New(err, nil)
}
st, err := os.Stat(diskPath)
if err != nil {
return Disk{}, iodine.New(err, nil)
}
if !st.IsDir() {
return Disk{}, iodine.New(syscall.ENOTDIR, nil)
}
disk := Disk{
path: diskPath,
fsInfo: make(map[string]string),
}
if fsType := getFSType(s.Type); fsType != "UNKNOWN" {
disk.fsInfo["FSType"] = fsType
disk.fsInfo["MountPoint"] = disk.path
return disk, nil
}
return Disk{}, iodine.New(UnsupportedFilesystem{Type: strconv.FormatInt(s.Type, 10)},
map[string]string{"Type": strconv.FormatInt(s.Type, 10)})
}
// GetPath - get root disk path
func (disk Disk) GetPath() string {
return disk.path
}
// GetFSInfo - get disk filesystem and its usage information
func (disk Disk) GetFSInfo() map[string]string {
s := syscall.Statfs_t{}
err := syscall.Statfs(disk.path, &s)
if err != nil {
return nil
}
disk.fsInfo["Total"] = formatBytes(s.Bsize * int64(s.Blocks))
disk.fsInfo["Free"] = formatBytes(s.Bsize * int64(s.Bfree))
disk.fsInfo["TotalB"] = strconv.FormatInt(s.Bsize*int64(s.Blocks), 10)
disk.fsInfo["FreeB"] = strconv.FormatInt(s.Bsize*int64(s.Bfree), 10)
return disk.fsInfo
}
// MakeDir - make a directory inside disk root path
func (disk Disk) MakeDir(dirname string) error {
return os.MkdirAll(filepath.Join(disk.path, dirname), 0700)
}
// ListDir - list a directory inside disk root path, get only directories
func (disk Disk) ListDir(dirname string) ([]os.FileInfo, error) {
dir, err := os.Open(filepath.Join(disk.path, dirname))
if err != nil {
return nil, iodine.New(err, nil)
}
defer dir.Close()
contents, err := dir.Readdir(-1)
if err != nil {
return nil, iodine.New(err, nil)
}
var directories []os.FileInfo
for _, content := range contents {
// Include only directories, ignore everything else
if content.IsDir() {
directories = append(directories, content)
}
}
return directories, nil
}
// ListFiles - list a directory inside disk root path, get only files
func (disk Disk) ListFiles(dirname string) ([]os.FileInfo, error) {
dir, err := os.Open(filepath.Join(disk.path, dirname))
if err != nil {
return nil, iodine.New(err, nil)
}
defer dir.Close()
contents, err := dir.Readdir(-1)
if err != nil {
return nil, iodine.New(err, nil)
}
var files []os.FileInfo
for _, content := range contents {
// Include only regular files, ignore everything else
if content.Mode().IsRegular() {
files = append(files, content)
}
}
return files, nil
}
// CreateFile - create a file inside disk root path
func (disk Disk) CreateFile(filename string) (*os.File, error) {
if filename == "" {
return nil, iodine.New(InvalidArgument{}, nil)
}
filePath := filepath.Join(disk.path, filename)
// Create directories if they don't exist
if err := os.MkdirAll(filepath.Dir(filePath), 0700); err != nil {
return nil, iodine.New(err, nil)
}
dataFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return nil, iodine.New(err, nil)
}
return dataFile, nil
}
// OpenFile - read a file inside disk root path
func (disk Disk) OpenFile(filename string) (*os.File, error) {
if filename == "" {
return nil, iodine.New(InvalidArgument{}, nil)
}
dataFile, err := os.Open(filepath.Join(disk.path, filename))
if err != nil {
return nil, iodine.New(err, nil)
}
return dataFile, nil
}
// formatBytes - Convert bytes to human readable string. Like a 2 MB, 64.2 KB, 52 B
func formatBytes(i int64) (result string) {
switch {
case i > (1024 * 1024 * 1024 * 1024):
result = fmt.Sprintf("%.02f TB", float64(i)/1024/1024/1024/1024)
case i > (1024 * 1024 * 1024):
result = fmt.Sprintf("%.02f GB", float64(i)/1024/1024/1024)
case i > (1024 * 1024):
result = fmt.Sprintf("%.02f MB", float64(i)/1024/1024)
case i > 1024:
result = fmt.Sprintf("%.02f KB", float64(i)/1024)
default:
result = fmt.Sprintf("%d B", i)
}
result = strings.Trim(result, " ")
return
}
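
Note that formatBytes steps in powers of 1024 even though the labels read KB/MB/GB. A quick in-package sketch of its output (illustrative, not part of the diff):

package disk

import "testing"

func TestFormatBytesSketch(t *testing.T) {
	cases := map[int64]string{
		512:             "512 B",
		2048:            "2.00 KB",
		3 * 1024 * 1024: "3.00 MB",
	}
	for in, want := range cases {
		if got := formatBytes(in); got != want {
			t.Fatalf("formatBytes(%d) = %q, want %q", in, got, want)
		}
	}
}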


@@ -0,0 +1,34 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package disk
import "strconv"
// fsType2StringMap - list of filesystems supported by donut
var fsType2StringMap = map[string]string{
"11": "HFS",
}
// getFSType - get filesystem type
func getFSType(fsType uint32) string {
fsTypeHex := strconv.FormatUint(uint64(fsType), 16)
fsTypeString, ok := fsType2StringMap[fsTypeHex]
if !ok {
return "UNKNOWN"
}
return fsTypeString
}


@@ -0,0 +1,44 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package disk
import "strconv"
// fsType2StringMap - list of filesystems supported by donut on linux
var fsType2StringMap = map[string]string{
"1021994": "TMPFS",
"137d": "EXT",
"4244": "HFS",
"4d44": "MSDOS",
"52654973": "REISERFS",
"5346544e": "NTFS",
"58465342": "XFS",
"61756673": "AUFS",
"6969": "NFS",
"ef51": "EXT2OLD",
"ef53": "EXT4",
}
// getFSType - get filesystem type
func getFSType(fsType int64) string {
fsTypeHex := strconv.FormatInt(fsType, 16)
fsTypeString, ok := fsType2StringMap[fsTypeHex]
if !ok {
return "UNKNOWN"
}
return fsTypeString
}


@@ -0,0 +1,78 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package disk
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
. "github.com/minio/check"
)
func TestDisk(t *testing.T) { TestingT(t) }
type MyDiskSuite struct {
path string
disk Disk
}
var _ = Suite(&MyDiskSuite{})
func (s *MyDiskSuite) SetUpSuite(c *C) {
path, err := ioutil.TempDir(os.TempDir(), "disk-")
c.Assert(err, IsNil)
s.path = path
d, err := New(s.path)
c.Assert(err, IsNil)
s.disk = d
}
func (s *MyDiskSuite) TearDownSuite(c *C) {
os.RemoveAll(s.path)
}
func (s *MyDiskSuite) TestDiskInfo(c *C) {
c.Assert(s.path, Equals, s.disk.GetPath())
fsInfo := s.disk.GetFSInfo()
c.Assert(fsInfo["MountPoint"], Equals, s.disk.GetPath())
c.Assert(fsInfo["FSType"], Not(Equals), "UNKNOWN")
}
func (s *MyDiskSuite) TestDiskCreateDir(c *C) {
c.Assert(s.disk.MakeDir("hello"), IsNil)
}
func (s *MyDiskSuite) TestDiskCreateFile(c *C) {
f, err := s.disk.CreateFile("hello1")
c.Assert(err, IsNil)
c.Assert(f.Name(), Equals, filepath.Join(s.path, "hello1"))
defer f.Close()
}
func (s *MyDiskSuite) TestDiskOpenFile(c *C) {
f, err := s.disk.CreateFile("hello2")
c.Assert(err, IsNil)
c.Assert(f.Name(), Equals, filepath.Join(s.path, "hello2"))
defer f.Close()
f, err = s.disk.OpenFile("hello2")
c.Assert(err, IsNil)
c.Assert(f.Name(), Equals, filepath.Join(s.path, "hello2"))
defer f.Close()
}

pkg/donut/disk/errors.go Normal file

@@ -0,0 +1,33 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package disk
// InvalidArgument invalid argument
type InvalidArgument struct{}
func (e InvalidArgument) Error() string {
return "Invalid argument"
}
// UnsupportedFilesystem unsupported filesystem type
type UnsupportedFilesystem struct {
Type string
}
func (e UnsupportedFilesystem) Error() string {
return "Unsupported filesystem: " + e.Type
}

pkg/donut/donut-v1.go Normal file

@@ -0,0 +1,375 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/minio/minio/pkg/iodine"
)
// config files used inside Donut
const (
// donut system config
donutConfig = "donutConfig.json"
// bucket, object metadata
bucketMetadataConfig = "bucketMetadata.json"
objectMetadataConfig = "objectMetadata.json"
// versions
objectMetadataVersion = "1.0.0"
bucketMetadataVersion = "1.0.0"
)
// makeBucket - make a new bucket
func (donut API) makeBucket(bucket string, acl BucketACL) error {
donut.lock.Lock()
defer donut.lock.Unlock()
if bucket == "" || strings.TrimSpace(bucket) == "" {
return iodine.New(InvalidArgument{}, nil)
}
return donut.makeDonutBucket(bucket, acl.String())
}
// getBucketMetadata - get bucket metadata
func (donut API) getBucketMetadata(bucketName string) (BucketMetadata, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
if err := donut.listDonutBuckets(); err != nil {
return BucketMetadata{}, iodine.New(err, nil)
}
if _, ok := donut.buckets[bucketName]; !ok {
return BucketMetadata{}, iodine.New(BucketNotFound{Bucket: bucketName}, nil)
}
metadata, err := donut.getDonutBucketMetadata()
if err != nil {
return BucketMetadata{}, iodine.New(err, nil)
}
return metadata.Buckets[bucketName], nil
}
// setBucketMetadata - set bucket metadata
func (donut API) setBucketMetadata(bucketName string, bucketMetadata map[string]string) error {
donut.lock.Lock()
defer donut.lock.Unlock()
if err := donut.listDonutBuckets(); err != nil {
return iodine.New(err, nil)
}
metadata, err := donut.getDonutBucketMetadata()
if err != nil {
return iodine.New(err, nil)
}
oldBucketMetadata := metadata.Buckets[bucketName]
acl, ok := bucketMetadata["acl"]
if !ok {
return iodine.New(InvalidArgument{}, nil)
}
oldBucketMetadata.ACL = BucketACL(acl)
metadata.Buckets[bucketName] = oldBucketMetadata
return donut.setDonutBucketMetadata(metadata)
}
// listBuckets - return list of buckets
func (donut API) listBuckets() (map[string]BucketMetadata, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
if err := donut.listDonutBuckets(); err != nil {
return nil, iodine.New(err, nil)
}
metadata, err := donut.getDonutBucketMetadata()
if err != nil {
// intentionally leave out the error when Donut is empty
// but we need to revisit this area in future - since we need
// to distinguish between acceptable and unacceptable errors
return make(map[string]BucketMetadata), nil
}
return metadata.Buckets, nil
}
// listObjects - return list of objects
func (donut API) listObjects(bucket, prefix, marker, delimiter string, maxkeys int) (ListObjectsResults, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
errParams := map[string]string{
"bucket": bucket,
"prefix": prefix,
"marker": marker,
"delimiter": delimiter,
"maxkeys": strconv.Itoa(maxkeys),
}
if err := donut.listDonutBuckets(); err != nil {
return ListObjectsResults{}, iodine.New(err, errParams)
}
if _, ok := donut.buckets[bucket]; !ok {
return ListObjectsResults{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
}
listObjects, err := donut.buckets[bucket].ListObjects(prefix, marker, delimiter, maxkeys)
if err != nil {
return ListObjectsResults{}, iodine.New(err, errParams)
}
return listObjects, nil
}
// putObject - put object
func (donut API) putObject(bucket, object, expectedMD5Sum string, reader io.Reader, metadata map[string]string) (ObjectMetadata, error) {
donut.lock.Lock()
defer donut.lock.Unlock()
errParams := map[string]string{
"bucket": bucket,
"object": object,
}
if bucket == "" || strings.TrimSpace(bucket) == "" {
return ObjectMetadata{}, iodine.New(InvalidArgument{}, errParams)
}
if object == "" || strings.TrimSpace(object) == "" {
return ObjectMetadata{}, iodine.New(InvalidArgument{}, errParams)
}
if err := donut.listDonutBuckets(); err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
if _, ok := donut.buckets[bucket]; !ok {
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
bucketMeta, err := donut.getDonutBucketMetadata()
if err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; ok {
return ObjectMetadata{}, iodine.New(ObjectExists{Object: object}, errParams)
}
objMetadata, err := donut.buckets[bucket].WriteObject(object, reader, expectedMD5Sum, metadata)
if err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
bucketMeta.Buckets[bucket].BucketObjects[object] = 1
if err := donut.setDonutBucketMetadata(bucketMeta); err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
return objMetadata, nil
}
// getObject - get object
func (donut API) getObject(bucket, object string) (reader io.ReadCloser, size int64, err error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
errParams := map[string]string{
"bucket": bucket,
"object": object,
}
if bucket == "" || strings.TrimSpace(bucket) == "" {
return nil, 0, iodine.New(InvalidArgument{}, errParams)
}
if object == "" || strings.TrimSpace(object) == "" {
return nil, 0, iodine.New(InvalidArgument{}, errParams)
}
if err := donut.listDonutBuckets(); err != nil {
return nil, 0, iodine.New(err, nil)
}
if _, ok := donut.buckets[bucket]; !ok {
return nil, 0, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
}
return donut.buckets[bucket].ReadObject(object)
}
// getObjectMetadata - get object metadata
func (donut API) getObjectMetadata(bucket, object string) (ObjectMetadata, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
errParams := map[string]string{
"bucket": bucket,
"object": object,
}
if err := donut.listDonutBuckets(); err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
if _, ok := donut.buckets[bucket]; !ok {
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, errParams)
}
bucketMeta, err := donut.getDonutBucketMetadata()
if err != nil {
return ObjectMetadata{}, iodine.New(err, errParams)
}
if _, ok := bucketMeta.Buckets[bucket].BucketObjects[object]; !ok {
return ObjectMetadata{}, iodine.New(ObjectNotFound{Object: object}, errParams)
}
objectMetadata, err := donut.buckets[bucket].GetObjectMetadata(object)
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
return objectMetadata, nil
}
// getBucketMetadataWriters - get bucket metadata writers for all disks
func (donut API) getBucketMetadataWriters() ([]io.WriteCloser, error) {
var writers []io.WriteCloser
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
writers = make([]io.WriteCloser, len(disks))
for order, d := range disks {
bucketMetaDataWriter, err := d.CreateFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
if err != nil {
return nil, iodine.New(err, nil)
}
writers[order] = bucketMetaDataWriter
}
}
return writers, nil
}
func (donut API) getBucketMetadataReaders() ([]io.ReadCloser, error) {
var readers []io.ReadCloser
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
readers = make([]io.ReadCloser, len(disks))
for order, d := range disks {
bucketMetaDataReader, err := d.OpenFile(filepath.Join(donut.config.DonutName, bucketMetadataConfig))
if err != nil {
return nil, iodine.New(err, nil)
}
readers[order] = bucketMetaDataReader
}
}
return readers, nil
}
// setDonutBucketMetadata - save bucket metadata on all disks
func (donut API) setDonutBucketMetadata(metadata *AllBuckets) error {
writers, err := donut.getBucketMetadataWriters()
if err != nil {
return iodine.New(err, nil)
}
for _, writer := range writers {
defer writer.Close()
}
for _, writer := range writers {
jenc := json.NewEncoder(writer)
if err := jenc.Encode(metadata); err != nil {
return iodine.New(err, nil)
}
}
return nil
}
func (donut API) getDonutBucketMetadata() (*AllBuckets, error) {
metadata := new(AllBuckets)
readers, err := donut.getBucketMetadataReaders()
if err != nil {
return nil, iodine.New(err, nil)
}
for _, reader := range readers {
defer reader.Close()
}
for _, reader := range readers {
jdec := json.NewDecoder(reader)
if err := jdec.Decode(metadata); err != nil {
return nil, iodine.New(err, nil)
}
return metadata, nil
}
return nil, iodine.New(InvalidArgument{}, nil)
}
func (donut API) makeDonutBucket(bucketName, acl string) error {
if err := donut.listDonutBuckets(); err != nil {
return iodine.New(err, nil)
}
if _, ok := donut.buckets[bucketName]; ok {
return iodine.New(BucketExists{Bucket: bucketName}, nil)
}
bucket, bucketMetadata, err := newBucket(bucketName, acl, donut.config.DonutName, donut.nodes)
if err != nil {
return iodine.New(err, nil)
}
nodeNumber := 0
donut.buckets[bucketName] = bucket
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return iodine.New(err, nil)
}
for order, disk := range disks {
bucketSlice := fmt.Sprintf("%s$%d$%d", bucketName, nodeNumber, order)
err := disk.MakeDir(filepath.Join(donut.config.DonutName, bucketSlice))
if err != nil {
return iodine.New(err, nil)
}
}
nodeNumber = nodeNumber + 1
}
metadata, err := donut.getDonutBucketMetadata()
if err != nil {
if os.IsNotExist(iodine.ToError(err)) {
metadata := new(AllBuckets)
metadata.Buckets = make(map[string]BucketMetadata)
metadata.Buckets[bucketName] = bucketMetadata
err = donut.setDonutBucketMetadata(metadata)
if err != nil {
return iodine.New(err, nil)
}
return nil
}
return iodine.New(err, nil)
}
metadata.Buckets[bucketName] = bucketMetadata
err = donut.setDonutBucketMetadata(metadata)
if err != nil {
return iodine.New(err, nil)
}
return nil
}
func (donut API) listDonutBuckets() error {
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return iodine.New(err, nil)
}
for _, disk := range disks {
dirs, err := disk.ListDir(donut.config.DonutName)
if err != nil {
return iodine.New(err, nil)
}
for _, dir := range dirs {
splitDir := strings.Split(dir.Name(), "$")
if len(splitDir) < 3 {
return iodine.New(CorruptedBackend{Backend: dir.Name()}, nil)
}
bucketName := splitDir[0]
// we don't need this once we cache from makeDonutBucket()
bucket, _, err := newBucket(bucketName, "private", donut.config.DonutName, donut.nodes)
if err != nil {
return iodine.New(err, nil)
}
donut.buckets[bucketName] = bucket
}
}
}
return nil
}
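
Putting the pieces together, the on-disk layout these functions produce looks roughly like the following (hypothetical names: DonutName "test", one node with two disks, a bucket "mybucket", one object "hello"):

// <disk0>/test/bucketMetadata.json                    <- setDonutBucketMetadata
// <disk0>/test/mybucket$0$0/                          <- makeDonutBucket ("name$node$disk")
// <disk0>/test/mybucket$0$0/hello/data                <- bucket.getWriters("hello", "data")
// <disk0>/test/mybucket$0$0/hello/objectMetadata.json <- bucket.writeObjectMetadata
// <disk1>/test/bucketMetadata.json
// <disk1>/test/mybucket$0$1/hello/data
// <disk1>/test/mybucket$0$1/hello/objectMetadata.json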

pkg/donut/donut-v1_test.go Normal file

@@ -0,0 +1,288 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
"time"
. "github.com/minio/check"
)
func TestDonut(t *testing.T) { TestingT(t) }
type MyDonutSuite struct {
root string
}
var _ = Suite(&MyDonutSuite{})
// create a dummy TestNodeDiskMap
func createTestNodeDiskMap(p string) map[string][]string {
nodes := make(map[string][]string)
nodes["localhost"] = make([]string, 16)
for i := 0; i < len(nodes["localhost"]); i++ {
diskPath := filepath.Join(p, strconv.Itoa(i))
if _, err := os.Stat(diskPath); err != nil {
if os.IsNotExist(err) {
os.MkdirAll(diskPath, 0700)
}
}
nodes["localhost"][i] = diskPath
}
return nodes
}
var dd Interface
func (s *MyDonutSuite) SetUpSuite(c *C) {
root, err := ioutil.TempDir(os.TempDir(), "donut-")
c.Assert(err, IsNil)
s.root = root
conf := new(Config)
conf.DonutName = "test"
conf.NodeDiskMap = createTestNodeDiskMap(root)
conf.Expiration = time.Duration(1 * time.Hour)
conf.MaxSize = 100000
dd, err = New(conf)
c.Assert(err, IsNil)
// testing empty donut
buckets, err := dd.ListBuckets()
c.Assert(err, IsNil)
c.Assert(len(buckets), Equals, 0)
}
func (s *MyDonutSuite) TearDownSuite(c *C) {
os.RemoveAll(s.root)
}
// test make bucket without name
func (s *MyDonutSuite) TestBucketWithoutNameFails(c *C) {
// fail to create new bucket without a name
err := dd.MakeBucket("", "private")
c.Assert(err, Not(IsNil))
err = dd.MakeBucket(" ", "private")
c.Assert(err, Not(IsNil))
}
// test empty bucket
func (s *MyDonutSuite) TestEmptyBucket(c *C) {
c.Assert(dd.MakeBucket("foo1", "private"), IsNil)
// check if bucket is empty
var resources BucketResourcesMetadata
resources.Maxkeys = 1
objectsMetadata, resources, err := dd.ListObjects("foo1", resources)
c.Assert(err, IsNil)
c.Assert(len(objectsMetadata), Equals, 0)
c.Assert(resources.CommonPrefixes, DeepEquals, []string{})
c.Assert(resources.IsTruncated, Equals, false)
}
// test bucket list
func (s *MyDonutSuite) TestMakeBucketAndList(c *C) {
// create bucket
err := dd.MakeBucket("foo2", "private")
c.Assert(err, IsNil)
// check bucket exists
buckets, err := dd.ListBuckets()
c.Assert(err, IsNil)
c.Assert(len(buckets), Equals, 5)
c.Assert(buckets[0].ACL, Equals, BucketACL("private"))
}
// test re-create bucket
func (s *MyDonutSuite) TestMakeBucketWithSameNameFails(c *C) {
err := dd.MakeBucket("foo3", "private")
c.Assert(err, IsNil)
err = dd.MakeBucket("foo3", "private")
c.Assert(err, Not(IsNil))
}
// test make multiple buckets
func (s *MyDonutSuite) TestCreateMultipleBucketsAndList(c *C) {
// add a second bucket
err := dd.MakeBucket("foo4", "private")
c.Assert(err, IsNil)
err = dd.MakeBucket("bar1", "private")
c.Assert(err, IsNil)
buckets, err := dd.ListBuckets()
c.Assert(err, IsNil)
c.Assert(len(buckets), Equals, 2)
c.Assert(buckets[0].Name, Equals, "bar1")
c.Assert(buckets[1].Name, Equals, "foo4")
err = dd.MakeBucket("foobar1", "private")
c.Assert(err, IsNil)
buckets, err = dd.ListBuckets()
c.Assert(err, IsNil)
c.Assert(len(buckets), Equals, 3)
c.Assert(buckets[2].Name, Equals, "foobar1")
}
// test object create without bucket
func (s *MyDonutSuite) TestNewObjectFailsWithoutBucket(c *C) {
_, err := dd.CreateObject("unknown", "obj", "", 0, nil, nil)
c.Assert(err, Not(IsNil))
}
// test create object metadata
func (s *MyDonutSuite) TestNewObjectMetadata(c *C) {
data := "Hello World"
hasher := md5.New()
hasher.Write([]byte(data))
expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
err := dd.MakeBucket("foo6", "private")
c.Assert(err, IsNil)
objectMetadata, err := dd.CreateObject("foo6", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/json"})
c.Assert(err, IsNil)
c.Assert(objectMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil)))
c.Assert(objectMetadata.Metadata["contentType"], Equals, "application/json")
}
// test create object fails without name
func (s *MyDonutSuite) TestNewObjectFailsWithEmptyName(c *C) {
_, err := dd.CreateObject("foo", "", "", 0, nil, nil)
c.Assert(err, Not(IsNil))
}
// test create object
func (s *MyDonutSuite) TestNewObjectCanBeWritten(c *C) {
err := dd.MakeBucket("foo", "private")
c.Assert(err, IsNil)
data := "Hello World"
hasher := md5.New()
hasher.Write([]byte(data))
expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
actualMetadata, err := dd.CreateObject("foo", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/octet-stream"})
c.Assert(err, IsNil)
c.Assert(actualMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil)))
var buffer bytes.Buffer
size, err := dd.GetObject(&buffer, "foo", "obj")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len(data)))
c.Assert(buffer.Bytes(), DeepEquals, []byte(data))
actualMetadata, err = dd.GetObjectMetadata("foo", "obj")
c.Assert(err, IsNil)
c.Assert(hex.EncodeToString(hasher.Sum(nil)), Equals, actualMetadata.MD5Sum)
c.Assert(int64(len(data)), Equals, actualMetadata.Size)
}
// test list objects
func (s *MyDonutSuite) TestMultipleNewObjects(c *C) {
c.Assert(dd.MakeBucket("foo5", "private"), IsNil)
one := ioutil.NopCloser(bytes.NewReader([]byte("one")))
_, err := dd.CreateObject("foo5", "obj1", "", int64(len("one")), one, nil)
c.Assert(err, IsNil)
two := ioutil.NopCloser(bytes.NewReader([]byte("two")))
_, err = dd.CreateObject("foo5", "obj2", "", int64(len("two")), two, nil)
c.Assert(err, IsNil)
var buffer1 bytes.Buffer
size, err := dd.GetObject(&buffer1, "foo5", "obj1")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len([]byte("one"))))
c.Assert(buffer1.Bytes(), DeepEquals, []byte("one"))
var buffer2 bytes.Buffer
size, err = dd.GetObject(&buffer2, "foo5", "obj2")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len([]byte("two"))))
c.Assert(buffer2.Bytes(), DeepEquals, []byte("two"))
/// test list of objects
// test list objects with prefix and delimiter
var resources BucketResourcesMetadata
resources.Prefix = "o"
resources.Delimiter = "1"
resources.Maxkeys = 10
objectsMetadata, resources, err := dd.ListObjects("foo5", resources)
c.Assert(err, IsNil)
c.Assert(resources.IsTruncated, Equals, false)
c.Assert(resources.CommonPrefixes[0], Equals, "obj1")
// test list objects with only delimiter
resources.Prefix = ""
resources.Delimiter = "1"
resources.Maxkeys = 10
objectsMetadata, resources, err = dd.ListObjects("foo5", resources)
c.Assert(err, IsNil)
c.Assert(objectsMetadata[0].Object, Equals, "obj2")
c.Assert(resources.IsTruncated, Equals, false)
c.Assert(resources.CommonPrefixes[0], Equals, "obj1")
// test list objects with only prefix
resources.Prefix = "o"
resources.Delimiter = ""
resources.Maxkeys = 10
objectsMetadata, resources, err = dd.ListObjects("foo5", resources)
c.Assert(err, IsNil)
c.Assert(resources.IsTruncated, Equals, false)
c.Assert(objectsMetadata[0].Object, Equals, "obj1")
c.Assert(objectsMetadata[1].Object, Equals, "obj2")
three := ioutil.NopCloser(bytes.NewReader([]byte("three")))
_, err = dd.CreateObject("foo5", "obj3", "", int64(len("three")), three, nil)
c.Assert(err, IsNil)
var buffer bytes.Buffer
size, err = dd.GetObject(&buffer, "foo5", "obj3")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len([]byte("three"))))
c.Assert(buffer.Bytes(), DeepEquals, []byte("three"))
// test list objects with maxkeys
resources.Prefix = "o"
resources.Delimiter = ""
resources.Maxkeys = 2
objectsMetadata, resources, err = dd.ListObjects("foo5", resources)
c.Assert(err, IsNil)
c.Assert(resources.IsTruncated, Equals, true)
c.Assert(len(objectsMetadata), Equals, 2)
}

638
pkg/donut/donut-v2.go Normal file
View File

@@ -0,0 +1,638 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"errors"
"io"
"io/ioutil"
"log"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/minio/minio/pkg/donut/trove"
"github.com/minio/minio/pkg/iodine"
"github.com/minio/minio/pkg/quick"
)
// total number of buckets allowed
const (
totalBuckets = 100
)
// Config donut config
type Config struct {
Version string `json:"version"`
MaxSize uint64 `json:"max-size"`
Expiration time.Duration `json:"expiration"`
DonutName string `json:"donut-name"`
NodeDiskMap map[string][]string `json:"node-disk-map"`
}
// API - local variables
type API struct {
config *Config
lock *sync.RWMutex
objects *trove.Cache
multiPartObjects *trove.Cache
storedBuckets map[string]storedBucket
nodes map[string]node
buckets map[string]bucket
}
// storedBucket saved bucket
type storedBucket struct {
bucketMetadata BucketMetadata
objectMetadata map[string]ObjectMetadata
partMetadata map[string]PartMetadata
multiPartSession map[string]multiPartSession
}
// multiPartSession multipart session
type multiPartSession struct {
totalParts int
uploadID string
initiated time.Time
}
// New - instantiate a new donut
func New(c *Config) (Interface, error) {
if err := quick.CheckData(c); err != nil {
return nil, iodine.New(err, nil)
}
a := API{config: c}
a.storedBuckets = make(map[string]storedBucket)
a.nodes = make(map[string]node)
a.buckets = make(map[string]bucket)
a.objects = trove.NewCache(a.config.MaxSize, a.config.Expiration)
a.multiPartObjects = trove.NewCache(0, time.Duration(0))
a.objects.OnExpired = a.expiredObject
a.multiPartObjects.OnExpired = a.expiredPart
a.lock = new(sync.RWMutex)
// set up cache expiration
a.objects.ExpireObjects(time.Second * 5)
if len(a.config.NodeDiskMap) > 0 {
for k, v := range a.config.NodeDiskMap {
if len(v) == 0 {
return nil, iodine.New(InvalidDisksArgument{}, nil)
}
err := a.AttachNode(k, v)
if err != nil {
return nil, iodine.New(err, nil)
}
}
/// Initialization, populate all buckets into memory
buckets, err := a.listBuckets()
if err != nil {
return nil, iodine.New(err, nil)
}
for k, v := range buckets {
storedBucket := a.storedBuckets[k]
storedBucket.bucketMetadata = v
a.storedBuckets[k] = storedBucket
}
}
return a, nil
}
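// exampleNew is an illustrative sketch, not part of the original commit: New
// runs in two modes. A nil NodeDiskMap yields a pure in-memory cache; a
// populated map attaches every listed disk at startup and loads all buckets.
func exampleNew() (Interface, error) {
conf := new(Config)
conf.DonutName = "" // unused in cache-only mode
conf.NodeDiskMap = nil
conf.Expiration = time.Hour
conf.MaxSize = 100000 // cache capacity in bytes
return New(conf)
}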
// GetObject - GET object from cache buffer
func (donut API) GetObject(w io.Writer, bucket string, object string) (int64, error) {
donut.lock.RLock()
if !IsValidBucket(bucket) {
donut.lock.RUnlock()
return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(object) {
donut.lock.RUnlock()
return 0, iodine.New(ObjectNameInvalid{Object: object}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return 0, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
objectKey := bucket + "/" + object
data, ok := donut.objects.Get(objectKey)
if !ok {
if len(donut.config.NodeDiskMap) > 0 {
reader, size, err := donut.getObject(bucket, object)
if err != nil {
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
// new proxy writer to capture data read from disk
pw := NewProxyWriter(w)
written, err := io.CopyN(pw, reader, size)
if err != nil {
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
donut.lock.RUnlock()
/// cache object read from disk
{
donut.lock.Lock()
ok := donut.objects.Set(objectKey, pw.writtenBytes)
donut.lock.Unlock()
pw.writtenBytes = nil
go debug.FreeOSMemory()
if !ok {
return 0, iodine.New(InternalError{}, nil)
}
}
return written, nil
}
donut.lock.RUnlock()
return 0, iodine.New(ObjectNotFound{Object: object}, nil)
}
written, err := io.CopyN(w, bytes.NewBuffer(data), int64(donut.objects.Len(objectKey)))
if err != nil {
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
donut.lock.RUnlock()
return written, nil
}
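// exampleGetObject is an illustrative sketch, not part of the original commit:
// GetObject streams the object into any io.Writer and returns the byte count,
// serving from the cache when possible and from disk otherwise.
func exampleGetObject(d Interface) ([]byte, error) {
var buf bytes.Buffer
if _, err := d.GetObject(&buf, "bucket", "object"); err != nil {
return nil, err
}
return buf.Bytes(), nil
}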
// GetPartialObject - GET object from cache buffer range
func (donut API) GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error) {
errParams := map[string]string{
"bucket": bucket,
"object": object,
"start": strconv.FormatInt(start, 10),
"length": strconv.FormatInt(length, 10),
}
donut.lock.RLock()
if !IsValidBucket(bucket) {
donut.lock.RUnlock()
return 0, iodine.New(BucketNameInvalid{Bucket: bucket}, errParams)
}
if !IsValidObjectName(object) {
donut.lock.RUnlock()
return 0, iodine.New(ObjectNameInvalid{Object: object}, errParams)
}
if start < 0 {
donut.lock.RUnlock()
return 0, iodine.New(InvalidRange{
Start: start,
Length: length,
}, errParams)
}
objectKey := bucket + "/" + object
data, ok := donut.objects.Get(objectKey)
if !ok {
if len(donut.config.NodeDiskMap) > 0 {
reader, size, err := donut.getObject(bucket, object)
if err != nil {
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
if _, err := io.CopyN(ioutil.Discard, reader, start); err != nil {
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
// new proxy writer to capture data read from disk
pw := NewProxyWriter(w)
written, err := io.CopyN(pw, reader, length)
if err != nil {
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
donut.lock.RUnlock()
// cache only when the range covers the whole object, caching a
// partial range under the object key would poison later reads
if start == 0 && length == size {
donut.lock.Lock()
ok := donut.objects.Set(objectKey, pw.writtenBytes)
donut.lock.Unlock()
pw.writtenBytes = nil
go debug.FreeOSMemory()
if !ok {
return 0, iodine.New(InternalError{}, nil)
}
}
return written, nil
}
donut.lock.RUnlock()
return 0, iodine.New(ObjectNotFound{Object: object}, nil)
}
written, err := io.CopyN(w, bytes.NewBuffer(data[start:]), length)
if err != nil {
donut.lock.RUnlock()
return 0, iodine.New(err, nil)
}
donut.lock.RUnlock()
return written, nil
}
// GetBucketMetadata - get bucket metadata from cache or disk
func (donut API) GetBucketMetadata(bucket string) (BucketMetadata, error) {
donut.lock.RLock()
if !IsValidBucket(bucket) {
donut.lock.RUnlock()
return BucketMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
if len(donut.config.NodeDiskMap) > 0 {
bucketMetadata, err := donut.getBucketMetadata(bucket)
if err != nil {
donut.lock.RUnlock()
return BucketMetadata{}, iodine.New(err, nil)
}
storedBucket := donut.storedBuckets[bucket]
donut.lock.RUnlock()
{
donut.lock.Lock()
storedBucket.bucketMetadata = bucketMetadata
donut.storedBuckets[bucket] = storedBucket
donut.lock.Unlock()
}
return bucketMetadata, nil
}
donut.lock.RUnlock()
return BucketMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
donut.lock.RUnlock()
return donut.storedBuckets[bucket].bucketMetadata, nil
}
// SetBucketMetadata - set bucket metadata in cache and on disk
func (donut API) SetBucketMetadata(bucket string, metadata map[string]string) error {
donut.lock.RLock()
if !IsValidBucket(bucket) {
donut.lock.RUnlock()
return iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
donut.lock.RUnlock()
donut.lock.Lock()
{
if len(donut.config.NodeDiskMap) > 0 {
if err := donut.setBucketMetadata(bucket, metadata); err != nil {
return iodine.New(err, nil)
}
}
storedBucket := donut.storedBuckets[bucket]
storedBucket.bucketMetadata.ACL = BucketACL(metadata["acl"])
donut.storedBuckets[bucket] = storedBucket
}
donut.lock.Unlock()
return nil
}
// isMD5SumEqual - return an error if the md5sums mismatch, nil on success
func isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" {
expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum)
if err != nil {
return iodine.New(err, nil)
}
actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum)
if err != nil {
return iodine.New(err, nil)
}
if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) {
return iodine.New(errors.New("bad digest, md5sum mismatch"), nil)
}
return nil
}
return iodine.New(errors.New("invalid argument"), nil)
}
// CreateObject - create an object in cache and on disk
func (donut API) CreateObject(bucket, key, expectedMD5Sum string, size int64, data io.Reader, metadata map[string]string) (ObjectMetadata, error) {
if size > int64(donut.config.MaxSize) {
generic := GenericObjectError{Bucket: bucket, Object: key}
return ObjectMetadata{}, iodine.New(EntityTooLarge{
GenericObjectError: generic,
Size: strconv.FormatInt(size, 10),
MaxSize: strconv.FormatUint(donut.config.MaxSize, 10),
}, nil)
}
contentType := metadata["contentType"]
objectMetadata, err := donut.createObject(bucket, key, contentType, expectedMD5Sum, size, data)
// free
debug.FreeOSMemory()
return objectMetadata, iodine.New(err, nil)
}
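// exampleCreateObject is an illustrative sketch, not part of the original
// commit: expectedMD5Sum is the base64 encoding of the raw MD5 digest (as in
// the S3 Content-MD5 header); createObject decodes it to hex before comparing.
func exampleCreateObject(d Interface, payload []byte) (ObjectMetadata, error) {
sum := md5.Sum(payload)
expectedMD5Sum := base64.StdEncoding.EncodeToString(sum[:])
return d.CreateObject("bucket", "object", expectedMD5Sum, int64(len(payload)), bytes.NewReader(payload), nil)
}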
// createObject - PUT object to cache buffer
func (donut API) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (ObjectMetadata, error) {
donut.lock.RLock()
if !IsValidBucket(bucket) {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
// get object key
objectKey := bucket + "/" + key
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(ObjectExists{Object: key}, nil)
}
donut.lock.RUnlock()
if contentType == "" {
contentType = "application/octet-stream"
}
contentType = strings.TrimSpace(contentType)
if strings.TrimSpace(expectedMD5Sum) != "" {
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// pro-actively close the connection
return ObjectMetadata{}, iodine.New(InvalidDigest{Md5: expectedMD5Sum}, nil)
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
if len(donut.config.NodeDiskMap) > 0 {
objMetadata, err := donut.putObject(bucket, key, expectedMD5Sum, data, map[string]string{"contentType": contentType})
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
donut.lock.Lock()
storedBucket.objectMetadata[objectKey] = objMetadata
donut.storedBuckets[bucket] = storedBucket
donut.lock.Unlock()
return objMetadata, nil
}
// calculate md5
hash := md5.New()
var err error
var totalLength int
for err == nil {
var length int
byteBuffer := make([]byte, 1024*1024)
length, err = data.Read(byteBuffer)
// hash.Write() would accept a nil byteBuffer, but we still need
// to detect a zero-length read ourselves and break out of the loop
if length == 0 {
break
}
hash.Write(byteBuffer[0:length])
//donut.lock.Lock()
ok := donut.objects.Append(objectKey, byteBuffer[0:length])
//donut.lock.Unlock()
if !ok {
return ObjectMetadata{}, iodine.New(InternalError{}, nil)
}
totalLength += length
go debug.FreeOSMemory()
}
if err != io.EOF {
return ObjectMetadata{}, iodine.New(err, nil)
}
md5SumBytes := hash.Sum(nil)
md5Sum := hex.EncodeToString(md5SumBytes)
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
return ObjectMetadata{}, iodine.New(BadDigest{}, nil)
}
}
m := make(map[string]string)
m["contentType"] = contentType
newObject := ObjectMetadata{
Bucket: bucket,
Object: key,
Metadata: m,
Created: time.Now().UTC(),
MD5Sum: md5Sum,
Size: int64(totalLength),
}
//donut.lock.Lock()
storedBucket.objectMetadata[objectKey] = newObject
donut.storedBuckets[bucket] = storedBucket
//donut.lock.Unlock()
return newObject, nil
}
// MakeBucket - create bucket in cache
func (donut API) MakeBucket(bucketName, acl string) error {
donut.lock.RLock()
if len(donut.storedBuckets) == totalBuckets {
donut.lock.RUnlock()
return iodine.New(TooManyBuckets{Bucket: bucketName}, nil)
}
if !IsValidBucket(bucketName) {
donut.lock.RUnlock()
return iodine.New(BucketNameInvalid{Bucket: bucketName}, nil)
}
if !IsValidBucketACL(acl) {
donut.lock.RUnlock()
return iodine.New(InvalidACL{ACL: acl}, nil)
}
if _, ok := donut.storedBuckets[bucketName]; ok == true {
donut.lock.RUnlock()
return iodine.New(BucketExists{Bucket: bucketName}, nil)
}
donut.lock.RUnlock()
if strings.TrimSpace(acl) == "" {
// default is private
acl = "private"
}
if len(donut.config.NodeDiskMap) > 0 {
if err := donut.makeBucket(bucketName, BucketACL(acl)); err != nil {
return iodine.New(err, nil)
}
}
var newBucket = storedBucket{}
newBucket.objectMetadata = make(map[string]ObjectMetadata)
newBucket.multiPartSession = make(map[string]multiPartSession)
newBucket.partMetadata = make(map[string]PartMetadata)
newBucket.bucketMetadata = BucketMetadata{}
newBucket.bucketMetadata.Name = bucketName
newBucket.bucketMetadata.Created = time.Now().UTC()
newBucket.bucketMetadata.ACL = BucketACL(acl)
//donut.lock.Lock()
donut.storedBuckets[bucketName] = newBucket
//donut.lock.Unlock()
return nil
}
// ListObjects - list objects from cache
func (donut API) ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
if !IsValidBucket(bucket) {
return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidPrefix(resources.Prefix) {
return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(ObjectNameInvalid{Object: resources.Prefix}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
var results []ObjectMetadata
var keys []string
if len(donut.config.NodeDiskMap) > 0 {
listObjects, err := donut.listObjects(
bucket,
resources.Prefix,
resources.Marker,
resources.Delimiter,
resources.Maxkeys,
)
if err != nil {
return nil, BucketResourcesMetadata{IsTruncated: false}, iodine.New(err, nil)
}
resources.CommonPrefixes = listObjects.CommonPrefixes
resources.IsTruncated = listObjects.IsTruncated
for key := range listObjects.Objects {
keys = append(keys, key)
}
sort.Strings(keys)
for _, key := range keys {
results = append(results, listObjects.Objects[key])
}
// NextMarker can only be derived once results are populated
if resources.IsTruncated && resources.IsDelimiterSet() && len(results) > 0 {
resources.NextMarker = results[len(results)-1].Object
}
return results, resources, nil
}
storedBucket := donut.storedBuckets[bucket]
for key := range storedBucket.objectMetadata {
if strings.HasPrefix(key, bucket+"/") {
key = key[len(bucket)+1:]
if strings.HasPrefix(key, resources.Prefix) {
if key > resources.Marker {
keys = append(keys, key)
}
}
}
}
if strings.TrimSpace(resources.Prefix) != "" {
keys = TrimPrefix(keys, resources.Prefix)
}
var prefixes []string
var filteredKeys []string
if strings.TrimSpace(resources.Delimiter) != "" {
filteredKeys = HasNoDelimiter(keys, resources.Delimiter)
prefixes = HasDelimiter(keys, resources.Delimiter)
prefixes = SplitDelimiter(prefixes, resources.Delimiter)
prefixes = SortU(prefixes)
} else {
filteredKeys = keys
}
for _, commonPrefix := range prefixes {
resources.CommonPrefixes = append(resources.CommonPrefixes, resources.Prefix+commonPrefix)
}
filteredKeys = RemoveDuplicates(filteredKeys)
sort.Strings(filteredKeys)
for _, key := range filteredKeys {
if len(results) == resources.Maxkeys {
resources.IsTruncated = true
if resources.IsTruncated && resources.IsDelimiterSet() {
resources.NextMarker = results[len(results)-1].Object
}
return results, resources, nil
}
object := storedBucket.objectMetadata[bucket+"/"+resources.Prefix+key]
results = append(results, object)
}
resources.CommonPrefixes = RemoveDuplicates(resources.CommonPrefixes)
sort.Strings(resources.CommonPrefixes)
return results, resources, nil
}
// byBucketName is a type for sorting bucket metadata by bucket name
type byBucketName []BucketMetadata
func (b byBucketName) Len() int { return len(b) }
func (b byBucketName) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b byBucketName) Less(i, j int) bool { return b[i].Name < b[j].Name }
// ListBuckets - List buckets from cache
func (donut API) ListBuckets() ([]BucketMetadata, error) {
donut.lock.RLock()
defer donut.lock.RUnlock()
var results []BucketMetadata
for _, bucket := range donut.storedBuckets {
results = append(results, bucket.bucketMetadata)
}
sort.Sort(byBucketName(results))
return results, nil
}
// GetObjectMetadata - get object metadata from cache
func (donut API) GetObjectMetadata(bucket, key string) (ObjectMetadata, error) {
donut.lock.RLock()
// check if bucket exists
if !IsValidBucket(bucket) {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
objectKey := bucket + "/" + key
if objMetadata, ok := storedBucket.objectMetadata[objectKey]; ok == true {
donut.lock.RUnlock()
return objMetadata, nil
}
if len(donut.config.NodeDiskMap) > 0 {
objMetadata, err := donut.getObjectMetadata(bucket, key)
donut.lock.RUnlock()
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
// update
donut.lock.Lock()
storedBucket.objectMetadata[objectKey] = objMetadata
donut.lock.Unlock()
return objMetadata, nil
}
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(ObjectNotFound{Object: key}, nil)
}
func (donut API) expiredObject(a ...interface{}) {
cacheStats := donut.objects.Stats()
log.Printf("CurrentSize: %d, CurrentItems: %d, TotalExpirations: %d",
cacheStats.Bytes, cacheStats.Items, cacheStats.Expired)
key := a[0].(string)
// loop through all buckets
for _, storedBucket := range donut.storedBuckets {
delete(storedBucket.objectMetadata, key)
}
debug.FreeOSMemory()
}

261
pkg/donut/donut-v2_test.go Normal file
View File

@@ -0,0 +1,261 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"io/ioutil"
"testing"
"time"
. "github.com/minio/check"
)
func TestCache(t *testing.T) { TestingT(t) }
type MyCacheSuite struct{}
var _ = Suite(&MyCacheSuite{})
var dc Interface
func (s *MyCacheSuite) SetUpSuite(c *C) {
// test only cache
conf := new(Config)
conf.DonutName = ""
conf.NodeDiskMap = nil
conf.Expiration = time.Duration(1 * time.Hour)
conf.MaxSize = 100000
var err error
dc, err = New(conf)
c.Assert(err, IsNil)
// testing empty cache
buckets, err := dc.ListBuckets()
c.Assert(err, IsNil)
c.Assert(len(buckets), Equals, 0)
}
// test make bucket without name
func (s *MyCacheSuite) TestBucketWithoutNameFails(c *C) {
// fail to create new bucket without a name
err := dc.MakeBucket("", "private")
c.Assert(err, Not(IsNil))
err = dc.MakeBucket(" ", "private")
c.Assert(err, Not(IsNil))
}
// test empty bucket
func (s *MyCacheSuite) TestEmptyBucket(c *C) {
c.Assert(dc.MakeBucket("foo1", "private"), IsNil)
// check if bucket is empty
var resources BucketResourcesMetadata
resources.Maxkeys = 1
objectsMetadata, resources, err := dc.ListObjects("foo1", resources)
c.Assert(err, IsNil)
c.Assert(len(objectsMetadata), Equals, 0)
c.Assert(resources.CommonPrefixes, DeepEquals, []string{})
c.Assert(resources.IsTruncated, Equals, false)
}
// test bucket list
func (s *MyCacheSuite) TestMakeBucketAndList(c *C) {
// create bucket
err := dc.MakeBucket("foo2", "private")
c.Assert(err, IsNil)
// check bucket exists
buckets, err := dc.ListBuckets()
c.Assert(err, IsNil)
c.Assert(len(buckets), Equals, 5)
c.Assert(buckets[0].ACL, Equals, BucketACL("private"))
}
// test re-create bucket
func (s *MyCacheSuite) TestMakeBucketWithSameNameFails(c *C) {
err := dc.MakeBucket("foo3", "private")
c.Assert(err, IsNil)
err = dc.MakeBucket("foo3", "private")
c.Assert(err, Not(IsNil))
}
// test make multiple buckets
func (s *MyCacheSuite) TestCreateMultipleBucketsAndList(c *C) {
// add a second bucket
err := dc.MakeBucket("foo4", "private")
c.Assert(err, IsNil)
err = dc.MakeBucket("bar1", "private")
c.Assert(err, IsNil)
buckets, err := dc.ListBuckets()
c.Assert(err, IsNil)
c.Assert(len(buckets), Equals, 2)
c.Assert(buckets[0].Name, Equals, "bar1")
c.Assert(buckets[1].Name, Equals, "foo4")
err = dc.MakeBucket("foobar1", "private")
c.Assert(err, IsNil)
buckets, err = dc.ListBuckets()
c.Assert(err, IsNil)
c.Assert(len(buckets), Equals, 3)
c.Assert(buckets[2].Name, Equals, "foobar1")
}
// test object create without bucket
func (s *MyCacheSuite) TestNewObjectFailsWithoutBucket(c *C) {
_, err := dc.CreateObject("unknown", "obj", "", 0, nil, nil)
c.Assert(err, Not(IsNil))
}
// test create object metadata
func (s *MyCacheSuite) TestNewObjectMetadata(c *C) {
data := "Hello World"
hasher := md5.New()
hasher.Write([]byte(data))
expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
err := dc.MakeBucket("foo6", "private")
c.Assert(err, IsNil)
objectMetadata, err := dc.CreateObject("foo6", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/json"})
c.Assert(err, IsNil)
c.Assert(objectMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil)))
c.Assert(objectMetadata.Metadata["contentType"], Equals, "application/json")
}
// test create object fails without name
func (s *MyCacheSuite) TestNewObjectFailsWithEmptyName(c *C) {
_, err := dc.CreateObject("foo", "", "", 0, nil, nil)
c.Assert(err, Not(IsNil))
}
// test create object
func (s *MyCacheSuite) TestNewObjectCanBeWritten(c *C) {
err := dc.MakeBucket("foo", "private")
c.Assert(err, IsNil)
data := "Hello World"
hasher := md5.New()
hasher.Write([]byte(data))
expectedMd5Sum := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
reader := ioutil.NopCloser(bytes.NewReader([]byte(data)))
actualMetadata, err := dc.CreateObject("foo", "obj", expectedMd5Sum, int64(len(data)), reader, map[string]string{"contentType": "application/octet-stream"})
c.Assert(err, IsNil)
c.Assert(actualMetadata.MD5Sum, Equals, hex.EncodeToString(hasher.Sum(nil)))
var buffer bytes.Buffer
size, err := dc.GetObject(&buffer, "foo", "obj")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len(data)))
c.Assert(buffer.Bytes(), DeepEquals, []byte(data))
actualMetadata, err = dc.GetObjectMetadata("foo", "obj")
c.Assert(err, IsNil)
c.Assert(hex.EncodeToString(hasher.Sum(nil)), Equals, actualMetadata.MD5Sum)
c.Assert(int64(len(data)), Equals, actualMetadata.Size)
}
// test list objects
func (s *MyCacheSuite) TestMultipleNewObjects(c *C) {
c.Assert(dc.MakeBucket("foo5", "private"), IsNil)
one := ioutil.NopCloser(bytes.NewReader([]byte("one")))
_, err := dc.CreateObject("foo5", "obj1", "", int64(len("one")), one, nil)
c.Assert(err, IsNil)
two := ioutil.NopCloser(bytes.NewReader([]byte("two")))
_, err = dc.CreateObject("foo5", "obj2", "", int64(len("two")), two, nil)
c.Assert(err, IsNil)
var buffer1 bytes.Buffer
size, err := dc.GetObject(&buffer1, "foo5", "obj1")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len([]byte("one"))))
c.Assert(buffer1.Bytes(), DeepEquals, []byte("one"))
var buffer2 bytes.Buffer
size, err = dc.GetObject(&buffer2, "foo5", "obj2")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len([]byte("two"))))
c.Assert(buffer2.Bytes(), DeepEquals, []byte("two"))
/// test list of objects
// test list objects with prefix and delimiter
var resources BucketResourcesMetadata
resources.Prefix = "o"
resources.Delimiter = "1"
resources.Maxkeys = 10
objectsMetadata, resources, err := dc.ListObjects("foo5", resources)
c.Assert(err, IsNil)
c.Assert(resources.IsTruncated, Equals, false)
c.Assert(resources.CommonPrefixes[0], Equals, "obj1")
// test list objects with only delimiter
resources.Prefix = ""
resources.Delimiter = "1"
resources.Maxkeys = 10
objectsMetadata, resources, err = dc.ListObjects("foo5", resources)
c.Assert(err, IsNil)
c.Assert(objectsMetadata[0].Object, Equals, "obj2")
c.Assert(resources.IsTruncated, Equals, false)
c.Assert(resources.CommonPrefixes[0], Equals, "obj1")
// test list objects with only prefix
resources.Prefix = "o"
resources.Delimiter = ""
resources.Maxkeys = 10
objectsMetadata, resources, err = dc.ListObjects("foo5", resources)
c.Assert(err, IsNil)
c.Assert(resources.IsTruncated, Equals, false)
c.Assert(objectsMetadata[0].Object, Equals, "obj1")
c.Assert(objectsMetadata[1].Object, Equals, "obj2")
three := ioutil.NopCloser(bytes.NewReader([]byte("three")))
_, err = dc.CreateObject("foo5", "obj3", "", int64(len("three")), three, nil)
c.Assert(err, IsNil)
var buffer bytes.Buffer
size, err = dc.GetObject(&buffer, "foo5", "obj3")
c.Assert(err, IsNil)
c.Assert(size, Equals, int64(len([]byte("three"))))
c.Assert(buffer.Bytes(), DeepEquals, []byte("three"))
// test list objects with maxkeys
resources.Prefix = "o"
resources.Delimiter = ""
resources.Maxkeys = 2
objectsMetadata, resources, err = dc.ListObjects("foo5", resources)
c.Assert(err, IsNil)
c.Assert(resources.IsTruncated, Equals, true)
c.Assert(len(objectsMetadata), Equals, 2)
}

96
pkg/donut/encoder.go Normal file
View File

@@ -0,0 +1,96 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"strconv"
encoding "github.com/minio/minio/pkg/erasure"
"github.com/minio/minio/pkg/iodine"
)
// encoder internal struct
type encoder struct {
encoder *encoding.Erasure
k, m uint8
technique encoding.Technique
}
// getErasureTechnique - convert technique string into Technique type
func getErasureTechnique(technique string) (encoding.Technique, error) {
switch technique {
case "Cauchy":
return encoding.Cauchy, nil
case "Vandermonde":
return encoding.Vandermonde, nil
default:
return encoding.None, iodine.New(InvalidErasureTechnique{Technique: technique}, nil)
}
}
// newEncoder - instantiate a new encoder
func newEncoder(k, m uint8, technique string) (encoder, error) {
errParams := map[string]string{
"k": strconv.FormatUint(uint64(k), 10),
"m": strconv.FormatUint(uint64(m), 10),
"technique": technique,
}
e := encoder{}
t, err := getErasureTechnique(technique)
if err != nil {
return encoder{}, iodine.New(err, errParams)
}
params, err := encoding.ValidateParams(k, m, t)
if err != nil {
return encoder{}, iodine.New(err, errParams)
}
e.encoder = encoding.NewErasure(params)
e.k = k
e.m = m
e.technique = t
return e, nil
}
// TODO - think again if this is needed
// GetEncodedBlockLen - wrapper around erasure function with the same name
func (e encoder) GetEncodedBlockLen(dataLength int) (int, error) {
if dataLength <= 0 {
return 0, iodine.New(InvalidArgument{}, nil)
}
return encoding.GetEncodedBlockLen(dataLength, e.k), nil
}
// Encode - erasure code input bytes
func (e encoder) Encode(data []byte) (encodedData [][]byte, err error) {
if data == nil {
return nil, iodine.New(InvalidArgument{}, nil)
}
encodedData, err = e.encoder.Encode(data)
if err != nil {
return nil, iodine.New(err, nil)
}
return encodedData, nil
}
// Decode - erasure decode input encoded bytes
func (e encoder) Decode(encodedData [][]byte, dataLength int) (data []byte, err error) {
decodedData, err := e.encoder.Decode(encodedData, dataLength)
if err != nil {
return nil, iodine.New(err, nil)
}
return decodedData, nil
}
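// exampleRoundTrip is an illustrative sketch, not part of the original commit:
// Encode splits data into k data and m parity blocks; Decode recovers the
// original bytes from the blocks, given the original data length.
func exampleRoundTrip(data []byte) ([]byte, error) {
e, err := newEncoder(8, 8, "Cauchy")
if err != nil {
return nil, err
}
blocks, err := e.Encode(data)
if err != nil {
return nil, err
}
return e.Decode(blocks, len(data))
}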

311
pkg/donut/errors.go Normal file
View File

@@ -0,0 +1,311 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import "fmt"
// InvalidArgument invalid argument
type InvalidArgument struct{}
func (e InvalidArgument) Error() string {
return "Invalid argument"
}
// UnsupportedFilesystem unsupported filesystem type
type UnsupportedFilesystem struct {
Type string
}
func (e UnsupportedFilesystem) Error() string {
return "Unsupported filesystem: " + e.Type
}
// BucketNotFound bucket does not exist
type BucketNotFound struct {
Bucket string
}
func (e BucketNotFound) Error() string {
return "Bucket not found: " + e.Bucket
}
// ObjectExists object exists
type ObjectExists struct {
Object string
}
func (e ObjectExists) Error() string {
return "Object exists: " + e.Object
}
// ObjectNotFound object does not exist
type ObjectNotFound struct {
Object string
}
func (e ObjectNotFound) Error() string {
return "Object not found: " + e.Object
}
// ObjectCorrupted object found to be corrupted
type ObjectCorrupted struct {
Object string
}
func (e ObjectCorrupted) Error() string {
return "Object found corrupted: " + e.Object
}
// BucketExists bucket exists
type BucketExists struct {
Bucket string
}
func (e BucketExists) Error() string {
return "Bucket exists: " + e.Bucket
}
// CorruptedBackend backend found to be corrupted
type CorruptedBackend struct {
Backend string
}
func (e CorruptedBackend) Error() string {
return "Corrupted backend: " + e.Backend
}
// NotImplemented function not implemented
type NotImplemented struct {
Function string
}
func (e NotImplemented) Error() string {
return "Not implemented: " + e.Function
}
// InvalidDisksArgument invalid number of disks per node
type InvalidDisksArgument struct{}
func (e InvalidDisksArgument) Error() string {
return "Invalid number of disks per node"
}
// BadDigest bad md5sum
type BadDigest struct{}
func (e BadDigest) Error() string {
return "Bad digest"
}
// ParityOverflow parity overflow
type ParityOverflow struct{}
func (e ParityOverflow) Error() string {
return "Parity overflow"
}
// ChecksumMismatch checksum mismatch
type ChecksumMismatch struct{}
func (e ChecksumMismatch) Error() string {
return "Checksum mismatch"
}
// MissingErasureTechnique missing erasure technique
type MissingErasureTechnique struct{}
func (e MissingErasureTechnique) Error() string {
return "Missing erasure technique"
}
// InvalidErasureTechnique invalid erasure technique
type InvalidErasureTechnique struct {
Technique string
}
func (e InvalidErasureTechnique) Error() string {
return "Invalid erasure technique: " + e.Technique
}
// InternalError - generic internal error
type InternalError struct {
}
// BackendError - generic disk backend error
type BackendError struct {
Path string
}
// BackendCorrupted - path has corrupted data
type BackendCorrupted BackendError
// APINotImplemented - generic API not implemented error
type APINotImplemented struct {
API string
}
// GenericBucketError - generic bucket error
type GenericBucketError struct {
Bucket string
}
// GenericObjectError - generic object error
type GenericObjectError struct {
Bucket string
Object string
}
// ImplementationError - generic implementation error
type ImplementationError struct {
Bucket string
Object string
Err error
}
// DigestError - Generic Md5 error
type DigestError struct {
Bucket string
Key string
Md5 string
}
/// ACL related errors
// InvalidACL - acl invalid
type InvalidACL struct {
ACL string
}
func (e InvalidACL) Error() string {
return "Requested ACL is " + e.ACL + " invalid"
}
/// Bucket related errors
// BucketNameInvalid - bucketname provided is invalid
type BucketNameInvalid GenericBucketError
// TooManyBuckets - total buckets exceeded
type TooManyBuckets GenericBucketError
/// Object related errors
// EntityTooLarge - object size exceeds maximum limit
type EntityTooLarge struct {
GenericObjectError
Size string
MaxSize string
}
// ObjectNameInvalid - object name provided is invalid
type ObjectNameInvalid GenericObjectError
// InvalidDigest - md5 in request header invalid
type InvalidDigest DigestError
// Error - return the error as a formatted string
func (e ImplementationError) Error() string {
error := ""
if e.Bucket != "" {
error = error + "Bucket: " + e.Bucket + " "
}
if e.Object != "" {
error = error + "Object: " + e.Object + " "
}
error = error + "Error: " + e.Err.Error()
return error
}
// EmbedError - wrapper function for error object
func EmbedError(bucket, object string, err error) ImplementationError {
return ImplementationError{
Bucket: bucket,
Object: object,
Err: err,
}
}
// Error - return the error as a formatted string
func (e InternalError) Error() string {
return "Internal error occurred"
}
// Error - return the error as a formatted string
func (e APINotImplemented) Error() string {
return "API not implemented: " + e.API
}
// Error - return the error as a formatted string
func (e BucketNameInvalid) Error() string {
return "Bucket name invalid: " + e.Bucket
}
// Error - return the error as a formatted string
func (e TooManyBuckets) Error() string {
return "Bucket limit of 100 exceeded, cannot create bucket: " + e.Bucket
}
// Error - return the error as a formatted string
func (e ObjectNameInvalid) Error() string {
return "Object name invalid: " + e.Bucket + "#" + e.Object
}
// Error - return the error as a formatted string
func (e EntityTooLarge) Error() string {
return e.Bucket + "#" + e.Object + " with " + e.Size + " reached maximum allowed size limit " + e.MaxSize
}
// Error - return the error as a formatted string
func (e BackendCorrupted) Error() string {
return "Backend corrupted: " + e.Path
}
// Error - return the error as a formatted string
func (e InvalidDigest) Error() string {
return "Md5 provided " + e.Md5 + " is invalid"
}
// OperationNotPermitted - operation not permitted
type OperationNotPermitted struct {
Op string
Reason string
}
func (e OperationNotPermitted) Error() string {
return "Operation " + e.Op + " not permitted for reason: " + e.Reason
}
// InvalidRange - invalid range
type InvalidRange struct {
Start int64
Length int64
}
func (e InvalidRange) Error() string {
return fmt.Sprintf("Invalid range start:%d length:%d", e.Start, e.Length)
}
/// Multipart related errors
// InvalidUploadID invalid upload id
type InvalidUploadID struct {
UploadID string
}
func (e InvalidUploadID) Error() string {
return "Invalid upload id " + e.UploadID
}

70
pkg/donut/interfaces.go Normal file
View File

@@ -0,0 +1,70 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import "io"
// Collection of Donut specification interfaces
// Interface is a collection of object storage and management interface
type Interface interface {
ObjectStorage
Management
}
// ObjectStorage is a donut object storage interface
type ObjectStorage interface {
// Storage service operations
GetBucketMetadata(bucket string) (BucketMetadata, error)
SetBucketMetadata(bucket string, metadata map[string]string) error
ListBuckets() ([]BucketMetadata, error)
MakeBucket(bucket string, ACL string) error
// Bucket operations
ListObjects(bucket string, resources BucketResourcesMetadata) ([]ObjectMetadata, BucketResourcesMetadata, error)
// Object operations
GetObject(w io.Writer, bucket, object string) (int64, error)
GetPartialObject(w io.Writer, bucket, object string, start, length int64) (int64, error)
GetObjectMetadata(bucket, object string) (ObjectMetadata, error)
CreateObject(bucket, object, expectedMD5Sum string, size int64, reader io.Reader, metadata map[string]string) (ObjectMetadata, error)
Multipart
}
// Multipart API
type Multipart interface {
NewMultipartUpload(bucket, key, contentType string) (string, error)
AbortMultipartUpload(bucket, key, uploadID string) error
CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error)
CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error)
ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error)
ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error)
}
// Management is a donut management system interface
type Management interface {
Heal() error
Rebalance() error
Info() (map[string][]string, error)
AttachNode(hostname string, disks []string) error
DetachNode(hostname string) error
SaveConfig() error
LoadConfig() error
}
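// exampleBucketNames is an illustrative sketch, not part of the original
// commit: callers can program against ObjectStorage rather than the concrete
// API type, keeping cache-only and disk-backed donuts interchangeable.
func exampleBucketNames(store ObjectStorage) ([]string, error) {
metadata, err := store.ListBuckets()
if err != nil {
return nil, err
}
names := make([]string, 0, len(metadata))
for _, m := range metadata {
names = append(names, m.Name)
}
return names, nil
}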

108
pkg/donut/management.go Normal file
View File

@@ -0,0 +1,108 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"encoding/json"
"path/filepath"
"github.com/minio/minio/pkg/donut/disk"
"github.com/minio/minio/pkg/iodine"
)
// Heal - heal a donut and fix bad data blocks
func (donut API) Heal() error {
return iodine.New(NotImplemented{Function: "Heal"}, nil)
}
// Info - return info about donut configuration
func (donut API) Info() (nodeDiskMap map[string][]string, err error) {
nodeDiskMap = make(map[string][]string)
for nodeName, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return nil, iodine.New(err, nil)
}
diskList := make([]string, len(disks))
for diskOrder, disk := range disks {
diskList[diskOrder] = disk.GetPath()
}
nodeDiskMap[nodeName] = diskList
}
return nodeDiskMap, nil
}
// AttachNode - attach node
func (donut API) AttachNode(hostname string, disks []string) error {
if hostname == "" || len(disks) == 0 {
return iodine.New(InvalidArgument{}, nil)
}
node, err := newNode(hostname)
if err != nil {
return iodine.New(err, nil)
}
donut.nodes[hostname] = node
for i, d := range disks {
newDisk, err := disk.New(d)
if err != nil {
return iodine.New(err, nil)
}
if err := newDisk.MakeDir(donut.config.DonutName); err != nil {
return iodine.New(err, nil)
}
if err := node.AttachDisk(newDisk, i); err != nil {
return iodine.New(err, nil)
}
}
return nil
}
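// exampleAttachNode is an illustrative sketch, not part of the original
// commit; the disk paths are hypothetical. Attaching a node creates the donut
// directory on every listed disk and registers the disks in list order.
func exampleAttachNode(m Management) error {
return m.AttachNode("localhost", []string{"/mnt/disk0", "/mnt/disk1"})
}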
// DetachNode - detach node
func (donut API) DetachNode(hostname string) error {
delete(donut.nodes, hostname)
return nil
}
// SaveConfig - save donut configuration
func (donut API) SaveConfig() error {
nodeDiskMap := make(map[string][]string)
for hostname, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return iodine.New(err, nil)
}
for _, disk := range disks {
donutConfigPath := filepath.Join(donut.config.DonutName, donutConfig)
donutConfigWriter, err := disk.CreateFile(donutConfigPath)
if err != nil {
return iodine.New(err, nil)
}
defer donutConfigWriter.Close()
nodeDiskMap[hostname] = append(nodeDiskMap[hostname], disk.GetPath())
jenc := json.NewEncoder(donutConfigWriter)
if err := jenc.Encode(nodeDiskMap); err != nil {
return iodine.New(err, nil)
}
}
}
return nil
}
// LoadConfig - load configuration
func (donut API) LoadConfig() error {
return iodine.New(NotImplemented{Function: "LoadConfig"}, nil)
}

405
pkg/donut/multipart.go Normal file
View File

@@ -0,0 +1,405 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"bytes"
"crypto/md5"
"crypto/sha512"
"encoding/base64"
"encoding/hex"
"errors"
"io"
"math/rand"
"runtime/debug"
"sort"
"strconv"
"strings"
"time"
"github.com/minio/minio/pkg/iodine"
)
// NewMultipartUpload - initiate a new multipart upload session
func (donut API) NewMultipartUpload(bucket, key, contentType string) (string, error) {
donut.lock.RLock()
if !IsValidBucket(bucket) {
donut.lock.RUnlock()
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
donut.lock.RUnlock()
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
objectKey := bucket + "/" + key
if _, ok := storedBucket.objectMetadata[objectKey]; ok == true {
donut.lock.RUnlock()
return "", iodine.New(ObjectExists{Object: key}, nil)
}
donut.lock.RUnlock()
//donut.lock.Lock()
id := []byte(strconv.FormatInt(rand.Int63(), 10) + bucket + key + time.Now().String())
uploadIDSum := sha512.Sum512(id)
uploadID := base64.URLEncoding.EncodeToString(uploadIDSum[:])[:47]
donut.storedBuckets[bucket].multiPartSession[key] = multiPartSession{
uploadID: uploadID,
initiated: time.Now(),
totalParts: 0,
}
//donut.lock.Unlock()
return uploadID, nil
}
// AbortMultipartUpload - abort an ongoing multipart upload
func (donut API) AbortMultipartUpload(bucket, key, uploadID string) error {
donut.lock.RLock()
storedBucket := donut.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
donut.lock.RUnlock()
return iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
donut.lock.RUnlock()
donut.cleanupMultiparts(bucket, key, uploadID)
donut.cleanupMultipartSession(bucket, key, uploadID)
return nil
}
func getMultipartKey(key string, uploadID string, partNumber int) string {
return key + "?uploadId=" + uploadID + "&partNumber=" + strconv.Itoa(partNumber)
}
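// For example (illustrative, not part of the original commit), part 2 of
// upload "abc" for key "photo.jpg" maps to the cache key:
//
//	"photo.jpg?uploadId=abc&partNumber=2"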
// CreateObjectPart - create a part of a multipart upload
func (donut API) CreateObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
// Verify upload id
donut.lock.RLock()
storedBucket := donut.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
donut.lock.RUnlock()
return "", iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
donut.lock.RUnlock()
etag, err := donut.createObjectPart(bucket, key, uploadID, partID, "", expectedMD5Sum, size, data)
if err != nil {
return "", iodine.New(err, nil)
}
// free
debug.FreeOSMemory()
return etag, nil
}
// createObjectPart - PUT part of an object to cache buffer
func (donut API) createObjectPart(bucket, key, uploadID string, partID int, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
donut.lock.RLock()
if !IsValidBucket(bucket) {
donut.lock.RUnlock()
return "", iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
donut.lock.RUnlock()
return "", iodine.New(ObjectNameInvalid{Object: key}, nil)
}
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return "", iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
// get object key
partKey := bucket + "/" + getMultipartKey(key, uploadID, partID)
if _, ok := storedBucket.partMetadata[partKey]; ok == true {
donut.lock.RUnlock()
return storedBucket.partMetadata[partKey].ETag, nil
}
donut.lock.RUnlock()
if contentType == "" {
contentType = "application/octet-stream"
}
contentType = strings.TrimSpace(contentType)
if strings.TrimSpace(expectedMD5Sum) != "" {
expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum))
if err != nil {
// pro-actively close the connection
return "", iodine.New(InvalidDigest{Md5: expectedMD5Sum}, nil)
}
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
}
// calculate md5
hash := md5.New()
var readBytes []byte
var err error
var length int
for err == nil {
byteBuffer := make([]byte, 1024*1024)
length, err = data.Read(byteBuffer)
// hash.Write() would accept a nil byteBuffer, but we still need
// to detect a zero-length read ourselves and break out of the loop
if length == 0 {
break
}
hash.Write(byteBuffer[0:length])
readBytes = append(readBytes, byteBuffer[0:length]...)
}
if err != io.EOF {
return "", iodine.New(err, nil)
}
go debug.FreeOSMemory()
md5SumBytes := hash.Sum(nil)
totalLength := int64(len(readBytes))
//donut.lock.Lock()
donut.multiPartObjects.Set(partKey, readBytes)
//donut.lock.Unlock()
// setting up for de-allocation
readBytes = nil
md5Sum := hex.EncodeToString(md5SumBytes)
// Verify if the written object is equal to what is expected, only if it is requested as such
if strings.TrimSpace(expectedMD5Sum) != "" {
if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil {
return "", iodine.New(BadDigest{}, nil)
}
}
newPart := PartMetadata{
PartNumber: partID,
LastModified: time.Now().UTC(),
ETag: md5Sum,
Size: totalLength,
}
//donut.lock.Lock()
storedBucket.partMetadata[partKey] = newPart
multiPartSession := storedBucket.multiPartSession[key]
multiPartSession.totalParts++
storedBucket.multiPartSession[key] = multiPartSession
donut.storedBuckets[bucket] = storedBucket
//donut.lock.Unlock()
return md5Sum, nil
}
func (donut API) cleanupMultipartSession(bucket, key, uploadID string) {
// donut.lock.Lock()
// defer donut.lock.Unlock()
delete(donut.storedBuckets[bucket].multiPartSession, key)
}
func (donut API) cleanupMultiparts(bucket, key, uploadID string) {
for i := 1; i <= donut.storedBuckets[bucket].multiPartSession[key].totalParts; i++ {
objectKey := bucket + "/" + getMultipartKey(key, uploadID, i)
donut.multiPartObjects.Delete(objectKey)
}
}
// CompleteMultipartUpload - combine uploaded parts into a single object
func (donut API) CompleteMultipartUpload(bucket, key, uploadID string, parts map[int]string) (ObjectMetadata, error) {
if !IsValidBucket(bucket) {
return ObjectMetadata{}, iodine.New(BucketNameInvalid{Bucket: bucket}, nil)
}
if !IsValidObjectName(key) {
return ObjectMetadata{}, iodine.New(ObjectNameInvalid{Object: key}, nil)
}
// Verify upload id
donut.lock.RLock()
if _, ok := donut.storedBuckets[bucket]; ok == false {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
if storedBucket.multiPartSession[key].uploadID != uploadID {
donut.lock.RUnlock()
return ObjectMetadata{}, iodine.New(InvalidUploadID{UploadID: uploadID}, nil)
}
donut.lock.RUnlock()
//donut.lock.Lock()
var size int64
var fullObject bytes.Buffer
for i := 1; i <= len(parts); i++ {
recvMD5 := parts[i]
object, ok := donut.multiPartObjects.Get(bucket + "/" + getMultipartKey(key, uploadID, i))
if ok == false {
return ObjectMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
size += int64(len(object))
calcMD5Bytes := md5.Sum(object)
// complete multi part request header md5sum per part is hex encoded
recvMD5Bytes, err := hex.DecodeString(strings.Trim(recvMD5, "\""))
if err != nil {
return ObjectMetadata{}, iodine.New(InvalidDigest{Md5: recvMD5}, nil)
}
if !bytes.Equal(recvMD5Bytes, calcMD5Bytes[:]) {
return ObjectMetadata{}, iodine.New(BadDigest{}, nil)
}
_, err = io.Copy(&fullObject, bytes.NewBuffer(object))
if err != nil {
return ObjectMetadata{}, iodine.New(err, nil)
}
object = nil
go debug.FreeOSMemory()
}
//donut.lock.Unlock()
md5sumSlice := md5.Sum(fullObject.Bytes())
// this is needed for final verification inside CreateObject, do not convert this to hex
md5sum := base64.StdEncoding.EncodeToString(md5sumSlice[:])
objectMetadata, err := donut.CreateObject(bucket, key, md5sum, size, &fullObject, nil)
if err != nil {
// No need to call internal cleanup functions here, caller will call AbortMultipartUpload()
// which would in-turn cleanup properly in accordance with S3 Spec
return ObjectMetadata{}, iodine.New(err, nil)
}
fullObject.Reset()
donut.cleanupMultiparts(bucket, key, uploadID)
donut.cleanupMultipartSession(bucket, key, uploadID)
return objectMetadata, nil
}
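// Illustrative sketch (hypothetical caller, not part of this commit): the
// parts argument maps part numbers to the quoted hex ETags returned when
// each part was uploaded:
//
//	parts := map[int]string{
//		1: "\"9e107d9d372bb6826bd81d3542a419d6\"",
//		2: "\"e4d909c290d0fb1ca068ffaddf22cbd0\"",
//	}
//	metadata, err := api.CompleteMultipartUpload(bucket, key, uploadID, parts)
//
// Surrounding quotes are tolerated because each recvMD5 is trimmed with
// strings.Trim(recvMD5, "\"") before hex decoding.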
// byKey is a sortable interface for UploadMetadata slice
type byKey []*UploadMetadata
func (a byKey) Len() int { return len(a) }
func (a byKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
// ListMultipartUploads - list multipart uploads in progress for a given bucket
func (donut API) ListMultipartUploads(bucket string, resources BucketMultipartResourcesMetadata) (BucketMultipartResourcesMetadata, error) {
// TODO handle delimiter
donut.lock.RLock()
defer donut.lock.RUnlock()
if _, ok := donut.storedBuckets[bucket]; !ok {
return BucketMultipartResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
var uploads []*UploadMetadata
for key, session := range storedBucket.multiPartSession {
if strings.HasPrefix(key, resources.Prefix) {
if len(uploads) > resources.MaxUploads {
sort.Sort(byKey(uploads))
resources.Upload = uploads
resources.NextKeyMarker = key
resources.NextUploadIDMarker = session.uploadID
resources.IsTruncated = true
return resources, nil
}
// uploadIDMarker is ignored if KeyMarker is empty
switch {
case resources.KeyMarker != "" && resources.UploadIDMarker == "":
if key > resources.KeyMarker {
upload := new(UploadMetadata)
upload.Key = key
upload.UploadID = session.uploadID
upload.Initiated = session.initiated
uploads = append(uploads, upload)
}
case resources.KeyMarker != "" && resources.UploadIDMarker != "":
if session.uploadID > resources.UploadIDMarker {
if key >= resources.KeyMarker {
upload := new(UploadMetadata)
upload.Key = key
upload.UploadID = session.uploadID
upload.Initiated = session.initiated
uploads = append(uploads, upload)
}
}
default:
upload := new(UploadMetadata)
upload.Key = key
upload.UploadID = session.uploadID
upload.Initiated = session.initiated
uploads = append(uploads, upload)
}
}
}
sort.Sort(byKey(uploads))
resources.Upload = uploads
return resources, nil
}
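// Illustrative pagination sketch (hypothetical caller): when IsTruncated is
// set, feed the Next* markers back in to fetch the following page:
//
//	resources := BucketMultipartResourcesMetadata{MaxUploads: 100}
//	for {
//		res, err := api.ListMultipartUploads("bucket", resources)
//		if err != nil || !res.IsTruncated {
//			break
//		}
//		resources.KeyMarker = res.NextKeyMarker
//		resources.UploadIDMarker = res.NextUploadIDMarker
//	}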
// partNumber is a sortable interface for Part slice
type partNumber []*PartMetadata
func (a partNumber) Len() int { return len(a) }
func (a partNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a partNumber) Less(i, j int) bool { return a[i].PartNumber < a[j].PartNumber }
// ListObjectParts - list the uploaded parts of an in-progress multipart upload
func (donut API) ListObjectParts(bucket, key string, resources ObjectResourcesMetadata) (ObjectResourcesMetadata, error) {
// Verify upload id
donut.lock.RLock()
defer donut.lock.RUnlock()
if _, ok := donut.storedBuckets[bucket]; !ok {
return ObjectResourcesMetadata{}, iodine.New(BucketNotFound{Bucket: bucket}, nil)
}
storedBucket := donut.storedBuckets[bucket]
if _, ok := storedBucket.multiPartSession[key]; !ok {
return ObjectResourcesMetadata{}, iodine.New(ObjectNotFound{Object: key}, nil)
}
if storedBucket.multiPartSession[key].uploadID != resources.UploadID {
return ObjectResourcesMetadata{}, iodine.New(InvalidUploadID{UploadID: resources.UploadID}, nil)
}
objectResourcesMetadata := resources
objectResourcesMetadata.Bucket = bucket
objectResourcesMetadata.Key = key
var parts []*PartMetadata
var startPartNumber int
switch {
case objectResourcesMetadata.PartNumberMarker == 0:
startPartNumber = 1
default:
startPartNumber = objectResourcesMetadata.PartNumberMarker
}
for i := startPartNumber; i <= storedBucket.multiPartSession[key].totalParts; i++ {
if len(parts) > objectResourcesMetadata.MaxParts {
sort.Sort(partNumber(parts))
objectResourcesMetadata.IsTruncated = true
objectResourcesMetadata.Part = parts
objectResourcesMetadata.NextPartNumberMarker = i
return objectResourcesMetadata, nil
}
part, ok := storedBucket.partMetadata[bucket+"/"+getMultipartKey(key, resources.UploadID, i)]
if !ok {
return ObjectResourcesMetadata{}, iodine.New(errors.New("missing part: "+strconv.Itoa(i)), nil)
}
parts = append(parts, &part)
}
sort.Sort(partNumber(parts))
objectResourcesMetadata.Part = parts
return objectResourcesMetadata, nil
}
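// Illustrative sketch (hypothetical caller): parts are paged the same way,
// with NextPartNumberMarker carried over as the next PartNumberMarker:
//
//	res := ObjectResourcesMetadata{UploadID: uploadID, MaxParts: 1000}
//	res, err := api.ListObjectParts("bucket", "key", res)
//	if err == nil && res.IsTruncated {
//		res.PartNumberMarker = res.NextPartNumberMarker // request the next page
//	}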
func (donut API) expiredPart(a ...interface{}) {
key := a[0].(string)
// loop through all buckets
for _, storedBucket := range donut.storedBuckets {
delete(storedBucket.partMetadata, key)
}
debug.FreeOSMemory()
}

76
pkg/donut/node.go Normal file
View File

@@ -0,0 +1,76 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"github.com/minio/minio/pkg/donut/disk"
"github.com/minio/minio/pkg/iodine"
)
// node - internal representation of a storage node and its attached disks
type node struct {
hostname string
disks map[int]disk.Disk
}
// newNode - instantiates a new node
func newNode(hostname string) (node, error) {
if hostname == "" {
return node{}, iodine.New(InvalidArgument{}, nil)
}
disks := make(map[int]disk.Disk)
n := node{
hostname: hostname,
disks: disks,
}
return n, nil
}
// GetHostname - return hostname
func (n node) GetHostname() string {
return n.hostname
}
// ListDisks - return the map of attached disks
func (n node) ListDisks() (map[int]disk.Disk, error) {
return n.disks, nil
}
// AttachDisk - attach a disk
func (n node) AttachDisk(disk disk.Disk, diskOrder int) error {
if diskOrder < 0 {
return iodine.New(InvalidArgument{}, nil)
}
n.disks[diskOrder] = disk
return nil
}
// DetachDisk - detach a disk
func (n node) DetachDisk(diskOrder int) error {
delete(n.disks, diskOrder)
return nil
}
// SaveConfig - save node configuration
func (n node) SaveConfig() error {
return iodine.New(NotImplemented{Function: "SaveConfig"}, nil)
}
// LoadConfig - load node configuration from saved configs
func (n node) LoadConfig() error {
return iodine.New(NotImplemented{Function: "LoadConfig"}, nil)
}
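// Illustrative in-package sketch (d is a placeholder disk value, normally
// obtained from the disk package): wiring a node with one disk at order 0:
//
//	n, err := newNode("localhost")
//	if err != nil {
//		return err
//	}
//	var d disk.Disk
//	if err := n.AttachDisk(d, 0); err != nil {
//		return err
//	}
//	disks, _ := n.ListDisks() // map[int]disk.Disk with a single entry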

57
pkg/donut/rebalance.go Normal file
View File

@@ -0,0 +1,57 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package donut
import (
"fmt"
"os"
"strings"
"github.com/minio/minio/pkg/donut/disk"
"github.com/minio/minio/pkg/iodine"
)
// Rebalance - scan all nodes for fresh (empty) disks and existing donut directories, as a first step towards rebalancing
func (donut API) Rebalance() error {
var totalOffSetLength int
var newDisks []disk.Disk
var existingDirs []os.FileInfo
for _, node := range donut.nodes {
disks, err := node.ListDisks()
if err != nil {
return iodine.New(err, nil)
}
totalOffSetLength = len(disks)
fmt.Println(totalOffSetLength)
for _, disk := range disks {
dirs, err := disk.ListDir(donut.config.DonutName)
if err != nil {
return iodine.New(err, nil)
}
if len(dirs) == 0 {
newDisks = append(newDisks, disk)
}
existingDirs = append(existingDirs, dirs...)
}
}
for _, dir := range existingDirs {
// directory names are expected to follow the "<bucket>$<segment>$<offset>" layout
splits := strings.Split(dir.Name(), "$")
if len(splits) != 3 {
// skip entries that do not match the expected layout
continue
}
bucketName, segment, offset := splits[0], splits[1], splits[2]
fmt.Println(bucketName, segment, offset)
}
return nil
}

226
pkg/donut/trove/trove.go Normal file
View File

@@ -0,0 +1,226 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package trove implements in-memory caching methods
package trove
import (
"sync"
"time"
)
var noExpiration = time.Duration(0)
// Cache is an in-memory key/value store with an optional per-key
// expiration mechanism and an overall size cap (maxSize)
type Cache struct {
// Mutex is used for handling the concurrent
// read/write requests for cache
sync.Mutex
// items hold the cached objects
items map[string][]byte
// updatedAt records when each item was last written or read
updatedAt map[string]time.Time
// expiration is a duration for a cache key to expire
expiration time.Duration
// stopExpireTimer channel to quit the timer thread
stopExpireTimer chan struct{}
// maxSize is the upper bound on total cached bytes
maxSize uint64
// currentSize is the number of bytes currently cached
currentSize uint64
// OnExpired - callback function for eviction
OnExpired func(a ...interface{})
// totalExpired counter to keep track of total expirations
totalExpired uint64
}
// Stats current cache statistics
type Stats struct {
Bytes uint64
Items uint64
Expired uint64
}
// NewCache creates an in-memory cache
//
// maxSize caps the total cached bytes so objects are evicted before memory runs out,
// expiration is how long an unused key stays valid (0 disables expiry)
func NewCache(maxSize uint64, expiration time.Duration) *Cache {
return &Cache{
items: make(map[string][]byte),
updatedAt: map[string]time.Time{},
expiration: expiration,
maxSize: maxSize,
}
}
// Stats get current cache statistics
func (r *Cache) Stats() Stats {
return Stats{
Bytes: r.currentSize,
Items: uint64(len(r.items)),
Expired: r.totalExpired,
}
}
// ExpireObjects starts a goroutine that expires stale objects every gcInterval
func (r *Cache) ExpireObjects(gcInterval time.Duration) {
r.stopExpireTimer = make(chan struct{})
ticker := time.NewTicker(gcInterval)
go func() {
for {
select {
case <-ticker.C:
r.Expire()
case <-r.stopExpireTimer:
ticker.Stop()
return
}
}
}()
}
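// Illustrative sketch: a cache capped at 512 MiB whose unused keys expire
// after an hour, scanned once a minute; OnExpired mirrors how the donut
// layer cleans up per-part metadata:
//
//	c := NewCache(512*1024*1024, 1*time.Hour)
//	c.OnExpired = func(a ...interface{}) {
//		key := a[0].(string)
//		_ = key // e.g. drop metadata related to the expired key
//	}
//	c.ExpireObjects(time.Minute)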
// Get returns a value of a given key if it exists
func (r *Cache) Get(key string) ([]byte, bool) {
r.Lock()
defer r.Unlock()
value, ok := r.items[key]
if !ok {
return nil, false
}
r.updatedAt[key] = time.Now()
return value, true
}
// Len returns the length of the value stored at a given key, or zero if the key doesn't exist
func (r *Cache) Len(key string) int {
r.Lock()
defer r.Unlock()
_, ok := r.items[key]
if !ok {
return 0
}
return len(r.items[key])
}
// Append will append new data to an existing key,
// if key doesn't exist it behaves like Set()
func (r *Cache) Append(key string, value []byte) bool {
r.Lock()
defer r.Unlock()
valueLen := uint64(len(value))
if r.maxSize > 0 {
// check if the size of the object is not bigger than the
// capacity of the cache
if valueLen > r.maxSize {
return false
}
// evict random keys until the new value fits under maxSize
for (r.currentSize + valueLen) > r.maxSize {
for randomKey := range r.items {
r.doDelete(randomKey)
break
}
}
}
_, ok := r.items[key]
if !ok {
r.items[key] = value
r.currentSize += valueLen
r.updatedAt[key] = time.Now()
return true
}
r.items[key] = append(r.items[key], value...)
r.currentSize += valueLen
r.updatedAt[key] = time.Now()
return true
}
// Set will persist a value to the cache
func (r *Cache) Set(key string, value []byte) bool {
r.Lock()
defer r.Unlock()
valueLen := uint64(len(value))
if r.maxSize > 0 {
// check if the size of the object is not bigger than the
// capacity of the cache
if valueLen > r.maxSize {
return false
}
// evict random keys until the new value fits under maxSize
for (r.currentSize + valueLen) > r.maxSize {
for randomKey := range r.items {
r.doDelete(randomKey)
break
}
}
}
r.items[key] = value
r.currentSize += valueLen
r.updatedAt[key] = time.Now()
return true
}
// Expire removes all keys whose expiration period has passed
func (r *Cache) Expire() {
r.Lock()
defer r.Unlock()
for key := range r.items {
if !r.isValid(key) {
r.doDelete(key)
}
}
}
// Delete removes a given key if it exists
func (r *Cache) Delete(key string) {
r.Lock()
defer r.Unlock()
r.doDelete(key)
}
func (r *Cache) doDelete(key string) {
if _, ok := r.items[key]; ok {
r.currentSize -= uint64(len(r.items[key]))
delete(r.items, key)
delete(r.updatedAt, key)
r.totalExpired++
if r.OnExpired != nil {
r.OnExpired(key)
}
}
}
func (r *Cache) isValid(key string) bool {
updatedAt, ok := r.updatedAt[key]
if !ok {
return false
}
if r.expiration == noExpiration {
return true
}
return updatedAt.Add(r.expiration).After(time.Now())
}

45
pkg/donut/trove/trove_test.go Normal file
View File

@@ -0,0 +1,45 @@
/*
* Minimalist Object Storage, (C) 2015 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package trove
import (
"testing"
. "github.com/minio/check"
)
func Test(t *testing.T) { TestingT(t) }
type MySuite struct{}
var _ = Suite(&MySuite{})
func (s *MySuite) TestCache(c *C) {
cache := NewCache(1000, 0)
data := []byte("Hello, world!")
ok := cache.Set("filename", data)
c.Assert(ok, Equals, true)
storedata, ok := cache.Get("filename")
c.Assert(ok, Equals, true)
c.Assert(data, DeepEquals, storedata)
cache.Delete("filename")
_, ok = cache.Get("filename")
c.Assert(ok, Equals, false)
}

201
pkg/donut/utils.go Normal file
View File

@@ -0,0 +1,201 @@
package donut
import (
"regexp"
"strings"
"time"
"unicode/utf8"
)
// BucketACL - bucket level access control
type BucketACL string
// different types of ACLs currently supported for buckets
const (
BucketPrivate = BucketACL("private")
BucketPublicRead = BucketACL("public-read")
BucketPublicReadWrite = BucketACL("public-read-write")
)
func (b BucketACL) String() string {
return string(b)
}
// IsPrivate - is acl Private
func (b BucketACL) IsPrivate() bool {
return b == BucketACL("private")
}
// IsPublicRead - is acl PublicRead
func (b BucketACL) IsPublicRead() bool {
return b == BucketACL("public-read")
}
// IsPublicReadWrite - is acl PublicReadWrite
func (b BucketACL) IsPublicReadWrite() bool {
return b == BucketACL("public-read-write")
}
// FilterMode type
type FilterMode int
// FilterMode list
const (
DelimiterPrefixMode FilterMode = iota
DelimiterMode
PrefixMode
DefaultMode
)
// PartMetadata - various types of individual part resources
type PartMetadata struct {
PartNumber int
LastModified time.Time
ETag string
Size int64
}
// ObjectResourcesMetadata - various types of object resources
type ObjectResourcesMetadata struct {
Bucket string
EncodingType string
Key string
UploadID string
StorageClass string
PartNumberMarker int
NextPartNumberMarker int
MaxParts int
IsTruncated bool
Part []*PartMetadata
}
// UploadMetadata - container capturing metadata of an in-progress multipart upload in a given bucket
type UploadMetadata struct {
Key string
UploadID string
StorageClass string
Initiated time.Time
}
// BucketMultipartResourcesMetadata - various types of bucket resources for in-progress multipart uploads
type BucketMultipartResourcesMetadata struct {
KeyMarker string
UploadIDMarker string
NextKeyMarker string
NextUploadIDMarker string
EncodingType string
MaxUploads int
IsTruncated bool
Upload []*UploadMetadata
Prefix string
Delimiter string
CommonPrefixes []string
}
// BucketResourcesMetadata - various types of bucket resources
type BucketResourcesMetadata struct {
Prefix string
Marker string
NextMarker string
Maxkeys int
EncodingType string
Delimiter string
IsTruncated bool
CommonPrefixes []string
Mode FilterMode
}
// GetMode - derive the filter mode from the delimiter and prefix settings
func GetMode(resources BucketResourcesMetadata) FilterMode {
var f FilterMode
switch {
case resources.Delimiter != "" && resources.Prefix != "":
f = DelimiterPrefixMode
case resources.Delimiter != "" && resources.Prefix == "":
f = DelimiterMode
case resources.Delimiter == "" && resources.Prefix != "":
f = PrefixMode
case resources.Delimiter == "" && resources.Prefix == "":
f = DefaultMode
}
return f
}
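// Illustrative sketch of how query parameters map onto the filter modes:
//
//	GetMode(BucketResourcesMetadata{Delimiter: "/", Prefix: "photos/"}) // DelimiterPrefixMode
//	GetMode(BucketResourcesMetadata{Delimiter: "/"})                    // DelimiterMode
//	GetMode(BucketResourcesMetadata{Prefix: "photos/"})                 // PrefixMode
//	GetMode(BucketResourcesMetadata{})                                  // DefaultMode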
// IsValidBucketACL - is the provided acl string supported
func IsValidBucketACL(acl string) bool {
switch acl {
case "private", "public-read", "public-read-write":
return true
case "":
// an empty acl defaults to "private"
return true
default:
return false
}
}
// IsDelimiterPrefixSet Delimiter and Prefix set
func (b BucketResourcesMetadata) IsDelimiterPrefixSet() bool {
return b.Mode == DelimiterPrefixMode
}
// IsDelimiterSet Delimiter set
func (b BucketResourcesMetadata) IsDelimiterSet() bool {
return b.Mode == DelimiterMode
}
// IsPrefixSet Prefix set
func (b BucketResourcesMetadata) IsPrefixSet() bool {
return b.Mode == PrefixMode
}
// IsDefault No query values
func (b BucketResourcesMetadata) IsDefault() bool {
return b.Mode == DefaultMode
}
// IsValidBucket - verify bucket name in accordance with
// - http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingBucket.html
func IsValidBucket(bucket string) bool {
if len(bucket) < 3 || len(bucket) > 63 {
return false
}
if bucket[0] == '.' || bucket[len(bucket)-1] == '.' {
return false
}
if match, _ := regexp.MatchString("\\.\\.", bucket); match {
return false
}
// We don't support buckets with '.' in them
match, _ := regexp.MatchString("^[a-zA-Z][a-zA-Z0-9\\-]+[a-zA-Z0-9]$", bucket)
return match
}
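// Illustrative sketch of the rules enforced above:
//
//	IsValidBucket("my-bucket")  // true
//	IsValidBucket("ab")         // false, shorter than 3 characters
//	IsValidBucket(".bucket")    // false, leading dot
//	IsValidBucket("my..bucket") // false, consecutive dots
//	IsValidBucket("my.bucket")  // false, '.' is not matched by the regexp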
// IsValidObjectName - verify object name in accordance with
// - http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
func IsValidObjectName(object string) bool {
if strings.TrimSpace(object) == "" {
return false
}
// the empty case is already handled by the TrimSpace check above
if len(object) > 1024 {
return false
}
if !utf8.ValidString(object) {
return false
}
return true
}
// IsValidPrefix - verify prefix name is correct, an empty prefix is valid
func IsValidPrefix(prefix string) bool {
if strings.TrimSpace(prefix) == "" {
return true
}
return IsValidObjectName(prefix)
}