// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/binary"
	"fmt"
	"net/http"
	"path"
	"strings"
	"sync"

	"github.com/minio/madmin-go/v2"
	"github.com/minio/minio/internal/crypto"
	"github.com/minio/minio/internal/hash"
	"github.com/minio/minio/internal/kms"
)

//go:generate msgp -file $GOFILE

// Admin API errors reported by remote tier configuration operations.
var (
	// errTierMissingCredentials is returned by Edit when the credentials
	// supplied for a tier are empty or incomplete for its backend type.
	errTierMissingCredentials = AdminError{
		Code:       "XMinioAdminTierMissingCredentials",
		Message:    "Specified remote credentials are empty",
		StatusCode: http.StatusForbidden,
	}

	// errTierBackendInUse is returned by Add when the warm backend being
	// added reports that it is already in use.
	errTierBackendInUse = AdminError{
		Code:       "XMinioAdminTierBackendInUse",
		Message:    "Specified remote tier is already in use",
		StatusCode: http.StatusConflict,
	}

	// errTierTypeUnsupported signals an unsupported tier type. It is not
	// referenced elsewhere in this file.
	errTierTypeUnsupported = AdminError{
		Code:       "XMinioAdminTierTypeUnsupported",
		Message:    "Specified tier type is unsupported",
		StatusCode: http.StatusBadRequest,
	}

	// errTierBackendNotEmpty is returned by Remove when the tier's warm
	// backend reports that it is still in use.
	errTierBackendNotEmpty = AdminError{
		Code:       "XMinioAdminTierBackendNotEmpty",
		Message:    "Specified remote backend is not empty",
		StatusCode: http.StatusBadRequest,
	}
)

const (
	// tierConfigFile is the base name of the persisted tier config object.
	tierConfigFile    = "tier-config.bin"
	// tierConfigFormat is the format identifier stored in the first two
	// bytes (little-endian) of the persisted config.
	tierConfigFormat  = 1
	// tierConfigV1 is an older version number that loadTierConfig still accepts.
	tierConfigV1      = 1
	// tierConfigVersion is the version number written by Bytes into bytes
	// two and three (little-endian) of the persisted config.
	tierConfigVersion = 2

	// minioHotTier is not referenced in this file.
	// NOTE(review): presumably the name of the local (non-remote) tier — confirm against callers.
	minioHotTier = "STANDARD"
)

// tierConfigPath is the object name of the persisted remote tier config:
// tierConfigFile joined under minioConfigPrefix.
var tierConfigPath = path.Join(minioConfigPrefix, tierConfigFile)

// TierConfigMgr holds the collection of remote tiers configured in this deployment.
// The embedded RWMutex is taken by the exported methods around accesses to
// both maps. The mutex and the driver cache are tagged `msg:"-"`.
type TierConfigMgr struct {
	sync.RWMutex `msg:"-"`
	// drivercache maps a tier name to a WarmBackend already initialized from
	// that tier's config; it is filled by Add, Edit and getDriver.
	drivercache  map[string]WarmBackend `msg:"-"`

	// Tiers maps a tier name to its configuration.
	Tiers map[string]madmin.TierConfig `json:"tiers"`
}

// IsTierValid reports whether a remote tier named tierName is configured.
// The lookup is performed under the read lock.
func (config *TierConfigMgr) IsTierValid(tierName string) bool {
	config.RLock()
	_, found := config.isTierNameInUse(tierName)
	config.RUnlock()
	return found
}

// isTierNameInUse looks up tierName among the configured tiers. When found it
// returns the tier's type and true; otherwise it returns madmin.Unsupported
// and false. It takes no locks itself: callers must already hold the
// appropriate lock on config.
func (config *TierConfigMgr) isTierNameInUse(tierName string) (madmin.TierType, bool) {
	cfg, found := config.Tiers[tierName]
	if !found {
		return madmin.Unsupported, false
	}
	return cfg.Type, true
}

