Add sync mode for 'xl.json' (#6798)

xl.json is the source of truth for all erasure
coded objects, without which we won't be able to
read the objects properly. This PR enables sync
mode for writing `xl.json` such all writes go hit
the disk and are persistent under situations such
as abrupt power failures on servers running Minio.
This commit is contained in:
Harshavardhana 2018-11-14 06:18:35 -08:00 committed by Nitish Tiwari
parent cf26c937e4
commit f1f23f6f11
7 changed files with 87 additions and 8 deletions

View File

@ -159,6 +159,13 @@ func (d *naughtyDisk) DeleteFile(volume string, path string) (err error) {
return d.disk.DeleteFile(volume, path)
}
func (d *naughtyDisk) WriteAll(volume string, path string, buf []byte) (err error) {
if err := d.calcError(); err != nil {
return err
}
return d.disk.WriteAll(volume, path, buf)
}
func (d *naughtyDisk) ReadAll(volume string, path string) (buf []byte, err error) {
if err := d.calcError(); err != nil {
return nil, err

View File

@ -847,7 +847,7 @@ func (s *posix) ReadFile(volume, path string, offset int64, buffer []byte, verif
return int64(len(buffer)), nil
}
func (s *posix) createFile(volume, path string) (f *os.File, err error) {
func (s *posix) openFile(volume, path string, mode int) (f *os.File, err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
@ -896,7 +896,7 @@ func (s *posix) createFile(volume, path string) (f *os.File, err error) {
}
}
w, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
w, err := os.OpenFile(filePath, mode, 0666)
if err != nil {
// File path cannot be verified since one of the parents is a file.
switch {
@ -941,7 +941,7 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) {
}
// Create file if not found
w, err := s.createFile(volume, path)
w, err := s.openFile(volume, path, os.O_CREATE|os.O_APPEND|os.O_WRONLY)
if err != nil {
return err
}
@ -972,6 +972,30 @@ func (s *posix) PrepareFile(volume, path string, fileSize int64) (err error) {
return nil
}
func (s *posix) WriteAll(volume, path string, buf []byte) (err error) {
defer func() {
if err == errFaultyDisk {
atomic.AddInt32(&s.ioErrCount, 1)
}
}()
if atomic.LoadInt32(&s.ioErrCount) > maxAllowedIOError {
return errFaultyDisk
}
// Create file if not found
w, err := s.openFile(volume, path, os.O_CREATE|os.O_SYNC|os.O_WRONLY)
if err != nil {
return err
}
if _, err = w.Write(buf); err != nil {
return err
}
return w.Close()
}
// AppendFile - append a byte array at path, if file doesn't exist at
// path this call explicitly creates it.
func (s *posix) AppendFile(volume, path string, buf []byte) (err error) {
@ -986,13 +1010,16 @@ func (s *posix) AppendFile(volume, path string, buf []byte) (err error) {
}
// Create file if not found
w, err := s.createFile(volume, path)
w, err := s.openFile(volume, path, os.O_CREATE|os.O_APPEND|os.O_WRONLY)
if err != nil {
return err
}
_, err = w.Write(buf)
w.Close()
return err
if _, err = w.Write(buf); err != nil {
return err
}
return w.Close()
}
// StatFile - get file info.

View File

@ -47,6 +47,9 @@ type StorageAPI interface {
StatFile(volume string, path string) (file FileInfo, err error)
DeleteFile(volume string, path string) (err error)
// Write all data, syncs the data to disk.
WriteAll(volume string, path string, buf []byte) (err error)
// Read all.
ReadAll(volume string, path string) (buf []byte, err error)
}

View File

@ -221,6 +221,17 @@ func (client *storageRESTClient) AppendFile(volume, path string, buffer []byte)
return err
}
// WriteAll - write all data to a file.
func (client *storageRESTClient) WriteAll(volume, path string, buffer []byte) error {
values := make(url.Values)
values.Set(storageRESTVolume, volume)
values.Set(storageRESTFilePath, path)
reader := bytes.NewBuffer(buffer)
respBody, err := client.call(storageRESTMethodWriteAll, values, reader)
defer CloseResponse(respBody)
return err
}
// StatFile - stat a file.
func (client *storageRESTClient) StatFile(volume, path string) (info FileInfo, err error) {
values := make(url.Values)

View File

@ -28,6 +28,7 @@ const (
storageRESTMethodPrepareFile = "preparefile"
storageRESTMethodAppendFile = "appendfile"
storageRESTMethodWriteAll = "writeall"
storageRESTMethodStatFile = "statfile"
storageRESTMethodReadAll = "readall"
storageRESTMethodReadFile = "readfile"

View File

@ -179,6 +179,33 @@ func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Req
}
}
// WriteAllHandler - write to file all content.
func (s *storageRESTServer) WriteAllHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
return
}
vars := mux.Vars(r)
volume := vars[storageRESTVolume]
filePath := vars[storageRESTFilePath]
if r.ContentLength < 0 {
s.writeErrorResponse(w, errInvalidArgument)
return
}
buf := make([]byte, r.ContentLength)
_, err := io.ReadFull(r.Body, buf)
if err != nil {
s.writeErrorResponse(w, err)
return
}
err = s.storage.WriteAll(volume, filePath, buf)
if err != nil {
s.writeErrorResponse(w, err)
}
}
// StatFileHandler - stat a file.
func (s *storageRESTServer) StatFileHandler(w http.ResponseWriter, r *http.Request) {
if !s.IsValid(w, r) {
@ -336,6 +363,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpoints EndpointList) {
Queries(restQueries(storageRESTVolume, storageRESTFilePath, storageRESTLength)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodAppendFile).HandlerFunc(httpTraceHdrs(server.AppendFileHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodWriteAll).HandlerFunc(httpTraceHdrs(server.WriteAllHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodStatFile).HandlerFunc(httpTraceHdrs(server.StatFileHandler)).
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
subrouter.Methods(http.MethodPost).Path("/" + storageRESTMethodReadAll).HandlerFunc(httpTraceHdrs(server.ReadAllHandler)).

View File

@ -423,8 +423,9 @@ func writeXLMetadata(ctx context.Context, disk StorageAPI, bucket, prefix string
logger.LogIf(ctx, err)
return err
}
// Persist marshaled data.
err = disk.AppendFile(bucket, jsonFile, metadataBytes)
err = disk.WriteAll(bucket, jsonFile, metadataBytes)
logger.LogIf(ctx, err)
return err
}