XL: Re-align the code again.

This commit is contained in:
Harshavardhana 2016-06-02 01:49:46 -07:00
parent ae311aa53b
commit de21126f7e
7 changed files with 126 additions and 132 deletions

View File

@ -25,32 +25,9 @@ import (
"github.com/klauspost/reedsolomon" "github.com/klauspost/reedsolomon"
) )
// encodeData - encodes incoming data buffer into // erasureCreateFile - writes an entire stream by erasure coding to
// dataBlocks+parityBlocks returns a 2 dimensional byte array. // all the disks, writes also calculate individual block's checksum
func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, error) { // for future bit-rot protection.
rs, err := reedsolomon.New(dataBlocks, parityBlocks)
if err != nil {
return nil, err
}
// Split the input buffer into data and parity blocks.
var blocks [][]byte
blocks, err = rs.Split(dataBuffer)
if err != nil {
return nil, err
}
// Encode parity blocks using data blocks.
err = rs.Encode(blocks)
if err != nil {
return nil, err
}
// Return encoded blocks.
return blocks, nil
}
// erasureCreateFile - take a data stream, reads until io.EOF erasure
// code and writes to all the disks.
func erasureCreateFile(disks []StorageAPI, volume string, path string, partName string, data io.Reader, eInfos []erasureInfo) (newEInfos []erasureInfo, err error) { func erasureCreateFile(disks []StorageAPI, volume string, path string, partName string, data io.Reader, eInfos []erasureInfo) (newEInfos []erasureInfo, err error) {
// Allocated blockSized buffer for reading. // Allocated blockSized buffer for reading.
buf := make([]byte, blockSizeV1) buf := make([]byte, blockSizeV1)
@ -104,6 +81,30 @@ func erasureCreateFile(disks []StorageAPI, volume string, path string, partName
return newEInfos, nil return newEInfos, nil
} }
// encodeData - encodes incoming data buffer into
// dataBlocks+parityBlocks returns a 2 dimensional byte array.
func encodeData(dataBuffer []byte, dataBlocks, parityBlocks int) ([][]byte, error) {
rs, err := reedsolomon.New(dataBlocks, parityBlocks)
if err != nil {
return nil, err
}
// Split the input buffer into data and parity blocks.
var blocks [][]byte
blocks, err = rs.Split(dataBuffer)
if err != nil {
return nil, err
}
// Encode parity blocks using data blocks.
err = rs.Encode(blocks)
if err != nil {
return nil, err
}
// Return encoded blocks.
return blocks, nil
}
// appendFile - append data buffer at path. // appendFile - append data buffer at path.
func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, distribution []int, hashWriters []hash.Hash) (err error) { func appendFile(disks []StorageAPI, volume, path string, enBlocks [][]byte, distribution []int, hashWriters []hash.Hash) (err error) {
var wg = &sync.WaitGroup{} var wg = &sync.WaitGroup{}

View File

@ -23,81 +23,11 @@ import (
"github.com/klauspost/reedsolomon" "github.com/klauspost/reedsolomon"
) )
// PartObjectChecksum - returns the checksum for the part name from the checksum slice. // erasureReadFile - read an entire erasure coded file at into a byte
func (e erasureInfo) PartObjectChecksum(partName string) checkSumInfo { // array. Erasure coded parts are often few mega bytes in size and it
for _, checksum := range e.Checksum { // is convenient to return them as byte slice. This function also
if checksum.Name == partName { // supports bit-rot detection by verifying checksum of individual
return checksum // block's checksum.
}
}
return checkSumInfo{}
}
// xlMetaPartBlockChecksums - get block checksums for a given part.
func metaPartBlockChecksums(disks []StorageAPI, eInfos []erasureInfo, partName string) (blockCheckSums []checkSumInfo) {
for index := range disks {
// Save the read checksums for a given part.
blockCheckSums = append(blockCheckSums, eInfos[index].PartObjectChecksum(partName))
}
return blockCheckSums
}
// Takes block index and block distribution to get the disk index.
func toDiskIndex(blockIdx int, distribution []int) (diskIndex int) {
diskIndex = -1
// Find out the right disk index for the input block index.
for index, blockIndex := range distribution {
if blockIndex == blockIdx {
diskIndex = index
}
}
return diskIndex
}
// isValidBlock - calculates the checksum hash for the block and
// validates if its correct returns true for valid cases, false otherwise.
func isValidBlock(disks []StorageAPI, volume, path string, diskIndex int, blockCheckSums []checkSumInfo) bool {
// Unknown block index requested, treat it as error.
if diskIndex == -1 {
return false
}
// Disk is not present, treat entire block to be non existent.
if disks[diskIndex] == nil {
return false
}
// Read everything for a given block and calculate hash.
hashWriter := newHash(blockCheckSums[diskIndex].Algorithm)
hashBytes, err := hashSum(disks[diskIndex], volume, path, hashWriter)
if err != nil {
return false
}
return hex.EncodeToString(hashBytes) == blockCheckSums[diskIndex].Hash
}
// decodeData - decode encoded blocks.
func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error {
rs, err := reedsolomon.New(dataBlocks, parityBlocks)
if err != nil {
return err
}
err = rs.Reconstruct(enBlocks)
if err != nil {
return err
}
// Verify reconstructed blocks (parity).
ok, err := rs.Verify(enBlocks)
if err != nil {
return err
}
if !ok {
// Blocks cannot be reconstructed, corrupted data.
err = errors.New("Verification failed after reconstruction, data likely corrupted.")
return err
}
return nil
}
// ReadFile - decoded erasure coded file.
func erasureReadFile(disks []StorageAPI, volume string, path string, partName string, size int64, eInfos []erasureInfo) ([]byte, error) { func erasureReadFile(disks []StorageAPI, volume string, path string, partName string, size int64, eInfos []erasureInfo) ([]byte, error) {
// Return data buffer. // Return data buffer.
var buffer []byte var buffer []byte
@ -195,3 +125,77 @@ func erasureReadFile(disks []StorageAPI, volume string, path string, partName st
} }
return buffer, nil return buffer, nil
} }
// PartObjectChecksum - returns the checksum for the part name from the checksum slice.
func (e erasureInfo) PartObjectChecksum(partName string) checkSumInfo {
for _, checksum := range e.Checksum {
if checksum.Name == partName {
return checksum
}
}
return checkSumInfo{}
}
// xlMetaPartBlockChecksums - get block checksums for a given part.
func metaPartBlockChecksums(disks []StorageAPI, eInfos []erasureInfo, partName string) (blockCheckSums []checkSumInfo) {
for index := range disks {
// Save the read checksums for a given part.
blockCheckSums = append(blockCheckSums, eInfos[index].PartObjectChecksum(partName))
}
return blockCheckSums
}
// Takes block index and block distribution to get the disk index.
func toDiskIndex(blockIdx int, distribution []int) (diskIndex int) {
diskIndex = -1
// Find out the right disk index for the input block index.
for index, blockIndex := range distribution {
if blockIndex == blockIdx {
diskIndex = index
}
}
return diskIndex
}
// isValidBlock - calculates the checksum hash for the block and
// validates if its correct returns true for valid cases, false otherwise.
func isValidBlock(disks []StorageAPI, volume, path string, diskIndex int, blockCheckSums []checkSumInfo) bool {
// Unknown block index requested, treat it as error.
if diskIndex == -1 {
return false
}
// Disk is not present, treat entire block to be non existent.
if disks[diskIndex] == nil {
return false
}
// Read everything for a given block and calculate hash.
hashWriter := newHash(blockCheckSums[diskIndex].Algorithm)
hashBytes, err := hashSum(disks[diskIndex], volume, path, hashWriter)
if err != nil {
return false
}
return hex.EncodeToString(hashBytes) == blockCheckSums[diskIndex].Hash
}
// decodeData - decode encoded blocks.
func decodeData(enBlocks [][]byte, dataBlocks, parityBlocks int) error {
rs, err := reedsolomon.New(dataBlocks, parityBlocks)
if err != nil {
return err
}
err = rs.Reconstruct(enBlocks)
if err != nil {
return err
}
// Verify reconstructed blocks (parity).
ok, err := rs.Verify(enBlocks)
if err != nil {
return err
}
if !ok {
// Blocks cannot be reconstructed, corrupted data.
err = errors.New("Verification failed after reconstruction, data likely corrupted.")
return err
}
return nil
}