// Add registers tier under tier.Name once every validation passes:
// the name must be unchanged by upper-casing, must not already be configured,
// a warm backend must be constructible from the config, and that backend must
// not report itself as in use. The write lock is held for the whole call.
func (config *TierConfigMgr) Add(ctx context.Context, tier madmin.TierConfig) error {
	config.Lock()
	defer config.Unlock()

	name := tier.Name

	// Reject names containing anything that upper-casing would change.
	if strings.ToUpper(name) != name {
		return errTierNameNotUppercase
	}

	// Reject names that are already taken.
	if _, taken := config.Tiers[name]; taken {
		return errTierAlreadyExists
	}

	backend, err := newWarmBackend(ctx, tier)
	if err != nil {
		return err
	}

	// Refuse a warm backend that is in use by other MinIO tenants.
	switch used, err := backend.InUse(ctx); {
	case err != nil:
		return err
	case used:
		return errTierBackendInUse
	}

	config.Tiers[name] = tier
	config.drivercache[name] = backend
	return nil
}

// Remove deletes the tier named tier, together with its cached driver,
// provided its warm backend reports that it is not in use. It returns
// errTierBackendNotEmpty when the backend is in use, and propagates any error
// from obtaining the driver or querying the backend.
func (config *TierConfigMgr) Remove(ctx context.Context, tier string) error {
	backend, err := config.getDriver(tier)
	if err != nil {
		return err
	}

	inUse, err := backend.InUse(ctx)
	if err != nil {
		return err
	}
	if inUse {
		return errTierBackendNotEmpty
	}

	config.Lock()
	defer config.Unlock()
	delete(config.Tiers, tier)
	delete(config.drivercache, tier)
	return nil
}

// Verify obtains the warm backend for tier and runs checkWarmBackend against
// it, returning the first error encountered (nil when the check passes).
func (config *TierConfigMgr) Verify(ctx context.Context, tier string) error {
	backend, err := config.getDriver(tier)
	if err == nil {
		err = checkWarmBackend(ctx, backend)
	}
	return err
}

// Empty reports whether no remote tiers are configured. A nil manager is
// considered empty. The tier map is inspected directly under the read lock,
// so no tier configs are copied just to be counted.
func (config *TierConfigMgr) Empty() bool {
	if config == nil {
		return true
	}
	config.RLock()
	defer config.RUnlock()
	return len(config.Tiers) == 0
}

// ListTiers returns a Clone of every remote tier config in this deployment.
// The order is unspecified (map iteration order) and the result is nil when
// no tiers are configured.
func (config *TierConfigMgr) ListTiers() []madmin.TierConfig {
	config.RLock()
	defer config.RUnlock()

	var out []madmin.TierConfig
	for _, cfg := range config.Tiers {
		// Hand out a clone rather than the stored value.
		out = append(out, cfg.Clone())
	}
	return out
}

// Edit replaces the credentials of the remote tier specified by tierName with creds.
//
// It returns errTierNotFound when no such tier exists and
// errTierMissingCredentials when creds lacks the fields required by the tier's
// backend type:
//   - S3: access key and secret key, unless creds.AWSRole is set
//   - Azure: secret key (stored as the account key)
//   - GCS: non-empty credentials JSON (stored base64 URL-encoded)
//   - MinIO: access key and secret key
//
// After updating the credentials a new warm backend is created from the
// resulting config; only if that succeeds are the config and driver cache
// entries for tierName replaced. The write lock is held for the whole call.
func (config *TierConfigMgr) Edit(ctx context.Context, tierName string, creds madmin.TierCreds) error {
	config.Lock()
	defer config.Unlock()

	// check if tier by this name exists
	tierType, exists := config.isTierNameInUse(tierName)
	if !exists {
		return errTierNotFound
	}

	// NOTE(review): if the per-backend fields of madmin.TierConfig are
	// pointers, the assignments below also modify the stored config before
	// the new backend has been validated — confirm.
	cfg := config.Tiers[tierName]
	switch tierType {
	case madmin.S3:
		if (creds.AccessKey == "" || creds.SecretKey == "") && !creds.AWSRole {
			return errTierMissingCredentials
		}
		switch {
		case creds.AWSRole:
			cfg.S3.AWSRole = true
		default:
			cfg.S3.AccessKey = creds.AccessKey
			cfg.S3.SecretKey = creds.SecretKey
		}
	case madmin.Azure:
		if creds.SecretKey == "" {
			return errTierMissingCredentials
		}
		cfg.Azure.AccountKey = creds.SecretKey

	case madmin.GCS:
		// Treat an empty (but non-nil) credentials blob as missing too.
		if len(creds.CredsJSON) == 0 {
			return errTierMissingCredentials
		}
		cfg.GCS.Creds = base64.URLEncoding.EncodeToString(creds.CredsJSON)
	case madmin.MinIO:
		if creds.AccessKey == "" || creds.SecretKey == "" {
			return errTierMissingCredentials
		}
		cfg.MinIO.AccessKey = creds.AccessKey
		cfg.MinIO.SecretKey = creds.SecretKey
	}

	d, err := newWarmBackend(ctx, cfg)
	if err != nil {
		return err
	}
	config.Tiers[tierName] = cfg
	config.drivercache[tierName] = d
	return nil
}

