// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/minio/minio/internal/logger"
	"github.com/tinylib/msgp/msgp"
)

//go:generate msgp -file $GOFILE -unexported
//msgp:ignore tierJournal tierDiskJournal walkfn

// tierDiskJournal is the on-disk journal of pending remote-tier deletes. The
// embedded RWMutex is held by Open, Close and addEntry while they read or
// replace file.
type tierDiskJournal struct {
	sync.RWMutex
	// diskPath is the local disk path under which the journal files are
	// located (see JournalPath and ReadOnlyPath).
	diskPath string
	file     *os.File // active journal file
}

// tierJournal bundles the legacy on-disk journal with the in-memory journal.
// Both are embedded, so their fields and methods are promoted onto
// tierJournal.
type tierJournal struct {
	*tierDiskJournal // for processing legacy journal entries
	*tierMemJournal  // for processing new journal entries
}

// jentry is a single record in the on-disk journal. WalkEntries decodes these
// and hands the three fields to its walkFn callback.
type jentry struct {
	// ObjName is the object name passed to the tier driver's Remove.
	ObjName string `msg:"obj"`
	// VersionID is passed to the callback as rvID (converted with
	// remoteVersionID before Remove is called).
	VersionID string `msg:"vid"`
	// TierName is the name used to look up the tier driver.
	TierName string `msg:"tier"`
}

// tierJournalVersion is the format version written, little-endian, at the
// start of every journal file.
const tierJournalVersion = 1

// tierJournalHdrLen is the size in bytes of that version header.
const tierJournalHdrLen = 2

// errUnsupportedJournalVersion is returned by OpenRO when the version header
// of the read-only journal does not match tierJournalVersion.
var errUnsupportedJournalVersion = errors.New("unsupported pending deletes journal version")

// newTierDiskJournal returns a zero-valued on-disk journal: no disk path is
// set and no journal file is open yet.
func newTierDiskJournal() *tierDiskJournal {
	return new(tierDiskJournal)
}

// initTierDeletionJournal initializes an in-memory journal built using a
// buffered channel for new journal entries. It also initializes the on-disk
// journal only to process existing journal entries made from previous versions.
//
// Local disk paths are tried in order. The first one on which the journal
// directory can be created and the active journal opened is used; failures on
// a disk are logged and the next disk is tried. On success two background
// goroutines are started with ctx and the journal is returned. If no disk
// works an error is returned.
func initTierDeletionJournal(ctx context.Context) (*tierJournal, error) {
	j := &tierJournal{
		tierMemJournal:  newTierMemJoural(1000),
		tierDiskJournal: newTierDiskJournal(),
	}
	for _, localPath := range globalEndpoints.LocalDisksPaths() {
		j.diskPath = localPath

		// Make sure the journal's parent directory exists, then open the
		// active journal; either failure disqualifies this disk.
		err := os.MkdirAll(filepath.Dir(j.JournalPath()), os.FileMode(0700))
		if err == nil {
			err = j.Open()
		}
		if err != nil {
			logger.LogIf(ctx, err)
			continue
		}

		// Existing journal entries from previous MinIO versions.
		go j.deletePending(ctx)
		// Newer journal entries circa free-versions.
		go j.processEntries(ctx)
		return j, nil
	}

	return nil, errors.New("no local disk found")
}

// rotate rotates the journal. If a read-only journal already exists it does
// nothing. Otherwise renames the active journal to a read-only journal and
// opens a new active journal.
//
// A failure while closing/renaming the active journal is no longer dropped:
// a new active journal is still opened so journalling can continue, and the
// close error is then returned to the caller. If reopening fails, that error
// is returned instead.
func (jd *tierDiskJournal) rotate() error {
	// Do nothing if a read-only journal file already exists.
	if _, err := os.Stat(jd.ReadOnlyPath()); err == nil {
		return nil
	}
	// Close the active journal if present; Close performs the rename.
	closeErr := jd.Close()
	// Open a new active journal for subsequent journalling, even when the
	// close above failed, so that jd.file is not left nil.
	if err := jd.Open(); err != nil {
		return err
	}
	return closeErr
}

