mirror of https://github.com/minio/minio.git

Implement XL layer - preliminary work.

commit a98a7fb1ad
parent bf8a9702a4
Makefile
5
Makefile
@ -71,6 +71,7 @@ getdeps: checks
|
||||
@go get -u github.com/fzipp/gocyclo && echo "Installed gocyclo:"
|
||||
@go get -u github.com/remyoudompheng/go-misc/deadcode && echo "Installed deadcode:"
|
||||
@go get -u github.com/client9/misspell/cmd/misspell && echo "Installed misspell:"
|
||||
@go get -u github.com/gordonklaus/ineffassign && echo "Installed ineffassign:"
|
||||
|
||||
verifiers: vet fmt lint cyclo spelling
|
||||
|
||||
@ -91,6 +92,10 @@ lint:
|
||||
@GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/golint *.go
|
||||
@GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/golint github.com/minio/minio/pkg...
|
||||
|
||||
ineffassign:
|
||||
@echo "Running $@:"
|
||||
@GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/ineffassign .
|
||||
|
||||
cyclo:
|
||||
@echo "Running $@:"
|
||||
@GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/gocyclo -over 65 *.go
|
||||
|
fs.go (9 changes)

@@ -359,9 +359,7 @@ func (s fsStorage) DeleteVol(volume string) error {
 	if os.IsNotExist(err) {
 		return errVolumeNotFound
 	} else if strings.Contains(err.Error(), "directory is not empty") {
-		// On windows the string is
-		// slightly different, handle it
-		// here.
+		// On windows the string is slightly different, handle it here.
 		return errVolumeNotEmpty
 	} else if strings.Contains(err.Error(), "directory not empty") {
 		// Hopefully for all other
@@ -371,7 +369,7 @@ func (s fsStorage) DeleteVol(volume string) error {
 		}
 		return err
 	}
-	return err
+	return nil
 }
 
 // Save the goroutine reference in the map
@@ -466,7 +464,8 @@ func (s fsStorage) ListFiles(volume, prefix, marker string, recursive bool, coun
 	// Verify if prefix exists.
 	prefixDir := filepath.Dir(filepath.FromSlash(prefix))
 	prefixRootDir := filepath.Join(volumeDir, prefixDir)
-	if status, err := isDirExist(prefixRootDir); !status {
+	var status bool
+	if status, err = isDirExist(prefixRootDir); !status {
 		if err == nil {
 			// Prefix does not exist, not an error just respond empty list response.
 			return nil, true, nil
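
The DeleteVol hunk above works around the fact that `os.Remove` reports a non-empty directory with platform-specific wording, so the code matches on the message text. A minimal, self-contained sketch of that pattern (the `errVolume*` values are stand-ins for minio's internal errors, not the real definitions):

```Go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

var (
	errVolumeNotFound = errors.New("volume not found")
	errVolumeNotEmpty = errors.New("volume is not empty")
)

// deleteVol mirrors the logic in the hunk: os.Remove fails on a
// non-empty directory, and the message differs between Unix
// ("directory not empty") and Windows ("The directory is not empty").
func deleteVol(volumeDir string) error {
	err := os.Remove(volumeDir)
	if err == nil {
		return nil
	}
	if os.IsNotExist(err) {
		return errVolumeNotFound
	}
	if strings.Contains(err.Error(), "not empty") {
		return errVolumeNotEmpty
	}
	return err
}

func main() {
	// The temp dir usually has files in it, so this typically
	// demonstrates the errVolumeNotEmpty path.
	fmt.Println(deleteVol(os.TempDir()))
}
```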

@@ -515,6 +515,7 @@ func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadI
 	} else if !status {
 		return "", probe.NewError(InvalidUploadID{UploadID: uploadID})
 	}
 
 	fileWriter, e := o.storage.CreateFile(bucket, object)
 	if e != nil {
 		if e == errVolumeNotFound {
@@ -534,6 +535,7 @@ func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadI
 
 	var md5Sums []string
 	for _, part := range parts {
 		// Construct part suffix.
 		partSuffix := fmt.Sprintf("%s.%d.%s", uploadID, part.PartNumber, part.ETag)
+		var fileReader io.ReadCloser
 		fileReader, e = o.storage.ReadFile(minioMetaVolume, path.Join(bucket, object, partSuffix), 0)
@@ -553,6 +555,7 @@ func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadI
 		}
 		md5Sums = append(md5Sums, part.ETag)
 	}
 
 	e = fileWriter.Close()
 	if e != nil {
 		return "", probe.NewError(e)
@@ -567,6 +570,7 @@ func (o objectAPI) CompleteMultipartUpload(bucket string, object string, uploadI
 	// Cleanup all the parts.
 	o.removeMultipartUpload(bucket, object, uploadID)
 
 	// Return md5sum.
 	return s3MD5, nil
 }
 
@@ -578,6 +582,10 @@ func (o objectAPI) removeMultipartUpload(bucket, object, uploadID string) *probe
 	if !IsValidObjectName(object) {
 		return probe.NewError(ObjectNameInvalid{Bucket: bucket, Object: object})
 	}
+	if _, e := o.storage.StatVol(minioMetaVolume); e != nil {
+		return nil
+	}
+
 	marker := ""
 	for {
 		uploadIDPath := path.Join(bucket, object, uploadID)

@@ -237,7 +237,9 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R
 	// Instantiate checksum hashers and create a multiwriter.
 	if size > 0 {
 		if _, e = io.CopyN(multiWriter, data, size); e != nil {
-			safeCloseAndRemove(fileWriter)
+			if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
+				return "", probe.NewError(clErr)
+			}
 			if e == io.ErrUnexpectedEOF {
 				return "", probe.NewError(IncompleteBody{})
 			}
@@ -245,7 +247,9 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R
 		}
 	} else {
 		if _, e = io.Copy(multiWriter, data); e != nil {
-			safeCloseAndRemove(fileWriter)
+			if clErr := safeCloseAndRemove(fileWriter); clErr != nil {
+				return "", probe.NewError(clErr)
+			}
 			return "", probe.NewError(e)
 		}
 	}
@@ -258,7 +262,9 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R
 	}
 	if md5Hex != "" {
 		if newMD5Hex != md5Hex {
-			safeCloseAndRemove(fileWriter)
+			if e = safeCloseAndRemove(fileWriter); e != nil {
+				return "", probe.NewError(e)
+			}
 			return "", probe.NewError(BadDigest{md5Hex, newMD5Hex})
 		}
 	}
@@ -267,7 +273,7 @@ func (o objectAPI) PutObject(bucket string, object string, size int64, data io.R
 		return "", probe.NewError(e)
 	}
 
-	// Return md5sum.
+	// Return md5sum, successfully wrote object.
 	return newMD5Hex, nil
 }
 
@@ -282,6 +288,8 @@ func (o objectAPI) DeleteObject(bucket, object string) *probe.Error {
 	if e := o.storage.DeleteFile(bucket, object); e != nil {
 		if e == errVolumeNotFound {
 			return probe.NewError(BucketNotFound{Bucket: bucket})
+		} else if e == errFileNotFound {
+			return probe.NewError(ObjectNotFound{Bucket: bucket})
 		}
 		if e == errFileNotFound {
 			return probe.NewError(ObjectNotFound{
@@ -345,7 +353,7 @@ func (o objectAPI) ListObjects(bucket, prefix, marker, delimiter string, maxKeys
 			Name:    fileInfo.Name,
 			ModTime: fileInfo.ModTime,
 			Size:    fileInfo.Size,
-			IsDir:   fileInfo.Mode.IsDir(),
+			IsDir:   false,
 		})
 	}
 	return result, nil
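
The three PutObject hunks all apply the same fix: the error from the cleanup call was previously discarded. A hedged sketch of the pattern, with a hypothetical `cleanup` standing in for `safeCloseAndRemove`:

```Go
package main

import (
	"errors"
	"fmt"
)

// cleanup stands in for safeCloseAndRemove: closing and deleting a
// partially written file can itself fail (e.g. on a failing disk).
func cleanup() error { return errors.New("remove failed: disk error") }

// writeObject shows the corrected pattern: when the copy fails, a
// failing cleanup is surfaced instead of being silently dropped.
func writeObject(copyErr error) error {
	if copyErr != nil {
		if clErr := cleanup(); clErr != nil {
			return clErr // cleanup failure takes precedence
		}
		return copyErr
	}
	return nil
}

func main() {
	fmt.Println(writeObject(errors.New("short write")))
}
```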

@@ -1056,8 +1056,6 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
 		writeErrorResponse(w, r, ErrNoSuchUpload, r.URL.Path)
 	case InvalidPart:
 		writeErrorResponse(w, r, ErrInvalidPart, r.URL.Path)
 	case InvalidPartOrder:
 		writeErrorResponse(w, r, ErrInvalidPartOrder, r.URL.Path)
 	case IncompleteBody:
 		writeErrorResponse(w, r, ErrIncompleteBody, r.URL.Path)
 	default:
@@ -20,11 +20,20 @@
 package safe
 
 import (
+	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
 )
 
+// Vault - vault is an interface for different implementations of safe
+// i/o semantics.
+type Vault interface {
+	io.ReadWriteCloser
+	SyncClose() error
+	CloseAndRemove() error
+}
+
 // File provides for safe file writes.
 type File struct {
 	*os.File
@@ -37,6 +46,7 @@ func (f *File) SyncClose() error {
 	if err := f.File.Sync(); err != nil {
 		return err
 	}
+	// Close the fd.
 	if err := f.Close(); err != nil {
 		return err
 	}
@@ -45,11 +55,11 @@ func (f *File) SyncClose() error {
 
 // Close the file, returns an error if any
 func (f *File) Close() error {
-	// close the embedded fd
+	// Close the embedded fd.
 	if err := f.File.Close(); err != nil {
 		return err
 	}
-	// safe rename to final destination
+	// Safe rename to final destination
 	if err := os.Rename(f.Name(), f.file); err != nil {
 		return err
 	}
@@ -63,6 +73,7 @@ func (f *File) CloseAndRemove() error {
 	if err := f.File.Close(); err != nil {
 		return err
 	}
+	// Remove the temp file.
 	if err := os.Remove(f.Name()); err != nil {
 		return err
 	}
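
For context, the safe-file semantics this package provides (and that the new Vault interface abstracts) are write-to-temp, sync, then rename into place. A minimal sketch of the same idea, not the package's actual implementation:

```Go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// safeWrite writes data to a temp file in the destination's directory
// and renames it into place, roughly the SyncClose/Close semantics of
// safe.File: readers never observe a partially written file.
func safeWrite(name string, data []byte) error {
	tmp, err := ioutil.TempFile(filepath.Dir(name), "safe-")
	if err != nil {
		return err
	}
	if _, err = tmp.Write(data); err != nil {
		tmp.Close()
		os.Remove(tmp.Name()) // CloseAndRemove semantics on failure
		return err
	}
	if err = tmp.Sync(); err != nil { // flush to stable storage first
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err = tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	return os.Rename(tmp.Name(), name) // atomic publish
}

func main() {
	fmt.Println(safeWrite(filepath.Join(os.TempDir(), "demo.txt"), []byte("hello")))
}
```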

@@ -54,7 +54,7 @@ func (s *MySuite) TestSafe(c *C) {
 	c.Assert(err, IsNil)
 }
 
-func (s *MySuite) TestSafePurge(c *C) {
+func (s *MySuite) TestSafeRemove(c *C) {
 	f, err := CreateFile(filepath.Join(s.root, "purgefile"))
 	c.Assert(err, IsNil)
 	_, err = os.Stat(filepath.Join(s.root, "purgefile"))

routers.go (12 changes)

@@ -27,19 +27,25 @@ import (
 // configureServer handler returns final handler for the http server.
 func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler {
 	var storageHandlers StorageAPI
+	var e error
 	if len(srvCmdConfig.exportPaths) == 1 {
 		// Verify if export path is a local file system path.
-		st, e := os.Stat(srvCmdConfig.exportPaths[0])
+		var st os.FileInfo
+		st, e = os.Stat(srvCmdConfig.exportPaths[0])
 		if e == nil && st.Mode().IsDir() {
 			// Initialize storage API.
 			storageHandlers, e = newFS(srvCmdConfig.exportPaths[0])
 			fatalIf(probe.NewError(e), "Initializing fs failed.", nil)
 		} else {
-			// Initialize storage API.
+			// Initialize network storage API.
 			storageHandlers, e = newNetworkFS(srvCmdConfig.exportPaths[0])
 			fatalIf(probe.NewError(e), "Initializing network fs failed.", nil)
 		}
-	} // else if - XL part.
+	} else {
+		// Initialize XL storage API.
+		storageHandlers, e = newXL(srvCmdConfig.exportPaths...)
+		fatalIf(probe.NewError(e), "Initializing XL failed.", nil)
+	}
 
 	// Initialize object layer.
 	objectAPI := newObjectLayer(storageHandlers)
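
This hunk is the heart of the commit: the storage backend is now chosen from the export paths. A condensed sketch of just the dispatch logic shown above (return values are illustrative labels, not minio types):

```Go
package main

import (
	"fmt"
	"os"
)

// pickBackend mirrors configureServerHandler's dispatch: one export
// path that is a local directory gets the FS layer, one path that is
// not a local directory is treated as network storage, and multiple
// paths enable the XL (erasure coded) layer.
func pickBackend(exportPaths []string) string {
	if len(exportPaths) == 1 {
		st, err := os.Stat(exportPaths[0])
		if err == nil && st.Mode().IsDir() {
			return "fs"
		}
		return "networkfs"
	}
	return "xl"
}

func main() {
	fmt.Println(pickBackend([]string{os.TempDir()}))                    // fs
	fmt.Println(pickBackend([]string{"host:/export"}))                  // networkfs
	fmt.Println(pickBackend([]string{"/mnt/e1", "/mnt/e2", "/mnt/e3"})) // xl
}
```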

@@ -66,6 +66,10 @@ EXAMPLES:
 
 	3. Start minio server on Windows.
 	    $ minio {{.Name}} C:\MyShare
 
+	4. Start minio server 8 disks to enable erasure coded layer with 4 data and 4 parity.
+	    $ minio {{.Name}} /mnt/export1/backend /mnt/export2/backend /mnt/export3/backend /mnt/export4/backend \
+	        /mnt/export5/backend /mnt/export6/backend /mnt/export7/backend /mnt/export8/backend
+
 `,
 }
 
@@ -161,9 +165,6 @@ func checkServerSyntax(c *cli.Context) {
 	if !c.Args().Present() && c.Args().First() == "help" {
 		cli.ShowCommandHelpAndExit(c, "server", 1)
 	}
-	if len(c.Args()) > 2 {
-		fatalIf(probe.NewError(errInvalidArgument), "Unnecessary arguments passed. Please refer ‘minio server --help’.", nil)
-	}
 }
 
 // Extract port number from address address should be of the form host:port.
@@ -34,6 +34,7 @@ type VolInfo struct {
 type FileInfo struct {
 	Volume  string
 	Name    string
+	MD5Sum  string
 	ModTime time.Time
 	Size    int64
 	Mode    os.FileMode
vendor/github.com/klauspost/reedsolomon/LICENSE (new file, generated, vendored, 23 lines)

The MIT License (MIT)

Copyright (c) 2015 Klaus Post
Copyright (c) 2015 Backblaze

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
vendor/github.com/klauspost/reedsolomon/README.md (new file, generated, vendored, 198 lines)

# Reed-Solomon
[![GoDoc][1]][2] [![Build Status][3]][4]

[1]: https://godoc.org/github.com/klauspost/reedsolomon?status.svg
[2]: https://godoc.org/github.com/klauspost/reedsolomon
[3]: https://travis-ci.org/klauspost/reedsolomon.svg?branch=master
[4]: https://travis-ci.org/klauspost/reedsolomon

Reed-Solomon Erasure Coding in Go, with speeds exceeding 1GB/s/cpu core implemented in pure Go.

This is a golang port of the [JavaReedSolomon](https://github.com/Backblaze/JavaReedSolomon) library released by [Backblaze](http://backblaze.com), with some additional optimizations.

For an introduction on erasure coding, see the post on the [Backblaze blog](https://www.backblaze.com/blog/reed-solomon/).

Package home: https://github.com/klauspost/reedsolomon

Godoc: https://godoc.org/github.com/klauspost/reedsolomon

# Installation
To get the package use the standard:
```bash
go get github.com/klauspost/reedsolomon
```

# Usage

This section assumes you know the basics of Reed-Solomon encoding. A good start is this [Backblaze blog post](https://www.backblaze.com/blog/reed-solomon/).

This package performs the calculation of the parity sets. The usage is therefore relatively simple.

First of all, you need to choose your distribution of data and parity shards. A 'good' distribution is very subjective, and will depend a lot on your usage scenario. A good starting point is above 5 and below 257 data shards (the maximum supported number), and the number of parity shards to be 2 or above, and below the number of data shards.

To create an encoder with 10 data shards (where your data goes) and 3 parity shards (calculated):
```Go
enc, err := reedsolomon.New(10, 3)
```
This encoder will work for all parity sets with this distribution of data and parity shards. The error will only be set if you specify 0 or negative values in any of the parameters, or if you specify more than 256 data shards.

The data you send and receive is a simple slice of byte slices; `[][]byte`. In the example above, the top slice must have a length of 13.
```Go
data := make([][]byte, 13)
```
You should then fill the first 10 slices with *equally sized* data, and create parity shards that will be populated with parity data. In this case we create the data in memory, but you could for instance also use [mmap](https://github.com/edsrzf/mmap-go) to map files.

```Go
// Create all shards, size them at 50000 each
for i := range data {
	data[i] = make([]byte, 50000)
}


// Fill some data into the data shards
for i, in := range data[:10] {
	for j := range in {
		in[j] = byte((i + j) & 0xff)
	}
}
```

To populate the parity shards, you simply call `Encode()` with your data.
```Go
err = enc.Encode(data)
```
The only case where you should get an error is if the data shards aren't of equal size. The last 3 shards now contain parity data. You can verify this by calling `Verify()`:

```Go
ok, err = enc.Verify(data)
```

The final (and important) part is to be able to reconstruct missing shards. For this to work, you need to know which parts of your data are missing. The encoder *does not know which parts are invalid*, so if data corruption is a likely scenario, you need to implement a hash check for each shard. If a byte has changed in your set, and you don't know which it is, there is no way to reconstruct the data set.

To indicate missing data, you set the shard to nil before calling `Reconstruct()`:

```Go
// Delete two data shards
data[3] = nil
data[7] = nil

// Reconstruct the missing shards
err := enc.Reconstruct(data)
```
The missing data and parity shards will be recreated. If more than 3 shards are missing, the reconstruction will fail.

So to sum up reconstruction:
* The number of data/parity shards must match the numbers used for encoding.
* The order of shards must be the same as used when encoding.
* You may only supply data you know is valid.
* Invalid shards should be set to nil.

For complete examples of an encoder and decoder see the [examples folder](https://github.com/klauspost/reedsolomon/tree/master/examples).
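
Pulling the snippets above together, a minimal self-contained round trip (encode, drop two shards, reconstruct, verify) might look like the sketch below; the only thing assumed beyond the README is glue code:

```Go
package main

import (
	"fmt"
	"log"

	"github.com/klauspost/reedsolomon"
)

func main() {
	// 10 data shards, 3 parity shards.
	enc, err := reedsolomon.New(10, 3)
	if err != nil {
		log.Fatal(err)
	}

	// Create and fill the shards as in the snippets above.
	data := make([][]byte, 13)
	for i := range data {
		data[i] = make([]byte, 50000)
	}
	for i, in := range data[:10] {
		for j := range in {
			in[j] = byte((i + j) & 0xff)
		}
	}

	if err = enc.Encode(data); err != nil {
		log.Fatal(err)
	}

	// Lose two shards, then reconstruct them.
	data[3], data[7] = nil, nil
	if err = enc.Reconstruct(data); err != nil {
		log.Fatal(err)
	}

	ok, err := enc.Verify(data)
	fmt.Println(ok, err) // true <nil>
}
```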

# Splitting/Joining Data

You might have a large slice of data. To help you split this, there are some helper functions that can split and join a single byte slice.

```Go
bigfile, _ := ioutil.ReadFile("myfile.data")

// Split the file
split, err := enc.Split(bigfile)
```
This will split the file into the number of data shards set when creating the encoder and create empty parity shards.

An important thing to note is that you have to *keep track of the exact input size*. If the size of the input isn't divisible by the number of data shards, extra zeros will be inserted in the last shard.

To join a data set, use the `Join()` function, which will join the shards and write it to the `io.Writer` you supply:
```Go
// Join a data set and write it to io.Discard.
err = enc.Join(io.Discard, data, len(bigfile))
```

# Streaming/Merging

It might seem like a limitation that all data should be in memory, but an important property is that *as long as the number of data/parity shards is the same, you can merge/split data sets*, and they will remain valid as a separate set.

```Go
// Split the data set of 50000 elements into two of 25000
splitA := make([][]byte, 13)
splitB := make([][]byte, 13)

// Merge into a 100000 element set
merged := make([][]byte, 13)

for i := range data {
	splitA[i] = data[i][:25000]
	splitB[i] = data[i][25000:]

	// Concatenate it to itself
	merged[i] = append(make([]byte, 0, len(data[i])*2), data[i]...)
	merged[i] = append(merged[i], data[i]...)
}

// Each part should still verify as ok.
ok, err := enc.Verify(splitA)
if ok && err == nil {
	log.Println("splitA ok")
}

ok, err = enc.Verify(splitB)
if ok && err == nil {
	log.Println("splitB ok")
}

ok, err = enc.Verify(merged)
if ok && err == nil {
	log.Println("merged ok")
}
```

This means that if you have a data set that may not fit into memory, you can split processing into smaller blocks. For the best throughput, don't use too small blocks.

This also means that you can divide big input up into smaller blocks, and do reconstruction on parts of your data. This doesn't give the same flexibility as a higher number of data shards, but it will be much more performant.

# Streaming API

A fully streaming API has been added, which enables you to perform the same operations, but on streams. To use the stream API, use the [`NewStream`](https://godoc.org/github.com/klauspost/reedsolomon#NewStream) function to create the encoding/decoding interfaces. You can use [`NewStreamC`](https://godoc.org/github.com/klauspost/reedsolomon#NewStreamC) to get an interface that reads/writes concurrently from the streams.

Input is delivered as `[]io.Reader`, output as `[]io.Writer`, and functionality corresponds to the in-memory API. Each stream must supply the same amount of data, similar to how each slice must be the same size with the in-memory API.
If an error occurs in relation to a stream, a [`StreamReadError`](https://godoc.org/github.com/klauspost/reedsolomon#StreamReadError) or [`StreamWriteError`](https://godoc.org/github.com/klauspost/reedsolomon#StreamWriteError) will help you determine which stream was the offender.

There is no buffering or timeouts/retry specified. If you want to add that, you need to add it to the Reader/Writer.

For complete examples of a streaming encoder and decoder see the [examples folder](https://github.com/klauspost/reedsolomon/tree/master/examples).
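
The streaming snippets aren't shown in this README; as a rough sketch under the assumption (taken from the godoc references above) that `NewStream` returns a stream encoder whose `Encode` takes `[]io.Reader` for data and `[]io.Writer` for parity:

```Go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"

	"github.com/klauspost/reedsolomon"
)

func main() {
	// 4 data streams, 2 parity streams.
	enc, err := reedsolomon.NewStream(4, 2)
	if err != nil {
		log.Fatal(err)
	}

	// Four equally sized data streams...
	data := make([]io.Reader, 4)
	for i := range data {
		data[i] = bytes.NewReader(make([]byte, 1024))
	}

	// ...and two writers to receive the parity streams.
	var p0, p1 bytes.Buffer
	parity := []io.Writer{&p0, &p1}

	if err = enc.Encode(data, parity); err != nil {
		log.Fatal(err)
	}
	fmt.Println(p0.Len(), p1.Len()) // each parity stream matches the data size
}
```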


# Performance
Performance depends mainly on the number of parity shards. In rough terms, doubling the number of parity shards will double the encoding time.

Here are the throughput numbers with some different selections of data and parity shards. For reference, each shard is 1MB of random data, and 2 CPU cores are used for encoding.

| Data | Parity | Parity % | MB/s   | SSSE3 MB/s | SSSE3 Speed | Rel. Speed |
|------|--------|----------|--------|------------|-------------|------------|
| 5    | 2      | 40%      | 576.11 | 2599.2     | 451%        | 100.00%    |
| 10   | 2      | 20%      | 587.73 | 3100.28    | 528%        | 102.02%    |
| 10   | 4      | 40%      | 298.38 | 2470.97    | 828%        | 51.79%     |
| 50   | 20     | 40%      | 59.81  | 713.28     | 1193%       | 10.38%     |

If `runtime.GOMAXPROCS()` is set to a value higher than 1, the encoder will use multiple goroutines to perform the calculations in `Verify`, `Encode` and `Reconstruct`.

Example of performance scaling on Intel(R) Core(TM) i7-2600 CPU @ 3.40GHz - 4 physical cores, 8 logical cores. The example uses 10 blocks with 16MB data each and 4 parity blocks.

| Threads | MB/s    | Speed |
|---------|---------|-------|
| 1       | 1355.11 | 100%  |
| 2       | 2339.78 | 172%  |
| 4       | 3179.33 | 235%  |
| 8       | 4346.18 | 321%  |

# Links
* [Backblaze Open Sources Reed-Solomon Erasure Coding Source Code](https://www.backblaze.com/blog/reed-solomon/).
* [JavaReedSolomon](https://github.com/Backblaze/JavaReedSolomon). Compatible java library by Backblaze.
* [go-erasure](https://github.com/somethingnew2-0/go-erasure). A similar library using cgo, slower in my tests.
* [rsraid](https://github.com/goayame/rsraid). A similar library written in Go. Slower, but supports more shards.
* [Screaming Fast Galois Field Arithmetic](http://www.snia.org/sites/default/files2/SDC2013/presentations/NewThinking/EthanMiller_Screaming_Fast_Galois_Field%20Arithmetic_SIMD%20Instructions.pdf). Basis for SSE3 optimizations.

# License

This code, as the original [JavaReedSolomon](https://github.com/Backblaze/JavaReedSolomon), is published under an MIT license. See LICENSE file for more information.
vendor/github.com/klauspost/reedsolomon/galois.go (new file, generated, vendored, 134 lines)

File diff suppressed because one or more lines are too long.
vendor/github.com/klauspost/reedsolomon/galois_amd64.go (new file, generated, vendored, 77 lines)

//+build !noasm
//+build !appengine

// Copyright 2015, Klaus Post, see LICENSE for details.

package reedsolomon

import (
	"github.com/klauspost/cpuid"
)

//go:noescape
func galMulSSSE3(low, high, in, out []byte)

//go:noescape
func galMulSSSE3Xor(low, high, in, out []byte)

//go:noescape
func galMulAVX2Xor(low, high, in, out []byte)

//go:noescape
func galMulAVX2(low, high, in, out []byte)

// This is what the assembler routines do in blocks of 16 bytes:
/*
func galMulSSSE3(low, high, in, out []byte) {
	for n, input := range in {
		l := input & 0xf
		h := input >> 4
		out[n] = low[l] ^ high[h]
	}
}

func galMulSSSE3Xor(low, high, in, out []byte) {
	for n, input := range in {
		l := input & 0xf
		h := input >> 4
		out[n] ^= low[l] ^ high[h]
	}
}
*/

func galMulSlice(c byte, in, out []byte) {
	var done int
	if cpuid.CPU.AVX2() {
		galMulAVX2(mulTableLow[c][:], mulTableHigh[c][:], in, out)
		done = (len(in) >> 5) << 5
	} else if cpuid.CPU.SSSE3() {
		galMulSSSE3(mulTableLow[c][:], mulTableHigh[c][:], in, out)
		done = (len(in) >> 4) << 4
	}
	remain := len(in) - done
	if remain > 0 {
		mt := mulTable[c]
		for i := done; i < len(in); i++ {
			out[i] = mt[in[i]]
		}
	}
}

func galMulSliceXor(c byte, in, out []byte) {
	var done int
	if cpuid.CPU.AVX2() {
		galMulAVX2Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out)
		done = (len(in) >> 5) << 5
	} else if cpuid.CPU.SSSE3() {
		galMulSSSE3Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out)
		done = (len(in) >> 4) << 4
	}
	remain := len(in) - done
	if remain > 0 {
		mt := mulTable[c]
		for i := done; i < len(in); i++ {
			out[i] ^= mt[in[i]]
		}
	}
}
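
The 16-entry low/high table trick in the reference Go above works because GF(2^8) addition is XOR and multiplication distributes over it, so a·b = a·(b&0x0f) ^ a·(b&0xf0). A self-contained check of that identity; `gfMul` here is a plain peasant multiply with the same 0x1d polynomial the tables are built from, not the package's implementation:

```Go
package main

import "fmt"

// gfMul is a "Russian peasant" multiply in GF(2^8), reduced by
// x^8+x^4+x^3+x^2+1, i.e. the generating polynomial 29 (0x1d).
func gfMul(a, b byte) byte {
	var p byte
	for b > 0 {
		if b&1 != 0 {
			p ^= a
		}
		carry := a & 0x80
		a <<= 1
		if carry != 0 {
			a ^= 0x1d
		}
		b >>= 1
	}
	return p
}

func main() {
	// Verify a*b == a*(b&0x0f) ^ a*(b&0xf0) for every byte pair:
	// exactly what the SSSE3/AVX2 kernels exploit with PSHUFB.
	for a := 0; a < 256; a++ {
		for b := 0; b < 256; b++ {
			got := gfMul(byte(a), byte(b&0x0f)) ^ gfMul(byte(a), byte(b&0xf0))
			if got != gfMul(byte(a), byte(b)) {
				fmt.Println("mismatch at", a, b)
				return
			}
		}
	}
	fmt.Println("nibble decomposition holds for all 65536 pairs")
}
```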

vendor/github.com/klauspost/reedsolomon/galois_amd64.s (new file, generated, vendored, 183 lines)

//+build !noasm !appengine

// Copyright 2015, Klaus Post, see LICENSE for details.

// Based on http://www.snia.org/sites/default/files2/SDC2013/presentations/NewThinking/EthanMiller_Screaming_Fast_Galois_Field%20Arithmetic_SIMD%20Instructions.pdf
// and http://jerasure.org/jerasure/gf-complete/tree/master

// func galMulSSSE3Xor(low, high, in, out []byte)
TEXT ·galMulSSSE3Xor(SB), 7, $0
	MOVQ low+0(FP), SI     // SI: &low
	MOVQ high+24(FP), DX   // DX: &high
	MOVOU (SI), X6         // X6 low
	MOVOU (DX), X7         // X7: high
	MOVQ $15, BX           // BX: low mask
	MOVQ BX, X8
	PXOR X5, X5
	MOVQ in+48(FP), SI     // R11: &in
	MOVQ in_len+56(FP), R9 // R9: len(in)
	MOVQ out+72(FP), DX    // DX: &out
	PSHUFB X5, X8          // X8: lomask (unpacked)
	SHRQ $4, R9            // len(in) / 16
	CMPQ R9, $0
	JEQ done_xor

loopback_xor:
	MOVOU (SI), X0 // in[x]
	MOVOU (DX), X4 // out[x]
	MOVOU X0, X1   // in[x]
	MOVOU X6, X2   // low copy
	MOVOU X7, X3   // high copy
	PSRLQ $4, X1   // X1: high input
	PAND X8, X0    // X0: low input
	PAND X8, X1    // X0: high input
	PSHUFB X0, X2  // X2: mul low part
	PSHUFB X1, X3  // X3: mul high part
	PXOR X2, X3    // X3: Result
	PXOR X4, X3    // X3: Result xor existing out
	MOVOU X3, (DX) // Store
	ADDQ $16, SI   // in+=16
	ADDQ $16, DX   // out+=16
	SUBQ $1, R9
	JNZ loopback_xor

done_xor:
	RET

// func galMulSSSE3(low, high, in, out []byte)
TEXT ·galMulSSSE3(SB), 7, $0
	MOVQ low+0(FP), SI     // SI: &low
	MOVQ high+24(FP), DX   // DX: &high
	MOVOU (SI), X6         // X6 low
	MOVOU (DX), X7         // X7: high
	MOVQ $15, BX           // BX: low mask
	MOVQ BX, X8
	PXOR X5, X5
	MOVQ in+48(FP), SI     // R11: &in
	MOVQ in_len+56(FP), R9 // R9: len(in)
	MOVQ out+72(FP), DX    // DX: &out
	PSHUFB X5, X8          // X8: lomask (unpacked)
	SHRQ $4, R9            // len(in) / 16
	CMPQ R9, $0
	JEQ done

loopback:
	MOVOU (SI), X0 // in[x]
	MOVOU X0, X1   // in[x]
	MOVOU X6, X2   // low copy
	MOVOU X7, X3   // high copy
	PSRLQ $4, X1   // X1: high input
	PAND X8, X0    // X0: low input
	PAND X8, X1    // X0: high input
	PSHUFB X0, X2  // X2: mul low part
	PSHUFB X1, X3  // X3: mul high part
	PXOR X2, X3    // X3: Result
	MOVOU X3, (DX) // Store
	ADDQ $16, SI   // in+=16
	ADDQ $16, DX   // out+=16
	SUBQ $1, R9
	JNZ loopback

done:
	RET

// func galMulAVX2Xor(low, high, in, out []byte)
TEXT ·galMulAVX2Xor(SB), 7, $0
	MOVQ low+0(FP), SI     // SI: &low
	MOVQ high+24(FP), DX   // DX: &high
	MOVQ $15, BX           // BX: low mask
	MOVQ BX, X5
	MOVOU (SI), X6         // X6 low
	MOVOU (DX), X7         // X7: high
	MOVQ in_len+56(FP), R9 // R9: len(in)

	/*
	   YASM:

	   VINSERTI128  YMM6, YMM6, XMM6, 1 ; low
	   VINSERTI128  YMM7, YMM7, XMM7, 1 ; high
	   VPBROADCASTB YMM8, XMM5          ; X8: lomask (unpacked)
	*/
	BYTE $0xc4; BYTE $0xe3; BYTE $0x4d; BYTE $0x38; BYTE $0xf6; BYTE $0x01; BYTE $0xc4; BYTE $0xe3; BYTE $0x45; BYTE $0x38; BYTE $0xff; BYTE $0x01; BYTE $0xc4; BYTE $0x62; BYTE $0x7d; BYTE $0x78; BYTE $0xc5

	SHRQ $5, R9         // len(in) /32
	MOVQ out+72(FP), DX // DX: &out
	MOVQ in+48(FP), SI  // R11: &in
	TESTQ R9, R9
	JZ done_xor_avx2

loopback_xor_avx2:
	/* Yasm:

	   VMOVDQU YMM0, [rsi]
	   VMOVDQU YMM4, [rdx]
	   VPSRLQ  YMM1, YMM0, 4    ; X1: high input
	   VPAND   YMM0, YMM0, YMM8 ; X0: low input
	   VPAND   YMM1, YMM1, YMM8 ; X1: high input
	   VPSHUFB YMM2, YMM6, YMM0 ; X2: mul low part
	   VPSHUFB YMM3, YMM7, YMM1 ; X2: mul high part
	   VPXOR   YMM3, YMM2, YMM3 ; X3: Result
	   VPXOR   YMM4, YMM3, YMM4 ; X4: Result
	   VMOVDQU [rdx], YMM4
	*/
	BYTE $0xc5; BYTE $0xfe; BYTE $0x6f; BYTE $0x06; BYTE $0xc5; BYTE $0xfe; BYTE $0x6f; BYTE $0x22; BYTE $0xc5; BYTE $0xf5; BYTE $0x73; BYTE $0xd0; BYTE $0x04; BYTE $0xc4; BYTE $0xc1; BYTE $0x7d; BYTE $0xdb; BYTE $0xc0; BYTE $0xc4; BYTE $0xc1; BYTE $0x75; BYTE $0xdb; BYTE $0xc8; BYTE $0xc4; BYTE $0xe2; BYTE $0x4d; BYTE $0x00; BYTE $0xd0; BYTE $0xc4; BYTE $0xe2; BYTE $0x45; BYTE $0x00; BYTE $0xd9; BYTE $0xc5; BYTE $0xed; BYTE $0xef; BYTE $0xdb; BYTE $0xc5; BYTE $0xe5; BYTE $0xef; BYTE $0xe4; BYTE $0xc5; BYTE $0xfe; BYTE $0x7f; BYTE $0x22

	ADDQ $32, SI // in+=32
	ADDQ $32, DX // out+=32
	SUBQ $1, R9
	JNZ loopback_xor_avx2

done_xor_avx2:
	// VZEROUPPER
	BYTE $0xc5; BYTE $0xf8; BYTE $0x77
	RET

// func galMulAVX2(low, high, in, out []byte)
TEXT ·galMulAVX2(SB), 7, $0
	MOVQ low+0(FP), SI     // SI: &low
	MOVQ high+24(FP), DX   // DX: &high
	MOVQ $15, BX           // BX: low mask
	MOVQ BX, X5
	MOVOU (SI), X6         // X6 low
	MOVOU (DX), X7         // X7: high
	MOVQ in_len+56(FP), R9 // R9: len(in)

	/*
	   YASM:

	   VINSERTI128  YMM6, YMM6, XMM6, 1 ; low
	   VINSERTI128  YMM7, YMM7, XMM7, 1 ; high
	   VPBROADCASTB YMM8, XMM5          ; X8: lomask (unpacked)
	*/
	BYTE $0xc4; BYTE $0xe3; BYTE $0x4d; BYTE $0x38; BYTE $0xf6; BYTE $0x01; BYTE $0xc4; BYTE $0xe3; BYTE $0x45; BYTE $0x38; BYTE $0xff; BYTE $0x01; BYTE $0xc4; BYTE $0x62; BYTE $0x7d; BYTE $0x78; BYTE $0xc5

	SHRQ $5, R9         // len(in) /32
	MOVQ out+72(FP), DX // DX: &out
	MOVQ in+48(FP), SI  // R11: &in
	TESTQ R9, R9
	JZ done_avx2

loopback_avx2:
	/* Yasm:

	   VMOVDQU YMM0, [rsi]
	   VPSRLQ  YMM1, YMM0, 4    ; X1: high input
	   VPAND   YMM0, YMM0, YMM8 ; X0: low input
	   VPAND   YMM1, YMM1, YMM8 ; X1: high input
	   VPSHUFB YMM2, YMM6, YMM0 ; X2: mul low part
	   VPSHUFB YMM3, YMM7, YMM1 ; X2: mul high part
	   VPXOR   YMM4, YMM2, YMM3 ; X4: Result
	   VMOVDQU [rdx], YMM4
	*/
	BYTE $0xc5; BYTE $0xfe; BYTE $0x6f; BYTE $0x06; BYTE $0xc5; BYTE $0xf5; BYTE $0x73; BYTE $0xd0; BYTE $0x04; BYTE $0xc4; BYTE $0xc1; BYTE $0x7d; BYTE $0xdb; BYTE $0xc0; BYTE $0xc4; BYTE $0xc1; BYTE $0x75; BYTE $0xdb; BYTE $0xc8; BYTE $0xc4; BYTE $0xe2; BYTE $0x4d; BYTE $0x00; BYTE $0xd0; BYTE $0xc4; BYTE $0xe2; BYTE $0x45; BYTE $0x00; BYTE $0xd9; BYTE $0xc5; BYTE $0xed; BYTE $0xef; BYTE $0xe3; BYTE $0xc5; BYTE $0xfe; BYTE $0x7f; BYTE $0x22

	ADDQ $32, SI // in+=32
	ADDQ $32, DX // out+=32
	SUBQ $1, R9
	JNZ loopback_avx2
	JMP done_avx2

done_avx2:
	// VZEROUPPER
	BYTE $0xc5; BYTE $0xf8; BYTE $0x77
	RET
vendor/github.com/klauspost/reedsolomon/galois_noasm.go (new file, generated, vendored, 19 lines)

//+build !amd64 noasm appengine

// Copyright 2015, Klaus Post, see LICENSE for details.

package reedsolomon

func galMulSlice(c byte, in, out []byte) {
	mt := mulTable[c]
	for n, input := range in {
		out[n] = mt[input]
	}
}

func galMulSliceXor(c byte, in, out []byte) {
	mt := mulTable[c]
	for n, input := range in {
		out[n] ^= mt[input]
	}
}
vendor/github.com/klauspost/reedsolomon/gentables.go (new file, generated, vendored, 132 lines)

//+build ignore

package main

import (
	"fmt"
)

var logTable = [fieldSize]int16{
	-1, 0, 1, 25, 2, 50, 26, 198,
	3, 223, 51, 238, 27, 104, 199, 75,
	4, 100, 224, 14, 52, 141, 239, 129,
	28, 193, 105, 248, 200, 8, 76, 113,
	5, 138, 101, 47, 225, 36, 15, 33,
	53, 147, 142, 218, 240, 18, 130, 69,
	29, 181, 194, 125, 106, 39, 249, 185,
	201, 154, 9, 120, 77, 228, 114, 166,
	6, 191, 139, 98, 102, 221, 48, 253,
	226, 152, 37, 179, 16, 145, 34, 136,
	54, 208, 148, 206, 143, 150, 219, 189,
	241, 210, 19, 92, 131, 56, 70, 64,
	30, 66, 182, 163, 195, 72, 126, 110,
	107, 58, 40, 84, 250, 133, 186, 61,
	202, 94, 155, 159, 10, 21, 121, 43,
	78, 212, 229, 172, 115, 243, 167, 87,
	7, 112, 192, 247, 140, 128, 99, 13,
	103, 74, 222, 237, 49, 197, 254, 24,
	227, 165, 153, 119, 38, 184, 180, 124,
	17, 68, 146, 217, 35, 32, 137, 46,
	55, 63, 209, 91, 149, 188, 207, 205,
	144, 135, 151, 178, 220, 252, 190, 97,
	242, 86, 211, 171, 20, 42, 93, 158,
	132, 60, 57, 83, 71, 109, 65, 162,
	31, 45, 67, 216, 183, 123, 164, 118,
	196, 23, 73, 236, 127, 12, 111, 246,
	108, 161, 59, 82, 41, 157, 85, 170,
	251, 96, 134, 177, 187, 204, 62, 90,
	203, 89, 95, 176, 156, 169, 160, 81,
	11, 245, 22, 235, 122, 117, 44, 215,
	79, 174, 213, 233, 230, 231, 173, 232,
	116, 214, 244, 234, 168, 80, 88, 175,
}

const (
	// The number of elements in the field.
	fieldSize = 256

	// The polynomial used to generate the logarithm table.
	//
	// There are a number of polynomials that work to generate
	// a Galois field of 256 elements. The choice is arbitrary,
	// and we just use the first one.
	//
	// The possibilities are: 29, 43, 45, 77, 95, 99, 101, 105,
	// 113, 135, 141, 169, 195, 207, 231, and 245.
	generatingPolynomial = 29
)

func main() {
	t := generateExpTable()
	fmt.Printf("var expTable = %#v\n", t)
	//t2 := generateMulTableSplit(t)
	//fmt.Printf("var mulTable = %#v\n", t2)
	low, high := generateMulTableHalf(t)
	fmt.Printf("var mulTableLow = %#v\n", low)
	fmt.Printf("var mulTableHigh = %#v\n", high)
}

/**
 * Generates the inverse log table.
 */
func generateExpTable() []byte {
	result := make([]byte, fieldSize*2-2)
	for i := 1; i < fieldSize; i++ {
		log := logTable[i]
		result[log] = byte(i)
		result[log+fieldSize-1] = byte(i)
	}
	return result
}

func generateMulTable(expTable []byte) []byte {
	result := make([]byte, 256*256)
	for v := range result {
		a := byte(v & 0xff)
		b := byte(v >> 8)
		if a == 0 || b == 0 {
			result[v] = 0
			continue
		}
		logA := int(logTable[a])
		logB := int(logTable[b])
		result[v] = expTable[logA+logB]
	}
	return result
}

func generateMulTableSplit(expTable []byte) [256][256]byte {
	var result [256][256]byte
	for a := range result {
		for b := range result[a] {
			if a == 0 || b == 0 {
				result[a][b] = 0
				continue
			}
			logA := int(logTable[a])
			logB := int(logTable[b])
			result[a][b] = expTable[logA+logB]
		}
	}
	return result
}

func generateMulTableHalf(expTable []byte) (low [256][16]byte, high [256][16]byte) {
	for a := range low {
		for b := range low {
			result := 0
			if !(a == 0 || b == 0) {
				logA := int(logTable[a])
				logB := int(logTable[b])
				result = int(expTable[logA+logB])
			}
			if (b & 0xf) == b {
				low[a][b] = byte(result)
			}
			if (b & 0xf0) == b {
				high[a][b>>4] = byte(result)
			}
		}
	}
	return
}
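
Note that `generateExpTable` stores every field element twice, 255 entries apart, so a multiply can index `exp[log[a]+log[b]]` directly without a mod-255 reduction (the sum of two logs is at most 508). A self-contained sketch that rebuilds equivalent tables from the generating polynomial 29, assuming 2 as the primitive element (which holds for this polynomial):

```Go
package main

import "fmt"

func main() {
	// Build exp/log tables by repeatedly multiplying by the generator 2,
	// reducing by 0x1d (polynomial 29). The exp table is stored twice,
	// 255 apart, mirroring generateExpTable's fieldSize*2-2 allocation.
	var logT [256]int
	exp := make([]byte, 255*2)
	x := byte(1)
	for i := 0; i < 255; i++ {
		exp[i], exp[i+255] = x, x
		logT[x] = i
		carry := x & 0x80
		x <<= 1 // multiply by the generator, 2
		if carry != 0 {
			x ^= 0x1d
		}
	}

	// Table-driven multiply: no mod needed thanks to the doubled table.
	mul := func(a, b byte) byte {
		if a == 0 || b == 0 {
			return 0
		}
		return exp[logT[a]+logT[b]]
	}

	fmt.Println(logT[3])   // 25, matching logTable[3] above
	fmt.Println(mul(3, 7)) // 9: (x+1)(x^2+x+1) = x^3+1 over GF(2)
}
```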

vendor/github.com/klauspost/reedsolomon/matrix.go (new file, generated, vendored, 282 lines)

/**
 * Matrix Algebra over an 8-bit Galois Field
 *
 * Copyright 2015, Klaus Post
 * Copyright 2015, Backblaze, Inc.
 */

package reedsolomon

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// byte[row][col]
type matrix [][]byte

// newMatrix returns a matrix of zeros.
func newMatrix(rows, cols int) (matrix, error) {
	if rows <= 0 {
		return nil, errInvalidRowSize
	}
	if cols <= 0 {
		return nil, errInvalidColSize
	}

	m := matrix(make([][]byte, rows))
	for i := range m {
		m[i] = make([]byte, cols)
	}
	return m, nil
}

// newMatrixData initializes a matrix with the given row-major data.
// Note that data is not copied from input.
func newMatrixData(data [][]byte) (matrix, error) {
	m := matrix(data)
	err := m.Check()
	if err != nil {
		return nil, err
	}
	return m, nil
}

// identityMatrix returns an identity matrix of the given size.
func identityMatrix(size int) (matrix, error) {
	m, err := newMatrix(size, size)
	if err != nil {
		return nil, err
	}
	for i := range m {
		m[i][i] = 1
	}
	return m, nil
}

// errInvalidRowSize will be returned if attempting to create a matrix with negative or zero row number.
var errInvalidRowSize = errors.New("invalid row size")

// errInvalidColSize will be returned if attempting to create a matrix with negative or zero column number.
var errInvalidColSize = errors.New("invalid column size")

// errColSizeMismatch is returned if the size of matrix columns mismatch.
var errColSizeMismatch = errors.New("column size is not the same for all rows")

func (m matrix) Check() error {
	rows := len(m)
	if rows <= 0 {
		return errInvalidRowSize
	}
	cols := len(m[0])
	if cols <= 0 {
		return errInvalidColSize
	}

	for _, col := range m {
		if len(col) != cols {
			return errColSizeMismatch
		}
	}
	return nil
}

// String returns a human-readable string of the matrix contents.
//
// Example: [[1, 2], [3, 4]]
func (m matrix) String() string {
	var rowOut []string
	for _, row := range m {
		var colOut []string
		for _, col := range row {
			colOut = append(colOut, strconv.Itoa(int(col)))
		}
		rowOut = append(rowOut, "["+strings.Join(colOut, ", ")+"]")
	}
	return "[" + strings.Join(rowOut, ", ") + "]"
}

// Multiply multiplies this matrix (the one on the left) by another
// matrix (the one on the right) and returns a new matrix with the result.
func (m matrix) Multiply(right matrix) (matrix, error) {
	if len(m[0]) != len(right) {
		return nil, fmt.Errorf("columns on left (%d) is different than rows on right (%d)", len(m[0]), len(right))
	}
	result, _ := newMatrix(len(m), len(right[0]))
	for r, row := range result {
		for c := range row {
			var value byte
			for i := range m[0] {
				value ^= galMultiply(m[r][i], right[i][c])
			}
			result[r][c] = value
		}
	}
	return result, nil
}

// Augment returns the concatenation of this matrix and the matrix on the right.
func (m matrix) Augment(right matrix) (matrix, error) {
	if len(m) != len(right) {
		return nil, errMatrixSize
	}

	result, _ := newMatrix(len(m), len(m[0])+len(right[0]))
	for r, row := range m {
		for c := range row {
			result[r][c] = m[r][c]
		}
		cols := len(m[0])
		for c := range right[0] {
			result[r][cols+c] = right[r][c]
		}
	}
	return result, nil
}

// errMatrixSize is returned if matrix dimensions don't match.
var errMatrixSize = errors.New("matrix sizes does not match")

func (m matrix) SameSize(n matrix) error {
	if len(m) != len(n) {
		return errMatrixSize
	}
	for i := range m {
		if len(m[i]) != len(n[i]) {
			return errMatrixSize
		}
	}
	return nil
}

// Returns a part of this matrix. Data is copied.
func (m matrix) SubMatrix(rmin, cmin, rmax, cmax int) (matrix, error) {
	result, err := newMatrix(rmax-rmin, cmax-cmin)
	if err != nil {
		return nil, err
	}
	// OPTME: If used heavily, use copy function to copy slice
	for r := rmin; r < rmax; r++ {
		for c := cmin; c < cmax; c++ {
			result[r-rmin][c-cmin] = m[r][c]
		}
	}
	return result, nil
}

// SwapRows exchanges two rows in the matrix.
func (m matrix) SwapRows(r1, r2 int) error {
	if r1 < 0 || len(m) <= r1 || r2 < 0 || len(m) <= r2 {
		return errInvalidRowSize
	}
	m[r2], m[r1] = m[r1], m[r2]
	return nil
}

// IsSquare will return true if the matrix is square,
// otherwise false.
func (m matrix) IsSquare() bool {
	if len(m) != len(m[0]) {
		return false
	}
	return true
}

// errSingular is returned if the matrix is singular and cannot be inverted.
var errSingular = errors.New("matrix is singular")

// errNotSquare is returned if attempting to invert a non-square matrix.
var errNotSquare = errors.New("only square matrices can be inverted")

// Invert returns the inverse of this matrix.
// Returns errSingular when the matrix is singular and doesn't have an inverse.
// The matrix must be square, otherwise errNotSquare is returned.
func (m matrix) Invert() (matrix, error) {
	if !m.IsSquare() {
		return nil, errNotSquare
	}

	size := len(m)
	work, _ := identityMatrix(size)
	work, _ = m.Augment(work)

	err := work.gaussianElimination()
	if err != nil {
		return nil, err
	}

	return work.SubMatrix(0, size, size, size*2)
}

func (m matrix) gaussianElimination() error {
	rows := len(m)
	columns := len(m[0])
	// Clear out the part below the main diagonal and scale the main
	// diagonal to be 1.
	for r := 0; r < rows; r++ {
		// If the element on the diagonal is 0, find a row below
		// that has a non-zero and swap them.
		if m[r][r] == 0 {
			for rowBelow := r + 1; rowBelow < rows; rowBelow++ {
				if m[rowBelow][r] != 0 {
					m.SwapRows(r, rowBelow)
					break
				}
			}
		}
		// If we couldn't find one, the matrix is singular.
		if m[r][r] == 0 {
			return errSingular
		}
		// Scale to 1.
		if m[r][r] != 1 {
			scale := galDivide(1, m[r][r])
			for c := 0; c < columns; c++ {
				m[r][c] = galMultiply(m[r][c], scale)
			}
		}
		// Make everything below the 1 be a 0 by subtracting
		// a multiple of it. (Subtraction and addition are
		// both exclusive or in the Galois field.)
		for rowBelow := r + 1; rowBelow < rows; rowBelow++ {
			if m[rowBelow][r] != 0 {
				scale := m[rowBelow][r]
				for c := 0; c < columns; c++ {
					m[rowBelow][c] ^= galMultiply(scale, m[r][c])
				}
			}
		}
	}

	// Now clear the part above the main diagonal.
	for d := 0; d < rows; d++ {
		for rowAbove := 0; rowAbove < d; rowAbove++ {
			if m[rowAbove][d] != 0 {
				scale := m[rowAbove][d]
				for c := 0; c < columns; c++ {
					m[rowAbove][c] ^= galMultiply(scale, m[d][c])
				}

			}
		}
	}
	return nil
}

// Create a Vandermonde matrix, which is guaranteed to have the
// property that any subset of rows that forms a square matrix
// is invertible.
func vandermonde(rows, cols int) (matrix, error) {
	result, err := newMatrix(rows, cols)
	if err != nil {
		return nil, err
	}
	for r, row := range result {
		for c := range row {
			result[r][c] = galExp(byte(r), c)
		}
	}
	return result, nil
}
vendor/github.com/klauspost/reedsolomon/reedsolomon.go (new file, generated, vendored, 526 lines)

/**
 * Reed-Solomon Coding over 8-bit values.
 *
 * Copyright 2015, Klaus Post
 * Copyright 2015, Backblaze, Inc.
 */

// Package reedsolomon enables Erasure Coding in Go
//
// For usage and examples, see https://github.com/klauspost/reedsolomon
//
package reedsolomon

import (
	"bytes"
	"errors"
	"io"
	"runtime"
	"sync"
)

// Encoder is an interface to encode Reed-Solomon parity sets for your data.
type Encoder interface {
	// Encodes parity for a set of data shards.
	// Input is 'shards' containing data shards followed by parity shards.
	// The number of shards must match the number given to New().
	// Each shard is a byte array, and they must all be the same size.
	// The parity shards will always be overwritten and the data shards
	// will remain the same, so it is safe for you to read from the
	// data shards while this is running.
	Encode(shards [][]byte) error

	// Verify returns true if the parity shards contain correct data.
	// The data is the same format as Encode. No data is modified, so
	// you are allowed to read from data while this is running.
	Verify(shards [][]byte) (bool, error)

	// Reconstruct will recreate the missing shards if possible.
	//
	// Given a list of shards, some of which contain data, fills in the
	// ones that don't have data.
	//
	// The length of the array must be equal to the total number of shards.
	// You indicate that a shard is missing by setting it to nil.
	//
	// If there are too few shards to reconstruct the missing
	// ones, ErrTooFewShards will be returned.
	//
	// The reconstructed shard set is complete, but integrity is not verified.
	// Use the Verify function to check if data set is ok.
	Reconstruct(shards [][]byte) error

	// Split a data slice into the number of shards given to the encoder,
	// and create empty parity shards.
	//
	// The data will be split into equally sized shards.
	// If the data size isn't divisible by the number of shards,
	// the last shard will contain extra zeros.
	//
	// There must be at least the same number of bytes as there are data shards,
	// otherwise ErrShortData will be returned.
	//
	// The data will not be copied, except for the last shard, so you
	// should not modify the data of the input slice afterwards.
	Split(data []byte) ([][]byte, error)

	// Join the shards and write the data segment to dst.
	//
	// Only the data shards are considered.
	// You must supply the exact output size you want.
	// If there are too few shards given, ErrTooFewShards will be returned.
	// If the total data size is less than outSize, ErrShortData will be returned.
	Join(dst io.Writer, shards [][]byte, outSize int) error
}

// reedSolomon contains a matrix for a specific
// distribution of datashards and parity shards.
// Construct it using New().
type reedSolomon struct {
	DataShards   int // Number of data shards, should not be modified.
	ParityShards int // Number of parity shards, should not be modified.
	Shards       int // Total number of shards. Calculated, and should not be modified.
	m            matrix
	parity       [][]byte
}

// ErrInvShardNum will be returned by New, if you attempt to create
// an Encoder where either data or parity shards is zero or less,
// or the number of data shards is higher than 256.
var ErrInvShardNum = errors.New("cannot create Encoder with zero or less data/parity shards")

// New creates a new encoder and initializes it to
// the number of data shards and parity shards that
// you want to use. You can reuse this encoder.
// Note that the maximum number of data shards is 256.
func New(dataShards, parityShards int) (Encoder, error) {
	r := reedSolomon{
		DataShards:   dataShards,
		ParityShards: parityShards,
		Shards:       dataShards + parityShards,
	}

	if dataShards <= 0 || parityShards <= 0 {
		return nil, ErrInvShardNum
	}

	if dataShards > 256 {
		return nil, ErrInvShardNum
	}

	// Start with a Vandermonde matrix. This matrix would work,
	// in theory, but doesn't have the property that the data
	// shards are unchanged after encoding.
	vm, err := vandermonde(r.Shards, dataShards)
	if err != nil {
		return nil, err
	}

	// Multiply by the inverse of the top square of the matrix.
	// This will make the top square be the identity matrix, but
	// preserve the property that any square subset of rows is
	// invertible.
	top, _ := vm.SubMatrix(0, 0, dataShards, dataShards)
	top, _ = top.Invert()
	r.m, _ = vm.Multiply(top)

	r.parity = make([][]byte, parityShards)
	for i := range r.parity {
		r.parity[i] = r.m[dataShards+i]
	}

	return &r, err
}

// ErrTooFewShards is returned if too few shards were given to
// Encode/Verify/Reconstruct. It will also be returned from Reconstruct
// if there were too few shards to reconstruct the missing data.
var ErrTooFewShards = errors.New("too few shards given")

// Encodes parity for a set of data shards.
// An array 'shards' containing data shards followed by parity shards.
// The number of shards must match the number given to New.
// Each shard is a byte array, and they must all be the same size.
// The parity shards will always be overwritten and the data shards
// will remain the same.
func (r reedSolomon) Encode(shards [][]byte) error {
	if len(shards) != r.Shards {
		return ErrTooFewShards
	}

	err := checkShards(shards, false)
	if err != nil {
		return err
	}

	// Get the slice of output buffers.
	output := shards[r.DataShards:]

	// Do the coding.
	r.codeSomeShards(r.parity, shards[0:r.DataShards], output, r.ParityShards, len(shards[0]))
	return nil
}

// Verify returns true if the parity shards contain the right data.
// The data is the same format as Encode. No data is modified.
func (r reedSolomon) Verify(shards [][]byte) (bool, error) {
	if len(shards) != r.Shards {
		return false, ErrTooFewShards
	}
	err := checkShards(shards, false)
	if err != nil {
		return false, err
	}

	// Slice of buffers being checked.
	toCheck := shards[r.DataShards:]

	// Do the checking.
	return r.checkSomeShards(r.parity, shards[0:r.DataShards], toCheck, r.ParityShards, len(shards[0])), nil
}

// Multiplies a subset of rows from a coding matrix by a full set of
// input shards to produce some output shards.
// 'matrixRows' is the rows from the matrix to use.
// 'inputs' An array of byte arrays, each of which is one input shard.
// The number of inputs used is determined by the length of each matrix row.
// outputs Byte arrays where the computed shards are stored.
// The number of outputs computed, and the
// number of matrix rows used, is determined by
// outputCount, which is the number of outputs to compute.
func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) {
	if runtime.GOMAXPROCS(0) > 1 && len(inputs[0]) > splitSize {
		r.codeSomeShardsP(matrixRows, inputs, outputs, outputCount, byteCount)
		return
	}
	for c := 0; c < r.DataShards; c++ {
		in := inputs[c]
		for iRow := 0; iRow < outputCount; iRow++ {
			if c == 0 {
				galMulSlice(matrixRows[iRow][c], in, outputs[iRow])
			} else {
				galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow])
			}
		}
	}
}

// How many bytes per goroutine.
const splitSize = 512

// Perform the same as codeSomeShards, but split the workload into
// several goroutines.
func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) {
	var wg sync.WaitGroup
	left := byteCount
	start := 0
	for {
		do := left
		if do > splitSize {
			do = splitSize
		}
		if do == 0 {
			break
		}
		left -= do
		wg.Add(1)
		go func(start, stop int) {
			for c := 0; c < r.DataShards; c++ {
				in := inputs[c]
				for iRow := 0; iRow < outputCount; iRow++ {
					if c == 0 {
						galMulSlice(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop])
					} else {
						galMulSliceXor(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop])
					}
				}
			}
			wg.Done()
		}(start, start+do)
		start += do
	}
	wg.Wait()
}
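
The only parallelism primitive in codeSomeShardsP is this split-into-fixed-chunks loop. A stripped-down sketch of the same scheme, with an assumed chunk size standing in for splitSize:

```Go
package main

import (
	"fmt"
	"sync"
)

// chunked runs fn over [0, n) in fixed-size pieces, each on its own
// goroutine, mirroring how codeSomeShardsP slices the shard byte range.
func chunked(n, chunk int, fn func(start, stop int)) {
	var wg sync.WaitGroup
	for start := 0; start < n; start += chunk {
		stop := start + chunk
		if stop > n {
			stop = n // last chunk may be short
		}
		wg.Add(1)
		go func(start, stop int) {
			defer wg.Done()
			fn(start, stop)
		}(start, stop)
	}
	wg.Wait()
}

func main() {
	out := make([]byte, 1300)
	chunked(len(out), 512, func(start, stop int) {
		for i := start; i < stop; i++ {
			out[i] = byte(i)
		}
	})
	fmt.Println(out[0], out[511], out[1299]) // 0 255 19
}
```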

// checkSomeShards is mostly the same as codeSomeShards,
// except this will check values and return
// as soon as a difference is found.
func (r reedSolomon) checkSomeShards(matrixRows, inputs, toCheck [][]byte, outputCount, byteCount int) bool {
    var wg sync.WaitGroup
    left := byteCount
    start := 0

    same := true
    var mu sync.RWMutex // For above

    for {
        do := left
        if do > splitSize {
            do = splitSize
        }
        if do == 0 {
            break
        }
        left -= do
        wg.Add(1)
        go func(start, do int) {
            defer wg.Done()
            outputs := make([][]byte, len(toCheck))
            for i := range outputs {
                outputs[i] = make([]byte, do)
            }
            for c := 0; c < r.DataShards; c++ {
                mu.RLock()
                if !same {
                    mu.RUnlock()
                    return
                }
                mu.RUnlock()
                in := inputs[c][start : start+do]
                for iRow := 0; iRow < outputCount; iRow++ {
                    galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow])
                }
            }

            for i, calc := range outputs {
                if !bytes.Equal(calc, toCheck[i][start:start+do]) {
                    mu.Lock()
                    same = false
                    mu.Unlock()
                    return
                }
            }
        }(start, do)
        start += do
    }
    wg.Wait()
    return same
}

// ErrShardNoData will be returned if there are no shards,
// or if the length of all shards is zero.
var ErrShardNoData = errors.New("no shard data")

// ErrShardSize is returned if shard length isn't the same for all
// shards.
var ErrShardSize = errors.New("shard sizes do not match")

// checkShards will check if shards are the same size
// or 0, if allowed. An error is returned if this fails.
// An error is also returned if all shards are size 0.
func checkShards(shards [][]byte, nilok bool) error {
    size := shardSize(shards)
    if size == 0 {
        return ErrShardNoData
    }
    for _, shard := range shards {
        if len(shard) != size {
            if len(shard) != 0 || !nilok {
                return ErrShardSize
            }
        }
    }
    return nil
}

// shardSize returns the size of a single shard.
// The first non-zero size is returned,
// or 0 if all shards are size 0.
func shardSize(shards [][]byte) int {
    for _, shard := range shards {
        if len(shard) != 0 {
            return len(shard)
        }
    }
    return 0
}

// Reconstruct will recreate the missing shards, if possible.
//
// Given a list of shards, some of which contain data, fills in the
// ones that don't have data.
//
// The length of the array must be equal to Shards.
// You indicate that a shard is missing by setting it to nil.
//
// If there are too few shards to reconstruct the missing
// ones, ErrTooFewShards will be returned.
//
// The reconstructed shard set is complete, but integrity is not verified.
// Use the Verify function to check if the data set is ok.
func (r reedSolomon) Reconstruct(shards [][]byte) error {
    if len(shards) != r.Shards {
        return ErrTooFewShards
    }
    // Check arguments.
    err := checkShards(shards, true)
    if err != nil {
        return err
    }

    shardSize := shardSize(shards)

    // Quick check: are all of the shards present? If so, there's
    // nothing to do.
    numberPresent := 0
    for i := 0; i < r.Shards; i++ {
        if len(shards[i]) != 0 {
            numberPresent++
        }
    }
    if numberPresent == r.Shards {
        // Cool. All of the shards have data. We don't
        // need to do anything.
        return nil
    }

    // More complete sanity check.
    if numberPresent < r.DataShards {
        return ErrTooFewShards
    }

    // Pull out the rows of the matrix that correspond to the
    // shards that we have and build a square matrix. This
    // matrix could be used to generate the shards that we have
    // from the original data.
    //
    // Also, pull out an array holding just the shards that
    // correspond to the rows of the submatrix. These shards
    // will be the input to the decoding process that re-creates
    // the missing data shards.
    subMatrix, _ := newMatrix(r.DataShards, r.DataShards)
    subShards := make([][]byte, r.DataShards)
    subMatrixRow := 0
    for matrixRow := 0; matrixRow < r.Shards && subMatrixRow < r.DataShards; matrixRow++ {
        if len(shards[matrixRow]) != 0 {
            for c := 0; c < r.DataShards; c++ {
                subMatrix[subMatrixRow][c] = r.m[matrixRow][c]
            }
            subShards[subMatrixRow] = shards[matrixRow]
            subMatrixRow++
        }
    }

    // Invert the matrix, so we can go from the encoded shards
    // back to the original data. Then pull out the row that
    // generates the shard that we want to decode. Note that
    // since this matrix maps back to the original data, it can
    // be used to create a data shard, but not a parity shard.
    dataDecodeMatrix, err := subMatrix.Invert()
    if err != nil {
        return err
    }

    // Re-create any data shards that were missing.
    //
    // The input to the coding is all of the shards we actually
    // have, and the output is the missing data shards. The computation
    // is done using the special decode matrix we just built.
    outputs := make([][]byte, r.ParityShards)
    matrixRows := make([][]byte, r.ParityShards)
    outputCount := 0

    for iShard := 0; iShard < r.DataShards; iShard++ {
        if len(shards[iShard]) == 0 {
            shards[iShard] = make([]byte, shardSize)
            outputs[outputCount] = shards[iShard]
            matrixRows[outputCount] = dataDecodeMatrix[iShard]
            outputCount++
        }
    }
    r.codeSomeShards(matrixRows, subShards, outputs[:outputCount], outputCount, shardSize)

    // Now that we have all of the data shards intact, we can
    // compute any of the parity that is missing.
    //
    // The input to the coding is ALL of the data shards, including
    // any that we just calculated. The output is whichever of the
    // parity shards were missing.
    outputCount = 0
    for iShard := r.DataShards; iShard < r.Shards; iShard++ {
        if len(shards[iShard]) == 0 {
            shards[iShard] = make([]byte, shardSize)
            outputs[outputCount] = shards[iShard]
            matrixRows[outputCount] = r.parity[iShard-r.DataShards]
            outputCount++
        }
    }
    r.codeSomeShards(matrixRows, shards[:r.DataShards], outputs[:outputCount], outputCount, shardSize)
    return nil
}

// ErrShortData will be returned by Split(), if there isn't enough data
// to fill the number of shards.
var ErrShortData = errors.New("not enough data to fill the number of requested shards")

// Split a data slice into the number of shards given to the encoder,
// and create empty parity shards.
//
// The data will be split into equally sized shards.
// If the data size isn't divisible by the number of shards,
// the last shard will contain extra zeros.
//
// There must be at least the same number of bytes as there are data shards,
// otherwise ErrShortData will be returned.
//
// The data will not be copied, except for the last shard, so you
// should not modify the data of the input slice afterwards.
func (r reedSolomon) Split(data []byte) ([][]byte, error) {
    if len(data) < r.DataShards {
        return nil, ErrShortData
    }

    // Calculate number of bytes per shard.
    perShard := (len(data) + r.DataShards - 1) / r.DataShards

    // Pad data to r.Shards*perShard.
    padding := make([]byte, (r.Shards*perShard)-len(data))
    data = append(data, padding...)

    // Split into equal-length shards.
    dst := make([][]byte, r.Shards)
    for i := range dst {
        dst[i] = data[:perShard]
        data = data[perShard:]
    }

    return dst, nil
}

// Join the shards and write the data segment to dst.
//
// Only the data shards are considered.
// You must supply the exact output size you want.
// If there are too few shards given, ErrTooFewShards will be returned.
// If the total data size is less than outSize, ErrShortData will be returned.
func (r reedSolomon) Join(dst io.Writer, shards [][]byte, outSize int) error {
    // Do we have enough shards?
    if len(shards) < r.DataShards {
        return ErrTooFewShards
    }
    shards = shards[:r.DataShards]

    // Do we have enough data?
    size := 0
    for _, shard := range shards {
        size += len(shard)
    }
    if size < outSize {
        return ErrShortData
    }

    // Copy data to dst.
    write := outSize
    for _, shard := range shards {
        if write < len(shard) {
            _, err := dst.Write(shard[:write])
            return err
        }
        n, err := dst.Write(shard)
        if err != nil {
            return err
        }
        write -= n
    }
    return nil
}
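
The functions above complete the in-memory Encoder API: Split pads and slices, Encode fills the parity shards, Verify recomputes parity and compares, Reconstruct rebuilds missing shards from any DataShards survivors, and Join strips the padding back off. A minimal round-trip sketch under an assumed 4+2 geometry (error handling abbreviated; this example is not part of the vendored file):

package main

import (
    "bytes"
    "log"

    "github.com/klauspost/reedsolomon"
)

func main() {
    // 4 data shards + 2 parity shards tolerates losing any 2 shards.
    enc, err := reedsolomon.New(4, 2)
    if err != nil {
        log.Fatal(err)
    }
    data := []byte("hello, erasure coded world")

    // Split pads the input and slices it into 6 shards.
    shards, err := enc.Split(data)
    if err != nil {
        log.Fatal(err)
    }
    // Compute the 2 parity shards from the 4 data shards.
    if err = enc.Encode(shards); err != nil {
        log.Fatal(err)
    }

    // Simulate losing two shards; nil marks a shard as missing.
    shards[0], shards[5] = nil, nil
    if err = enc.Reconstruct(shards); err != nil {
        log.Fatal(err)
    }

    // Verify recomputes parity and compares against the parity shards.
    ok, err := enc.Verify(shards)
    if err != nil || !ok {
        log.Fatal("verification failed after reconstruction")
    }

    // Join writes back exactly len(data) bytes, dropping the padding.
    var buf bytes.Buffer
    if err = enc.Join(&buf, shards, len(data)); err != nil {
        log.Fatal(err)
    }
}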
576
vendor/github.com/klauspost/reedsolomon/streaming.go
generated
vendored
Normal file
576
vendor/github.com/klauspost/reedsolomon/streaming.go
generated
vendored
Normal file
@ -0,0 +1,576 @@
/**
 * Reed-Solomon Coding over 8-bit values.
 *
 * Copyright 2015, Klaus Post
 * Copyright 2015, Backblaze, Inc.
 */

package reedsolomon

import (
    "bytes"
    "errors"
    "fmt"
    "io"
    "sync"
)

// StreamEncoder is an interface to encode Reed-Solomon parity sets for your data.
// It provides a fully streaming interface, and processes data in blocks of up to 4MB.
//
// For small shard sizes, 10MB and below, it is recommended to use the in-memory interface,
// since the streaming interface has a start up overhead.
//
// For all operations, readers and writers should not assume any order/size of
// individual reads/writes.
//
// For usage examples, see "stream-encoder.go" and "streamdecoder.go" in the examples
// folder.
type StreamEncoder interface {
    // Encode encodes parity shards for a set of data shards.
    //
    // Input is 'shards' containing readers for data shards followed by parity shards
    // io.Writer.
    //
    // The number of shards must match the number given to NewStream().
    //
    // Each reader must supply the same number of bytes.
    //
    // The parity shards will be written to the writer.
    // The number of bytes written will match the input size.
    //
    // If a data stream returns an error, a StreamReadError type error
    // will be returned. If a parity writer returns an error, a
    // StreamWriteError will be returned.
    Encode(data []io.Reader, parity []io.Writer) error

    // Verify returns true if the parity shards contain correct data.
    //
    // The number of shards must match the total number of data+parity shards
    // given to NewStream().
    //
    // Each reader must supply the same number of bytes.
    // If a shard stream returns an error, a StreamReadError type error
    // will be returned.
    Verify(shards []io.Reader) (bool, error)

    // Reconstruct will recreate the missing shards if possible.
    //
    // Given a list of valid shards (to read) and invalid shards (to write),
    // you indicate that a shard is missing by setting it to nil in the 'valid'
    // slice and at the same time setting a non-nil writer in 'fill'.
    // An index cannot contain both a non-nil 'valid' and 'fill' entry.
    // If both are provided, 'ErrReconstructMismatch' is returned.
    //
    // If there are too few shards to reconstruct the missing
    // ones, ErrTooFewShards will be returned.
    //
    // The reconstructed shard set is complete, but integrity is not verified.
    // Use the Verify function to check if the data set is ok.
    Reconstruct(valid []io.Reader, fill []io.Writer) error

    // Split an input stream into the number of shards given to the encoder.
    //
    // The data will be split into equally sized shards.
    // If the data size isn't divisible by the number of shards,
    // the last shard will contain extra zeros.
    //
    // You must supply the total size of your input.
    // 'ErrShortData' will be returned if it is unable to retrieve the number of bytes
    // indicated.
    Split(data io.Reader, dst []io.Writer, size int64) (err error)

    // Join the shards and write the data segment to dst.
    //
    // Only the data shards are considered.
    //
    // You must supply the exact output size you want.
    // If there are too few shards given, ErrTooFewShards will be returned.
    // If the total data size is less than outSize, ErrShortData will be returned.
    Join(dst io.Writer, shards []io.Reader, outSize int64) error
}
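
As a rough usage sketch of this interface (a toy example with in-memory buffers standing in for file or network streams; the names and sizes are illustrative and not part of the vendored file):

package main

import (
    "bytes"
    "io"
    "log"
    "strings"

    "github.com/klauspost/reedsolomon"
)

func main() {
    // 4 data shards + 2 parity shards.
    enc, err := reedsolomon.NewStream(4, 2)
    if err != nil {
        log.Fatal(err)
    }

    const msg = "streaming erasure coding example data"

    // Split the input stream into 4 data shard buffers.
    dataBufs := make([]*bytes.Buffer, 4)
    dst := make([]io.Writer, 4)
    for i := range dataBufs {
        dataBufs[i] = new(bytes.Buffer)
        dst[i] = dataBufs[i]
    }
    if err = enc.Split(strings.NewReader(msg), dst, int64(len(msg))); err != nil {
        log.Fatal(err)
    }

    // Encode parity from fresh readers over the data shards.
    dataReaders := make([]io.Reader, 4)
    for i := range dataReaders {
        dataReaders[i] = bytes.NewReader(dataBufs[i].Bytes())
    }
    parity := []io.Writer{new(bytes.Buffer), new(bytes.Buffer)}
    if err = enc.Encode(dataReaders, parity); err != nil {
        log.Fatal(err)
    }
}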

// StreamReadError is returned when a read error is encountered
// that relates to a supplied stream.
// This will allow you to find out which reader has failed.
type StreamReadError struct {
    Err    error // The error
    Stream int   // The stream number on which the error occurred
}

// Error returns the error as a string.
func (s StreamReadError) Error() string {
    return fmt.Sprintf("error reading stream %d: %s", s.Stream, s.Err)
}

// String returns the error as a string.
func (s StreamReadError) String() string {
    return s.Error()
}

// StreamWriteError is returned when a write error is encountered
// that relates to a supplied stream. This will allow you to
// find out which writer has failed.
type StreamWriteError struct {
    Err    error // The error
    Stream int   // The stream number on which the error occurred
}

// Error returns the error as a string.
func (s StreamWriteError) Error() string {
    return fmt.Sprintf("error writing stream %d: %s", s.Stream, s.Err)
}

// String returns the error as a string.
func (s StreamWriteError) String() string {
    return s.Error()
}

// rsStream contains a matrix for a specific
// distribution of datashards and parity shards.
// Construct it using NewStream().
type rsStream struct {
    r  *reedSolomon
    bs int // Block size
    // Shard reader
    readShards func(dst [][]byte, in []io.Reader) error
    // Shard writer
    writeShards func(out []io.Writer, in [][]byte) error
    creads  bool
    cwrites bool
}

// NewStream creates a new encoder and initializes it to
// the number of data shards and parity shards that
// you want to use. You can reuse this encoder.
// Note that the maximum number of data shards is 256.
func NewStream(dataShards, parityShards int) (StreamEncoder, error) {
    enc, err := New(dataShards, parityShards)
    if err != nil {
        return nil, err
    }
    rs := enc.(*reedSolomon)
    r := rsStream{r: rs, bs: 4 << 20}
    r.readShards = readShards
    r.writeShards = writeShards
    return &r, err
}

// NewStreamC creates a new encoder and initializes it to
// the number of data shards and parity shards given.
//
// This functions as 'NewStream', but allows you to enable CONCURRENT reads and writes.
func NewStreamC(dataShards, parityShards int, conReads, conWrites bool) (StreamEncoder, error) {
    enc, err := New(dataShards, parityShards)
    if err != nil {
        return nil, err
    }
    rs := enc.(*reedSolomon)
    r := rsStream{r: rs, bs: 4 << 20}
    r.readShards = readShards
    r.writeShards = writeShards
    if conReads {
        r.readShards = cReadShards
    }
    if conWrites {
        r.writeShards = cWriteShards
    }
    return &r, err
}

// createSlice allocates n buffers of the given length.
func createSlice(n, length int) [][]byte {
    out := make([][]byte, n)
    for i := range out {
        out[i] = make([]byte, length)
    }
    return out
}

// Encode encodes parity shards for a set of data shards.
//
// Input is 'shards' containing readers for data shards followed by parity shards
// io.Writer.
//
// The number of shards must match the number given to NewStream().
//
// Each reader must supply the same number of bytes.
//
// The parity shards will be written to the writer.
// The number of bytes written will match the input size.
//
// If a data stream returns an error, a StreamReadError type error
// will be returned. If a parity writer returns an error, a
// StreamWriteError will be returned.
func (r rsStream) Encode(data []io.Reader, parity []io.Writer) error {
    if len(data) != r.r.DataShards {
        return ErrTooFewShards
    }

    if len(parity) != r.r.ParityShards {
        return ErrTooFewShards
    }

    all := createSlice(r.r.Shards, r.bs)
    in := all[:r.r.DataShards]
    out := all[r.r.DataShards:]
    read := 0

    for {
        err := r.readShards(in, data)
        switch err {
        case nil:
        case io.EOF:
            if read == 0 {
                return ErrShardNoData
            }
            return nil
        default:
            return err
        }
        out = trimShards(out, shardSize(in))
        read += shardSize(in)
        err = r.r.Encode(all)
        if err != nil {
            return err
        }
        err = r.writeShards(parity, out)
        if err != nil {
            return err
        }
    }
}

// Trim the shards so they are all the same size.
func trimShards(in [][]byte, size int) [][]byte {
    for i := range in {
        if in[i] != nil {
            in[i] = in[i][0:size]
        }
        if len(in[i]) < size {
            in[i] = nil
        }
    }
    return in
}

func readShards(dst [][]byte, in []io.Reader) error {
    if len(in) != len(dst) {
        panic("internal error: in and dst size does not match")
    }
    size := -1
    for i := range in {
        if in[i] == nil {
            dst[i] = nil
            continue
        }
        n, err := io.ReadFull(in[i], dst[i])
        // The error is EOF only if no bytes were read.
        // If an EOF happens after reading some but not all the bytes,
        // ReadFull returns ErrUnexpectedEOF.
        switch err {
        case io.ErrUnexpectedEOF, io.EOF:
            if size < 0 {
                size = n
            } else if n != size {
                // Shard sizes must match.
                return ErrShardSize
            }
            dst[i] = dst[i][0:n]
        case nil:
            continue
        default:
            return StreamReadError{Err: err, Stream: i}
        }
    }
    if size == 0 {
        return io.EOF
    }
    return nil
}

func writeShards(out []io.Writer, in [][]byte) error {
    if len(out) != len(in) {
        panic("internal error: in and out size does not match")
    }
    for i := range in {
        if out[i] == nil {
            continue
        }
        n, err := out[i].Write(in[i])
        if err != nil {
            return StreamWriteError{Err: err, Stream: i}
        }
        // Treat a short write as an error.
        if n != len(in[i]) {
            return StreamWriteError{Err: io.ErrShortWrite, Stream: i}
        }
    }
    return nil
}

type readResult struct {
    n    int   // Stream index.
    size int   // Number of bytes read.
    err  error // Read error, if any.
}

// cReadShards reads shards concurrently.
func cReadShards(dst [][]byte, in []io.Reader) error {
    if len(in) != len(dst) {
        panic("internal error: in and dst size does not match")
    }
    var wg sync.WaitGroup
    wg.Add(len(in))
    res := make(chan readResult, len(in))
    for i := range in {
        if in[i] == nil {
            dst[i] = nil
            wg.Done()
            continue
        }
        go func(i int) {
            defer wg.Done()
            n, err := io.ReadFull(in[i], dst[i])
            // The error is EOF only if no bytes were read.
            // If an EOF happens after reading some but not all the bytes,
            // ReadFull returns ErrUnexpectedEOF.
            res <- readResult{size: n, err: err, n: i}
        }(i)
    }
    wg.Wait()
    close(res)
    size := -1
    for r := range res {
        switch r.err {
        case io.ErrUnexpectedEOF, io.EOF:
            if size < 0 {
                size = r.size
            } else if r.size != size {
                // Shard sizes must match.
                return ErrShardSize
            }
            dst[r.n] = dst[r.n][0:r.size]
        case nil:
        default:
            return StreamReadError{Err: r.err, Stream: r.n}
        }
    }
    if size == 0 {
        return io.EOF
    }
    return nil
}

// cWriteShards writes shards concurrently.
func cWriteShards(out []io.Writer, in [][]byte) error {
    if len(out) != len(in) {
        panic("internal error: in and out size does not match")
    }
    var errs = make(chan error, len(out))
    var wg sync.WaitGroup
    wg.Add(len(out))
    for i := range in {
        go func(i int) {
            defer wg.Done()
            if out[i] == nil {
                errs <- nil
                return
            }
            n, err := out[i].Write(in[i])
            if err != nil {
                errs <- StreamWriteError{Err: err, Stream: i}
                return
            }
            if n != len(in[i]) {
                errs <- StreamWriteError{Err: io.ErrShortWrite, Stream: i}
            }
        }(i)
    }
    wg.Wait()
    close(errs)
    for err := range errs {
        if err != nil {
            return err
        }
    }

    return nil
}

// Verify returns true if the parity shards contain correct data.
//
// The number of shards must match the total number of data+parity shards
// given to NewStream().
//
// Each reader must supply the same number of bytes.
// If a shard stream returns an error, a StreamReadError type error
// will be returned.
func (r rsStream) Verify(shards []io.Reader) (bool, error) {
    if len(shards) != r.r.Shards {
        return false, ErrTooFewShards
    }

    read := 0
    all := createSlice(r.r.Shards, r.bs)
    for {
        err := r.readShards(all, shards)
        if err == io.EOF {
            if read == 0 {
                return false, ErrShardNoData
            }
            return true, nil
        }
        if err != nil {
            return false, err
        }
        read += shardSize(all)
        ok, err := r.r.Verify(all)
        if !ok || err != nil {
            return ok, err
        }
    }
}

// ErrReconstructMismatch is returned by the StreamEncoder, if you supply
// "valid" and "fill" streams on the same index.
// Therefore it is impossible to see if you consider the shard valid
// or would like to have it reconstructed.
var ErrReconstructMismatch = errors.New("valid shards and fill shards are mutually exclusive")

// Reconstruct will recreate the missing shards if possible.
//
// Given a list of valid shards (to read) and invalid shards (to write),
// you indicate that a shard is missing by setting it to nil in the 'valid'
// slice and at the same time setting a non-nil writer in 'fill'.
// An index cannot contain both a non-nil 'valid' and 'fill' entry.
//
// If there are too few shards to reconstruct the missing
// ones, ErrTooFewShards will be returned.
//
// The reconstructed shard set is complete, but integrity is not verified.
// Use the Verify function to check if the data set is ok.
func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
    if len(valid) != r.r.Shards {
        return ErrTooFewShards
    }
    if len(fill) != r.r.Shards {
        return ErrTooFewShards
    }

    all := createSlice(r.r.Shards, r.bs)
    for i := range valid {
        if valid[i] != nil && fill[i] != nil {
            return ErrReconstructMismatch
        }
    }

    read := 0
    for {
        err := r.readShards(all, valid)
        if err == io.EOF {
            if read == 0 {
                return ErrShardNoData
            }
            return nil
        }
        if err != nil {
            return err
        }
        read += shardSize(all)
        all = trimShards(all, shardSize(all))

        err = r.r.Reconstruct(all)
        if err != nil {
            return err
        }
        err = r.writeShards(fill, all)
        if err != nil {
            return err
        }
    }
}

// Join the shards and write the data segment to dst.
//
// Only the data shards are considered.
//
// You must supply the exact output size you want.
// If there are too few shards given, ErrTooFewShards will be returned.
// If the total data size is less than outSize, ErrShortData will be returned.
func (r rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
    // Do we have enough shards?
    if len(shards) < r.r.DataShards {
        return ErrTooFewShards
    }

    // Trim off parity shards if any.
    shards = shards[:r.r.DataShards]
    for i := range shards {
        if shards[i] == nil {
            return StreamReadError{Err: ErrShardNoData, Stream: i}
        }
    }
    // Join all shards.
    src := io.MultiReader(shards...)

    // Copy data to dst.
    n, err := io.CopyN(dst, src, outSize)
    if err == io.EOF {
        return ErrShortData
    }
    if err != nil {
        return err
    }
    if n != outSize {
        return ErrShortData
    }
    return nil
}

// Split an input stream into the number of shards given to the encoder.
//
// The data will be split into equally sized shards.
// If the data size isn't divisible by the number of shards,
// the last shard will contain extra zeros.
//
// You must supply the total size of your input.
// 'ErrShortData' will be returned if it is unable to retrieve the number of bytes
// indicated.
func (r rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
    if size < int64(r.r.DataShards) {
        return ErrShortData
    }

    if len(dst) != r.r.DataShards {
        return ErrInvShardNum
    }

    for i := range dst {
        if dst[i] == nil {
            return StreamWriteError{Err: ErrShardNoData, Stream: i}
        }
    }

    // Calculate number of bytes per shard.
    perShard := (size + int64(r.r.DataShards) - 1) / int64(r.r.DataShards)

    // Pad data to r.Shards*perShard.
    padding := make([]byte, (int64(r.r.Shards)*perShard)-size)
    data = io.MultiReader(data, bytes.NewBuffer(padding))

    // Split into equal-length shards and copy.
    for i := range dst {
        n, err := io.CopyN(dst[i], data, perShard)
        if err != io.EOF && err != nil {
            return err
        }
        if n != perShard {
            return ErrShortData
        }
    }

    return nil
}
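
To make the per-shard arithmetic above concrete (numbers illustrative): for dataShards = 4 and size = 10, perShard = (10 + 4 - 1) / 4 = 3, so each of the 4 data shard writers receives exactly 3 bytes; the 12 bytes consumed are the 10 input bytes plus 2 zero bytes drawn from the padding appended via io.MultiReader.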
5
vendor/vendor.json
vendored
5
vendor/vendor.json
vendored
@ -62,6 +62,11 @@
        "revision": "349c675778172472f5e8f3a3e0fe187e302e5a10",
        "revisionTime": "2016-01-06T11:44:51+01:00"
    },
    {
        "path": "github.com/klauspost/reedsolomon",
        "revision": "d1fe8adc280ef4cd7883943f15a1b5b085a5cced",
        "revisionTime": "2016-01-11T14:44:57+01:00"
    },
    {
        "path": "github.com/mattn/go-colorable",
        "revision": "9cbef7c35391cca05f15f8181dc0b18bc9736dbb",
206
xl-v1-createfile.go
Normal file
206
xl-v1-createfile.go
Normal file
@ -0,0 +1,206 @@
/*
 * Minio Cloud Storage, (C) 2016 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
    "encoding/hex"
    "encoding/json"
    "fmt"
    "hash"
    "io"
    slashpath "path"
    "strconv"
    "time"

    fastSha512 "github.com/minio/minio/pkg/crypto/sha512"
)

// Erasure block size.
const erasureBlockSize = 4 * 1024 * 1024 // 4MiB.

// Close and remove writers if they are safeFile.
func closeAndRemoveWriters(writers ...io.WriteCloser) {
    for _, writer := range writers {
        safeCloseAndRemove(writer)
    }
}

// writeErasure reads predefined blocks, encodes them and writes to
// configured storage disks.
func (xl XL) writeErasure(volume, path string, reader *io.PipeReader) {
    var writers = make([]io.WriteCloser, len(xl.storageDisks))
    var sha512Writers = make([]hash.Hash, len(xl.storageDisks))
    var metadataWriters = make([]io.WriteCloser, len(xl.storageDisks))

    // Initialize storage disks, get all the writers and corresponding
    // metadata writers.
    for index, disk := range xl.storageDisks {
        var err error
        erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
        writers[index], err = disk.CreateFile(volume, erasurePart)
        if err != nil {
            // Remove previous temp writers for any failure.
            closeAndRemoveWriters(writers...)
            closeAndRemoveWriters(metadataWriters...)
            deletePathAll(volume, path, xl.storageDisks...)
            reader.CloseWithError(err)
            return
        }
        metadataFilePath := slashpath.Join(path, metadataFile)
        metadataWriters[index], err = disk.CreateFile(volume, metadataFilePath)
        if err != nil {
            // Remove previous temp writers for any failure.
            closeAndRemoveWriters(writers...)
            closeAndRemoveWriters(metadataWriters...)
            deletePathAll(volume, path, xl.storageDisks...)
            reader.CloseWithError(err)
            return
        }
        sha512Writers[index] = fastSha512.New()
    }

    // Allocate 4MiB block size buffer for reading.
    buffer := make([]byte, erasureBlockSize)
    var totalSize int64 // Saves total incoming stream size.
    for {
        // Read up to allocated block size.
        n, err := io.ReadFull(reader, buffer)
        if err != nil {
            // Any unexpected errors, close the pipe reader with error.
            if err != io.ErrUnexpectedEOF && err != io.EOF {
                // Remove all temp writers.
                closeAndRemoveWriters(writers...)
                closeAndRemoveWriters(metadataWriters...)
                deletePathAll(volume, path, xl.storageDisks...)
                reader.CloseWithError(err)
                return
            }
        }
        // At EOF break out.
        if err == io.EOF {
            break
        }
        if n > 0 {
            // Split the input buffer into data and parity blocks.
            var blocks [][]byte
            blocks, err = xl.ReedSolomon.Split(buffer[0:n])
            if err != nil {
                // Remove all temp writers.
                closeAndRemoveWriters(writers...)
                closeAndRemoveWriters(metadataWriters...)
                deletePathAll(volume, path, xl.storageDisks...)
                reader.CloseWithError(err)
                return
            }
            // Encode parity blocks using data blocks.
            err = xl.ReedSolomon.Encode(blocks)
            if err != nil {
                // Remove all temp writers upon error.
                closeAndRemoveWriters(writers...)
                closeAndRemoveWriters(metadataWriters...)
                deletePathAll(volume, path, xl.storageDisks...)
                reader.CloseWithError(err)
                return
            }
            // Loop through and write encoded data to all the disks.
            for index, encodedData := range blocks {
                _, err = writers[index].Write(encodedData)
                if err != nil {
                    // Remove all temp writers upon error.
                    closeAndRemoveWriters(writers...)
                    closeAndRemoveWriters(metadataWriters...)
                    deletePathAll(volume, path, xl.storageDisks...)
                    reader.CloseWithError(err)
                    return
                }
                sha512Writers[index].Write(encodedData)
            }
            // Update total written.
            totalSize += int64(n)
        }
    }

    // Save additional erasureMetadata.
    modTime := time.Now().UTC()

    // Initialize metadata map, save all erasure related metadata.
    metadata := make(map[string]string)
    metadata["version"] = minioVersion
    metadata["format.major"] = "1"
    metadata["format.minor"] = "0"
    metadata["format.patch"] = "0"
    metadata["file.size"] = strconv.FormatInt(totalSize, 10)
    metadata["file.modTime"] = modTime.Format(timeFormatAMZ)
    metadata["file.xl.blockSize"] = strconv.Itoa(erasureBlockSize)
    metadata["file.xl.dataBlocks"] = strconv.Itoa(xl.DataBlocks)
    metadata["file.xl.parityBlocks"] = strconv.Itoa(xl.ParityBlocks)

    // Write all the metadata.
    for index, metadataWriter := range metadataWriters {
        // Save the sha512 checksum of the encoded blocks for this disk.
        metadata["file.xl.block512Sum"] = hex.EncodeToString(sha512Writers[index].Sum(nil))

        // Marshal metadata into a json string.
        metadataBytes, err := json.Marshal(metadata)
        if err != nil {
            closeAndRemoveWriters(writers...)
            closeAndRemoveWriters(metadataWriters...)
            deletePathAll(volume, path, xl.storageDisks...)
            reader.CloseWithError(err)
            return
        }

        _, err = metadataWriter.Write(metadataBytes)
        if err != nil {
            closeAndRemoveWriters(writers...)
            closeAndRemoveWriters(metadataWriters...)
            deletePathAll(volume, path, xl.storageDisks...)
            reader.CloseWithError(err)
            return
        }
    }

    // Close all the writers and metadata writers.
    for index := range xl.storageDisks {
        // Safely written; closing commits the file to its actual location.
        writers[index].Close()
        metadataWriters[index].Close()
    }

    // Close the pipe reader and return.
    reader.Close()
    return
}

// CreateFile - create a file.
func (xl XL) CreateFile(volume, path string) (writeCloser io.WriteCloser, err error) {
    if !isValidVolname(volume) {
        return nil, errInvalidArgument
    }
    if !isValidPath(path) {
        return nil, errInvalidArgument
    }

    // Initialize pipe for data pipe line.
    pipeReader, pipeWriter := io.Pipe()

    // Start erasure encoding in routine, reading data block by block from pipeReader.
    go xl.writeErasure(volume, path, pipeReader)

    // Return the piped writer, caller should start writing to this.
    return pipeWriter, nil
}
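
A rough sketch of how a caller is expected to drive this pipe-based API (volume and object names hypothetical, error handling abbreviated; assumes an initialized XL instance and an existing volume):

    w, err := xl.CreateFile("testvolume", "testobject")
    if err != nil {
        return err
    }
    // Each write is consumed in 4MiB blocks, split into data and parity
    // blocks, and striped across the storage disks by writeErasure.
    if _, err = w.Write(payload); err != nil {
        return err
    }
    // Closing signals EOF on the pipe; the encoding goroutine then writes
    // the part.json metadata and closes the per-disk writers.
    err = w.Close()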
169
xl-v1-readfile.go
Normal file
169
xl-v1-readfile.go
Normal file
@ -0,0 +1,169 @@

/*
 * Minio Cloud Storage, (C) 2016 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
    "errors"
    "fmt"
    "io"
    slashpath "path"
)

// checkBlockSize returns the size of a single block.
// The first non-zero size is returned,
// or 0 if all blocks are size 0.
func checkBlockSize(blocks [][]byte) int {
    for _, block := range blocks {
        if len(block) != 0 {
            return len(block)
        }
    }
    return 0
}

// getEncodedBlockLen calculates the block size based on the input
// length and the total number of data blocks.
func getEncodedBlockLen(inputLen, dataBlocks int) (curBlockSize int) {
    curBlockSize = (inputLen + dataBlocks - 1) / dataBlocks
    return
}
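
A quick worked example of the ceiling division above: with 8 data blocks and a full 4MiB input block, getEncodedBlockLen(4194304, 8) = (4194304 + 7) / 8 = 524288 bytes per encoded part; a 10-byte tail block instead gives (10 + 7) / 8 = 2 bytes per part.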

// ReadFile - read file.
func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) {
    // Input validation.
    if !isValidVolname(volume) {
        return nil, errInvalidArgument
    }
    if !isValidPath(path) {
        return nil, errInvalidArgument
    }

    // Initialize all readers.
    var readers = make([]io.ReadCloser, len(xl.storageDisks))

    // Extract metadata.
    metadata, err := xl.extractMetadata(volume, path)
    if err != nil {
        return nil, err
    }

    // Loop through and verify if all metadata files are in-tact.
    for index, disk := range xl.storageDisks {
        // Always read each part from the beginning.
        offset := int64(0)
        erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
        var erasuredPartReader io.ReadCloser
        erasuredPartReader, err = disk.ReadFile(volume, erasurePart, offset)
        if err != nil {
            // One of the parts is missing, we will need to reconstruct it.
            if err == errFileNotFound {
                readers[index] = nil
                continue
            }
            // For all other errors return to the caller.
            return nil, err
        }
        readers[index] = erasuredPartReader
    }
    totalBlocks := xl.DataBlocks + xl.ParityBlocks // Total blocks.

    // Initialize pipe.
    pipeReader, pipeWriter := io.Pipe()
    go func() {
        var totalLeft = metadata.Size
        // Read until the totalLeft.
        for totalLeft > 0 {
            // Figure out the right blockSize as it was encoded before.
            var curBlockSize int
            if erasureBlockSize < totalLeft {
                curBlockSize = erasureBlockSize
            } else {
                curBlockSize = int(totalLeft)
            }
            // Calculate the current encoded block size.
            curEncBlockSize := getEncodedBlockLen(curBlockSize, xl.DataBlocks)
            enBlocks := make([][]byte, totalBlocks)
            // Loop through all readers and read.
            for index, reader := range readers {
                if reader == nil {
                    // One of the files is missing, save it for reconstruction.
                    enBlocks[index] = nil
                    continue
                }
                // Initialize the shard slice and fill the data from each part.
                enBlocks[index] = make([]byte, curEncBlockSize)
                _, err = io.ReadFull(reader, enBlocks[index])
                if err != nil && err != io.ErrUnexpectedEOF {
                    enBlocks[index] = nil
                }
            }

            // TODO need to verify block512Sum.

            // Check blocks if they are all zero in length.
            if checkBlockSize(enBlocks) == 0 {
                err = errors.New("Data likely corrupted, all blocks are zero in length.")
                pipeWriter.CloseWithError(err)
                return
            }
            // Verify the blocks.
            var ok bool
            ok, err = xl.ReedSolomon.Verify(enBlocks)
            if err != nil {
                pipeWriter.CloseWithError(err)
                return
            }
            // Verification failed, blocks require reconstruction.
            if !ok {
                err = xl.ReedSolomon.Reconstruct(enBlocks)
                if err != nil {
                    pipeWriter.CloseWithError(err)
                    return
                }
                // Verify reconstructed blocks again.
                ok, err = xl.ReedSolomon.Verify(enBlocks)
                if err != nil {
                    pipeWriter.CloseWithError(err)
                    return
                }
                if !ok {
                    // Blocks cannot be reconstructed, corrupted data.
                    err = errors.New("Verification failed after reconstruction, data likely corrupted.")
                    pipeWriter.CloseWithError(err)
                    return
                }
            }
            // Join the decoded blocks.
            err = xl.ReedSolomon.Join(pipeWriter, enBlocks, curBlockSize)
            if err != nil {
                pipeWriter.CloseWithError(err)
                return
            }
            // Save what's left after reading erasureBlockSize.
            totalLeft = totalLeft - erasureBlockSize
        }
        // Cleanly end the pipe after a successful decoding.
        pipeWriter.Close()

        // Cleanly close all the underlying data readers.
        for _, reader := range readers {
            if reader == nil {
                // Skip the parts that were missing.
                continue
            }
            reader.Close()
        }
    }()

    // Return the pipe for the top level caller to start reading.
    return pipeReader, nil
}
402
xl-v1.go
Normal file
402
xl-v1.go
Normal file
@ -0,0 +1,402 @@
/*
 * Minio Cloud Storage, (C) 2016 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
    "encoding/hex"
    "encoding/json"
    "errors"
    "fmt"
    "os"
    slashpath "path"
    "sort"
    "strconv"
    "strings"
    "time"

    "github.com/klauspost/reedsolomon"
)

// XL layer structure.
type XL struct {
    ReedSolomon  reedsolomon.Encoder // Erasure encoder/decoder.
    DataBlocks   int
    ParityBlocks int
    storageDisks []StorageAPI
}

const (
    // Part metadata file.
    metadataFile = "part.json"
    // Maximum erasure blocks.
    maxErasureBlocks = 16
)

// newXL instantiates a new XL.
func newXL(disks ...string) (StorageAPI, error) {
    // Initialize XL.
    xl := &XL{}

    // Verify disks.
    totalDisks := len(disks)
    if totalDisks > maxErasureBlocks {
        return nil, errors.New("Total number of disks specified is higher than supported maximum of '16'")
    }

    // isEven function to verify if a given number is even.
    isEven := func(number int) bool {
        return number%2 == 0
    }

    // TODO: verify if this makes sense in future.
    if !isEven(totalDisks) {
        return nil, errors.New("Invalid number of directories provided, should always be a multiple of '2'")
    }

    // Calculate data and parity blocks.
    dataBlocks, parityBlocks := totalDisks/2, totalDisks/2

    // Initialize reed solomon encoding.
    rs, err := reedsolomon.New(dataBlocks, parityBlocks)
    if err != nil {
        return nil, err
    }

    // Save the reedsolomon.
    xl.ReedSolomon = rs
    xl.DataBlocks = dataBlocks
    xl.ParityBlocks = parityBlocks

    // Initialize all storage disks.
    storageDisks := make([]StorageAPI, len(disks))
    for index, disk := range disks {
        var err error
        storageDisks[index], err = newFS(disk)
        if err != nil {
            return nil, err
        }
    }

    // Save all the initialized storage disks.
    xl.storageDisks = storageDisks

    // Return successfully initialized.
    return xl, nil
}
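
With an even disk count, the split is always half data and half parity. For instance (directory paths hypothetical):

    // Eight disks: 4 data blocks + 4 parity blocks, so any 4 disks may
    // be lost while files remain reconstructible.
    xl, err := newXL("/mnt/d1", "/mnt/d2", "/mnt/d3", "/mnt/d4",
        "/mnt/d5", "/mnt/d6", "/mnt/d7", "/mnt/d8")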

// MakeVol - make a volume.
func (xl XL) MakeVol(volume string) error {
    if !isValidVolname(volume) {
        return errInvalidArgument
    }
    // Make a volume entry on all underlying storage disks.
    for _, disk := range xl.storageDisks {
        if err := disk.MakeVol(volume); err != nil {
            return err
        }
    }
    return nil
}

// DeleteVol - delete a volume.
func (xl XL) DeleteVol(volume string) error {
    if !isValidVolname(volume) {
        return errInvalidArgument
    }
    for _, disk := range xl.storageDisks {
        if err := disk.DeleteVol(volume); err != nil {
            return err
        }
    }
    return nil
}

// ListVols - list volumes.
func (xl XL) ListVols() (volsInfo []VolInfo, err error) {
    // Pick the first node and list there always.
    disk := xl.storageDisks[0]
    volsInfo, err = disk.ListVols()
    if err == nil {
        return volsInfo, nil
    }
    return nil, err
}

// StatVol - get volume stat info.
func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) {
    if !isValidVolname(volume) {
        return VolInfo{}, errInvalidArgument
    }
    // Pick the first node and list there always.
    disk := xl.storageDisks[0]
    volInfo, err = disk.StatVol(volume)
    if err == nil {
        return volInfo, nil
    }
    return VolInfo{}, err
}

// isLeafDirectory - check if a given path is a leaf directory, i.e.
// there are no more directories inside it. In the erasure code backend
// format, this means the parent directory is the actual object name.
func (xl XL) isLeafDirectory(volume, leafPath string) (isLeaf bool) {
    var allFileInfos []FileInfo
    for {
        fileInfos, eof, e := xl.storageDisks[0].ListFiles(volume, leafPath, "", false, 1000)
        if e != nil {
            break
        }
        allFileInfos = append(allFileInfos, fileInfos...)
        if eof {
            break
        }
    }
    for _, fileInfo := range allFileInfos {
        if fileInfo.Mode.IsDir() {
            // Directory found, not a leaf directory, return right here.
            isLeaf = false
            return isLeaf
        }
    }
    // Exhausted all the entries and no directories were found, so this
    // must be a leaf; return right here.
    isLeaf = true
    return isLeaf
}

// fileMetadata - file metadata is a structured representation of the
// unmarshalled metadata file.
type fileMetadata struct {
    Size         int64
    ModTime      time.Time
    BlockSize    int64
    Block512Sum  string
    DataBlocks   int
    ParityBlocks int
}

// extractMetadata - extract file metadata.
func (xl XL) extractMetadata(volume, path string) (fileMetadata, error) {
    metadataFilePath := slashpath.Join(path, metadataFile)
    // We are not going to read partial data from the metadata file,
    // always read the whole file.
    offset := int64(0)
    disk := xl.storageDisks[0]
    metadataReader, err := disk.ReadFile(volume, metadataFilePath, offset)
    if err != nil {
        return fileMetadata{}, err
    }
    // Close metadata reader.
    defer metadataReader.Close()

    var metadata = make(map[string]string)
    decoder := json.NewDecoder(metadataReader)
    // Unmarshalling failed, file possibly corrupted.
    if err = decoder.Decode(&metadata); err != nil {
        return fileMetadata{}, err
    }
    modTime, err := time.Parse(timeFormatAMZ, metadata["file.modTime"])
    if err != nil {
        return fileMetadata{}, err
    }

    // Verify if size is parsable.
    var size int64
    size, err = strconv.ParseInt(metadata["file.size"], 10, 64)
    if err != nil {
        return fileMetadata{}, err
    }

    // Verify if block size is parsable.
    var blockSize int64
    blockSize, err = strconv.ParseInt(metadata["file.xl.blockSize"], 10, 64)
    if err != nil {
        return fileMetadata{}, err
    }

    // Verify if data blocks and parity blocks are parsable.
    var dataBlocks, parityBlocks int
    dataBlocks, err = strconv.Atoi(metadata["file.xl.dataBlocks"])
    if err != nil {
        return fileMetadata{}, err
    }
    parityBlocks, err = strconv.Atoi(metadata["file.xl.parityBlocks"])
    if err != nil {
        return fileMetadata{}, err
    }

    // Verify if the sha512 sum is in proper hex format.
    sha512Sum := metadata["file.xl.block512Sum"]
    _, err = hex.DecodeString(sha512Sum)
    if err != nil {
        return fileMetadata{}, err
    }

    // Return the constructed metadata.
    return fileMetadata{
        Size:         size,
        ModTime:      modTime,
        BlockSize:    blockSize,
        Block512Sum:  sha512Sum,
        DataBlocks:   dataBlocks,
        ParityBlocks: parityBlocks,
    }, nil
}

const (
    slashSeparator = "/"
)

// retainSlash - retains slash from a path.
func retainSlash(path string) string {
    return strings.TrimSuffix(path, slashSeparator) + slashSeparator
}

// byFileInfoName is a collection satisfying sort.Interface.
type byFileInfoName []FileInfo

func (d byFileInfoName) Len() int           { return len(d) }
func (d byFileInfoName) Swap(i, j int)      { d[i], d[j] = d[j], d[i] }
func (d byFileInfoName) Less(i, j int) bool { return d[i].Name < d[j].Name }

// ListFiles - list files at prefix.
func (xl XL) ListFiles(volume, prefix, marker string, recursive bool, count int) (filesInfo []FileInfo, eof bool, err error) {
    if !isValidVolname(volume) {
        return nil, true, errInvalidArgument
    }
    // Pick the first disk and list there always.
    disk := xl.storageDisks[0]
    var fsFilesInfo []FileInfo
    var markerPath = marker
    if marker != "" {
        isLeaf := xl.isLeafDirectory(volume, retainSlash(marker))
        if isLeaf {
            // For a leaf, for now we just point to the first block; make it
            // dynamic in future based on the availability of storage disks.
            markerPath = slashpath.Join(marker, "part.0")
        }
    }

    // Extract file info from paths.
    extractFileInfo := func(volume, path string) (FileInfo, error) {
        var fileInfo = FileInfo{}
        var metadata fileMetadata
        fileInfo.Name = slashpath.Dir(path)
        metadata, err = xl.extractMetadata(volume, fileInfo.Name)
        if err != nil {
            return FileInfo{}, err
        }
        fileInfo.Size = metadata.Size
        fileInfo.ModTime = metadata.ModTime
        fileInfo.Mode = os.FileMode(0644)
        return fileInfo, nil
    }

    // List files.
    fsFilesInfo, eof, err = disk.ListFiles(volume, prefix, markerPath, recursive, count)
    if err != nil {
        return nil, true, err
    }

    for _, fsFileInfo := range fsFilesInfo {
        // Skip metadata files.
        if strings.HasSuffix(fsFileInfo.Name, metadataFile) {
            continue
        }
        var fileInfo FileInfo
        var isLeaf bool
        if fsFileInfo.Mode.IsDir() {
            isLeaf = xl.isLeafDirectory(volume, fsFileInfo.Name)
        }
        if isLeaf || !fsFileInfo.Mode.IsDir() {
            fileInfo, err = extractFileInfo(volume, fsFileInfo.Name)
            if err != nil {
                // For a leaf directory, if err is FileNotFound then it
                // perhaps has missing metadata. Ignore it and let healing
                // finish its job; it will become available soon.
                if err == errFileNotFound {
                    continue
                }
                // For any other errors return to the caller.
                return nil, true, err
            }
        } else {
            fileInfo = fsFileInfo
        }
        filesInfo = append(filesInfo, fileInfo)
    }
    sort.Sort(byFileInfoName(filesInfo))
    return filesInfo, eof, nil
}

// Object API.

// StatFile - stat a file.
func (xl XL) StatFile(volume, path string) (FileInfo, error) {
    if !isValidVolname(volume) {
        return FileInfo{}, errInvalidArgument
    }
    if !isValidPath(path) {
        return FileInfo{}, errInvalidArgument
    }

    // Extract metadata.
    metadata, err := xl.extractMetadata(volume, path)
    if err != nil {
        return FileInfo{}, err
    }

    // Return file info.
    return FileInfo{
        Volume:  volume,
        Name:    path,
        Size:    metadata.Size,
        ModTime: metadata.ModTime,
        Mode:    os.FileMode(0644),
    }, nil
}

// deletePathAll - delete a path on all the given disks, ignoring errors.
func deletePathAll(volume, path string, disks ...StorageAPI) {
    for _, disk := range disks {
        disk.DeleteFile(volume, path)
    }
}

// DeleteFile - delete a file.
func (xl XL) DeleteFile(volume, path string) error {
    if !isValidVolname(volume) {
        return errInvalidArgument
    }
    if !isValidPath(path) {
        return errInvalidArgument
    }
    // Loop through and delete each chunk.
    for index, disk := range xl.storageDisks {
        erasureFilePart := slashpath.Join(path, fmt.Sprintf("part.%d", index))
        err := disk.DeleteFile(volume, erasureFilePart)
        if err != nil {
            return err
        }
        metadataFilePath := slashpath.Join(path, metadataFile)
        err = disk.DeleteFile(volume, metadataFilePath)
        if err != nil {
            return err
        }
    }
    return nil
}