View File

@ -41,20 +41,9 @@ type fsObjects struct {
// newFSObjects - initialize new fs object layer. // newFSObjects - initialize new fs object layer.
func newFSObjects(disk string) (ObjectLayer, error) { func newFSObjects(disk string) (ObjectLayer, error) {
var storage StorageAPI storage, err := newStorageAPI(disk)
var err error if err != nil {
if !strings.ContainsRune(disk, ':') || filepath.VolumeName(disk) != "" { return nil, err
// Initialize filesystem storage API.
storage, err = newPosix(disk)
if err != nil {
return nil, err
}
} else {
// Initialize rpc client storage API.
storage, err = newRPCClient(disk)
if err != nil {
return nil, err
}
} }
// Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc. // Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc.

View File

@ -118,8 +118,8 @@ func registerApp() *cli.App {
app := cli.NewApp() app := cli.NewApp()
app.Name = "Minio" app.Name = "Minio"
app.Author = "Minio.io" app.Author = "Minio.io"
app.Usage = "Distributed Object Storage Server for Micro Services." app.Usage = "Cloud Storage Server."
app.Description = `Micro services environment provisions one Minio server per application instance. Scalability is achieved through large number of smaller personalized instances. This version of the Minio binary is built using Filesystem storage backend for magnetic and solid state disks.` app.Description = `Minio is an Amazon S3 compatible object storage server. Use it to store photos, videos, VMs, containers, log files, or any blob of data as objects.`
app.Flags = append(minioFlags, globalFlags...) app.Flags = append(minioFlags, globalFlags...)
app.Commands = commands app.Commands = commands
app.CustomAppHelpTemplate = minioHelpTemplate app.CustomAppHelpTemplate = minioHelpTemplate

View File

@ -17,6 +17,7 @@
package main package main
import ( import (
"path/filepath"
"strings" "strings"
"sync" "sync"
) )
@ -43,6 +44,16 @@ func fsHouseKeeping(storageDisk StorageAPI) error {
return nil return nil
} }
// Depending on the disk type network or local, initialize storage API.
func newStorageAPI(disk string) (storage StorageAPI, err error) {
if !strings.ContainsRune(disk, ':') || filepath.VolumeName(disk) != "" {
// Initialize filesystem storage API.
return newPosix(disk)
}
// Initialize rpc client storage API.
return newRPCClient(disk)
}
// House keeping code needed for XL. // House keeping code needed for XL.
func xlHouseKeeping(storageDisks []StorageAPI) error { func xlHouseKeeping(storageDisks []StorageAPI) error {
// This happens for the first time, but keep this here since this // This happens for the first time, but keep this here since this

View File

@ -34,7 +34,7 @@ import (
var serverCmd = cli.Command{ var serverCmd = cli.Command{
Name: "server", Name: "server",
Usage: "Start Minio cloud storage server.", Usage: "Start object storage server.",
Flags: []cli.Flag{ Flags: []cli.Flag{
cli.StringFlag{ cli.StringFlag{
Name: "address", Name: "address",
@ -65,9 +65,10 @@ EXAMPLES:
3. Start minio server on Windows. 3. Start minio server on Windows.
$ minio {{.Name}} C:\MyShare $ minio {{.Name}} C:\MyShare
4. Start minio server 8 disks to enable erasure coded layer with 4 data and 4 parity. 4. Start minio server 12 disks to enable erasure coded layer with 6 data and 6 parity.
$ minio {{.Name}} /mnt/export1/backend /mnt/export2/backend /mnt/export3/backend /mnt/export4/backend \ $ minio {{.Name}} /mnt/export1/backend /mnt/export2/backend /mnt/export3/backend /mnt/export4/backend \
/mnt/export5/backend /mnt/export6/backend /mnt/export7/backend /mnt/export8/backend /mnt/export5/backend /mnt/export6/backend /mnt/export7/backend /mnt/export8/backend /mnt/export9/backend \
/mnt/export10/backend /mnt/export11/backend /mnt/export12/backend
`, `,
} }

View File

@ -19,9 +19,7 @@ package main
import ( import (
"errors" "errors"
"fmt" "fmt"
"path/filepath"
"sort" "sort"
"strings"
"sync" "sync"
"github.com/minio/minio/pkg/disk" "github.com/minio/minio/pkg/disk"
@ -92,16 +90,6 @@ func checkSufficientDisks(disks []string) error {
return nil return nil
} }
// Depending on the disk type network or local, initialize storage API.
func newStorageAPI(disk string) (storage StorageAPI, err error) {
if !strings.ContainsRune(disk, ':') || filepath.VolumeName(disk) != "" {
// Initialize filesystem storage API.
return newPosix(disk)
}
// Initialize rpc client storage API.
return newRPCClient(disk)
}
// newXLObjects - initialize new xl object layer. // newXLObjects - initialize new xl object layer.
func newXLObjects(disks []string) (ObjectLayer, error) { func newXLObjects(disks []string) (ObjectLayer, error) {
// Validate if input disks are sufficient. // Validate if input disks are sufficient.