Send the lower level error directly from GetDiskID() (#10095)

This is to quickly detect situations such as disk corruption or
disk-format errors, and to keep the disk online in such scenarios
so that requests fail appropriately.
This commit is contained in:
Harshavardhana 2020-07-21 13:54:06 -07:00 committed by GitHub
parent e464a5bfbc
commit a880283593
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 47 additions and 49 deletions

View File

@ -212,10 +212,10 @@ func shouldHealObjectOnDisk(erErr, dataErr error, meta FileInfo, quorumModTime t
}...) { }...) {
return true return true
} }
if meta.XLV1 { if !quorumModTime.Equal(meta.ModTime) {
return true return true
} }
if !quorumModTime.Equal(meta.ModTime) { if meta.XLV1 {
return true return true
} }
} }
@ -368,6 +368,11 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
partsMetadata[i] = cleanFileInfo(latestMeta) partsMetadata[i] = cleanFileInfo(latestMeta)
} }
dataDir := latestMeta.DataDir
if latestMeta.XLV1 {
dataDir = migrateDataDir
}
if !latestMeta.Deleted { if !latestMeta.Deleted {
result.DataBlocks = latestMeta.Erasure.DataBlocks result.DataBlocks = latestMeta.Erasure.DataBlocks
result.ParityBlocks = latestMeta.Erasure.ParityBlocks result.ParityBlocks = latestMeta.Erasure.ParityBlocks
@ -399,7 +404,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
continue continue
} }
checksumInfo := partsMetadata[i].Erasure.GetChecksumInfo(partNumber) checksumInfo := partsMetadata[i].Erasure.GetChecksumInfo(partNumber)
partPath := pathJoin(object, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber)) partPath := pathJoin(object, dataDir, fmt.Sprintf("part.%d", partNumber))
if latestMeta.XLV1 { if latestMeta.XLV1 {
partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber)) partPath = pathJoin(object, fmt.Sprintf("part.%d", partNumber))
} }
@ -410,10 +415,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
if disk == OfflineDisk { if disk == OfflineDisk {
continue continue
} }
partPath := pathJoin(tmpID, latestMeta.DataDir, fmt.Sprintf("part.%d", partNumber)) partPath := pathJoin(tmpID, dataDir, fmt.Sprintf("part.%d", partNumber))
if latestMeta.XLV1 {
partPath = pathJoin(tmpID, migrateDataDir, fmt.Sprintf("part.%d", partNumber))
}
writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize()) writers[i] = newBitrotWriter(disk, minioMetaTmpBucket, partPath, tillOffset, DefaultBitrotAlgorithm, erasure.ShardSize())
} }
err = erasure.Heal(ctx, readers, writers, partSize) err = erasure.Heal(ctx, readers, writers, partSize)
@ -437,9 +439,7 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
continue continue
} }
if partsMetadata[i].XLV1 { partsMetadata[i].DataDir = dataDir
partsMetadata[i].DataDir = migrateDataDir
}
partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize) partsMetadata[i].AddObjectPart(partNumber, "", partSize, partActualSize)
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{ partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
PartNumber: partNumber, PartNumber: partNumber,
@ -465,13 +465,13 @@ func (er erasureObjects) healObject(ctx context.Context, bucket string, object s
} }
// Rename from tmp location to the actual location. // Rename from tmp location to the actual location.
for _, disk := range outDatedDisks { for i, disk := range outDatedDisks {
if disk == OfflineDisk { if disk == OfflineDisk {
continue continue
} }
// Attempt a rename now from healed data to final location. // Attempt a rename now from healed data to final location.
if err = disk.RenameData(minioMetaTmpBucket, tmpID, partsMetadata[0].DataDir, bucket, object); err != nil { if err = disk.RenameData(minioMetaTmpBucket, tmpID, partsMetadata[i].DataDir, bucket, object); err != nil {
if err != errIsNotRegular && err != errFileNotFound { if err != errIsNotRegular && err != errFileNotFound {
logger.LogIf(ctx, err) logger.LogIf(ctx, err)
} }

View File

@ -1940,13 +1940,21 @@ func (z *erasureZones) HealObjects(ctx context.Context, bucket, prefix string, o
} }
func (z *erasureZones) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) { func (z *erasureZones) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
// Lock the object before healing. Use read lock since healing
// will only regenerate parts & xl.meta of outdated disks.
lk := z.NewNSLock(ctx, bucket, object) lk := z.NewNSLock(ctx, bucket, object)
if err := lk.GetRLock(globalHealingTimeout); err != nil { if bucket == minioMetaBucket {
return madmin.HealResultItem{}, err // For .minio.sys bucket heals we should hold write locks.
if err := lk.GetLock(globalHealingTimeout); err != nil {
return madmin.HealResultItem{}, err
}
defer lk.Unlock()
} else {
// Lock the object before healing. Use read lock since healing
// will only regenerate parts & xl.meta of outdated disks.
if err := lk.GetRLock(globalHealingTimeout); err != nil {
return madmin.HealResultItem{}, err
}
defer lk.RUnlock()
} }
defer lk.RUnlock()
if z.SingleZone() { if z.SingleZone() {
return z.zones[0].HealObject(ctx, bucket, object, versionID, opts) return z.zones[0].HealObject(ctx, bucket, object, versionID, opts)

View File

@ -131,29 +131,11 @@ func createFormatCache(fsFormatPath string, format *formatCacheV1) error {
// of format cache config // of format cache config
func initFormatCache(ctx context.Context, drives []string) (formats []*formatCacheV2, err error) { func initFormatCache(ctx context.Context, drives []string) (formats []*formatCacheV2, err error) {
nformats := newFormatCacheV2(drives) nformats := newFormatCacheV2(drives)
for _, drive := range drives {
_, err = os.Stat(drive)
if err == nil {
continue
}
if !os.IsNotExist(err) {
logger.GetReqInfo(ctx).AppendTags("drive", drive)
logger.LogIf(ctx, err, logger.Application)
return nil, err
}
if err = os.Mkdir(drive, 0777); err != nil {
logger.GetReqInfo(ctx).AppendTags("drive", drive)
logger.LogIf(ctx, err, logger.Application)
return nil, err
}
}
for i, drive := range drives { for i, drive := range drives {
if err = os.Mkdir(pathJoin(drive, minioMetaBucket), 0777); err != nil { if err = os.MkdirAll(pathJoin(drive, minioMetaBucket), 0777); err != nil {
if !os.IsExist(err) { logger.GetReqInfo(ctx).AppendTags("drive", drive)
logger.GetReqInfo(ctx).AppendTags("drive", drive) logger.LogIf(ctx, err)
logger.LogIf(ctx, err) return nil, err
return nil, err
}
} }
cacheFormatPath := pathJoin(drive, minioMetaBucket, formatConfigFile) cacheFormatPath := pathJoin(drive, minioMetaBucket, formatConfigFile)
// Fresh disk - create format.json for this cfs // Fresh disk - create format.json for this cfs

View File

@ -103,6 +103,8 @@ func toStorageErr(err error) error {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
case errDiskStale.Error(): case errDiskStale.Error():
return errDiskNotFound return errDiskNotFound
case errDiskNotFound.Error():
return errDiskNotFound
} }
return err return err
} }

View File

@ -114,12 +114,18 @@ func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool
return true return true
} }
storedDiskID, err := s.storage.GetDiskID() storedDiskID, err := s.storage.GetDiskID()
if err == nil && diskID == storedDiskID { if err != nil {
// If format.json is available and request sent the right disk-id, we allow the request s.writeErrorResponse(w, err)
return true return false
} }
s.writeErrorResponse(w, errDiskStale)
return false if diskID != storedDiskID {
s.writeErrorResponse(w, errDiskStale)
return false
}
// If format.json is available and request sent the right disk-id, we allow the request
return true
} }
// HealthHandler handler checks if disk is stale // HealthHandler handler checks if disk is stale

View File

@ -509,7 +509,7 @@ func (s *xlStorage) GetDiskID() (string, error) {
if os.IsNotExist(err) { if os.IsNotExist(err) {
_, err = os.Stat(s.diskPath) _, err = os.Stat(s.diskPath)
if err == nil { if err == nil {
// Disk is present by missing `format.json` // Disk is present but missing `format.json`
return "", errUnformattedDisk return "", errUnformattedDisk
} }
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -2112,7 +2112,7 @@ func (s *xlStorage) RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath s
} }
legacyDataPath := pathJoin(dstVolumeDir, dstPath, legacyDataDir) legacyDataPath := pathJoin(dstVolumeDir, dstPath, legacyDataDir)
// legacy data dir means its old content, honor system umask. // legacy data dir means its old content, honor system umask.
if err = os.Mkdir(legacyDataPath, 0777); err != nil { if err = os.MkdirAll(legacyDataPath, 0777); err != nil {
if isSysErrIO(err) { if isSysErrIO(err) {
return errFaultyDisk return errFaultyDisk
} }
@ -2133,10 +2133,10 @@ func (s *xlStorage) RenameData(srcVolume, srcPath, dataDir, dstVolume, dstPath s
} }
return osErrToFileErr(err) return osErrToFileErr(err)
} }
// Sync all the metadata operations once renames are done.
globalSync()
} }
// Sync all the metadata operations once renames are done.
globalSync()
} }
var oldDstDataPath string var oldDstDataPath string