Detect underlying disk mount/unmount (#8408)

This commit is contained in:
Krishna Srinivas 2019-10-25 10:37:53 -07:00 committed by kannappanr
parent 8aaaa46be9
commit 980bf78b4d
13 changed files with 415 additions and 239 deletions

View File

@ -77,6 +77,9 @@ func (d *naughtyDisk) calcError() (err error) {
return nil
}
func (d *naughtyDisk) SetDiskID(id string) {
}
func (d *naughtyDisk) DiskInfo() (info DiskInfo, err error) {
if err := d.calcError(); err != nil {
return info, err

View File

@ -99,7 +99,11 @@ func deleteBucketMetadata(ctx context.Context, bucket string, objAPI ObjectLayer
// Depending on the disk type network or local, initialize storage API.
func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
if endpoint.IsLocal {
return newPosix(endpoint.Path)
storage, err := newPosix(endpoint.Path)
if err != nil {
return nil, err
}
return &posixDiskIDCheck{storage: storage}, nil
}
return newStorageRESTClient(endpoint)

190
cmd/posix-diskid-check.go Normal file
View File

@ -0,0 +1,190 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cmd
import (
"io"
)
// Detects change in underlying disk.
type posixDiskIDCheck struct {
storage *posix
diskID string
}
func (p *posixDiskIDCheck) String() string {
return p.storage.String()
}
func (p *posixDiskIDCheck) IsOnline() bool {
storedDiskID, err := p.storage.getDiskID()
if err != nil {
return false
}
return storedDiskID == p.diskID
}
func (p *posixDiskIDCheck) LastError() error {
return p.storage.LastError()
}
func (p *posixDiskIDCheck) Close() error {
return p.storage.Close()
}
func (p *posixDiskIDCheck) SetDiskID(id string) {
p.diskID = id
}
func (p *posixDiskIDCheck) isDiskStale() bool {
if p.diskID == "" {
// For empty disk-id we allow the call as the server might be coming up and trying to read format.json
// or create format.json
return false
}
storedDiskID, err := p.storage.getDiskID()
if err == nil && p.diskID == storedDiskID {
return false
}
return true
}
func (p *posixDiskIDCheck) DiskInfo() (info DiskInfo, err error) {
if p.isDiskStale() {
return info, errDiskNotFound
}
return p.storage.DiskInfo()
}
func (p *posixDiskIDCheck) MakeVol(volume string) (err error) {
if p.isDiskStale() {
return errDiskNotFound
}
return p.storage.MakeVol(volume)
}
func (p *posixDiskIDCheck) ListVols() ([]VolInfo, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
}
return p.storage.ListVols()
}
func (p *posixDiskIDCheck) StatVol(volume string) (vol VolInfo, err error) {
if p.isDiskStale() {
return vol, errDiskNotFound
}
return p.storage.StatVol(volume)
}
func (p *posixDiskIDCheck) DeleteVol(volume string) (err error) {
if p.isDiskStale() {
return errDiskNotFound
}
return p.storage.DeleteVol(volume)
}
func (p *posixDiskIDCheck) Walk(volume, dirPath string, marker string, recursive bool, leafFile string, readMetadataFn readMetadataFunc, endWalkCh chan struct{}) (chan FileInfo, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
}
return p.storage.Walk(volume, dirPath, marker, recursive, leafFile, readMetadataFn, endWalkCh)
}
func (p *posixDiskIDCheck) ListDir(volume, dirPath string, count int, leafFile string) ([]string, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
}
return p.storage.ListDir(volume, dirPath, count, leafFile)
}
func (p *posixDiskIDCheck) ReadFile(volume string, path string, offset int64, buf []byte, verifier *BitrotVerifier) (n int64, err error) {
if p.isDiskStale() {
return 0, errDiskNotFound
}
return p.storage.ReadFile(volume, path, offset, buf, verifier)
}
func (p *posixDiskIDCheck) AppendFile(volume string, path string, buf []byte) (err error) {
if p.isDiskStale() {
return errDiskNotFound
}
return p.storage.AppendFile(volume, path, buf)
}
func (p *posixDiskIDCheck) CreateFile(volume, path string, size int64, reader io.Reader) error {
if p.isDiskStale() {
return errDiskNotFound
}
return p.storage.CreateFile(volume, path, size, reader)
}
func (p *posixDiskIDCheck) ReadFileStream(volume, path string, offset, length int64) (io.ReadCloser, error) {
if p.isDiskStale() {
return nil, errDiskNotFound
}
return p.storage.ReadFileStream(volume, path, offset, length)
}
func (p *posixDiskIDCheck) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) error {
if p.isDiskStale() {
return errDiskNotFound
}
return p.storage.RenameFile(srcVolume, srcPath, dstVolume, dstPath)
}
func (p *posixDiskIDCheck) StatFile(volume string, path string) (file FileInfo, err error) {
if p.isDiskStale() {
return file, errDiskNotFound
}
return p.storage.StatFile(volume, path)
}
func (p *posixDiskIDCheck) DeleteFile(volume string, path string) (err error) {
if p.isDiskStale() {
return errDiskNotFound
}
return p.storage.DeleteFile(volume, path)
}
func (p *posixDiskIDCheck) DeleteFileBulk(volume string, paths []string) (errs []error, err error) {
if p.isDiskStale() {
return nil, errDiskNotFound
}
return p.storage.DeleteFileBulk(volume, paths)
}
func (p *posixDiskIDCheck) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
if p.isDiskStale() {
return errDiskNotFound
}
return p.storage.VerifyFile(volume, path, size, algo, sum, shardSize)
}
func (p *posixDiskIDCheck) WriteAll(volume string, path string, reader io.Reader) (err error) {
if p.isDiskStale() {
return errDiskNotFound
}
return p.storage.WriteAll(volume, path, reader)
}
func (p *posixDiskIDCheck) ReadAll(volume string, path string) (buf []byte, err error) {
if p.isDiskStale() {
return nil, errDiskNotFound
}
return p.storage.ReadAll(volume, path)
}

