mirror of
https://github.com/minio/minio.git
synced 2024-12-25 06:35:56 -05:00
b890bbfa63
The main goal of this PR is to solve the situation where disks stop responding to operations. This generally causes an FD build-up and eventually will crash the server. This adds detection of hung disks, where calls on disk get stuck. We add functionality to `xlStorageDiskIDCheck` where it keeps track of the number of concurrent requests on a given disk. A total number of 100 operations are allowed. If this limit is reached we will block (but not reject) new requests, but we will monitor the state of the disk. If no requests have been completed or updated within a 15-second window, we mark the disk as offline. Requests that are blocked will be unblocked and return an error as "faulty disk". New requests will be rejected until the disk is marked OK again. Once a disk has been marked faulty, a check will run every 5 seconds that will attempt to write and read back a file. As long as this fails the disk will remain faulty. To prevent lots of long-running requests to mark the disk faulty we implement a callback feature that allows updating the status as parts of these operations are running. We add a reader and writer wrapper that will update the status of each successful read/write operation. This should allow fine enough granularity that a slow, but still operational disk will not reach 15 seconds where 50 operations have not progressed. Note that errors themselves are not enough to mark a disk faulty. A nil (or io.EOF) error will mark a disk as "good". * Make concurrent disk setting configurable via `_MINIO_DISK_MAX_CONCURRENT`. * de-couple IsOnline() from disk health tracker The purpose of IsOnline() is to ensure that we reconnect the drive only when the "drive" was - disconnected from network we need to validate if the drive is "correct" and is the same drive which belongs to this server. - drive was replaced we have to format it - we support hot swapping of the drives. IsOnline() is not meant for taking the drive offline when it is hung, it is not useful we can let the drive be online instead "return" errors for relevant calls. * return errFaultyDisk for DiskInfo() call Co-authored-by: Harshavardhana <harsha@minio.io> Possible future Improvements: * Unify the REST server and local xlStorageDiskIDCheck. This would also improve stats significantly. * Allow reads/writes to be aborted by the context. * Add usage stats, concurrent count, blocked operations, etc.
1274 lines
36 KiB
Go
1274 lines
36 KiB
Go
// Copyright (c) 2015-2021 MinIO, Inc.
|
|
//
|
|
// This file is part of MinIO Object Storage stack
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/gob"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"os/user"
|
|
"path"
|
|
"runtime/debug"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/tinylib/msgp/msgp"
|
|
|
|
jwtreq "github.com/golang-jwt/jwt/v4/request"
|
|
"github.com/gorilla/mux"
|
|
"github.com/minio/minio/internal/config"
|
|
xhttp "github.com/minio/minio/internal/http"
|
|
xioutil "github.com/minio/minio/internal/ioutil"
|
|
xjwt "github.com/minio/minio/internal/jwt"
|
|
"github.com/minio/minio/internal/logger"
|
|
xnet "github.com/minio/pkg/net"
|
|
)
|
|
|
|
var errDiskStale = errors.New("disk stale")
|
|
|
|
// To abstract a disk over network.
|
|
type storageRESTServer struct {
|
|
storage *xlStorageDiskIDCheck
|
|
}
|
|
|
|
func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
|
|
if errors.Is(err, errDiskStale) {
|
|
w.WriteHeader(http.StatusPreconditionFailed)
|
|
} else {
|
|
w.WriteHeader(http.StatusForbidden)
|
|
}
|
|
w.Write([]byte(err.Error()))
|
|
}
|
|
|
|
// DefaultSkewTime - skew time is 15 minutes between minio peers.
|
|
const DefaultSkewTime = 15 * time.Minute
|
|
|
|
// Authenticates storage client's requests and validates for skewed time.
|
|
func storageServerRequestValidate(r *http.Request) error {
|
|
token, err := jwtreq.AuthorizationHeaderExtractor.ExtractToken(r)
|
|
if err != nil {
|
|
if err == jwtreq.ErrNoTokenInRequest {
|
|
return errNoAuthToken
|
|
}
|
|
return err
|
|
}
|
|
|
|
claims := xjwt.NewStandardClaims()
|
|
if err = xjwt.ParseWithStandardClaims(token, claims, []byte(globalActiveCred.SecretKey)); err != nil {
|
|
return errAuthentication
|
|
}
|
|
|
|
owner := claims.AccessKey == globalActiveCred.AccessKey || claims.Subject == globalActiveCred.AccessKey
|
|
if !owner {
|
|
return errAuthentication
|
|
}
|
|
|
|
if claims.Audience != r.URL.RawQuery {
|
|
return errAuthentication
|
|
}
|
|
|
|
requestTimeStr := r.Header.Get("X-Minio-Time")
|
|
requestTime, err := time.Parse(time.RFC3339, requestTimeStr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
utcNow := UTCNow()
|
|
delta := requestTime.Sub(utcNow)
|
|
if delta < 0 {
|
|
delta *= -1
|
|
}
|
|
if delta > DefaultSkewTime {
|
|
return fmt.Errorf("client time %v is too apart with server time %v", requestTime, utcNow)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// IsValid - To authenticate and verify the time difference.
|
|
func (s *storageRESTServer) IsAuthValid(w http.ResponseWriter, r *http.Request) bool {
|
|
if s.storage == nil {
|
|
s.writeErrorResponse(w, errDiskNotFound)
|
|
return false
|
|
}
|
|
|
|
if err := storageServerRequestValidate(r); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// IsValid - To authenticate and check if the disk-id in the request corresponds to the underlying disk.
|
|
func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
|
|
if !s.IsAuthValid(w, r) {
|
|
return false
|
|
}
|
|
|
|
if err := r.ParseForm(); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return false
|
|
}
|
|
|
|
diskID := r.Form.Get(storageRESTDiskID)
|
|
if diskID == "" {
|
|
// Request sent empty disk-id, we allow the request
|
|
// as the peer might be coming up and trying to read format.json
|
|
// or create format.json
|
|
return true
|
|
}
|
|
|
|
storedDiskID, err := s.storage.GetDiskID()
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return false
|
|
}
|
|
|
|
if diskID != storedDiskID {
|
|
s.writeErrorResponse(w, errDiskStale)
|
|
return false
|
|
}
|
|
|
|
// If format.json is available and request sent the right disk-id, we allow the request
|
|
return true
|
|
}
|
|
|
|
// HealthHandler handler checks if disk is stale
|
|
func (s *storageRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) {
|
|
s.IsValid(w, r)
|
|
}
|
|
|
|
// DiskInfoHandler - returns disk info.
|
|
func (s *storageRESTServer) DiskInfoHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsAuthValid(w, r) {
|
|
return
|
|
}
|
|
info, err := s.storage.DiskInfo(r.Context())
|
|
if err != nil {
|
|
info.Error = err.Error()
|
|
}
|
|
logger.LogIf(r.Context(), msgp.Encode(w, &info))
|
|
}
|
|
|
|
func (s *storageRESTServer) NSScannerHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
|
|
setEventStreamHeaders(w)
|
|
|
|
var cache dataUsageCache
|
|
err := cache.deserialize(r.Body)
|
|
if err != nil {
|
|
logger.LogIf(r.Context(), err)
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(r.Context())
|
|
defer cancel()
|
|
resp := streamHTTPResponse(w)
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
debug.PrintStack()
|
|
resp.CloseWithError(fmt.Errorf("panic: %v", r))
|
|
}
|
|
}()
|
|
respW := msgp.NewWriter(resp)
|
|
|
|
// Collect updates, stream them before the full cache is sent.
|
|
updates := make(chan dataUsageEntry, 1)
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for update := range updates {
|
|
// Write true bool to indicate update.
|
|
var err error
|
|
if err = respW.WriteBool(true); err == nil {
|
|
err = update.EncodeMsg(respW)
|
|
}
|
|
respW.Flush()
|
|
if err != nil {
|
|
cancel()
|
|
resp.CloseWithError(err)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
usageInfo, err := s.storage.NSScanner(ctx, cache, updates)
|
|
if err != nil {
|
|
respW.Flush()
|
|
resp.CloseWithError(err)
|
|
return
|
|
}
|
|
|
|
// Write false bool to indicate we finished.
|
|
wg.Wait()
|
|
if err = respW.WriteBool(false); err == nil {
|
|
err = usageInfo.EncodeMsg(respW)
|
|
}
|
|
if err != nil {
|
|
resp.CloseWithError(err)
|
|
return
|
|
}
|
|
resp.CloseWithError(respW.Flush())
|
|
}
|
|
|
|
// MakeVolHandler - make a volume.
|
|
func (s *storageRESTServer) MakeVolHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
err := s.storage.MakeVol(r.Context(), volume)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// MakeVolBulkHandler - create multiple volumes as a bulk operation.
|
|
func (s *storageRESTServer) MakeVolBulkHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volumes := strings.Split(r.Form.Get(storageRESTVolumes), ",")
|
|
err := s.storage.MakeVolBulk(r.Context(), volumes...)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// ListVolsHandler - list volumes.
|
|
func (s *storageRESTServer) ListVolsHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
infos, err := s.storage.ListVols(r.Context())
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
logger.LogIf(r.Context(), msgp.Encode(w, VolsInfo(infos)))
|
|
}
|
|
|
|
// StatVolHandler - stat a volume.
|
|
func (s *storageRESTServer) StatVolHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
info, err := s.storage.StatVol(r.Context(), volume)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
logger.LogIf(r.Context(), msgp.Encode(w, &info))
|
|
}
|
|
|
|
// DeleteVolumeHandler - delete a volume.
|
|
func (s *storageRESTServer) DeleteVolHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
forceDelete := r.Form.Get(storageRESTForceDelete) == "true"
|
|
err := s.storage.DeleteVol(r.Context(), volume, forceDelete)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// AppendFileHandler - append data from the request to the file specified.
|
|
func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
|
|
buf := make([]byte, r.ContentLength)
|
|
_, err := io.ReadFull(r.Body, buf)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
err = s.storage.AppendFile(r.Context(), volume, filePath, buf)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// CreateFileHandler - copy the contents from the request.
|
|
func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
|
|
fileSizeStr := r.Form.Get(storageRESTLength)
|
|
fileSize, err := strconv.Atoi(fileSizeStr)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
done, body := keepHTTPReqResponseAlive(w, r)
|
|
done(s.storage.CreateFile(r.Context(), volume, filePath, int64(fileSize), body))
|
|
}
|
|
|
|
// DeleteVersion delete updated metadata.
|
|
func (s *storageRESTServer) DeleteVersionHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
forceDelMarker, err := strconv.ParseBool(r.Form.Get(storageRESTForceDelMarker))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, errInvalidArgument)
|
|
return
|
|
}
|
|
|
|
if r.ContentLength < 0 {
|
|
s.writeErrorResponse(w, errInvalidArgument)
|
|
return
|
|
}
|
|
|
|
var fi FileInfo
|
|
if err := msgp.Decode(r.Body, &fi); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
err = s.storage.DeleteVersion(r.Context(), volume, filePath, fi, forceDelMarker)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// ReadVersion read metadata of versionID
|
|
func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
versionID := r.Form.Get(storageRESTVersionID)
|
|
readData, err := strconv.ParseBool(r.Form.Get(storageRESTReadData))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
fi, err := s.storage.ReadVersion(r.Context(), volume, filePath, versionID, readData)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
logger.LogIf(r.Context(), msgp.Encode(w, &fi))
|
|
}
|
|
|
|
// WriteMetadata write new updated metadata.
|
|
func (s *storageRESTServer) WriteMetadataHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
|
|
if r.ContentLength < 0 {
|
|
s.writeErrorResponse(w, errInvalidArgument)
|
|
return
|
|
}
|
|
|
|
var fi FileInfo
|
|
if err := msgp.Decode(r.Body, &fi); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
err := s.storage.WriteMetadata(r.Context(), volume, filePath, fi)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// UpdateMetadata update new updated metadata.
|
|
func (s *storageRESTServer) UpdateMetadataHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
|
|
if r.ContentLength < 0 {
|
|
s.writeErrorResponse(w, errInvalidArgument)
|
|
return
|
|
}
|
|
|
|
var fi FileInfo
|
|
if err := msgp.Decode(r.Body, &fi); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
err := s.storage.UpdateMetadata(r.Context(), volume, filePath, fi)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// WriteAllHandler - write to file all content.
|
|
func (s *storageRESTServer) WriteAllHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
|
|
if r.ContentLength < 0 {
|
|
s.writeErrorResponse(w, errInvalidArgument)
|
|
return
|
|
}
|
|
tmp := make([]byte, r.ContentLength)
|
|
_, err := io.ReadFull(r.Body, tmp)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
err = s.storage.WriteAll(r.Context(), volume, filePath, tmp)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// CheckPartsHandler - check if a file metadata exists.
|
|
func (s *storageRESTServer) CheckPartsHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
|
|
if r.ContentLength < 0 {
|
|
s.writeErrorResponse(w, errInvalidArgument)
|
|
return
|
|
}
|
|
|
|
var fi FileInfo
|
|
if err := msgp.Decode(r.Body, &fi); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
if err := s.storage.CheckParts(r.Context(), volume, filePath, fi); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// ReadAllHandler - read all the contents of a file.
|
|
func (s *storageRESTServer) ReadAllHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
|
|
buf, err := s.storage.ReadAll(r.Context(), volume, filePath)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
// Reuse after return.
|
|
defer metaDataPoolPut(buf)
|
|
w.Header().Set(xhttp.ContentLength, strconv.Itoa(len(buf)))
|
|
w.Write(buf)
|
|
}
|
|
|
|
// ReadFileHandler - read section of a file.
|
|
func (s *storageRESTServer) ReadFileHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
offset, err := strconv.Atoi(r.Form.Get(storageRESTOffset))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
length, err := strconv.Atoi(r.Form.Get(storageRESTLength))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
if offset < 0 || length < 0 {
|
|
s.writeErrorResponse(w, errInvalidArgument)
|
|
return
|
|
}
|
|
var verifier *BitrotVerifier
|
|
if r.Form.Get(storageRESTBitrotAlgo) != "" {
|
|
hashStr := r.Form.Get(storageRESTBitrotHash)
|
|
var hash []byte
|
|
hash, err = hex.DecodeString(hashStr)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
verifier = NewBitrotVerifier(BitrotAlgorithmFromString(r.Form.Get(storageRESTBitrotAlgo)), hash)
|
|
}
|
|
buf := make([]byte, length)
|
|
defer metaDataPoolPut(buf) // Reuse if we can.
|
|
_, err = s.storage.ReadFile(r.Context(), volume, filePath, int64(offset), buf, verifier)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
w.Header().Set(xhttp.ContentLength, strconv.Itoa(len(buf)))
|
|
w.Write(buf)
|
|
}
|
|
|
|
// ReadFileHandler - read section of a file.
|
|
func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
offset, err := strconv.Atoi(r.Form.Get(storageRESTOffset))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
length, err := strconv.Atoi(r.Form.Get(storageRESTLength))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
rc, err := s.storage.ReadFileStream(r.Context(), volume, filePath, int64(offset), int64(length))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
defer rc.Close()
|
|
|
|
w.Header().Set(xhttp.ContentLength, strconv.Itoa(length))
|
|
if _, err = xioutil.Copy(w, rc); err != nil {
|
|
if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
|
|
logger.LogIf(r.Context(), err)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
|
|
// ListDirHandler - list a directory.
|
|
func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
dirPath := r.Form.Get(storageRESTDirPath)
|
|
count, err := strconv.Atoi(r.Form.Get(storageRESTCount))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
entries, err := s.storage.ListDir(r.Context(), volume, dirPath, count)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
gob.NewEncoder(w).Encode(&entries)
|
|
}
|
|
|
|
// DeleteFileHandler - delete a file.
|
|
func (s *storageRESTServer) DeleteFileHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
recursive, err := strconv.ParseBool(r.Form.Get(storageRESTRecursive))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
err = s.storage.Delete(r.Context(), volume, filePath, recursive)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// DeleteVersionsErrsResp - collection of delete errors
|
|
// for bulk version deletes
|
|
type DeleteVersionsErrsResp struct {
|
|
Errs []error
|
|
}
|
|
|
|
// DeleteVersionsHandler - delete a set of a versions.
|
|
func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
totalVersions, err := strconv.Atoi(r.Form.Get(storageRESTTotalVersions))
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
versions := make([]FileInfoVersions, totalVersions)
|
|
decoder := msgp.NewReader(r.Body)
|
|
for i := 0; i < totalVersions; i++ {
|
|
dst := &versions[i]
|
|
if err := dst.DecodeMsg(decoder); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
dErrsResp := &DeleteVersionsErrsResp{Errs: make([]error, totalVersions)}
|
|
|
|
setEventStreamHeaders(w)
|
|
encoder := gob.NewEncoder(w)
|
|
done := keepHTTPResponseAlive(w)
|
|
errs := s.storage.DeleteVersions(r.Context(), volume, versions)
|
|
done(nil)
|
|
for idx := range versions {
|
|
if errs[idx] != nil {
|
|
dErrsResp.Errs[idx] = StorageErr(errs[idx].Error())
|
|
}
|
|
}
|
|
encoder.Encode(dErrsResp)
|
|
}
|
|
|
|
// RenameDataHandler - renames a meta object and data dir to destination.
|
|
func (s *storageRESTServer) RenameDataHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
|
|
srcVolume := r.Form.Get(storageRESTSrcVolume)
|
|
srcFilePath := r.Form.Get(storageRESTSrcPath)
|
|
dstVolume := r.Form.Get(storageRESTDstVolume)
|
|
dstFilePath := r.Form.Get(storageRESTDstPath)
|
|
|
|
if r.ContentLength < 0 {
|
|
s.writeErrorResponse(w, errInvalidArgument)
|
|
return
|
|
}
|
|
|
|
var fi FileInfo
|
|
if err := msgp.Decode(r.Body, &fi); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
err := s.storage.RenameData(r.Context(), srcVolume, srcFilePath, fi, dstVolume, dstFilePath)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// RenameFileHandler - rename a file.
|
|
func (s *storageRESTServer) RenameFileHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
srcVolume := r.Form.Get(storageRESTSrcVolume)
|
|
srcFilePath := r.Form.Get(storageRESTSrcPath)
|
|
dstVolume := r.Form.Get(storageRESTDstVolume)
|
|
dstFilePath := r.Form.Get(storageRESTDstPath)
|
|
err := s.storage.RenameFile(r.Context(), srcVolume, srcFilePath, dstVolume, dstFilePath)
|
|
if err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
}
|
|
}
|
|
|
|
// closeNotifier is itself a ReadCloser that will notify when either an error occurs or
|
|
// the Close() function is called.
|
|
type closeNotifier struct {
|
|
rc io.ReadCloser
|
|
done chan struct{}
|
|
}
|
|
|
|
func (c *closeNotifier) Read(p []byte) (n int, err error) {
|
|
n, err = c.rc.Read(p)
|
|
if err != nil {
|
|
if c.done != nil {
|
|
close(c.done)
|
|
c.done = nil
|
|
}
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func (c *closeNotifier) Close() error {
|
|
if c.done != nil {
|
|
close(c.done)
|
|
c.done = nil
|
|
}
|
|
return c.rc.Close()
|
|
}
|
|
|
|
// keepHTTPReqResponseAlive can be used to avoid timeouts with long storage
|
|
// operations, such as bitrot verification or data usage scanning.
|
|
// Every 10 seconds a space character is sent.
|
|
// keepHTTPReqResponseAlive will wait for the returned body to be read before starting the ticker.
|
|
// The returned function should always be called to release resources.
|
|
// An optional error can be sent which will be picked as text only error,
|
|
// without its original type by the receiver.
|
|
// waitForHTTPResponse should be used to the receiving side.
|
|
func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func(error), body io.ReadCloser) {
|
|
bodyDoneCh := make(chan struct{})
|
|
doneCh := make(chan error)
|
|
ctx := r.Context()
|
|
go func() {
|
|
canWrite := true
|
|
write := func(b []byte) {
|
|
if canWrite {
|
|
n, err := w.Write(b)
|
|
if err != nil || n != len(b) {
|
|
canWrite = false
|
|
}
|
|
}
|
|
}
|
|
// Wait for body to be read.
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-bodyDoneCh:
|
|
case err := <-doneCh:
|
|
if err != nil {
|
|
write([]byte{1})
|
|
write([]byte(err.Error()))
|
|
} else {
|
|
write([]byte{0})
|
|
}
|
|
close(doneCh)
|
|
return
|
|
}
|
|
defer close(doneCh)
|
|
// Initiate ticker after body has been read.
|
|
ticker := time.NewTicker(time.Second * 10)
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
// Response not ready, write a filler byte.
|
|
write([]byte{32})
|
|
if canWrite {
|
|
w.(http.Flusher).Flush()
|
|
}
|
|
case err := <-doneCh:
|
|
if err != nil {
|
|
write([]byte{1})
|
|
write([]byte(err.Error()))
|
|
} else {
|
|
write([]byte{0})
|
|
}
|
|
ticker.Stop()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return func(err error) {
|
|
if doneCh == nil {
|
|
return
|
|
}
|
|
|
|
// Indicate we are ready to write.
|
|
doneCh <- err
|
|
|
|
// Wait for channel to be closed so we don't race on writes.
|
|
<-doneCh
|
|
|
|
// Clear so we can be called multiple times without crashing.
|
|
doneCh = nil
|
|
}, &closeNotifier{rc: r.Body, done: bodyDoneCh}
|
|
}
|
|
|
|
// keepHTTPResponseAlive can be used to avoid timeouts with long storage
|
|
// operations, such as bitrot verification or data usage scanning.
|
|
// keepHTTPResponseAlive may NOT be used until the request body has been read,
|
|
// use keepHTTPReqResponseAlive instead.
|
|
// Every 10 seconds a space character is sent.
|
|
// The returned function should always be called to release resources.
|
|
// An optional error can be sent which will be picked as text only error,
|
|
// without its original type by the receiver.
|
|
// waitForHTTPResponse should be used to the receiving side.
|
|
func keepHTTPResponseAlive(w http.ResponseWriter) func(error) {
|
|
doneCh := make(chan error)
|
|
go func() {
|
|
canWrite := true
|
|
write := func(b []byte) {
|
|
if canWrite {
|
|
n, err := w.Write(b)
|
|
if err != nil || n != len(b) {
|
|
canWrite = false
|
|
}
|
|
}
|
|
}
|
|
defer close(doneCh)
|
|
ticker := time.NewTicker(time.Second * 10)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
// Response not ready, write a filler byte.
|
|
write([]byte{32})
|
|
if canWrite {
|
|
w.(http.Flusher).Flush()
|
|
}
|
|
case err := <-doneCh:
|
|
if err != nil {
|
|
write([]byte{1})
|
|
write([]byte(err.Error()))
|
|
} else {
|
|
write([]byte{0})
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return func(err error) {
|
|
if doneCh == nil {
|
|
return
|
|
}
|
|
// Indicate we are ready to write.
|
|
doneCh <- err
|
|
|
|
// Wait for channel to be closed so we don't race on writes.
|
|
<-doneCh
|
|
|
|
// Clear so we can be called multiple times without crashing.
|
|
doneCh = nil
|
|
}
|
|
}
|
|
|
|
// waitForHTTPResponse will wait for responses where keepHTTPResponseAlive
|
|
// has been used.
|
|
// The returned reader contains the payload.
|
|
func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
|
|
reader := bufio.NewReader(respBody)
|
|
for {
|
|
b, err := reader.ReadByte()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Check if we have a response ready or a filler byte.
|
|
switch b {
|
|
case 0:
|
|
return reader, nil
|
|
case 1:
|
|
errorText, err := ioutil.ReadAll(reader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return nil, errors.New(string(errorText))
|
|
case 32:
|
|
continue
|
|
default:
|
|
return nil, fmt.Errorf("unexpected filler byte: %d", b)
|
|
}
|
|
}
|
|
}
|
|
|
|
// httpStreamResponse allows streaming a response, but still send an error.
|
|
type httpStreamResponse struct {
|
|
done chan error
|
|
block chan []byte
|
|
err error
|
|
}
|
|
|
|
// Write part of the the streaming response.
|
|
// Note that upstream errors are currently not forwarded, but may be in the future.
|
|
func (h *httpStreamResponse) Write(b []byte) (int, error) {
|
|
if len(b) == 0 || h.err != nil {
|
|
// Ignore 0 length blocks
|
|
return 0, h.err
|
|
}
|
|
tmp := make([]byte, len(b))
|
|
copy(tmp, b)
|
|
h.block <- tmp
|
|
return len(b), h.err
|
|
}
|
|
|
|
// CloseWithError will close the stream and return the specified error.
|
|
// This can be done several times, but only the first error will be sent.
|
|
// After calling this the stream should not be written to.
|
|
func (h *httpStreamResponse) CloseWithError(err error) {
|
|
if h.done == nil {
|
|
return
|
|
}
|
|
h.done <- err
|
|
h.err = err
|
|
// Indicates that the response is done.
|
|
<-h.done
|
|
h.done = nil
|
|
}
|
|
|
|
// streamHTTPResponse can be used to avoid timeouts with long storage
|
|
// operations, such as bitrot verification or data usage scanning.
|
|
// Every 10 seconds a space character is sent.
|
|
// The returned function should always be called to release resources.
|
|
// An optional error can be sent which will be picked as text only error,
|
|
// without its original type by the receiver.
|
|
// waitForHTTPStream should be used to the receiving side.
|
|
func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse {
|
|
doneCh := make(chan error)
|
|
blockCh := make(chan []byte)
|
|
h := httpStreamResponse{done: doneCh, block: blockCh}
|
|
go func() {
|
|
canWrite := true
|
|
write := func(b []byte) {
|
|
if canWrite {
|
|
n, err := w.Write(b)
|
|
if err != nil || n != len(b) {
|
|
canWrite = false
|
|
}
|
|
}
|
|
}
|
|
|
|
ticker := time.NewTicker(time.Second * 10)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
// Response not ready, write a filler byte.
|
|
write([]byte{32})
|
|
if canWrite {
|
|
w.(http.Flusher).Flush()
|
|
}
|
|
case err := <-doneCh:
|
|
if err != nil {
|
|
write([]byte{1})
|
|
write([]byte(err.Error()))
|
|
} else {
|
|
write([]byte{0})
|
|
}
|
|
close(doneCh)
|
|
return
|
|
case block := <-blockCh:
|
|
var tmp [5]byte
|
|
tmp[0] = 2
|
|
binary.LittleEndian.PutUint32(tmp[1:], uint32(len(block)))
|
|
write(tmp[:])
|
|
write(block)
|
|
if canWrite {
|
|
w.(http.Flusher).Flush()
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return &h
|
|
}
|
|
|
|
var poolBuf8k = sync.Pool{
|
|
New: func() interface{} {
|
|
b := make([]byte, 8192)
|
|
return &b
|
|
},
|
|
}
|
|
|
|
// waitForHTTPStream will wait for responses where
|
|
// streamHTTPResponse has been used.
|
|
// The returned reader contains the payload and must be closed if no error is returned.
|
|
func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
|
|
var tmp [1]byte
|
|
// 8K copy buffer, reused for less allocs...
|
|
bufp := poolBuf8k.Get().(*[]byte)
|
|
buf := *bufp
|
|
defer poolBuf8k.Put(bufp)
|
|
for {
|
|
_, err := io.ReadFull(respBody, tmp[:])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Check if we have a response ready or a filler byte.
|
|
switch tmp[0] {
|
|
case 0:
|
|
// 0 is unbuffered, copy the rest.
|
|
_, err := io.CopyBuffer(w, respBody, buf)
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
return err
|
|
case 1:
|
|
errorText, err := ioutil.ReadAll(respBody)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return errors.New(string(errorText))
|
|
case 2:
|
|
// Block of data
|
|
var tmp [4]byte
|
|
_, err := io.ReadFull(respBody, tmp[:])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
length := binary.LittleEndian.Uint32(tmp[:])
|
|
_, err = io.CopyBuffer(w, io.LimitReader(respBody, int64(length)), buf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
continue
|
|
case 32:
|
|
continue
|
|
default:
|
|
return fmt.Errorf("unexpected filler byte: %d", tmp[0])
|
|
}
|
|
}
|
|
}
|
|
|
|
// VerifyFileResp - VerifyFile()'s response.
|
|
type VerifyFileResp struct {
|
|
Err error
|
|
}
|
|
|
|
// VerifyFileHandler - Verify all part of file for bitrot errors.
|
|
func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
|
|
if r.ContentLength < 0 {
|
|
s.writeErrorResponse(w, errInvalidArgument)
|
|
return
|
|
}
|
|
|
|
var fi FileInfo
|
|
if err := msgp.Decode(r.Body, &fi); err != nil {
|
|
s.writeErrorResponse(w, err)
|
|
return
|
|
}
|
|
|
|
setEventStreamHeaders(w)
|
|
encoder := gob.NewEncoder(w)
|
|
done := keepHTTPResponseAlive(w)
|
|
err := s.storage.VerifyFile(r.Context(), volume, filePath, fi)
|
|
done(nil)
|
|
vresp := &VerifyFileResp{}
|
|
if err != nil {
|
|
vresp.Err = StorageErr(err.Error())
|
|
}
|
|
encoder.Encode(vresp)
|
|
}
|
|
|
|
func checkDiskFatalErrs(errs []error) error {
|
|
// This returns a common error if all errors are
|
|
// same errors, then there is no point starting
|
|
// the server.
|
|
if countErrs(errs, errUnsupportedDisk) == len(errs) {
|
|
return errUnsupportedDisk
|
|
}
|
|
|
|
if countErrs(errs, errDiskAccessDenied) == len(errs) {
|
|
return errDiskAccessDenied
|
|
}
|
|
|
|
if countErrs(errs, errFileAccessDenied) == len(errs) {
|
|
return errDiskAccessDenied
|
|
}
|
|
|
|
if countErrs(errs, errDiskNotDir) == len(errs) {
|
|
return errDiskNotDir
|
|
}
|
|
|
|
if countErrs(errs, errFaultyDisk) == len(errs) {
|
|
return errFaultyDisk
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// A single function to write certain errors to be fatal
|
|
// or informative based on the `exit` flag, please look
|
|
// at each implementation of error for added hints.
|
|
//
|
|
// FIXME: This is an unusual function but serves its purpose for
|
|
// now, need to revist the overall erroring structure here.
|
|
// Do not like it :-(
|
|
func logFatalErrs(err error, endpoint Endpoint, exit bool) {
|
|
switch {
|
|
case errors.Is(err, errUnsupportedDisk):
|
|
var hint string
|
|
if endpoint.URL != nil {
|
|
hint = fmt.Sprintf("Disk '%s' does not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support", endpoint.Path)
|
|
} else {
|
|
hint = "Disks do not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support"
|
|
}
|
|
logger.Fatal(config.ErrUnsupportedBackend(err).Hint(hint), "Unable to initialize backend")
|
|
case errors.Is(err, errDiskNotDir):
|
|
var hint string
|
|
if endpoint.URL != nil {
|
|
hint = fmt.Sprintf("Disk '%s' is not a directory, MinIO erasure coding needs a directory", endpoint.Path)
|
|
} else {
|
|
hint = "Disks are not directories, MinIO erasure coding needs directories"
|
|
}
|
|
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
|
|
case errors.Is(err, errDiskAccessDenied):
|
|
// Show a descriptive error with a hint about how to fix it.
|
|
var username string
|
|
if u, err := user.Current(); err == nil {
|
|
username = u.Username
|
|
} else {
|
|
username = "<your-username>"
|
|
}
|
|
var hint string
|
|
if endpoint.URL != nil {
|
|
hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s %s && sudo chmod u+rxw %s`",
|
|
username, endpoint.Path, endpoint.Path)
|
|
} else {
|
|
hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s. <path> && sudo chmod u+rxw <path>`", username)
|
|
}
|
|
if !exit {
|
|
logger.LogIf(GlobalContext, fmt.Errorf("disk is not writable %s, %s", endpoint, hint))
|
|
} else {
|
|
logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
|
|
}
|
|
case errors.Is(err, errFaultyDisk):
|
|
if !exit {
|
|
logger.LogIf(GlobalContext, fmt.Errorf("disk is faulty at %s, please replace the drive - disk will be offline", endpoint))
|
|
} else {
|
|
logger.Fatal(err, "Unable to initialize backend")
|
|
}
|
|
case errors.Is(err, errDiskFull):
|
|
if !exit {
|
|
logger.LogIf(GlobalContext, fmt.Errorf("disk is already full at %s, incoming I/O will fail - disk will be offline", endpoint))
|
|
} else {
|
|
logger.Fatal(err, "Unable to initialize backend")
|
|
}
|
|
default:
|
|
if !exit {
|
|
logger.LogIf(GlobalContext, fmt.Errorf("disk returned an unexpected error at %s, please investigate - disk will be offline (%w)", endpoint, err))
|
|
} else {
|
|
logger.Fatal(err, "Unable to initialize backend")
|
|
}
|
|
}
|
|
}
|
|
|
|
// StatInfoFile returns file stat info.
|
|
func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request) {
|
|
if !s.IsValid(w, r) {
|
|
return
|
|
}
|
|
volume := r.Form.Get(storageRESTVolume)
|
|
filePath := r.Form.Get(storageRESTFilePath)
|
|
glob := r.Form.Get(storageRESTGlob)
|
|
done := keepHTTPResponseAlive(w)
|
|
stats, err := s.storage.StatInfoFile(r.Context(), volume, filePath, glob == "true")
|
|
done(err)
|
|
if err != nil {
|
|
return
|
|
}
|
|
for _, si := range stats {
|
|
msgp.Encode(w, &si)
|
|
}
|
|
}
|
|
|
|
// registerStorageRPCRouter - register storage rpc router.
|
|
func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools) {
|
|
storageDisks := make([][]*xlStorage, len(endpointServerPools))
|
|
for poolIdx, ep := range endpointServerPools {
|
|
storageDisks[poolIdx] = make([]*xlStorage, len(ep.Endpoints))
|
|
}
|
|
var wg sync.WaitGroup
|
|
for poolIdx, ep := range endpointServerPools {
|
|
for setIdx, endpoint := range ep.Endpoints {
|
|
if !endpoint.IsLocal {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func(poolIdx, setIdx int, endpoint Endpoint) {
|
|
defer wg.Done()
|
|
var err error
|
|
storageDisks[poolIdx][setIdx], err = newXLStorage(endpoint)
|
|
if err != nil {
|
|
// if supported errors don't fail, we proceed to
|
|
// printing message and moving forward.
|
|
logFatalErrs(err, endpoint, false)
|
|
}
|
|
}(poolIdx, setIdx, endpoint)
|
|
}
|
|
}
|
|
wg.Wait()
|
|
|
|
for _, setDisks := range storageDisks {
|
|
for _, storage := range setDisks {
|
|
if storage == nil {
|
|
continue
|
|
}
|
|
|
|
endpoint := storage.Endpoint()
|
|
|
|
server := &storageRESTServer{storage: newXLStorageDiskIDCheck(storage)}
|
|
server.storage.SetDiskID(storage.diskID)
|
|
|
|
subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter()
|
|
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodHealth).HandlerFunc(httpTraceHdrs(server.HealthHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDiskInfo).HandlerFunc(httpTraceHdrs(server.DiskInfoHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodNSScanner).HandlerFunc(httpTraceHdrs(server.NSScannerHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodMakeVol).HandlerFunc(httpTraceHdrs(server.MakeVolHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodMakeVolBulk).HandlerFunc(httpTraceHdrs(server.MakeVolBulkHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatVol).HandlerFunc(httpTraceHdrs(server.StatVolHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVol).HandlerFunc(httpTraceHdrs(server.DeleteVolHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodListVols).HandlerFunc(httpTraceHdrs(server.ListVolsHandler))
|
|
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodAppendFile).HandlerFunc(httpTraceHdrs(server.AppendFileHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWriteAll).HandlerFunc(httpTraceHdrs(server.WriteAllHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWriteMetadata).HandlerFunc(httpTraceHdrs(server.WriteMetadataHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodUpdateMetadata).HandlerFunc(httpTraceHdrs(server.UpdateMetadataHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersion).HandlerFunc(httpTraceHdrs(server.DeleteVersionHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(httpTraceHdrs(server.ReadVersionHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameData).HandlerFunc(httpTraceHdrs(server.RenameDataHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCreateFile).HandlerFunc(httpTraceHdrs(server.CreateFileHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCheckParts).HandlerFunc(httpTraceHdrs(server.CheckPartsHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadAll).HandlerFunc(httpTraceHdrs(server.ReadAllHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFile).HandlerFunc(httpTraceHdrs(server.ReadFileHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(httpTraceHdrs(server.ReadFileStreamHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodListDir).HandlerFunc(httpTraceHdrs(server.ListDirHandler))
|
|
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(httpTraceHdrs(server.DeleteVersionsHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteFile).HandlerFunc(httpTraceHdrs(server.DeleteFileHandler))
|
|
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodRenameFile).HandlerFunc(httpTraceHdrs(server.RenameFileHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(httpTraceHdrs(server.VerifyFileHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkDir).HandlerFunc(httpTraceHdrs(server.WalkDirHandler))
|
|
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(httpTraceHdrs(server.StatInfoFile))
|
|
}
|
|
}
|
|
}
|