// Bytes serializes the config: a four-byte header (little-endian uint16
// format, then little-endian uint16 version) followed by the msgpack encoding
// produced by MarshalMsg. The read lock is held while encoding.
func (config *TierConfigMgr) Bytes() ([]byte, error) {
	config.RLock()
	defer config.RUnlock()

	const headerLen = 4

	// Reserve room for the header plus the estimated msgpack size up front.
	buf := make([]byte, headerLen, headerLen+config.Msgsize())
	binary.LittleEndian.PutUint16(buf[:2], tierConfigFormat)
	binary.LittleEndian.PutUint16(buf[2:headerLen], tierConfigVersion)

	// The encoded config goes after the header.
	return config.MarshalMsg(buf)
}

// getDriver returns the WarmBackend for the remote tier named tierName. A
// cached driver is returned when present; otherwise one is created from the
// tier's config and cached. It returns errTierNotFound when no tier by that
// name is configured. The write lock is held for the whole call.
func (config *TierConfigMgr) getDriver(tierName string) (d WarmBackend, err error) {
	config.Lock()
	defer config.Unlock()

	// Fast path: driver already built.
	if cached, hit := config.drivercache[tierName]; hit {
		return cached, nil
	}

	// Slow path: build the driver from the tier's stored config.
	tierCfg, known := config.Tiers[tierName]
	if !known {
		return nil, errTierNotFound
	}
	if d, err = newWarmBackend(context.TODO(), tierCfg); err != nil {
		return nil, err
	}
	config.drivercache[tierName] = d
	return d, nil
}

// configReader builds the PutObjReader and ObjectOptions needed to persist
// the config with a PutObject call. The payload is the output of Bytes
// (header plus msgpack-encoded tier configurations). When GlobalKMS is nil the
// payload is passed through as is with empty options; otherwise it is wrapped
// in an encrypting reader, and the options carry the encryption metadata and
// the current time as MTime.
func (config *TierConfigMgr) configReader() (*PutObjReader, *ObjectOptions, error) {
	payload, err := config.Bytes()
	if err != nil {
		return nil, nil, err
	}

	size := int64(len(payload))
	plainHr, err := hash.NewReader(bytes.NewReader(payload), size, "", "", size)
	if err != nil {
		return nil, nil, err
	}

	// No KMS configured: store the payload unencrypted.
	if GlobalKMS == nil {
		return NewPutObjReader(plainHr), &ObjectOptions{}, nil
	}

	// Local names such as oek follow the acronyms defined at
	// https://github.com/minio/minio/blob/master/docs/security/README.md#acronyms

	// Wrap the payload in an encrypting reader; meta is handed to
	// newEncryptReader and later stored as the object's user-defined metadata.
	meta := make(map[string]string)
	encReader, oek, err := newEncryptReader(context.Background(), plainHr, crypto.S3, "", nil, minioMetaBucket, tierConfigPath, meta, kms.Context{})
	if err != nil {
		return nil, nil, err
	}

	// The encrypted stream's length is derived from the plaintext size.
	info := ObjectInfo{Size: size}
	encSize := info.EncryptedSize()
	encHr, err := hash.NewReader(encReader, encSize, "", "", encSize)
	if err != nil {
		return nil, nil, err
	}

	pr, err := NewPutObjReader(plainHr).WithEncryption(encHr, &oek)
	if err != nil {
		return nil, nil, err
	}

	return pr, &ObjectOptions{UserDefined: meta, MTime: UTCNow()}, nil
}

