mirror of
https://github.com/minio/minio.git
synced 2025-03-31 17:53:43 -04:00
xl/heal: Make healFile non-blocking for StatFile and ReadFile. (#1399)
Fixes #1355
This commit is contained in:
parent
b51bef85e6
commit
5fffd558d0
@ -37,6 +37,7 @@ func (xl XL) healFile(volume string, path string) error {
|
|||||||
xl.lockNS(volume, path, readLock)
|
xl.lockNS(volume, path, readLock)
|
||||||
defer xl.unlockNS(volume, path, readLock)
|
defer xl.unlockNS(volume, path, readLock)
|
||||||
|
|
||||||
|
// Fetch all online disks.
|
||||||
onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path)
|
onlineDisks, metadata, heal, err := xl.listOnlineDisks(volume, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(logrus.Fields{
|
log.WithFields(logrus.Fields{
|
||||||
|
@ -49,19 +49,19 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if heal {
|
if heal {
|
||||||
if err = xl.healFile(volume, path); err != nil {
|
// Heal in background safely, since we already have read
|
||||||
log.WithFields(logrus.Fields{
|
// quorum disks. Let the reads continue.
|
||||||
"volume": volume,
|
go func() {
|
||||||
"path": path,
|
if err = xl.healFile(volume, path); err != nil {
|
||||||
}).Errorf("healFile failed with %s", err)
|
log.WithFields(logrus.Fields{
|
||||||
return nil, err
|
"volume": volume,
|
||||||
}
|
"path": path,
|
||||||
|
}).Errorf("healFile failed with %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Acquire read lock again.
|
|
||||||
xl.lockNS(volume, path, readLock)
|
|
||||||
defer xl.unlockNS(volume, path, readLock)
|
|
||||||
|
|
||||||
fileSize, err := metadata.GetSize()
|
fileSize, err := metadata.GetSize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(logrus.Fields{
|
log.WithFields(logrus.Fields{
|
||||||
@ -71,6 +71,8 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Acquire read lock again.
|
||||||
|
xl.lockNS(volume, path, readLock)
|
||||||
readers := make([]io.ReadCloser, len(xl.storageDisks))
|
readers := make([]io.ReadCloser, len(xl.storageDisks))
|
||||||
for index, disk := range onlineDisks {
|
for index, disk := range onlineDisks {
|
||||||
if disk == nil {
|
if disk == nil {
|
||||||
@ -84,6 +86,7 @@ func (xl XL) ReadFile(volume, path string, offset int64) (io.ReadCloser, error)
|
|||||||
readers[index] = reader
|
readers[index] = reader
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
xl.unlockNS(volume, path, readLock)
|
||||||
|
|
||||||
// Initialize pipe.
|
// Initialize pipe.
|
||||||
pipeReader, pipeWriter := io.Pipe()
|
pipeReader, pipeWriter := io.Pipe()
|
||||||
|
17
xl-v1.go
17
xl-v1.go
@ -536,13 +536,16 @@ func (xl XL) StatFile(volume, path string) (FileInfo, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if heal {
|
if heal {
|
||||||
if err = xl.healFile(volume, path); err != nil {
|
// Heal in background safely, since we already have read quorum disks.
|
||||||
log.WithFields(logrus.Fields{
|
go func() {
|
||||||
"volume": volume,
|
if err = xl.healFile(volume, path); err != nil {
|
||||||
"path": path,
|
log.WithFields(logrus.Fields{
|
||||||
}).Errorf("healFile failed with %s", err)
|
"volume": volume,
|
||||||
return FileInfo{}, err
|
"path": path,
|
||||||
}
|
}).Errorf("healFile failed with %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract metadata.
|
// Extract metadata.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user