// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/minio/madmin-go"
	"github.com/minio/minio/internal/logger"
)

const (
	// mrfInfoResetInterval is the idle period used by healRoutine: when no
	// other event arrives within this interval, the MRF statistics are
	// reset, provided nothing is pending.
	mrfInfoResetInterval = 10 * time.Second
	// mrfOpsQueueSize is the buffer size of the partial operations channel
	// and the threshold used to cap the pending operations map.
	mrfOpsQueueSize      = 10000
)

// partialOperation describes an upload/delete of an object that succeeded
// with quorum but was not written to all disks. Values are comparable and
// are used as keys of the pending MRF operations map.
type partialOperation struct {
	bucket, object, versionID string // object version handed to the healer
	size                      int64  // counted in the MRF byte statistics when positive
	setIndex, poolIndex       int    // set and pool indexes matched against reconnection events
}

// setInfo identifies a set by its index together with the index of the
// pool it lives in. It is the value type of the pending operations map and
// the payload of set reconnection events.
type setInfo struct {
	index int
	pool  int
}

// mrfState encapsulates all the information
// related to the global background MRF.
type mrfState struct {
	// ready is stored as 1 at the end of init and is read with sync/atomic
	// by initialized.
	ready int32 // ref: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	_     int32 // For 64 bits alignment

	// ctx is the context handed to init; healRoutine returns once it is
	// done and passes it to heal and log calls.
	ctx       context.Context
	// objectAPI is the object layer whose HealObject is called for each
	// queued operation.
	objectAPI ObjectLayer

	// mu guards pendingOps, the counters below and triggeredAt.
	mu                sync.Mutex
	// opCh is the buffered queue (mrfOpsQueueSize) fed by addPartialOp and
	// consumed by maintainMRFList.
	opCh              chan partialOperation
	// pendingOps maps every queued operation to the set it belongs to.
	pendingOps        map[partialOperation]setInfo
	// setReconnectEvent is an unbuffered channel carrying set reconnection
	// events from newSetReconnected to healRoutine.
	setReconnectEvent chan setInfo

	// Statistics reported by getCurrentMRFRoundInfo.
	itemsHealed  uint64
	bytesHealed  uint64
	pendingItems uint64
	pendingBytes uint64

	// triggeredAt is the UTC time at which healRoutine last started healing
	// a non-empty batch; the zero time after a statistics reset.
	triggeredAt time.Time
}

// init sets up the healing MRF subsystem on m: it records the context and
// the object layer used for healing, allocates the operations queue, the
// pending operations map and the set reconnection channel, starts the two
// background goroutines and finally marks the state as ready.
//
// The goroutines are started on the receiver m (not on the package-level
// globalMRFState) so they always work on the channels and map allocated
// right here, whichever instance is being initialized.
func (m *mrfState) init(ctx context.Context, objAPI ObjectLayer) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.ctx = ctx
	m.objectAPI = objAPI
	m.opCh = make(chan partialOperation, mrfOpsQueueSize)
	m.pendingOps = make(map[partialOperation]setInfo)
	m.setReconnectEvent = make(chan setInfo)

	go m.maintainMRFList()
	go m.healRoutine()

	// Publish readiness last, once every field above is populated.
	atomic.StoreInt32(&m.ready, 1)
}

// initialized reports whether the ready flag has been set, which init
// does as its very last step.
func (m *mrfState) initialized() bool {
	ready := atomic.LoadInt32(&m.ready)
	return ready != 0
}

// addPartialOp queues a partial S3 operation (put/delete) recorded while
// one or more disks were offline. It never blocks: the operation is
// silently dropped when the MRF state is not initialized yet or when the
// queue has no room left.
func (m *mrfState) addPartialOp(op partialOperation) {
	if ready := m.initialized(); !ready {
		return
	}

	select {
	case m.opCh <- op:
		// Queued for maintainMRFList.
	default:
		// Queue is full, drop the operation.
	}
}

// newSetReconnected publishes a set (disk) reconnection event for the
// given pool and set indexes. It is a no-op before initialization, and the
// event is abandoned if nobody receives it within 100 milliseconds.
func (m *mrfState) newSetReconnected(pool, set int) {
	if !m.initialized() {
		return
	}

	event := setInfo{pool: pool, index: set}

	timeout := time.NewTimer(100 * time.Millisecond)
	defer timeout.Stop()

	select {
	case <-timeout.C:
		// No receiver showed up in time, give up.
	case m.setReconnectEvent <- event:
	}
}

// getCurrentMRFRoundInfo returns a snapshot of the statistics of the last
// MRF activity. When no item is pending the zero madmin.MRFStatus is
// returned; otherwise the totals are the healed plus the pending amounts.
func (m *mrfState) getCurrentMRFRoundInfo() madmin.MRFStatus {
	var status madmin.MRFStatus

	// Copy the counters under the lock, build the result outside of it.
	m.mu.Lock()
	started := m.triggeredAt
	healedItems, healedBytes := m.itemsHealed, m.bytesHealed
	queuedItems, queuedBytes := m.pendingItems, m.pendingBytes
	m.mu.Unlock()

	if queuedItems > 0 {
		status.Started = started
		status.ItemsHealed = healedItems
		status.BytesHealed = healedBytes
		status.TotalItems = healedItems + queuedItems
		status.TotalBytes = healedBytes + queuedBytes
	}

	return status
}

