diff --git a/erasure-createfile.go b/erasure-createfile.go index d2622a75c..0733cbc30 100644 --- a/erasure-createfile.go +++ b/erasure-createfile.go @@ -34,7 +34,7 @@ func erasureCreateFile(disks []StorageAPI, volume string, path string, partName hashWriters := newHashWriters(len(disks)) // Just pick one eInfo. - eInfo := eInfos[0] + eInfo := pickValidErasureInfo(eInfos) // Read until io.EOF, erasure codes data and writes to all disks. for { @@ -72,9 +72,11 @@ func erasureCreateFile(disks []StorageAPI, volume string, path string, partName // Erasure info update for checksum for each disks. newEInfos = make([]erasureInfo, len(disks)) for index, eInfo := range eInfos { - blockIndex := eInfo.Distribution[index] - 1 - newEInfos[index] = eInfo - newEInfos[index].Checksum = append(newEInfos[index].Checksum, checkSums[blockIndex]) + if eInfo.IsValid() { + blockIndex := eInfo.Distribution[index] - 1 + newEInfos[index] = eInfo + newEInfos[index].Checksum = append(newEInfos[index].Checksum, checkSums[blockIndex]) + } } // Return newEInfos. diff --git a/erasure-readfile.go b/erasure-readfile.go index fe573071f..165b5c811 100644 --- a/erasure-readfile.go +++ b/erasure-readfile.go @@ -42,7 +42,7 @@ func erasureReadFile(disks []StorageAPI, volume string, path string, partName st blockCheckSums := metaPartBlockChecksums(disks, eInfos, partName) // Pick one erasure info. - eInfo := eInfos[0] + eInfo := pickValidErasureInfo(eInfos) // Write until each parts are read and exhausted. for totalSizeLeft > 0 { @@ -71,6 +71,9 @@ func erasureReadFile(disks []StorageAPI, volume string, path string, partName st if !isValidBlock(disks, volume, path, toDiskIndex(blockIndex, eInfo.Distribution), blockCheckSums) { continue } + if disk == nil { + continue + } // Initialize shard slice and fill the data from each parts. enBlocks[blockIndex] = make([]byte, curEncBlockSize) // Read the necessary blocks. @@ -94,7 +97,7 @@ func erasureReadFile(disks []StorageAPI, volume string, path string, partName st // Check blocks if they are all zero in length, we have corruption return error. if checkBlockSize(enBlocks) == 0 { - return nil, errDataCorrupt + return nil, errXLDataCorrupt } // Verify if reconstruction is needed, proceed with reconstruction. @@ -139,8 +142,12 @@ func (e erasureInfo) PartObjectChecksum(partName string) checkSumInfo { // xlMetaPartBlockChecksums - get block checksums for a given part. func metaPartBlockChecksums(disks []StorageAPI, eInfos []erasureInfo, partName string) (blockCheckSums []checkSumInfo) { for index := range disks { - // Save the read checksums for a given part. - blockCheckSums = append(blockCheckSums, eInfos[index].PartObjectChecksum(partName)) + if eInfos[index].IsValid() { + // Save the read checksums for a given part. + blockCheckSums = append(blockCheckSums, eInfos[index].PartObjectChecksum(partName)) + } else { + blockCheckSums = append(blockCheckSums, checkSumInfo{}) + } } return blockCheckSums } diff --git a/format-config-v1.go b/format-config-v1.go index bfebb7764..32452b7db 100644 --- a/format-config-v1.go +++ b/format-config-v1.go @@ -21,25 +21,239 @@ import ( "errors" "fmt" "strings" + "sync" "github.com/skyrings/skyring-common/tools/uuid" ) +// fsFormat - structure holding 'fs' format. type fsFormat struct { Version string `json:"version"` } +// xlFormat - structure holding 'xl' format. type xlFormat struct { - Version string `json:"version"` - Disk string `json:"disk"` - JBOD []string `json:"jbod"` + Version string `json:"version"` // Version of 'xl' format. + Disk string `json:"disk"` // Disk field carries assigned disk uuid. + // JBOD field carries the input disk order generated the first + // time when fresh disks were supplied. + JBOD []string `json:"jbod"` } +// formatConfigV1 - structure holds format config version '1'. type formatConfigV1 struct { - Version string `json:"version"` - Format string `json:"format"` - FS *fsFormat `json:"fs,omitempty"` - XL *xlFormat `json:"xl,omitempty"` + Version string `json:"version"` // Version of the format config. + // Format indicates the backend format type, supports two values 'xl' and 'fs'. + Format string `json:"format"` + FS *fsFormat `json:"fs,omitempty"` // FS field holds fs format. + XL *xlFormat `json:"xl,omitempty"` // XL field holds xl format. +} + +/* + +All disks online +----------------- +- All Unformatted - format all and return success. +- Some Unformatted - format all and return success. +- Any JBOD inconsistent - return failure // Requires deep inspection, phase2. +- Some are corrupt (missing format.json) - return failure // Requires deep inspection, phase2. +- Any unrecognized disks - return failure + +Some disks are offline and we have quorum. +----------------- +- Some unformatted - no heal, return success. +- Any JBOD inconsistent - return failure // Requires deep inspection, phase2. +- Some are corrupt (missing format.json) - return failure // Requires deep inspection, phase2. +- Any unrecognized disks - return failure + +No read quorum +----------------- +failure for all cases. + +// Pseudo code for managing `format.json`. + +// Generic checks. +if (no quorum) return error +if (any disk is corrupt) return error // phase2 +if (jbod inconsistent) return error // phase2 +if (disks not recognized) // Always error. + +// Specific checks. +if (all disks online) + if (all disks return format.json) + if (jbod consistent) + if (all disks recognized) + return + else + if (all disks return format.json not found) + (initialize format) + return + else (some disks return format.json not found) + (heal format) + return + fi + fi +else // No healing at this point forward, some disks are offline or dead. + if (some disks return format.json not found) + if (with force) + // Offline disks are marked as dead. + (heal format) // Offline disks should be marked as dead. + return success + else (without force) + // --force is necessary to heal few drives, because some drives + // are offline. Offline disks will be marked as dead. + return error + fi +fi +*/ + +var errSomeDiskUnformatted = errors.New("some disks are found to be unformatted") +var errSomeDiskOffline = errors.New("some disks are offline") + +// Returns error slice into understandable errors. +func reduceFormatErrs(errs []error, diskCount int) error { + var errUnformattedDiskCount = 0 + var errDiskNotFoundCount = 0 + for _, err := range errs { + if err == errUnformattedDisk { + errUnformattedDiskCount++ + } else if err == errDiskNotFound { + errDiskNotFoundCount++ + } + } + // Returns errUnformattedDisk if all disks report unFormattedDisk. + if errUnformattedDiskCount == diskCount { + return errUnformattedDisk + } else if errUnformattedDiskCount < diskCount && errDiskNotFoundCount == 0 { + // Only some disks return unFormattedDisk and all disks are online. + return errSomeDiskUnformatted + } else if errUnformattedDiskCount < diskCount && errDiskNotFoundCount > 0 { + // Only some disks return unFormattedDisk and some disks are + // offline as well. + return errSomeDiskOffline + } + return nil +} + +// loadAllFormats - load all format config from all input disks in parallel. +func loadAllFormats(bootstrapDisks []StorageAPI) ([]*formatConfigV1, []error) { + // Initialize sync waitgroup. + var wg = &sync.WaitGroup{} + + // Initialize list of errors. + var sErrs = make([]error, len(bootstrapDisks)) + + // Initialize format configs. + var formatConfigs = make([]*formatConfigV1, len(bootstrapDisks)) + + // Make a volume entry on all underlying storage disks. + for index, disk := range bootstrapDisks { + wg.Add(1) + // Make a volume inside a go-routine. + go func(index int, disk StorageAPI) { + defer wg.Done() + formatConfig, lErr := loadFormat(disk) + if lErr != nil { + sErrs[index] = lErr + return + } + formatConfigs[index] = formatConfig + }(index, disk) + } + + // Wait for all make vol to finish. + wg.Wait() + + for _, err := range sErrs { + if err != nil { + // Return all formats and errors. + return formatConfigs, sErrs + } + } + // Return all formats and nil + return formatConfigs, nil +} + +// genericFormatCheck - validates and returns error. +// if (no quorum) return error +// if (any disk is corrupt) return error // phase2 +// if (jbod inconsistent) return error // phase2 +// if (disks not recognized) // Always error. +func genericFormatCheck(formatConfigs []*formatConfigV1, sErrs []error) (err error) { + // Calculate the errors. + var ( + errCorruptFormatCount = 0 + errCount = 0 + ) + + // Through all errors calculate the actual errors. + for _, lErr := range sErrs { + if lErr == nil { + continue + } + // These errors are good conditions, means disk is online. + if lErr == errUnformattedDisk || lErr == errVolumeNotFound { + continue + } + if lErr == errCorruptedFormat { + errCorruptFormatCount++ + } else { + errCount++ + } + } + + // Calculate read quorum. + readQuorum := len(formatConfigs)/2 + 1 + + // Validate the err count under tolerant limit. + if errCount > len(formatConfigs)-readQuorum { + return errXLReadQuorum + } + + // One of the disk has corrupt format, return error. + if errCorruptFormatCount > 0 { + return errCorruptedFormat + } + + // Validates if format and JBOD are consistent across all disks. + if err = checkFormatXL(formatConfigs); err != nil { + return err + } + + // Success.. + return nil +} + +// checkDisksConsistency - checks if all disks are consistent with all JBOD entries on all disks. +func checkDisksConsistency(formatConfigs []*formatConfigV1) error { + var disks = make([]string, len(formatConfigs)) + var disksFound = make(map[string]bool) + // Collect currently available disk uuids. + for index, formatConfig := range formatConfigs { + if formatConfig == nil { + continue + } + disks[index] = formatConfig.XL.Disk + } + // Validate collected uuids and verify JBOD. + for index, uuid := range disks { + if uuid == "" { + continue + } + var formatConfig = formatConfigs[index] + for _, savedUUID := range formatConfig.XL.JBOD { + if savedUUID == uuid { + disksFound[uuid] = true + } + } + } + // Check if all disks are found. + for _, value := range disksFound { + if !value { + return errors.New("Some disks not found in JBOD.") + } + } + return nil } // checkJBODConsistency - validate xl jbod order if they are consistent. @@ -60,7 +274,7 @@ func checkJBODConsistency(formatConfigs []*formatConfigV1) error { } savedJBODStr := strings.Join(format.XL.JBOD, ".") if jbodStr != savedJBODStr { - return errors.New("Inconsistent disks.") + return errors.New("Inconsistent JBOD found.") } } return nil @@ -87,10 +301,8 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1) } // Pick the first JBOD list to verify the order and construct new set of disk slice. var newDisks = make([]StorageAPI, len(bootstrapDisks)) - var unclaimedJBODIndex = make(map[int]struct{}) for fIndex, format := range formatConfigs { if format == nil { - unclaimedJBODIndex[fIndex] = struct{}{} continue } jIndex := findIndex(format.XL.Disk, savedJBOD) @@ -99,17 +311,6 @@ func reorderDisks(bootstrapDisks []StorageAPI, formatConfigs []*formatConfigV1) } newDisks[jIndex] = bootstrapDisks[fIndex] } - // Save the unclaimed jbods as well. - for index, disk := range newDisks { - if disk == nil { - for fIndex := range unclaimedJBODIndex { - newDisks[index] = bootstrapDisks[fIndex] - delete(unclaimedJBODIndex, fIndex) - break - } - continue - } - } return newDisks, nil } @@ -146,83 +347,104 @@ func loadFormat(disk StorageAPI) (format *formatConfigV1, err error) { // Heals any missing format.json on the drives. Returns error only for unexpected errors // as regular errors can be ignored since there might be enough quorum to be operational. func healFormatXL(bootstrapDisks []StorageAPI) error { + needHeal := make([]bool, len(bootstrapDisks)) // Slice indicating which drives needs healing. + + formatConfigs := make([]*formatConfigV1, len(bootstrapDisks)) + var referenceConfig *formatConfigV1 + successCount := 0 // Tracks if we have successfully loaded all `format.json` from all disks. + formatNotFoundCount := 0 // Tracks if we `format.json` is not found on all disks. + // Loads `format.json` from all disks. + for index, disk := range bootstrapDisks { + formatXL, err := loadFormat(disk) + if err != nil { + if err == errUnformattedDisk { + // format.json is missing, should be healed. + needHeal[index] = true + formatNotFoundCount++ + continue + } else if err == errDiskNotFound { // Is a valid case we + // can proceed without healing. + return nil + } + // Return error for unsupported errors. + return err + } // Success. + formatConfigs[index] = formatXL + successCount++ + } + // All `format.json` has been read successfully, previously completed. + if successCount == len(bootstrapDisks) { + // Return success. + return nil + } + // All disks are fresh, format.json will be written by initFormatXL() + if formatNotFoundCount == len(bootstrapDisks) { + return initFormatXL(bootstrapDisks) + } + // Validate format configs for consistency in JBOD and disks. + if err := checkFormatXL(formatConfigs); err != nil { + return err + } + + if referenceConfig == nil { + // This config will be used to update the drives missing format.json. + for _, formatConfig := range formatConfigs { + if formatConfig == nil { + continue + } + referenceConfig = formatConfig + break + } + } + uuidUsage := make([]struct { uuid string // Disk uuid - inuse bool // indicates if the uuid is used by any disk + inUse bool // indicates if the uuid is used by + // any disk }, len(bootstrapDisks)) - needHeal := make([]bool, len(bootstrapDisks)) // Slice indicating which drives needs healing. - // Returns any unused drive UUID. getUnusedUUID := func() string { for index := range uuidUsage { - if !uuidUsage[index].inuse { - uuidUsage[index].inuse = true + if !uuidUsage[index].inUse { + uuidUsage[index].inUse = true return uuidUsage[index].uuid } } return "" } - formatConfigs := make([]*formatConfigV1, len(bootstrapDisks)) - var referenceConfig *formatConfigV1 - for index, disk := range bootstrapDisks { - formatXL, err := loadFormat(disk) - if err == errUnformattedDisk { - // format.json is missing, should be healed. - needHeal[index] = true - continue - } - if err == nil { - if referenceConfig == nil { - // this config will be used to update the drives missing format.json - referenceConfig = formatXL - } - formatConfigs[index] = formatXL - } else { - // Abort format.json healing if any one of the drives is not available because we don't - // know if that drive is down permanently or temporarily. So we don't want to reuse - // its uuid for any other disks. - // Return nil so that operations can continue if quorum is available. - return nil - } - } - if referenceConfig == nil { - // All disks are fresh, format.json will be written by initFormatXL() - return nil - } // From reference config update UUID's not be in use. for index, diskUUID := range referenceConfig.XL.JBOD { uuidUsage[index].uuid = diskUUID - uuidUsage[index].inuse = false + uuidUsage[index].inUse = false } - // For all config formats validate if they are in use and update - // the uuidUsage values. + // For all config formats validate if they are in use and + // update the uuidUsage values. for _, config := range formatConfigs { if config == nil { continue } for index := range uuidUsage { if config.XL.Disk == uuidUsage[index].uuid { - uuidUsage[index].inuse = true + uuidUsage[index].inUse = true break } } } - // This section heals the format.json and updates the fresh disks - // by reapply the unused UUID's . + // by apply a new UUID for all the fresh disks. for index, heal := range needHeal { if !heal { - // Previously we detected that heal is not needed on the disk. continue } config := &formatConfigV1{} *config = *referenceConfig config.XL.Disk = getUnusedUUID() if config.XL.Disk == "" { - // getUnusedUUID() should have returned an unused uuid, it + // getUnusedUUID() should have + // returned an unused uuid, it // is an unexpected error. return errUnexpected } @@ -231,10 +453,10 @@ func healFormatXL(bootstrapDisks []StorageAPI) error { if err != nil { return err } - // Fresh disk without format.json _, _ = bootstrapDisks[index].AppendFile(minioMetaBucket, formatConfigFile, formatBytes) - // Ignore any error from AppendFile() as quorum might still be there to be operational. + // Ignore any error from AppendFile() as + // quorum might still be there to be operational. } return nil } @@ -246,12 +468,6 @@ func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) { var diskNotFoundCount = 0 formatConfigs := make([]*formatConfigV1, len(bootstrapDisks)) - // Heal missing format.json on the drives. - if err = healFormatXL(bootstrapDisks); err != nil { - // There was an unexpected unrecoverable error during healing. - return nil, err - } - // Try to load `format.json` bootstrap disks. for index, disk := range bootstrapDisks { var formatXL *formatConfigV1 @@ -277,9 +493,9 @@ func loadFormatXL(bootstrapDisks []StorageAPI) (disks []StorageAPI, err error) { } else if diskNotFoundCount == len(bootstrapDisks) { return nil, errDiskNotFound } else if diskNotFoundCount > len(bootstrapDisks)-(len(bootstrapDisks)/2+1) { - return nil, errReadQuorum + return nil, errXLReadQuorum } else if unformattedDisksFoundCnt > len(bootstrapDisks)-(len(bootstrapDisks)/2+1) { - return nil, errReadQuorum + return nil, errXLReadQuorum } // Validate the format configs read are correct. @@ -310,7 +526,10 @@ func checkFormatXL(formatConfigs []*formatConfigV1) error { return fmt.Errorf("Number of disks %d did not match the backend format %d", len(formatConfigs), len(formatXL.XL.JBOD)) } } - return checkJBODConsistency(formatConfigs) + if err := checkJBODConsistency(formatConfigs); err != nil { + return err + } + return checkDisksConsistency(formatConfigs) } // initFormatXL - save XL format configuration on all disks. @@ -328,7 +547,7 @@ func initFormatXL(storageDisks []StorageAPI) (err error) { if saveFormatErrCnt <= len(storageDisks)-(len(storageDisks)/2+3) { continue } - return errWriteQuorum + return errXLWriteQuorum } } var u *uuid.UUID diff --git a/object-common.go b/object-common.go index 9bad88337..330bee093 100644 --- a/object-common.go +++ b/object-common.go @@ -66,6 +66,10 @@ func xlHouseKeeping(storageDisks []StorageAPI) error { // Initialize all disks in parallel. for index, disk := range storageDisks { + if disk == nil { + errs[index] = errDiskNotFound + continue + } wg.Add(1) go func(index int, disk StorageAPI) { // Indicate this wait group is done. @@ -73,11 +77,9 @@ func xlHouseKeeping(storageDisks []StorageAPI) error { // Attempt to create `.minio`. err := disk.MakeVol(minioMetaBucket) - if err != nil { - if err != errVolumeExists && err != errDiskNotFound { - errs[index] = err - return - } + if err != nil && err != errVolumeExists && err != errDiskNotFound { + errs[index] = err + return } // Cleanup all temp entries upon start. err = cleanupDir(disk, minioMetaBucket, tmpMetaPrefix) @@ -130,5 +132,6 @@ func cleanupDir(storage StorageAPI, volume, dirPath string) error { } return nil } - return delFunc(retainSlash(pathJoin(dirPath))) + err := delFunc(retainSlash(pathJoin(dirPath))) + return err } diff --git a/object-errors.go b/object-errors.go index f22272738..78431ffb5 100644 --- a/object-errors.go +++ b/object-errors.go @@ -40,10 +40,6 @@ func toObjectErr(err error, params ...string) error { } case errDiskFull: return StorageFull{} - case errReadQuorum: - return InsufficientReadQuorum{} - case errWriteQuorum: - return InsufficientWriteQuorum{} case errIsNotRegular, errFileAccessDenied: if len(params) >= 2 { return ObjectExistsAsDirectory{ @@ -65,6 +61,10 @@ func toObjectErr(err error, params ...string) error { Object: params[1], } } + case errXLReadQuorum: + return InsufficientReadQuorum{} + case errXLWriteQuorum: + return InsufficientWriteQuorum{} case io.ErrUnexpectedEOF, io.ErrShortWrite: return IncompleteBody{} } diff --git a/posix.go b/posix.go index 03007df61..c543be094 100644 --- a/posix.go +++ b/posix.go @@ -219,6 +219,11 @@ func (s posix) ListVols() (volsInfo []VolInfo, err error) { // StatVol - get volume info. func (s posix) StatVol(volume string) (volInfo VolInfo, err error) { + // Validate if disk is free. + if err = checkDiskFree(s.diskPath, s.minFreeDisk); err != nil { + return VolInfo{}, err + } + // Verify if volume is valid and it exists. volumeDir, err := s.getVolDir(volume) if err != nil { diff --git a/routers.go b/routers.go index c8a40de84..78c742a1f 100644 --- a/routers.go +++ b/routers.go @@ -33,7 +33,7 @@ func newObjectLayer(exportPaths []string) (ObjectLayer, error) { } // Initialize XL object layer. objAPI, err := newXLObjects(exportPaths) - if err == errWriteQuorum { + if err == errXLWriteQuorum { return objAPI, errors.New("Disks are different with last minio server run.") } return objAPI, err diff --git a/storage-errors.go b/storage-errors.go index 5d9f9c42a..c92a82828 100644 --- a/storage-errors.go +++ b/storage-errors.go @@ -51,18 +51,8 @@ var errVolumeNotFound = errors.New("volume not found") // errVolumeNotEmpty - volume not empty. var errVolumeNotEmpty = errors.New("volume is not empty") -// errVolumeAccessDenied - cannot access volume, insufficient -// permissions. +// errVolumeAccessDenied - cannot access volume, insufficient permissions. var errVolumeAccessDenied = errors.New("volume access denied") // errVolumeAccessDenied - cannot access file, insufficient permissions. var errFileAccessDenied = errors.New("file access denied") - -// errReadQuorum - did not meet read quorum. -var errReadQuorum = errors.New("I/O error. did not meet read quorum.") - -// errWriteQuorum - did not meet write quorum. -var errWriteQuorum = errors.New("I/O error. did not meet write quorum.") - -// errDataCorrupt - err data corrupt. -var errDataCorrupt = errors.New("data likely corrupted, all blocks are zero in length") diff --git a/tree-walk-xl.go b/tree-walk-xl.go index 804fcccb6..f49a6c931 100644 --- a/tree-walk-xl.go +++ b/tree-walk-xl.go @@ -48,6 +48,9 @@ type treeWalker struct { // listDir - listDir. func (xl xlObjects) listDir(bucket, prefixDir string, filter func(entry string) bool, isLeaf func(string, string) bool) (entries []string, err error) { for _, disk := range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } entries, err = disk.ListDir(bucket, prefixDir) if err != nil { break diff --git a/xl-v1-bucket.go b/xl-v1-bucket.go index 4ec22f020..26e8aa3ff 100644 --- a/xl-v1-bucket.go +++ b/xl-v1-bucket.go @@ -45,6 +45,10 @@ func (xl xlObjects) MakeBucket(bucket string) error { // Make a volume entry on all underlying storage disks. for index, disk := range xl.storageDisks { + if disk == nil { + dErrs[index] = errDiskNotFound + continue + } wg.Add(1) // Make a volume inside a go-routine. go func(index int, disk StorageAPI) { @@ -77,11 +81,11 @@ func (xl xlObjects) MakeBucket(bucket string) error { } // Return err if all disks report volume exists. - if volumeExistsErrCnt == len(xl.storageDisks) { + if volumeExistsErrCnt > len(xl.storageDisks)-xl.readQuorum { return toObjectErr(errVolumeExists, bucket) } else if createVolErr > len(xl.storageDisks)-xl.writeQuorum { - // Return errWriteQuorum if errors were more than allowed write quorum. - return toObjectErr(errWriteQuorum, bucket) + // Return errXLWriteQuorum if errors were more than allowed write quorum. + return toObjectErr(errXLWriteQuorum, bucket) } return nil } @@ -89,9 +93,16 @@ func (xl xlObjects) MakeBucket(bucket string) error { // getBucketInfo - returns the BucketInfo from one of the load balanced disks. func (xl xlObjects) getBucketInfo(bucketName string) (bucketInfo BucketInfo, err error) { for _, disk := range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } var volInfo VolInfo volInfo, err = disk.StatVol(bucketName) if err != nil { + // For some reason disk went offline pick the next one. + if err == errDiskNotFound { + continue + } return BucketInfo{}, err } bucketInfo = BucketInfo{ @@ -138,6 +149,9 @@ func (xl xlObjects) GetBucketInfo(bucket string) (BucketInfo, error) { // listBuckets - returns list of all buckets from a disk picked at random. func (xl xlObjects) listBuckets() (bucketsInfo []BucketInfo, err error) { for _, disk := range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } var volsInfo []VolInfo volsInfo, err = disk.ListVols() if err == nil { @@ -193,6 +207,10 @@ func (xl xlObjects) DeleteBucket(bucket string) error { // Remove a volume entry on all underlying storage disks. for index, disk := range xl.storageDisks { + if disk == nil { + dErrs[index] = errDiskNotFound + continue + } wg.Add(1) // Delete volume inside a go-routine. go func(index int, disk StorageAPI) { diff --git a/xl-v1-common.go b/xl-v1-common.go index d79edf5f4..446e8ffa2 100644 --- a/xl-v1-common.go +++ b/xl-v1-common.go @@ -58,6 +58,9 @@ func (xl xlObjects) parentDirIsObject(bucket, parent string) bool { // `xl.json` exists at the leaf, false otherwise. func (xl xlObjects) isObject(bucket, prefix string) bool { for _, disk := range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } _, err := disk.StatFile(bucket, path.Join(prefix, xlMetaJSONFile)) if err != nil { return false @@ -70,6 +73,9 @@ func (xl xlObjects) isObject(bucket, prefix string) bool { // statPart - returns fileInfo structure for a successful stat on part file. func (xl xlObjects) statPart(bucket, objectPart string) (fileInfo FileInfo, err error) { for _, disk := range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } fileInfo, err = disk.StatFile(bucket, objectPart) if err != nil { return FileInfo{}, err diff --git a/xl-v1-healing.go b/xl-v1-healing.go index a77cd102e..4adb05112 100644 --- a/xl-v1-healing.go +++ b/xl-v1-healing.go @@ -38,6 +38,10 @@ func (xl xlObjects) readAllXLMetadata(bucket, object string) ([]xlMetaV1, []erro xlMetaPath := path.Join(object, xlMetaJSONFile) var wg = &sync.WaitGroup{} for index, disk := range xl.storageDisks { + if disk == nil { + errs[index] = errDiskNotFound + continue + } wg.Add(1) go func(index int, disk StorageAPI) { defer wg.Done() @@ -138,7 +142,7 @@ func (xl xlObjects) shouldHeal(onlineDisks []StorageAPI) (heal bool) { // Verify if online disks count are lesser than readQuorum // threshold, return an error. if onlineDiskCount < xl.readQuorum { - errorIf(errReadQuorum, "Unable to establish read quorum, disks are offline.") + errorIf(errXLReadQuorum, "Unable to establish read quorum, disks are offline.") return false } } diff --git a/xl-v1-metadata.go b/xl-v1-metadata.go index 22ea5eaa5..066e4ef0e 100644 --- a/xl-v1-metadata.go +++ b/xl-v1-metadata.go @@ -65,6 +65,24 @@ type erasureInfo struct { Checksum []checkSumInfo `json:"checksum,omitempty"` } +// IsValid - tells if the erasure info is sane by validating the data +// blocks, parity blocks and distribution. +func (e erasureInfo) IsValid() bool { + return e.DataBlocks != 0 && e.ParityBlocks != 0 && len(e.Distribution) != 0 +} + +// pickValidErasureInfo - picks one valid erasure info content and returns, from a +// slice of erasure info content. If no value is found this function panics +// and dies. +func pickValidErasureInfo(eInfos []erasureInfo) erasureInfo { + for _, eInfo := range eInfos { + if eInfo.IsValid() { + return eInfo + } + } + panic("Unable to look for valid erasure info content") +} + // statInfo - carries stat information of the object. type statInfo struct { Size int64 `json:"size"` // Size of the object `xl.json`. @@ -185,6 +203,9 @@ func pickValidXLMeta(xlMetas []xlMetaV1) xlMetaV1 { // one of the disks picked at random. func (xl xlObjects) readXLMetadata(bucket, object string) (xlMeta xlMetaV1, err error) { for _, disk := range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } var buf []byte buf, err = readAll(disk, bucket, path.Join(object, xlMetaJSONFile)) if err != nil { @@ -208,6 +229,10 @@ func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix dstJSONFile := path.Join(dstPrefix, xlMetaJSONFile) // Rename `xl.json` to all disks in parallel. for index, disk := range xl.storageDisks { + if disk == nil { + mErrs[index] = errDiskNotFound + continue + } wg.Add(1) // Rename `xl.json` in a routine. go func(index int, disk StorageAPI) { @@ -230,16 +255,49 @@ func (xl xlObjects) renameXLMetadata(srcBucket, srcPrefix, dstBucket, dstPrefix // Wait for all the routines. wg.Wait() - // Return the first error. + // Gather err count. + var errCount = 0 for _, err := range mErrs { if err == nil { continue } - return err + errCount++ + } + // We can safely allow RenameFile errors up to len(xl.storageDisks) - xl.writeQuorum + // otherwise return failure. Cleanup successful renames. + if errCount > len(xl.storageDisks)-xl.writeQuorum { + // Check we have successful read quorum. + if errCount <= len(xl.storageDisks)-xl.readQuorum { + return nil // Return success. + } // else - failed to acquire read quorum. + + // Undo rename `xl.json` on disks where RenameFile succeeded. + for index, disk := range xl.storageDisks { + if disk == nil { + continue + } + // Undo rename object in parallel. + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + if mErrs[index] != nil { + return + } + _ = disk.RenameFile(dstBucket, dstJSONFile, srcBucket, srcJSONFile) + }(index, disk) + } + wg.Wait() + return errXLWriteQuorum } return nil } +// deleteXLMetadata - deletes `xl.json` on a single disk. +func deleteXLMetdata(disk StorageAPI, bucket, prefix string) error { + jsonFile := path.Join(prefix, xlMetaJSONFile) + return disk.DeleteFile(bucket, jsonFile) +} + // writeXLMetadata - writes `xl.json` to a single disk. func writeXLMetadata(disk StorageAPI, bucket, prefix string, xlMeta xlMetaV1) error { jsonFile := path.Join(prefix, xlMetaJSONFile) @@ -267,6 +325,10 @@ func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMet // Start writing `xl.json` to all disks in parallel. for index, disk := range xl.storageDisks { + if disk == nil { + mErrs[index] = errDiskNotFound + continue + } wg.Add(1) // Write `xl.json` in a routine. go func(index int, disk StorageAPI) { @@ -287,24 +349,52 @@ func (xl xlObjects) writeUniqueXLMetadata(bucket, prefix string, xlMetas []xlMet // Wait for all the routines. wg.Wait() + var errCount = 0 // Return the first error. for _, err := range mErrs { if err == nil { continue } - return err + errCount++ + } + // Count all the errors and validate if we have write quorum. + if errCount > len(xl.storageDisks)-xl.writeQuorum { + // Validate if we have read quorum, then return success. + if errCount > len(xl.storageDisks)-xl.readQuorum { + return nil + } + // Delete all the `xl.json` left over. + for index, disk := range xl.storageDisks { + if disk == nil { + continue + } + // Undo rename object in parallel. + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + if mErrs[index] != nil { + return + } + _ = deleteXLMetdata(disk, bucket, prefix) + }(index, disk) + } + wg.Wait() + return errXLWriteQuorum } - return nil } -// writeXLMetadata - write `xl.json` on all disks in order. -func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error { +// writeSameXLMetadata - write `xl.json` on all disks in order. +func (xl xlObjects) writeSameXLMetadata(bucket, prefix string, xlMeta xlMetaV1) error { var wg = &sync.WaitGroup{} var mErrs = make([]error, len(xl.storageDisks)) // Start writing `xl.json` to all disks in parallel. for index, disk := range xl.storageDisks { + if disk == nil { + mErrs[index] = errDiskNotFound + continue + } wg.Add(1) // Write `xl.json` in a routine. go func(index int, disk StorageAPI, metadata xlMetaV1) { @@ -325,12 +415,37 @@ func (xl xlObjects) writeXLMetadata(bucket, prefix string, xlMeta xlMetaV1) erro // Wait for all the routines. wg.Wait() + var errCount = 0 // Return the first error. for _, err := range mErrs { if err == nil { continue } - return err + errCount++ + } + // Count all the errors and validate if we have write quorum. + if errCount > len(xl.storageDisks)-xl.writeQuorum { + // Validate if we have read quorum, then return success. + if errCount > len(xl.storageDisks)-xl.readQuorum { + return nil + } + // Delete all the `xl.json` left over. + for index, disk := range xl.storageDisks { + if disk == nil { + continue + } + // Undo rename object in parallel. + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + if mErrs[index] != nil { + return + } + _ = deleteXLMetdata(disk, bucket, prefix) + }(index, disk) + } + wg.Wait() + return errXLWriteQuorum } return nil } diff --git a/xl-v1-multipart-common.go b/xl-v1-multipart-common.go index c380231a1..a2a0214df 100644 --- a/xl-v1-multipart-common.go +++ b/xl-v1-multipart-common.go @@ -92,6 +92,10 @@ func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisk // Update `uploads.json` for all the disks. for index, disk := range storageDisks { + if disk == nil { + errs[index] = errDiskNotFound + continue + } wg.Add(1) // Update `uploads.json` in routine. go func(index int, disk StorageAPI) { @@ -120,13 +124,41 @@ func updateUploadsJSON(bucket, object string, uploadsJSON uploadsV1, storageDisk // Wait for all the routines to finish updating `uploads.json` wg.Wait() + // For only single disk return first error. + if len(storageDisks) == 1 { + return errs[0] + } // else count all the errors for quorum validation. + var errCount = 0 // Return for first error. for _, err := range errs { if err != nil { - return err + errCount++ } } - + // Count all the errors and validate if we have write quorum. + if errCount > len(storageDisks)-len(storageDisks)/2+3 { + // Validate if we have read quorum return success. + if errCount > len(storageDisks)-len(storageDisks)/2+1 { + return nil + } + // Rename `uploads.json` left over back to tmp location. + for index, disk := range storageDisks { + if disk == nil { + continue + } + // Undo rename `uploads.json` in parallel. + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + if errs[index] != nil { + return + } + _ = disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath) + }(index, disk) + } + wg.Wait() + return errXLWriteQuorum + } return nil } @@ -149,6 +181,9 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora var uploadsJSON uploadsV1 for _, disk := range storageDisks { + if disk == nil { + continue + } uploadsJSON, err = readUploadsJSON(bucket, object, disk) break } @@ -170,6 +205,10 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora // Update `uploads.json` on all disks. for index, disk := range storageDisks { + if disk == nil { + errs[index] = errDiskNotFound + continue + } wg.Add(1) // Update `uploads.json` in a routine. go func(index int, disk StorageAPI) { @@ -205,13 +244,41 @@ func writeUploadJSON(bucket, object, uploadID string, initiated time.Time, stora // Wait for all the writes to finish. wg.Wait() - // Return for first error encountered. - for _, err = range errs { + // For only single disk return first error. + if len(storageDisks) == 1 { + return errs[0] + } // else count all the errors for quorum validation. + var errCount = 0 + // Return for first error. + for _, err := range errs { if err != nil { - return err + errCount++ } } - + // Count all the errors and validate if we have write quorum. + if errCount > len(storageDisks)-len(storageDisks)/2+3 { + // Validate if we have read quorum return success. + if errCount > len(storageDisks)-len(storageDisks)/2+1 { + return nil + } + // Rename `uploads.json` left over back to tmp location. + for index, disk := range storageDisks { + if disk == nil { + continue + } + // Undo rename `uploads.json` in parallel. + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + if errs[index] != nil { + return + } + _ = disk.RenameFile(minioMetaBucket, uploadsPath, minioMetaBucket, tmpUploadsPath) + }(index, disk) + } + wg.Wait() + return errXLWriteQuorum + } return nil } @@ -225,6 +292,10 @@ func cleanupUploadedParts(bucket, object, uploadID string, storageDisks ...Stora // Cleanup uploadID for all disks. for index, disk := range storageDisks { + if disk == nil { + errs[index] = errDiskNotFound + continue + } wg.Add(1) // Cleanup each uploadID in a routine. go func(index int, disk StorageAPI) { @@ -287,6 +358,9 @@ func listMultipartUploadIDs(bucketName, objectName, uploadIDMarker string, count // Returns if the prefix is a multipart upload. func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { for _, disk := range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } _, err := disk.StatFile(bucket, pathJoin(prefix, uploadsJSONFile)) if err != nil { return false @@ -299,6 +373,9 @@ func (xl xlObjects) isMultipartUpload(bucket, prefix string) bool { // listUploadsInfo - list all uploads info. func (xl xlObjects) listUploadsInfo(prefixPath string) (uploadsInfo []uploadInfo, err error) { for _, disk := range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } splitPrefixes := strings.SplitN(prefixPath, "/", 3) uploadsJSON, err := readUploadsJSON(splitPrefixes[1], splitPrefixes[2], disk) if err != nil { @@ -324,6 +401,9 @@ func (xl xlObjects) removeObjectPart(bucket, object, uploadID, partName string) curpartPath := path.Join(mpartMetaPrefix, bucket, object, uploadID, partName) wg := sync.WaitGroup{} for i, disk := range xl.storageDisks { + if disk == nil { + continue + } wg.Add(1) go func(index int, disk StorageAPI) { defer wg.Done() diff --git a/xl-v1-multipart.go b/xl-v1-multipart.go index ac0d4249b..ff02d3a41 100644 --- a/xl-v1-multipart.go +++ b/xl-v1-multipart.go @@ -63,8 +63,13 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark // uploadIDMarker first. if uploadIDMarker != "" { nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) - disk := xl.getLoadBalancedQuorumDisks()[0] - uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk) + for _, disk := range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } + uploads, _, err = listMultipartUploadIDs(bucket, keyMarker, uploadIDMarker, maxUploads, disk) + break + } nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, keyMarker)) if err != nil { return ListMultipartsInfo{}, err @@ -114,7 +119,13 @@ func (xl xlObjects) listMultipartUploads(bucket, prefix, keyMarker, uploadIDMark uploadIDMarker = "" // For the new object entry we get all its pending uploadIDs. nsMutex.RLock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) - disk := xl.getLoadBalancedQuorumDisks()[0] + var disk StorageAPI + for _, disk = range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } + break + } newUploads, end, err = listMultipartUploadIDs(bucket, entry, uploadIDMarker, maxUploads, disk) nsMutex.RUnlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, entry)) if err != nil { @@ -248,7 +259,8 @@ func (xl xlObjects) newMultipartUpload(bucket string, object string, meta map[st } uploadIDPath := path.Join(mpartMetaPrefix, bucket, object, uploadID) tempUploadIDPath := path.Join(tmpMetaPrefix, uploadID) - if err = xl.writeXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { + // Write updated `xl.json` to all disks. + if err = xl.writeSameXLMetadata(minioMetaBucket, tempUploadIDPath, xlMeta); err != nil { return "", toObjectErr(err, minioMetaBucket, tempUploadIDPath) } rErr := xl.renameObject(minioMetaBucket, tempUploadIDPath, minioMetaBucket, uploadIDPath) @@ -646,7 +658,13 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. - disk := xl.getLoadBalancedQuorumDisks()[0] + var disk StorageAPI + for _, disk = range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } + break + } uploadsJSON, err := readUploadsJSON(bucket, object, disk) if err != nil { return "", toObjectErr(err, minioMetaBucket, object) @@ -688,7 +706,13 @@ func (xl xlObjects) abortMultipartUpload(bucket, object, uploadID string) (err e defer nsMutex.Unlock(minioMetaBucket, pathJoin(mpartMetaPrefix, bucket, object)) // Validate if there are other incomplete upload-id's present for // the object, if yes do not attempt to delete 'uploads.json'. - disk := xl.getLoadBalancedQuorumDisks()[0] + var disk StorageAPI + for _, disk = range xl.getLoadBalancedQuorumDisks() { + if disk == nil { + continue + } + break + } uploadsJSON, err := readUploadsJSON(bucket, object, disk) if err != nil { return toObjectErr(err, bucket, object) diff --git a/xl-v1-object.go b/xl-v1-object.go index ea398eaa2..526212385 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -178,6 +178,10 @@ func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject stri // Rename file on all underlying storage disks. for index, disk := range xl.storageDisks { + if disk == nil { + errs[index] = errDiskNotFound + continue + } // Append "/" as srcObject and dstObject are either leaf-dirs or non-leaf-dris. // If srcObject is an object instead of prefix we just rename the leaf-dir and // not rename the part and metadata files separately. @@ -212,6 +216,9 @@ func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject stri // Undo rename object on disks where RenameFile succeeded. for index, disk := range xl.storageDisks { + if disk == nil { + continue + } // Undo rename object in parallel. wg.Add(1) go func(index int, disk StorageAPI) { @@ -223,7 +230,7 @@ func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject stri }(index, disk) } wg.Wait() - return errWriteQuorum + return errXLWriteQuorum } return nil } @@ -382,6 +389,10 @@ func (xl xlObjects) deleteObject(bucket, object string) error { var dErrs = make([]error, len(xl.storageDisks)) for index, disk := range xl.storageDisks { + if disk == nil { + dErrs[index] = errDiskNotFound + continue + } wg.Add(1) go func(index int, disk StorageAPI) { defer wg.Done() @@ -416,9 +427,9 @@ func (xl xlObjects) deleteObject(bucket, object string) error { if fileNotFoundCnt == len(xl.storageDisks) { return errFileNotFound } else if deleteFileErr > len(xl.storageDisks)-xl.writeQuorum { - // Return errWriteQuorum if errors were more than + // Return errXLWriteQuorum if errors were more than // allowed write quorum. - return errWriteQuorum + return errXLWriteQuorum } return nil diff --git a/xl-v1-utils.go b/xl-v1-utils.go index 8a161ceab..a9722d56d 100644 --- a/xl-v1-utils.go +++ b/xl-v1-utils.go @@ -20,6 +20,7 @@ import ( "bytes" "io" "math/rand" + "path" "time" ) @@ -38,14 +39,40 @@ func randInts(count int) []int { return ints } -// readAll reads from bucket, object until an error or returns the data it read until io.EOF. -func readAll(disk StorageAPI, bucket, object string) ([]byte, error) { +// readAll - returns contents from volume/path as byte array. +func readAll(disk StorageAPI, volume string, path string) ([]byte, error) { var writer = new(bytes.Buffer) startOffset := int64(0) + + // Allocate 10MiB buffer. + buf := make([]byte, blockSizeV1) + // Read until io.EOF. for { - buf := make([]byte, blockSizeV1) - n, err := disk.ReadFile(bucket, object, startOffset, buf) + n, err := disk.ReadFile(volume, path, startOffset, buf) + if err == io.EOF { + break + } + if err != nil && err != io.EOF { + return nil, err + } + writer.Write(buf[:n]) + startOffset += n + } + return writer.Bytes(), nil +} + +// readXLMeta reads `xl.json` returns contents as byte array. +func readXLMeta(disk StorageAPI, bucket string, object string) ([]byte, error) { + var writer = new(bytes.Buffer) + startOffset := int64(0) + + // Allocate 2MiB buffer, this is sufficient for the most of `xl.json`. + buf := make([]byte, 2*1024*1024) + + // Read until io.EOF. + for { + n, err := disk.ReadFile(bucket, path.Join(object, xlMetaJSONFile), startOffset, buf) if err == io.EOF { break } diff --git a/xl-v1.go b/xl-v1.go index 4287c8001..2211bbfd5 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -58,6 +58,15 @@ var errXLMinDisks = errors.New("Number of disks are smaller than supported minim // errXLNumDisks - returned for odd number of disks. var errXLNumDisks = errors.New("Number of disks should be multiples of '2'") +// errXLReadQuorum - did not meet read quorum. +var errXLReadQuorum = errors.New("I/O error. did not meet read quorum.") + +// errXLWriteQuorum - did not meet write quorum. +var errXLWriteQuorum = errors.New("I/O error. did not meet write quorum.") + +// errXLDataCorrupt - err data corrupt. +var errXLDataCorrupt = errors.New("data likely corrupted, all blocks are zero in length") + const ( // Maximum erasure blocks. maxErasureBlocks = 16 @@ -112,21 +121,37 @@ func newXLObjects(disks []string) (ObjectLayer, error) { // Runs house keeping code, like creating minioMetaBucket, cleaning up tmp files etc. xlHouseKeeping(storageDisks) + // Attempt to load all `format.json` + formatConfigs, sErrs := loadAllFormats(storageDisks) + + // Generic format check validates all necessary cases. + if err := genericFormatCheck(formatConfigs, sErrs); err != nil { + return nil, err + } + + // Handles different cases properly. + switch reduceFormatErrs(sErrs, len(storageDisks)) { + case errUnformattedDisk: + // All drives online but fresh, initialize format. + if err := initFormatXL(storageDisks); err != nil { + return nil, fmt.Errorf("Unable to initialize format, %s", err) + } + case errSomeDiskUnformatted: + // All drives online but some report missing format.json. + if err := healFormatXL(storageDisks); err != nil { + // There was an unexpected unrecoverable error during healing. + return nil, fmt.Errorf("Unable to heal backend %s", err) + } + case errSomeDiskOffline: + // Some disks offline but some report missing format.json. + // FIXME. + } + // Load saved XL format.json and validate. newPosixDisks, err := loadFormatXL(storageDisks) if err != nil { - switch err { - case errUnformattedDisk: - // Save new XL format. - errSave := initFormatXL(storageDisks) - if errSave != nil { - return nil, errSave - } - newPosixDisks = storageDisks - default: - // errCorruptedDisk - error. - return nil, fmt.Errorf("Unable to recognize backend format, %s", err) - } + // errCorruptedDisk - healing failed + return nil, fmt.Errorf("Unable to recognize backend format, %s", err) } // Calculate data and parity blocks.