mirror of
https://github.com/minio/minio.git
synced 2025-01-11 15:03:22 -05:00
fix: retain the previous UUID for newly replaced drives (#10759)
only newly replaced drives get the new `format.json`, this avoids disks reloading their in-memory reference format, ensures that drives are online without reloading the in-memory reference format. keeping reference format in-tact means UUIDs never change once they are formatted.
This commit is contained in:
parent
649035677f
commit
029758cb20
@ -73,8 +73,6 @@ func initAutoHeal(ctx context.Context, objAPI ObjectLayer) {
|
||||
}
|
||||
}
|
||||
|
||||
go monitorLocalDisksInconsistentAndHeal(ctx, z, bgSeq)
|
||||
|
||||
go monitorLocalDisksAndHeal(ctx, z, bgSeq)
|
||||
}
|
||||
|
||||
@ -98,50 +96,6 @@ func getLocalDisksToHeal() (disksToHeal Endpoints) {
|
||||
|
||||
}
|
||||
|
||||
func getLocalDisksToHealInconsistent() (refFormats []*formatErasureV3, diskFormats [][]*formatErasureV3, disksToHeal [][]StorageAPI) {
|
||||
disksToHeal = make([][]StorageAPI, len(globalEndpoints))
|
||||
diskFormats = make([][]*formatErasureV3, len(globalEndpoints))
|
||||
refFormats = make([]*formatErasureV3, len(globalEndpoints))
|
||||
for k, ep := range globalEndpoints {
|
||||
disksToHeal[k] = make([]StorageAPI, len(ep.Endpoints))
|
||||
diskFormats[k] = make([]*formatErasureV3, len(ep.Endpoints))
|
||||
formats := make([]*formatErasureV3, len(ep.Endpoints))
|
||||
storageDisks, _ := initStorageDisksWithErrors(ep.Endpoints)
|
||||
for i, disk := range storageDisks {
|
||||
if disk != nil {
|
||||
format, err := loadFormatErasure(disk)
|
||||
if err != nil {
|
||||
// any error we don't care proceed.
|
||||
continue
|
||||
}
|
||||
formats[i] = format
|
||||
}
|
||||
}
|
||||
refFormat, err := getFormatErasureInQuorum(formats)
|
||||
if err != nil {
|
||||
logger.LogIf(GlobalContext, fmt.Errorf("No erasured disks are in quorum or too many disks are offline - please investigate immediately"))
|
||||
continue
|
||||
}
|
||||
// We have obtained reference format - check if disks are inconsistent
|
||||
for i, format := range formats {
|
||||
if format == nil {
|
||||
continue
|
||||
}
|
||||
if err := formatErasureV3Check(refFormat, format); err != nil {
|
||||
if errors.Is(err, errInconsistentDisk) {
|
||||
// Found inconsistencies - check which disk it is.
|
||||
if storageDisks[i] != nil && storageDisks[i].IsLocal() {
|
||||
disksToHeal[k][i] = storageDisks[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
refFormats[k] = refFormat
|
||||
diskFormats[k] = formats
|
||||
}
|
||||
return refFormats, diskFormats, disksToHeal
|
||||
}
|
||||
|
||||
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
|
||||
// Run the background healer
|
||||
globalBackgroundHealRoutine = newHealRoutine()
|
||||
@ -150,36 +104,6 @@ func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
|
||||
globalBackgroundHealState.LaunchNewHealSequence(newBgHealSequence())
|
||||
}
|
||||
|
||||
// monitorLocalDisksInconsistentAndHeal - ensures that inconsistent
|
||||
// disks are healed appropriately.
|
||||
func monitorLocalDisksInconsistentAndHeal(ctx context.Context, z *erasureServerSets, bgSeq *healSequence) {
|
||||
// Perform automatic disk healing when a disk is found to be inconsistent.
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(defaultMonitorNewDiskInterval):
|
||||
waitForLowHTTPReq(int32(globalEndpoints.NEndpoints()), time.Second)
|
||||
|
||||
refFormats, diskFormats, localDisksHeal := getLocalDisksToHealInconsistent()
|
||||
for k := range refFormats {
|
||||
for j, disk := range localDisksHeal[k] {
|
||||
if disk == nil {
|
||||
continue
|
||||
}
|
||||
format := diskFormats[k][j].Clone()
|
||||
format.Erasure.Sets = refFormats[k].Erasure.Sets
|
||||
if err := saveFormatErasure(disk, format, true); err != nil {
|
||||
logger.LogIf(ctx, fmt.Errorf("Unable fix inconsistent format for drive %s: %w", disk, err))
|
||||
continue
|
||||
}
|
||||
globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// monitorLocalDisksAndHeal - ensures that detected new disks are healed
|
||||
// 1. Only the concerned erasure set will be listed and healed
|
||||
// 2. Only the node hosting the disk is responsible to perform the heal
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@ -235,16 +234,7 @@ func newBootstrapRESTClient(endpoint Endpoint) *bootstrapRESTClient {
|
||||
|
||||
trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
|
||||
restClient := rest.NewClient(serverURL, trFn, newAuthToken)
|
||||
restClient.HealthCheckFn = func() bool {
|
||||
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
|
||||
// Instantiate a new rest client for healthcheck
|
||||
// to avoid recursive healthCheckFn()
|
||||
respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, bootstrapRESTMethodHealth, nil, nil, -1)
|
||||
xhttp.DrainBody(respBody)
|
||||
cancel()
|
||||
var ne *rest.NetworkError
|
||||
return !errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &ne)
|
||||
}
|
||||
restClient.HealthCheckFn = nil
|
||||
|
||||
return &bootstrapRESTClient{endpoint: endpoint, restClient: restClient}
|
||||
}
|
||||
|
@ -203,8 +203,9 @@ func TestErasureDecode(t *testing.T) {
|
||||
// This test is t.Skip()ed as it a long time to run, hence should be run
|
||||
// explicitly after commenting out t.Skip()
|
||||
func TestErasureDecodeRandomOffsetLength(t *testing.T) {
|
||||
// Comment the following line to run this test.
|
||||
t.SkipNow()
|
||||
if testing.Short() {
|
||||
t.Skip()
|
||||
}
|
||||
// Initialize environment needed for the test.
|
||||
dataBlocks := 7
|
||||
parityBlocks := 7
|
||||
|
@ -133,7 +133,7 @@ func (s *erasureSets) getDiskMap() map[string]StorageAPI {
|
||||
// Initializes a new StorageAPI from the endpoint argument, returns
|
||||
// StorageAPI and also `format` which exists on the disk.
|
||||
func connectEndpoint(endpoint Endpoint) (StorageAPI, *formatErasureV3, error) {
|
||||
disk, err := newStorageAPI(endpoint)
|
||||
disk, err := newStorageAPIWithoutHealthCheck(endpoint)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -221,7 +221,7 @@ func (s *erasureSets) connectDisks() {
|
||||
}
|
||||
return
|
||||
}
|
||||
if endpoint.IsLocal && disk.Healing() {
|
||||
if disk.IsLocal() && disk.Healing() {
|
||||
globalBackgroundHealState.pushHealLocalDisks(disk.Endpoint())
|
||||
logger.Info(fmt.Sprintf("Found the drive %s that needs healing, attempting to heal...", disk))
|
||||
}
|
||||
@ -232,13 +232,24 @@ func (s *erasureSets) connectDisks() {
|
||||
printEndpointError(endpoint, err, false)
|
||||
return
|
||||
}
|
||||
disk.SetDiskID(format.Erasure.This)
|
||||
|
||||
s.erasureDisksMu.Lock()
|
||||
if s.erasureDisks[setIndex][diskIndex] != nil {
|
||||
s.erasureDisks[setIndex][diskIndex].Close()
|
||||
}
|
||||
s.erasureDisks[setIndex][diskIndex] = disk
|
||||
if disk.IsLocal() {
|
||||
disk.SetDiskID(format.Erasure.This)
|
||||
s.erasureDisks[setIndex][diskIndex] = disk
|
||||
} else {
|
||||
// Enable healthcheck disk for remote endpoint.
|
||||
disk, err = newStorageAPI(endpoint)
|
||||
if err != nil {
|
||||
printEndpointError(endpoint, err, false)
|
||||
return
|
||||
}
|
||||
disk.SetDiskID(format.Erasure.This)
|
||||
s.erasureDisks[setIndex][diskIndex] = disk
|
||||
}
|
||||
s.endpointStrings[setIndex*s.setDriveCount+diskIndex] = disk.String()
|
||||
s.erasureDisksMu.Unlock()
|
||||
go func(setIndex int) {
|
||||
@ -1132,7 +1143,7 @@ func formatsToDrivesInfo(endpoints Endpoints, formats []*formatErasureV3, sErrs
|
||||
// Reloads the format from the disk, usually called by a remote peer notifier while
|
||||
// healing in a distributed setup.
|
||||
func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error) {
|
||||
storageDisks, errs := initStorageDisksWithErrors(s.endpoints)
|
||||
storageDisks, errs := initStorageDisksWithErrorsWithoutHealthCheck(s.endpoints)
|
||||
for i, err := range errs {
|
||||
if err != nil && err != errDiskNotFound {
|
||||
return fmt.Errorf("Disk %s: %w", s.endpoints[i], err)
|
||||
@ -1182,6 +1193,15 @@ func (s *erasureSets) ReloadFormat(ctx context.Context, dryRun bool) (err error)
|
||||
}
|
||||
|
||||
s.endpointStrings[m*s.setDriveCount+n] = disk.String()
|
||||
if !disk.IsLocal() {
|
||||
// Enable healthcheck disk for remote endpoint.
|
||||
disk, err = newStorageAPI(disk.Endpoint())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
disk.SetDiskID(diskID)
|
||||
}
|
||||
|
||||
s.erasureDisks[m][n] = disk
|
||||
}
|
||||
|
||||
@ -1249,7 +1269,7 @@ func markRootDisksAsDown(storageDisks []StorageAPI, errs []error) {
|
||||
|
||||
// HealFormat - heals missing `format.json` on fresh unformatted disks.
|
||||
func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.HealResultItem, err error) {
|
||||
storageDisks, errs := initStorageDisksWithErrors(s.endpoints)
|
||||
storageDisks, errs := initStorageDisksWithErrorsWithoutHealthCheck(s.endpoints)
|
||||
for i, derr := range errs {
|
||||
if derr != nil && derr != errDiskNotFound {
|
||||
return madmin.HealResultItem{}, fmt.Errorf("Disk %s: %w", s.endpoints[i], derr)
|
||||
@ -1298,40 +1318,9 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
|
||||
return res, errNoHealRequired
|
||||
}
|
||||
|
||||
// Mark all UUIDs which might be offline, use list
|
||||
// of formats to mark them appropriately.
|
||||
markUUIDsOffline(refFormat, formats, sErrs)
|
||||
|
||||
// Initialize a new set of set formats which will be written to disk.
|
||||
newFormatSets := newHealFormatSets(refFormat, s.setCount, s.setDriveCount, formats, sErrs)
|
||||
|
||||
// Look for all offline/unformatted disks in our reference format,
|
||||
// such that we can fill them up with new UUIDs, this looping also
|
||||
// ensures that the replaced disks allocated evenly across all sets.
|
||||
// Making sure that the redundancy is not lost.
|
||||
for i := range refFormat.Erasure.Sets {
|
||||
for j := range refFormat.Erasure.Sets[i] {
|
||||
if refFormat.Erasure.Sets[i][j] == offlineDiskUUID {
|
||||
for l := range newFormatSets[i] {
|
||||
if newFormatSets[i][l] == nil {
|
||||
continue
|
||||
}
|
||||
if newFormatSets[i][l].Erasure.This == "" {
|
||||
newFormatSets[i][l].Erasure.This = mustGetUUID()
|
||||
refFormat.Erasure.Sets[i][j] = newFormatSets[i][l].Erasure.This
|
||||
for m, v := range res.After.Drives {
|
||||
if v.Endpoint == s.endpoints.GetString(i*s.setDriveCount+l) {
|
||||
res.After.Drives[m].UUID = newFormatSets[i][l].Erasure.This
|
||||
res.After.Drives[m].State = madmin.DriveStateOk
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !dryRun {
|
||||
var tmpNewFormats = make([]*formatErasureV3, s.setCount*s.setDriveCount)
|
||||
for i := range newFormatSets {
|
||||
@ -1339,8 +1328,9 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
|
||||
if newFormatSets[i][j] == nil {
|
||||
continue
|
||||
}
|
||||
res.After.Drives[i*s.setDriveCount+j].UUID = newFormatSets[i][j].Erasure.This
|
||||
res.After.Drives[i*s.setDriveCount+j].State = madmin.DriveStateOk
|
||||
tmpNewFormats[i*s.setDriveCount+j] = newFormatSets[i][j]
|
||||
tmpNewFormats[i*s.setDriveCount+j].Erasure.Sets = refFormat.Erasure.Sets
|
||||
}
|
||||
}
|
||||
|
||||
@ -1382,7 +1372,16 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
|
||||
}
|
||||
|
||||
s.endpointStrings[m*s.setDriveCount+n] = disk.String()
|
||||
if !disk.IsLocal() {
|
||||
// Enable healthcheck disk for remote endpoint.
|
||||
disk, err = newStorageAPI(disk.Endpoint())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
disk.SetDiskID(diskID)
|
||||
}
|
||||
s.erasureDisks[m][n] = disk
|
||||
|
||||
}
|
||||
|
||||
s.erasureDisksMu.Unlock()
|
||||
|
@ -716,7 +716,10 @@ func saveFormatErasureAllWithErrs(ctx context.Context, storageDisks []StorageAPI
|
||||
if formats[index] == nil {
|
||||
return errDiskNotFound
|
||||
}
|
||||
return saveFormatErasure(storageDisks[index], formats[index], errors.Is(fErrs[index], errUnformattedDisk))
|
||||
if errors.Is(fErrs[index], errUnformattedDisk) {
|
||||
return saveFormatErasure(storageDisks[index], formats[index], true)
|
||||
}
|
||||
return nil
|
||||
}, index)
|
||||
}
|
||||
|
||||
@ -755,6 +758,20 @@ func closeStorageDisks(storageDisks []StorageAPI) {
|
||||
}
|
||||
}
|
||||
|
||||
func initStorageDisksWithErrorsWithoutHealthCheck(endpoints Endpoints) ([]StorageAPI, []error) {
|
||||
// Bootstrap disks.
|
||||
storageDisks := make([]StorageAPI, len(endpoints))
|
||||
g := errgroup.WithNErrs(len(endpoints))
|
||||
for index := range endpoints {
|
||||
index := index
|
||||
g.Go(func() (err error) {
|
||||
storageDisks[index], err = newStorageAPIWithoutHealthCheck(endpoints[index])
|
||||
return err
|
||||
}, index)
|
||||
}
|
||||
return storageDisks, g.Wait()
|
||||
}
|
||||
|
||||
// Initialize storage disks for each endpoint.
|
||||
// Errors are returned for each endpoint with matching index.
|
||||
func initStorageDisksWithErrors(endpoints Endpoints) ([]StorageAPI, []error) {
|
||||
@ -905,63 +922,6 @@ func makeFormatErasureMetaVolumes(disk StorageAPI) error {
|
||||
return disk.MakeVolBulk(context.TODO(), minioMetaBucket, minioMetaTmpBucket, minioMetaMultipartBucket, dataUsageBucket)
|
||||
}
|
||||
|
||||
// Get all UUIDs which are present in reference format should
|
||||
// be present in the list of formats provided, those are considered
|
||||
// as online UUIDs.
|
||||
func getOnlineUUIDs(refFormat *formatErasureV3, formats []*formatErasureV3) (onlineUUIDs []string) {
|
||||
for _, format := range formats {
|
||||
if format == nil {
|
||||
continue
|
||||
}
|
||||
for _, set := range refFormat.Erasure.Sets {
|
||||
for _, uuid := range set {
|
||||
if format.Erasure.This == uuid {
|
||||
onlineUUIDs = append(onlineUUIDs, uuid)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return onlineUUIDs
|
||||
}
|
||||
|
||||
// Look for all UUIDs which are not present in reference format
|
||||
// but are present in the onlineUUIDs list, construct of list such
|
||||
// offline UUIDs.
|
||||
func getOfflineUUIDs(refFormat *formatErasureV3, formats []*formatErasureV3) (offlineUUIDs []string) {
|
||||
onlineUUIDs := getOnlineUUIDs(refFormat, formats)
|
||||
for i, set := range refFormat.Erasure.Sets {
|
||||
for j, uuid := range set {
|
||||
var found bool
|
||||
for _, onlineUUID := range onlineUUIDs {
|
||||
if refFormat.Erasure.Sets[i][j] == onlineUUID {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
offlineUUIDs = append(offlineUUIDs, uuid)
|
||||
}
|
||||
}
|
||||
}
|
||||
return offlineUUIDs
|
||||
}
|
||||
|
||||
// Mark all UUIDs that are offline.
|
||||
func markUUIDsOffline(refFormat *formatErasureV3, formats []*formatErasureV3, errs []error) {
|
||||
offlineUUIDs := getOfflineUUIDs(refFormat, formats)
|
||||
for i, set := range refFormat.Erasure.Sets {
|
||||
setDriveCount := len(set)
|
||||
for j := range set {
|
||||
for _, offlineUUID := range offlineUUIDs {
|
||||
if refFormat.Erasure.Sets[i][j] == offlineUUID &&
|
||||
errors.Is(errs[i*setDriveCount+j], errUnformattedDisk) {
|
||||
// Unformatted drive gets an offline disk UUID
|
||||
refFormat.Erasure.Sets[i][j] = offlineDiskUUID
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize a new set of set formats which will be written to all disks.
|
||||
func newHealFormatSets(refFormat *formatErasureV3, setCount, setDriveCount int, formats []*formatErasureV3, errs []error) [][]*formatErasureV3 {
|
||||
newFormats := make([][]*formatErasureV3, setCount)
|
||||
@ -970,23 +930,16 @@ func newHealFormatSets(refFormat *formatErasureV3, setCount, setDriveCount int,
|
||||
}
|
||||
for i := range refFormat.Erasure.Sets {
|
||||
for j := range refFormat.Erasure.Sets[i] {
|
||||
if errs[i*setDriveCount+j] == errUnformattedDisk || errs[i*setDriveCount+j] == nil {
|
||||
if errors.Is(errs[i*setDriveCount+j], errUnformattedDisk) {
|
||||
newFormats[i][j] = &formatErasureV3{}
|
||||
newFormats[i][j].Version = refFormat.Version
|
||||
newFormats[i][j].ID = refFormat.ID
|
||||
newFormats[i][j].Format = refFormat.Format
|
||||
newFormats[i][j].Version = refFormat.Version
|
||||
newFormats[i][j].Erasure.This = refFormat.Erasure.Sets[i][j]
|
||||
newFormats[i][j].Erasure.Sets = refFormat.Erasure.Sets
|
||||
newFormats[i][j].Erasure.Version = refFormat.Erasure.Version
|
||||
newFormats[i][j].Erasure.DistributionAlgo = refFormat.Erasure.DistributionAlgo
|
||||
}
|
||||
if errs[i*setDriveCount+j] == errUnformattedDisk {
|
||||
newFormats[i][j].Erasure.This = ""
|
||||
newFormats[i][j].Erasure.Sets = nil
|
||||
continue
|
||||
}
|
||||
if errs[i*setDriveCount+j] == nil {
|
||||
newFormats[i][j].Erasure.This = formats[i*setDriveCount+j].Erasure.This
|
||||
newFormats[i][j].Erasure.Sets = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return newFormats
|
||||
|
@ -25,61 +25,6 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Test get offline/online uuids.
|
||||
func TestGetUUIDs(t *testing.T) {
|
||||
fmtV2 := newFormatErasureV3(4, 16, "CRCMOD")
|
||||
formats := make([]*formatErasureV3, 64)
|
||||
|
||||
for i := 0; i < 4; i++ {
|
||||
for j := 0; j < 16; j++ {
|
||||
newFormat := *fmtV2
|
||||
newFormat.Erasure.This = fmtV2.Erasure.Sets[i][j]
|
||||
formats[i*16+j] = &newFormat
|
||||
}
|
||||
}
|
||||
|
||||
gotCount := len(getOnlineUUIDs(fmtV2, formats))
|
||||
if gotCount != 64 {
|
||||
t.Errorf("Expected online count '64', got '%d'", gotCount)
|
||||
}
|
||||
|
||||
for i := 0; i < 4; i++ {
|
||||
for j := 0; j < 16; j++ {
|
||||
if j < 4 {
|
||||
formats[i*16+j] = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
gotCount = len(getOnlineUUIDs(fmtV2, formats))
|
||||
if gotCount != 48 {
|
||||
t.Errorf("Expected online count '48', got '%d'", gotCount)
|
||||
}
|
||||
|
||||
gotCount = len(getOfflineUUIDs(fmtV2, formats))
|
||||
if gotCount != 16 {
|
||||
t.Errorf("Expected offline count '16', got '%d'", gotCount)
|
||||
}
|
||||
|
||||
var errs []error
|
||||
for i := 0; i < 4*16; i++ {
|
||||
errs = append(errs, errUnformattedDisk)
|
||||
}
|
||||
|
||||
markUUIDsOffline(fmtV2, formats, errs)
|
||||
gotCount = 0
|
||||
for i := range fmtV2.Erasure.Sets {
|
||||
for j := range fmtV2.Erasure.Sets[i] {
|
||||
if fmtV2.Erasure.Sets[i][j] == offlineDiskUUID {
|
||||
gotCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
if gotCount != 16 {
|
||||
t.Errorf("Expected offline count '16', got '%d'", gotCount)
|
||||
}
|
||||
}
|
||||
|
||||
// tests fixFormatErasureV3 - fix format.json on all disks.
|
||||
func TestFixFormatV3(t *testing.T) {
|
||||
erasureDirs, err := getRandomDisks(8)
|
||||
@ -480,6 +425,9 @@ func TestNewFormatSets(t *testing.T) {
|
||||
// Check if deployment IDs are preserved.
|
||||
for i := range newFormats {
|
||||
for j := range newFormats[i] {
|
||||
if newFormats[i][j] == nil {
|
||||
continue
|
||||
}
|
||||
if newFormats[i][j].ID != quorumFormat.ID {
|
||||
t.Fatal("Deployment id in the new format is lost")
|
||||
}
|
||||
|
@ -114,9 +114,9 @@ func (l *localLocker) Unlock(args dsync.LockArgs) (reply bool, err error) {
|
||||
return reply, fmt.Errorf("Unlock attempted on a read locked entity: %s", args.Resources)
|
||||
}
|
||||
for _, resource := range args.Resources {
|
||||
lri := l.lockMap[resource]
|
||||
if !l.removeEntry(resource, args, &lri) {
|
||||
return false, fmt.Errorf("Unlock unable to find corresponding lock for uid: %s on resource %s", args.UID, resource)
|
||||
lri, ok := l.lockMap[resource]
|
||||
if ok {
|
||||
l.removeEntry(resource, args, &lri)
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
@ -180,15 +180,13 @@ func (l *localLocker) RUnlock(args dsync.LockArgs) (reply bool, err error) {
|
||||
resource := args.Resources[0]
|
||||
if lri, reply = l.lockMap[resource]; !reply {
|
||||
// No lock is held on the given name
|
||||
return reply, fmt.Errorf("RUnlock attempted on an unlocked entity: %s", resource)
|
||||
return true, nil
|
||||
}
|
||||
if reply = !isWriteLock(lri); !reply {
|
||||
// A write-lock is held, cannot release a read lock
|
||||
return reply, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource)
|
||||
}
|
||||
if !l.removeEntry(resource, args, &lri) {
|
||||
return false, fmt.Errorf("RUnlock unable to find corresponding read lock for uid: %s", args.UID)
|
||||
}
|
||||
l.removeEntry(resource, args, &lri)
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
@ -243,6 +241,9 @@ func (l *localLocker) Expired(ctx context.Context, args dsync.LockArgs) (expired
|
||||
// Similar to removeEntry but only removes an entry only if the lock entry exists in map.
|
||||
// Caller must hold 'l.mutex' lock.
|
||||
func (l *localLocker) removeEntryIfExists(nlrip nameLockRequesterInfoPair) {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
|
||||
// Check if entry is still in map (could have been removed altogether by 'concurrent' (R)Unlock of last entry)
|
||||
if lri, ok := l.lockMap[nlrip.name]; ok {
|
||||
// Even if the entry exists, it may not be the same entry which was
|
||||
|
@ -33,7 +33,7 @@ import (
|
||||
|
||||
const (
|
||||
// Lock maintenance interval.
|
||||
lockMaintenanceInterval = 15 * time.Second
|
||||
lockMaintenanceInterval = 30 * time.Second
|
||||
|
||||
// Lock validity check interval.
|
||||
lockValidityCheckInterval = 5 * time.Second
|
||||
@ -311,12 +311,8 @@ func lockMaintenance(ctx context.Context, interval time.Duration) error {
|
||||
|
||||
// less than the quorum, we have locks expired.
|
||||
if nlripsMap[nlrip.name].locks < nlrip.lri.Quorum {
|
||||
// The lock is no longer active at server that originated
|
||||
// the lock, attempt to remove the lock.
|
||||
globalLockServers[lendpoint].mutex.Lock()
|
||||
// Purge the stale entry if it exists.
|
||||
globalLockServers[lendpoint].removeEntryIfExists(nlrip)
|
||||
globalLockServers[lendpoint].mutex.Unlock()
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -57,6 +57,18 @@ func isObjectDir(object string, size int64) bool {
|
||||
return HasSuffix(object, SlashSeparator) && size == 0
|
||||
}
|
||||
|
||||
func newStorageAPIWithoutHealthCheck(endpoint Endpoint) (storage StorageAPI, err error) {
|
||||
if endpoint.IsLocal {
|
||||
storage, err := newXLStorage(endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &xlStorageDiskIDCheck{storage: storage}, nil
|
||||
}
|
||||
|
||||
return newStorageRESTClient(endpoint, false), nil
|
||||
}
|
||||
|
||||
// Depending on the disk type network or local, initialize storage API.
|
||||
func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
|
||||
if endpoint.IsLocal {
|
||||
@ -67,7 +79,7 @@ func newStorageAPI(endpoint Endpoint) (storage StorageAPI, err error) {
|
||||
return &xlStorageDiskIDCheck{storage: storage}, nil
|
||||
}
|
||||
|
||||
return newStorageRESTClient(endpoint), nil
|
||||
return newStorageRESTClient(endpoint, true), nil
|
||||
}
|
||||
|
||||
// Cleanup a directory recursively.
|
||||
|
@ -661,7 +661,7 @@ func (client *storageRESTClient) Close() error {
|
||||
}
|
||||
|
||||
// Returns a storage rest client.
|
||||
func newStorageRESTClient(endpoint Endpoint) *storageRESTClient {
|
||||
func newStorageRESTClient(endpoint Endpoint, healthcheck bool) *storageRESTClient {
|
||||
serverURL := &url.URL{
|
||||
Scheme: endpoint.Scheme,
|
||||
Host: endpoint.Host,
|
||||
@ -678,14 +678,16 @@ func newStorageRESTClient(endpoint Endpoint) *storageRESTClient {
|
||||
|
||||
trFn := newInternodeHTTPTransport(tlsConfig, rest.DefaultTimeout)
|
||||
restClient := rest.NewClient(serverURL, trFn, newAuthToken)
|
||||
restClient.HealthCheckFn = func() bool {
|
||||
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
|
||||
// Instantiate a new rest client for healthcheck
|
||||
// to avoid recursive healthCheckFn()
|
||||
respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, storageRESTMethodHealth, nil, nil, -1)
|
||||
xhttp.DrainBody(respBody)
|
||||
cancel()
|
||||
return !errors.Is(err, context.DeadlineExceeded) && toStorageErr(err) != errDiskNotFound
|
||||
if healthcheck {
|
||||
restClient.HealthCheckFn = func() bool {
|
||||
ctx, cancel := context.WithTimeout(GlobalContext, restClient.HealthCheckTimeout)
|
||||
// Instantiate a new rest client for healthcheck
|
||||
// to avoid recursive healthCheckFn()
|
||||
respBody, err := rest.NewClient(serverURL, trFn, newAuthToken).Call(ctx, storageRESTMethodHealth, nil, nil, -1)
|
||||
xhttp.DrainBody(respBody)
|
||||
cancel()
|
||||
return !errors.Is(err, context.DeadlineExceeded) && toStorageErr(err) != errDiskNotFound
|
||||
}
|
||||
}
|
||||
|
||||
return &storageRESTClient{endpoint: endpoint, restClient: restClient}
|
||||
|
@ -454,7 +454,7 @@ func newStorageRESTHTTPServerClient(t *testing.T) (*httptest.Server, *storageRES
|
||||
globalServerConfig = newServerConfig()
|
||||
lookupConfigs(globalServerConfig, 0)
|
||||
|
||||
restClient := newStorageRESTClient(endpoint)
|
||||
restClient := newStorageRESTClient(endpoint, false)
|
||||
|
||||
return httpServer, restClient, prevGlobalServerConfig, endpointPath
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user