Add support for deployment ID (#6144)

deployment ID helps in identifying a minio deployment in the case of remote
logging targets.
This commit is contained in:
kannappanr
2018-07-18 20:17:35 -07:00
committed by GitHub
parent e8a008f5b5
commit 43cc0096fa
6 changed files with 314 additions and 11 deletions

View File

@@ -24,6 +24,7 @@ import (
"path"
"time"
"github.com/minio/minio/cmd/logger"
"github.com/minio/minio/pkg/lock"
)
@@ -55,11 +56,18 @@ type formatFSVersionDetect struct {
} `json:"fs"`
}
// Generic structure to manage both v1 and v2 structures
type formatFS struct {
formatMetaV1
FS interface{} `json:"fs"`
}
// Returns the latest "fs" format V1
func newFormatFSV1() (format *formatFSV1) {
f := &formatFSV1{}
f.Version = formatMetaVersionV1
f.Format = formatBackendFS
f.ID = mustGetUUID()
f.FS.Version = formatFSVersionV1
return f
}
@@ -69,6 +77,7 @@ func newFormatFSV2() (format *formatFSV2) {
f := &formatFSV2{}
f.Version = formatMetaVersionV1
f.Format = formatBackendFS
f.ID = mustGetUUID()
f.FS.Version = formatFSVersionV2
return f
}
@@ -115,7 +124,16 @@ func formatFSMigrateV1ToV2(ctx context.Context, wlk *lock.LockedFile, fsPath str
return err
}
return jsonSave(wlk.File, newFormatFSV2())
formatV1 := formatFSV1{}
if err = jsonLoad(wlk, &formatV1); err != nil {
return err
}
formatV2 := formatFSV2{}
formatV2.formatMetaV1 = formatV1.formatMetaV1
formatV2.FS.Version = formatFSVersionV2
return jsonSave(wlk.File, formatV2)
}
// Migrate the "fs" backend.
@@ -179,6 +197,12 @@ func createFormatFS(ctx context.Context, fsFormatPath string) error {
// migrate the backend when we are actively working on the backend.
func initFormatFS(ctx context.Context, fsPath string) (rlk *lock.RLockedFile, err error) {
fsFormatPath := pathJoin(fsPath, minioMetaBucket, formatConfigFile)
// Add a deployment ID, if it does not exist.
if err := formatFSFixDeploymentID(fsFormatPath); err != nil {
return nil, err
}
// Any read on format.json should be done with read-lock.
// Any write on format.json should be done with write-lock.
for {
@@ -235,7 +259,8 @@ func initFormatFS(ctx context.Context, fsPath string) (rlk *lock.RLockedFile, er
rlk.Close()
// Hold write lock during migration so that we do not disturb any
// minio processes running in parallel.
wlk, err := lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
var wlk *lock.LockedFile
wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
if err == lock.ErrAlreadyLocked {
// Lock already present, sleep and attempt again.
time.Sleep(100 * time.Millisecond)
@@ -253,7 +278,94 @@ func initFormatFS(ctx context.Context, fsPath string) (rlk *lock.RLockedFile, er
// Successfully migrated, now try to hold a read-lock on format.json
continue
}
var id string
if id, err = formatFSGetDeploymentID(rlk); err != nil {
rlk.Close()
return nil, err
}
logger.SetDeploymentID(id)
return rlk, nil
}
}
func formatFSGetDeploymentID(rlk *lock.RLockedFile) (id string, err error) {
format := &formatFS{}
if err := jsonLoad(rlk, format); err != nil {
return "", err
}
return format.ID, nil
}
// Generate a deployment ID if one does not exist already.
func formatFSFixDeploymentID(fsFormatPath string) error {
rlk, err := lock.RLockedOpenFile(fsFormatPath)
if err == nil {
// format.json can be empty in a rare condition when another
// minio process just created the file but could not hold lock
// and write to it.
var fi os.FileInfo
fi, err = rlk.Stat()
if err != nil {
rlk.Close()
return err
}
if fi.Size() == 0 {
rlk.Close()
return nil
}
}
if os.IsNotExist(err) {
return nil
}
if err != nil {
return err
}
formatBackend, err := formatMetaGetFormatBackendFS(rlk)
if err != nil {
rlk.Close()
return err
}
if formatBackend != formatBackendFS {
rlk.Close()
return fmt.Errorf(`%s file: expected format-type: %s, found: %s`, formatConfigFile, formatBackendFS, formatBackend)
}
format := &formatFS{}
err = jsonLoad(rlk, format)
rlk.Close()
if err != nil {
return err
}
// Check if it needs to be updated
if format.ID != "" {
return nil
}
for {
wlk, err := lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
if err == lock.ErrAlreadyLocked {
// Lock already present, sleep and attempt again.
time.Sleep(100 * time.Millisecond)
continue
}
if err != nil {
return err
}
defer wlk.Close()
err = jsonLoad(wlk, format)
if err != nil {
return err
}
// Check if it needs to be updated
if format.ID != "" {
return nil
}
format.ID = mustGetUUID()
return jsonSave(wlk, format)
}
}