// maintainMRFList gathers the list of successful partial uploads
// from all underlying er.sets and puts them in a global map which
// never holds more than mrfOpsQueueSize (10000) entries.
//
// An operation is dropped when the map is already full. An operation that
// is already present in the map is not counted a second time, so that
// pendingItems and pendingBytes always describe the content of the map.
// Only positive sizes contribute to pendingBytes.
//
// The loop runs until the operations channel is closed.
func (m *mrfState) maintainMRFList() {
	for fOp := range m.opCh {
		m.mu.Lock()
		_, queued := m.pendingOps[fOp]
		if !queued && len(m.pendingOps) < mrfOpsQueueSize {
			m.pendingOps[fOp] = setInfo{index: fOp.setIndex, pool: fOp.poolIndex}
			m.pendingItems++
			if fOp.size > 0 {
				m.pendingBytes += uint64(fOp.size)
			}
		}
		m.mu.Unlock()
	}
}

// resetMRFInfoIfNoPendingOps clears the healed/pending counters and the
// trigger time, but only when no item is pending anymore; otherwise the
// current statistics are left untouched.
func (m *mrfState) resetMRFInfoIfNoPendingOps() {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.pendingItems == 0 {
		m.triggeredAt = time.Time{}
		m.itemsHealed, m.bytesHealed = 0, 0
		m.pendingItems, m.pendingBytes = 0, 0
	}
}

// healRoutine listens to new disks reconnection events and
// issues healing requests for queued objects belonging to the
// corresponding erasure set.
//
// It loops until the context stored in m is done. On every iteration it
// waits for one of:
//   - context cancellation: the routine returns;
//   - mrfInfoResetInterval of inactivity: the statistics are reset if
//     nothing is pending;
//   - a set reconnection event: every pending operation recorded for that
//     set is healed, one after the other.
//
// An operation leaves the pending map after a single heal attempt, whether
// it succeeded or not; failures are only logged. The pending counters are
// decremented in both cases so they keep matching the map, while the healed
// counters only grow on success. As when queuing, only positive sizes take
// part in the byte accounting, which prevents unsigned wrap-around.
func (m *mrfState) healRoutine() {
	idler := time.NewTimer(mrfInfoResetInterval)
	defer idler.Stop()

	mrfHealingOpts := madmin.HealOpts{
		ScanMode: madmin.HealNormalScan,
		Remove:   healDeleteDangling,
	}

	for {
		// Stop and drain the timer before re-arming it, otherwise a tick
		// that fired during a long healing round would be delivered as a
		// stale tick right after the Reset.
		if !idler.Stop() {
			select {
			case <-idler.C:
			default:
			}
		}
		idler.Reset(mrfInfoResetInterval)

		select {
		case <-m.ctx.Done():
			return
		case <-idler.C:
			m.resetMRFInfoIfNoPendingOps()
		case reconnected := <-m.setReconnectEvent:
			// Get the list of objects related to the er.set
			// to which the connected disk belongs.
			var mrfOperations []partialOperation
			m.mu.Lock()
			for op, owner := range m.pendingOps {
				if owner == reconnected {
					mrfOperations = append(mrfOperations, op)
				}
			}
			if len(mrfOperations) > 0 {
				m.triggeredAt = time.Now().UTC()
			}
			m.mu.Unlock()

			if len(mrfOperations) == 0 {
				continue
			}

			// Heal objects
			for _, u := range mrfOperations {
				_, err := m.objectAPI.HealObject(m.ctx, u.bucket, u.object, u.versionID, mrfHealingOpts)
				if err != nil {
					logger.LogIf(m.ctx, err)
				}

				// Same rule as when the operation was queued: only
				// positive sizes are accounted for.
				var size uint64
				if u.size > 0 {
					size = uint64(u.size)
				}

				m.mu.Lock()
				// The operation is dropped from the pending set whatever
				// the outcome, so the pending counters follow it.
				delete(m.pendingOps, u)
				if m.pendingItems > 0 {
					m.pendingItems--
				}
				if m.pendingBytes >= size {
					m.pendingBytes -= size
				} else {
					m.pendingBytes = 0
				}
				if err == nil {
					m.itemsHealed++
					m.bytesHealed += size
				}
				m.mu.Unlock()
			}

			waitForLowHTTPReq()
		}
	}
}

// initHealMRF initializes the package-level MRF state (globalMRFState)
// with the given context and object layer by calling its init method.
func initHealMRF(ctx context.Context, obj ObjectLayer) {
	globalMRFState.init(ctx, obj)
}