View File

@ -35,6 +35,7 @@ import (
"bytes"
humanize "github.com/dustin/go-humanize"
jsoniter "github.com/json-iterator/go"
"github.com/klauspost/readahead"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/disk"
@ -73,13 +74,16 @@ type posix struct {
diskPath string
pool sync.Pool
connected bool
diskMount bool // indicates if the path is an actual mount.
diskFileInfo os.FileInfo
diskID string
formatFileInfo os.FileInfo
// Disk usage metrics
stopUsageCh chan struct{}
sync.RWMutex
}
// checkPathLength - returns error if given path name length more than 255
@ -182,12 +186,11 @@ func newPosix(path string) (*posix, error) {
if path, err = getValidPath(path); err != nil {
return nil, err
}
fi, err := os.Stat(path)
_, err = os.Stat(path)
if err != nil {
return nil, err
}
p := &posix{
connected: true,
diskPath: path,
pool: sync.Pool{
New: func() interface{} {
@ -196,7 +199,6 @@ func newPosix(path string) (*posix, error) {
},
},
stopUsageCh: make(chan struct{}),
diskFileInfo: fi,
diskMount: mountinfo.IsLikelyMountPoint(path),
}
@ -298,12 +300,11 @@ func (s *posix) LastError() error {
func (s *posix) Close() error {
close(s.stopUsageCh)
s.connected = false
return nil
}
func (s *posix) IsOnline() bool {
return s.connected
return true
}
// DiskInfo is an extended type which returns current
@ -329,10 +330,6 @@ func (s *posix) DiskInfo() (info DiskInfo, err error) {
return info, errFaultyDisk
}
if err := s.checkDiskFound(); err != nil {
return info, err
}
di, err := getDiskInfo(s.diskPath)
if err != nil {
return info, err
@ -370,30 +367,37 @@ func (s *posix) getVolDir(volume string) (string, error) {
return volumeDir, nil
}
// checkDiskFound - validates if disk is available,
// returns errDiskNotFound if not found.
func (s *posix) checkDiskFound() (err error) {
if !s.IsOnline() {
return errDiskNotFound
}
fi, err := os.Stat(s.diskPath)
func (s *posix) getDiskID() (string, error) {
s.RLock()
diskID := s.diskID
s.RUnlock()
formatFile := pathJoin(s.diskPath, minioMetaBucket, formatConfigFile)
fi, err := os.Stat(formatFile)
if err != nil {
switch {
case os.IsNotExist(err):
return errDiskNotFound
case isSysErrTooLong(err):
return errFileNameTooLong
case isSysErrIO(err):
return errFaultyDisk
default:
return err
// If the disk is still not initialized.
return "", err
}
if xioutil.SameFile(fi, s.formatFileInfo) {
// If the file has not changed, just return the cached diskID information.
return diskID, nil
}
if !os.SameFile(s.diskFileInfo, fi) {
s.connected = false
return errDiskNotFound
s.Lock()
defer s.Unlock()
b, err := ioutil.ReadFile(formatFile)
if err != nil {
return "", err
}
return nil
format := &formatXLV3{}
var json = jsoniter.ConfigCompatibleWithStandardLibrary
if err = json.Unmarshal(b, &format); err != nil {
return "", err
}
s.diskID = format.XL.This
s.formatFileInfo = fi
return s.diskID, nil
}
// diskUsage returns du information for the posix path, in a continuous routine.
@ -478,6 +482,12 @@ func (s *posix) diskUsage(doneCh chan struct{}) {
}
}
// Make a volume entry.
func (s *posix) SetDiskID(id string) {
// NO-OP for posix as it is handled either by posixDiskIDCheck{} for local disks or
// storage rest server for remote disks.
}
// Make a volume entry.
func (s *posix) MakeVol(volume string) (err error) {
defer func() {
@ -490,10 +500,6 @@ func (s *posix) MakeVol(volume string) (err error) {
return errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return err
}
if !isValidVolname(volume) {
return errInvalidArgument
}
@ -533,10 +539,6 @@ func (s *posix) ListVols() (volsInfo []VolInfo, err error) {
return nil, errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return nil, err
}
volsInfo, err = listVols(s.diskPath)
if err != nil {
if isSysErrIO(err) {
@ -602,10 +604,6 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) {
return VolInfo{}, errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return VolInfo{}, err
}
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
@ -643,10 +641,6 @@ func (s *posix) DeleteVol(volume string) (err error) {
return errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return err
}
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
@ -685,10 +679,6 @@ func (s *posix) Walk(volume, dirPath, marker string, recursive bool, leafFile st
return nil, errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return nil, err
}
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
@ -762,10 +752,6 @@ func (s *posix) ListDir(volume, dirPath string, count int, leafFile string) (ent
return nil, errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return nil, err
}
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(volume)
if err != nil {
@ -818,10 +804,6 @@ func (s *posix) ReadAll(volume, path string) (buf []byte, err error) {
return nil, errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return nil, err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
@ -895,10 +877,6 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif
return 0, errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return 0, err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return 0, err
@ -996,10 +974,6 @@ func (s *posix) openFile(volume, path string, mode int) (f *os.File, err error)
return nil, errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return nil, err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
@ -1071,10 +1045,6 @@ func (s *posix) ReadFileStream(volume, path string, offset, length int64) (io.Re
return nil, errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return nil, err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return nil, err
@ -1161,10 +1131,6 @@ func (s *posix) CreateFile(volume, path string, fileSize int64, r io.Reader) (er
return err
}
if err = s.checkDiskFound(); err != nil {
return err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
@ -1312,10 +1278,6 @@ func (s *posix) StatFile(volume, path string) (file FileInfo, err error) {
return FileInfo{}, errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return FileInfo{}, err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return FileInfo{}, err
@ -1412,10 +1374,6 @@ func (s *posix) DeleteFile(volume, path string) (err error) {
return errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err
@ -1465,10 +1423,6 @@ func (s *posix) RenameFile(srcVolume, srcPath, dstVolume, dstPath string) (err e
return errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return err
}
srcVolumeDir, err := s.getVolDir(srcVolume)
if err != nil {
return err
@ -1561,10 +1515,6 @@ func (s *posix) VerifyFile(volume, path string, fileSize int64, algo BitrotAlgor
return errFaultyDisk
}
if err = s.checkDiskFound(); err != nil {
return err
}
volumeDir, err := s.getVolDir(volume)
if err != nil {
return err

View File

@ -94,11 +94,20 @@ func newPosixTestSetup() (StorageAPI, string, error) {
return nil, "", err
}
// Initialize a new posix layer.
posixStorage, err := newPosix(diskPath)
storage, err := newPosix(diskPath)
if err != nil {
return nil, "", err
}
return posixStorage, diskPath, nil
err = storage.MakeVol(minioMetaBucket)
if err != nil {
return nil, "", err
}
// Create a sample format.json file
err = storage.WriteAll(minioMetaBucket, formatConfigFile, bytes.NewBufferString(`{"version":"1","format":"xl","id":"592a41c2-b7cc-4130-b883-c4b5cb15965b","xl":{"version":"3","this":"da017d62-70e3-45f1-8a1a-587707e69ad1","sets":[["e07285a6-8c73-4962-89c6-047fb939f803","33b8d431-482d-4376-b63c-626d229f0a29","cff6513a-4439-4dc1-bcaa-56c9e880c352","da017d62-70e3-45f1-8a1a-587707e69ad1","9c9f21d5-1f15-4737-bce6-835faa0d9626","0a59b346-1424-4fc2-9fa2-a2e80541d0c1","7924a3dc-b69a-4971-9a2e-014966d6aebb","4d2b8dd9-4e48-444b-bdca-c89194b26042"]],"distributionAlgo":"CRCMOD"}}`))
if err != nil {
return nil, "", err
}
return &posixDiskIDCheck{storage: storage, diskID: "da017d62-70e3-45f1-8a1a-587707e69ad1"}, diskPath, nil
}
// createPermDeniedFile - creates temporary directory and file with path '/mybucket/myobject'
@ -303,9 +312,9 @@ func TestPosixReadAll(t *testing.T) {
}
// TestPosixing for faulty disk.
// Setting ioErrCount > maxAllowedIOError.
if posixType, ok := posixStorage.(*posix); ok {
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(6)
p.storage.ioErrCount = int32(6)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
@ -421,9 +430,9 @@ func TestPosixMakeVol(t *testing.T) {
}
for i, testCase := range testCases {
if posixType, ok := posixStorage.(*posix); ok {
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(testCase.ioErrCount)
p.storage.ioErrCount = int32(testCase.ioErrCount)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
@ -535,11 +544,11 @@ func TestPosixDeleteVol(t *testing.T) {
}
for i, testCase := range testCases {
if posixType, ok := posixStorage.(*posix); ok {
if posixDiskCheck, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(testCase.ioErrCount)
posixDiskCheck.storage.ioErrCount = int32(testCase.ioErrCount)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
t.Errorf("Expected the StorageAPI to be of type *posixDiskIDCheck")
}
if err = posixStorage.DeleteVol(testCase.volName); err != testCase.expectedErr {
t.Fatalf("TestPosix: %d, expected: %s, got: %s", i+1, testCase.expectedErr, err)
@ -648,9 +657,9 @@ func TestPosixStatVol(t *testing.T) {
for i, testCase := range testCases {
var volInfo VolInfo
// setting ioErrCnt from the test case.
if posixType, ok := posixStorage.(*posix); ok {
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(testCase.ioErrCount)
p.storage.ioErrCount = int32(testCase.ioErrCount)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
@ -689,30 +698,42 @@ func TestPosixListVols(t *testing.T) {
t.Fatalf("Unable to create posix test setup, %s", err)
}
var volInfo []VolInfo
var volInfos []VolInfo
// TestPosix empty list vols.
if volInfo, err = posixStorage.ListVols(); err != nil {
if volInfos, err = posixStorage.ListVols(); err != nil {
t.Fatalf("expected: <nil>, got: %s", err)
} else if len(volInfo) != 0 {
t.Fatalf("expected: [], got: %s", volInfo)
} else if len(volInfos) != 1 {
t.Fatalf("expected: one entry, got: %s", volInfos)
}
// TestPosix non-empty list vols.
if err = posixStorage.MakeVol("success-vol"); err != nil {
t.Fatalf("Unable to create volume, %s", err)
}
if volInfo, err = posixStorage.ListVols(); err != nil {
volInfos, err = posixStorage.ListVols()
if err != nil {
t.Fatalf("expected: <nil>, got: %s", err)
} else if len(volInfo) != 1 {
t.Fatalf("expected: 1, got: %d", len(volInfo))
} else if volInfo[0].Name != "success-vol" {
t.Errorf("expected: success-vol, got: %s", volInfo[0].Name)
}
if len(volInfos) != 2 {
t.Fatalf("expected: 2, got: %d", len(volInfos))
}
volFound := false
for _, info := range volInfos {
if info.Name == "success-vol" {
volFound = true
break
}
}
if !volFound {
t.Errorf("expected: success-vol to be created")
}
// setting ioErrCnt to be > maxAllowedIOError.
// should fail with errFaultyDisk.
if posixType, ok := posixStorage.(*posix); ok {
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(6)
p.storage.ioErrCount = int32(6)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
@ -723,9 +744,9 @@ func TestPosixListVols(t *testing.T) {
os.RemoveAll(path)
// Resetting the IO error.
// should fail with errDiskNotFound.
if posixType, ok := posixStorage.(*posix); ok {
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(0)
p.storage.ioErrCount = int32(0)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
@ -832,9 +853,9 @@ func TestPosixPosixListDir(t *testing.T) {
for i, testCase := range testCases {
var dirList []string
// setting ioErrCnt from the test case.
if posixType, ok := posixStorage.(*posix); ok {
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(testCase.ioErrCnt)
p.storage.ioErrCount = int32(testCase.ioErrCnt)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
@ -986,9 +1007,9 @@ func TestPosixDeleteFile(t *testing.T) {
for i, testCase := range testCases {
// setting ioErrCnt from the test case.
if posixType, ok := posixStorage.(*posix); ok {
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(testCase.ioErrCnt)
p.storage.ioErrCount = int32(testCase.ioErrCnt)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}
@ -1225,9 +1246,9 @@ func TestPosixReadFile(t *testing.T) {
// TestPosixing for faulty disk.
// setting ioErrCnt to 6.
// should fail with errFaultyDisk.
if posixType, ok := posixStorage.(*posix); ok {
if posixType, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(6)
posixType.storage.ioErrCount = int32(6)
// Common read buffer.
var buf = make([]byte, 10)
_, err = posixType.ReadFile("abc", "yes", 0, buf, nil)
@ -1312,6 +1333,29 @@ func TestPosixReadFileWithVerify(t *testing.T) {
}
}
// TestPosixFormatFileChange - to test if changing the diskID makes the calls fail.
func TestPosixFormatFileChange(t *testing.T) {
posixStorage, path, err := newPosixTestSetup()
if err != nil {
t.Fatalf("Unable to create posix test setup, %s", err)
}
defer os.RemoveAll(path)
if err = posixStorage.MakeVol(volume); err != nil {
t.Fatalf("MakeVol failed with %s", err)
}
// Change the format.json such that "this" is changed to "randomid".
if err = ioutil.WriteFile(pathJoin(posixStorage.String(), minioMetaBucket, formatConfigFile), []byte(`{"version":"1","format":"xl","id":"592a41c2-b7cc-4130-b883-c4b5cb15965b","xl":{"version":"3","this":"randomid","sets":[["e07285a6-8c73-4962-89c6-047fb939f803","33b8d431-482d-4376-b63c-626d229f0a29","cff6513a-4439-4dc1-bcaa-56c9e880c352","randomid","9c9f21d5-1f15-4737-bce6-835faa0d9626","0a59b346-1424-4fc2-9fa2-a2e80541d0c1","7924a3dc-b69a-4971-9a2e-014966d6aebb","4d2b8dd9-4e48-444b-bdca-c89194b26042"]],"distributionAlgo":"CRCMOD"}}`), 0644); err != nil {
t.Fatalf("ioutil.WriteFile failed with %s", err)
}
err = posixStorage.MakeVol(volume)
if err != errDiskNotFound {
t.Fatalf("MakeVol expected to fail with errDiskNotFound but failed with %s", err)
}
}
// TestPosix posix.AppendFile()
func TestPosixAppendFile(t *testing.T) {
// create posix test setup
@ -1394,10 +1438,10 @@ func TestPosixAppendFile(t *testing.T) {
// setting ioErrCnt to 6.
// should fail with errFaultyDisk.
if posixType, ok := posixStorage.(*posix); ok {
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(6)
err = posixType.AppendFile("abc", "yes", []byte("hello, world"))
p.storage.ioErrCount = int32(6)
err = p.AppendFile("abc", "yes", []byte("hello, world"))
if err != errFaultyDisk {
t.Fatalf("Expected \"Faulty Disk\", got: \"%s\"", err)
}
@ -1647,9 +1691,9 @@ func TestPosixRenameFile(t *testing.T) {
for i, testCase := range testCases {
// setting ioErrCnt from the test case.
if posixType, ok := posixStorage.(*posix); ok {
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(testCase.ioErrCnt)
p.storage.ioErrCount = int32(testCase.ioErrCnt)
} else {
t.Fatalf("Expected the StorageAPI to be of type *posix")
}
@ -1748,9 +1792,9 @@ func TestPosixStatFile(t *testing.T) {
for i, testCase := range testCases {
// setting ioErrCnt from the test case.
if posixType, ok := posixStorage.(*posix); ok {
if p, ok := posixStorage.(*posixDiskIDCheck); ok {
// setting the io error count from as specified in the test case.
posixType.ioErrCount = int32(testCase.ioErrCnt)
p.storage.ioErrCount = int32(testCase.ioErrCnt)
} else {
t.Errorf("Expected the StorageAPI to be of type *posix")
}

View File

@ -29,6 +29,7 @@ type StorageAPI interface {
IsOnline() bool // Returns true if disk is online.
LastError() error
Close() error
SetDiskID(id string)
DiskInfo() (info DiskInfo, err error)

View File

@ -38,9 +38,6 @@ func isNetworkError(err error) bool {
if err == nil {
return false
}
if err.Error() == errConnectionStale.Error() {
return true
}
if nerr, ok := err.(*rest.NetworkError); ok {
return xnet.IsNetworkOrHostDown(nerr.Err)
}
@ -100,6 +97,8 @@ func toStorageErr(err error) error {
return io.EOF
case io.ErrUnexpectedEOF.Error():
return io.ErrUnexpectedEOF
case errDiskStale.Error():
return errDiskNotFound
}
return err
}
@ -110,7 +109,7 @@ type storageRESTClient struct {
restClient *rest.Client
connected int32
lastError error
instanceID string // REST server's instanceID which is sent with every request for validation.
diskID string
}
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is makred disconnected
@ -123,13 +122,13 @@ func (client *storageRESTClient) call(method string, values url.Values, body io.
if values == nil {
values = make(url.Values)
}
values.Set(storageRESTInstanceID, client.instanceID)
values.Set(storageRESTDiskID, client.diskID)
respBody, err = client.restClient.Call(method, values, body, length)
if err == nil {
return respBody, nil
}
client.lastError = err
if isNetworkError(err) {
if isNetworkError(err) || err.Error() == errDiskStale.Error() {
atomic.StoreInt32(&client.connected, 0)
}
@ -151,6 +150,10 @@ func (client *storageRESTClient) LastError() error {
return client.lastError
}
func (client *storageRESTClient) SetDiskID(id string) {
client.diskID = id
}
// DiskInfo - fetch disk information for a remote disk.
func (client *storageRESTClient) DiskInfo() (info DiskInfo, err error) {
respBody, err := client.call(storageRESTMethodDiskInfo, nil, nil, -1)
@ -403,30 +406,6 @@ func (client *storageRESTClient) RenameFile(srcVolume, srcPath, dstVolume, dstPa
return err
}
// Gets peer storage server's instanceID - to be used with every REST call for validation.
func (client *storageRESTClient) getInstanceID() (err error) {
// getInstanceID() does not use storageRESTClient.call()
// function so we need to update lastError field here.
defer func() {
if err != nil {
client.lastError = err
}
}()
respBody, err := client.restClient.Call(storageRESTMethodGetInstanceID, nil, nil, -1)
if err != nil {
return err
}
defer http.DrainBody(respBody)
instanceIDBuf := make([]byte, 64)
n, err := io.ReadFull(respBody, instanceIDBuf)
if err != io.EOF && err != io.ErrUnexpectedEOF {
return err
}
client.instanceID = string(instanceIDBuf[:n])
return nil
}
func (client *storageRESTClient) VerifyFile(volume, path string, size int64, algo BitrotAlgorithm, sum []byte, shardSize int64) error {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
@ -498,10 +477,5 @@ func newStorageRESTClient(endpoint Endpoint) (*storageRESTClient, error) {
return nil, err
}
client := &storageRESTClient{endpoint: endpoint, restClient: restClient, connected: 1}
if client.getInstanceID() == nil {
client.connected = 1
} else {
client.connected = 0
}
return client, nil
}

View File

@ -17,7 +17,7 @@
package cmd
const (
storageRESTVersion = "v9"
storageRESTVersion = "v10"
storageRESTPath = minioReservedBucketPath + "/storage/" + storageRESTVersion + SlashSeparator
)
@ -41,7 +41,6 @@ const (
storageRESTMethodDeleteFileBulk = "deletefilebulk"
storageRESTMethodRenameFile = "renamefile"
storageRESTMethodVerifyFile = "verifyfile"
storageRESTMethodGetInstanceID = "getinstanceid"
)
const (
@ -61,5 +60,5 @@ const (
storageRESTRecursive = "recursive"
storageRESTBitrotAlgo = "bitrot-algo"
storageRESTBitrotHash = "bitrot-hash"
storageRESTInstanceID = "instance-id"
storageRESTDiskID = "disk-id"
)

View File

@ -34,14 +34,11 @@ import (
"github.com/minio/minio/cmd/logger"
)
var errConnectionStale = errors.New("connection stale, REST client/server instance-id mismatch")
var errDiskStale = errors.New("disk stale")
// To abstract a disk over network.
type storageRESTServer struct {
storage *posix
// Used to detect reboot of servers so that peers revalidate format.json as
// different disk might be available on the same mount point after reboot.
instanceID string
}
func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
@ -85,24 +82,20 @@ func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool
s.writeErrorResponse(w, err)
return false
}
instanceID := r.URL.Query().Get(storageRESTInstanceID)
if instanceID != s.instanceID {
// This will cause the peer to revalidate format.json using a new storage-rest-client instance.
s.writeErrorResponse(w, errConnectionStale)
return false
}
diskID := r.URL.Query().Get(storageRESTDiskID)
if diskID == "" {
// Request sent empty disk-id, we allow the request
// as the peer might be coming up and trying to read format.json
// or create format.json
return true
}
// GetInstanceID - returns the instance ID of the server.
func (s *storageRESTServer) GetInstanceID(w http.ResponseWriter, r *http.Request) {
if err := storageServerRequestValidate(r); err != nil {
s.writeErrorResponse(w, err)
return
}
w.Header().Set(xhttp.ContentLength, strconv.Itoa(len(s.instanceID)))
w.Write([]byte(s.instanceID))
w.(http.Flusher).Flush()
storedDiskID, err := s.storage.getDiskID()
if err == nil && diskID == storedDiskID {
// If format.json is available and request sent the right disk-id, we allow the request
return true
}
s.writeErrorResponse(w, errDiskStale)
return false
}
// DiskInfoHandler - returns disk info.
@ -578,7 +571,7 @@ func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) {
"Unable to initialize posix backend")
}
server := &storageRESTServer{storage, mustGetUUID()}
server := &storageRESTServer{storage: storage}
subrouter := router.PathPrefix(path.Join(storageRESTPath, endpoint.Path)).Subrouter()
@ -616,7 +609,6 @@ func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) {
Queries(restQueries(storageRESTSrcVolume, storageRESTSrcPath, storageRESTDstVolume, storageRESTDstPath)...)
subrouter.Methods(http.MethodPost).Path(SlashSeparator + storageRESTMethodVerifyFile).HandlerFunc(httpTraceHdrs(server.VerifyFile)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTBitrotAlgo, storageRESTBitrotHash, storageRESTLength, storageRESTShardSize)...)
subrouter.Methods(http.MethodPost).Path(SlashSeparator + storageRESTMethodGetInstanceID).HandlerFunc(httpTraceAll(server.GetInstanceID))
}
router.MethodNotAllowedHandler = http.HandlerFunc(httpTraceAll(versionMismatchHandler))

View File

@ -1599,7 +1599,7 @@ func newTestObjectLayer(endpoints EndpointList) (newObject ObjectLayer, err erro
return NewFSObjectLayer(endpoints[0].Path)
}
_, err = waitForFormatXL(endpoints[0].IsLocal, endpoints, 1, 16)
format, err := waitForFormatXL(endpoints[0].IsLocal, endpoints, 1, 16)
if err != nil {
return nil, err
}
@ -1611,6 +1611,10 @@ func newTestObjectLayer(endpoints EndpointList) (newObject ObjectLayer, err erro
}
}
for i, disk := range storageDisks {
disk.SetDiskID(format.XL.Sets[0][i])
}
// Initialize list pool.
listPool := NewTreeWalkPool(globalLookupTimeout)

View File

@ -28,7 +28,6 @@ import (
"github.com/minio/minio/cmd/config/storageclass"
xhttp "github.com/minio/minio/cmd/http"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/bpool"
"github.com/minio/minio/pkg/lifecycle"
"github.com/minio/minio/pkg/madmin"
@ -41,11 +40,12 @@ type setsStorageAPI [][]StorageAPI
func (s setsStorageAPI) Close() error {
for i := 0; i < len(s); i++ {
for _, disk := range s[i] {
for j, disk := range s[i] {
if disk == nil {
continue
}
disk.Close()
s[i][j] = nil
}
}
return nil
@ -147,29 +147,6 @@ func findDiskIndex(refFormat, format *formatXLV3) (int, int, error) {
return -1, -1, fmt.Errorf("diskID: %s not found", format.XL.This)
}
// Re initializes all disks based on the reference format, this function is
// only used by HealFormat and ReloadFormat calls.
func (s *xlSets) reInitDisks(refFormat *formatXLV3, storageDisks []StorageAPI, formats []*formatXLV3) [][]StorageAPI {
xlDisks := make([][]StorageAPI, s.setCount)
for i := 0; i < len(refFormat.XL.Sets); i++ {
xlDisks[i] = make([]StorageAPI, s.drivesPerSet)
}
for k := range storageDisks {
if storageDisks[k] == nil || formats[k] == nil {
continue
}
i, j, err := findDiskIndex(refFormat, formats[k])
if err != nil {
reqInfo := (&logger.ReqInfo{}).AppendTags("storageDisk", storageDisks[i].String())
ctx := logger.SetReqInfo(context.Background(), reqInfo)
logger.LogIf(ctx, err)
continue
}
xlDisks[i][j] = storageDisks[k]
}
return xlDisks
}
// connectDisksWithQuorum is same as connectDisks but waits
// for quorum number of formatted disks to be online in
// any given sets.
@ -192,6 +169,7 @@ func (s *xlSets) connectDisksWithQuorum() {
printEndpointError(endpoint, err)
continue
}
disk.SetDiskID(format.XL.This)
s.xlDisks[i][j] = disk
onlineDisks++
}
@ -220,6 +198,7 @@ func (s *xlSets) connectDisks() {
printEndpointError(endpoint, err)
continue
}
disk.SetDiskID(format.XL.This)
s.xlDisksMu.Lock()
s.xlDisks[i][j] = disk
s.xlDisksMu.Unlock()
@ -1306,15 +1285,9 @@ func (s *xlSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
// Replace the new format.
s.format = refFormat
s.xlDisksMu.Lock()
{
// Close all existing disks.
// Close all existing disks and reconnect all the disks.
s.xlDisks.Close()
// Re initialize disks, after saving the new reference format.
s.xlDisks = s.reInitDisks(refFormat, storageDisks, formats)
}
s.xlDisksMu.Unlock()
s.connectDisks()
// Restart monitoring loop to monitor reformatted disks again.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)
@ -1519,15 +1492,9 @@ func (s *xlSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealRe
// Replace with new reference format.
s.format = refFormat
s.xlDisksMu.Lock()
{
// Disconnect/relinquish all existing disks.
// Disconnect/relinquish all existing disks and reconnect the disks.
s.xlDisks.Close()
// Re initialize disks, after saving the new reference format.
s.xlDisks = s.reInitDisks(refFormat, storageDisks, tmpNewFormats)
}
s.xlDisksMu.Unlock()
s.connectDisks()
// Restart our monitoring loop to start monitoring newly formatted disks.
go s.monitorAndConnectEndpoints(defaultMonitorConnectEndpointInterval)

View File

@ -160,6 +160,23 @@ func NewSkipReader(r io.Reader, n int64) io.Reader {
return &SkipReader{r, n}
}
// SameFile returns if the files are same.
func SameFile(fi1, fi2 os.FileInfo) bool {
if !os.SameFile(fi1, fi2) {
return false
}
if !fi1.ModTime().Equal(fi2.ModTime()) {
return false
}
if fi1.Mode() != fi2.Mode() {
return false
}
if fi1.Size() != fi2.Size() {
return false
}
return true
}
// DirectIO alignment needs to be 4K. Defined here as
// directio.AlignSize is defined as 0 in MacOS causing divide by 0 error.
const directioAlignSize = 4096

View File

@ -101,3 +101,34 @@ func TestSkipReader(t *testing.T) {
}
}
}
func TestSameFile(t *testing.T) {
f, err := goioutil.TempFile("", "")
if err != nil {
t.Errorf("Error creating tmp file: %v", err)
}
tmpFile := f.Name()
f.Close()
defer os.Remove(f.Name())
fi1, err := os.Stat(tmpFile)
if err != nil {
t.Fatalf("Error Stat(): %v", err)
}
fi2, err := os.Stat(tmpFile)
if err != nil {
t.Fatalf("Error Stat(): %v", err)
}
if !SameFile(fi1, fi2) {
t.Fatal("Expected the files to be same")
}
if err = goioutil.WriteFile(tmpFile, []byte("aaa"), 0644); err != nil {
t.Fatal(err)
}
fi2, err = os.Stat(tmpFile)
if err != nil {
t.Fatalf("Error Stat(): %v", err)
}
if SameFile(fi1, fi2) {
t.Fatal("Expected the files not to be same")
}
}