mirror of
https://github.com/minio/minio.git
synced 2025-11-09 21:49:46 -05:00
Add admin file inspector (#12635)
Download files from *any* bucket/path as an encrypted zip file. The key is included in the response but can be separated so zip and the key doesn't have to be sent on the same channel. Requires https://github.com/minio/pkg/pull/6
This commit is contained in:
@@ -19,6 +19,7 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
crand "crypto/rand"
|
||||
"crypto/subtle"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
@@ -37,6 +38,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/klauspost/compress/zip"
|
||||
"github.com/minio/kes"
|
||||
"github.com/minio/madmin-go"
|
||||
"github.com/minio/minio/internal/auth"
|
||||
@@ -49,6 +51,7 @@ import (
|
||||
"github.com/minio/minio/internal/logger/message/log"
|
||||
iampolicy "github.com/minio/pkg/iam/policy"
|
||||
xnet "github.com/minio/pkg/net"
|
||||
"github.com/secure-io/sio-go"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -1951,3 +1954,104 @@ func checkConnection(endpointStr string, timeout time.Duration) error {
|
||||
defer xhttp.DrainBody(resp.Body)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getRawDataer provides an interface for getting raw FS files.
|
||||
type getRawDataer interface {
|
||||
GetRawData(ctx context.Context, volume, file string, fn func(r io.Reader, host string, disk string, filename string, size int64, modtime time.Time) error) error
|
||||
}
|
||||
|
||||
// InspectDataHandler - GET /minio/admin/v3/inspect-data
|
||||
// ----------
|
||||
// Download file from all nodes in a zip format
|
||||
func (a adminAPIHandlers) InspectDataHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := newContext(r, w, "InspectData")
|
||||
|
||||
// Validate request signature.
|
||||
_, adminAPIErr := checkAdminRequestAuth(ctx, r, iampolicy.InspectDataAction, "")
|
||||
if adminAPIErr != ErrNone {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(adminAPIErr), r.URL)
|
||||
return
|
||||
}
|
||||
defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
|
||||
|
||||
o, ok := newObjectLayerFn().(getRawDataer)
|
||||
if !ok {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
volume := r.URL.Query().Get("volume")
|
||||
file := r.URL.Query().Get("file")
|
||||
if len(volume) == 0 {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidBucketName), r.URL)
|
||||
return
|
||||
}
|
||||
if len(file) == 0 {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInvalidRequest), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
var key [32]byte
|
||||
// MUST use crypto/rand
|
||||
n, err := crand.Read(key[:])
|
||||
if err != nil || n != len(key) {
|
||||
logger.LogIf(ctx, err)
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
|
||||
return
|
||||
}
|
||||
stream, err := sio.AES_256_GCM.Stream(key[:])
|
||||
if err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
|
||||
return
|
||||
}
|
||||
// Zero nonce, we only use each key once, and 32 bytes is plenty.
|
||||
nonce := make([]byte, stream.NonceSize())
|
||||
encw := stream.EncryptWriter(w, nonce, nil)
|
||||
|
||||
defer encw.Close()
|
||||
|
||||
// Write a version for making *incompatible* changes.
|
||||
// The AdminClient will reject any version it does not know.
|
||||
w.Write([]byte{1})
|
||||
|
||||
// Write key first (without encryption)
|
||||
_, err = w.Write(key[:])
|
||||
if err != nil {
|
||||
writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrInternalError), r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// Initialize a zip writer which will provide a zipped content
|
||||
// of profiling data of all nodes
|
||||
zipWriter := zip.NewWriter(encw)
|
||||
defer zipWriter.Close()
|
||||
|
||||
err = o.GetRawData(ctx, volume, file, func(r io.Reader, host, disk, filename string, size int64, modtime time.Time) error {
|
||||
// Prefix host+disk
|
||||
filename = path.Join(host, disk, filename)
|
||||
header, zerr := zip.FileInfoHeader(dummyFileInfo{
|
||||
name: filename,
|
||||
size: size,
|
||||
mode: 0600,
|
||||
modTime: modtime,
|
||||
isDir: false,
|
||||
sys: nil,
|
||||
})
|
||||
if zerr != nil {
|
||||
logger.LogIf(ctx, zerr)
|
||||
return nil
|
||||
}
|
||||
header.Method = zip.Deflate
|
||||
zwriter, zerr := zipWriter.CreateHeader(header)
|
||||
if zerr != nil {
|
||||
logger.LogIf(ctx, zerr)
|
||||
return nil
|
||||
}
|
||||
if _, err = io.Copy(zwriter, r); err != nil {
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
logger.LogIf(ctx, err)
|
||||
}
|
||||
|
||||
@@ -54,6 +54,7 @@ func registerAdminRouter(router *mux.Router, enableConfigOps, enableIAMOps bool)
|
||||
|
||||
// Info operations
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/info").HandlerFunc(httpTraceAll(adminAPI.ServerInfoHandler))
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion+"/inspect-data").HandlerFunc(httpTraceHdrs(adminAPI.InspectDataHandler)).Queries("volume", "{volume:.*}", "file", "{file:.*}")
|
||||
|
||||
// StorageInfo operations
|
||||
adminRouter.Methods(http.MethodGet).Path(adminVersion + "/storageinfo").HandlerFunc(httpTraceAll(adminAPI.StorageInfoHandler))
|
||||
|
||||
@@ -147,6 +147,39 @@ func (z *erasureServerPools) GetDisksID(ids ...string) []StorageAPI {
|
||||
return res
|
||||
}
|
||||
|
||||
// GetRawData will return all files with a given raw path to the callback.
|
||||
// Errors are ignored, only errors from the callback are returned.
|
||||
// For now only direct file paths are supported.
|
||||
func (z *erasureServerPools) GetRawData(ctx context.Context, volume, file string, fn func(r io.Reader, host string, disk string, filename string, size int64, modtime time.Time) error) error {
|
||||
for _, s := range z.serverPools {
|
||||
for _, disks := range s.erasureDisks {
|
||||
for i, disk := range disks {
|
||||
if disk == OfflineDisk {
|
||||
continue
|
||||
}
|
||||
si, err := disk.StatInfoFile(ctx, volume, file)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
r, err := disk.ReadFileStream(ctx, volume, file, 0, si.Size)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
defer r.Close()
|
||||
did, err := disk.GetDiskID()
|
||||
if err != nil {
|
||||
did = fmt.Sprintf("disk-%d", i)
|
||||
}
|
||||
err = fn(r, disk.Hostname(), did, pathJoin(volume, file), si.Size, si.ModTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (z *erasureServerPools) SetDriveCounts() []int {
|
||||
setDriveCounts := make([]int, len(z.serverPools))
|
||||
for i := range z.serverPools {
|
||||
|
||||
17
cmd/fs-v1.go
17
cmd/fs-v1.go
@@ -27,6 +27,7 @@ import (
|
||||
"os"
|
||||
"os/user"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -1509,3 +1510,19 @@ func (fs *FSObjects) TransitionObject(ctx context.Context, bucket, object string
|
||||
func (fs *FSObjects) RestoreTransitionedObject(ctx context.Context, bucket, object string, opts ObjectOptions) error {
|
||||
return NotImplemented{}
|
||||
}
|
||||
|
||||
// GetRawData returns raw file data to the callback.
|
||||
// Errors are ignored, only errors from the callback are returned.
|
||||
// For now only direct file paths are supported.
|
||||
func (fs *FSObjects) GetRawData(ctx context.Context, volume, file string, fn func(r io.Reader, host string, disk string, filename string, size int64, modtime time.Time) error) error {
|
||||
f, err := os.Open(filepath.Join(fs.fsPath, volume, file))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer f.Close()
|
||||
st, err := f.Stat()
|
||||
if err != nil || st.IsDir() {
|
||||
return nil
|
||||
}
|
||||
return fn(f, "fs", fs.fsUUID, file, st.Size(), st.ModTime())
|
||||
}
|
||||
|
||||
@@ -291,3 +291,10 @@ func (d *naughtyDisk) VerifyFile(ctx context.Context, volume, path string, fi Fi
|
||||
}
|
||||
return d.disk.VerifyFile(ctx, volume, path, fi)
|
||||
}
|
||||
|
||||
func (d *naughtyDisk) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
|
||||
if err := d.calcError(); err != nil {
|
||||
return stat, err
|
||||
}
|
||||
return d.disk.StatInfoFile(ctx, volume, path)
|
||||
}
|
||||
|
||||
@@ -74,6 +74,7 @@ type StorageAPI interface {
|
||||
CheckFile(ctx context.Context, volume string, path string) (err error)
|
||||
Delete(ctx context.Context, volume string, path string, recursive bool) (err error)
|
||||
VerifyFile(ctx context.Context, volume, path string, fi FileInfo) error
|
||||
StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error)
|
||||
|
||||
// Write all data, syncs the data to disk.
|
||||
// Should be used for smaller payloads.
|
||||
|
||||
@@ -684,6 +684,23 @@ func (client *storageRESTClient) VerifyFile(ctx context.Context, volume, path st
|
||||
return toStorageErr(verifyResp.Err)
|
||||
}
|
||||
|
||||
func (client *storageRESTClient) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
|
||||
values := make(url.Values)
|
||||
values.Set(storageRESTVolume, volume)
|
||||
values.Set(storageRESTFilePath, path)
|
||||
respBody, err := client.call(ctx, storageRESTMethodStatInfoFile, values, nil, -1)
|
||||
if err != nil {
|
||||
return stat, err
|
||||
}
|
||||
defer xhttp.DrainBody(respBody)
|
||||
respReader, err := waitForHTTPResponse(respBody)
|
||||
if err != nil {
|
||||
return stat, err
|
||||
}
|
||||
err = stat.DecodeMsg(msgpNewReader(respReader))
|
||||
return stat, err
|
||||
}
|
||||
|
||||
// Close - marks the client as closed.
|
||||
func (client *storageRESTClient) Close() error {
|
||||
client.restClient.Close()
|
||||
|
||||
@@ -52,6 +52,7 @@ const (
|
||||
storageRESTMethodRenameFile = "/renamefile"
|
||||
storageRESTMethodVerifyFile = "/verifyfile"
|
||||
storageRESTMethodWalkDir = "/walkdir"
|
||||
storageRESTMethodStatInfoFile = "/statfile"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -1038,6 +1038,23 @@ func logFatalErrs(err error, endpoint Endpoint, exit bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// StatInfoFile returns file stat info.
|
||||
func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request) {
|
||||
if !s.IsValid(w, r) {
|
||||
return
|
||||
}
|
||||
vars := mux.Vars(r)
|
||||
volume := vars[storageRESTVolume]
|
||||
filePath := vars[storageRESTFilePath]
|
||||
done := keepHTTPResponseAlive(w)
|
||||
si, err := s.storage.StatInfoFile(r.Context(), volume, filePath)
|
||||
done(err)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
msgp.Encode(w, &si)
|
||||
}
|
||||
|
||||
// registerStorageRPCRouter - register storage rpc router.
|
||||
func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools) {
|
||||
storageDisks := make([][]*xlStorage, len(endpointServerPools))
|
||||
@@ -1129,6 +1146,8 @@ func registerStorageRESTHandlers(router *mux.Router, endpointServerPools Endpoin
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodWalkDir).HandlerFunc(httpTraceHdrs(server.WalkDirHandler)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTDirPath, storageRESTRecursive)...)
|
||||
subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(httpTraceHdrs(server.StatInfoFile)).
|
||||
Queries(restQueries(storageRESTVolume, storageRESTFilePath)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,12 +32,13 @@ func _() {
|
||||
_ = x[storageMetricUpdateMetadata-21]
|
||||
_ = x[storageMetricReadVersion-22]
|
||||
_ = x[storageMetricReadAll-23]
|
||||
_ = x[storageMetricLast-24]
|
||||
_ = x[storageStatInfoFile-24]
|
||||
_ = x[storageMetricLast-25]
|
||||
}
|
||||
|
||||
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllLast"
|
||||
const _storageMetric_name = "MakeVolBulkMakeVolListVolsStatVolDeleteVolWalkDirListDirReadFileAppendFileCreateFileReadFileStreamRenameFileRenameDataCheckPartsCheckFileDeleteDeleteVersionsVerifyFileWriteAllDeleteVersionWriteMetadataUpdateMetadataReadVersionReadAllstorageStatInfoFileLast"
|
||||
|
||||
var _storageMetric_index = [...]uint8{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 215, 226, 233, 237}
|
||||
var _storageMetric_index = [...]uint16{0, 11, 18, 26, 33, 42, 49, 56, 64, 74, 84, 98, 108, 118, 128, 137, 143, 157, 167, 175, 188, 201, 215, 226, 233, 252, 256}
|
||||
|
||||
func (i storageMetric) String() string {
|
||||
if i >= storageMetric(len(_storageMetric_index)-1) {
|
||||
|
||||
@@ -58,6 +58,7 @@ const (
|
||||
storageMetricUpdateMetadata
|
||||
storageMetricReadVersion
|
||||
storageMetricReadAll
|
||||
storageStatInfoFile
|
||||
|
||||
// .... add more
|
||||
|
||||
@@ -611,6 +612,22 @@ func (p *xlStorageDiskIDCheck) ReadAll(ctx context.Context, volume string, path
|
||||
return p.storage.ReadAll(ctx, volume, path)
|
||||
}
|
||||
|
||||
func (p *xlStorageDiskIDCheck) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
|
||||
defer p.updateStorageMetrics(storageStatInfoFile, volume, path)()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return StatInfo{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if err = p.checkDiskStale(); err != nil {
|
||||
return StatInfo{}, err
|
||||
}
|
||||
|
||||
return p.storage.StatInfoFile(ctx, volume, path)
|
||||
}
|
||||
|
||||
func storageTrace(s storageMetric, startTime time.Time, duration time.Duration, path string) madmin.TraceInfo {
|
||||
return madmin.TraceInfo{
|
||||
TraceType: madmin.TraceStorage,
|
||||
|
||||
@@ -2284,3 +2284,32 @@ func (s *xlStorage) VerifyFile(ctx context.Context, volume, path string, fi File
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *xlStorage) StatInfoFile(ctx context.Context, volume, path string) (stat StatInfo, err error) {
|
||||
volumeDir, err := s.getVolDir(volume)
|
||||
if err != nil {
|
||||
return stat, err
|
||||
}
|
||||
|
||||
// Stat a volume entry.
|
||||
if err = Access(volumeDir); err != nil {
|
||||
if osIsNotExist(err) {
|
||||
return stat, errVolumeNotFound
|
||||
} else if isSysErrIO(err) {
|
||||
return stat, errFaultyDisk
|
||||
} else if osIsPermission(err) {
|
||||
return stat, errVolumeAccessDenied
|
||||
}
|
||||
return stat, err
|
||||
}
|
||||
filePath := pathJoin(volumeDir, path)
|
||||
if err := checkPathLength(filePath); err != nil {
|
||||
return stat, err
|
||||
}
|
||||
st, _ := Lstat(filePath)
|
||||
if st == nil {
|
||||
return stat, errPathNotFound
|
||||
}
|
||||
|
||||
return StatInfo{ModTime: st.ModTime(), Size: st.Size()}, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user