mirror of https://github.com/minio/minio.git
316 lines
9.7 KiB
Go
316 lines
9.7 KiB
Go
/*
|
|
* Minimalist Object Storage, (C) 2015 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package donut
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/md5"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"hash"
|
|
"io"
|
|
"path"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/minio/minio/pkg/iodine"
|
|
"github.com/minio/minio/pkg/utils/split"
|
|
)
|
|
|
|
/// This file contains all the internal functions used by Bucket interface
|
|
|
|
// isMD5SumEqual - returns error if md5sum mismatches, other its `nil`
|
|
func (b bucket) isMD5SumEqual(expectedMD5Sum, actualMD5Sum string) error {
|
|
if strings.TrimSpace(expectedMD5Sum) != "" && strings.TrimSpace(actualMD5Sum) != "" {
|
|
expectedMD5SumBytes, err := hex.DecodeString(expectedMD5Sum)
|
|
if err != nil {
|
|
return iodine.New(err, nil)
|
|
}
|
|
actualMD5SumBytes, err := hex.DecodeString(actualMD5Sum)
|
|
if err != nil {
|
|
return iodine.New(err, nil)
|
|
}
|
|
if !bytes.Equal(expectedMD5SumBytes, actualMD5SumBytes) {
|
|
return iodine.New(errors.New("bad digest, md5sum mismatch"), nil)
|
|
}
|
|
return nil
|
|
}
|
|
return iodine.New(errors.New("invalid argument"), nil)
|
|
}
|
|
|
|
// writeObjectMetadata - write additional object metadata
|
|
func (b bucket) writeObjectMetadata(objectName string, objectMetadata map[string]string) error {
|
|
if len(objectMetadata) == 0 {
|
|
return iodine.New(errors.New("invalid argument"), nil)
|
|
}
|
|
objectMetadataWriters, err := b.getDiskWriters(objectName, objectMetadataConfig)
|
|
if err != nil {
|
|
return iodine.New(err, nil)
|
|
}
|
|
for _, objectMetadataWriter := range objectMetadataWriters {
|
|
defer objectMetadataWriter.Close()
|
|
}
|
|
for _, objectMetadataWriter := range objectMetadataWriters {
|
|
jenc := json.NewEncoder(objectMetadataWriter)
|
|
if err := jenc.Encode(objectMetadata); err != nil {
|
|
return iodine.New(err, nil)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// writeDonutObjectMetadata - write donut related object metadata
|
|
func (b bucket) writeDonutObjectMetadata(objectName string, objectMetadata map[string]string) error {
|
|
if len(objectMetadata) == 0 {
|
|
return iodine.New(errors.New("invalid argument"), nil)
|
|
}
|
|
objectMetadataWriters, err := b.getDiskWriters(objectName, donutObjectMetadataConfig)
|
|
if err != nil {
|
|
return iodine.New(err, nil)
|
|
}
|
|
for _, objectMetadataWriter := range objectMetadataWriters {
|
|
defer objectMetadataWriter.Close()
|
|
}
|
|
for _, objectMetadataWriter := range objectMetadataWriters {
|
|
jenc := json.NewEncoder(objectMetadataWriter)
|
|
if err := jenc.Encode(objectMetadata); err != nil {
|
|
return iodine.New(err, nil)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TODO - This a temporary normalization of objectNames, need to find a better way
|
|
//
|
|
// normalizedObjectName - all objectNames with "/" get normalized to a simple objectName
|
|
//
|
|
// example:
|
|
// user provided value - "this/is/my/deep/directory/structure"
|
|
// donut normalized value - "this-is-my-deep-directory-structure"
|
|
//
|
|
func (b bucket) normalizeObjectName(objectName string) string {
|
|
// replace every '/' with '-'
|
|
return strings.Replace(objectName, "/", "-", -1)
|
|
}
|
|
|
|
// getDataAndParity - calculate k, m (data and parity) values from number of disks
|
|
func (b bucket) getDataAndParity(totalWriters int) (k uint8, m uint8, err error) {
|
|
if totalWriters <= 1 {
|
|
return 0, 0, iodine.New(errors.New("invalid argument"), nil)
|
|
}
|
|
quotient := totalWriters / 2 // not using float or abs to let integer round off to lower value
|
|
// quotient cannot be bigger than (255 / 2) = 127
|
|
if quotient > 127 {
|
|
return 0, 0, iodine.New(errors.New("parity over flow"), nil)
|
|
}
|
|
remainder := totalWriters % 2 // will be 1 for odd and 0 for even numbers
|
|
k = uint8(quotient + remainder)
|
|
m = uint8(quotient)
|
|
return k, m, nil
|
|
}
|
|
|
|
// writeEncodedData -
|
|
func (b bucket) writeEncodedData(k, m uint8, writers []io.WriteCloser, objectData io.Reader, summer hash.Hash) (int, int, error) {
|
|
chunks := split.Stream(objectData, 10*1024*1024)
|
|
encoder, err := NewEncoder(k, m, "Cauchy")
|
|
if err != nil {
|
|
return 0, 0, iodine.New(err, nil)
|
|
}
|
|
chunkCount := 0
|
|
totalLength := 0
|
|
for chunk := range chunks {
|
|
if chunk.Err == nil {
|
|
totalLength = totalLength + len(chunk.Data)
|
|
encodedBlocks, _ := encoder.Encode(chunk.Data)
|
|
summer.Write(chunk.Data)
|
|
for blockIndex, block := range encodedBlocks {
|
|
_, err := io.Copy(writers[blockIndex], bytes.NewBuffer(block))
|
|
if err != nil {
|
|
return 0, 0, iodine.New(err, nil)
|
|
}
|
|
}
|
|
}
|
|
chunkCount = chunkCount + 1
|
|
}
|
|
return chunkCount, totalLength, nil
|
|
}
|
|
|
|
// readEncodedData -
|
|
func (b bucket) readEncodedData(objectName string, writer *io.PipeWriter, donutObjectMetadata map[string]string) {
|
|
expectedMd5sum, err := hex.DecodeString(donutObjectMetadata["sys.md5"])
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.New(err, nil))
|
|
return
|
|
}
|
|
readers, err := b.getDiskReaders(objectName, "data")
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.New(err, nil))
|
|
return
|
|
}
|
|
hasher := md5.New()
|
|
mwriter := io.MultiWriter(writer, hasher)
|
|
switch len(readers) == 1 {
|
|
case false:
|
|
totalChunks, totalLeft, blockSize, k, m, err := b.donutMetadata2Values(donutObjectMetadata)
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.New(err, nil))
|
|
return
|
|
}
|
|
technique, ok := donutObjectMetadata["sys.erasureTechnique"]
|
|
if !ok {
|
|
err := errors.New("missing erasure Technique")
|
|
writer.CloseWithError(iodine.New(err, nil))
|
|
return
|
|
}
|
|
encoder, err := NewEncoder(uint8(k), uint8(m), technique)
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.New(err, nil))
|
|
return
|
|
}
|
|
for i := 0; i < totalChunks; i++ {
|
|
decodedData, err := b.decodeEncodedData(totalLeft, blockSize, readers, encoder, writer)
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.New(err, nil))
|
|
return
|
|
}
|
|
_, err = io.Copy(mwriter, bytes.NewBuffer(decodedData))
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.New(err, nil))
|
|
return
|
|
}
|
|
totalLeft = totalLeft - int64(blockSize)
|
|
}
|
|
case true:
|
|
_, err := io.Copy(writer, readers[0])
|
|
if err != nil {
|
|
writer.CloseWithError(iodine.New(err, nil))
|
|
return
|
|
}
|
|
}
|
|
// check if decodedData md5sum matches
|
|
if !bytes.Equal(expectedMd5sum, hasher.Sum(nil)) {
|
|
err := errors.New("checksum mismatch")
|
|
writer.CloseWithError(iodine.New(err, nil))
|
|
return
|
|
}
|
|
writer.Close()
|
|
return
|
|
}
|
|
|
|
// decodeEncodedData -
|
|
func (b bucket) decodeEncodedData(totalLeft, blockSize int64, readers []io.ReadCloser, encoder Encoder, writer *io.PipeWriter) ([]byte, error) {
|
|
var curBlockSize int64
|
|
if blockSize < totalLeft {
|
|
curBlockSize = blockSize
|
|
} else {
|
|
curBlockSize = totalLeft // cast is safe, blockSize in if protects
|
|
}
|
|
curChunkSize, err := encoder.GetEncodedBlockLen(int(curBlockSize))
|
|
if err != nil {
|
|
return nil, iodine.New(err, nil)
|
|
}
|
|
encodedBytes := make([][]byte, len(readers))
|
|
for i, reader := range readers {
|
|
var bytesBuffer bytes.Buffer
|
|
_, err := io.CopyN(&bytesBuffer, reader, int64(curChunkSize))
|
|
if err != nil {
|
|
return nil, iodine.New(err, nil)
|
|
}
|
|
encodedBytes[i] = bytesBuffer.Bytes()
|
|
}
|
|
decodedData, err := encoder.Decode(encodedBytes, int(curBlockSize))
|
|
if err != nil {
|
|
return nil, iodine.New(err, nil)
|
|
}
|
|
return decodedData, nil
|
|
}
|
|
|
|
// donutMetadata2Values -
|
|
func (b bucket) donutMetadata2Values(donutObjectMetadata map[string]string) (totalChunks int, totalLeft, blockSize int64, k, m uint64, err error) {
|
|
totalChunks, err = strconv.Atoi(donutObjectMetadata["sys.chunkCount"])
|
|
if err != nil {
|
|
return 0, 0, 0, 0, 0, iodine.New(err, nil)
|
|
}
|
|
totalLeft, err = strconv.ParseInt(donutObjectMetadata["sys.size"], 10, 64)
|
|
if err != nil {
|
|
return 0, 0, 0, 0, 0, iodine.New(err, nil)
|
|
}
|
|
blockSize, err = strconv.ParseInt(donutObjectMetadata["sys.blockSize"], 10, 64)
|
|
if err != nil {
|
|
return 0, 0, 0, 0, 0, iodine.New(err, nil)
|
|
}
|
|
k, err = strconv.ParseUint(donutObjectMetadata["sys.erasureK"], 10, 8)
|
|
if err != nil {
|
|
return 0, 0, 0, 0, 0, iodine.New(err, nil)
|
|
}
|
|
m, err = strconv.ParseUint(donutObjectMetadata["sys.erasureM"], 10, 8)
|
|
if err != nil {
|
|
return 0, 0, 0, 0, 0, iodine.New(err, nil)
|
|
}
|
|
return totalChunks, totalLeft, blockSize, k, m, nil
|
|
}
|
|
|
|
// getDiskReaders -
|
|
func (b bucket) getDiskReaders(objectName, objectMeta string) ([]io.ReadCloser, error) {
|
|
var readers []io.ReadCloser
|
|
nodeSlice := 0
|
|
for _, node := range b.nodes {
|
|
disks, err := node.ListDisks()
|
|
if err != nil {
|
|
return nil, iodine.New(err, nil)
|
|
}
|
|
readers = make([]io.ReadCloser, len(disks))
|
|
for _, disk := range disks {
|
|
bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, disk.GetOrder())
|
|
objectPath := path.Join(b.donutName, bucketSlice, objectName, objectMeta)
|
|
objectSlice, err := disk.OpenFile(objectPath)
|
|
if err != nil {
|
|
return nil, iodine.New(err, nil)
|
|
}
|
|
readers[disk.GetOrder()] = objectSlice
|
|
}
|
|
nodeSlice = nodeSlice + 1
|
|
}
|
|
return readers, nil
|
|
}
|
|
|
|
// getDiskWriters -
|
|
func (b bucket) getDiskWriters(objectName, objectMeta string) ([]io.WriteCloser, error) {
|
|
var writers []io.WriteCloser
|
|
nodeSlice := 0
|
|
for _, node := range b.nodes {
|
|
disks, err := node.ListDisks()
|
|
if err != nil {
|
|
return nil, iodine.New(err, nil)
|
|
}
|
|
writers = make([]io.WriteCloser, len(disks))
|
|
for _, disk := range disks {
|
|
bucketSlice := fmt.Sprintf("%s$%d$%d", b.name, nodeSlice, disk.GetOrder())
|
|
objectPath := path.Join(b.donutName, bucketSlice, objectName, objectMeta)
|
|
objectSlice, err := disk.MakeFile(objectPath)
|
|
if err != nil {
|
|
return nil, iodine.New(err, nil)
|
|
}
|
|
writers[disk.GetOrder()] = objectSlice
|
|
}
|
|
nodeSlice = nodeSlice + 1
|
|
}
|
|
return writers, nil
|
|
}
|