mirror of
https://github.com/minio/minio.git
synced 2025-02-02 17:35:58 -05:00
716316f711
Ref #3229 After review with @abperiasamy we decided to remove all the unnecessary options - MINIO_BROWSER (Implemented as a security feature but now deemed obsolete since even if blocking access to MINIO_BROWSER, s3 API port is open) - MINIO_CACHE_EXPIRY (Defaults to 72h) - MINIO_MAXCONN (No one used this option and we don't test this) - MINIO_ENABLE_FSMETA (Enable FSMETA all the time) Remove --ignore-disks option - this option was implemented when XL layer would initialize the backend disks and heal them automatically to disallow XL accidentally using the root partition itself this option was introduced. This behavior has been changed XL no longer automatically initializes `format.json` a HEAL is controlled activity, so ignore-disks is not useful anymore. This change also addresses the problems of our documentation going forward and keeps things simple. This patch brings in reduction of options and defaulting them to a valid known inputs. This patch also serves as a guideline of limiting many ways to do the same thing.
269 lines
7.5 KiB
Go
269 lines
7.5 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"io"
|
|
"net/rpc"
|
|
"path"
|
|
"time"
|
|
|
|
router "github.com/gorilla/mux"
|
|
"github.com/minio/minio/pkg/disk"
|
|
)
|
|
|
|
// Storage server implements rpc primitives to facilitate exporting a
|
|
// disk over a network.
|
|
type storageServer struct {
|
|
storage StorageAPI
|
|
path string
|
|
timestamp time.Time
|
|
}
|
|
|
|
/// Auth operations
|
|
|
|
// Login - login handler.
|
|
func (s *storageServer) LoginHandler(args *RPCLoginArgs, reply *RPCLoginReply) error {
|
|
jwt, err := newJWT(defaultInterNodeJWTExpiry)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = jwt.Authenticate(args.Username, args.Password); err != nil {
|
|
return err
|
|
}
|
|
token, err := jwt.GenerateToken(args.Username)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reply.Token = token
|
|
reply.Timestamp = time.Now().UTC()
|
|
reply.ServerVersion = Version
|
|
return nil
|
|
}
|
|
|
|
/// Storage operations handlers.
|
|
|
|
// DiskInfoHandler - disk info handler is rpc wrapper for DiskInfo operation.
|
|
func (s *storageServer) DiskInfoHandler(args *GenericArgs, reply *disk.Info) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
info, err := s.storage.DiskInfo()
|
|
*reply = info
|
|
return err
|
|
}
|
|
|
|
/// Volume operations handlers.
|
|
|
|
// MakeVolHandler - make vol handler is rpc wrapper for MakeVol operation.
|
|
func (s *storageServer) MakeVolHandler(args *GenericVolArgs, reply *GenericReply) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
return s.storage.MakeVol(args.Vol)
|
|
}
|
|
|
|
// ListVolsHandler - list vols handler is rpc wrapper for ListVols operation.
|
|
func (s *storageServer) ListVolsHandler(args *GenericArgs, reply *ListVolsReply) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
vols, err := s.storage.ListVols()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
reply.Vols = vols
|
|
return nil
|
|
}
|
|
|
|
// StatVolHandler - stat vol handler is a rpc wrapper for StatVol operation.
|
|
func (s *storageServer) StatVolHandler(args *GenericVolArgs, reply *VolInfo) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
volInfo, err := s.storage.StatVol(args.Vol)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*reply = volInfo
|
|
return nil
|
|
}
|
|
|
|
// DeleteVolHandler - delete vol handler is a rpc wrapper for
|
|
// DeleteVol operation.
|
|
func (s *storageServer) DeleteVolHandler(args *GenericVolArgs, reply *GenericReply) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
return s.storage.DeleteVol(args.Vol)
|
|
}
|
|
|
|
/// File operations
|
|
|
|
// StatFileHandler - stat file handler is rpc wrapper to stat file.
|
|
func (s *storageServer) StatFileHandler(args *StatFileArgs, reply *FileInfo) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
fileInfo, err := s.storage.StatFile(args.Vol, args.Path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*reply = fileInfo
|
|
return nil
|
|
}
|
|
|
|
// ListDirHandler - list directory handler is rpc wrapper to list dir.
|
|
func (s *storageServer) ListDirHandler(args *ListDirArgs, reply *[]string) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
entries, err := s.storage.ListDir(args.Vol, args.Path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*reply = entries
|
|
return nil
|
|
}
|
|
|
|
// ReadAllHandler - read all handler is rpc wrapper to read all storage API.
|
|
func (s *storageServer) ReadAllHandler(args *ReadFileArgs, reply *[]byte) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
buf, err := s.storage.ReadAll(args.Vol, args.Path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
*reply = buf
|
|
return nil
|
|
}
|
|
|
|
// ReadFileHandler - read file handler is rpc wrapper to read file.
|
|
func (s *storageServer) ReadFileHandler(args *ReadFileArgs, reply *[]byte) (err error) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
// Recover any panic and return ErrCacheFull.
|
|
err = bytes.ErrTooLarge
|
|
}
|
|
}() // Do not crash the server.
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
// Allocate the requested buffer from the client.
|
|
*reply = make([]byte, args.Size)
|
|
var n int64
|
|
n, err = s.storage.ReadFile(args.Vol, args.Path, args.Offset, *reply)
|
|
// Sending an error over the rpc layer, would cause unmarshalling to fail. In situations
|
|
// when we have short read i.e `io.ErrUnexpectedEOF` treat it as good condition and copy
|
|
// the buffer properly.
|
|
if err == io.ErrUnexpectedEOF {
|
|
// Reset to nil as good condition.
|
|
err = nil
|
|
}
|
|
*reply = (*reply)[0:n]
|
|
return err
|
|
}
|
|
|
|
// PrepareFileHandler - prepare file handler is rpc wrapper to prepare file.
|
|
func (s *storageServer) PrepareFileHandler(args *PrepareFileArgs, reply *GenericReply) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
return s.storage.PrepareFile(args.Vol, args.Path, args.Size)
|
|
}
|
|
|
|
// AppendFileHandler - append file handler is rpc wrapper to append file.
|
|
func (s *storageServer) AppendFileHandler(args *AppendFileArgs, reply *GenericReply) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
return s.storage.AppendFile(args.Vol, args.Path, args.Buffer)
|
|
}
|
|
|
|
// DeleteFileHandler - delete file handler is rpc wrapper to delete file.
|
|
func (s *storageServer) DeleteFileHandler(args *DeleteFileArgs, reply *GenericReply) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
return s.storage.DeleteFile(args.Vol, args.Path)
|
|
}
|
|
|
|
// RenameFileHandler - rename file handler is rpc wrapper to rename file.
|
|
func (s *storageServer) RenameFileHandler(args *RenameFileArgs, reply *GenericReply) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
return s.storage.RenameFile(args.SrcVol, args.SrcPath, args.DstVol, args.DstPath)
|
|
}
|
|
|
|
// TryInitHandler - wake up storage server.
|
|
func (s *storageServer) TryInitHandler(args *GenericArgs, reply *GenericReply) error {
|
|
if !isRPCTokenValid(args.Token) {
|
|
return errInvalidToken
|
|
}
|
|
go func() {
|
|
globalWakeupCh <- struct{}{}
|
|
}()
|
|
*reply = GenericReply{}
|
|
return nil
|
|
}
|
|
|
|
// Initialize new storage rpc.
|
|
func newRPCServer(srvConfig serverCmdConfig) (servers []*storageServer, err error) {
|
|
for _, ep := range srvConfig.endpoints {
|
|
// e.g server:/mnt/disk1
|
|
if isLocalStorage(ep) {
|
|
// Get the posix path.
|
|
path := getPath(ep)
|
|
var storage StorageAPI
|
|
storage, err = newPosix(path)
|
|
if err != nil && err != errDiskNotFound {
|
|
return nil, err
|
|
}
|
|
servers = append(servers, &storageServer{
|
|
storage: storage,
|
|
path: path,
|
|
})
|
|
}
|
|
}
|
|
return servers, nil
|
|
}
|
|
|
|
// registerStorageRPCRouter - register storage rpc router.
|
|
func registerStorageRPCRouters(mux *router.Router, srvCmdConfig serverCmdConfig) error {
|
|
// Initialize storage rpc servers for every disk that is hosted on this node.
|
|
storageRPCs, err := newRPCServer(srvCmdConfig)
|
|
if err != nil {
|
|
return traceError(err)
|
|
}
|
|
|
|
// Create a unique route for each disk exported from this node.
|
|
for _, stServer := range storageRPCs {
|
|
storageRPCServer := rpc.NewServer()
|
|
err = storageRPCServer.RegisterName("Storage", stServer)
|
|
if err != nil {
|
|
return traceError(err)
|
|
}
|
|
// Add minio storage routes.
|
|
storageRouter := mux.PathPrefix(reservedBucket).Subrouter()
|
|
storageRouter.Path(path.Join("/storage", stServer.path)).Handler(storageRPCServer)
|
|
}
|
|
return nil
|
|
}
|