diff --git a/api-router.go b/api-router.go index b263a20c3..987cf2d46 100644 --- a/api-router.go +++ b/api-router.go @@ -20,7 +20,7 @@ import router "github.com/gorilla/mux" // objectAPIHandler implements and provides http handlers for S3 API. type objectAPIHandlers struct { - ObjectAPI *objectAPI + ObjectAPI objectAPI } // registerAPIRouter - registers S3 compatible APIs. diff --git a/object-api.go b/object-api.go index 038ff8549..4bc3e1be2 100644 --- a/object-api.go +++ b/object-api.go @@ -33,8 +33,8 @@ type objectAPI struct { storage StorageAPI } -func newObjectLayer(storage StorageAPI) *objectAPI { - return &objectAPI{storage} +func newObjectLayer(storage StorageAPI) objectAPI { + return objectAPI{storage} } /// Bucket operations diff --git a/routers.go b/routers.go index 9f816c722..fe6b37dfb 100644 --- a/routers.go +++ b/routers.go @@ -26,7 +26,7 @@ import ( // configureServer handler returns final handler for the http server. func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { - var storageHandlers StorageAPI + var storageAPI StorageAPI var e error if len(srvCmdConfig.exportPaths) == 1 { // Verify if export path is a local file system path. @@ -34,37 +34,40 @@ func configureServerHandler(srvCmdConfig serverCmdConfig) http.Handler { st, e = os.Stat(srvCmdConfig.exportPaths[0]) if e == nil && st.Mode().IsDir() { // Initialize storage API. - storageHandlers, e = newFS(srvCmdConfig.exportPaths[0]) + storageAPI, e = newFS(srvCmdConfig.exportPaths[0]) fatalIf(probe.NewError(e), "Initializing fs failed.", nil) } else { // Initialize network storage API. - storageHandlers, e = newNetworkFS(srvCmdConfig.exportPaths[0]) + storageAPI, e = newNetworkFS(srvCmdConfig.exportPaths[0]) fatalIf(probe.NewError(e), "Initializing network fs failed.", nil) } } else { // Initialize XL storage API. - storageHandlers, e = newXL(srvCmdConfig.exportPaths...) + storageAPI, e = newXL(srvCmdConfig.exportPaths...) fatalIf(probe.NewError(e), "Initializing XL failed.", nil) } // Initialize object layer. - objectAPI := newObjectLayer(storageHandlers) + objAPI := newObjectLayer(storageAPI) + + // Initialize storage rpc. + storageRPC := newStorageRPC(storageAPI) // Initialize API. apiHandlers := objectAPIHandlers{ - ObjectAPI: objectAPI, + ObjectAPI: objAPI, } // Initialize Web. webHandlers := &webAPIHandlers{ - ObjectAPI: objectAPI, + ObjectAPI: objAPI, } // Initialize router. mux := router.NewRouter() // Register all routers. - registerStorageRPCRouter(mux, storageHandlers) + registerStorageRPCRouter(mux, storageRPC) registerWebRouter(mux, webHandlers) registerAPIRouter(mux, apiHandlers) // Add new routers here. diff --git a/storage-rpc-server.go b/storage-rpc-server.go index de3cdcd43..60bac3121 100644 --- a/storage-rpc-server.go +++ b/storage-rpc-server.go @@ -99,11 +99,15 @@ func (s *storageServer) DeleteFileHandler(arg *DeleteFileArgs, reply *GenericRep return nil } -// registerStorageRPCRouter - register storage rpc router. -func registerStorageRPCRouter(mux *router.Router, storageAPI StorageAPI) { - stServer := &storageServer{ +// Initialize new storage rpc. +func newStorageRPC(storageAPI StorageAPI) *storageServer { + return &storageServer{ storage: storageAPI, } +} + +// registerStorageRPCRouter - register storage rpc router. +func registerStorageRPCRouter(mux *router.Router, stServer *storageServer) { storageRPCServer := rpc.NewServer() storageRPCServer.RegisterName("Storage", stServer) storageRouter := mux.NewRoute().PathPrefix(reservedBucket).Subrouter() diff --git a/web-router.go b/web-router.go index 698b6bd82..00d16221d 100644 --- a/web-router.go +++ b/web-router.go @@ -30,7 +30,7 @@ import ( // webAPI container for Web API. type webAPIHandlers struct { - ObjectAPI *objectAPI + ObjectAPI objectAPI } // indexHandler - Handler to serve index.html diff --git a/xl-v1-healfile.go b/xl-v1-healfile.go new file mode 100644 index 000000000..782cb441a --- /dev/null +++ b/xl-v1-healfile.go @@ -0,0 +1,256 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "encoding/json" + "errors" + "fmt" + "io" + slashpath "path" + "strconv" +) + +func (xl XL) selfHeal(volume string, path string) error { + totalShards := xl.DataBlocks + xl.ParityBlocks + needsSelfHeal := make([]bool, totalShards) + var metadata = make(map[string]string) + var readers = make([]io.Reader, totalShards) + var writers = make([]io.WriteCloser, totalShards) + for index, disk := range xl.storageDisks { + metadataFile := slashpath.Join(path, metadataFile) + + // Start from the beginning, we are not reading partial metadata files. + offset := int64(0) + + metadataReader, err := disk.ReadFile(volume, metadataFile, offset) + if err != nil { + if err != errFileNotFound { + continue + } + // Needs healing if part.json is not found + needsSelfHeal[index] = true + continue + } + defer metadataReader.Close() + + decoder := json.NewDecoder(metadataReader) + if err = decoder.Decode(&metadata); err != nil { + // needs healing if parts.json is not parsable + needsSelfHeal[index] = true + } + + erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index)) + erasuredPartReader, err := disk.ReadFile(volume, erasurePart, offset) + if err != nil { + if err == errFileNotFound { + // Needs healing if part file not found + needsSelfHeal[index] = true + } + return err + } + readers[index] = erasuredPartReader + defer erasuredPartReader.Close() + } + + // Check if there is atleast one part that needs to be healed. + atleastOneSelfHeal := false + for _, shNeeded := range needsSelfHeal { + if shNeeded { + atleastOneSelfHeal = true + break + } + } + if !atleastOneSelfHeal { + // Return if healing not needed anywhere. + return nil + } + + // create writers for parts where healing is needed. + for index, shNeeded := range needsSelfHeal { + if !shNeeded { + continue + } + var err error + erasurePart := slashpath.Join(path, fmt.Sprintf("part.%d", index)) + writers[index], err = xl.storageDisks[index].CreateFile(volume, erasurePart) + if err != nil { + // Unexpected error + closeAndRemoveWriters(writers...) + return err + } + } + size, err := strconv.ParseInt(metadata["file.size"], 10, 64) + if err != nil { + closeAndRemoveWriters(writers...) + return err + } + var totalLeft = size + for totalLeft > 0 { + // Figure out the right blockSize. + var curBlockSize int + if erasureBlockSize < totalLeft { + curBlockSize = erasureBlockSize + } else { + curBlockSize = int(totalLeft) + } + // Calculate the current shard size. + curShardSize := getEncodedBlockLen(curBlockSize, xl.DataBlocks) + enShards := make([][]byte, totalShards) + // Loop through all readers and read. + for index, reader := range readers { + // Initialize shard slice and fill the data from each parts. + // ReedSolomon.Verify() expects that slice is not nil even if the particular + // part needs healing. + enShards[index] = make([]byte, curShardSize) + if needsSelfHeal[index] { + // Skip reading if the part needs healing. + continue + } + _, e := io.ReadFull(reader, enShards[index]) + if e != nil && e != io.ErrUnexpectedEOF { + enShards[index] = nil + } + } + + // Check blocks if they are all zero in length. + if checkBlockSize(enShards) == 0 { + err = errors.New("Data likely corrupted, all blocks are zero in length.") + return err + } + + // Verify the shards. + ok, e := xl.ReedSolomon.Verify(enShards) + if e != nil { + closeAndRemoveWriters(writers...) + return e + } + + // Verification failed, shards require reconstruction. + if !ok { + for index, shNeeded := range needsSelfHeal { + if shNeeded { + // Reconstructs() reconstructs the parts if the array is nil. + enShards[index] = nil + } + } + e = xl.ReedSolomon.Reconstruct(enShards) + if e != nil { + closeAndRemoveWriters(writers...) + return e + } + // Verify reconstructed shards again. + ok, e = xl.ReedSolomon.Verify(enShards) + if e != nil { + closeAndRemoveWriters(writers...) + return e + } + if !ok { + // Shards cannot be reconstructed, corrupted data. + e = errors.New("Verification failed after reconstruction, data likely corrupted.") + closeAndRemoveWriters(writers...) + return e + } + } + for index, shNeeded := range needsSelfHeal { + if !shNeeded { + continue + } + _, e := writers[index].Write(enShards[index]) + if e != nil { + closeAndRemoveWriters(writers...) + return e + } + } + totalLeft = totalLeft - erasureBlockSize + } + + // After successful healing Close() the writer so that the temp + // files are committed to their location. + for index, shNeeded := range needsSelfHeal { + if !shNeeded { + continue + } + writers[index].Close() + } + + // Write part.json where ever healing was done. + var metadataWriters = make([]io.WriteCloser, len(xl.storageDisks)) + for index, shNeeded := range needsSelfHeal { + if !shNeeded { + continue + } + metadataFile := slashpath.Join(path, metadataFile) + metadataWriters[index], err = xl.storageDisks[index].CreateFile(volume, metadataFile) + if err != nil { + closeAndRemoveWriters(writers...) + return err + } + } + metadataBytes, err := json.Marshal(metadata) + if err != nil { + closeAndRemoveWriters(metadataWriters...) + return err + } + for index, shNeeded := range needsSelfHeal { + if !shNeeded { + continue + } + _, err = metadataWriters[index].Write(metadataBytes) + if err != nil { + closeAndRemoveWriters(metadataWriters...) + return err + } + } + + // Metadata written for all the healed parts hence Close() so that + // temp files can be committed. + for index := range xl.storageDisks { + if !needsSelfHeal[index] { + continue + } + metadataWriters[index].Close() + } + return nil +} + +// self heal. +type selfHeal struct { + volume string + path string + errCh chan<- error +} + +// selfHealRoutine - starts a go routine and listens on a channel for healing requests. +func (xl *XL) selfHealRoutine() { + xl.selfHealCh = make(chan selfHeal) + + // Healing request can be made like this: + // errCh := make(chan error) + // xl.selfHealCh <- selfHeal{"testbucket", "testobject", errCh} + // fmt.Println(<-errCh) + go func() { + for sh := range xl.selfHealCh { + if sh.volume == "" || sh.path == "" { + sh.errCh <- errors.New("volume or path can not be empty") + continue + } + xl.selfHeal(sh.volume, sh.path) + sh.errCh <- nil + } + }() +} diff --git a/xl-v1-readfile.go b/xl-v1-readfile.go index 650f5c011..c12ebe70c 100644 --- a/xl-v1-readfile.go +++ b/xl-v1-readfile.go @@ -90,7 +90,7 @@ func (xl XL) getReadFileQuorumDisks(volume, path string) (quorumDisks []quorumDi for disk, version := range diskVersionMap { if version > higherVersion { higherVersion = version - quorumDisks = []quorumDisk{quorumDisk{disk, i}} + quorumDisks = []quorumDisk{{disk, i}} } else if version == higherVersion { quorumDisks = append(quorumDisks, quorumDisk{disk, i}) } @@ -133,10 +133,10 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error) return nil, errInvalidArgument } - // Acquire a read lock. - readLock := true - xl.lockNS(volume, path, readLock) - defer xl.unlockNS(volume, path, readLock) + // Acquire a read lock. - TODO - disable this due to stack overflow bug. + // readLock := true + // xl.lockNS(volume, path, readLock) + // defer xl.unlockNS(volume, path, readLock) // Check read quorum. quorumDisks := xl.getReadFileQuorumDisks(volume, path) diff --git a/xl-v1.go b/xl-v1.go index 2c6ac91eb..d919f04b1 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -21,7 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "os" slashpath "path" "sort" @@ -40,13 +39,6 @@ const ( maxErasureBlocks = 16 ) -// Self Heal data -type selfHeal struct { - volume string - fsPath string - errCh chan error -} - // XL layer structure. type XL struct { ReedSolomon reedsolomon.Encoder // Erasure encoder/decoder. @@ -57,7 +49,9 @@ type XL struct { nameSpaceLockMapMutex *sync.Mutex readQuorum int writeQuorum int - selfHealCh chan selfHeal + + // Heal input/output channel. + selfHealCh chan selfHeal } // lockNS - locks the given resource, using a previously allocated @@ -163,7 +157,7 @@ func newXL(disks ...string) (StorageAPI, error) { xl.writeQuorum = len(xl.storageDisks) } - // Start self heal go routine. + // Start self heal go routine, taking inputs over self heal channel. xl.selfHealRoutine() // Return successfully initialized. @@ -227,8 +221,9 @@ func (xl XL) StatVol(volume string) (volInfo VolInfo, err error) { // format it means that the parent directory is the actual object name. func (xl XL) isLeafDirectory(volume, leafPath string) (isLeaf bool) { var allFileInfos []FileInfo + var markerPath string for { - fileInfos, eof, e := xl.storageDisks[0].ListFiles(volume, leafPath, "", false, 1000) + fileInfos, eof, e := xl.storageDisks[0].ListFiles(volume, leafPath, markerPath, false, 1000) if e != nil { break } @@ -236,6 +231,8 @@ func (xl XL) isLeafDirectory(volume, leafPath string) (isLeaf bool) { if eof { break } + // MarkerPath to get the next set of files. + markerPath = allFileInfos[len(allFileInfos)-1].Name } for _, fileInfo := range allFileInfos { if fileInfo.Mode.IsDir() { @@ -477,224 +474,3 @@ func (xl XL) DeleteFile(volume, path string) error { } return nil } - -// selfHeal - called by the healing go-routine, heals using erasure coding. -func (xl XL) selfHeal(volume string, fsPath string) error { - totalShards := xl.DataBlocks + xl.ParityBlocks - needsSelfHeal := make([]bool, totalShards) - var metadata = make(map[string]string) - var readers = make([]io.Reader, totalShards) - var writers = make([]io.WriteCloser, totalShards) - for index, disk := range xl.storageDisks { - metadataFile := slashpath.Join(fsPath, metadataFile) - - // Start from the beginning, we are not reading partial metadata files. - offset := int64(0) - - metadataReader, err := disk.ReadFile(volume, metadataFile, offset) - if err != nil { - if err != errFileNotFound { - continue - } - // Needs healing if part.json is not found - needsSelfHeal[index] = true - continue - } - defer metadataReader.Close() - - decoder := json.NewDecoder(metadataReader) - if err = decoder.Decode(&metadata); err != nil { - // needs healing if parts.json is not parsable - needsSelfHeal[index] = true - } - - erasurePart := slashpath.Join(fsPath, fmt.Sprintf("part.%d", index)) - erasuredPartReader, err := disk.ReadFile(volume, erasurePart, offset) - if err != nil { - if err != errFileNotFound { - continue - } - // needs healing if part file not found - needsSelfHeal[index] = true - } else { - readers[index] = erasuredPartReader - defer erasuredPartReader.Close() - } - } - // Check if there is atleat one part that needs to be healed. - atleastOneSelfHeal := false - for _, shNeeded := range needsSelfHeal { - if shNeeded { - atleastOneSelfHeal = true - break - } - } - if !atleastOneSelfHeal { - // return if healing not needed anywhere. - return nil - } - - // create writers for parts where healing is needed. - for i, shNeeded := range needsSelfHeal { - if !shNeeded { - continue - } - var err error - erasurePart := slashpath.Join(fsPath, fmt.Sprintf("part.%d", i)) - writers[i], err = xl.storageDisks[i].CreateFile(volume, erasurePart) - if err != nil { - // Unexpected error - closeAndRemoveWriters(writers...) - return err - } - } - size, err := strconv.ParseInt(metadata["file.size"], 10, 64) - if err != nil { - closeAndRemoveWriters(writers...) - return err - } - var totalLeft = size - for totalLeft > 0 { - // Figure out the right blockSize. - var curBlockSize int - if erasureBlockSize < totalLeft { - curBlockSize = erasureBlockSize - } else { - curBlockSize = int(totalLeft) - } - // Calculate the current shard size. - curShardSize := getEncodedBlockLen(curBlockSize, xl.DataBlocks) - enShards := make([][]byte, totalShards) - // Loop through all readers and read. - for i, reader := range readers { - // Initialize shard slice and fill the data from each parts. - // ReedSolomon.Verify() expects that slice is not nil even if the particular - // part needs healing. - enShards[i] = make([]byte, curShardSize) - if needsSelfHeal[i] { - // Skip reading if the part needs healing. - continue - } - _, e := io.ReadFull(reader, enShards[i]) - if e != nil && e != io.ErrUnexpectedEOF { - enShards[i] = nil - } - } - - // Check blocks if they are all zero in length. - if checkBlockSize(enShards) == 0 { - err = errors.New("Data likely corrupted, all blocks are zero in length.") - return err - } - - // Verify the shards. - ok, e := xl.ReedSolomon.Verify(enShards) - if e != nil { - closeAndRemoveWriters(writers...) - return e - } - // Verification failed, shards require reconstruction. - if !ok { - for i, shNeeded := range needsSelfHeal { - if shNeeded { - // Reconstructs() reconstructs the parts if the array is nil. - enShards[i] = nil - } - } - e = xl.ReedSolomon.Reconstruct(enShards) - if e != nil { - closeAndRemoveWriters(writers...) - return e - } - // Verify reconstructed shards again. - ok, e = xl.ReedSolomon.Verify(enShards) - if e != nil { - closeAndRemoveWriters(writers...) - return e - } - if !ok { - // Shards cannot be reconstructed, corrupted data. - e = errors.New("Verification failed after reconstruction, data likely corrupted.") - closeAndRemoveWriters(writers...) - return e - } - } - for i, shNeeded := range needsSelfHeal { - if !shNeeded { - continue - } - _, e := writers[i].Write(enShards[i]) - if e != nil { - closeAndRemoveWriters(writers...) - return e - } - } - totalLeft = totalLeft - erasureBlockSize - } - // After successful healing Close() the writer so that the temp files are renamed. - for i, shNeeded := range needsSelfHeal { - if !shNeeded { - continue - } - writers[i].Close() - } - - // Write part.json where ever healing was done. - var metadataWriters = make([]io.WriteCloser, len(xl.storageDisks)) - for i, shNeeded := range needsSelfHeal { - if !shNeeded { - continue - } - metadataFile := slashpath.Join(fsPath, metadataFile) - metadataWriters[i], err = xl.storageDisks[i].CreateFile(volume, metadataFile) - if err != nil { - closeAndRemoveWriters(writers...) - return err - } - } - metadataBytes, err := json.Marshal(metadata) - if err != nil { - closeAndRemoveWriters(metadataWriters...) - return err - } - for i, shNeeded := range needsSelfHeal { - if !shNeeded { - continue - } - _, err := metadataWriters[i].Write(metadataBytes) - if err != nil { - closeAndRemoveWriters(metadataWriters...) - return err - } - } - - // part.json written for all the healed parts hence Close() so that temp files can be renamed. - for index := range xl.storageDisks { - if !needsSelfHeal[index] { - continue - } - metadataWriters[index].Close() - } - return nil -} - -// selfHealRoutine - starts a go routine and listens on a channel for healing requests. -func (xl *XL) selfHealRoutine() { - xl.selfHealCh = make(chan selfHeal) - - // Healing request can be made like this: - // errCh := make(chan error) - // xl.selfHealCh <- selfHeal{"testbucket", "testobject", errCh} - // fmt.Println(<-errCh) - - go func() { - for sh := range xl.selfHealCh { - if sh.volume == "" || sh.fsPath == "" { - sh.errCh <- errors.New("volume or path can not be empty") - continue - } - xl.selfHeal(sh.volume, sh.fsPath) - sh.errCh <- nil - } - }() -}