// Reload replaces the in-memory tier configs with those loaded from the
// config store via objAPI, and empties the driver cache so drivers are rebuilt
// from the new configs. When no config is stored (errConfigNotFound) the
// current state is left untouched and nil is returned; any other load error is
// returned as is.
func (config *TierConfigMgr) Reload(ctx context.Context, objAPI ObjectLayer) error {
	loaded, err := loadTierConfig(ctx, objAPI)
	if err == errConfigNotFound {
		// Nothing to reload.
		return nil
	}
	if err != nil {
		return err
	}

	config.Lock()
	defer config.Unlock()

	// Drop drivers built from the outgoing config.
	for name := range config.drivercache {
		delete(config.drivercache, name)
	}
	// Drop the outgoing tier configs.
	for name := range config.Tiers {
		delete(config.Tiers, name)
	}
	// Install the freshly loaded tier configs.
	for name, tierCfg := range loaded.Tiers {
		config.Tiers[name] = tierCfg
	}
	return nil
}

// Save persists this manager's tier configuration through objAPI, writing it
// to tierConfigPath in minioMetaBucket. It returns errServerNotInitialized
// when objAPI is nil, and otherwise any error from building the reader or from
// PutObject.
func (config *TierConfigMgr) Save(ctx context.Context, objAPI ObjectLayer) error {
	if objAPI == nil {
		return errServerNotInitialized
	}

	// Serialize the receiver, not the package-level manager, so that Save
	// persists the instance it was called on.
	pr, opts, err := config.configReader()
	if err != nil {
		return err
	}

	_, err = objAPI.PutObject(ctx, minioMetaBucket, tierConfigPath, pr, *opts)
	return err
}

// NewTierConfigMgr creates a new tier configuration manager with empty,
// non-nil tier and driver-cache maps.
func NewTierConfigMgr() *TierConfigMgr {
	mgr := new(TierConfigMgr)
	mgr.drivercache = make(map[string]WarmBackend)
	mgr.Tiers = make(map[string]madmin.TierConfig)
	return mgr
}

// loadTierConfig reads the remote tier configuration stored at tierConfigPath
// through objAPI and decodes it into a new TierConfigMgr.
//
// The stored data must be longer than its four-byte header: a little-endian
// uint16 format that must equal tierConfigFormat, then a little-endian uint16
// version that must be tierConfigV1 or tierConfigVersion. The remainder is
// decoded with UnmarshalMsg. It returns errServerNotInitialized when objAPI is
// nil and propagates read and decode errors unchanged.
func loadTierConfig(ctx context.Context, objAPI ObjectLayer) (*TierConfigMgr, error) {
	if objAPI == nil {
		return nil, errServerNotInitialized
	}

	raw, err := readConfig(ctx, objAPI, tierConfigPath)
	if err != nil {
		return nil, err
	}

	const headerLen = 4
	if len(raw) <= headerLen {
		return nil, fmt.Errorf("tierConfigInit: no data")
	}

	// Validate the header before decoding anything.
	format := binary.LittleEndian.Uint16(raw[0:2])
	if format != tierConfigFormat {
		return nil, fmt.Errorf("tierConfigInit: unknown format: %d", format)
	}

	version := binary.LittleEndian.Uint16(raw[2:4])
	if version != tierConfigV1 && version != tierConfigVersion {
		return nil, fmt.Errorf("tierConfigInit: unknown version: %d", version)
	}

	// Both accepted versions are decoded the same way.
	cfg := NewTierConfigMgr()
	if _, err := cfg.UnmarshalMsg(raw[headerLen:]); err != nil {
		return nil, err
	}
	return cfg, nil
}

// Reset removes every configured remote tier and every cached tier driver,
// leaving both maps allocated but empty. It holds the write lock throughout.
func (config *TierConfigMgr) Reset() {
	config.Lock()
	defer config.Unlock()

	for name := range config.drivercache {
		delete(config.drivercache, name)
	}
	for name := range config.Tiers {
		delete(config.Tiers, name)
	}
}

// Init initializes the tier configuration by loading it from objAPI; it is
// a thin wrapper over Reload and returns whatever Reload returns.
func (config *TierConfigMgr) Init(ctx context.Context, objAPI ObjectLayer) error {
	if err := config.Reload(ctx, objAPI); err != nil {
		return err
	}
	return nil
}