// walkFn is the callback WalkEntries invokes once per decoded journal entry,
// passing the entry's ObjName, VersionID (as rvID) and TierName. A non-nil
// return value other than an object-not-found error makes WalkEntries re-add
// the entry to the active journal.
type walkFn func(ctx context.Context, objName, rvID, tierName string) error

// ReadOnlyPath returns the path of the read-only journal, i.e. the rotated
// journal that WalkEntries reads entries from.
func (jd *tierDiskJournal) ReadOnlyPath() string {
	const roJournalName = "deletion-journal.ro.bin"
	return filepath.Join(jd.diskPath, minioMetaBucket, "ilm", roJournalName)
}

// JournalPath returns the path of the active journal, the file that Open
// creates and addEntry appends to.
func (jd *tierDiskJournal) JournalPath() string {
	const journalName = "deletion-journal.bin"
	return filepath.Join(jd.diskPath, minioMetaBucket, "ilm", journalName)
}

// WalkEntries rotates the journal and then calls fn for every entry in the
// read-only journal.
//
// An entry for which fn fails with anything other than an object-not-found
// error is appended to the active journal so it is retried on a later walk.
// The read-only journal is removed only when it was read through to EOF and
// every such entry was re-journaled successfully; otherwise it is kept and
// walked again next time, so no pending delete is lost. Errors are logged
// rather than returned.
func (jd *tierDiskJournal) WalkEntries(ctx context.Context, fn walkFn) {
	if err := jd.rotate(); err != nil {
		logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to rotate pending deletes journal %s", err))
		return
	}

	ro, err := jd.OpenRO()
	switch {
	case errors.Is(err, os.ErrNotExist):
		return // No read-only journal to process; nothing to do.
	case err != nil:
		logger.LogIf(ctx, fmt.Errorf("tier-journal: failed open read-only journal for processing %s", err))
		return
	}
	mr := msgp.NewReader(ro)

	var (
		done          bool // the whole read-only journal was decoded
		requeueFailed bool // at least one failed entry could not be re-journaled
	)
	for {
		var entry jentry
		err := entry.DecodeMsg(mr)
		if errors.Is(err, io.EOF) {
			done = true
			break
		}
		if err != nil {
			logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to decode journal entry %s", err))
			break
		}
		err = fn(ctx, entry.ObjName, entry.VersionID, entry.TierName)
		if err != nil && !isErrObjectNotFound(err) {
			logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to delete transitioned object %s from %s due to %s", entry.ObjName, entry.TierName, err))
			// We add the entry into the active journal to try again
			// later. If that fails too, the read-only journal is the
			// only remaining record of this entry and must be kept.
			if aerr := jd.addEntry(entry); aerr != nil {
				logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to re-add journal entry for %s on %s: %v", entry.ObjName, entry.TierName, aerr))
				requeueFailed = true
			}
		}
	}

	// Close before removing: deleting a file that is still open fails on
	// some platforms.
	ro.Close()
	if done && !requeueFailed {
		if err := os.Remove(jd.ReadOnlyPath()); err != nil {
			logger.LogIf(ctx, fmt.Errorf("tier-journal: failed to remove processed read-only journal: %v", err))
		}
	}
}

// deleteObjectFromRemoteTier looks up the driver configured for tierName and
// asks it to remove objName at remote version rvID. It returns the lookup
// error if the driver cannot be obtained, otherwise the result of the removal.
func deleteObjectFromRemoteTier(ctx context.Context, objName, rvID, tierName string) error {
	driver, err := globalTierConfigMgr.getDriver(tierName)
	if err != nil {
		return err
	}
	return driver.Remove(ctx, objName, remoteVersionID(rvID))
}

// deletePending runs until ctx is cancelled. Every 30 minutes it walks the
// journal, deleting each journaled object from its remote tier. On
// cancellation it closes the active journal and returns.
func (jd *tierDiskJournal) deletePending(ctx context.Context) {
	const walkInterval = 30 * time.Minute

	tick := time.NewTicker(walkInterval)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			jd.Close()
			return

		case <-tick.C:
			jd.WalkEntries(ctx, deleteObjectFromRemoteTier)
		}
	}
}

// addEntry appends je, msgp-encoded, to the active journal, opening the
// journal first if it is not already open.
//
// It returns any error from opening the journal, encoding the entry or
// writing it. After a failed write the journal file is closed and forgotten
// so that the next call reopens it; previously the handle was dropped without
// being closed, leaking the descriptor.
func (jd *tierDiskJournal) addEntry(je jentry) error {
	// Open journal if it hasn't been
	err := jd.Open()
	if err != nil {
		return err
	}

	b, err := je.MarshalMsg(nil)
	if err != nil {
		return err
	}

	jd.Lock()
	defer jd.Unlock()
	if _, err = jd.file.Write(b); err != nil {
		// Release the descriptor, then reset to allow a subsequent reopen
		// when the file/disk is available again. The close error is ignored:
		// the write error is the one worth reporting.
		jd.file.Close()
		jd.file = nil
	}
	return err
}

// Close closes the active journal and renames it to read-only for pending
// deletes processing. Note: calling Close on a closed journal is a no-op.
//
// The file is always closed, including when Stat fails, and it is closed
// before the rename because renaming an open file fails on some platforms.
// A journal that holds no entries (size no larger than the version header)
// is left in place rather than rotated, so a file with a missing or partial
// header is never turned into an unreadable read-only journal.
//
// It returns the Stat or rename error, if any.
func (jd *tierDiskJournal) Close() error {
	jd.Lock()
	defer jd.Unlock()
	if jd.file == nil { // already closed
		return nil
	}

	// Detach the handle first so the journal counts as closed whatever
	// happens below.
	f := jd.file
	jd.file = nil

	fi, err := f.Stat()
	// Best-effort close, as before; its error is not reported.
	f.Close()
	if err != nil {
		return err
	}

	// Skip renaming active journal if it has no entries.
	if fi.Size() <= tierJournalHdrLen {
		return nil
	}

	// Rotate active journal to perform pending deletes.
	return os.Rename(jd.JournalPath(), jd.ReadOnlyPath())
}

// Open opens a new active journal. Note: calling Open on an opened journal is a
// no-op.
//
// The journal file is created if missing and opened for appending. If it is
// empty, the little-endian version header is written. jd.file is only set
// once the file is fully initialised: when Stat or the header write fails the
// file is closed and the error returned, so a later Open retries instead of
// treating a half-initialised file as open.
func (jd *tierDiskJournal) Open() error {
	jd.Lock()
	defer jd.Unlock()
	if jd.file != nil { // already open
		return nil
	}

	f, err := os.OpenFile(jd.JournalPath(), os.O_APPEND|os.O_CREATE|os.O_WRONLY|writeMode, 0666)
	if err != nil {
		return err
	}

	// write journal version header if active journal is empty
	fi, err := f.Stat()
	if err == nil && fi.Size() == 0 {
		var data [tierJournalHdrLen]byte
		binary.LittleEndian.PutUint16(data[:], tierJournalVersion)
		_, err = f.Write(data[:])
	}
	if err != nil {
		// Do not keep a handle to a journal we could not initialise.
		f.Close()
		return err
	}

	jd.file = f
	return nil
}

// OpenRO opens the read-only journal and consumes its version header, so the
// returned reader is positioned at the first journal entry. The caller must
// close the returned reader.
//
// It returns the os.Open error if the file cannot be opened, the read error
// if the header cannot be read in full, and errUnsupportedJournalVersion if
// the header holds a version other than tierJournalVersion. The file is
// closed on every error path.
func (jd *tierDiskJournal) OpenRO() (io.ReadCloser, error) {
	file, err := os.Open(jd.ReadOnlyPath())
	if err != nil {
		return nil, err
	}

	// read journal version header
	var data [tierJournalHdrLen]byte
	if _, err := io.ReadFull(file, data[:]); err != nil {
		file.Close()
		return nil, err
	}

	if version := binary.LittleEndian.Uint16(data[:]); version != tierJournalVersion {
		file.Close()
		return nil, errUnsupportedJournalVersion
	}
	return file, nil
}

// jentryV1 represents the entry in the journal before RemoteVersionID was
// added. It remains here for use in tests for the struct element addition.
// Compared with jentry it lacks the VersionID ("vid") field; the other two
// fields carry the same msgp tags.
type jentryV1 struct {
	ObjName  string `msg:"obj"`
	TierName string `msg:"tier"`
}